晏永年 4 gadi atpakaļ
vecāks
revīzija
02e66a747e

+ 1 - 1
src/main/resources/env.yaml

@@ -1,5 +1,5 @@
 profile:
-  activate: prod
+  activate: dev
 
 ---
 env:

+ 74 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala

@@ -0,0 +1,74 @@
+package com.winhc.bigdata.spark.jobs.increment
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.CompanyForCid.valid
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/7/15
+ * @Description TODO
+ */
+object inc_phx_cid_ads extends LoggingUtils {
+  var config = mutable.Map(
+    "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
+  )
+  val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+  def main(args: Array[String]): Unit = {
+    var hasList: Boolean = false
+    if (args.length < 1) {
+      println("请输入:1、表名;2、是否有list表 ")
+      sys.exit(-1)
+    } else if (args.length == 2) {
+      hasList = args(1).toBoolean
+    }
+    val sourceTable = args(0)
+    val adsTable = s"inc_ads_$sourceTable"
+    val phxTable = sourceTable.toUpperCase
+    val adsColumns: Seq[String] = spark.table(adsTable).schema.map(_.name).filter(!_.equals("ds"))
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    //处理主表
+    //增量ads最后一个分区
+    val lastDsIncAds = BaseUtil.getPartion(adsTable, spark)
+    val df1 = sql(s"""SELECT ${adsColumns.mkString(",")} FROM ${adsTable} WHERE ds=${lastDsIncAds}""")
+    if (hasList) {
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cids").drop("flag")
+        .withColumnRenamed("new_cids", "cids")
+        .createOrReplaceTempView(s"tmp")
+      sql(s"""SELECT id AS ROWKEY,* FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx main table writed! " + new Date().toString)
+      //处理list表
+      val adsListColumns: Seq[String] = spark.table(adsTable + "_list").schema.map(_.name).filter(!_.equals("ds"))
+      val lastDsIncAdsList = BaseUtil.getPartion(adsTable + "_list", spark)
+      val df2 = sql(s"""SELECT ${adsListColumns.mkString(",")} FROM ${adsTable}_list WHERE ds=${lastDsIncAdsList}""")
+      df2.columns.foldLeft(df2) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp2")
+      sql(s"""SELECT * FROM tmp2""").repartition(200).save2PhoenixByJDBC(s"${phxTable}_LIST")
+      println(s"${this.getClass.getSimpleName} phx list table writed! " + new Date().toString)
+    } else {
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid").drop("flag")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp")
+      sql(s"""SELECT * FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx single table writed! " + new Date().toString)
+    }
+    spark.stop()
+  }
+}

+ 163 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/script/ads_company_land_transfer_ods.sql

@@ -0,0 +1,163 @@
+INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_land_transfer PARTITION (ds='20200604')
+SELECT --只有有现土地使用权人
+    CONCAT_WS("_",new_cid,id) AS rowkey,
+--    NVL(B.new_cid,A.pre_cid)   AS pre_cid ,--没有原土地使用权人
+    NVL(B.new_cid,A.now_cid)   AS new_cid,
+    A.now_cid AS cid,
+    'now' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.now_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NULL--没有原土地使用权人
+AND A.now_cid is NOT NULL--但有现土地使用权人
+UNION --只有原土地使用权人
+SELECT
+    CONCAT_WS("_",new_cid,id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,--没有现土地使用权人
+    'pre' AS  type,
+    id                     ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL--但原土地使用权人
+AND A.now_cid is NULL--没有现土地使用权人
+UNION--原土地使用权人与现土地使用权人为同一人或企业(cid)
+SELECT
+    CONCAT_WS("_",new_cid,id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,--与pre_cid相同
+    'bothsame' AS  type,
+    id                     ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.pre_cid=A.now_cid--原土地使用权人与现土地使用权人相同
+UNION--原土地使用权人与现土地使用权人都有但不为同一人或企业(cid),拆成二条的第一条(抵押人)
+SELECT
+    CONCAT_WS("_",new_cid,id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,--一分为二的第一条
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,
+    'bothone' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.now_cid is NOT NULL AND A.pre_cid!=A.now_cid--抵押人、抵押权人都有但不相同
+UNION--原土地使用权人与现土地使用权人都有但不为同一人或企业(cid),拆成二条的第二条(抵押权人)
+SELECT
+    CONCAT_WS("_",new_cid,id) AS rowkey,
+--    NVL(B.new_cid,A.pre_cid)   AS cid ,
+    NVL(B.new_cid,A.now_cid)   AS new_cid,--一分为二的第二条
+    A.now_cid AS cid,
+    'bothtwo' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.now_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.now_cid is NOT NULL AND A.pre_cid!=A.now_cid--抵押人、抵押权人都有但不相同
+;