
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

许家凯 4 years ago
commit 455d676beb

+ 47 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -0,0 +1,47 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{CompanyIncrForCidUtils, CompanyIncrForCidsUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: entry point for incremental data calculation
+ * @author π
+ * @date 2020/6/28 10:43
+ */
+object CalcIncrTotal {
+  //winhc_eci_dev company_icp liscense,domain,new_cid cid
+  //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid cid
+  //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid cid
+  //winhc_eci_dev company_wechat title,public_num,new_cid cid
+
+  //winhc_eci_dev company_copyright_reg reg_num,full_name,cat_num,new_cid cids
+  //winhc_eci_dev company_copyright_works reg_num,name,type,new_cid cids
+  //winhc_eci_dev company_patent app_number,pub_number,title,new_cid cids
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 4) {
+      println("Please pass four arguments: project, tableName, dupliCols (dedup columns), flag!")
+      sys.exit(-1)
+    }
+    val Array(project, tableName, dupliCols, flag) = args
+    println(
+      s"""
+         |project: $project
+         |tableName: $tableName
+         |dupliCols: $dupliCols
+         |flag: $flag
+         |""".stripMargin)
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    flag match {
+      case "cid" => CompanyIncrForCidUtils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
+      case "cids" => CompanyIncrForCidsUtils(spark, project, tableName, tableName + "_list", dupliCols.split(",").seq).calc()
+    }
+    spark.stop()
+  }
+}
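For reference, a minimal invocation sketch for the new entry point, using the first argument set from the comments above ("cid" routes to CompanyIncrForCidUtils, "cids" to CompanyIncrForCidsUtils). This is illustrative only and still assumes the ODPS/Spark environment that SparkUtils.InitEnv expects:

// illustrative driver call; argument order is project, tableName, dupliCols, flag
CalcIncrTotal.main(Array("winhc_eci_dev", "company_icp", "liscense,domain,new_cid", "cid"))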

+ 3 - 37
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCid.scala

@@ -10,23 +10,13 @@ import scala.collection.mutable
  * π
  */
 
-//liscense,domain,new_cid winhc_eci_dev.inc_ods_company winhc_eci_dev.ads_company_icp winhc_eci_dev.inc_ods_company_icp winhc_eci_dev.inc_ads_company_icp
-//winhc_eci_dev.tmp_xf_inc_ods_company_icp_2
 object CompanyIncrForCid {
 
-  val tabMapping: Map[String, (String, String, String, String, String)] =
-    Map("ods_company_icp" -> ("liscense,domain,new_cid", "inc_ods_company", "ads_company_icp", "inc_ods_company_icp", "winhc_eci_dev.inc_ads_company_icp"), // website (ICP)
-      "ods_company_tm" -> ("reg_no,new_cid", "", "", "", ""), // trademark
-      "ods_company_wechat" -> ("public_num,new_cid", "", "", "", ""), // WeChat official account
-      "ods_company_app_info" -> ("name,new_cid", "", "", "", ""), // product info
-      "ods_company_own_tax" -> ("own_tax_amount,tax_category,tax_num,new_cid", "", "", "", ""), // tax
-      "ods_company_mortgage_info" -> ("reg_date,reg_num,amount,new_cid", "", "", "", "") //
-    )
-
-
   //winhc_eci_dev company_icp liscense,domain,new_cid
+  //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid
+  //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid
+  //winhc_eci_dev company_wechat title,public_num,new_cid
   def main(args: Array[String]): Unit = {
-    //    val (cols, t1, t2, t3, t4) = valid(args)
     val Array(project, tableName, dupliCols) = args
     println(
       s"""
@@ -40,31 +30,7 @@ object CompanyIncrForCid {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     CompanyIncrForCidUtils(spark, project, tableName, (dupliCols.split(",").toSeq)).calc()
-    //    CompanyIncrForCidUtils(spark, t1, t2, t3, t4, cols).calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]): (Seq[String], String, String, String, String) = {
-    println(args.toSeq.mkString(" "))
-
-    if (args.length == 1) {
-
-    } else if (args.length == 5) {
-      val Array(cols, t1, t2, t3, t4) = args
-      return (cols.split(",").toSeq, t1, t2, t3, t4)
-    } else {
-      println("Please pass the table to calculate!")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-    val space = sourceTable.split("\\.")(0) + "."
-
-    val (cols, t1, t2, t3, t4) = tabMapping.getOrElse(sourceTable.split("\\.")(1), null)
-    if (cols.isEmpty) {
-      println("The input table does not exist; please configure a calculation rule!")
-      sys.exit(-1)
-    }
-
-    (cols.split(",").toSeq, space + t1, space + t2, space + t3, space + t4)
-  }
 }

+ 2 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCids.scala

@@ -10,33 +10,22 @@ import scala.collection.mutable
  * π
  */
 
-//reg_num,full_name,cat_num,new_cid
-// winhc_eci_dev.inc_ods_company
-// winhc_eci_dev.ads_company_copyright_reg_list
-// winhc_eci_dev.inc_ods_company_copyright_reg
-// winhc_eci_dev.inc_ads_company_copyright_reg
-// winhc_eci_dev.inc_ads_company_copyright_reg_list
 object CompanyIncrForCids {
 
   //winhc_eci_dev company_copyright_reg company_copyright_reg_list reg_num,full_name,cat_num,new_cid
+  //winhc_eci_dev company_copyright_works company_copyright_works_list reg_num,name,type,new_cid
+  //winhc_eci_dev company_patent company_patent_list app_number,pub_number,title,new_cid
   def main(args: Array[String]): Unit = {
 
     val Array(project,mainTableName,sublistTableName,dupliCols) = args
 
-//    val (cols, t1, t2, t3, t4, t5) = valid(args)
     var config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     CompanyIncrForCidsUtils(spark,project,mainTableName,sublistTableName,dupliCols.split(",").seq).calc()
-//    CompanyIncrForCidsUtils(spark, t1, t2, t3, t4, t5, cols).calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]): (Seq[String], String, String, String, String, String) = {
-    println(args.toSeq.mkString(" "))
-    val Array(cols, t1, t2, t3, t4, t5) = args
-    (cols.split(",").toSeq, t1, t2, t3, t4, t5)
-  }
 }

+ 9 - 4
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -30,7 +30,10 @@ object CompanyIntellectualsScore {
 
 
     val (sourceTable, flag, time, kind, project) = valid(args)
 
-    val config = mutable.Map.empty[String, String]
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
     config.+=("spark.hadoop.odps.project.name" -> "winhc_eci_dev")
     println(config)
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
@@ -83,11 +86,13 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
 
 
     var appsql = ""
     var apptab = ""
+
+    // patent type: 1 (A) invention publication; 2 (B, C) invention grant; 3 (U, Y) utility model; 4 (D, S) design
     if ("3".equals(flag)) { // utility model and design patents
-      appsql = "AND substring(pub_number, 7,1) in ('2','3')"
+      appsql = "AND SUBSTR(pub_number, -1) in ('U','Y','D','S')"
       apptab = "_s"
     } else if ("4".equals(flag)) { // invention patents
-      appsql = "AND substring(pub_number, 7,1) not in ('2','3')"
+      appsql = "AND SUBSTR(pub_number, -1) in ('A','B','C')"
       apptab = "_f"
     }
 
@@ -117,7 +122,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String,
     //         |- - - - - - - - - - - - - - - - - - - - - - - - -
     //         |${showString(sql(s"select * from t1_view"))}
     //         |- - - - - - - - - - - - - - - - - - - - - - - - -
-    //       """.stripMargin)
+    //           """.stripMargin)
 
     sql(s"insert overwrite table ${targetTable}${apptab} " +
       s"partition (ds='${ds}')  select * from t1_view")

+ 14 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -74,7 +74,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |            UNION ALL
          |            SELECT  "1" AS flag
          |                    ,coalesce(b.new_cid,a.cid) new_cid
-         |                    ,a.cid
+         |                    ,cast(a.cid as string) as cid
          |                    ,${columns.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
          |            FROM    ${inc_ods_company_tb} a
@@ -86,6 +86,19 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
+    val colsTotal = columns ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      spark,
+      colsTotal,
+      target_inc_ods_company_tb,
+      tableName,
+      firstDs,
+      Seq("new_cid","id")
+    ).syn()
+
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
+
+
 }

+ 27 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -29,17 +29,16 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
 
 
 
     val sublistTableFieldName = spark.table(ads_company_tb_list).columns.filter(s => {
-      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey")
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
     }).seq
 
-
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
     val firstDs = BaseUtil.getFirstPartion("winhc_eci_dev.inc_ods_company", spark)
 
     // table fields
     val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids")
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids")
     })
 
     // mapping relationship
@@ -105,7 +104,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                        UNION ALL
          |                        SELECT  new_cid AS cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${ads_company_tb}
+         |                        FROM    ${ads_company_tb_list}
          |                        WHERE   ds >= ${firstDs}
          |                    ) b
          |            ON      a.cid = b.cid
@@ -113,14 +112,13 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
-
     // dedupe the main table by id and write it out
     sql(
       s"""
          |INSERT OVERWRITE TABLE  $target_inc_ads_company_tb PARTITION(ds='$firstDs')
-         |SELECT  ${columns.mkString(",")}
+         |SELECT  cids,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  ${columns.mkString(",")}
+         |            SELECT  cids,${columns.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
          |            FROM    ${inc_ods_company_tb}
          |            WHERE   ds >= ${firstDs}
@@ -130,6 +128,28 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |WHERE   num = 1
          |""".stripMargin)
 
+    // sync the incremental sublist (index) table to Phoenix
+    val colsList = sublistTableFieldName ++ Seq("new_cid")
+    MaxComputer2Phoenix(
+      spark,
+      colsList,
+      target_inc_ads_company_tb_list,
+      sublistTableName,
+      firstDs,
+      Seq("new_cid","id")
+    ).syn()
+
+    // sync the incremental main table to Phoenix
+    val cols = columns ++ Seq("cids")
+    MaxComputer2Phoenix(
+      spark,
+      cols,
+      target_inc_ads_company_tb,
+      mainTableName,
+      firstDs,
+      Seq("id")
+    ).syn()
+
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }
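Both syn() calls end up writing to Phoenix tables named "TEST_" + htable, upper-cased (see resTable in MaxComputer2Phoenix below). A hedged illustration using the copyright-reg tables from the comments in CompanyIncrForCids:

// illustrative: target names derived the same way resTable is built below
val mainPhoenixTable = ("TEST_" + "company_copyright_reg").toUpperCase       // TEST_COMPANY_COPYRIGHT_REG
val listPhoenixTable = ("TEST_" + "company_copyright_reg_list").toUpperCase  // TEST_COMPANY_COPYRIGHT_REG_LIST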

+ 52 - 0
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -0,0 +1,52 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Date
+
+import org.apache.spark.sql.SparkSession
+
+/**
+ * @Description: MaxComputer2Phoenix
+ * @author π
+ * @date 2020/6/28 15:44
+ */
+case class MaxComputer2Phoenix(spark: SparkSession,
+                               phoenixCols: Seq[String], // Phoenix columns
+                               adsTable: String, // source ODPS table
+                               htable: String, // HBase table
+                               ds: String, // partition
+                               rowkey: Seq[String] // rowkey fields
+                              ) extends LoggingUtils {
+  // sync MaxCompute (ODPS) data to Phoenix
+  def syn() = {
+
+    println(s"${htable} phoenix syn start! " + new Date().toString)
+
+    val resTable = s"TEST_${htable}"
+
+    println("------------" + resTable + "---------")
+
+    val key = s"CONCAT_WS('_',${rowkey.mkString(",")}) AS rowkey"
+    val res = phoenixCols.map(s => {
+      if ("NEW_CID".equals(s.toUpperCase())) {
+        s"cast ($s as string) as CID"
+      } else {
+        s"cast ($s as string) as ${s.toUpperCase}"
+      }
+    }) ++ Seq(key)
+
+    val df = sql(
+      s"""
+         |select
+         |${res.mkString(", ")}
+         |from
+         |${adsTable}
+         |where ds = $ds
+         |""".stripMargin)
+
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    df.save2PhoenixByJDBC(s"${resTable.toUpperCase}")
+
+    println(s"${htable} phoenix syn end! " + new Date().toString)
+  }
+
+}
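A minimal usage sketch (values are illustrative; an active SparkSession named spark plus the LoggingUtils/PhoenixHelper plumbing from this repo are assumed):

// push one ads partition into the Phoenix table TEST_COMPANY_ICP
MaxComputer2Phoenix(
  spark,
  Seq("liscense", "domain", "new_cid"),   // columns exposed to Phoenix
  "winhc_eci_dev.inc_ads_company_icp",    // source ODPS table
  "company_icp",                          // htable suffix -> TEST_COMPANY_ICP
  "20200628",                             // partition ds (illustrative)
  Seq("new_cid", "id")                    // rowkey = CONCAT_WS('_', new_cid, id)
).syn()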