Переглянути джерело

公告增量,存量控制,cids去重

xufei 4 роки тому
батько
коміт
34f1a21a24

+ 7 - 0
pom.xml

@@ -301,6 +301,13 @@
             <artifactId>rest</artifactId>
             <version>5.5.3</version>
         </dependency>
+
+        <dependency>
+            <!-- JSON parser used to read search responses. fastjson releases before 1.2.83 are affected by the autoType deserialization bypass (CVE-2022-25845); 1.2.83 is the API-compatible patched release. -->
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+
     </dependencies>
 
     <build>

+ 94 - 58
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -1,11 +1,13 @@
 package com.winhc.bigdata.spark.jobs
 
+import java.util
 import java.util.Collections
 
+import com.alibaba.fastjson.JSON
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import com.winhc.bigdata.spark.utils.BaseUtil._
-import org.apache.spark.sql.{Row, SparkSession}
+import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -49,11 +51,16 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
 
   @(transient@getter) val spark: SparkSession = s
 
-  def calc() = {
+  def calc(runOld: Boolean = false) = {
     import spark.implicits._
 
     val inc_ads_company_tb_list = s"${project}.inc_ads_${tableName}_list" //增量ads_list表
-    val adsListDs = getPartion(inc_ads_company_tb_list, spark)
+
+    var adsListDs = getPartion(inc_ads_company_tb_list, spark)
+    //跑存量取第一个分区
+    if (runOld) {
+      adsListDs = getFirstPartion(inc_ads_company_tb_list, spark)
+    }
     val ads_eci_debtor_relation = s"${project}.ads_eci_debtor_relation" //债权全量表
     val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
     val ads_address = s"${project}.inc_ads_${tableName}_address" //增量地址表
@@ -71,6 +78,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |            WHERE   ds = $adsListDs
          |            AND     LENGTH(litigant_name) > 4
          |            AND     announcement_type = '起诉状副本及开庭传票'
+         |            AND     publish_date >= '${atMonthsBefore(1)}'
          |        ) b
          |WHERE   num = 1
          |""".stripMargin)
@@ -104,6 +112,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
       s"""
          |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $ads_address  partition (ds=$adsListDs)
          |select
+         |md5(concat_ws('',rowkey,1,business_type,province_code,city_code)) as id,
          |'0' as flag,
          |rowkey,1 as address_type,province_code,city_code,county_code,business_type,publish_date,create_time
          |from t1
@@ -113,20 +122,23 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     //原告
     sql(
       s"""
-         |SELECT  d.*,bg_cid
+         |SELECT  *
          |FROM    (
          |            SELECT  *
-         |            FROM    (
-         |                        SELECT  *
-         |                                ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
-         |                                ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
-         |                        FROM    $inc_ads_company_tb_list
-         |                        WHERE   ds = $adsListDs
-         |                        AND     announcement_type = '起诉状副本及开庭传票'
-         |                        AND     LENGTH(plaintiff_name) > 4
-         |                    ) x
-         |            WHERE   num = 1
-         |        ) d
+         |                    ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
+         |                    ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
+         |            FROM    $inc_ads_company_tb_list
+         |            WHERE   ds = $adsListDs
+         |            AND     announcement_type = '起诉状副本及开庭传票'
+         |            AND     LENGTH(plaintiff_name) > 4
+         |        ) x
+         |WHERE   num = 1 AND publish_date >= '${atMonthsBefore(3)}'
+         |""".stripMargin).cache().createOrReplaceTempView("announcement")
+
+    sql(
+      s"""
+         |SELECT  d.*,bg_cid
+         |FROM    announcement d
          |JOIN    (
          |            SELECT  bg_name,bg_cid
          |            FROM    $ads_eci_debtor_relation
@@ -183,19 +195,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |        ,yg_phones
          |        ,yg_emails
          |        ,rowkey_business
-         |FROM    (
-         |            SELECT  *
-         |            FROM    (
-         |                        SELECT  *
-         |                                ,ROW_NUMBER() OVER (PARTITION BY plaintiff_name ORDER BY publish_date DESC) num
-         |                                ,md5(CLEANUP(concat_ws('',plaintiff,litigant,announcement_type,publish_date,case_no,plaintiff_name))) AS rowkey_business
-         |                        FROM    $inc_ads_company_tb_list
-         |                        WHERE   ds = $adsListDs
-         |                        AND     announcement_type = '起诉状副本及开庭传票'
-         |                        AND     LENGTH(plaintiff_name) > 4
-         |                    ) x
-         |            WHERE   num = 1
-         |        ) d
+         |FROM    announcement d
          |JOIN    (
          |            SELECT  *
          |            FROM    $ads_eci_debtor_relation
@@ -209,7 +209,8 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
       s"""
          |insert into table $ads_address  partition (ds=$adsListDs)
          |SELECT
-         |        '1' as flag
+         |        md5(concat_ws('',rowkey_business,address_type,business_type,province_code,city_code)) as id
+         |        ,'1' as flag
          |        ,rowkey_business
          |        ,address_type
          |        ,province_code
@@ -220,7 +221,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |        ,create_time
          |FROM    (
          |            SELECT  *
-         |                    ,ROW_NUMBER() OVER (PARTITION BY rowkey_business,province_code,city_code,county_code,address_type ORDER BY publish_date DESC) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY rowkey_business,province_code,city_code,address_type ORDER BY publish_date DESC) num
          |            FROM    (
          |                        SELECT  rowkey_business
          |                                ,1 AS address_type
@@ -264,11 +265,10 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
         val m1: Map[String, String] = queryCompany(restClient, litigant_name)
         //动态变更内容
         val m2: Map[String, String] = CourtAnnouncement(r, Seq("plaintiff",
-          "litigant", "announcement_type", "court_name", "publish_date", "content", "litigant_name"))
+          "litigant", "announcement_type", "court_name", "publish_date", "content"))
         val dynamic_content = Json(DefaultFormats).write(m1 ++: m2)
         val publish_date = r.getAs[String]("publish_date") //动态变更时间
         val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") //创建时间
-
         val province_code = m1.getOrElse("province_code", "") //省code
         val city_code = m1.getOrElse("city_code", "") //市code
         val county_code = m1.getOrElse("county_code", "") //区code
@@ -287,7 +287,6 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
   }
 
   def trans2(r: Row) = {
-    //    try {
     import org.json4s.DefaultFormats
     val rowkey_business = r.getAs[String]("rowkey_business") //案源机会主键
     val title = "" //标题
@@ -301,20 +300,12 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     val business_type_name = "0" //动态类型name
     //动态变更内容
     val m2: Map[String, String] = CourtAnnouncement(r, Seq("plaintiff",
-      "litigant", "announcement_type", "court_name", "publish_date", "content", "litigant_name"))
+      "litigant", "announcement_type", "court_name", "publish_date", "content"))
     val dynamic_content = Json(DefaultFormats).write(m2)
     val publish_date = r.getAs[String]("publish_date") //动态变更时间
     val create_time = atMonthsBefore(0, "yyyy-MM-dd HH:mm:ss") //创建时间
