瀏覽代碼

增量计算补充

xufei 4 年之前
父節點
當前提交
83ec3fd3a4

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -155,6 +155,17 @@ case class company_copyright_reg_list(equCols: Seq[String]) extends CompanyChang
   override def getBizTime(newMap: Map[String, String]): String = newMap("reg_time")
 }
 
+/**
+ * Change handler for the `company_icp` (website) table.
+ *
+ * Both titles are built from the `domain` field and passed through `getValueOrNull`
+ * (presumably yielding null when `domain` is empty - confirm against the helper).
+ * The label is delegated to `ChangeExtractUtils.get_ip_tags`, which receives `domain`,
+ * `examine_date` and `liscense`; the business time is the `examine_date` field.
+ *
+ * NOTE(review): the map key `liscense` is spelled this way in the code; verify it matches
+ * the actual source column name before changing it.
+ */
+case class company_icp(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"${newMap("domain")}网站发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站", newMap("domain"), newMap("examine_date"), newMap("liscense"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("examine_date")
+}
+
 
 //Land purchase information
 case class company_land_announcement(equCols: Seq[String]) extends CompanyChangeHandle {

+ 125 - 53
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -26,21 +26,33 @@ object IntellectualMessage {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
-    //    IntellectualMessage(spark, project, "").calc()
-    IntellectualMessage(spark, project, "").transForm("20200727", "tmp_xf_ads_intellectual_message")
+        IntellectualMessage(spark, project, "").calc()
+//    IntellectualMessage(spark, project, "").transForm()
     spark.stop()
   }
 
 }
 
 
-case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String
-                              ) extends LoggingUtils {
+case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String,
+                               runOld: Boolean = false) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
 
+  val t1 = s"company_tm" // trademarks
+  val t2 = s"company_patent_list" // patents
+  val t3 = s"company_icp" // websites
+  val t4 = s"company_copyright_reg_list" // copyrights
+  val t5 = s"company_certificate" // certificates
+  val t6 = s"company_copyright_works_list" // software copyrights (NOTE(review): labels for t4/t6 are translated as originally written - confirm they are not swapped)
+  val res_tb = s"$project.tmp_xf_ads_intellectual_message" // aggregated result table
+  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" // transformed output result table
+
+  val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark) // partition value looked up on the incremental trademark table; used as the working `ds` in the SQL below (presumably the latest partition - confirm getPartion)
+  val remainDs = BaseUtil.getPartion(s"$project.ads_$t1", spark) // partition value looked up on the full trademark table; used as the baseline `ds` by the incremental query
+
   /**
    * Wraps a key/value pair into a single-entry map.
    *
    * Registered as the Spark SQL UDF `col2Map`; in the visible SQL it is called with the
    * `type` column as key and a comma-joined `inc_cnt,del_cnt,total_cnt` string as value.
    *
    * @param pkg key of the resulting map
    * @param day value stored under `pkg`
    * @return a map containing exactly `pkg -> day`
    */
   def col2Map(pkg: String, day: String): Map[String, String] = {
     Map(pkg -> day)
   }
@@ -49,17 +61,6 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
     spark.udf.register("col2Map", col2Map _)
     spark.udf.register("MapAggs", new MapAggs())
 
-    val t1 = s"company_tm" //商标
-    val t2 = s"company_patent_list" //专利
-    val t3 = s"company_icp" //网站
-    val t4 = s"company_copyright_reg_list" //著作权
-    val t5 = s"company_certificate" //证书
-    val t6 = s"company_copyright_works_list" //软著作权
-
-    val res_tb = s"$project.tmp_xf_ads_intellectual_message" //聚合结果表
-
-    val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
-
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $res_tb PARTITION(ds=$ds)
@@ -78,7 +79,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |union all
          |select *,
          |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
-         |from ${assemblySQL(s"$project", s"$t2", "pub_date", "2")}
+         |from ${assemblySQL(s"$project", s"$t2", "app_date", "2")}
          |union all
          |select *,
          |col2Map(type,CONCAT_WS(',',inc_cnt,del_cnt,total_cnt)) m1
@@ -100,15 +101,16 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |)h
          |""".stripMargin)
 
-    transForm(ds, res_tb)
+    // Convert the aggregated result into the output format
+    transForm()
 
   }
 
-  def transForm(ds: String, tb: String): Unit = {
+  def transForm(): Unit = {
     sql(
       s"""
          |select *
-         |from ${project}.${tb}
+         |from ${res_tb}
          |where ds = $ds
          |""".stripMargin).map(r => {
       val cid = r.getAs[String]("cid")
@@ -129,7 +131,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${tb}_res PARTITION (ds=$ds)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${res_tb_res} PARTITION (ds=$ds)
          |select * from res
          |""".stripMargin)
   }
@@ -155,38 +157,108 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
     }
   }
 
