Browse files

fix: modify the generic chattel mortgage processing routine

许家凯 4 years ago
parent
commit
150a2093d4
1 changed file with 55 additions and 2 deletions
  1. +55 -2
      src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala

+ 55 - 2
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala

@@ -22,6 +22,59 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
                                            ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
+
+  private case class MaxComputer2Phoenix(spark: SparkSession,
+                                         phoenixCols2: Seq[String], // Phoenix columns
+                                         adsTable: String, // ODPS (ads) table
+                                         htable: String, // HBase table
+                                         ds: String, // partition
+                                         rowkey: String // rowkey field
+                                        ) extends LoggingUtils {
+    // sync data from MaxCompute to Phoenix
+    def syn() = {
+
+      println(s"${htable} phoenix syn start! " + new Date().toString)
+
+      //    val resTable = s"TEST_${htable}"
+      val resTable = s"${htable}"
+
+      println("------------" + resTable + "---------")
+
+
+      val key = s"$rowkey AS rowkey"
+
+      // keep the `id` column only for the chattel mortgage table
+      val phoenixCols: Seq[String] =
+        if (adsTable.equalsIgnoreCase("winhc_eci_dev.inc_ads_company_mortgage_info")) {
+          phoenixCols2
+        } else {
+          phoenixCols2.filter(!_.equalsIgnoreCase("id"))
+        }
+
+      val res = phoenixCols.map(s => {
+        if ("NEW_CID".equals(s.toUpperCase())) {
+          s"cast ($s as string) as CID"
+        } else {
+          s"cast ($s as string) as ${s.toUpperCase}"
+        }
+      }) ++ Seq(key)
+
+      val df = sql(
+        s"""
+           |select
+           |${res.mkString(", ")}
+           |from
+           |${adsTable}
+           |where ds = $ds
+           |""".stripMargin).repartition(100)
+
+      import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+      df.save2PhoenixByJDBC(s"${resTable.toUpperCase}")
+
+      println(s"${htable} phoenix syn end! " + new Date().toString)
+    }
+
+  }
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
@@ -96,7 +149,7 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
          |        ,cid
          |        ,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  CONCAT_WS('_',new_cid,${cols_md5.mkString("cleanup(","), cleanup(",")")}) AS rowkey
+         |            SELECT  CONCAT_WS('_',new_cid,${cols_md5.mkString("cleanup(", "), cleanup(", ")")}) AS rowkey
          |                    ,flag
          |                    ,new_cid
          |                    ,cid
@@ -145,7 +198,7 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
       inc_ads_company_tb,
       tableName,
       lastDsIncOds,
-      s"CONCAT_WS('_',new_cid,${cols_md5.mkString("cleanup(","), cleanup(",")")})"
+      s"CONCAT_WS('_',new_cid,${cols_md5.mkString("cleanup(", "), cleanup(", ")")})"
     ).syn()
 
     CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
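
The behavioural core of this patch is the column projection inside MaxComputer2Phoenix.syn(): winhc_eci_dev.inc_ads_company_mortgage_info keeps its id column, every other ads table drops it, new_cid is always exposed to Phoenix as CID, and the rowkey expression is appended last. A minimal, runnable sketch of that projection rule follows; the object name ProjectionSketch, the helper phoenixSelectExprs, the sample column list and the second table name are illustrative placeholders rather than identifiers from the repository.

object ProjectionSketch {

  // Mirrors the projection rule added in MaxComputer2Phoenix.syn():
  // keep `id` only for the chattel mortgage table, rename new_cid to CID,
  // and append the rowkey expression as the last select item.
  def phoenixSelectExprs(adsTable: String, cols: Seq[String], rowkey: String): Seq[String] = {
    val kept =
      if (adsTable.equalsIgnoreCase("winhc_eci_dev.inc_ads_company_mortgage_info")) cols
      else cols.filterNot(_.equalsIgnoreCase("id"))
    kept.map {
      case c if c.equalsIgnoreCase("new_cid") => s"cast ($c as string) as CID"
      case c                                  => s"cast ($c as string) as ${c.toUpperCase}"
    } :+ s"$rowkey AS rowkey"
  }

  def main(args: Array[String]): Unit = {
    val cols = Seq("id", "new_cid", "reg_num")
    // Mortgage table: `id` survives the projection.
    println(phoenixSelectExprs("winhc_eci_dev.inc_ads_company_mortgage_info", cols, "md5(reg_num)"))
    // Any other ads table (placeholder name): `id` is filtered out.
    println(phoenixSelectExprs("winhc_eci_dev.inc_ads_some_other_table", cols, "md5(reg_num)"))
  }
}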