|
@@ -26,6 +26,8 @@ object inc_company_equity_info {
|
|
@(transient@getter) val spark: SparkSession = s
|
|
@(transient@getter) val spark: SparkSession = s
|
|
|
|
|
|
|
|
|
|
|
|
+ private val dedup = Seq("reg_number", "pledgor", "pledgee")
|
|
|
|
+
|
|
def calc(): Unit = {
|
|
def calc(): Unit = {
|
|
cleanup()
|
|
cleanup()
|
|
val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
|
|
val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
|
|
@@ -64,7 +66,7 @@ object inc_company_equity_info {
|
|
| ,t1.update_time
|
|
| ,t1.update_time
|
|
| ,t1.deleted
|
|
| ,t1.deleted
|
|
| from (
|
|
| from (
|
|
- |SELECT md5(cleanup(CONCAT_WS('',tmp.reg_number,tmp.reg_date,tmp.equity_amount))) as id
|
|
|
|
|
|
+ |SELECT md5(cleanup(CONCAT_WS('',${dedup.map("tmp." + _)}))) as id
|
|
| ,tmp.cid
|
|
| ,tmp.cid
|
|
| ,null as cname
|
|
| ,null as cname
|
|
| ,tmp.base
|
|
| ,tmp.base
|
|
@@ -89,7 +91,7 @@ object inc_company_equity_info {
|
|
| ,tmp.deleted
|
|
| ,tmp.deleted
|
|
|FROM (
|
|
|FROM (
|
|
| SELECT a.*
|
|
| SELECT a.*
|
|
- | ,row_number() OVER (PARTITION BY a.reg_number,a.cid,a.pledgor,a.pledgee ORDER BY update_time DESC) c
|
|
|
|
|
|
+ | ,row_number() OVER (PARTITION BY cleanup(CONCAT_WS('',${dedup.map("a." + _)})) ORDER BY update_time DESC) c
|
|
| FROM (
|
|
| FROM (
|
|
| SELECT *
|
|
| SELECT *
|
|
| FROM winhc_eci_dev.inc_ods_company_equity_info
|
|
| FROM winhc_eci_dev.inc_ods_company_equity_info
|
|
@@ -102,7 +104,7 @@ object inc_company_equity_info {
|
|
|(
|
|
|(
|
|
|SELECT *
|
|
|SELECT *
|
|
| FROM winhc_eci_dev.base_company_mapping
|
|
| FROM winhc_eci_dev.base_company_mapping
|
|
- | WHERE ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping","0")}'
|
|
|
|
|
|
+ | WHERE ds = '${getLastPartitionsOrElse("winhc_eci_dev.base_company_mapping", "0")}'
|
|
|) as t2
|
|
|) as t2
|
|
|on t1.cid = t2.cid
|
|
|on t1.cid = t2.cid
|
|
|""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
|
|
|""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
|
|
@@ -263,8 +265,8 @@ object inc_company_equity_info {
|
|
, cidField = "split(rowkey,'_')[0]"
|
|
, cidField = "split(rowkey,'_')[0]"
|
|
, groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
|
|
, groupByInfo = GroupByInfo(field = "type", value_alias = Seq(
|
|
("0", "company_equity_info_list_0")
|
|
("0", "company_equity_info_list_0")
|
|
- ,("1", "company_equity_info_list_1")
|
|
|
|
- ,("2", "company_equity_info_list_2")
|
|
|
|
|
|
+ , ("1", "company_equity_info_list_1")
|
|
|
|
+ , ("2", "company_equity_info_list_2")
|
|
))
|
|
))
|
|
)
|
|
)
|
|
.calc()
|
|
.calc()
|