
odps sync to phoenix

xufei 5 years ago
Parent commit f0cd844d0a

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyIncrForCidUtils, CompanyIncrForCidsUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: entry class for incremental data sync
+ * @author π
+ * @date 2020/6/28 10:43
+ */
+object CalcIncrTotal {
+  //winhc_eci_dev company_icp liscense,domain,new_cid cid
+  //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid cid
+  //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid cid
+  //winhc_eci_dev company_wechat title,public_num,new_cid cid
+
+  //winhc_eci_dev company_copyright_reg reg_num,full_name,cat_num,new_cid cids
+  //winhc_eci_dev company_copyright_works reg_num,name,type,new_cid cids
+  //winhc_eci_dev company_patent app_number,pub_number,title,new_cid cids
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 4) {
+      println("Please pass in: project (project name), tableName (table name), dupliCols (dedup columns), flag (cid or cids) !!!")
+      sys.exit(-1)
+    }
+    val Array(project, tableName, dupliCols, flag) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |dupliCols: $dupliCols
+         |flag: $flag
+         |""".stripMargin)
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    flag match {
+      case "cid" => CompanyIncrForCidUtils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
+      case "cids" => CompanyIncrForCidsUtils(spark, project, tableName, tableName + "_list", dupliCols.split(",").seq).calc()
+    }
+    spark.stop()
+  }
+}

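The commented table configurations above double as sample argument sets for the job. A minimal sketch of invoking the entry point with the company_icp row from those comments (values are illustrative only, not authoritative):

    CalcIncrTotal.main(Array(
      "winhc_eci_dev",           // project
      "company_icp",             // tableName
      "liscense,domain,new_cid", // dupliCols: comma-separated dedup columns
      "cid"                      // flag: selects the single-cid code path
    ))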
+ 12 - 12
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -114,18 +114,18 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
     df.map(r => {
       trans(r, flag, kind, project)
     }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-      "score", "total", "extraScore").show(10)
-//      .createOrReplaceTempView(s"t1_view")
-
-//        logger.info(
-//          s"""
-//             |- - - - - - - - - - - - - - - - - - - - - - - - -
-//             |${showString(sql(s"select * from t1_view"))}
-//             |- - - - - - - - - - - - - - - - - - - - - - - - -
-//           """.stripMargin)
-
-//    sql(s"insert overwrite table ${targetTable}${apptab} " +
-//      s"partition (ds='${ds}')  select * from t1_view")
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    //    logger.info(
+    //      s"""
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //         |${showString(sql(s"select * from t1_view"))}
+    //         |- - - - - - - - - - - - - - - - - - - - - - - - -
+    //           """.stripMargin)
+
+    sql(s"insert overwrite table ${targetTable}${apptab} " +
+      s"partition (ds='${ds}')  select * from t1_view")
   }
 
   def trans(r: Row, flag: String, kind: String, prpject: String) = {

+ 13 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -86,6 +86,19 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
+    val colsTotal = columns ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      target_inc_ods_company_tb,
+      tableName,
+      firstDs,
+      Seq("new_cid","id")
+    ).syn()
+
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
+
+
 }

+ 22 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -128,6 +128,28 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
+    // sync incremental index (list) table data
+    val colsList = sublistTableFieldName ++ Seq("new_cid")
+    MaxComputer2Phoenix(
+      spark,
+      colsList,
+      target_inc_ads_company_tb_list,
+      sublistTableName,
+      firstDs,
+      Seq("new_cid","id")
+    ).syn()
+
+    // sync incremental main table data
+    val cols = columns ++ Seq("cids")
+    MaxComputer2Phoenix(
+      spark,
+      cols,
+      target_inc_ads_company_tb,
+      mainTableName,
+      firstDs,
+      Seq("id")
+    ).syn()
+
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }

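For "cids" tables the utility now performs two Phoenix syncs per run: one for the derived list table (its own field list plus new_cid, keyed on new_cid and id) and one for the main table (columns plus cids, keyed on id alone). A minimal sketch of wiring this up directly, using the company_patent sample from CalcIncrTotal and assuming the same imports and config map as that entry class:

    val config = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark = SparkUtils.InitEnv("CompanyIncrForCidsUtils", config)
    CompanyIncrForCidsUtils(
      spark,
      "winhc_eci_dev",        // project
      "company_patent",       // main table
      "company_patent_list",  // sublist table: tableName + "_list"
      Seq("app_number", "pub_number", "title", "new_cid") // dedup columns
    ).calc()
    spark.stop()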
+ 52 - 0
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -0,0 +1,52 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * @Description: MaxComputer2Phoenix
+ * @author π
+ * @date 2020/6/28 15:44
+ */
+case class MaxComputer2Phoenix(spark: SparkSession,
+                               phoenixCols: Seq[String], // Phoenix columns
+                               adsTable: String, // ODPS (MaxCompute) source table
+                               htable: String, // HBase table
+                               ds: String, // partition
+                               rowkey: Seq[String] // rowkey fields
+                              ) extends LoggingUtils {
+  // sync from MaxCompute to Phoenix
+  def syn() = {
+
+    println(s"${htable} phoenix syn start! " + new Date().toString)
+
+    val resTable = s"TEST_${htable}"
+
+    println("------------" + resTable + "---------")
+
+    val key = s"CONCAT_WS('_',${rowkey.mkString(",")}) AS rowkey"
+    val res = phoenixCols.map(s => {
+      if ("NEW_CID".equals(s.toUpperCase())) {
+        s"cast ($s as string) as CID"
+      } else {
+        s"cast ($s as string) as ${s.toUpperCase}"
+      }
+    }) ++ Seq(key)
+
+    val df = sql(
+      s"""
+         |select
+         |${res.mkString(", ")}
+         |from
+         |${adsTable}
+         |where ds = '$ds'
+         |""".stripMargin)
+
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    df.save2PhoenixByJDBC(s"${resTable.toUpperCase}")
+
+    println(s"${htable} phoenix syn end! " + new Date().toString)
+  }
+
+}
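syn() builds a projection that casts every column to string, upper-cases its alias, renames new_cid to CID, and concatenates the rowkey fields with underscores. As an illustration (table name and partition value are hypothetical), a call with phoenixCols = Seq("title", "public_num", "new_cid") and rowkey = Seq("new_cid", "id") for htable = "company_wechat" would run roughly:

    select
    cast (title as string) as TITLE, cast (public_num as string) as PUBLIC_NUM, cast (new_cid as string) as CID, CONCAT_WS('_',new_cid,id) AS rowkey
    from
    inc_ads_company_wechat
    where ds = '20200628'

and write the result to the Phoenix table TEST_COMPANY_WECHAT via save2PhoenixByJDBC.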