
Merge remote-tracking branch 'origin/master'

xufei · 4 years ago
parent
commit 9648366c8f

+ 263 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/company_staff.scala

@@ -0,0 +1,263 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyStaffAggs}
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/3 14:48
+ * @Description: full and incremental builds of the company staff tables (ads_company_staff / inc_ads_company_staff)
+ */
+object company_staff {
+
+
+  case class CompanyStaffUtil(s: SparkSession,
+                              project: String
+                             ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+
+    def init(): Unit = {
+      cleanup()
+      spark.udf.register("agg_val", new CompanyStaffAggs())
+      sql(
+        """
+          |CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_company_staff
+          |(
+          |    rowkey STRING COMMENT 'FIELD'
+          |    ,new_cid STRING COMMENT 'FIELD'
+          |    ,id BIGINT COMMENT ''
+          |    ,cid BIGINT COMMENT ''
+          |    ,hid BIGINT COMMENT ''
+          |    ,staff_type STRING COMMENT ''
+          |    ,create_time DATETIME COMMENT ''
+          |    ,update_time DATETIME COMMENT ''
+          |    ,deleted BIGINT COMMENT ''
+          |)
+          |COMMENT 'TABLE COMMENT'
+          |PARTITIONED BY
+          |(
+          |    ds STRING COMMENT 'partition'
+          |)
+          |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`inc_ads_company_staff` (
+           |  `rowkey` STRING COMMENT 'FIELD',
+           |  `new_cid` STRING COMMENT 'FIELD',
+           |  `id` BIGINT,
+           |  `cid` BIGINT,
+           |  `hid` BIGINT,
+           |  `staff_type` STRING,
+           |  `create_time` DATETIME,
+           |  `update_time` DATETIME,
+           |  `deleted` BIGINT)
+           | COMMENT 'TABLE COMMENT'
+           |PARTITIONED BY (
+           |  `ds` STRING COMMENT 'partition')
+           |""".stripMargin)
+    }
+
+
+    def all(): Unit = {
+
+      sql(
+        s"""
+           |SELECT  a.*
+           |        ,coalesce(b.new_cid,a.cid) AS new_cid
+           |FROM    winhc_eci_dev.ods_company_staff a
+           |LEFT JOIN winhc_eci_dev.company_map b
+           |ON      a.cid = b.cid
+           |WHERE   a.ds = 20200604
+           |AND     a.cid IS NOT NULL
+           |""".stripMargin)
+        //        .cache()
+        .createOrReplaceTempView("tmp_all")
+
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_staff PARTITION(ds=20200604)
+           |SELECT  t2.rowkey
+           |        ,t2.new_cid
+           |        ,t2.id
+           |        ,t2.cid
+           |        ,t2.hid
+           |        ,t3.types AS staff_type
+           |        ,t2.create_time
+           |        ,t2.update_time
+           |        ,t2.deleted
+           |FROM    (
+           |            SELECT  rowkey
+           |                    ,new_cid
+           |                    ,id
+           |                    ,cid
+           |                    ,hid
+           |                    ,staff_type
+           |                    ,create_time
+           |                    ,update_time
+           |                    ,deleted
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER (PARTITION BY new_cid,hid ORDER BY id DESC ) num
+           |                                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',hid)))) AS rowkey
+           |                                ,cleanup(CONCAT_WS('',hid)) AS cols
+           |                        FROM    tmp_all AS c
+           |                    ) d
+           |            WHERE   num = 1
+           |            AND     cols IS NOT NULL
+           |            AND     trim(cols) <> ''
+           |        ) AS t2
+           |LEFT JOIN (
+           |              SELECT  t1.new_cid
+           |                      ,t1.hid
+           |                      ,agg_val(t1.staff_type) AS types
+           |              FROM    tmp_all AS t1
+           |              GROUP BY t1.new_cid
+           |                       ,t1.hid
+           |          ) AS t3
+           |ON      CONCAT_WS('',t2.hid,t2.new_cid) = CONCAT_WS('',t3.hid,t3.new_cid)
+           |""".stripMargin)
+    }
+
+    def inc(): Unit = {
+      val lastDs = getLastPartitionsOrElse("winhc_eci_dev.inc_ads_company_staff", "20200604")
+      val dss = getPartitions("winhc_eci_dev.inc_ods_company_staff").filter(_ > lastDs)
+
+      println("计算分区:" + dss.mkString(","))
+
+      for (ds <- dss) {
+        inc(ds)
+      }
+    }
+
+    def inc(ds: String): Unit = {
+      val lastPat = getLastPartitionsOrElse("winhc_eci_dev.ads_company_staff", "0")
+      sql(
+        s"""
+           |SELECT  cid
+           |        ,current_cid AS new_cid
+           |FROM    winhc_eci_dev.inc_ods_company
+           |WHERE   ds = '$ds'
+           |AND     cid IS NOT NULL
+           |AND     current_cid IS NOT NULL
+           |""".stripMargin)
+        .createOrReplaceTempView("mapping")
+
+      sql(
+        s"""
+           |SELECT  concat_ws(
+           |            '_'
+           |            ,coalesce(t2.new_cid,t1.cid)
+           |            ,split(rowkey, '_')[1]
+           |        ) AS rowkey
+           |        ,coalesce(t2.new_cid,t1.cid) as new_cid
+           |        ,id
+           |        ,t1.cid
+           |        ,hid
+           |        ,staff_type
+           |        ,create_time
+           |        ,update_time
+           |        ,deleted
+           |FROM    (
+           |            SELECT  *
+           |            FROM    winhc_eci_dev.ads_company_staff
+           |            WHERE   ds = '$lastPat'
+           |            UNION ALL
+           |            SELECT  *
+           |            FROM    winhc_eci_dev.inc_ads_company_staff
+           |            WHERE   ds > $lastPat
+           |            AND     ds <= $ds
+           |        ) AS t1
+           |JOIN mapping AS t2
+           |ON      t1.new_cid = t2.cid
+           |UNION ALL
+           |SELECT  rowkey
+           |        ,new_cid
+           |        ,id
+           |        ,cid
+           |        ,hid
+           |        ,staff_type
+           |        ,create_time
+           |        ,update_time
+           |        ,deleted
+           |FROM    (
+           |            SELECT  *
+           |                    ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',hid)))) AS rowkey
+           |                    ,ROW_NUMBER() OVER (PARTITION BY new_cid,hid ORDER BY id DESC ) num
+           |            FROM    (
+           |                        SELECT  coalesce(ta2.new_cid,ta1.cid) AS new_cid
+           |                                ,id
+           |                                ,ta1.cid
+           |                                ,hid
+           |                                ,staff_type
+           |                                ,create_time
+           |                                ,update_time
+           |                                ,deleted
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    winhc_eci_dev.inc_ods_company_staff
+           |                                    WHERE   ds = '$ds'
+           |                                ) AS ta1
+           |                        LEFT JOIN mapping AS ta2
+           |                        ON      ta1.cid = ta2.cid
+           |                    )
+           |        )
+           |WHERE   num = 1
+           |""".stripMargin)
+        .cache().createOrReplaceTempView("inc_ads_tmp")
+
+      sql(
+        s"""
+           |INSERT OVERWRITE TABLE winhc_eci_dev.inc_ads_company_staff PARTITION(ds=$ds)
+           |SELECT  t2.rowkey
+           |        ,t2.new_cid
+           |        ,t2.id
+           |        ,t2.cid
+           |        ,t2.hid
+           |        ,t3.types AS staff_type
+           |        ,t2.create_time
+           |        ,t2.update_time
+           |        ,t2.deleted
+           |FROM    inc_ads_tmp AS t2
+           |LEFT JOIN (
+           |              SELECT  t1.new_cid
+           |                      ,t1.hid
+           |                      ,agg_val(t1.staff_type) AS types
+           |              FROM    inc_ads_tmp AS t1
+           |              GROUP BY t1.new_cid
+           |                       ,t1.hid
+           |          ) AS t3
+           |ON      CONCAT_WS('',t2.hid,t2.new_cid) = CONCAT_WS('',t3.hid,t3.new_cid)
+           |""".stripMargin)
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("company_staff", config)
+
+    val e = CompanyStaffUtil(spark, project)
+    e.init()
+    if (args.length == 1) {
+      val Array(ds) = args
+      e.inc(ds)
+    } else {
+      e.inc()
+    }
+    spark.stop()
+  }
+}
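
Note on the rowkey scheme shared by all() and inc(): rows are keyed as new_cid + "_" + md5(cleanup(hid)), and ROW_NUMBER() keeps only the highest-id row per (new_cid, hid). A minimal local sketch of that key, assuming cleanup merely trims its input (the real BaseFunc UDF may normalize more aggressively):

import java.security.MessageDigest

def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

def cleanup(s: String): String = s.trim // assumption: the project UDF does more than trim

def rowkey(newCid: String, hid: Long): String = s"${newCid}_${md5Hex(cleanup(hid.toString))}"

println(rowkey("123456", 42L)) // 123456_<md5 of "42">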

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyStaffAggs.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
+import org.apache.spark.sql.types._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/3 14:52
+ * @Description: UDAF that merges comma-separated staff_type values across a group and returns the distinct set, comma-joined
+ */
+// NOTE: `max` is currently unused; presumably an intended cap on the number of buffered types.
+class CompanyStaffAggs(max: Int = 500) extends UserDefinedAggregateFunction {
+  override def inputSchema: StructType = StructType(StructField("types", StringType) :: Nil)
+
+  override def bufferSchema: StructType = StructType(Array(StructField("buffer", ArrayType(StringType, containsNull = false))))
+
+  override def dataType: DataType = StringType
+
+  override def deterministic: Boolean = true
+
+  override def initialize(buffer: MutableAggregationBuffer): Unit = buffer.update(0, Seq.empty[String])
+
+  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
+    val v = input.getString(0)
+    if (StringUtils.isNotEmpty(v)) {
+      buffer(0) = v.split(",") ++ buffer.getSeq[String](0)
+    }
+  }
+
+  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
+    buffer1(0) = buffer1.getSeq[String](0) ++ buffer2.getSeq[String](0)
+  }
+
+  // optionally drop placeholder values, e.g. .diff(Seq("未知", "未公开")) ("unknown" / "undisclosed")
+  override def evaluate(buffer: Row): Any = buffer.getSeq[String](0).distinct.mkString(",")
+}
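
For review context, a hedged sketch of what agg_val does once registered: it splits each comma-separated staff_type, unions the pieces across the group, and emits the distinct values comma-joined (ordering is not guaranteed). The local SparkSession and sample data below are illustrative only:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").appName("agg_val_demo").getOrCreate()
spark.udf.register("agg_val", new CompanyStaffAggs())

import spark.implicits._
Seq(("c1", 1L, "legal,staff"), ("c1", 1L, "staff"), ("c1", 2L, "director"))
  .toDF("new_cid", "hid", "staff_type")
  .createOrReplaceTempView("demo_staff")

spark.sql("SELECT new_cid, hid, agg_val(staff_type) AS types FROM demo_staff GROUP BY new_cid, hid").show(false)
// expected: (c1, 1) -> "legal,staff" (deduplicated); (c1, 2) -> "director"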

+ 20 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -28,6 +28,26 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
 
     val cols_md5 = disCol.filter(!_.equals("new_cid"))
 
+    val ddl = spark.table(odsTable).schema
+      .filter(f => !"ds".equals(f.name))
+      .map(f => s"${f.name} ${DataTypeUtils.getDataType(f.dataType)} COMMENT '${f.getComment().getOrElse("")}'\n")
+      .mkString(",")
+
+
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${adsTable}
+         |(
+         |    rowkey  STRING COMMENT 'FIELD'
+         |    ,new_cid STRING COMMENT 'FIELD'
+         |    ,$ddl
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |""".stripMargin)
+
     //替换字段
     sql(
       s"""

+ 32 - 0
src/main/scala/com/winhc/bigdata/spark/utils/DataTypeUtils.scala

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.spark.sql.types.{DataType, DoubleType, LongType, StringType, TimestampType}
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/3 11:38
+ * @Description: map Spark SQL DataTypes to ODPS DDL type names
+ */
+object DataTypeUtils {
+  def getDataType(dataType: DataType): String = {
+    dataType match {
+      case StringType    => "STRING"
+      case LongType      => "BIGINT"
+      case TimestampType => "DATETIME"
+      case DoubleType    => "DOUBLE"
+      case _             => throw new RuntimeException(s"unsupported type: ${dataType.typeName}")
+    }
+  }
+
+}
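
A minimal sketch of how CompanyForCidUtils uses this mapping to derive DDL column definitions from a table schema (the sample schema below is made up for illustration):

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("id", LongType).withComment("primary key"),
  StructField("name", StringType),
  StructField("update_time", TimestampType)
))

val ddl = schema
  .filter(f => !"ds".equals(f.name))
  .map(f => s"${f.name} ${DataTypeUtils.getDataType(f.dataType)} COMMENT '${f.getComment().getOrElse("")}'")
  .mkString("\n    ,")

println(ddl)
// id BIGINT COMMENT 'primary key'
//     ,name STRING COMMENT ''
//     ,update_time DATETIME COMMENT ''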