
Company basic information

许家凯 committed 4 years ago
commit dc96700725

+ 1 - 1
src/main/resources/env.yaml

@@ -1,5 +1,5 @@
 profile:
-  activate: dev
+  activate: prod
 
 ---
 env:
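The loader that consumes this flag is not part of the commit; as a rough sketch, assuming the first YAML document carries profile.activate and SnakeYAML is on the classpath (both assumptions, not shown here), the switch could be read like this:

import org.yaml.snakeyaml.Yaml
import scala.collection.JavaConverters._
import scala.io.Source

// Hypothetical reader for env.yaml: the first document holds
// profile.activate ("prod" after this change), the following
// documents hold the per-environment settings.
object EnvProfileSketch {
  def activeProfile(path: String): String = {
    val docs = new Yaml().loadAll(Source.fromFile(path).mkString).asScala
    val head = docs.head.asInstanceOf[java.util.Map[String, Object]]
    val profile = head.get("profile").asInstanceOf[java.util.Map[String, Object]]
    profile.get("activate").toString
  }
}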

+ 21 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -93,11 +93,27 @@ object CompanyIncCompany2Es {
         .seq
 
      // read the data
+      /* val df = sql(
+         s"""
+            |SELECT  ${companyCols.mkString(",")}
+            |FROM    $project.inc_ods_company
+            |WHERE   ds > $start_partition and ds <= $end_partition
+            |""".stripMargin)   */
+
+      // deduplicate rows within the input itself
       val df = sql(
         s"""
            |SELECT  ${companyCols.mkString(",")}
-           |FROM    $project.inc_ods_company
-           |WHERE   ds > $start_partition and ds <= $end_partition
+           |FROM    (
+           |            SELECT  a.*
+           |                    ,row_number() OVER (PARTITION BY a.cid ORDER BY update_time DESC) c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $project.inc_ods_company
+           |                        WHERE   ds > $start_partition and ds <= $end_partition
+           |                    ) as a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
            |""".stripMargin)
 
       df.createOrReplaceTempView("tmp_company_inc")
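The rewritten query keeps, for each cid, only the row with the greatest update_time: row_number() numbers the rows in each cid partition newest-first, and the outer filter keeps rank 1. The same keep-latest-per-key pattern in the DataFrame API, as a self-contained sketch with toy data (column names taken from the SQL above, everything else made up):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, row_number}

val spark = SparkSession.builder().appName("dedup-sketch").master("local[*]").getOrCreate()
import spark.implicits._

val raw = Seq(
  (1L, "2021-01-01", "old"),
  (1L, "2021-02-01", "new"),   // later update_time wins
  (2L, "2021-01-15", "only")
).toDF("cid", "update_time", "name")

// Number rows within each cid, newest first, and keep rank 1 only.
val w = Window.partitionBy("cid").orderBy(col("update_time").desc)
val deduped = raw
  .withColumn("c", row_number().over(w))
  .where(col("c") === 1)
  .drop("c")

deduped.show()  // one row per cid, the most recently updated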
@@ -126,7 +142,9 @@ object CompanyIncCompany2Es {
           }
         }
         (new ImmutableBytesWritable, put)
-      }).filter(_ != null).saveAsHadoopDataset(jobConf)
+      }).filter(_ != null)
+        .saveAsNewAPIHadoopDataset(jobConf)
+      //        .saveAsHadoopDataset(jobConf)
 
      // write out to Elasticsearch
       import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
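The write path now goes through the new-API saveAsNewAPIHadoopDataset instead of the old-API saveAsHadoopDataset. One caveat worth noting: the new API resolves the output format from the mapreduce configuration, so jobConf must reference org.apache.hadoop.hbase.mapreduce.TableOutputFormat rather than the mapred variant the old call accepted. The jobConf setup is not shown in this hunk; a plausible sketch, with the table name and ZooKeeper quorum as placeholders:

import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.mapreduce.Job

// Placeholder values; the real table and quorum come from the job's env config.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set("hbase.zookeeper.quorum", "zk-host:2181")
hbaseConf.set(TableOutputFormat.OUTPUT_TABLE, "company")

// The new-API call reads the output format class from the configuration.
val job = Job.getInstance(hbaseConf)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
val jobConf = job.getConfiguration
// rdd.saveAsNewAPIHadoopDataset(jobConf)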

+ 7 - 6
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -36,18 +36,19 @@ case class CompanyIncSummary(s: SparkSession,
 
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
       !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
-    }).toList.sorted
+    }).toList
 
     val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
       !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
-    }).toList.sorted
+    }).toList
 
-    val new_cols = (ads_table_cols ::: inc_ads_table_cols).distinct.sorted
+
+    val new_cols = ads_table_cols.intersect(inc_ads_table_cols)
     if (new_cols.size != inc_ads_table_cols.size || new_cols.size != ads_table_cols.size) {
-      println(ads_table_cols)
-      println(inc_ads_table_cols)
+      println(ads_table_cols.sorted)
+      println(inc_ads_table_cols.sorted)
       println("cols not equals!")
-      sys.exit(-99)
+      println(new_cols.sorted)
     }
 
     sql(
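Two behavioural changes in this hunk: new_cols is now the intersection of the two tables' column lists rather than their sorted union, and a mismatch merely logs the three sorted lists instead of killing the job with sys.exit(-99), so the query below still runs over the shared columns. For reference, List.intersect keeps the left operand's element order, which presumably is why the .sorted calls were dropped (toy column names below, not the real schema):

// List.intersect keeps elements of the receiver that also occur in the
// argument, preserving the receiver's order -- no .sorted needed.
val ads = List("name", "cid", "update_time")
val inc = List("cid", "name", "reg_status")
val common = ads.intersect(inc)                      // List("name", "cid")
val mismatch = common.size != ads.size ||
               common.size != inc.size               // true: schemas differ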