Forráskód Böngészése

fix: 股权出质

许家凯 4 éve
szülő
commit
4ecb0e8748

+ 20 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -26,6 +26,8 @@ object inc_company_equity_info {
     @(transient@getter) val spark: SparkSession = s
 
 
+    private val dedup = Seq("reg_number", "pledgor", "pledgee")
+
     def calc(): Unit = {
       cleanup()
       val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
@@ -64,7 +66,7 @@ object inc_company_equity_info {
            |        ,t1.update_time
            |        ,t1.deleted
            |        from (
-           |SELECT  md5(cleanup(CONCAT_WS('',tmp.reg_number,tmp.reg_date,tmp.equity_amount))) as id
+           |SELECT  md5(cleanup(CONCAT_WS('',${dedup.map("tmp." + _)}))) as id
            |        ,tmp.cid
            |        ,null as cname
            |        ,tmp.base
@@ -89,7 +91,7 @@ object inc_company_equity_info {
            |        ,tmp.deleted
            |FROM    (
            |            SELECT  a.*
-           |                    ,row_number() OVER (PARTITION BY a.reg_number,a.cid,a.pledgor,a.pledgee ORDER BY update_time DESC) c
+           |                    ,row_number() OVER (PARTITION BY cleanup(CONCAT_WS('',${dedup.map("a." + _)})) ORDER BY update_time DESC) c
            |            FROM    (
            |                        SELECT  *
            |                        FROM    winhc_eci_dev.inc_ods_company_equity_info
@@ -102,7 +104,7 @@ object inc_company_equity_info {
            |(
            |SELECT  *
            |              FROM    winhc_eci_dev.base_company_mapping
-           |              WHERE   ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping","0")}'
+           |              WHERE   ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping", "0")}'
            |) as t2
            |on t1.cid = t2.cid
            |""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
@@ -263,13 +265,26 @@ object inc_company_equity_info {
         , cidField = "split(rowkey,'_')[0]"
         , groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
           ("0", "company_equity_info_list_0")
-          ,("1", "company_equity_info_list_1")
-          ,("2", "company_equity_info_list_2")
+          , ("1", "company_equity_info_list_1")
+          , ("2", "company_equity_info_list_2")
         ))
       )
         .calc()
 
     }
+
+    def tmp(): Unit ={
+
+      sql(
+        s"""
+           |
+           |""".stripMargin)
+
+
+
+    }
+
+
   }