Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 3 years ago
parent
commit
36f620a0d7

+ 12 - 4
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -297,16 +297,24 @@ object args_company_job {
     )
 
 
-    , args_company_job("company_copyright_reg", Seq("reg_num")
-      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num)))"
+
+
+    , args_company_job("trust_financing", Seq("company_id", "consignee_name_list", "notice_date", "start_date")
+      , rowkey_udf = "md5(cleanup(concat_ws('', company_id, split_date(cast(notice_date as String)), get_text_from_json(consignee_name_list, 'company_name'), split_date(cast(start_date as String)))))"
+      , is_super_filter = false
+    )
+
+
+    , args_company_job("company_copyright_reg", Seq("reg_num", "author_nationality_info")
+      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num, get_text_from_json(author_nationality_info, 'name'))))"
       , is_super_filter = false
       , explode_args = Seq(
         explode_args("author_nationality_info", "$.keyno", "author_nationality_info_keyno_explode")
       )
     )
 
-    , args_company_job("company_copyright_works", Seq("reg_num")
-      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num)))"
+    , args_company_job("company_copyright_works", Seq("reg_num", "author_info")
+      , rowkey_udf = "md5(cleanup(concat_ws('', reg_num, get_text_from_json(author_info, 'name'))))"
       , is_super_filter = false
       , explode_args = Seq(
         explode_args("author_info", "$.keyno", "author_info_keyno_explode")

+ 41 - 36
src/main/scala/com/winhc/bigdata/spark/ng/monitor/table/company_holder.scala

@@ -35,43 +35,48 @@ case class company_holder(is_inc: Boolean) extends AbstractDailyHandle {
     val update_time = change_extract.update_time
     val change_fields = change_extract.change_fields
 
-    //新增了股东
-    val `type1` = "10"
-    val args1 = CompanyMonitorMappings.get_args(`type1`)
-    list = list :+ CompanyMonitorRecord(
-      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
-      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = `type1`))
-      , dimension_type = args1.dimension_type
-      , flow_type = args1.flow_type
-      , rta_desc = ""
-      , change_time = CompanyMonitorUtils.formatDate(biz_date)
-      , biz_id = rowkey
-      , info_risk_level = args1.info_risk_level
-      , `type` = `type1`
-      , create_time = CompanyMonitorUtils.formatDate(update_time)
-      , amt = get_money(new_data)
-      , update_type = update_type
-      , deleted = get_deleted(new_data)
-    )
+    if (new_data != null) {
+      if (change_fields == null || change_fields.contains("holder_id")) {
+        //新增了股东
+        val `type1` = "10"
+        val args1 = CompanyMonitorMappings.get_args(`type1`)
+        list = list :+ CompanyMonitorRecord(
+          id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type1`)
+          , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("company_id"), name = new_data.getOrEmptyStr("company_name"), type_id = `type1`))
+          , dimension_type = args1.dimension_type
+          , flow_type = args1.flow_type
+          , rta_desc = ""
+          , change_time = CompanyMonitorUtils.formatDate(biz_date)
+          , biz_id = rowkey
+          , info_risk_level = args1.info_risk_level
+          , `type` = `type1`
+          , create_time = CompanyMonitorUtils.formatDate(update_time)
+          , amt = get_money(new_data)
+          , update_type = update_type
+          , deleted = get_deleted(new_data)
+        )
+
+        //新增对外投资
+        val `type2` = "11"
+        val args2 = CompanyMonitorMappings.get_args(`type2`)
+        list = list :+ CompanyMonitorRecord(
+          id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
+          , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("holder_id"), name = new_data.getOrEmptyStr("holder_name"), type_id = `type2`))
+          , dimension_type = args2.dimension_type
+          , flow_type = args2.flow_type
+          , rta_desc = ""
+          , change_time = CompanyMonitorUtils.formatDate(biz_date)
+          , biz_id = rowkey
+          , info_risk_level = args2.info_risk_level
+          , `type` = `type2`
+          , create_time = CompanyMonitorUtils.formatDate(update_time)
+          , amt = get_money(new_data)
+          , update_type = update_type
+          , deleted = get_deleted(new_data)
+        )
+      }
+    }
 
-    //新增对外投资
-    val `type2` = "11"
-    val args2 = CompanyMonitorMappings.get_args(`type2`)
-    list = list :+ CompanyMonitorRecord(
-      id = CompanyMonitorUtils.generateId(rowkey, biz_date, tn, `type2`)
-      , entity_info = Seq(EntityInfo(keyno = new_data.getOrEmptyStr("holder_id"), name = new_data.getOrEmptyStr("holder_name"), type_id = `type2`))
-      , dimension_type = args2.dimension_type
-      , flow_type = args2.flow_type
-      , rta_desc = ""
-      , change_time = CompanyMonitorUtils.formatDate(biz_date)
-      , biz_id = rowkey
-      , info_risk_level = args2.info_risk_level
-      , `type` = `type2`
-      , create_time = CompanyMonitorUtils.formatDate(update_time)
-      , amt = get_money(new_data)
-      , update_type = update_type
-      , deleted = get_deleted(new_data)
-    )
 
     //投资的公司,所占股份下降
     if (new_data != null && old_data != null) {