Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 3 years ago
parent
commit
ba8914f501

+ 7 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -175,8 +175,8 @@ object args_company_job {
       , is_super_filter = false
     )
 
-    , args_company_job("bankruptcy_open_case", Seq("applicant", "respondent", "public_date")
-      , rowkey_udf = "md5(cleanup(concat_ws('',applicant,respondent,split_date(cast(public_date as String)))))"
+    , args_company_job("bankruptcy_open_case", Seq("applicant", "respondent", "public_date", "case_no")
+      , rowkey_udf = "md5(cleanup(concat_ws('', applicant, respondent, case_no, (cast(public_date as String)))))"
       , id_user_defined_rowkey = true
 
       , is_super_filter = false
@@ -408,6 +408,11 @@ object args_company_job {
       , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(deal_time as String))))) )"
       , is_super_filter = false
     )
+
+    , args_company_job("company_change", Seq("company_id", "change_item", "change_time")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_id, change_item, split_date(cast(change_time as String)))))"
+      , is_super_filter = false
+    )
   )
 
   def get_args_company_job(tn: String): args_company_job = {

+ 239 - 239
src/main/scala/com/winhc/bigdata/spark/ng/judicial/JudicialCaseRelationAggs.scala

@@ -188,8 +188,8 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
   val ods_judicial_case_id_mapping = s" $project.ods_judicial_case_id_mapping"
   //主表
   val ads_judicial_case_relation_r1 = s" $project.ads_judicial_case_relation_r1"
-//  //明细表
-//  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
+  //  //明细表
+  //  val ads_judicial_case_relation_r2 = s" $project.ads_judicial_case_relation_r2"
   //明细表(增强)
   val ads_judicial_case_relation_r3 = s" $project.ads_judicial_case_relation_r3"
   //案件移除表
@@ -279,195 +279,195 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
     //增量关系和节点
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node PARTITION(ds='$calc_ds')
-        |SELECT  concat_ws('_',start_id,tn_flag(tn)) start_id
-        |        ,'CASE' LABEL
-        |FROM    (
-        |            SELECT  *
-        |                    ,ROW_NUMBER() OVER (PARTITION BY start_id,tn ORDER BY start_id DESC) num
-        |            FROM    (
-        |                        SELECT  rowkey_1 AS start_id
-        |                                ,tn_1 AS tn
-        |                        FROM    $bds_judicial_case_relation
-        |                        WHERE   ds = '$calc_ds'
-        |                        AND     rowkey_1 IS NOT NULL
-        |                        AND     tn_1 IS NOT NULL
-        |                        UNION ALL
-        |                        SELECT  rowkey_2 AS start_id
-        |                                ,tn_2 AS tn
-        |                        FROM    $bds_judicial_case_relation
-        |                        WHERE   ds = '$calc_ds'
-        |                        AND     rowkey_2 IS NOT NULL
-        |                        AND     tn_2 IS NOT NULL
-        |                    )
-        |        )
-        |WHERE   num = 1
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node PARTITION(ds='$calc_ds')
+         |SELECT  concat_ws('_',start_id,tn_flag(tn)) start_id
+         |        ,'CASE' LABEL
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER (PARTITION BY start_id,tn ORDER BY start_id DESC) num
+         |            FROM    (
+         |                        SELECT  rowkey_1 AS start_id
+         |                                ,tn_1 AS tn
+         |                        FROM    $bds_judicial_case_relation
+         |                        WHERE   ds = '$calc_ds'
+         |                        AND     rowkey_1 IS NOT NULL
+         |                        AND     tn_1 IS NOT NULL
+         |                        UNION ALL
+         |                        SELECT  rowkey_2 AS start_id
+         |                                ,tn_2 AS tn
+         |                        FROM    $bds_judicial_case_relation
+         |                        WHERE   ds = '$calc_ds'
+         |                        AND     rowkey_2 IS NOT NULL
+         |                        AND     tn_2 IS NOT NULL
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin)
 
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $ads_judicial_case_relation PARTITION (ds='$calc_ds')
-        |SELECT start_id,end_id,connect_type,'RELATION' as TYPE
-        |from (
-        |SELECT  start_id,end_id,connect_type
-        |        ,ROW_NUMBER() OVER (PARTITION BY combine_id(start_id,end_id) ORDER BY start_id DESC) num
-        |FROM    (
-        |            SELECT  concat_ws('_',rowkey_1,tn_flag(tn_1)) start_id
-        |                    ,concat_ws('_',rowkey_2,tn_flag(tn_2)) end_id
-        |                    ,connect_type
-        |            FROM    $bds_judicial_case_relation
-        |            WHERE   ds = '$calc_ds'
-        |            AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
-        |            AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
-        |        )
-        |)
-        |WHERE  num = 1
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $ads_judicial_case_relation PARTITION (ds='$calc_ds')
+         |SELECT start_id,end_id,connect_type,'RELATION' as TYPE
+         |from (
+         |SELECT  start_id,end_id,connect_type
+         |        ,ROW_NUMBER() OVER (PARTITION BY combine_id(start_id,end_id) ORDER BY start_id DESC) num
+         |FROM    (
+         |            SELECT  concat_ws('_',rowkey_1,tn_flag(tn_1)) start_id
+         |                    ,concat_ws('_',rowkey_2,tn_flag(tn_2)) end_id
+         |                    ,connect_type
+         |            FROM    $bds_judicial_case_relation
+         |            WHERE   ds = '$calc_ds'
+         |            AND     rowkey_1 IS NOT NULL AND  rowkey_2 IS NOT NULL
+         |            AND     tn_1 IS NOT NULL AND  tn_2 IS NOT NULL
+         |        )
+         |)
+         |WHERE  num = 1
+         |""".stripMargin)
 
     //发送kafka关系和节点
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$incr')
-        |SELECT  a.start_id
-        |        ,a.end_id
-        |        ,IF(b.row_id IS NULL,"400","600") topic_type
-        |        ,connect_type
-        |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type",IF(b.row_id IS NULL,"400","600"),"connect_type",connect_type)) relation_json
-        |FROM    (
-        |            SELECT  start_id
-        |                    ,end_id
-        |                    ,connect_type
-        |                    ,combine_id(start_id,end_id) row_id
-        |            FROM    $ads_judicial_case_relation
-        |            WHERE   ds = '$calc_ds'
-        |        ) a
-        |LEFT JOIN (
-        |              SELECT   combine_id(start_id,end_id) row_id
-        |              FROM    $ads_judicial_case_relation
-        |              WHERE   ds < '$calc_ds'
-        |              GROUP BY combine_id(start_id,end_id)
-        |          ) b
-        |ON      a.row_id = b.row_id
-        |LEFT JOIN    (
-        |            SELECT  case_id
-        |            FROM    $ads_case_id_big
-        |            GROUP by case_id
-        |        ) c
-        |ON      a.start_id = c.case_id
-        |LEFT JOIN (
-        |              SELECT  case_id
-        |              FROM    $ads_case_id_big
-        |              GROUP by case_id
-        |          ) d
-        |ON      a.end_id = d.case_id
-        |WHERE   c.case_id IS NULL
-        |AND     d.case_id IS NULL
-        |AND     a.start_id IS NOT NULL
-        |AND     a.end_id IS NOT NULL
-        |AND     b.row_id IS NULL
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$incr')
+         |SELECT  a.start_id
+         |        ,a.end_id
+         |        ,IF(b.row_id IS NULL,"400","600") topic_type
+         |        ,connect_type
+         |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type",IF(b.row_id IS NULL,"400","600"),"connect_type",connect_type)) relation_json
+         |FROM    (
+         |            SELECT  start_id
+         |                    ,end_id
+         |                    ,connect_type
+         |                    ,combine_id(start_id,end_id) row_id
+         |            FROM    $ads_judicial_case_relation
+         |            WHERE   ds = '$calc_ds'
+         |        ) a
+         |LEFT JOIN (
+         |              SELECT   combine_id(start_id,end_id) row_id
+         |              FROM    $ads_judicial_case_relation
+         |              WHERE   ds < '$calc_ds'
+         |              GROUP BY combine_id(start_id,end_id)
+         |          ) b
+         |ON      a.row_id = b.row_id
+         |LEFT JOIN    (
+         |            SELECT  case_id
+         |            FROM    $ads_case_id_big
+         |            GROUP by case_id
+         |        ) c
+         |ON      a.start_id = c.case_id
+         |LEFT JOIN (
+         |              SELECT  case_id
+         |              FROM    $ads_case_id_big
+         |              GROUP by case_id
+         |          ) d
+         |ON      a.end_id = d.case_id
+         |WHERE   c.case_id IS NULL
+         |AND     d.case_id IS NULL
+         |AND     a.start_id IS NOT NULL
+         |AND     a.end_id IS NOT NULL
+         |AND     b.row_id IS NULL
+         |""".stripMargin)
 
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$update')
-        |SELECT  a.start_id
-        |        ,a.end_id
-        |        ,"600" topic_type
-        |        ,connect_type
-        |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type","600","connect_type",connect_type)) relation_json
-        |FROM    (
-        |            SELECT  start_id
-        |                    ,end_id
-        |                    ,connect_type
-        |                    ,combine_id(start_id,end_id) row_id
-        |            FROM    $ads_judicial_case_relation
-        |            WHERE   ds = '$calc_ds'
-        |        ) a
-        |LEFT JOIN (
-        |              SELECT  case_id
-        |              FROM    (
-        |                          SELECT  case_id
-        |                          FROM    $ads_case_id_big
-        |                          UNION ALL
-        |                          SELECT  start_id case_id
-        |                          FROM    $ads_judicial_case_relation_kafka
-        |                          WHERE   ds = '$calc_ds'
-        |                          AND     tn = '$incr'
-        |                          UNION ALL
-        |                          SELECT  end_id case_id
-        |                          FROM    $ads_judicial_case_relation_kafka
-        |                          WHERE   ds = '$calc_ds'
-        |                          AND     tn = '$incr'
-        |                      )
-        |              GROUP BY case_id
-        |          ) c
-        |ON      a.start_id = c.case_id
-        |LEFT  JOIN    (
-        |            SELECT  case_id
-        |            FROM    (
-        |                        SELECT  case_id
-        |                        FROM    $ads_case_id_big
-        |                        UNION ALL
-        |                        SELECT  start_id case_id
-        |                        FROM    $ads_judicial_case_relation_kafka
-        |                        WHERE   ds = '$calc_ds'
-        |                        AND     tn = '$incr'
-        |                        UNION ALL
-        |                        SELECT  end_id case_id
-        |                        FROM    $ads_judicial_case_relation_kafka
-        |                        WHERE   ds = '$calc_ds'
-        |                        AND     tn = '$incr'
-        |                    )
-        |            GROUP BY case_id
-        |        ) d
-        |ON      a.end_id = d.case_id
-        |WHERE   c.case_id IS NULL
-        |AND     d.case_id IS NULL
-        |AND     a.start_id IS NOT NULL
-        |AND     a.end_id IS NOT NULL
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_kafka PARTITION(ds='$calc_ds',tn='$update')
+         |SELECT  a.start_id
+         |        ,a.end_id
+         |        ,"600" topic_type
+         |        ,connect_type
+         |        ,to_json(MAP('start_id',a.start_id,'end_id',a.end_id,"topic_type","600","connect_type",connect_type)) relation_json
+         |FROM    (
+         |            SELECT  start_id
+         |                    ,end_id
+         |                    ,connect_type
+         |                    ,combine_id(start_id,end_id) row_id
+         |            FROM    $ads_judicial_case_relation
+         |            WHERE   ds = '$calc_ds'
+         |        ) a
+         |LEFT JOIN (
+         |              SELECT  case_id
+         |              FROM    (
+         |                          SELECT  case_id
+         |                          FROM    $ads_case_id_big
+         |                          UNION ALL
+         |                          SELECT  start_id case_id
+         |                          FROM    $ads_judicial_case_relation_kafka
+         |                          WHERE   ds = '$calc_ds'
+         |                          AND     tn = '$incr'
+         |                          UNION ALL
+         |                          SELECT  end_id case_id
+         |                          FROM    $ads_judicial_case_relation_kafka
+         |                          WHERE   ds = '$calc_ds'
+         |                          AND     tn = '$incr'
+         |                      )
+         |              GROUP BY case_id
+         |          ) c
+         |ON      a.start_id = c.case_id
+         |LEFT  JOIN    (
+         |            SELECT  case_id
+         |            FROM    (
+         |                        SELECT  case_id
+         |                        FROM    $ads_case_id_big
+         |                        UNION ALL
+         |                        SELECT  start_id case_id
+         |                        FROM    $ads_judicial_case_relation_kafka
+         |                        WHERE   ds = '$calc_ds'
+         |                        AND     tn = '$incr'
+         |                        UNION ALL
+         |                        SELECT  end_id case_id
+         |                        FROM    $ads_judicial_case_relation_kafka
+         |                        WHERE   ds = '$calc_ds'
+         |                        AND     tn = '$incr'
+         |                    )
+         |            GROUP BY case_id
+         |        ) d
+         |ON      a.end_id = d.case_id
+         |WHERE   c.case_id IS NULL
+         |AND     d.case_id IS NULL
+         |AND     a.start_id IS NOT NULL
+         |AND     a.end_id IS NOT NULL
+         |""".stripMargin)
 
     sql(
       s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node_kafka PARTITION(ds,tn)
-        |SELECT  a.start_id
-        |        ,IF(b.start_id IS NULL,"500","700") topic_type
-        |        ,to_json(MAP('start_id',a.start_id,"topic_type",IF(b.start_id IS NULL,"500","700"))) node_json
-        |        ,'$calc_ds' ds
-        |        ,IF(b.start_id IS NULL,"$incr","$update") tn
-        |FROM    (
-        |            SELECT  start_id
-        |            FROM    $ads_judicial_case_node
-        |            WHERE   ds = '$calc_ds'
-        |            GROUP by start_id
-        |        ) a
-        |LEFT JOIN (
-        |              SELECT  start_id
-        |              FROM    $ads_judicial_case_node
-        |              WHERE   ds < '$calc_ds'
-        |              GROUP BY start_id
-        |          ) b
-        |ON      a.start_id = b.start_id
-        |LEFT JOIN    (
-        |            SELECT  case_id
-        |            FROM    (
-        |                        SELECT  case_id
-        |                        FROM    $ads_case_id_big
-        |                        UNION ALL
-        |                        SELECT  start_id case_id
-        |                        FROM    $ads_judicial_case_relation_kafka
-        |                        WHERE   ds = '$calc_ds'
-        |                        UNION ALL
-        |                        SELECT  end_id case_id
-        |                        FROM    $ads_judicial_case_relation_kafka
-        |                        WHERE   ds = '$calc_ds'
-        |                    )
-        |            GROUP BY case_id
-        |        ) c
-        |ON      a.start_id = c.case_id
-        |WHERE   c.case_id IS NULL
-        |AND     a.start_id IS NOT NULL
-        |""".stripMargin)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_node_kafka PARTITION(ds,tn)
+         |SELECT  a.start_id
+         |        ,IF(b.start_id IS NULL,"500","700") topic_type
+         |        ,to_json(MAP('start_id',a.start_id,"topic_type",IF(b.start_id IS NULL,"500","700"))) node_json
+         |        ,'$calc_ds' ds
+         |        ,IF(b.start_id IS NULL,"$incr","$update") tn
+         |FROM    (
+         |            SELECT  start_id
+         |            FROM    $ads_judicial_case_node
+         |            WHERE   ds = '$calc_ds'
+         |            GROUP by start_id
+         |        ) a
+         |LEFT JOIN (
+         |              SELECT  start_id
+         |              FROM    $ads_judicial_case_node
+         |              WHERE   ds < '$calc_ds'
+         |              GROUP BY start_id
+         |          ) b
+         |ON      a.start_id = b.start_id
+         |LEFT JOIN    (
+         |            SELECT  case_id
+         |            FROM    (
+         |                        SELECT  case_id
+         |                        FROM    $ads_case_id_big
+         |                        UNION ALL
+         |                        SELECT  start_id case_id
+         |                        FROM    $ads_judicial_case_relation_kafka
+         |                        WHERE   ds = '$calc_ds'
+         |                        UNION ALL
+         |                        SELECT  end_id case_id
+         |                        FROM    $ads_judicial_case_relation_kafka
+         |                        WHERE   ds = '$calc_ds'
+         |                    )
+         |            GROUP BY case_id
+         |        ) c
+         |ON      a.start_id = c.case_id
+         |WHERE   c.case_id IS NULL
+         |AND     a.start_id IS NOT NULL
+         |""".stripMargin)
 
     //分区不存在
     addEmptyPartitionOrSkipPlus(ads_judicial_case_node_kafka, calc_ds, incr)
@@ -580,10 +580,15 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_id PARTITION(ds='$calc_ds')
-         |SELECT  b.id
-         |        ,a.flag,a.title,a.case_type,a.case_reason,a.case_no,a.court_name,a.case_stage,a.lable,a.detail
-         |        ,a.yg_name,a.bg_name,a.all_name,a.date,a.detail_id,a.case_amt,a.case_id,a.tn,a.data
+         |SELECT  a.id
+         |        ,b.flag,b.title,b.case_type,b.case_reason,b.case_no,b.court_name,b.case_stage,b.lable,b.detail
+         |        ,b.yg_name,b.bg_name,b.all_name,b.date,b.detail_id,b.case_amt,b.case_id,b.tn,b.data
          |FROM    (
+         |           SELECT  id, concat_ws('',rowkey,tn) row_id
+         |           FROM    $ads_judicial_case_id_mapping
+         |           WHERE   ds = '$calc_ds'
+         |        ) a
+         |JOIN    (
          |            SELECT   flag
          |                    ,title
          |                    ,case_type(case_no) case_type
@@ -610,14 +615,9 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |                        WHERE   ds > 0 AND  case_no_trim(case_no) is not null AND  date is not null
          |                    )
          |            WHERE   num = 1
-         |        ) a
-         |JOIN    (
-         |           SELECT  id, concat_ws('',rowkey,tn) row_id
-         |           FROM    $ads_judicial_case_id_mapping
-         |           WHERE   ds = '$calc_ds'
          |        ) b
          |ON      a.row_id = b.row_id
-         |""".stripMargin)
+         |""".stripMargin).show(20, false)
 
     //明细表
     sql(
@@ -678,58 +678,58 @@ case class JudicialCaseRelationAggs(s: SparkSession, project: String, args_case:
          |""".stripMargin).show(10, false)
 
     //司法案件主表