-  //存量历史计算
+  /**
+   * Builds a parenthesised SQL sub-query (returned as a string) that yields per-company
+   * counts for one table; callers splice the result into a `from` clause.
+   *
+   * When `runOld` is true (full / historical run), rows from every partition (`ds > 0`) of
+   * `inc_ads_<table>` and `ads_<table>` are unioned and de-duplicated with GROUP BY, then:
+   * inc_cnt = non-deleted rows per (new_cid, date), del_cnt = constant 0,
+   * total_cnt = non-deleted rows per new_cid.
+   *
+   * Otherwise (incremental run), the `insert` rows of `ads_change_extract` for partition `ds`
+   * and `tn = table` are used twice: once joined on rowkey to the most recent row (ordered by
+   * update_time) from `ads_<table>` at `remainDs` plus the `inc_ads_<table>` partitions strictly
+   * between `remainDs` and `ds` (tagged f = "1"), and once as-is (tagged f = "0"). The window
+   * counts run over both sets, and only the rows tagged f = "0" are returned.
+   *
+   * @param project project prefix used to qualify the table names
+   * @param table   table name suffix; also matched against `tn` in `ads_change_extract`
+   * @param date    name of the date column read from the source tables
+   * @param tp      value emitted as the `type` column
+   * @return the margin-stripped SQL fragment
+   */
   def assemblySQL(project: String, table: String, date: String, tp: String) = {
-    s"""
-       |(
-       |SELECT  new_cid
-       |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
-       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
-       |        ,sum(CASE WHEN deleted = 1 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) del_cnt
-       |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
-       |        ,$tp type
-       |FROM    (
-       |         select * from (
-       |            SELECT  rowkey
-       |                    ,new_cid
-       |                    ,$date
-       |                    ,deleted
-       |            FROM    $project.inc_ads_$table x
-       |            WHERE   x.ds > 0
-       |            union all
-       |            SELECT  rowkey
-       |                    ,new_cid
-       |                    ,$date
-       |                    ,deleted
-       |            FROM    $project.ads_$table y
-       |            WHERE   y.ds > 0
-       |         )z
-       |            GROUP BY rowkey
-       |                     ,new_cid
-       |                     ,$date
-       |                     ,deleted
-       |        ) b
-       |)
-       |""".stripMargin
+    if (runOld) { // full (historical) data
+      s"""
+         |(
+         |SELECT  new_cid
+         |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
+         |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
+         |        ,0 del_cnt
+         |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
+         |        ,$tp type
+         |FROM    (
+         |         select * from (
+         |            SELECT  rowkey
+         |                    ,new_cid
+         |                    ,$date
+         |                    ,deleted
+         |            FROM    $project.inc_ads_$table x
+         |            WHERE   x.ds > 0
+         |            union all
+         |            SELECT  rowkey
+         |                    ,new_cid
+         |                    ,$date
+         |                    ,deleted
+         |            FROM    $project.ads_$table y
+         |            WHERE   y.ds > 0
+         |         )z
+         |            GROUP BY rowkey
+         |                     ,new_cid
+         |                     ,$date
+         |                     ,deleted
+         |        ) b
+         |)
+         |""".stripMargin
+    } else { // incremental data
+      s"""
+         |(
+         |select * from
+         |(
+         |SELECT  new_cid
+         |        ,SUBSTR(CAST(date AS STRING),1,10) AS DATE
+         |        ,count(rowkey) OVER(PARTITION BY new_cid,date) inc_cnt
+         |        ,0 del_cnt
+         |        ,count(rowkey) OVER(PARTITION BY new_cid) total_cnt
+         |        ,$tp TYPE
+         |        ,f
+         |FROM    (
+         |            SELECT  b.*,"1" f
+         |            FROM    (
+         |                        SELECT  rowkey
+         |                                ,cid as new_cid
+         |                                ,biz_date as date
+         |                        FROM    $project.ads_change_extract
+         |                        WHERE   ds = $ds
+         |                        AND     tn = '$table'
+         |                        AND     type = 'insert'
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  rowkey
+         |                                ,new_cid
+         |                                ,$date as date
+         |                        FROM    (
+         |                                    SELECT  *
+         |                                            ,row_number() OVER (PARTITION BY rowkey ORDER BY update_time DESC) num
+         |                                    FROM    (
+         |                                                SELECT  rowkey
+         |                                                        ,new_cid
+         |                                                        ,$date
+         |                                                        ,deleted
+         |                                                        ,update_time
+         |                                                FROM    $project.ads_$table
+         |                                                WHERE   ds = $remainDs
+         |                                                UNION ALL
+         |                                                SELECT  rowkey
+         |                                                        ,new_cid
+         |                                                        ,$date
+         |                                                        ,deleted
+         |                                                        ,update_time
+         |                                                FROM    $project.inc_ads_$table
+         |                                                WHERE   ds > $remainDs
+         |                                                AND     ds < $ds
+         |                                            ) c
+         |                                ) d
+         |                        WHERE   num = 1
+         |                    ) b
+         |            ON      a.rowkey = b.rowkey
+         |            UNION ALL
+         |            SELECT  rowkey
+         |                    ,cid new_cid
+         |                    ,biz_date date
+         |                    ,"0" f
+         |            FROM    $project.ads_change_extract
+         |            WHERE   ds = $ds
+         |            AND     tn = '$table'
+         |            AND     type = 'insert'
+         |        ) e
+         |)
+         |where f = "0"
+         |)
+         |""".stripMargin
+    }
+
   }
-}
+
+}