Quellcode durchsuchen

增量同步简单模式下的ODPS SQL脚本

晏永年 vor 5 Jahren
Ursprung
Commit
acb6ef6c41

+ 63 - 0
src/main/java/com/winhc/dataworks/flow/touch/script/ads_simple_ods.sql

@@ -0,0 +1,63 @@
+--odps sql
+--********************************************************************--
+--author:yyn
+--create time:2020-06-20 09:58:58
+--简单的维度增量数据插入、复制方案
+--${PROJECT},空间名
+--${DIM_TABLE},维度表,如:company_icp,下面会在各个层次添加前缀,如:winhc_eci_dev.inc_ads_company_icp
+--${DS},当天分区
+--${DIM_COLUMS},维度表的字段集
+--${DUPLI_COLS},维度表的去重字段集
+--********************************************************************--
+
+-- Simple incremental load of one dimension table into ${PROJECT}.inc_ads_${DIM_TABLE}.
+-- Placeholders (substituted before execution): ${PROJECT}, ${DIM_TABLE}, ${DS},
+-- ${DIM_COLUMS} (dimension column list) and ${DUPLI_COLS} (de-duplication key columns).
+-- The statement relies on ${DIM_COLUMS} containing `id` (used for rowkey) and
+-- `update_time` (used to pick the newest row per de-duplication key).
+--
+-- Step 1: collect the company id changes (cid -> current_cid) from the company
+--         increment table; GROUP BY collapses repeated (cid, current_cid) pairs.
+WITH MAPPING AS(
+    SELECT  cid,current_cid as new_cid
+    FROM    ${PROJECT}.inc_ods_company
+    WHERE   ds >= ${DS}
+    AND     cid IS NOT NULL
+    AND     current_cid IS NOT NULL
+    GROUP BY cid,current_cid
+)
+-- Step 2: merge (2.1) the already-loaded dimension rows of companies whose id changed
+--         with (2.2) the dimension increment rows, keep the newest row per
+--         ${DUPLI_COLS}, and overwrite the target partition.
+-- NOTE(review): branch 2.1 reads inc_ads_${DIM_TABLE} with ds >= ${DS}, i.e. it can read
+--               the very partition this statement overwrites - confirm this is intended.
+INSERT OVERWRITE TABLE ${PROJECT}.inc_ads_${DIM_TABLE} PARTITION(ds=${DS})
+SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
+        ,new_cid
+        ,cid
+        ,${DIM_COLUMS}
+FROM    (
+            -- Rank once over the UNION of both branches. Ranking inside each branch
+            -- (as before) only removed duplicates within a branch, so a record present
+            -- in both branches produced two rows with num = 1 and a duplicate rowkey.
+            -- ${DUPLI_COLS} is resolved here against the projected columns, so a key
+            -- column named new_cid means the COALESCEd value from branch 2.2.
+            SELECT  flag
+                    ,new_cid
+                    ,cid
+                    ,${DIM_COLUMS}
+                    ,ROW_NUMBER() OVER (PARTITION BY ${DUPLI_COLS} ORDER BY update_time DESC ) AS num
+            FROM    (
+                        -- 2.1 Companies from MAPPING that already have rows in the ads /
+                        --     inc_ads dimension tables: re-emit those rows under the
+                        --     company's new_cid. flag "0" marks this origin; it is not
+                        --     written to the target table.
+                        SELECT  "0" AS flag
+                                ,a.new_cid
+                                ,b.cid
+                                ,${DIM_COLUMS}
+                        FROM    MAPPING a
+                        JOIN    (
+                                    SELECT  new_cid AS cid
+                                            ,${DIM_COLUMS}
+                                    FROM    ${PROJECT}.ads_${DIM_TABLE}
+                                    WHERE   ds >= ${DS}
+                                    UNION ALL
+                                    SELECT  new_cid AS cid
+                                            ,${DIM_COLUMS}
+                                    FROM    ${PROJECT}.inc_ads_${DIM_TABLE}
+                                    WHERE   ds >= ${DS}
+                                ) b
+                        ON      a.cid = b.cid
+                        -- 2.2 Rows from the dimension increment table: use the mapped
+                        --     new_cid when the company appears in MAPPING, otherwise
+                        --     fall back to the row's own cid. flag "1" marks this origin.
+                        UNION ALL
+                        SELECT  "1" AS flag
+                                ,coalesce(b.new_cid,a.cid) new_cid
+                                ,a.cid
+                                ,${DIM_COLUMS}
+                        FROM    ${PROJECT}.inc_ods_${DIM_TABLE} a
+                        LEFT JOIN MAPPING b
+                        ON      a.cid = b.cid
+                        WHERE   a.ds >= ${DS}
+                        AND     a.cid IS NOT NULL
+                    ) d
+        ) e
+WHERE   num = 1
+;