-
     (rowkey_business, title, plaintiff, litigant, plaintiff_name, plaintiff_cid, label, business_id, business_type,
       business_type_name, dynamic_content, publish_date, create_time)
-    //    } catch {
-    //      case e: Exception => {
-    //        logWarning(r.toString())
-    //        logError(e.getMessage, e)
-    //        null
-    //      }
-    //    }
   }
 
   def regfun() = {
@@ -322,7 +313,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     company_split()
   }
 
-  def preCalc() = {
+  def preCalc(): Boolean = {
 
     val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
     val inc_ads_company_tb = s"${project}.inc_ads_$tableName" //增量ads表
@@ -365,6 +356,12 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
+    //增量ods和增量ads最后一个分区相等,跳出
+    if (lastDsIncOds.equals(lastDsIncAds)) {
+      println("inc_ods equals inc_ads ds ,please delete last ds !!!")
+      return false
+    }
+
     //增量去重解析case_no
     sql(
       s"""
@@ -450,6 +447,8 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |WHERE   trim(b.litigant_name) <> '' and trim(c.plaintiff_name) <> ''
          |)c
          |""".stripMargin)
+
+    true
   }
 
   def queryCompany(restClient: RestClient, companyName: String) = {
@@ -463,7 +462,14 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
          |    "term": {
          |      "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
          |    }
-         |  }
+         |  },
+         |  "sort": [
+         |    {
+         |      "company_type": {
+         |        "order": "asc"
+         |      }
+         |    }
+         |  ]
          |}
          |""".stripMargin
     val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
@@ -473,19 +479,20 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
       "/winhc-company/company/_search",
       Collections.emptyMap[String, String](),
       entity)
-
     val en = indexResponse.getEntity
     val res = EntityUtils.toString(en)
-    val list = getIndexResult(res)
+    import scala.collection.JavaConverters._
+    val list = getIndexResult2(res)
     if (list.nonEmpty) {
       val id = list.head("_id").asInstanceOf[String]
-      val source = list.head("_source").asInstanceOf[Map[String, Any]]
-      val province_code = source("province_code").asInstanceOf[String]
-      val city_code = source("city_code").asInstanceOf[String]
-      val county_code = source("county_code").asInstanceOf[String]
-      val reg_capital = source("reg_capital").asInstanceOf[String]
-      val estiblish_time = source("estiblish_time").asInstanceOf[String]
-      val phones = source("phones").asInstanceOf[List[String]].mkString(",")
+      val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+      val province_code = source.get("province_code").asInstanceOf[String]
+      val city_code = source.get("city_code").asInstanceOf[String]
+      val county_code = source.get("county_code").asInstanceOf[String]
+      val reg_capital = source.get("reg_capital").asInstanceOf[String]
+      val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+      val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+
       Map(
         "id" -> id,
         "province_code" -> province_code,
@@ -500,11 +507,38 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     }
   }
 
+  def getIndexResult2(json: String) = { // Parses the JSON string with fastjson and returns the elements of its hits.hits array as a List of Scala map views.
+    import scala.collection.JavaConverters._ // supplies .asScala, used below to wrap each java.util.Map
+    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList // NOTE(review): no null checks on the "hits" lookups — presumably fails with a NullPointerException when the response lacks them (e.g. an error payload); confirm
+  }
 
 }
 
 object CompanyCourtAnnouncement {
   def main(args: Array[String]): Unit = {
+    var project = ""
+    var table = ""
+    var runOld = false
+
+    if (args.length == 2) {
+      val Array(project1, table1) = args
+      project = project1
+      table = table1
+    } else if (args.length == 3) {
+      val Array(project1, table1, remain) = args
+      project = project1
+      table = table1
+      if (remain.equals("1"))
+        runOld = true
+    } else {
+      println("please set project,table...")
+      sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |project: $project| table: $table| runOld: $runOld
+         |""".stripMargin)
 
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
@@ -512,13 +546,15 @@ object CompanyCourtAnnouncement {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
-    val project = "winhc_eci_dev"
-    val table = "company_court_announcement"
-
     val announcement = CompanyCourtAnnouncement(spark, project, table)
     announcement.regfun()
-    announcement.preCalc()
-    announcement.calc()
+    //是否跑全量数据
+    if (!runOld) {
+      val flag = announcement.preCalc()
+      //增量没更新返回
+      if(!flag) return
+    }
+    announcement.calc(runOld)
     spark.stop()
   }
 

