许家凯 %!s(int64=3) %!d(string=hai) anos
pai
achega
57fb65e6cc

+ 27 - 3
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -147,7 +147,7 @@ case class CompanyIndexJob(s: SparkSession,
          |            FROM    winhc_ng.inc_${org_prefix}_company_staff
          |            WHERE   ds > '$target_ds'
          |            UNION ALL
-         |            SELECT  DISTINCT company_id
+         |            SELECT  DISTINCT keyno as company_id
          |            FROM    winhc_ng.inc_${org_prefix}_company_tm
          |            WHERE   ds > '$target_ds'
          |            UNION ALL
@@ -200,7 +200,7 @@ case class CompanyIndexJob(s: SparkSession,
     val base_holder = get_company_holder_tab(holder_tab)
     val base_staff = get_staff_tab(staff_tab)
 
-    val base_tm = getTab(company_tm_tab, "tm_name", "company_tm")
+    val base_tm = get_company_tm_tab(company_tm_tab, "tm_name", "company_tm")
     val base_app_info = getTab(company_app_info_tab, "filter_name", "app_info")
     val base_icp = getTab(company_icp_tab, "web_name", "icp")
 
@@ -381,6 +381,20 @@ case class CompanyIndexJob(s: SparkSession,
     alias_tab
   }
 
+  private def get_company_tm_tab(tab: String, name: String, alias_name: String): String = {
+    val alias_tab = "alias_base_" + tab
+    sql(
+      s"""
+         | SELECT  company_id
+         |         ,concat_ws(',',collect_set($name)) AS $alias_name
+         | FROM    $tab
+         | where $name is not null and trim($name) <> '' and deleted = 0 and company_id is not null and length(company_id) = 32
+         | GROUP BY company_id
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
   private def getTab(tab: String, name: String, alias_name: String): String = {
     val alias_tab = "alias_base_" + tab
     sql(
@@ -424,9 +438,19 @@ case class CompanyIndexJob(s: SparkSession,
 
 
   private def get_row_number(tab: String, partition_by: Seq[String], order_by: String = "ds"): String = {
+
+    def switch(f: String): String = {
+      if (tab.equals("company_tm") && f.equals("keyno"))
+        s"cast($f as string) as company_id"
+      else
+        s"cast($f as string) as $f"
+    }
+
     val org_tab = s"winhc_ng.${org_prefix}_$tab"
     val inc_org_tab = s"winhc_ng.inc_${org_prefix}_$tab"
-    val cols = getColumns(org_tab).intersect(getColumns(inc_org_tab)).map(f => s"cast($f as string) as $f").mkString(",")
+    val cols = getColumns(org_tab).intersect(getColumns(inc_org_tab))
+      .map(switch)
+      .mkString(",")
 
     val up = cols.contains("update_time") match {
       case true => " DESC,update_time"

+ 7 - 4
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -23,7 +23,11 @@ object args_company_job {
     , args_company_job("company_staff", Seq("staff_name"))
     , args_company_job("company_holder", Seq("holder_name"), rowkey_udf = "concat_ws('_',company_id,md5(cleanup(concat_ws('',holder_switch_rowkey(holder_type,holder_id,holder_name)))))", where = " holder_name is not null and trim(holder_name) <> '' AND ( not (holder_type = 2 AND length(holder_id) <> 32)) ")
     , args_company_job("company_icp", Seq("liscense", "domain"))
-    , args_company_job("company_tm", Seq("reg_no"))
+    , args_company_job("company_tm", Seq("reg_no")
+      , rowkey_udf = "md5(trim(reg_no))"
+      , where = "reg_no is not null and trim(reg_no) <> '' "
+      , is_super_filter = false
+    )
 
     , args_company_job("company_court_open_announcement", Seq("case_no", "start_date")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no_trim(case_no),split_date(cast(start_date as String)) )))"
@@ -65,7 +69,7 @@ object args_company_job {
     )
     , args_company_job("company_public_announcement", Seq("bill_num")
       , rowkey_add_company_id = false
-      ,verify_company_id = false
+      , verify_company_id = false
     )
 
     , args_company_job("company_abnormal_info", Seq("company_id", "put_reason", "put_date")
@@ -87,12 +91,11 @@ object args_company_job {
     )
 
     , args_company_job("restrictions_on_exit", Seq("case_no", "limited_person", "executed_person_name", "publish_time")
-      ,rowkey_udf = "md5(cleanup(concat_ws('',case_no,limited_person,get_fixed_val(executed_person_keyno,executed_person_name),publish_time )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,limited_person,get_fixed_val(executed_person_keyno,executed_person_name),publish_time )))"
       , is_super_filter = false
     )
 
 
-
     , args_company_job("company_court_announcement", Seq("case_no", "announcement_type", "publish_date", "court_name")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no_trim(case_no),announcement_type,split_date(cast(publish_date as String)),court_name )))"
       , is_super_filter = false

+ 21 - 3
src/main/scala/com/winhc/bigdata/spark/ng/utils/PersonSummaryNg_new.scala

@@ -252,13 +252,14 @@ object PersonSummaryNg_new {
     )
   }
 
-  private val start_args = Seq(
+  val start_args = Seq(
     get_default_summary_args("company_zxr", "keyno")
     , get_default_summary_args("company_dishonest_info", "keyno")
     , get_default_summary_args("company_zxr_restrict", "pid")
-    , get_default_summary_args("company_judicial_assistance", "fz_executed_person_id")
+    , get_default_summary_args("company_judicial_assistance", "executed_person_id")
     , get_default_summary_args("restrictions_on_exit", "limited_person_pid")
     , get_default_summary_args("company_zxr_final_case", "keyno")
+    , get_default_summary_args("zxr_evaluate_results", "keyno")
 
 
     , company_summary_args(table_name = "company_court_open_announcement_explode"
@@ -319,7 +320,24 @@ object PersonSummaryNg_new {
         , ("1", s"company_court_register_del_1_defendant")
       ))
     )
-
+    , company_summary_args(table_name = "company_send_announcement_explode"
+      , companyIdField = "plaintiff_info_id_explode"
+      , distinctField = "rowkey,plaintiff_info_id_explode"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_send_announcement_del_0_plaintiff")
+        , ("1", s"company_send_announcement_del_1_plaintiff")
+      ))
+    )
+    , company_summary_args(table_name = "company_send_announcement_explode"
+      , companyIdField = "defendant_info_id_explode"
+      , distinctField = "rowkey,defendant_info_id_explode"
+      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 "
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_send_announcement_del_0_defendant")
+        , ("1", s"company_send_announcement_del_1_defendant")
+      ))
+    )
 
 
   )

+ 18 - 6
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -127,10 +127,27 @@ case class export_company_index_2_es(s: SparkSession,
 
 object export_company_index_2_es {
   val as = Seq(
+
+
     export_2_es_args("company_court_open_announcement"
       , "rowkey,defendant_info,plaintiff_info,start_date,case_no,case_reason".split(","))
 
-      , export_2_es_args("company_dishonest_info"
+    , export_2_es_args("company_staff"
+      , "rowkey,company_id,staff_name,staff_type,deleted".split(","))
+
+    , export_2_es_args("company_tm"
+      , "rowkey,keyno,reg_no,int_cls,tm_name,app_date,applicant_cn,applicant_en,status,app_year,logo,deleted".split(","))
+
+    , export_2_es_args("company_icp"
+      , "rowkey,company_id,web_baxh,site_url,review_time,web_name,domain,deleted".split(","))
+
+    , export_2_es_args("company_app_info"
+      , "rowkey,company_id,name,filter_name,icon,type,classes,deleted".split(","))
+
+    , export_2_es_args("company_holder"
+      , "rowkey,company_id,holder_name,holder_id,holder_type,percent,deleted".split(","))
+
+    , export_2_es_args("company_dishonest_info"
       , "rowkey,keyno,name,case_no,court,gist_dd,reg_time,performance,pub_date,status,deleted".split(","))
 
     , export_2_es_args("company_zxr_restrict"
@@ -164,11 +181,6 @@ object export_company_index_2_es {
       , "rowkey,limited_person_pid,case_no,limited_person,executed_person_keyno,executed_person_name,executed_address,executed_amount,deleted".split(","))
 
 
-
-
-
-
-
     , export_2_es_args("company_court_announcement"
       , "rowkey,announcement_type,case_no,court_name,plaintiff_info,litigant_info,publish_date,deleted".split(","))
     , export_2_es_args("company_send_announcement"