
Add incremental sync for the person-company relation table

晏永年 4 years ago
commit 71f4559997

1 file changed, 83 insertions(+), 0 deletions(-)

+ 83 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -68,6 +68,17 @@ object CompanyIncCompany2Es {
     , "UPDATE_TIME"
     , "DELETED"
   )
+  val outFields_Human = Seq( // HBase column qualifiers for COMPANY_HUMAN_RELATION
+    "NEW_CID"
+    ,"CID"
+    ,"ID"
+    ,"COMPANY_NAME"
+    ,"HUMAN_NAME"
+    ,"HID"
+    ,"HUMAN_PID"
+    ,"STATUS"
+    ,"CREATE_TIME"
+  )
 
   case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
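
The second hunk below deduplicates the incremental rows with a row_number() window, keeping only the newest record per (cid, hid, human_pid). A minimal standalone sketch of that pattern on toy data (the session setup and values are illustrative; only the window query mirrors the diff):

    import org.apache.spark.sql.SparkSession

    object DedupSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("dedup-sketch").getOrCreate()
        import spark.implicits._

        // toy incremental rows: (cid, hid, human_pid, update_time)
        Seq(
          ("c1", "h1", "p1", "2020-06-01"),
          ("c1", "h1", "p1", "2020-06-02"), // newer duplicate, should win
          ("c2", "h2", "p2", "2020-06-01")
        ).toDF("cid", "hid", "human_pid", "update_time")
          .createOrReplaceTempView("inc_rows")

        // same row_number() pattern as the diff: rank duplicates, keep rank 1
        spark.sql(
          """
            |SELECT cid, hid, human_pid, update_time
            |FROM (
            |  SELECT a.*,
            |         row_number() OVER (PARTITION BY a.cid, a.hid, a.human_pid
            |                            ORDER BY update_time DESC) AS c
            |  FROM inc_rows a
            |) tmp
            |WHERE tmp.c = 1
            |""".stripMargin
        ).show()

        spark.stop()
      }
    }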
@@ -144,6 +155,78 @@ object CompanyIncCompany2Es {
 
     }
   }
+  case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
+    @(transient@getter) val spark: SparkSession = s
+
+    def calc() {
+      val partition = bizDate.replaceAll("\\-", "")
+      if (partition.length != 8) {
+        println("biz date is error!")
+        sys.exit(-99)
+      }
+      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
+      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last
+
+      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company_human_relation", spark)
+      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last
+
+      if (start_partition.equals(end_partition)) {
+        println("start_partition == end_partition")
+        sys.exit(-999)
+      }
+
+      val companyCols = spark.table(s"$project.ads_company_human_relation").columns
+        .filter(!_.equals("ds"))
+        .toSeq
+
+      // read the incremental data
+      // and drop duplicate rows, keeping the latest per (cid, hid, human_pid)
+      val df = sql(
+        s"""
+           |SELECT  ${companyCols.mkString(",")}
+           |FROM    (
+           |            SELECT  a.*
+           |                    ,row_number() OVER (PARTITION BY a.cid,a.hid,a.human_pid ORDER BY update_time DESC) c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $project.inc_ods_company_human_relation
+           |                        WHERE   ds > $start_partition and ds <= $end_partition and cid is not null
+           |                    ) as a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin)
+
+      df.cache().createOrReplaceTempView("tmp_company_human_relation_inc")
+
+      // write out to the ads table
+      sql(
+        s"""
+           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company_human_relation PARTITION(ds='$end_partition')
+           |SELECT ${companyCols.mkString(",")}
+           |FROM
+           |    tmp_company_human_relation_inc
+           |""".stripMargin)
+
+      import spark.implicits._
+      // write out to HBase
+      import org.apache.spark.sql.functions.col
+      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY_HUMAN_RELATION")
+      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
+      stringDf.rdd.map(row => {
+        val id = row.getAs[String]("rowkey")
+        val put = new Put(Bytes.toBytes(id))
+        for (f <- outFields_Human) {
+          val v = row.getAs[String](f.toLowerCase)
+          if (v != null) {
+            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
+          }
+        }
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null)
+        .saveAsHadoopDataset(jobConf)
+
+    }
+  }
 
   def main(args: Array[String]): Unit = {
     if (args.length != 2) {
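
Presumably main wires the new case class in alongside Company2Es; the diff is truncated before that point, so the sketch below is hypothetical (the argument order matches main's two-argument check, and the SparkSession construction stands in for whatever the job framework actually provides):

    // hypothetical wiring inside main; illustrative only
    val Array(project, bizDate) = args // e.g. "winhc_eci_dev", "2020-06-28" (assumed values)
    val spark = SparkSession.builder().appName("CompanyIncCompany2Es").getOrCreate()
    Company2Es(spark, project, bizDate).calc()
    Company_Human_Relation2HBase(spark, project, bizDate).calc()
    spark.stop()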