+ 40 - 35
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -106,52 +106,57 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     //替换cid,去重,复制老数据
     val df1 = sql(
       s"""
-         INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
-         |SELECT  CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |INSERT OVERWRITE TABLE $inc_ads_company_tb_list PARTITION(ds='$lastDsIncOds')
+         |SELECT  rowkey
          |        ,flag
          |        ,new_cid
          |        ,${sublistTableFieldName.mkString(",")}
          |FROM    (
-         |            SELECT  "0" AS flag
-         |                    ,CAST(new_cid AS STRING) AS new_cid
+         |            SELECT  CONCAT_WS( '_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                    ,flag
+         |                    ,new_cid
          |                    ,${sublistTableFieldName.mkString(",")}
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
-         |                        SELECT  *
+         |                        SELECT  "0" AS flag
+         |                                ,CAST(new_cid AS STRING) AS new_cid
+         |                                ,${sublistTableFieldName.mkString(",")}
          |                        FROM    (
-         |                                    SELECT  c.*
-         |                                            ,coalesce(d.new_cid,c.cid) AS new_cid
-         |                                    FROM    incr_tb c
-         |                                    LEFT JOIN mapping d
-         |                                    ON      c.cid = d.cid
-         |                                ) e
-         |                    ) f
-         |            UNION ALL
-         |            SELECT  "1" AS flag
-         |                    ,CAST(new_cid AS STRING) AS new_cid
-         |                    ,${sublistTableFieldName.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
-         |            FROM    (
-         |                        SELECT  a.new_cid
+         |                                    SELECT  *
+         |                                    FROM    (
+         |                                                SELECT  c.*
+         |                                                        ,coalesce(d.new_cid,c.cid) AS new_cid
+         |                                                FROM    incr_tb c
+         |                                                LEFT JOIN mapping d
+         |                                                ON      c.cid = d.cid
+         |                                            ) e
+         |                                ) f
+         |                        UNION ALL
+         |                        SELECT  "1" AS flag
+         |                                ,CAST(new_cid AS STRING) AS new_cid
          |                                ,${sublistTableFieldName.mkString(",")}
-         |                        FROM    mapping a
-         |                        JOIN    (
-         |                                    SELECT  new_cid AS cid
-         |                                            ,${sublistTableFieldName.mkString(",")}
-         |                                    FROM    ${inc_ads_company_tb_list}
-         |                                    WHERE   ds >= ${runDs}
-         |                                    UNION ALL
-         |                                    SELECT  new_cid AS cid
+         |                        FROM    (
+         |                                    SELECT  a.new_cid
          |                                            ,${sublistTableFieldName.mkString(",")}
-         |                                    FROM    ${ads_company_tb_list}
-         |                                    WHERE   ds >= ${remainDs}
-         |                                ) b
-         |                        ON      a.cid = b.cid
-         |                    ) c
-         |        ) e
+         |                                    FROM    mapping a
+         |                                    JOIN    (
+         |                                                SELECT  new_cid AS cid
+         |                                                        ,${sublistTableFieldName.mkString(",")}
+         |                                                FROM    ${inc_ads_company_tb_list}
+         |                                                WHERE   ds >= ${runDs}
+         |                                                UNION ALL
+         |                                                SELECT  new_cid AS cid
+         |                                                        ,${sublistTableFieldName.mkString(",")}
+         |                                                FROM    ${ads_company_tb_list}
+         |                                                WHERE   ds >= ${remainDs}
+         |                                            ) b
+         |                                    ON      a.cid = b.cid
+         |                                ) c
+         |                    ) e
+         |            WHERE   cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) IS NOT NULL
+         |            AND     trim(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) <> ''
+         |        ) x
          |WHERE   num = 1
-         |AND     cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) IS NOT NULL
-         |AND     trim(cleanup(CONCAT_WS('',${cols_md5.mkString(",")}))) <> ''
          |""".stripMargin)
 
     //主表按照id去重落库

+ 9 - 2
src/main/scala/com/winhc/bigdata/spark/utils/EsRestUtils.scala

@@ -54,7 +54,14 @@ object EsRestUtils {
          |    "term": {
          |      "cname.value.keyword": "${BaseUtil.cleanup(companyName)}"
          |    }
-         |  }
+         |  },
+         |  "sort": [
+         |    {
+         |      "company_type": {
+         |        "order": "asc"
+         |      }
+         |    }
+         |  ]
          |}
          |""".stripMargin
     val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
@@ -92,7 +99,7 @@ object EsRestUtils {
 
   def main(args: Array[String]): Unit = {
     val restClient = getRestClient()
-    val id = getCidByCompanyName(restClient, "长春市大轮食品商贸有限公司")
+    val id = getCidByCompanyName(restClient, "华为技术有限公司")
     println(id)
     restClient.close()
   }