
fix: incremental update of key company staff

- Deduplicate the incremental data (a sketch of the dedup query follows the commit metadata below)
- Add bulk writing to Phoenix
- Give the staff-position aggregation function a default value of '未公开' (not disclosed)
许家凯 · 4 years ago
commit e69a151f52
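
The dedup boils down to a ROW_NUMBER window keyed on rowkey that keeps only the newest row per key. A minimal standalone sketch of that pattern (a sketch only: table, column, and partition values are copied from the diff below, and the SparkSession/Hive setup is assumed):

import org.apache.spark.sql.SparkSession

object IncDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("inc_dedup_sketch")
      .enableHiveSupport()
      .getOrCreate()

    // Keep only the newest record per rowkey across the incremental partitions.
    spark.sql(
      """
        |SELECT rowkey, new_cid, hid, staff_type, create_time, update_time, deleted
        |FROM   (
        |           SELECT  t.*
        |                   ,ROW_NUMBER() OVER (PARTITION BY t.rowkey ORDER BY t.ds DESC) AS num
        |           FROM    winhc_eci_dev.inc_ads_company_staff t
        |           WHERE   t.ds >= '20200603'
        |       ) deduped
        |WHERE  deduped.num = 1
        |""".stripMargin)
      .show(10, truncate = false)

    spark.stop()
  }
}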

+ 116 - 57
src/main/scala/com/winhc/bigdata/spark/jobs/company_staff.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyStaffAggs}
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{LoggingUtils, MaxComputer2Phoenix, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
@@ -16,7 +16,6 @@ import scala.collection.mutable
  */
 object company_staff {
 
-
   case class CompanyStaffUtil(s: SparkSession,
                               project: String
                              ) extends LoggingUtils with Logging with BaseFunc {
@@ -26,43 +25,43 @@ object company_staff {
     def init(): Unit = {
       cleanup()
       spark.udf.register("agg_val", new CompanyStaffAggs())
-      sql(
-        """
-          |CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_company_staff
-          |(
-          |    rowkey STRING COMMENT 'FIELD'
-          |    ,new_cid STRING COMMENT 'FIELD'
-          |    ,id BIGINT COMMENT ''
-          |    ,cid BIGINT COMMENT ''
-          |    ,hid BIGINT COMMENT ''
-          |    ,staff_type STRING COMMENT ''
-          |    ,create_time DATETIME COMMENT ''
-          |    ,update_time DATETIME COMMENT ''
-          |    ,deleted BIGINT COMMENT ''
-          |)
-          |COMMENT 'TABLE COMMENT'
-          |PARTITIONED BY
-          |(
-          |    ds STRING COMMENT '分区'
-          |)
-          |""".stripMargin)
+      /* sql(
+         """
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_company_staff
+           |(
+           |    rowkey STRING COMMENT 'FIELD'
+           |    ,new_cid STRING COMMENT 'FIELD'
+           |    ,id BIGINT COMMENT ''
+           |    ,cid BIGINT COMMENT ''
+           |    ,hid BIGINT COMMENT ''
+           |    ,staff_type STRING COMMENT ''
+           |    ,create_time DATETIME COMMENT ''
+           |    ,update_time DATETIME COMMENT ''
+           |    ,deleted BIGINT COMMENT ''
+           |)
+           |COMMENT 'TABLE COMMENT'
+           |PARTITIONED BY
+           |(
+           |    ds STRING COMMENT '分区'
+           |)
+           |""".stripMargin)*/
 
-      sql(
-        s"""
-           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`inc_ads_company_staff` (
-           |  `rowkey` STRING COMMENT 'FIELD',
-           |  `new_cid` STRING COMMENT 'FIELD',
-           |  `id` BIGINT,
-           |  `cid` BIGINT,
-           |  `hid` BIGINT,
-           |  `staff_type` STRING,
-           |  `create_time` DATETIME,
-           |  `update_time` DATETIME,
-           |  `deleted` BIGINT)
-           | COMMENT 'TABLE COMMENT'
-           |PARTITIONED BY (
-           |  `ds` STRING COMMENT '分区')
-           |""".stripMargin)
+      /* sql(
+         s"""
+            |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`inc_ads_company_staff` (
+            |  `rowkey` STRING COMMENT 'FIELD',
+            |  `new_cid` STRING COMMENT 'FIELD',
+            |  `id` BIGINT,
+            |  `cid` BIGINT,
+            |  `hid` BIGINT,
+            |  `staff_type` STRING,
+            |  `create_time` DATETIME,
+            |  `update_time` DATETIME,
+            |  `deleted` BIGINT)
+            | COMMENT 'TABLE COMMENT'
+            |PARTITIONED BY (
+            |  `ds` STRING COMMENT '分区')
+            |""".stripMargin)*/
     }
 
 
@@ -205,6 +204,7 @@ object company_staff {
            |                                    SELECT  *
            |                                    FROM    winhc_eci_dev.inc_ods_company_staff
            |                                    WHERE   ds = '$ds'
+           |                                    AND     cid is not null
            |                                ) AS ta1
            |                        LEFT JOIN mapping AS ta2
            |                        ON      ta1.cid = ta2.cid
@@ -217,27 +217,82 @@ object company_staff {
       sql(
         s"""
            |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ads_company_staff PARTITION(ds=$ds)
-           |SELECT  t2.rowkey
-           |        ,t2.new_cid
-           |        ,t2.id
-           |        ,t2.cid
-           |        ,t2.hid
-           |        ,t3.types AS staff_type
-           |        ,t2.create_time
-           |        ,t2.update_time
-           |        ,t2.deleted
-           |FROM    inc_ads_tmp AS t2
-           |LEFT JOIN (
-           |              SELECT  t1.new_cid
-           |                      ,t1.hid
-           |                      ,agg_val(t1.staff_type) AS types
-           |              FROM    inc_ads_tmp AS t1
-           |              GROUP BY t1.new_cid
-           |                       ,t1.hid
-           |          ) AS t3
-           |ON      CONCAT_WS('',t2.hid,t2.new_cid) = CONCAT_WS('',t3.hid,t3.new_cid)
+           |SELECT  t5.rowkey
+           |        ,t5.new_cid
+           |        ,t5.id
+           |        ,t5.cid
+           |        ,t5.hid
+           |        ,t5.staff_type
+           |        ,t5.create_time
+           |        ,t5.update_time
+           |        ,t5.deleted
+           |FROM    (
+           |            SELECT  t4.*
+           |                    ,ROW_NUMBER() OVER (PARTITION BY t4.rowkey ORDER BY t4.rowkey DESC ) num
+           |            FROM    (
+           |                        SELECT  t2.rowkey
+           |                                ,t2.new_cid
+           |                                ,t2.id
+           |                                ,t2.cid
+           |                                ,t2.hid
+           |                                ,t3.types AS staff_type
+           |                                ,t2.create_time
+           |                                ,t2.update_time
+           |                                ,t2.deleted
+           |                        FROM    inc_ads_tmp AS t2
+           |                        LEFT JOIN (
+           |                                      SELECT  t1.new_cid
+           |                                              ,t1.hid
+           |                                              ,agg_val(t1.staff_type) AS types
+           |                                      FROM    inc_ads_tmp AS t1
+           |                                      GROUP BY t1.new_cid
+           |                                               ,t1.hid
+           |                                  ) AS t3
+           |                        ON      CONCAT_WS('',t2.hid,t2.new_cid) = CONCAT_WS('',t3.hid,t3.new_cid)
+           |                    ) AS t4
+           |        ) AS t5
+           |WHERE   t5.num = 1
            |""".stripMargin)
 
+
+      MaxComputer2Phoenix(spark, Seq("new_cid"
+        , "hid"
+        , "staff_type"
+        , "create_time"
+        , "update_time"
+        , "deleted"), "winhc_eci_dev.ads_company_staff", "COMPANY_STAFF", ds, "rowkey").syn()
+    }
+
+    def inc_bulk_save(startDs: String): Unit = {
+      import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+      sql(
+        s"""
+           |SELECT  tmp.rowkey
+           |        ,tmp.new_cid as cid
+           |        ,tmp.hid
+           |        ,tmp.staff_type
+           |        ,tmp.create_time
+           |        ,tmp.update_time
+           |        ,tmp.deleted
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    winhc_eci_dev.inc_ads_company_staff
+           |                        WHERE   ds >= $startDs
+           |                        AND     new_cid is not null
+           |                    )
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin).repartition(400)
+        .save2PhoenixByJDBC(s"COMPANY_STAFF", Seq("rowkey"
+          , "cid"
+          , "hid"
+          , "staff_type"
+          , "create_time"
+          , "update_time"
+          , "deleted"))
     }
   }
 
@@ -252,12 +307,16 @@ object company_staff {
 
     val e = CompanyStaffUtil(spark, project)
     e.init()
+
+//    e.inc_bulk_save("20200603")
+
     if (args.length == 1) {
       val Array(ds) = args
       e.inc(ds)
     } else {
       e.inc()
     }
+
     spark.stop()
   }
 }
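
The bulk save added in inc_bulk_save above goes through the in-repo save2PhoenixByJDBC helper. For reference, a rough equivalent expressed with the standard phoenix-spark connector (a sketch under assumptions: the connector is on the classpath, and the ZooKeeper quorum below is a placeholder):

import org.apache.spark.sql.{DataFrame, SaveMode}

object PhoenixBulkSaveSketch {
  // Not the in-repo helper: an approximate equivalent of the bulk save using the
  // phoenix-spark data source. COMPANY_STAFF is the target table from the diff;
  // the zkUrl value is a placeholder.
  def bulkSaveToPhoenix(df: DataFrame): Unit = {
    df.repartition(400)                 // same parallelism as inc_bulk_save above
      .write
      .format("org.apache.phoenix.spark")
      .option("table", "COMPANY_STAFF")
      .option("zkUrl", "zk-host:2181")  // placeholder ZooKeeper quorum
      .mode(SaveMode.Overwrite)         // phoenix-spark maps this to Phoenix UPSERTs
      .save()
  }
}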

+ 10 - 3
src/main/scala/com/winhc/bigdata/spark/udf/CompanyStaffAggs.scala

@@ -10,7 +10,7 @@ import org.apache.spark.sql.types._
  * @Date: 2020/8/3 14:52
  * @Description:
  */
-class CompanyStaffAggs(max: Int = 500) extends UserDefinedAggregateFunction {
+class CompanyStaffAggs extends UserDefinedAggregateFunction {
   override def inputSchema: StructType = StructType(StructField("types", StringType) :: Nil)
 
   override def bufferSchema: StructType = StructType(Array(StructField("buffer", ArrayType(StringType, containsNull = false))))
@@ -32,6 +32,13 @@ class CompanyStaffAggs(max: Int = 500) extends UserDefinedAggregateFunction {
     buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
   }
 
-  //.diff(Seq("未知", "未公开"))
-  override def evaluate(buffer: Row): Any = buffer.getSeq[String](0).distinct.mkString(",")
+
+  override def evaluate(buffer: Row): Any = {
+    val v = buffer.getSeq[String](0).distinct.diff(Seq("未知", "未公开")).mkString(",")
+    if (StringUtils.isEmpty(v)) {
+      "未公开"
+    } else {
+      v
+    }
+  }
 }
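
The evaluate change above backs the third bullet of the commit message: placeholder titles are stripped and, if nothing remains, the aggregate falls back to '未公开'. A local usage sketch, assuming the class compiles as shown and is on the classpath:

import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.udf.CompanyStaffAggs

object AggValDemo {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("agg_val_demo")
      .master("local[*]")
      .getOrCreate()
    import spark.implicits._

    spark.udf.register("agg_val", new CompanyStaffAggs())

    Seq(
      ("h1", "未知"), ("h1", "未公开"),              // only placeholder titles -> "未公开"
      ("h2", "董事"), ("h2", "监事"), ("h2", "未知") // placeholders dropped, real titles kept
    ).toDF("hid", "staff_type").createOrReplaceTempView("staff")

    spark.sql("SELECT hid, agg_val(staff_type) AS types FROM staff GROUP BY hid")
      .show(truncate = false)

    spark.stop()
  }
}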