Ver código fonte

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
许家凯 4 anos atrás
pai
commit
3ce3b62429

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/config/PhoenixConfig.scala

@@ -13,7 +13,8 @@ object PhoenixConfig {
   private val DB_PHOENIX_DRIVER = BaseConst.DB_PHOENIX_DRIVER
   private val DB_PHOENIX_USER = ""
   private val DB_PHOENIX_PASS = ""
-  private val DB_PHOENIX_FETCHSIZE = "10000"
+  private val DB_PHOENIX_FETCHSIZE = "100"
+  private val DB_PHOENIX_BATCHSIZE = "100"
 
   def getPhoenixProperties: Properties = {
     val connProp = new Properties
@@ -21,7 +22,7 @@ object PhoenixConfig {
     connProp.put("user", DB_PHOENIX_USER)
     connProp.put("password", DB_PHOENIX_PASS)
     connProp.put("fetchsize", DB_PHOENIX_FETCHSIZE)
-    connProp.put("batchsize", "100000")
+    connProp.put("batchsize", DB_PHOENIX_BATCHSIZE)
     connProp
   }
 

+ 21 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/CalcIncrTotal.scala

@@ -9,16 +9,28 @@ import scala.collection.mutable
  * @Description: 增量数据入口类
  * @author π
 * @date 2020/6/28 10:43
+ *      old
+ *       //winhc_eci_dev company_icp liscense,domain,new_cid cid
+ *       //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid cid
+ *       //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid cid
+ *       //winhc_eci_dev company_wechat title,public_num,new_cid cid
+ *
+ *       //winhc_eci_dev company_copyright_reg reg_num,full_name,cat_num,new_cid cids
+ *       //winhc_eci_dev company_copyright_works reg_num,name,type,new_cid cids
+ *       //winhc_eci_dev company_patent app_number,pub_number,title,new_cid cids
  */
 object CalcIncrTotal {
-  //winhc_eci_dev company_icp liscense,domain,new_cid cid
-  //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid cid
-  //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid cid
-  //winhc_eci_dev company_wechat title,public_num,new_cid cid
-
-  //winhc_eci_dev company_copyright_reg reg_num,full_name,cat_num,new_cid cids
-  //winhc_eci_dev company_copyright_works reg_num,name,type,new_cid cids
-  //winhc_eci_dev company_patent app_number,pub_number,title,new_cid cids
+
+  //winhc_eci_dev company_icp new_cid,liscense,domain cid
+  //winhc_eci_dev company_app_info new_cid,name cid
+  //winhc_eci_dev ads_company_tm new_cid,reg_no cid
+  //winhc_eci_dev company_wechat new_cid,public_num cid
+
+  //winhc_eci_dev company_copyright_reg new_cid,reg_num cids
+  //winhc_eci_dev company_copyright_works new_cid,reg_num cids
+  //winhc_eci_dev company_patent new_cid,pub_number,app_number cids
+
+
   def main(args: Array[String]): Unit = {
 
     val Array(project, tableName, dupliCols, flag) = args
@@ -35,7 +47,7 @@ object CalcIncrTotal {
     }
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     flag match {

+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -4,9 +4,17 @@ import com.winhc.bigdata.calc.DimScore
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
+import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
 
 trait CompanyMapping {
 
+  def prepareFunctions(spark: SparkSession): Unit ={
+    import spark._
+    //清理特殊字符
+    spark.udf.register("cleanup", (col: String) => {
+      cleanup(col)
+    })
+  }
 
   def prepare(spark: SparkSession): Unit = {
     import spark._

+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -5,6 +5,7 @@ import java.util.{Calendar, Date, Locale}
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
+import java.util.regex.Pattern
 
 /**
  * @Author: XuJiakai
@@ -65,4 +66,11 @@ object BaseUtil {
     c.add(Calendar.DATE, 1 * n)
     DateFormatUtils.format(c.getTime.getTime, pattern)
   }
+
+  def cleanup(col :String): String ={
+    if(StringUtils.isNotBlank(col)){
+      pattern.matcher(col).replaceAll("")
+    }else{""}
+  }
+
 }

+ 46 - 30
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -15,11 +16,14 @@ case class CompanyIncrForCidUtils(s: SparkSession,
                                   project: String, //表所在工程名
                                   tableName: String, //表名(不加前后辍)
                                   dupliCols: Seq[String] // 去重列
-                                 ) extends LoggingUtils {
+                                 ) extends LoggingUtils with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
     val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
     val ads_company_tb = s"${project}.ads_${tableName}" //存量ads表
     val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
@@ -45,8 +49,11 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
     }
 
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
     println(
       s"""
+         |cols_md5:$cols_md5
          |remainDs:$remainDs
          |lastDsIncOds:$lastDsIncOds
          |lastDsIncAds:$lastDsIncAds
@@ -73,48 +80,57 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |AND     cid IS NOT NULL
          |AND     current_cid IS NOT NULL
          |GROUP BY cid,current_cid
-         |""".stripMargin).cache().createOrReplaceTempView("mapping")
+         |""".stripMargin).createOrReplaceTempView("mapping")
+
 
     sql(
       s"""
          |INSERT OVERWRITE TABLE ${inc_ads_company_tb} PARTITION(ds=$lastDsIncOds)
-         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
+         |SELECT  rowkey
          |        ,flag
          |        ,new_cid
          |        ,cid
          |        ,${columns.mkString(",")}
          |FROM    (
-         |            SELECT  "0" AS flag
-         |                    ,a.new_cid
-         |                    ,b.cid
+         |            SELECT  CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                    ,flag
+         |                    ,new_cid
+         |                    ,cid
          |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
-         |            FROM    mapping a
-         |            JOIN    (
-         |                        SELECT  new_cid AS cid
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |            FROM    (
+         |                        SELECT  "0" AS flag
+         |                                ,a.new_cid
+         |                                ,b.cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${ads_company_tb}
-         |                        WHERE   ds >= ${remainDs}
+         |                        FROM    mapping a
+         |                        JOIN    (
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${ads_company_tb}
+         |                                    WHERE   ds >= ${remainDs}
+         |                                    UNION ALL
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${inc_ads_company_tb}
+         |                                    WHERE   ds >= ${runDs}
+         |                                ) b
+         |                        ON      a.cid = b.cid
          |                        UNION ALL
-         |                        SELECT  new_cid AS cid
+         |                        SELECT  "1" AS flag
+         |                                ,coalesce(b.new_cid,a.cid) new_cid
+         |                                ,CAST(a.cid AS STRING) AS cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${inc_ads_company_tb}
-         |                        WHERE   ds >= ${runDs}
-         |                    ) b
-         |            ON      a.cid = b.cid
-         |            UNION ALL
-         |            SELECT  "1" AS flag
-         |                    ,coalesce(b.new_cid,a.cid) new_cid
-         |                    ,cast(a.cid as string) as cid
-         |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
-         |            FROM    ${inc_ods_company_tb} a
-         |            LEFT JOIN mapping b
-         |            ON      a.cid = b.cid
-         |            WHERE   a.ds >= ${runDs}
-         |            AND     a.cid IS NOT NULL
-         |        ) d
+         |                        FROM    ${inc_ods_company_tb} a
+         |                        LEFT JOIN mapping b
+         |                        ON      a.cid = b.cid
+         |                        WHERE   a.ds >= ${runDs}
+         |                        AND     a.cid IS NOT NULL
+         |                    ) d
+         |        ) e
          |WHERE   num = 1
+         |AND     cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) IS NOT NULL
+         |AND     trim(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) <> ''
          |""".stripMargin)
 
     val colsTotal = columns ++ Seq("new_cid")
@@ -125,7 +141,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
       inc_ads_company_tb,
       tableName,
       lastDsIncOds,
-      Seq("new_cid", "id")
+      s"CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
     ).syn()
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)

+ 55 - 44
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -17,11 +18,14 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
                                    mainTableName: String, //主表名(不加前辍)
                                    sublistTableName: String, //子表(不加前辍)
                                    dupliCols: Seq[String] // 去重列
-                                  ) extends LoggingUtils {
+                                  ) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+
+    prepareFunctions(spark)
+
     val inc_ods_company = s"${project}.inc_ods_company" //每日公司基本信息增量
     val ads_company_tb = s"${project}.ads_$mainTableName" //存量ads主表数据
     val ads_company_tb_list = s"${project}.ads_$sublistTableName" //存量子表数据 用于读取表字段
@@ -53,8 +57,11 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
     }
 
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid"))
+
     println(
       s"""
+         |cols_md5:$cols_md5
          |remainDs:$remainDs
          |lastDsIncOds:$lastDsIncOds
          |lastDsIncAds:$lastDsIncAds
@@ -82,7 +89,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |AND     cid IS NOT NULL
          |AND     current_cid IS NOT NULL
          |GROUP BY cid,current_cid
-         |""".stripMargin).cache().createOrReplaceTempView("mapping")
+         |""".stripMargin).createOrReplaceTempView("mapping")
 
     //增量打平
     sql(
@@ -99,56 +106,60 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     //替换cid,去重,复制老数据
     val df1 = sql(
       s"""
-         |INSERT OVERWRITE TABLE  $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
-         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
-         |        ,"0" as flag
-         |        ,CAST(new_cid as string) AS new_cid
+         |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
+         |SELECT  CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |        ,flag
+         |        ,new_cid
          |        ,${sublistTableFieldName.mkString(",")}
          |FROM    (
-         |        SELECT
-         |                *
-         |                ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
-         |        FROM    (
-         |                SELECT
-         |                        c.*
-         |                        ,coalesce(d.new_cid,c.cid) AS new_cid
-         |                FROM    incr_tb c
-         |                LEFT JOIN mapping d
-         |                ON      c.cid = d.cid
-         |                ) e
-         |        ) f
-         |WHERE   num =1
-         |UNION ALL
-         |SELECT  CONCAT_WS('_',new_cid,id) AS rowkey
-         |        ,"1" as flag
-         |        ,CAST(new_cid as string) AS new_cid
-         |        ,${sublistTableFieldName.mkString(",")}
-         |FROM    (
-         |            SELECT  a.new_cid
-         |                    ,${columns.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY ${dupliCols.mkString(",")} ORDER BY update_time DESC ) num
-         |            FROM    mapping a
-         |            JOIN    (
-         |                        SELECT  new_cid AS cid
-         |                                ,${columns.mkString(",")}
-         |                        FROM    ${inc_ads_company_tb_list}
-         |                        WHERE   ds >= ${runDs}
-         |                        UNION ALL
-         |                        SELECT  new_cid AS cid
+         |            SELECT  "0" AS flag
+         |                    ,CAST(new_cid AS STRING) AS new_cid
+         |                    ,${sublistTableFieldName.mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    (
+         |                                    SELECT  c.*
+         |                                            ,coalesce(d.new_cid,c.cid) AS new_cid
+         |                                    FROM    incr_tb c
+         |                                    LEFT JOIN mapping d
+         |                                    ON      c.cid = d.cid
+         |                                ) e
+         |                    ) f
+         |            UNION ALL
+         |            SELECT  "1" AS flag
+         |                    ,CAST(new_cid AS STRING) AS new_cid
+         |                    ,${sublistTableFieldName.mkString(",")}
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
+         |            FROM    (
+         |                        SELECT  a.new_cid
          |                                ,${columns.mkString(",")}
-         |                        FROM    ${ads_company_tb_list}
-         |                        WHERE   ds >= ${remainDs}
-         |                    ) b
-         |            ON      a.cid = b.cid
-         |        ) c
+         |                        FROM    mapping a
+         |                        JOIN    (
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${inc_ads_company_tb_list}
+         |                                    WHERE   ds >= ${runDs}
+         |                                    UNION ALL
+         |                                    SELECT  new_cid AS cid
+         |                                            ,${columns.mkString(",")}
+         |                                    FROM    ${ads_company_tb_list}
+         |                                    WHERE   ds >= ${remainDs}
+         |                                ) b
+         |                        ON      a.cid = b.cid
+         |                    ) c
+         |        ) e
          |WHERE   num = 1
+         |AND     cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) IS NOT NULL
+         |AND     trim(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) <> ''
          |""".stripMargin)
 
     //主表按照id去重落库
     sql(
       s"""
          |INSERT OVERWRITE TABLE  $inc_ads_company_tb PARTITION(ds='$lastDsIncOds')
-         |SELECT  cids,${columns.mkString(",")}
+         |SELECT  md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) AS rowkey,
+         |        cids,${columns.mkString(",")}
          |FROM    (
          |            SELECT  cids,${columns.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY id ORDER BY update_time DESC ) num
@@ -168,7 +179,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       inc_ads_company_tb_list,
       sublistTableName,
       lastDsIncOds,
-      Seq("new_cid","id")
+      s"CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))))"
     ).syn()
 
     //同步增量主表数据
@@ -179,7 +190,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       inc_ads_company_tb,
       mainTableName,
       lastDsIncOds,
-      Seq("id")
+      s"md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))"
     ).syn()
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)

+ 46 - 3
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -4,6 +4,8 @@ import java.util.Date
 
 import org.apache.spark.sql.SparkSession
 
+import scala.collection.mutable
+
 /**
  * @Description: MaxComputer2Phoenix
  * @author π
@@ -14,7 +16,7 @@ case class MaxComputer2Phoenix(spark: SparkSession,
                                adsTable: String, //odps表
                                htable: String, //hbase表
                                ds: String, //分区
-                               rowkey: Seq[String] //rowkey字段
+                               rowkey: String //rowkey字段
                               ) extends LoggingUtils {
   //同步max到phoenix
   def syn() = {
@@ -26,7 +28,8 @@ case class MaxComputer2Phoenix(spark: SparkSession,
 
     println("------------" + resTable + "---------")
 
-    val key = s"CONCAT_WS('_',${rowkey.mkString(",")}) AS rowkey"
+
+    val key = s"$rowkey AS rowkey"
     val res = phoenixCols.map(s => {
       if ("NEW_CID".equals(s.toUpperCase())) {
         s"cast ($s as string) as CID"
@@ -42,7 +45,7 @@ case class MaxComputer2Phoenix(spark: SparkSession,
          |from
          |${adsTable}
          |where ds = $ds
-         |""".stripMargin)
+         |""".stripMargin).repartition(100)
 
     import com.winhc.bigdata.spark.implicits.PhoenixHelper._
     df.save2PhoenixByJDBC(s"${resTable.toUpperCase}")
@@ -51,3 +54,43 @@ case class MaxComputer2Phoenix(spark: SparkSession,
   }
 
 }
+
+/**
+ * max 同步到 phoenix
+ */
+object MaxComputer2Phoenix {
+  def main(args: Array[String]): Unit = {
+
+    val map = mutable.Map[String, String](
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10",
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
+    )
+    val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
+
+    import sparkSession._
+
+    val Array(project,table) = args
+
+    val odpsTable = "winhc_eci_dev.inc_ads_company_bid_list"
+    val phoenixTable = "company_bid_list"
+    val ds = "20200621"
+    val sublistTableFieldName = sparkSession.table(odpsTable).columns.filter(s => {
+      !s.equals("ds") && !s.equals("new_cid") && !s.equals("rowkey") && !s.equals("cids") && !s.equals("new_cids") && !s.equals("cid")
+    }).seq
+
+    val colsList = sublistTableFieldName ++ Seq("new_cid")
+
+    MaxComputer2Phoenix(
+      sparkSession,
+      colsList,
+      odpsTable,
+      phoenixTable,
+      "20200621",
+      "CONCAT_WS('_',new_cid,id)"
+    ).syn()
+
+    sparkSession.stop()
+  }
+
+
+}