Jelajahi Sumber

feat: 股权出质加入新字段

许家凯 4 tahun lalu
induk
melakukan
5394578b8a

+ 99 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyIncSummary, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
+import com.winhc.bigdata.spark.utils._
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
@@ -40,8 +40,33 @@ object inc_company_equity_info {
 
       sql(
         s"""
+           |select  t1.id
+           |        ,t1.cid
+           |        ,t2.cname
+           |        ,t1.base
+           |        ,t1.reg_number
+           |        ,t1.pledgor
+           |        ,t1.certif_number_l
+           |        ,t1.equity_amount
+           |        ,t1.pledgee
+           |        ,t1.certif_number_r
+           |        ,t1.reg_date
+           |        ,t1.state
+           |        ,t1.pub_date
+           |        ,t1.change_situation
+           |        ,t1.cancel_date
+           |        ,t1.cancel_reason
+           |        ,t1.pledgor_type
+           |        ,t1.pledgor_id
+           |        ,t1.pledgee_type
+           |        ,t1.pledgee_id
+           |        ,t1.create_time
+           |        ,t1.update_time
+           |        ,t1.deleted
+           |        from (
            |SELECT  md5(cleanup(CONCAT_WS('',tmp.reg_number,tmp.reg_date,tmp.equity_amount))) as id
            |        ,tmp.cid
+           |        ,null as cname
            |        ,tmp.base
            |        ,tmp.reg_number
            |        ,tmp.pledgor
@@ -72,6 +97,14 @@ object inc_company_equity_info {
            |                    ) as a
            |        ) AS tmp
            |WHERE   tmp.c = 1
+           |) as t1
+           |left join
+           |(
+           |SELECT  *
+           |              FROM    winhc_eci_dev.base_company_mapping
+           |              WHERE   ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping","0")}'
+           |) as t2
+           |on t1.cid = t2.cid
            |""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
 
 
@@ -96,7 +129,9 @@ object inc_company_equity_info {
 
       sql(
         s"""
-           |SELECT   CONCAT_WS('_',t2.new_cid,split(t1.rowkey,'_')[1]) as rowkey,t2.new_cid as cid,${getColumns("winhc_eci_dev.ads_company_equity_info_list").diff(Seq("ds", "cid", "rowkey")).map("t1." + _).mkString(",")}
+           |SELECT   CONCAT_WS('_',t2.new_cid,split(t1.rowkey,'_')[1]) as rowkey
+           |         ,t2.new_cid as cid
+           |         ,${getColumns("winhc_eci_dev.ads_company_equity_info_list").diff(Seq("ds", "cid", "rowkey")).map("t1." + _).mkString(",")}
            |FROM    (
            |            SELECT  ${getColumns("winhc_eci_dev.ads_company_equity_info_list").diff(Seq("ds")).map("tmp." + _).mkString(",")}
            |            FROM    (
@@ -126,6 +161,20 @@ object inc_company_equity_info {
            |        ,id AS main_id
            |        ,pledgee AS cname
            |        ,2 AS type
+           |        ,cname AS target
+           |        ,target_cid
+           |        ,pledgee
+           |        ,pledgee_type
+           |        ,pledgee_id
+           |        ,certif_number_l
+           |        ,pledgor
+           |        ,pledgor_type
+           |        ,pledgor_id
+           |        ,certif_number_r
+           |        ,equity_amount
+           |        ,reg_date
+           |        ,reg_number
+           |        ,pub_date
            |        ,state
            |        ,deleted
            |FROM    tmp_company_equity_info_all
@@ -139,6 +188,20 @@ object inc_company_equity_info {
            |        ,id AS main_id
            |        ,pledgor AS cname
            |        ,1 AS type
+           |        ,cname AS target
+           |        ,target_cid
+           |        ,pledgee
+           |        ,pledgee_type
+           |        ,pledgee_id
+           |        ,certif_number_l
+           |        ,pledgor
+           |        ,pledgor_type
+           |        ,pledgor_id
+           |        ,certif_number_r
+           |        ,equity_amount
+           |        ,reg_date
+           |        ,reg_number
+           |        ,pub_date
            |        ,state
            |        ,deleted
            |FROM    tmp_company_equity_info_all
@@ -152,6 +215,20 @@ object inc_company_equity_info {
            |        ,id AS main_id
            |        ,null AS cname
            |        ,0 AS type
+           |        ,cname AS target
+           |        ,target_cid
+           |        ,pledgee
+           |        ,pledgee_type
+           |        ,pledgee_id
+           |        ,certif_number_l
+           |        ,pledgor
+           |        ,pledgor_type
+           |        ,pledgor_id
+           |        ,certif_number_r
+           |        ,equity_amount
+           |        ,reg_date
+           |        ,reg_number
+           |        ,pub_date
            |        ,state
            |        ,deleted
            |FROM    tmp_company_equity_info_all
@@ -162,6 +239,15 @@ object inc_company_equity_info {
            |select * from replace_cid_tab
            |""".stripMargin)
 
+      val outFields = getColumns("winhc_eci_dev.inc_ads_company_equity_info").map(_.toUpperCase)
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    winhc_eci_dev.inc_ads_company_equity_info
+           |WHERE   ds = '$endPart'
+           |""".stripMargin)
+        .save2HBase("COMPANY_EQUITY_INFO", "id", outFields)
       val writeCols = getColumns("winhc_eci_dev.inc_ads_company_equity_info_list").diff(Seq("ds", "rowkey"))
 
       MaxComputer2Phoenix(spark
@@ -170,18 +256,19 @@ object inc_company_equity_info {
         , "COMPANY_EQUITY_INFO_LIST"
         , endPart
         , "CONCAT_WS('_',cid,main_id)").syn()
-      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
 
-      CompanyIncSummary(spark, project, "company_equity_info_list", "cid", Seq("cid", "main_id")).calc
+      CompanySummaryPro(s = spark
+        , project = "winhc_eci_dev"
+        , tableName = "company_equity_info_list"
+        , cidField = "split(rowkey,'_')[0]"
+        , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
+          ("0", "company_equity_info_list_0")
+          ,("1", "company_equity_info_list_1")
+          ,("2", "company_equity_info_list_2")
+        ))
+      )
+        .calc()
 
-      val outFields = getColumns("winhc_eci_dev.inc_ads_company_equity_info").map(_.toUpperCase)
-      sql(
-        s"""
-           |SELECT  *
-           |FROM    winhc_eci_dev.inc_ads_company_equity_info
-           |WHERE   ds = '$endPart'
-           |""".stripMargin)
-        .save2HBase("COMPANY_EQUITY_INFO", "id", outFields)
     }
   }
 
@@ -204,5 +291,4 @@ object inc_company_equity_info {
     IncCompanyEquityInfoUtils(spark, project, ds).calc()
     spark.stop()
   }
-
 }