Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
parent
commit
6e334a0361

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -35,6 +35,8 @@ object CompanyForCid {
 //  winhc_eci_dev ods_company_dishonest_info new_cid,case_no
 
 //  winhc_eci_dev wenshu_detail_combine new_cid,case_no,cname,name_type
+
+//  winhc_eci_dev company_tax new_cid,year
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args
 

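The comment added above documents the spark-submit argument triple for the new company_tax run. A minimal sketch of how that triple is consumed by main (values taken from the comment; the meaning of each slot is inferred from the surrounding job code):

    // Illustrative only — the documented triple for the company_tax job.
    val args = Array("winhc_eci_dev", "company_tax", "new_cid,year")
    val Array(space, sourceTable, cols) = args
    // space       -> ODPS project/workspace
    // sourceTable -> source table name
    // cols        -> comma-separated dedup columns
    val dupliCols = cols.split(",").toSeq // Seq("new_cid", "year")
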
+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCid.scala

@@ -24,6 +24,7 @@ object CompanyIncrForCid {
   // winhc_eci_dev company_illegal_info new_cid,put_reason,put_date,put_department
   //  winhc_eci_dev company_finance new_cid,round,money
   // winhc_eci_dev company_dishonest_info new_cid,case_no
+  // winhc_eci_dev company_tax new_cid,year
   def main(args: Array[String]): Unit = {
     val Array(project, tableName, dupliCols) = args
     println(

+ 127 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/company_account_with_bank.scala

@@ -0,0 +1,127 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2020/11/9 16:52
+ */
+case class company_account_with_bank(s: SparkSession,
+                                     project: String
+                                    ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  def calc(): Unit = {
+    val inc_ads_tab = s"$project.inc_ads_company_account_with_bank"
+    val inc_ods_tab = s"$project.inc_ods_company_account_with_bank"
+    var inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
+    val inc_ods_last_ds = getLastPartitionsOrElse(inc_ods_tab, null)
+    if (inc_ods_last_ds == null) {
+      println(s"$inc_ods_tab is empty!!!")
+      return
+    }
+    if (inc_ads_last_ds.equals(inc_ods_last_ds)) {
+      println("rerun...")
+      inc_ads_last_ds = getSecondLastPartitionOrElse(inc_ads_tab, "0")
+    }
+    val all_cols = getColumns(inc_ads_tab)
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_last_ds')
+         |SELECT  ${all_cols.diff(Seq("ds")).mkString(",")}
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  t2.current_cid AS rowkey
+         |                                ,t2.current_cid as cid
+         |                                ,t1.name
+         |                                ,t1.credit_code
+         |                                ,t1.address
+         |                                ,t1.phone
+         |                                ,t1.bank
+         |                                ,t1.bank_account
+         |                                ,t1.create_time
+         |                                ,t1.update_time
+         |                                ,t1.deleted
+         |                                ,t1.ds
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                    FROM    $inc_ads_tab
+         |                                    WHERE   ds > '0'
+         |                                ) AS t1
+         |                        JOIN    (
+         |                                    SELECT  *
+         |                                    FROM    winhc_eci_dev.inc_ods_company
+         |                                    WHERE   ds > '$inc_ads_last_ds'
+         |                                    AND     current_cid IS NOT NULL
+         |                                ) AS t2
+         |                        ON      t1.cid = t2.cid
+         |                        UNION ALL
+         |                        SELECT  cid AS rowkey
+         |                                ,cid
+         |                                ,name
+         |                                ,credit_code
+         |                                ,address
+         |                                ,phone
+         |                                ,bank
+         |                                ,bank_account
+         |                                ,create_time
+         |                                ,update_time
+         |                                ,deleted
+         |                                ,ds
+         |                        FROM    $inc_ods_tab
+         |                        WHERE   ds > '$inc_ads_last_ds'
+         |                    ) AS t3
+         |        ) AS t4
+         |WHERE   t4.num = 1
+         |AND     t4.cid is not null
+         |""".stripMargin)
+    import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    $inc_ads_tab
+         |WHERE   ds = '$inc_ods_last_ds'
+         |AND     rowkey is not null
+         |AND     cid is not null
+         |""".stripMargin)
+      .save2HBase("COMPANY_ACCOUNT_WITH_BANK", "rowkey", all_cols.diff(Seq("rowkey", "ds")))
+
+    sql(
+      s"""
+         |SELECT  DISTINCT rowkey as cid
+         |        ,1 AS company_account_with_bank
+         |FROM    $inc_ads_tab
+         |WHERE   ds = '$inc_ods_last_ds'
+         |AND     cid is not null
+         |""".stripMargin)
+      .save2HBase("COMPANY_SUMMARY", "cid", Seq("company_account_with_bank"))
+  }
+}
+
+
+object company_account_with_bank {
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    company_account_with_bank(s = spark, project = project).calc()
+    spark.stop()
+  }
+}

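The new job relies on the project's DataFrame2HBaseHelper implicit for the two save2HBase calls. A sketch of the assumed shape, with the signature inferred purely from the call sites above (the real implementation is not part of this diff):

    import org.apache.spark.sql.DataFrame

    object DataFrame2HBaseHelper {
      implicit class HBaseSaver(df: DataFrame) {
        // Inferred: writes each row to HBase table `tableName`, keyed by
        // `rowkeyField`, persisting `fields` as column qualifiers.
        def save2HBase(tableName: String, rowkeyField: String, fields: Seq[String]): Unit = ???
      }
    }
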
+ 10 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/company_judicial_assistance.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.PhoenixConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyCidAndNameUtils, CompanySummaryPro, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
@@ -206,8 +206,10 @@ case class company_judicial_assistance(s: SparkSession,
   def calc(): Unit = {
     val ods_tab = s"$project.ods_company_judicial_assistance"
     val inc_ods_tab = s"$project.inc_ods_company_judicial_assistance"
+
     val inc_ads_tab = s"$project.inc_ads_company_judicial_assistance"
     val ads_tab = s"$project.ads_company_judicial_assistance"
+
     val ads_list_tab = s"$project.ads_company_judicial_assistance_list"
     val inc_ads_list_tab = s"$project.inc_ads_company_judicial_assistance_list"
 
@@ -225,6 +227,7 @@ case class company_judicial_assistance(s: SparkSession,
     val inc_ads_list_last_ds = getLastPartitionsOrElse(inc_ads_list_tab, "0")
 
     val list_tab_row_num = "cleanup(concat_ws('',rowkey,cid,flag,execute_notice_num))"
+    val main_tab_row_num = "cleanup(CONCAT_ws('',cid,executed_person,execute_notice_num,executive_court,type_state))"
 
 
     def all(): Unit = {
@@ -249,9 +252,9 @@ case class company_judicial_assistance(s: SparkSession,
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $ads_tab PARTITION(ds='$ods_last_ds')
            |SELECT  ${ads_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
-           |            SELECT  MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
+           |            SELECT  MD5($main_tab_row_num) AS rowkey
            |                    ,*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $main_tab_row_num ORDER BY ds DESC ) AS num
            |            FROM    $new_tab
            |        ) AS t1
            |WHERE   t1.num = 1
@@ -352,9 +355,9 @@ case class company_judicial_assistance(s: SparkSession,
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $inc_ads_tab PARTITION(ds='$inc_ods_last_ds')
            |SELECT  ${ads_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
-           |            SELECT  MD5(cleanup(CONCAT_ws('',name,executed_person,execute_notice_num))) AS rowkey
+           |            SELECT  MD5($main_tab_row_num) AS rowkey
            |                    ,*
-           |                    ,ROW_NUMBER() OVER(PARTITION BY cleanup(CONCAT_ws('',name,executed_person)) ORDER BY ds DESC ) AS num
+           |                    ,ROW_NUMBER() OVER(PARTITION BY $main_tab_row_num ORDER BY ds DESC ) AS num
            |            FROM    $new_tab
            |        ) AS t1
            |WHERE   t1.num = 1
@@ -504,12 +507,8 @@ case class company_judicial_assistance(s: SparkSession,
         .jdbc(PhoenixConfig.getPhoenixJDBCUrl, "COMPANY_JUDICIAL_ASSISTANCE_LIST", PhoenixConfig.getPhoenixProperties)
       //        .save2PhoenixByJDBC("COMPANY_JUDICIAL_ASSISTANCE_LIST")
 
-      CompanySummaryPro(s = spark
-        , project = "winhc_eci_dev"
-        , tableName = "company_judicial_assistance_list"
-        , cidField = "split(rowkey,'_')[0]"
-        , where = "deleted = 0"
-      ).calc()
+      CompanySummaryPro.run(spark, "company_judicial_assistance_list")
+
     }
 
 

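Note that CompanySummaryPro.run (added to CompanySummaryPro.scala later in this commit) has no dedicated case for company_judicial_assistance_list, so the call falls through to the default config. A sketch of the equivalence:

    // Old call (removed above):
    // CompanySummaryPro(s = spark, project = "winhc_eci_dev",
    //   tableName = "company_judicial_assistance_list",
    //   cidField = "split(rowkey,'_')[0]", where = "deleted = 0").calc()
    // New call — routed to run()'s default case, which uses the same
    // cidField but omits the `where = "deleted = 0"` filter:
    CompanySummaryPro.run(spark, "company_judicial_assistance_list")
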
+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/deadbeat/deadbeat_info.scala

@@ -363,7 +363,7 @@ case class deadbeat_info(s: SparkSession,
          |SELECT  *
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY ds DESC ) AS num
          |            FROM    (
          |                        SELECT  *
          |                        FROM    winhc_eci_dev.ads_deadbeat_company

+ 5 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -30,7 +30,7 @@ object CompanyDynamic {
                                project: String, // project the table lives in
                                ds: String // primary key of this dimension
 
-                               ) extends LoggingUtils with Logging with BaseFunc{
+                               ) extends LoggingUtils with Logging with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
 
@@ -141,12 +141,14 @@ object CompanyDynamic {
         val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
         val cname = r.getAs[String]("cname")
+        if (biz_date == null)
+          None
         val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data, cname)
         if (result == null) {
           None
         }
         else {
-          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(res._1, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(res._1, res._4, res._7, res._6), res._1, res._2, res._3, res._4.replaceAll("null", ""), res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
         }
       })
 
@@ -167,7 +169,7 @@ object CompanyDynamic {
       spark.createDataFrame(rdd, schema)
         .createOrReplaceTempView(s"company_dynamic_tmp$tableName")
 
-//      val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
+      //      val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
       unescapeHtml4()
       sql(
         s"""

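One caveat in the CompanyDynamic hunk: the bare None after the null check is an expression whose value is discarded, so execution still reaches handle.handle(...) with a null biz_date. A guard that actually short-circuits would have to wrap the rest of the lambda, roughly (sketch, assuming the enclosing map returns an Option-like result as the surrounding code suggests):

    if (biz_date == null) {
      None // skip rows with no business date
    } else {
      val result = handle.handle(rowkey, biz_date, cid,
        if (fields == null) null else fields.split(","), old_data, new_data, cname)
      if (result == null) None
      else result.map(res => Row(/* ... as in the hunk above ... */))
    }
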
+ 22 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -1,11 +1,13 @@
 package com.winhc.bigdata.spark.jobs.dynamic
 
+import org.apache.spark.internal.Logging
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/27 17:05
  * @Description:
  */
-trait CompanyDynamicHandle {
+trait CompanyDynamicHandle extends Logging {
 
  // deprecated
   private val table_2_sub_info_type_map = Map(
@@ -201,17 +203,25 @@ trait CompanyDynamicHandle {
     if (rta_desc == null) {
       return Seq.empty
     }
-    Seq((cid
-      , cname
-      , get_info_type()
-      , rta_desc
-      , get_change_content(old_map, new_map)
-      , get_change_time(bizDate, new_map)
-      , get_biz_id(rowkey, new_map)
-      , get_sub_info_type()
-      , get_info_risk_level(old_map, new_map)
-      , if (suggestion == null) null else suggestion
-    ))
+    try {
+      Seq((cid
+        , cname
+        , get_info_type()
+        , rta_desc
+        , get_change_content(old_map, new_map)
+        , get_change_time(bizDate, new_map)
+        , get_biz_id(rowkey, new_map)
+        , get_sub_info_type()
+        , get_info_risk_level(old_map, new_map)
+        , if (suggestion == null) null else suggestion
+      ))
+    } catch {
+      case e: Exception =>
+        logError(e.getMessage, e)
+        Seq.empty
+    }
+
   }
 
   /**

+ 7 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -26,6 +26,8 @@ object inc_company_equity_info {
     @(transient@getter) val spark: SparkSession = s
 
 
+    private val dedup = Seq("reg_number", "pledgor", "pledgee")
+
     def calc(): Unit = {
       cleanup()
       val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
@@ -64,7 +66,7 @@ object inc_company_equity_info {
            |        ,t1.update_time
            |        ,t1.deleted
            |        from (
-           |SELECT  md5(cleanup(CONCAT_WS('',tmp.reg_number,tmp.reg_date,tmp.equity_amount))) as id
+           |SELECT  md5(cleanup(CONCAT_WS('',${dedup.map("tmp." + _).mkString(",")}))) as id
            |        ,tmp.cid
            |        ,null as cname
            |        ,tmp.base
@@ -89,7 +91,7 @@ object inc_company_equity_info {
            |        ,tmp.deleted
            |FROM    (
            |            SELECT  a.*
-           |                    ,row_number() OVER (PARTITION BY a.reg_number,a.cid,a.pledgor,a.pledgee ORDER BY update_time DESC) c
+           |                    ,row_number() OVER (PARTITION BY cleanup(CONCAT_WS('',${dedup.map("a." + _).mkString(",")})) ORDER BY update_time DESC) c
            |            FROM    (
            |                        SELECT  *
            |                        FROM    winhc_eci_dev.inc_ods_company_equity_info
@@ -102,7 +104,7 @@ object inc_company_equity_info {
            |(
            |SELECT  *
            |              FROM    winhc_eci_dev.base_company_mapping
-           |              WHERE   ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping","0")}'
+           |              WHERE   ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping", "0")}'
            |) as t2
            |on t1.cid = t2.cid
            |""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
@@ -263,8 +265,8 @@ object inc_company_equity_info {
         , cidField = "split(rowkey,'_')[0]"
         , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
           ("0", "company_equity_info_list_0")
-          ,("1", "company_equity_info_list_1")
-          ,("2", "company_equity_info_list_2")
+          , ("1", "company_equity_info_list_1")
+          , ("2", "company_equity_info_list_2")
         ))
       )
         .calc()

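For reference, the interpolated dedup expression used in the id and row_number clauses above expands like this (the .mkString is required — interpolating the Seq directly would embed "List(...)" in the SQL):

    val dedup = Seq("reg_number", "pledgor", "pledgee")
    dedup.map("tmp." + _).mkString(",")
    // -> "tmp.reg_number,tmp.pledgor,tmp.pledgee"
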
+ 7 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre10.scala

@@ -94,7 +94,7 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |                WHERE   length(case_no) >0 AND ds> '0'
          |                UNION ALL
          |                SELECT
-         |                        "7" AS flag
+         |                        "-1" AS flag
          |                        ,concat_ws('',cname,'被执行人') AS title
          |                        ,concat_ws('',case_type(case_no)) AS case_type
          |                        ,NULL AS case_reason
@@ -110,7 +110,7 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |                WHERE   length(gist_id) >0 AND ds> '0'
          |                UNION ALL
          |                SELECT
-         |                        "7" AS flag
+         |                        "-1" AS flag
          |                        ,concat_ws('',cname,'被执行人') AS title
          |                        ,concat_ws('',case_type(case_no)) AS case_type
          |                        ,NULL AS case_reason
@@ -164,7 +164,7 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |from (
          |      select
          |      COALESCE(C.judicase_id,md5(cleanup(A.case_no))) as judicase_id
-         |      ,"10" as flag
+         |      ,flag
          |      ,concat_ws('',A.cname,'被执行人') AS title
          |      ,concat_ws('',case_type(A.case_no)) as case_type
          |      ,NULL AS case_reason
@@ -178,19 +178,19 @@ case class JudicialCaseRelationPre10(s: SparkSession, project: String
          |      ,exec_money as case_amt
          |      ,row_number() over(partition by A.rowkey,A.case_no order by update_time desc) num
          |      from (
-         |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money,"10" as flag
          |        from $project.ads_company_zxr_person
          |        where length(case_no) > 0 and ds>'0'
          |        union all
-         |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        select case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money,"10" as flag
          |        from $project.inc_ads_company_zxr_person
          |        where length(case_no) > 0 and ds>'0'
          |        union all
-         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money,"-1" as flag
          |        from $project.ads_company_zxr_person
          |        where length(gist_id) > 0 and ds>'0'
          |        union all
-         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money
+         |        select gist_id as case_no,court,cname,case_create_time,rowkey,update_time,card,exec_money,"-1" as flag
          |        from $project.inc_ads_company_zxr_person
          |        where length(gist_id) > 0 and ds>'0'
          |      ) A

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPre39.scala

@@ -116,7 +116,7 @@ case class JudicialCaseRelationPre39(s: SparkSession,
         s"""
            |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.ads_judicial_case_relation_pre PARTITION(ds='$inc_last_ds',tn='$table_name')
            |SELECT  judicase_id
-           |        ,flag
+           |        ,"-1" as flag
            |        ,title
            |        ,case_type
            |        ,case_reason

+ 104 - 87
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -222,8 +222,8 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |            ,concat_ws('',case_stage(case_no)) as case_stage
          |            ,replace_char(plaintiff) as yg_name
          |            ,replace_char(defendant) as bg_name
+         |            ,start_date as date
          |            ,md5(cleanup(CONCAT_WS('',case_no,start_date))) as detail_id
-         |            ,rowkey as detail_id
          |            ,0.0 as case_amt
          |      from $project.ads_company_court_open_announcement
          |      where length(case_no) > 0 and ds > '0'
@@ -303,15 +303,15 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     spark.udf.register("name_aggs", new NameAggs(1000))
     spark.udf.register("case_reason", new CaseReasonAggs(1000))
    // preprocess the data
-    val cols = Seq("flag", "date", "detail_id")
-
+    val cols = Seq("flag", "date", "detail_id", "name")
     val t1 = s"$project.inc_ads_company_court_announcement"
     val t2 = s"ads_judicial_case_relation_pre"
     var t2_ds = ds
     var t1_ds = ds
     if (StringUtils.isBlank(ds)) {
       t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
-      t1_ds = BaseUtil.getPartion(t1, spark)
+      //t1_ds = BaseUtil.getPartion(t1, spark)
+      t1_ds = t2_ds
     }
 
    val t3 = "ads_judicial_case_relation_replace" // judicial case id mapping table
@@ -325,52 +325,42 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t3 partition (ds = '$t1_ds')
-         |SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
-         |        ,a.flag
-         |        ,a.title
-         |        ,a.case_type
-         |        ,a.case_reason
-         |        ,case_no_trim(a.case_no) as case_no
-         |        ,a.court_name
-         |        ,a.case_stage
-         |        ,case_label(a.flag) lable
-         |        ,map_2_json(${getStrToMap(cols)}) as detail
-         |        ,a.yg_name
-         |        ,a.bg_name
-         |        ,a.date
-         |        ,a.detail_id
-         |        ,a.case_amt
-         |FROM    (
-         |  select
-         |     judicase_id
-         |     ,flag
-         |     ,title
-         |     ,case_type
-         |     ,case_reason
-         |     ,case_no_trim(case_no) as case_no
-         |     ,court_name
-         |     ,case_stage
-         |     ,replace_char(yg_name) as yg_name
-         |     ,replace_char(bg_name) as bg_name
-         |     ,date
-         |     ,detail_id
-         |     ,case_amt
-         |     ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
-         |  from $project.$t2
-         |  where ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |        and case_no_trim(case_no) is not null
-         |        and date is not null and length(date) = 19
-         |) a
-         |LEFT JOIN (
-         |  select case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
-         |  from $project.$t2
-         |  where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |  and case_no_trim(case_no) is not null
-         |  group by case_no
-         |) b
-         |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
-         |union all
-         |SELECT   judicase_id
+         |SELECT
+         |   judicase_id
+         |   ,flag
+         |   ,title
+         |   ,case_type
+         |   ,case_reason
+         |   ,case_no
+         |   ,court_name
+         |   ,case_stage
+         |   ,lable
+         |   ,map_2_json(${getStrToMap(cols)}) as detail
+         |   ,yg_name
+         |   ,bg_name
+         |   ,date
+         |   ,detail_id
+         |   ,case_amt
+         |FROM
+         |(
+         |   SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
+         |           ,a.flag
+         |           ,a.title
+         |           ,a.case_type
+         |           ,a.case_reason
+         |           ,case_no_trim(a.case_no) as case_no
+         |           ,a.court_name
+         |           ,a.case_stage
+         |           ,case_label(a.flag) lable
+         |           ,a.yg_name
+         |           ,a.bg_name
+         |           ,a.date
+         |           ,a.detail_id
+         |           ,a.case_amt
+         |           ,a.bg_name as name
+         |   FROM    (
+         |     SELECT
+         |        judicase_id
          |        ,flag
          |        ,title
          |        ,case_type
@@ -378,17 +368,46 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        ,case_no_trim(case_no) as case_no
          |        ,court_name
          |        ,case_stage
-         |        ,case_label(flag) lable
-         |        ,map_2_json(${getStrToMap(cols)}) as detail
          |        ,replace_char(yg_name) as yg_name
          |        ,replace_char(bg_name) as bg_name
          |        ,date
          |        ,detail_id
          |        ,case_amt
-         |from $project.$t2
-         |where ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |      and case_no_trim(case_no) is not null
-         |      and date is not null and length(date) = 19
+         |        ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
+         |     FROM $project.$t2
+         |     WHERE ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |           and case_no_trim(case_no) is not null
+         |           and date is not null and length(date) = 19
+         |   ) a
+         |   LEFT JOIN (
+         |     SELECT case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
+         |     FROM $project.$t2
+         |     WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |     and case_no_trim(case_no) is not null
+         |     GROUP BY case_no
+         |   ) b
+         |   ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
+         |   UNION ALL
+         |   SELECT   judicase_id
+         |           ,flag
+         |           ,title
+         |           ,case_type
+         |           ,case_reason
+         |           ,case_no_trim(case_no) as case_no
+         |           ,court_name
+         |           ,case_stage
+         |           ,case_label(flag) lable
+         |           ,replace_char(yg_name) as yg_name
+         |           ,replace_char(bg_name) as bg_name
+         |           ,date
+         |           ,detail_id
+         |           ,case_amt
+         |           ,replace_char(bg_name) as name
+         |   FROM $project.$t2
+         |   WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+         |         and case_no_trim(case_no) is not null
+         |         and date is not null and length(date) = 19
+         |) t
          |""".stripMargin).show(10, false)
 
    // replace name with cid
@@ -487,9 +506,9 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
    // judicial case main table
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r1
          |SELECT
-         |    judicase_id ,
+         |    x.judicase_id ,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -503,50 +522,49 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    case_amt    ,
          |    date        ,
          |    court_level ,
-         |    deleted     ,
+         |    y.deleted     ,
          |    cids
          |FROM
          |(
          |SELECT  judicase_id
-         |        ,max(first_title) title
-         |        ,max(case_type) case_type
+         |        ,max(title) title
+         |        ,concat_ws(',',collect_set(case_type)) case_type
          |        ,case_reason(case_reason,date,flag) case_reason
          |        ,concat_ws(',',collect_set(case_no)) case_no
          |        ,concat_ws(',',collect_set(court_name)) court_name
          |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
-         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
-         |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+         |        ,null as detail
          |        ,max(case_amt) AS case_amt
          |        ,max(date) AS date
          |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-         |        ,max(deleted) deleted
          |        ,concat_ws(',',collect_set(cids)) cids
          |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
          |        SELECT  a.*
-         |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
-         |                ,b.deleted
          |        FROM    (
-         |                   SELECT  *,court_level(court_name) court_level
+         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids
+         |                   ,court_level(court_name) court_level
          |                   FROM    $project.$t6
          |                   WHERE   ds >= '$second_ds'
-         |                ) a JOIN
-         |                (
-         |                   select *
-         |                   from $project.$t4
-         |                ) b on a.judicase_id = b.judicase_id
+         |                ) a
          |        )
          |GROUP BY judicase_id
-         |)
+         |)x
+         |JOIN
+         |(
+         |   select *
+         |   from $project.$t4
+         |) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(20, false)
 
    // detail table
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r2
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci.ads_judicial_case_relation_r2
          |SELECT
          |    id    ,
-         |    judicase_id ,
+         |    x.judicase_id ,
          |    title       ,
          |    case_type   ,
          |    case_reason ,
@@ -558,40 +576,39 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    name_aggs['yg_name'] yg_name,
          |    name_aggs['bg_name'] bg_name,
          |    last_date   ,
-         |    deleted
+         |    y.deleted
          |FROM
          |(
          |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no))) id
          |        ,judicase_id
          |        ,max(first_title) title
-         |        ,max(case_type) case_type
+         |        ,case_type(max(case_no)) as case_type
          |        ,case_reason(case_reason,date,flag) case_reason
          |        ,case_no
          |        ,max(court_name) court_name
          |        ,case_stage(max(case_no)) as case_stage
-         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
+         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
          |        ,concat('[',concat_ws(',',collect_set(detail)),']') detail
          |        ,max(last_date) last_date
-         |        ,max(deleted) deleted
          |        ,name_aggs(yg_name,bg_name,flag,date) name_aggs
          |FROM    (
          |        SELECT  a.*
          |                ,first_value(title) OVER (PARTITION BY a.judicase_id ORDER BY date ASC ) AS first_title
          |                ,first_value(date) OVER (PARTITION BY a.judicase_id ORDER BY date DESC ) AS last_date
-         |                ,b.deleted
          |        FROM    (
          |                   SELECT  *
          |                   FROM    $project.$t6
-         |                   WHERE   ds >= '$second_ds'
-         |                )a JOIN
-         |                (
-         |                   select *
-         |                   from $project.$t4
-         |                )b on a.judicase_id = b.judicase_id
+         |                   WHERE   ds >= '$second_ds' AND length(lable) > 0
+         |                )a
          |)
          |GROUP BY judicase_id
          |         ,case_no
-         |)
+         |) x
+         |JOIN
+         |(
+         |   select *
+         |   from $project.$t4
+         |) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(10, false)
 
   }

+ 72 - 58
src/main/scala/com/winhc/bigdata/spark/jobs/reduction_registered_capital_info.scala

@@ -17,54 +17,54 @@ case class reduction_registered_capital_info(s: SparkSession,
                                             project: String // project the table lives in
                                             ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  private val target_tab = s"$project.ads_reduction_registered_capital_info"
-  private val inc_target_tab = s"$project.inc_ads_reduction_registered_capital_info"
 
-  def init(): Unit = {
-    if (!spark.catalog.tableExists(target_tab)) {
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS $target_tab (
-           |  `rowkey` STRING COMMENT 'FIELD',
-           |  `new_cid` STRING COMMENT 'FIELD',
-           |  `category` STRING,
-           |  `change_item` STRING,
-           |  `content_before` STRING,
-           |  `content_after` STRING,
-           |  `change_time` DATETIME,
-           |  `create_time` DATETIME,
-           |  `update_time` DATETIME,
-           |  `deleted` BIGINT)
-           | COMMENT 'TABLE COMMENT'
-           |PARTITIONED BY (
-           |  `ds` STRING COMMENT '分区')
-           |""".stripMargin)
-    }
-
-    if (!spark.catalog.tableExists(inc_target_tab)) {
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS $inc_target_tab (
-           |  `rowkey` STRING COMMENT 'FIELD',
-           |  `new_cid` STRING COMMENT 'FIELD',
-           |  `category` STRING,
-           |  `change_item` STRING,
-           |  `content_before` STRING,
-           |  `content_after` STRING,
-           |  `change_time` DATETIME,
-           |  `create_time` DATETIME,
-           |  `update_time` DATETIME,
-           |  `deleted` BIGINT)
-           | COMMENT 'TABLE COMMENT'
-           |PARTITIONED BY (
-           |  `ds` STRING COMMENT '分区')
-           |""".stripMargin)
+  def calc(tn: String, symbol: String): Unit = {
+    val target_tab = s"$project.ads_$tn"
+    val inc_target_tab = s"$project.inc_ads_$tn"
+    val target_cols = getColumns(target_tab)
+
+    def init(): Unit = {
+      if (!spark.catalog.tableExists(target_tab)) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS $target_tab (
+             |  `rowkey` STRING COMMENT 'FIELD',
+             |  `new_cid` STRING COMMENT 'FIELD',
+             |  `category` STRING,
+             |  `change_item` STRING,
+             |  `content_before` STRING,
+             |  `content_after` STRING,
+             |  `change_time` DATETIME,
+             |  `create_time` DATETIME,
+             |  `update_time` DATETIME,
+             |  `deleted` BIGINT)
+             | COMMENT 'TABLE COMMENT'
+             |PARTITIONED BY (
+             |  `ds` STRING COMMENT '分区')
+             |""".stripMargin)
+      }
+
+      if (!spark.catalog.tableExists(inc_target_tab)) {
+        sql(
+          s"""
+             |CREATE TABLE IF NOT EXISTS $inc_target_tab (
+             |  `rowkey` STRING COMMENT 'FIELD',
+             |  `new_cid` STRING COMMENT 'FIELD',
+             |  `category` STRING,
+             |  `change_item` STRING,
+             |  `content_before` STRING,
+             |  `content_after` STRING,
+             |  `change_time` DATETIME,
+             |  `create_time` DATETIME,
+             |  `update_time` DATETIME,
+             |  `deleted` BIGINT)
+             | COMMENT 'TABLE COMMENT'
+             |PARTITIONED BY (
+             |  `ds` STRING COMMENT '分区')
+             |""".stripMargin)
+      }
     }
-  }
 
-  private val target_cols = getColumns(target_tab)
-
-  def calc(): Unit = {
     spark.udf.register("registered_capital_trim", RegisteredCapitalUtil.registered_capital_trim _)
     val ads_ds = getLastPartitionsOrElse(target_tab, null)
 
@@ -80,28 +80,32 @@ case class reduction_registered_capital_info(s: SparkSession,
            |AND     category LIKE '%注册资本%'
            |AND     registered_capital_trim(content_before) IS NOT NULL
            |AND     registered_capital_trim(content_after) IS NOT NULL
-           |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) > CAST( registered_capital_trim(content_after) AS DOUBLE )
+           |AND     CAST(registered_capital_trim(content_before) AS DOUBLE ) $symbol CAST( registered_capital_trim(content_after) AS DOUBLE )
            |""".stripMargin)
     }
 
     def inc(): Unit = {
-      val inc_ads_tab = s"$project.inc_ads_company_change"
-      val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_tab, "0")
-
+      val inc_org_tab = s"$project.inc_ads_company_change"
+      val inc_org_last_ds = getLastPartitionsOrElse(inc_org_tab, "0")
+      var inc_target_tab_ds = getLastPartitionsOrElse(inc_target_tab, "0")
+      if (inc_target_tab_ds.equals(inc_org_last_ds)) {
+        println("rerun...")
+        inc_target_tab_ds = getSecondLastPartitionOrElse(inc_target_tab, "0")
+      }
       sql(
         s"""
-           |INSERT OVERWRITE TABLE $inc_target_tab PARTITION(ds='$inc_ads_last_ds')
+           |INSERT OVERWRITE TABLE $inc_target_tab PARTITION(ds='$inc_org_last_ds')
            |SELECT  ${target_cols.diff(Seq("ds")).mkString(",")}
            |FROM    (
            |        SELECT  * ,ROW_NUMBER()OVER (PARTITION BY rowkey ORDER BY ds DESC ) AS num
            |        FROM    (
            |                SELECT  *
-           |                FROM    $inc_ads_tab
-           |                WHERE   ds > $ads_ds
+           |                FROM    $inc_org_tab
+           |                WHERE   ds > $inc_target_tab_ds
            |                AND     category LIKE '%注册资本%'
            |                AND     registered_capital_trim(content_before) IS NOT NULL
            |                AND     registered_capital_trim(content_after) IS NOT NULL
-           |                AND     CAST (registered_capital_trim(content_before) AS DOUBLE )> CAST (registered_capital_trim(content_after) AS DOUBLE )
+           |                AND     CAST (registered_capital_trim(content_before) AS DOUBLE ) $symbol CAST (registered_capital_trim(content_after) AS DOUBLE )
            |                ) AS t1
            |        ) AS t2
            |WHERE   t2.num =1
@@ -110,14 +114,14 @@ case class reduction_registered_capital_info(s: SparkSession,
 
       MaxComputer2Phoenix(spark
         , target_cols.diff(Seq("ds"))
-        , inc_ads_tab
-        , "REDUCTION_REGISTERED_CAPITAL_INFO"
-        , inc_ads_last_ds
+        , inc_target_tab
+        , tn.toUpperCase
+        , inc_org_last_ds
         , "rowkey").syn()
 
       CompanySummaryPro(s = spark
         , project = "winhc_eci_dev"
-        , tableName = "reduction_registered_capital_info"
+        , tableName = tn
         , cidField = "split(rowkey,'_')[0]"
       )
         .calc()
@@ -132,6 +136,15 @@ case class reduction_registered_capital_info(s: SparkSession,
       inc()
     }
   }
+
+  def additionalShareAndReduction(): Unit = {
+    for (t <- Seq(
+      ("reduction_registered_capital_info", ">")
+      , ("increase_registered_capital_info", "<")
+    )) {
+      calc(t._1, t._2)
+    }
+  }
 }
 
 object reduction_registered_capital_info {
@@ -144,7 +157,8 @@ object reduction_registered_capital_info {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    reduction_registered_capital_info(s = spark, project = "winhc_eci_dev").calc()
+    reduction_registered_capital_info(s = spark, project = "winhc_eci_dev")
+      .additionalShareAndReduction()
 
     spark.stop()
   }

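The refactor drives both jobs from a single parameterized calc(tn, symbol); the comparison operator selects the direction of the capital change. Usage as wired up in main:

    // ">" keeps rows where content_before > content_after (reductions);
    // "<" keeps the opposite (increases).
    reduction_registered_capital_info(s = spark, project = "winhc_eci_dev")
      .additionalShareAndReduction()
    // equivalent to:
    //   calc("reduction_registered_capital_info", ">")
    //   calc("increase_registered_capital_info", "<")
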
+ 10 - 8
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -184,6 +184,7 @@ object BaseUtil {
     }
     return hash
   }
+
   def MD5hash(content: String): String = {
     val md5 = MessageDigest.getInstance("MD5")
     val encoded = md5.digest(cleanup(content).getBytes)
@@ -222,21 +223,22 @@ object BaseUtil {
 
   def caseStage(caseNo: String): String = {
     if (StringUtils.isNotBlank(caseNo)) {
-      if (StrUtil.containsAny(caseNo, "破申", "商申")) {
-        return "其它"
+      val casePre = caseType(caseNo).replaceAll("案件", "")
+      if (StrUtil.containsAny(caseNo, "破申", "商申") || casePre.contains("其它")) {
+        return "其它阶段"
       }
       if (StrUtil.containsAny(caseNo, "监", "抗", "再", "申", "提", "再")) {
-        return "再审"
+        return s"${casePre}再审"
       }
       if (StrUtil.containsAny(caseNo, "初")) {
-        return "一审"
+        return s"${casePre}一审"
       } else if (StrUtil.containsAny(caseNo, "终")) {
-        return "二审"
+        return s"${casePre}二审"
       } else if (StrUtil.containsAny(caseNo, "执")) {
-        return "执行"
+        return "首次执行"
       }
     }
-    "其它"
+    "其它阶段"
   }
 
   def lastStage(s: String): String = {
@@ -398,7 +400,7 @@ object BaseUtil {
     //    println(case_no_trim("(2015)怀执字第03601号号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
-    println(case_no_trim("(2019)鄂执7号"))
+    println(caseStage("(2019)鄂初7号"))
   }
 
 }

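Hedged examples of the revised caseStage, read directly off the branches above (the casePre prefix comes from caseType, which is not shown in this diff, so prefixed results are left symbolic):

    BaseUtil.caseStage("(2019)鄂执7号") // "首次执行" — matches the "执" branch
    BaseUtil.caseStage("(2019)破申7号") // "其它阶段" — matches the 破申/商申 guard
    BaseUtil.caseStage("(2019)鄂初7号") // s"${casePre}一审" — matches the "初" branch
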
+ 10 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -31,7 +31,8 @@ case class CompanyIncSummary(s: SparkSession,
   )
 
   def calc(): Unit = {
-    tableName match {
+    CompanySummaryPro.run(spark, tableName)
+   /* tableName match {
       case "company_dishonest_info" => {
         CompanySummaryPro(s = spark
           , project = "winhc_eci_dev"
@@ -74,7 +75,7 @@ case class CompanyIncSummary(s: SparkSession,
           , project = "winhc_eci_dev"
           , tableName = "company_land_mortgage"
           , cidField = "split(rowkey,'_')[0]"
-//          , where = "deleted = 0"
+          //          , where = "deleted = 0"
           , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
             ("mortgagor", "company_land_mortgage_mortgagor")
             , ("mortgagee", "company_land_mortgage_mortgagee")
@@ -103,9 +104,14 @@ case class CompanyIncSummary(s: SparkSession,
       }
 
       case _ => {
-        my_calc()
+        CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = tableName
+          , cidField = "split(rowkey,'_')[0]"
+        ).calc()
+        //        my_calc()
       }
-    }
+    }*/
   }
 
   def my_calc(): Unit = {

+ 35 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

@@ -12,13 +12,32 @@ import scala.collection.mutable
  */
 object CompanyIncrCombineUtils {
   def main(args: Array[String]): Unit = {
-    val Array(project, source, target) = args
+    var project = ""
+    var source = ""
+    var target = ""
+    var flag = "0"
+    if (args.length == 4) {
+      val Array(project1, source1, target1, flag1) = args
+      project = project1
+      source = source1
+      target = target1
+      flag = flag1
+    } else if (args.length == 3) {
+      val Array(project1, source1, target1) = args
+      project = project1
+      source = source1
+      target = target1
+    } else {
+      println("please set project, source, target, flag")
+      sys.exit(-1)
+    }
 
     println(
       s"""
          |project:$project
          |source:$source
          |target:$target
+         |flag:$flag
          |""".stripMargin)
 
     val config = mutable.Map(
@@ -26,17 +45,26 @@ object CompanyIncrCombineUtils {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanyIncrCombineUtils(spark, source, target).calc()
+    CompanyIncrCombineUtils(spark, source, target, flag).calc()
     spark.stop()
   }
 }
 
-case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String, flag: String = "0" // 0 => insert into target table, 1 => insert into source table's partition
+                                  ) extends LoggingUtils {
   override protected val spark: SparkSession = s
 
   def calc(): Unit = {
 
-    val ds2 = BaseUtil.getPartion(s"$target", spark) // target table data
+    var ds2 = ""
+    if (flag.equals("1")) {
+      ds2 = BaseUtil.getPartion(s"$source", spark) // source table partition
+    } else {
+      ds2 = BaseUtil.getPartion(s"$target", spark) // target table data
+    }
+    if (StringUtils.isBlank(ds2)) {
+      ds2 = BaseUtil.getYesterday()
+    }
 
     val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(s => {
       !s.equals("ds")
@@ -48,14 +76,14 @@ case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: Stri
          |select max(ds) max_ds from $target where id = -1 and ds > '0'
          |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
 
-    println(s"list: $list")
+    println(s"list: $list" + s", ds: $ds2")
 
     sql(
       s"""
-         |INSERT into table $target PARTITION(ds=$ds2)
+         |INSERT ${if (flag.equals("0")) "INTO" else "OVERWRITE"} table $target PARTITION(ds=$ds2)
          |SELECT ${cols.mkString(",")} from
          |$source
          |where ds > '${if (StringUtils.isNotBlank(list.head)) s"${list.head}" else s"0"}'
-         |""".stripMargin)
+         |""".stripMargin).show(100)
   }
 }

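The explicit length checks above work, but the same semantics fit a single pattern match; a sketch (flag defaults to "0" exactly as in the hunk):

    val (project, source, target, flag) = args match {
      case Array(p, s, t, f) => (p, s, t, f)
      case Array(p, s, t)    => (p, s, t, "0")
      case _ =>
        println("please set project, source, target [, flag]")
        sys.exit(-1)
    }
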
+ 101 - 13
src/main/scala/com/winhc/bigdata/spark/utils/CompanySummaryPro.scala

@@ -67,7 +67,7 @@ case class CompanySummaryPro(s: SparkSession,
       case true => s""
       case false => s"AND   $where"
     }
-    val tmp_tab = "all_data_summary_tmp"
+    val tmp_tab = "inc_tmp_view"
 
     is_inc match {
       case true => {
@@ -235,7 +235,105 @@ case class CompanySummaryPro(s: SparkSession,
 }
 
 object CompanySummaryPro {
+
+
+  def run(spark: SparkSession, tab: String, target_tab: String = null): Unit = {
+    var csp: CompanySummaryPro = null
+
+    tab match {
+      case "company_dishonest_info" => {
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = "company_dishonest_info"
+          , cidField = "split(rowkey,'_')[0]"
+          , where = "deleted = 0"
+          , groupByInfo = GroupByInfo(field = "status", value_alias = Seq(
+            ("0", "company_dishonest_info_0")
+            , ("1", "company_dishonest_info_1")
+          ))
+        )
+      }
+      case "company_zxr_list" => {
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = "company_zxr_list"
+          , cidField = "split(rowkey,'_')[0]"
+          , where = "deleted = 0"
+          , groupByInfo = GroupByInfo(field = "status", value_alias = Seq(
+            ("0", "company_zxr_list_0")
+            , ("1", "company_zxr_list_1")
+          ))
+        )
+      }
+      case "company_zxr_restrict" => {
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = "company_zxr_restrict"
+          , cidField = "split(rowkey,'_')[0]"
+          , where = "deleted = 0"
+          , groupByInfo = GroupByInfo(field = "status", value_alias = Seq(
+            ("0", "company_zxr_restrict_0")
+            , ("1", "company_zxr_restrict_1")
+          ))
+        )
+      }
+      case "company_land_mortgage" => {
+        // land mortgage
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = "company_land_mortgage"
+          , cidField = "split(rowkey,'_')[0]"
+          //          , where = "deleted = 0"
+          , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
+            ("mortgagor", "company_land_mortgage_mortgagor")
+            , ("mortgagee", "company_land_mortgage_mortgagee")
+            , ("bothsame", "company_land_mortgage_bothsame")
+            , ("bothone", "company_land_mortgage_bothone")
+            , ("bothtwo", "company_land_mortgage_bothtwo")
+          ))
+        )
+      }
+
+      case "company_land_transfer" => {
+        // land transfer
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = "company_land_transfer"
+          , cidField = "split(rowkey,'_')[0]"
+          //          , where = "deleted = 0"
+          , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
+            ("pre", "company_land_transfer_pre")
+            , ("now", "company_land_transfer_now")
+            , ("bothsame", "company_land_transfer_bothsame")
+            , ("bothone", "company_land_transfer_bothone")
+            , ("bothtwo", "company_land_transfer_bothtwo")
+          ))
+        )
+      }
+      case _ =>
+        csp = CompanySummaryPro(s = spark
+          , project = "winhc_eci_dev"
+          , tableName = tab
+          , cidField = "split(rowkey,'_')[0]"
+        )
+    }
+
+    if (target_tab == null)
+      csp.calc()
+    else
+      csp.calc(is_inc = false, target_tab = target_tab)
+  }
+
+
   def main(args: Array[String]): Unit = {
+
+    val Array(tab) = args
+
+    println(
+      s"""
+         |tab: $tab
+         |""".stripMargin)
+
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.debug.maxToStringFields" -> "200",
@@ -243,18 +341,8 @@ object CompanySummaryPro {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanySummaryPro(s = spark
-      , project = "winhc_eci_dev"
-      , tableName = "company_equity_info_list"
-      , cidField = "split(rowkey,'_')[0]"
-      , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
-        ("0", "company_equity_info_list_0")
-        , ("1", "company_equity_info_list_1")
-        , ("2", "company_equity_info_list_2")
-      ))
-      , where = "deleted = 0"
-    )
-      .calc(is_inc = false, target_tab = "winhc_eci_dev.xjk_tmp_summary")
+
+    run(spark, tab, "winhc_eci_dev.xjk_tmp_summary")
 
     spark.stop()
   }
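With main reduced to a single positional argument, a submit looks like this (class and jar names illustrative only):

    // spark-submit --class com.winhc.bigdata.spark.utils.CompanySummaryPro \
    //   Spark_Max.jar company_dishonest_info
    // Tables without a dedicated case in run() fall through to the default
    // summary (cidField = "split(rowkey,'_')[0]", no where filter).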