-        sql(
-          s"""
-             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
-             |SELECT
-             |    judicase_id,
-             |    title       ,
-             |    case_type   ,
-             |    case_reason ,
-             |    case_no     ,
-             |    court_name  ,
-             |    case_stage  ,
-             |    lable       ,
-             |    name_aggs['yg_name'] yg_name,
-             |    name_aggs['bg_name'] bg_name,
-             |    all_name,
-             |    case_amt    ,
-             |    date        ,
-             |    court_level ,
-             |    0 deleted
-             |FROM
-             |(
-             |SELECT  judicase_id
-             |        ,max(title) title
-             |        ,concat_ws(',',collect_set(case_type)) case_type
-             |        ,case_reason(case_reason,date,'0') case_reason
-             |        ,concat_ws(',',collect_set(case_no)) case_no
-             |        ,trim_black(concat_ws(',',collect_set(court_name))) court_name
-             |        ,max(last_stage) case_stage
-             |        ,trim_black(concat_ws(',', collect_set(lable)) ) lable
-             |        -- ,max(case_amt) AS case_amt
-             |        ,max(first_case_amt) case_amt
-             |        -- ,cast(case_amt_plus(case_amt['case_amt'], case_amt['date'], case_amt['flag'])['case_amt'] as double) AS case_amt
-             |        ,max(date) AS date
-             |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-             |        ,name_aggs(yg_name,bg_name,'0',date) name_aggs
-             |        -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
-             |        ,all_name_plus_v2(all_name) all_name
-             |        -- ,null all_name
-             |FROM    (
-             |        SELECT  a.*
-             |        FROM    (
-             |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
-             |                   ,court_level(court_name) court_level
-             |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
-             |                   ,first_value(case_amt['case_amt']) OVER (PARTITION BY judicase_id ORDER BY case_amt['flag'] DESC ) AS first_case_amt
-             |                   FROM    $ads_judicial_case_relation_r3
-             |                   WHERE   ds = '$calc_ds'
-             |                ) a
-             |        )
-             |GROUP BY judicase_id
-             |)x
-             |""".stripMargin).show(20, false)
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_judicial_case_relation_r1 PARTITION(ds='$calc_ds')
+         |SELECT
+         |    judicase_id,
+         |    title       ,
+         |    case_type   ,
+         |    case_reason ,
+         |    case_no     ,
+         |    court_name  ,
+         |    case_stage  ,
+         |    lable       ,
+         |    name_aggs['yg_name'] yg_name,
+         |    name_aggs['bg_name'] bg_name,
+         |    all_name,
+         |    case_amt    ,
+         |    date        ,
+         |    court_level ,
+         |    0 deleted
+         |FROM
+         |(
+         |SELECT  judicase_id
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
+         |        ,case_reason(case_reason,date,'0') case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,trim_black(concat_ws(',',collect_set(court_name))) court_name
+         |        ,max(last_stage) case_stage
+         |        ,trim_black(concat_ws(',', collect_set(lable)) ) lable
+         |        -- ,max(case_amt) AS case_amt
+         |        ,max(first_case_amt) case_amt
+         |        -- ,cast(case_amt_plus(case_amt['case_amt'], case_amt['date'], case_amt['flag'])['case_amt'] as double) AS case_amt
+         |        ,max(date) AS date
+         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+         |        ,name_aggs(yg_name,bg_name,'0',date) name_aggs
+         |        -- ,all_name(concat_ws('\u0001',collect_set(all_name))) all_name
+         |        ,all_name_plus_v2(all_name) all_name
+         |        -- ,null all_name
+         |FROM    (
+         |        SELECT  a.*
+         |        FROM    (
+         |                   SELECT  judicase_id,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,all_name,date,case_amt
+         |                   ,court_level(court_name) court_level
+         |                   ,first_value(case_stage) OVER (PARTITION BY judicase_id ORDER BY date DESC ) AS last_stage
+         |                   ,first_value(case_amt['case_amt']) OVER (PARTITION BY judicase_id ORDER BY case_amt['flag'] DESC ) AS first_case_amt
+         |                   FROM    $ads_judicial_case_relation_r3
+         |                   WHERE   ds = '$calc_ds'
+         |                ) a
+         |        )
+         |GROUP BY judicase_id
+         |)x
+         |""".stripMargin).show(20, false)
 
     //分区不存在,插入空分区
     addEmptyPartitionOrSkip(ads_judicial_case_relation_r1, calc_ds)

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -59,6 +59,7 @@ object CompanySummaryNg_new {
     , get_default_summary_args("company_land_publicity", "company_id")
     , get_default_summary_args("company_bond", "company_id")
     , get_default_summary_args("company_tele_license", "company_id")
+    , get_default_summary_args("company_change", "company_id")
 
 
     , SummaryArgs(table_name = "company_court_open_announcement_explode"

+ 3 - 1
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -226,7 +226,7 @@ object export_company_index_2_es {
     , export_2_es_args("company_check_info"
       , "rowkey,company_id,company_name,check_org,check_type,check_date,check_result,deleted".split(","))
     , export_2_es_args("company_mortgage_info"
-      , "rowkey,company_id,company_name,reg_num,reg_date,publish_date,reg_department,type,deleted".split(","))
+      , "rowkey,company_id,company_name,reg_num,reg_date,publish_date,status,reg_department,type,amount,deleted".split(","))
     , export_2_es_args("company_double_random_check_info"
       , "rowkey,company_id,company_name,check_plan_num,check_plan_name,check_task_num,check_task_name,check_type,check_department,check_date,deleted".split(","))
 
@@ -300,6 +300,8 @@ object export_company_index_2_es {
       , "rowkey,company_id,bond_name,bond_full_name,bond_defined_code,bond_num,publisher_name,bond_type,publish_time,bond_trade_time,debt_rtng,create_time,update_time,deleted".split(","))
     , export_2_es_args("company_tele_license"
       , "rowkey,company_id,company_name,license_number,business_scope,is_available,create_time,update_time,deleted".split(","))
+    , export_2_es_args("company_change"
+      , "rowkey,company_id,company_name,category,change_item,content_before,content_after,change_time,create_time,update_time,deleted".split(","))
   )