Browse Source

Merge remote-tracking branch 'origin/master'

lyb 4 years ago
parent
commit
a64b82be91
19 changed files with 1645 additions and 230 deletions
  1. + 1 - 1      src/main/resources/env.yaml
  2. + 18 - 8     src/main/scala/com/winhc/bigdata/spark/const/CaseChanceConst.scala
  3. + 74 - 18    src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala
  4. + 86 - 62    src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala
  5. + 89 - 5     src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala
  6. + 14 - 7     src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala
  7. + 49 - 17    src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  8. + 23 - 3     src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala
  9. + 273 - 12   src/main/scala/com/winhc/bigdata/spark/jobs/chance/Inc_eci_debtor_relation.scala
  10. + 330 - 0   src/main/scala/com/winhc/bigdata/spark/jobs/chance/creditor_info_add_other.scala
  11. + 3 - 3     src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala
  12. + 74 - 0    src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala
  13. + 163 - 0   src/main/scala/com/winhc/bigdata/spark/jobs/increment/script/ads_company_land_transfer_ods.sql
  14. + 192 - 81  src/main/scala/com/winhc/bigdata/spark/model/CompanyBidScore.scala
  15. + 125 - 0   src/main/scala/com/winhc/bigdata/spark/model/CompanyEmploymentScore.scala
  16. + 49 - 3    src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala
  17. + 49 - 4    src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala
  18. + 31 - 5    src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
  19. + 2 - 1     src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

+ 1 - 1
src/main/resources/env.yaml

@@ -1,5 +1,5 @@
 profile:
-  activate: prod
+  activate: dev
 
 ---
 env:
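
The only functional change here flips the active profile from prod to dev, so every job that reads env.yaml now resolves the dev environment block. A minimal sketch of how a multi-document YAML like this can be resolved (the SnakeYAML usage is an assumption for illustration, not code from this repo):

    import java.io.InputStreamReader
    import org.yaml.snakeyaml.Yaml
    import scala.collection.JavaConverters._

    object EnvProfile {
      // Reads the first YAML document ("profile: activate: ...") from the classpath.
      def activeProfile(): String = {
        val reader = new InputStreamReader(getClass.getResourceAsStream("/env.yaml"))
        val firstDoc = new Yaml().loadAll(reader).asScala.head
          .asInstanceOf[java.util.Map[String, Object]]
        firstDoc.get("profile").asInstanceOf[java.util.Map[String, Object]]
          .get("activate").toString // "dev" after this commit
      }
    }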

+ 18 - 8
src/main/scala/com/winhc/bigdata/spark/const/CaseChanceConst.scala

@@ -10,11 +10,16 @@ object CaseChanceConst {
   val TABLE_2_TYPE = Map(
     "" -> "1"
     , "" -> "2"
+    , "company_bid_list" -> "3"
+    , "company_land_transfer" -> "3"
     , "company_tm" -> "3"
     , "company_patent_list" -> "3"
     , "company_certificate" -> "3"
     , "company_copyright_works_list" -> "3"
     , "company_copyright_reg_list" -> "3"
+    , "company_employment" -> "3"
+    , "company_land_announcement" -> "3"
+    , "company_land_publicity" -> "3"
     , "" -> "4"
   )
 
@@ -23,16 +28,21 @@ object CaseChanceConst {
   val CHANCE_DYNAMIC_TYPE = Map(
    /*"" -> "3-1" //company capital increase
    , "" -> "3-2" //company's new outbound investment
-    , "" -> "3-3" //new bid/tender
+    , "company_bid_list" -> "3-3" //new bid/tender
    , "" -> "3-4" //new job posting
    , "" -> "3-5" //land parcel publicity
    , "" -> "3-6" //land purchase info
-    , "" -> "3-7" //land transfer
-
-    ,*/ "company_tm" -> "3-8" //IP - trademark
-    , "company_patent_list" -> "3-9" //patent
-    , "company_certificate" -> "3-10" //qualification certificate   X
-    , "company_copyright_works_list" -> "3-11" //copyright in works
-    , "company_copyright_reg_list" -> "3-12" //software copyright
+    , "company_land_transfer" -> "3-7" //land transfer
+    ,*/
+    "company_bid_list" -> "3" //new bid/tender
+    , "company_employment" -> "4" //new job posting
+    , "company_land_publicity" -> "5" //land parcel publicity
+    , "company_land_announcement" -> "6" //land purchase info
+    , "company_land_transfer" -> "7" //land transfer
+    , "company_tm" -> "8" //IP - trademark
+    , "company_patent_list" -> "9" //patent
+    , "company_certificate" -> "10" //qualification certificate   X
+    , "company_copyright_works_list" -> "11" //copyright in works
+    , "company_copyright_reg_list" -> "12" //software copyright
   )
 }
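
Both maps are keyed by ODPS table name: the commit registers the bid, employment, and land tables and replaces the old "3-x" dynamic codes with flat numeric codes (the previous numbering survives only inside the commented-out block). A hypothetical lookup, just to show how the constants are consumed:

    import com.winhc.bigdata.spark.const.CaseChanceConst

    val chanceType = CaseChanceConst.TABLE_2_TYPE.getOrElse("company_bid_list", "0")         // "3"
    val dynamicType = CaseChanceConst.CHANCE_DYNAMIC_TYPE.getOrElse("company_bid_list", "")  // "3"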

+ 74 - 18
src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

@@ -1,6 +1,10 @@
 package com.winhc.bigdata.spark.implicits
 
+import java.time.LocalDate
+import java.time.format.DateTimeFormatter
+
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.DataFrame
 import org.apache.spark.sql.functions.col
 import org.elasticsearch.spark._
@@ -33,17 +37,29 @@ object CompanyIndexSave2EsHelper {
   )
 
   implicit class DataFrameEnhancer(df: DataFrame) {
-    def companyIndexSave2Es(): Unit = {
-      import df.sparkSession.implicits._
+    def companyIndexSave2Es(category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
       df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
         .rdd.map(r => {
         val map = companyIndexFields.map(f => {
           (f, r.getAs[String](f))
         }).toMap
-        getEsDoc(map)
+        getEsDoc(map, category_code, area_code)
       })
-        .saveToEsWithMeta("winhc-company-v5/company")
+        .saveToEsWithMeta("winhc-company-v6/company")
     }
+
+   /* def test2Es(spark:SparkSession,category_code: Broadcast[Map[String, Seq[String]]], area_code: Broadcast[Map[String, Seq[String]]]): Unit = {
+      val rdd = df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
+        .rdd.map(r => {
+        val map = companyIndexFields.map(f => {
+          (f, r.getAs[String](f))
+        }).toMap
+        getEsDoc(map, category_code, area_code)
+      }).map(r=>Row(r._1,r._2.estiblish_time))
+      spark.createDataFrame(rdd,StructType(Array(StructField("cid",StringType),StructField("time",StringType))))
+       .write.mode("overwrite")
+       .insertInto("winhc_eci_dev.xjk_test_save_2_es_0721")
+    }*/
   }
 
   case class Geo(lat: String, lon: String)
@@ -59,11 +75,17 @@ object CompanyIndexSave2EsHelper {
                          , reg_status: String
                          //                         , geo: Geo
                          , province_code: String
+                         , province_name: String
                          , city_code: String
+                         , city_name: String
                          , county_code: String
+                         , county_name: String
                          , reg_location: String
                          , estiblish_time: String
                          , category_code: String
+                         , category_first: String
+                         , category_second: String
+                         , category_third: String
                          , reg_capital: String
                          , reg_capital_amount: String
                          , phones: Seq[String]
@@ -71,8 +93,9 @@ object CompanyIndexSave2EsHelper {
                        )
 
   val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+  //  val time_pattern = ""
 
-  def getEsDoc(map: Map[String, String]): (String, CompanyDoc) = {
+  def getEsDoc(map: Map[String, String], category_map: Broadcast[Map[String, Seq[String]]], area_map: Broadcast[Map[String, Seq[String]]]): (String, CompanyDoc) = {
     val lat = map("lat")
     val lng = map("lng")
     var geo: String = null
@@ -80,19 +103,32 @@ object CompanyIndexSave2EsHelper {
       geo = lat + "," + lng
     }
 
-    val c = get_area_code(map("area_code"))
+    val c = get_area_code(map("area_code"), area_map)
     val province_code = c._1
-    val city_code = c._2
-    val county_code = c._3
+    val province_name = c._2
+    val city_code = c._3
+    val city_name = c._4
+    val county_code = c._5
+    val county_name = c._6
+
+    val category = get_category_code(map("category_code"), category_map)
+    val category_code = category._1
+    val category_first = category._2
+    val category_second = category._3
+    val category_third = category._4
+
     val et = map("estiblish_time")
-    var time = if (StringUtils.isNotBlank(et)){
+    var time = if (StringUtils.isNotBlank(et)) {
       if (et.contains(" ")) {
         et.split(" ")(0)
-      }else{
+      } else {
         et
       }
-    }  else null
+    } else null
 
+    if(!validateDf(time)){
+      time = null
+    }
 
     val doc = CompanyDoc(
       cname = getCompanyName(map("name"))
@@ -103,11 +139,17 @@ object CompanyIndexSave2EsHelper {
       , reg_status = map("reg_status")
       //      , geo = Geo(lat = lng, lon = lat)
       , province_code = province_code
+      , province_name = province_name
       , city_code = city_code
+      , city_name = city_name
       , county_code = county_code
+      , county_name = county_name
       , reg_location = map("reg_location")
       , estiblish_time = time
-      , category_code = map("category_code")
+      , category_code = category_code
+      , category_first = category_first
+      , category_second = category_second
+      , category_third = category_third
       , reg_capital = map("reg_capital")
       , reg_capital_amount = map("reg_capital_amount")
       , phones = getSplit(map("phones"))
@@ -141,15 +183,29 @@ object CompanyIndexSave2EsHelper {
     }
   }
 
-  private def get_area_code(code: String): (String, String, String) = {
+  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]): (String, String, String, String, String, String) = {
     if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
       val c = code.trim
-      (c.substring(0, 2), c.substring(2, 4), c.substring(4, 6))
+      (c.substring(0, 2), get_seq_by_index(area_code, c, 0), c.substring(2, 4), get_seq_by_index(area_code, c, 1), c.substring(4, 6), get_seq_by_index(area_code, c, 2))
+    } else {
+      (null, null, null, null, null, null)
+    }
+  }
+
+  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]): (String, String, String, String) = {
+    if (StringUtils.isNotBlank(code)) {
+      val c = code.trim
+      (c, get_seq_by_index(category_code, c, 0), get_seq_by_index(category_code, c, 1), get_seq_by_index(category_code, c, 2))
     } else {
-      (null, null, null)
+      (null, null, null, null)
     }
   }
 
+  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
+    val c = area_code.value.getOrElse(code, null)
+    if (c == null) null else if (c(index) != null) c(index).trim else null
+  }
+
   private def getSplit(str: String): Seq[String] = {
     if (StringUtils.isNotBlank(str)) {
       str.split("\t;\t").filter(StringUtils.isNotBlank).toSet.toList
@@ -158,11 +214,11 @@ object CompanyIndexSave2EsHelper {
     }
   }
 
-  private val DATE_TIME_FORMAT = "yyyy-MM-dd"
+  private val df = DateTimeFormatter.ofPattern("yyyy-MM-dd")
 
   private def validateDf(str: String): Boolean = try {
     if (StringUtils.isNotBlank(str)) {
-      java.time.LocalDateTime.parse(str, java.time.format.DateTimeFormatter.ofPattern(DATE_TIME_FORMAT))
+      LocalDate.parse(str, df)
       true
     } else {
       false
@@ -174,6 +230,6 @@ object CompanyIndexSave2EsHelper {
   }
 
   def main(args: Array[String]): Unit = {
-    println(validateDf("2010-03-03 00:00:00"))
+    println(validateDf("2010-03-03"))
   }
 }
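
One subtle fix in this file: validateDf used to parse with java.time.LocalDateTime against the date-only pattern, which throws even for well-formed input because the parsed text carries no time fields; LocalDate is the correct type for yyyy-MM-dd, which is also why the test string in main() loses its 00:00:00 suffix. A self-contained check of the difference:

    import java.time.{LocalDate, LocalDateTime}
    import java.time.format.DateTimeFormatter

    object DateParseCheck extends App {
      val fmt = DateTimeFormatter.ofPattern("yyyy-MM-dd")
      println(LocalDate.parse("2010-03-03", fmt)) // fine: 2010-03-03
      // LocalDateTime.parse("2010-03-03", fmt)   // throws DateTimeParseException: no time fields to resolve
    }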

+ 86 - 62
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -5,22 +5,22 @@ import java.util.Collections
 
 import com.alibaba.fastjson.JSON
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping, JsonSerializable}
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import com.winhc.bigdata.spark.utils.BaseUtil._
-import org.apache.spark.sql.{DataFrame, Row, SparkSession}
-
-import scala.annotation.meta.getter
-import scala.collection.mutable
-import com.winhc.bigdata.spark.utils.EsRestUtils.{getIndexResult, getRestClient}
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.http.entity.ContentType
 import org.apache.http.nio.entity.NStringEntity
 import org.apache.http.util.EntityUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.{Row, SparkSession}
 import org.elasticsearch.client.RestClient
 import org.json4s.jackson.Json
 
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
 /**
 * @Description: court announcements
  * @author π
@@ -137,14 +137,14 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
 
     sql(
       s"""
-         |SELECT  d.*,bg_cid
+         |SELECT  d.*,bg_cid,bg_city_name
          |FROM    announcement d
          |JOIN    (
-         |            SELECT  bg_name,bg_cid
+         |            SELECT  bg_name,bg_cid,bg_city_name
          |            FROM    $ads_eci_debtor_relation
          |            WHERE   ds = $debtorRelationDs
          |            AND     deleted = 0
-         |            group by bg_name,bg_cid
+         |            group by bg_name,bg_cid,bg_city_name
          |        ) e
          |ON      cleanup(d.plaintiff_name) = cleanup(e.bg_name)
          |""".stripMargin).map(r => {
@@ -258,11 +258,14 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
        val plaintiff = r.getAs[String]("plaintiff") //plaintiff
        val litigant = r.getAs[String]("litigant") //litigants
        val litigant_name = r.getAs[String]("litigant_name") //defendant company
-        val label: String = Json(DefaultFormats).write(CourtAnnouncement(r, Seq("announcement_type", "publish_date"))) //label list
        val business_id = r.getAs[String]("rowkey") //business primary key id
        val business_type = "8" //dynamic type
        val business_type_name = "0" //dynamic type name
-        val m1: Map[String, String] = queryCompany(restClient, litigant_name)
+        val m1: Map[String, String] = EsQuery.queryCompany(restClient, litigant_name)
+        //label list
+        val label: String = Json(DefaultFormats).write(
+          CourtAnnouncement(r, Seq("announcement_type", "publish_date")) ++ Map("city_name" -> m1("city_name"))
+        )
        //dynamic change content
        val m2: Map[String, String] = CourtAnnouncement(r, Seq("plaintiff",
          "litigant", "announcement_type", "court_name", "publish_date", "content"))
@@ -294,7 +297,10 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
    val litigant = r.getAs[String]("litigant") //litigants
    val plaintiff_name = r.getAs[String]("plaintiff_name") //plaintiff company
    val plaintiff_cid = r.getAs[String]("bg_cid") //plaintiff company cid
-    val label: String = Json(DefaultFormats).write(CourtAnnouncement(r, Seq("announcement_type", "publish_date"))) //label list
+    val city_name = r.getAs[String]("bg_city_name") //plaintiff company city
+    val label: String = Json(DefaultFormats).write(
+      CourtAnnouncement(r, Seq("announcement_type", "publish_date")) ++ Map("city_name" -> city_name)
+    ) //label list
    val business_id = r.getAs[String]("rowkey") //business primary key id
    val business_type = "7" //dynamic type
    val business_type_name = "0" //dynamic type name
@@ -451,12 +457,64 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     true
   }
 
+
+
+}
+
+object CompanyCourtAnnouncement {
+  def main(args: Array[String]): Unit = {
+    var project = ""
+    var table = ""
+    var runOld = false
+
+    if (args.length == 2) {
+      val Array(project1, table1) = args
+      project = project1
+      table = table1
+    } else if (args.length == 3) {
+      val Array(project1, table1, remain) = args
+      project = project1
+      table = table1
+      if (remain.equals("1"))
+        runOld = true
+    } else {
+      println("please set project,table...")
+      sys.exit(-1)
+    }
+
+    println(
+      s"""
+         |project: $project| table: $table| runOld: $runOld
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    val announcement = CompanyCourtAnnouncement(spark, project, table)
+    announcement.regfun()
+    //whether to run the full (historical) data
+    if (!runOld) {
+      val flag = announcement.preCalc()
+      //return if the increment has no updates
+      if(!flag) return
+    }
+    announcement.calc(runOld)
+    spark.stop()
+  }
+
+}
+
+object EsQuery{
   def queryCompany(restClient: RestClient, companyName: String) = {
     val query =
       s"""
          |{
-         |  "_source": {
-         |     "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones"]
+         |   "_source": {
+         |     "includes": [ "_id","province_code", "city_code","county_code","reg_capital","estiblish_time","phones",
+         |     "province_name","city_name","county_name","category_first","category_second","category_third"]
          |   },
          |  "query": {
          |    "term": {
@@ -493,6 +551,13 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
       val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
       val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
 
+      val province_name = source.get("province_name").asInstanceOf[String]
+      val city_name = source.get("city_name").asInstanceOf[String]
+      val county_name = source.get("county_name").asInstanceOf[String]
+      val category_first = source.get("category_first").asInstanceOf[String]
+      val category_second = source.get("category_second").asInstanceOf[String]
+      val category_third = source.get("category_third").asInstanceOf[String]
+
       Map(
         "id" -> id,
         "province_code" -> province_code,
@@ -500,7 +565,13 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
         "county_code" -> county_code,
         "reg_capital" -> reg_capital,
         "estiblish_time" -> estiblish_time,
-        "phones" -> phones
+        "phones" -> phones,
+        "province_name" -> province_name,
+        "city_name" -> city_name,
+        "county_name" -> county_name,
+        "category_first" -> category_first,
+        "category_second" -> category_second,
+        "category_third" -> category_third
       )
     } else {
       Map.empty[String, String]
@@ -511,53 +582,6 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     import scala.collection.JavaConverters._
     JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
   }
-
-}
-
-object CompanyCourtAnnouncement {
-  def main(args: Array[String]): Unit = {
-    var project = ""
-    var table = ""
-    var runOld = false
-
-    if (args.length == 2) {
-      val Array(project1, table1) = args
-      project = project1
-      table = table1
-    } else if (args.length == 3) {
-      val Array(project1, table1, remain) = args
-      project = project1
-      table = table1
-      if (remain.equals("1"))
-        runOld = true
-    } else {
-      println("please set project,table...")
-      sys.exit(-1)
-    }
-
-    println(
-      s"""
-         |project: $project| table: $table| runOld: $runOld
-         |""".stripMargin)
-
-    val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-
-    val announcement = CompanyCourtAnnouncement(spark, project, table)
-    announcement.regfun()
-    //whether to run the full (historical) data
-    if (!runOld) {
-      val flag = announcement.preCalc()
-      //return if the increment has no updates
-      if(!flag) return
-    }
-    announcement.calc(runOld)
-    spark.stop()
-  }
-
 }
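
queryCompany now lives in a standalone EsQuery object so callers outside the case class can reuse it, and its _source include list is widened with the new *_name and category_* fields. A hypothetical call site (getRestClient is the repo helper imported above; its exact signature is assumed here):

    import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient

    val restClient = getRestClient() // assumed no-arg factory
    val company: Map[String, String] = EsQuery.queryCompany(restClient, "某某科技有限公司")
    company.get("city_name").foreach(println) // an empty Map means the name missed the index
    restClient.close()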
 
 

+ 89 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
 import com.winhc.bigdata.spark.const.BaseConst
+import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
@@ -67,11 +68,25 @@ object CompanyIncCompany2Es {
     , "UPDATE_TIME"
     , "DELETED"
   )
+  val outFields_Human = Seq(
+    "NEW_CID"
+    ,"CID"
+    ,"ID"
+    ,"COMPANY_NAME"
+    ,"HUMAN_NAME"
+    ,"HID"
+    ,"HUMAN_PID"
+    ,"STATUS"
+    ,"CREATE_TIME"
+    ,"UPDATE_TIME"
+    ,"DELETED"
+  )
 
-  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
+  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
     def calc() {
+      val code = code2Name()
       val partition = bizDate.replaceAll("\\-", "")
       if (partition.length != 8) {
         println("biz date is error!")
@@ -88,7 +103,7 @@ object CompanyIncCompany2Es {
         sys.exit(-999)
       }
 
-      val companyCols = spark.table("ads_company").columns
+      val companyCols = spark.table(s"${project}.ads_company").columns
         .filter(!_.equals("ds"))
         .seq
 
@@ -119,8 +134,6 @@ object CompanyIncCompany2Es {
            |FROM
            |    tmp_company_inc
            |""".stripMargin)
-
-      import spark.implicits._
      //write out to HBase
       import org.apache.spark.sql.functions.col
       val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")
@@ -140,7 +153,77 @@ object CompanyIncCompany2Es {
 
      //write out to ES
       import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
-      stringDf.companyIndexSave2Es()
+      stringDf.companyIndexSave2Es(code._1, code._2)
+
+    }
+  }
+  case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
+    @(transient@getter) val spark: SparkSession = s
+
+    def calc() {
+      val partition = bizDate.replaceAll("\\-", "")
+      if (partition.length != 8) {
+        println("biz date is error!")
+        sys.exit(-99)
+      }
+      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
+      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last
+
+      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company_human_relation", spark)
+      val start_partition = if (inc_ads_partitions.isEmpty) '0' else inc_ads_partitions.last
+
+      if (start_partition.equals(end_partition)) {
+        println("start_partition == end_partition")
+        sys.exit(-999)
+      }
+
+      val companyCols = spark.table(s"${project}.ads_company_human_relation").columns
+        .filter(!_.equals("ds"))
+        .seq
+
+      //read the data
+      // drop duplicates within the data itself
+      val df = sql(
+        s"""
+           |SELECT ${companyCols.mkString(",")}
+           |FROM    (
+           |            SELECT  CONCAT_WS("_",new_cid,hid) AS rowkey,a.*
+           |                    ,row_number() OVER (PARTITION BY a.cid,a.hid,a.human_pid ORDER BY update_time DESC) c
+           |            FROM    (
+           |                        SELECT  *,cid as new_cid
+           |                        FROM    $project.inc_ods_company_human_relation
+           |                        WHERE   ds > $start_partition and ds <= $end_partition and cid is not null and hid is not null
+           |                    ) as a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin)
+
+      df.cache().createOrReplaceTempView("tmp_company_human_relation_inc")
+
+      //write out to ADS
+      sql(
+        s"""
+           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company_human_relation PARTITION(ds='$end_partition')
+           |SELECT ${companyCols.mkString(",")}
+           |FROM
+           |    tmp_company_human_relation_inc
+           |""".stripMargin)
+      //write out to HBase
+      import org.apache.spark.sql.functions.col
+      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY_HUMAN_RELATION")
+      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
+      stringDf.rdd.map(row => {
+        val id = row.getAs[String]("rowkey")
+        val put = new Put(Bytes.toBytes(id))
+        for (f <- outFields_Human) {
+          val v = row.getAs[String](f.toLowerCase)
+          if (v != null) {
+            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
+          }
+        }
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null)
+        .saveAsHadoopDataset(jobConf)
 
     }
   }
@@ -161,6 +244,7 @@ object CompanyIncCompany2Es {
     val spark = SparkUtils.InitEnv("company2Es", config)
 
     Company2Es(spark, project, bizDate).calc
+    Company_Human_Relation2HBase(spark, project, bizDate).calc
     spark.stop()
   }
 }
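
The new Company_Human_Relation2HBase job mirrors Company2Es: it reads inc_ods rows between the last ADS partition and the newest ODS partition, keeps one row per (cid, hid, human_pid), writes the result to inc_ads, and mirrors it into HBase under the CONCAT_WS("_", new_cid, hid) rowkey. The deduplication idiom in isolation (column and table names as in the diff):

    spark.sql(
      """
        |SELECT *
        |FROM (
        |  SELECT a.*,
        |         row_number() OVER (PARTITION BY a.cid, a.hid, a.human_pid
        |                            ORDER BY update_time DESC) AS c
        |  FROM inc_ods_company_human_relation a
        |) t
        |WHERE t.c = 1 -- the newest record per relation wins
        |""".stripMargin)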

+ 14 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
@@ -14,17 +15,23 @@ import scala.collection.mutable
  */
 object CompanyIndexSave2Es {
 
-  case class CompanyIndexSave2Es_all_inc(s: SparkSession, project: String) extends LoggingUtils {
+  case class CompanyIndexSave2Es_all_inc(s: SparkSession, project: String) extends LoggingUtils with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
+    val tmp_table = "xjk_company_test_0721_step2"
 
     def calc(): Unit = {
       import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
-
       val all_company_max_ds = getLastPartitionsOrElse(s"${project}.ads_company", "0")
+      val code = code2Name()
 
-    /*  println(
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS winhc_eci_dev.$tmp_table
+           |""".stripMargin)
+      sql(
         s"""
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.$tmp_table AS
            |SELECT  ${companyIndexFields.map(f => if (f.eq("estiblish_time")) "date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time" else "tmp." + f).mkString(",")}
            |FROM    (
            |            SELECT  ${companyIndexFields.mkString(",")},update_time
@@ -43,12 +50,12 @@ object CompanyIndexSave2Es {
            |        ) AS tmp
            |WHERE   tmp.num = 1
            |""".stripMargin)
-      */
       sql(
         s"""
-           |select * from winhc_test_dev.xjk_tmp_company_all
+           |SELECT  *
+           |FROM    winhc_eci_dev.$tmp_table
            |""".stripMargin)
-        .companyIndexSave2Es()
+        .companyIndexSave2Es(code._1, code._2)
     }
   }
 
@@ -57,7 +64,7 @@ object CompanyIndexSave2Es {
     val map = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.debug.maxToStringFields" -> "200",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "2"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "3800"
     )
 
     val spark = SparkUtils.InitEnv("CompanyIndexSave2Es", map)
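
code2Name() comes from the newly mixed-in BaseFunc trait; judging from its use here and in Company2Es, it returns the pair of broadcast lookup maps (category-code names, area-code names) that companyIndexSave2Es now requires. The call sequence, under that assumption:

    val code = code2Name() // (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]])
    sql(s"SELECT * FROM winhc_eci_dev.$tmp_table")
      .companyIndexSave2Es(code._1, code._2) // category map first, then area map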

+ 49 - 17
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs.chance
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.functions.col
 import org.apache.spark.sql.types.{MapType, StringType, StructField, StructType}
@@ -157,26 +158,57 @@ object ChangeExtract {
   }
 
 
-  // winhc_eci_dev company_tm rowkey 20200707 rowkey,status_new
-  // winhc_eci_dev company_patent_list rowkey 20200715 lprs
-  // winhc_eci_dev company_certificate rowkey 20200707 lprs
-  // winhc_eci_dev company_copyright_works_list rowkey 20200715 type
-  // winhc_eci_dev company_copyright_reg_list rowkey 20200707 lprs
+  // winhc_eci_dev company_tm rowkey 20200717 status_new
+  // winhc_eci_dev company_patent_list rowkey 20200717 lprs
+  // winhc_eci_dev company_certificate rowkey 20200707 type
+  // winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+  // winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+  // winhc_eci_dev company_employment rowkey 20200630 source
 
+  // winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+  // winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
 
-  // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
-  def main(args: Array[String]): Unit = {
-    val Array(project, tableName, rowkey, inc_ds, pf) = args
+  // winhc_eci_dev company_bid_list rowkey 20200717 title
+  // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
 
-    val config = EsConfig.getEsConfigMap ++ mutable.Map(
-      "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
-    )
 
-    val spark = SparkUtils.InitEnv("ChangeExtract", config)
-
-    ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
-    spark.stop()
+  // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
+  def main(args: Array[String]): Unit = {
+    if (args.length == 5) {
+      val Array(project, tableName, rowkey, inc_ds, pf) = args
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("ChangeExtract", config)
+
+
+      ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
+      spark.stop()
+    } else {
+      val project = "winhc_eci_dev"
+      val config = EsConfig.getEsConfigMap ++ mutable.Map(
+        "spark.hadoop.odps.project.name" -> project,
+        "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      )
+      val spark = SparkUtils.InitEnv("ChangeExtract", config)
+      val rows =
+        """winhc_eci_dev company_tm rowkey 20200717 status_new
+          |winhc_eci_dev company_patent_list rowkey 20200717 lprs
+          |winhc_eci_dev company_copyright_works_list rowkey 20200717 type
+          |winhc_eci_dev company_copyright_reg_list rowkey 20200717 version
+          |winhc_eci_dev company_land_publicity rowkey 20200717 title,location,use_for
+          |winhc_eci_dev company_land_announcement rowkey 20200717 e_number,project_name
+          |winhc_eci_dev company_bid_list rowkey 20200717 title
+          |winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+          |""".stripMargin
+      for (r <- rows.split("\r\n")) {
+        if(StringUtils.isNotEmpty(r)){
+          val Array(tmp,tableName,rowkey,inc_ds,pf) =r.split(" ")
+          ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
+        }
+      }
+      spark.stop()
+    }
   }
-
 }
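
One caveat in the new fallback branch: rows.split("\r\n") produces the eight table rows only when the source file is saved with CRLF line endings; with LF endings the whole string stays one element and the Array destructuring throws a MatchError. A line-ending-agnostic variant of the same loop:

    for (r <- rows.split("\r?\n") if StringUtils.isNotEmpty(r)) {
      val Array(_, tableName, rowkey, inc_ds, pf) = r.split(" ")
      ChangeExtractHandle(spark, project, tableName, rowkey, inc_ds, pf.split(",")).calc
    }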

+ 23 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -81,11 +81,11 @@ case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHan
     str
   }
 
-  override def getBizTime(newMap: Map[String, String]): String = "业务时间"
+  override def getBizTime(newMap: Map[String, String]): String = newMap("publication_start_date")
 
-  override def getUpdateTitle(newMap: Map[String, String]): String = "更新某地块公示"
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"${newMap("project_name")}地块公示发生变更")
 
-  override def getInsertTitle(newMap: Map[String, String]): String = "新增某地块公示"
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"新增${newMap("project_name")}地块公示")
 }
 
 
@@ -177,4 +177,24 @@ case class company_employment(equCols: Seq[String]) extends CompanyChangeHandle
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "招聘", Array("title", "city->employment_city", "employ_num", "start_date"))
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
+}
//bid/tender
+case class company_bid_list(equCols:Seq[String])extends CompanyChangeHandle{
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}招投标信息发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String =  getValueOrNull(newMap("title"), s"新增${newMap("title")}招投标信息")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap,"招投标", Array("publish_time","title", "purchaser", "province", "abs"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
+}
+//land transfer
+case class company_land_transfer(equCols:Seq[String])extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"${newMap("title")}土地转让信息发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"新增${newMap("location")}土地转让信息")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "土地转让", Array("merchandise_time", "user_pre", "user_now", "location", "area", "merchandise_price", "aministrative_area"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("merchandise_time")
 }
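
Each new case class plugs one table into the change-extract framework: insert/update titles guarded by getValueOrNull, a tag label via ChangeExtractUtils.getTags, and the business timestamp. An illustrative run for company_bid_list (the row values are hypothetical):

    val newMap = Map(
      "title" -> "X项目", "publish_time" -> "2020-07-17 00:00:00",
      "purchaser" -> "某单位", "province" -> "浙江", "abs" -> "摘要")
    val handle = company_bid_list(Seq("title"))
    handle.getInsertTitle(newMap) // "新增X项目招投标信息" (null when title is blank)
    handle.getBizTime(newMap)     // "2020-07-17 00:00:00"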

+ 273 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/chance/Inc_eci_debtor_relation.scala

@@ -3,10 +3,9 @@ package com.winhc.bigdata.spark.jobs.chance
 import java.sql.Timestamp
 import java.util.NoSuchElementException
 
-import com.winhc.bigdata.spark.utils.BaseUtil._
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.BaseUtil.atDaysAfter
+import com.winhc.bigdata.spark.utils.BaseUtil.{atDaysAfter, _}
 import com.winhc.bigdata.spark.utils.{EsRestUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -22,6 +21,8 @@ import scala.collection.mutable
  */
 object Inc_eci_debtor_relation {
 
+  private val env = "prod"
+
   def parseMap(map: Map[String, AnyVal]): eci_debtor_relation = {
     val id = map("id").asInstanceOf[String]
     val yg_name = map("yg_name").asInstanceOf[String]
@@ -30,21 +31,33 @@ object Inc_eci_debtor_relation {
     val bg_cid = map("bg_cid").asInstanceOf[String]
     val yg_reg_status = map("yg_reg_status").asInstanceOf[String]
     val yg_province_code = map("yg_province_code").asInstanceOf[String]
+    val yg_province_name = map("yg_province_name").asInstanceOf[String]
     val yg_city_code = map("yg_city_code").asInstanceOf[String]
+    val yg_city_name = map("yg_city_name").asInstanceOf[String]
     val yg_county_code = map("yg_county_code").asInstanceOf[String]
+    val yg_county_name = map("yg_county_name").asInstanceOf[String]
     val yg_reg_location = map("yg_reg_location").asInstanceOf[String]
     val yg_estiblish_time = map("yg_estiblish_time").asInstanceOf[String]
     val yg_category_code = map("yg_category_code").asInstanceOf[String]
+    val yg_category_first = map("yg_category_first").asInstanceOf[String]
+    val yg_category_second = map("yg_category_second").asInstanceOf[String]
+    val yg_category_third = map("yg_category_third").asInstanceOf[String]
     val yg_reg_capital = map("yg_reg_capital").asInstanceOf[String]
     val yg_phones = map("yg_phones").asInstanceOf[String]
     val yg_emails = map("yg_emails").asInstanceOf[String]
     val bg_reg_status = map("bg_reg_status").asInstanceOf[String]
     val bg_province_code = map("bg_province_code").asInstanceOf[String]
+    val bg_province_name = map("bg_province_name").asInstanceOf[String]
     val bg_city_code = map("bg_city_code").asInstanceOf[String]
+    val bg_city_name = map("bg_city_name").asInstanceOf[String]
     val bg_county_code = map("bg_county_code").asInstanceOf[String]
+    val bg_county_name = map("bg_county_name").asInstanceOf[String]
     val bg_reg_location = map("bg_reg_location").asInstanceOf[String]
     val bg_estiblish_time = map("bg_estiblish_time").asInstanceOf[String]
     val bg_category_code = map("bg_category_code").asInstanceOf[String]
+    val bg_category_first = map("bg_category_first").asInstanceOf[String]
+    val bg_category_second = map("bg_category_second").asInstanceOf[String]
+    val bg_category_third = map("bg_category_third").asInstanceOf[String]
     val bg_reg_capital = map("bg_reg_capital").asInstanceOf[String]
     val bg_phones = map("bg_phones").asInstanceOf[String]
     val bg_emails = map("bg_emails").asInstanceOf[String]
@@ -59,21 +72,33 @@ object Inc_eci_debtor_relation {
       , bg_cid
       , yg_reg_status
       , yg_province_code
+      , yg_province_name
       , yg_city_code
+      , yg_city_name
       , yg_county_code
+      , yg_county_name
       , yg_reg_location
       , yg_estiblish_time
       , yg_category_code
+      , yg_category_first
+      , yg_category_second
+      , yg_category_third
       , yg_reg_capital
       , yg_phones
       , yg_emails
       , bg_reg_status
       , bg_province_code
+      , bg_province_name
       , bg_city_code
+      , bg_city_name
       , bg_county_code
+      , bg_county_name
       , bg_reg_location
       , bg_estiblish_time
       , bg_category_code
+      , bg_category_first
+      , bg_category_second
+      , bg_category_third
       , bg_reg_capital
       , bg_phones
       , bg_emails
@@ -90,21 +115,33 @@ object Inc_eci_debtor_relation {
                                  , bg_cid: String
                                  , yg_reg_status: String
                                  , yg_province_code: String
+                                 , yg_province_name: String
                                  , yg_city_code: String
+                                 , yg_city_name: String
                                  , yg_county_code: String
+                                 , yg_county_name: String
                                  , yg_reg_location: String
                                  , yg_estiblish_time: String
                                  , yg_category_code: String
+                                 , yg_category_first: String
+                                 , yg_category_second: String
+                                 , yg_category_third: String
                                  , yg_reg_capital: String
                                  , yg_phones: String
                                  , yg_emails: String
                                  , bg_reg_status: String
                                  , bg_province_code: String
+                                 , bg_province_name: String
                                  , bg_city_code: String
+                                 , bg_city_name: String
                                  , bg_county_code: String
+                                 , bg_county_name: String
                                  , bg_reg_location: String
                                  , bg_estiblish_time: String
                                  , bg_category_code: String
+                                 , bg_category_first: String
+                                 , bg_category_second: String
+                                 , bg_category_third: String
                                  , bg_reg_capital: String
                                  , bg_phones: String
                                  , bg_emails: String
@@ -119,21 +156,33 @@ object Inc_eci_debtor_relation {
     private var bg_cid_val = bg_cid
     private var yg_reg_status_val = yg_reg_status
     private var yg_province_code_val = yg_province_code
+    private var yg_province_name_val = yg_province_name
     private var yg_city_code_val = yg_city_code
+    private var yg_city_name_val = yg_city_name
     private var yg_county_code_val = yg_county_code
+    private var yg_county_name_val = yg_county_name
     private var yg_reg_location_val = yg_reg_location
     private var yg_estiblish_time_val = yg_estiblish_time
     private var yg_category_code_val = yg_category_code
+    private var yg_category_first_val = yg_category_first
+    private var yg_category_second_val = yg_category_second
+    private var yg_category_third_val = yg_category_third
     private var yg_reg_capital_val = yg_reg_capital
     private var yg_phones_val = yg_phones
     private var yg_emails_val = yg_emails
     private var bg_reg_status_val = bg_reg_status
     private var bg_province_code_val = bg_province_code
+    private var bg_province_name_val = bg_province_name
     private var bg_city_code_val = bg_city_code
+    private var bg_city_name_val = bg_city_name
     private var bg_county_code_val = bg_county_code
+    private var bg_county_name_val = bg_county_name
     private var bg_reg_location_val = bg_reg_location
     private var bg_estiblish_time_val = bg_estiblish_time
     private var bg_category_code_val = bg_category_code
+    private var bg_category_first_val = bg_category_first
+    private var bg_category_second_val = bg_category_second
+    private var bg_category_third_val = bg_category_third
     private var bg_reg_capital_val = bg_reg_capital
     private var bg_phones_val = bg_phones
     private var bg_emails_val = bg_emails
@@ -155,21 +204,33 @@ object Inc_eci_debtor_relation {
         , bg_cid_val
         , yg_reg_status_val
         , yg_province_code_val
+        , yg_province_name_val
         , yg_city_code_val
+        , yg_city_name_val
         , yg_county_code_val
+        , yg_county_name_val
         , yg_reg_location_val
         , yg_estiblish_time_val
         , yg_category_code_val
+        , yg_category_first_val
+        , yg_category_second_val
+        , yg_category_third_val
         , yg_reg_capital_val
         , yg_phones_val
         , yg_emails_val
         , bg_reg_status_val
         , bg_province_code_val
+        , bg_province_name_val
         , bg_city_code_val
+        , bg_city_name_val
         , bg_county_code_val
+        , bg_county_name_val
         , bg_reg_location_val
         , bg_estiblish_time_val
         , bg_category_code_val
+        , bg_category_first_val
+        , bg_category_second_val
+        , bg_category_third_val
         , bg_reg_capital_val
         , bg_phones_val
         , bg_emails_val
@@ -181,13 +242,166 @@ object Inc_eci_debtor_relation {
 
   }
 
-  val target_ads_creditor_info = "ads_creditor_info"
-  val target_ads_eci_debtor_relation = "ads_eci_debtor_relation"
-  val target_write_debtor_relation = "ads_write_eci_debtor_relation"
+  val target_ads_creditor_info = "ads_creditor_info" // "ads_creditor_info"
+  val target_ads_eci_debtor_relation ="ads_eci_debtor_relation" //"ads_eci_debtor_relation"
+  val target_write_debtor_relation = "ads_write_eci_debtor_relation"//"ads_write_eci_debtor_relation"
 
   case class DebtorRelation(s: SparkSession, ds: String) extends LoggingUtils with BaseFunc with Logging {
     @(transient@getter) val spark: SparkSession = s
 
+
+    def prefix(): Unit = {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`$target_ads_creditor_info` (
+           |  `id` BIGINT,
+           |  `case_id` BIGINT,
+           |  `case_no` STRING,
+           |  `case_type` STRING,
+           |  `case_reason` STRING,
+           |  `case_stage` STRING,
+           |  `case_amt` DOUBLE,
+           |  `ys_yg` STRING,
+           |  `ys_bg` STRING,
+           |  `judge_date` DATETIME,
+           |  `zhixing_date` STRING,
+           |  `zhixing_result` STRING,
+           |  `curr_stage` STRING,
+           |  `curr_date` STRING,
+           |  `curr_result` STRING,
+           |  `ys_yg_cid` STRING COMMENT 'first-instance plaintiff cid',
+           |  `ys_bg_cid` STRING COMMENT 'first-instance defendant cid',
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT
+           |  )
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`$target_ads_eci_debtor_relation` (
+           |  `id` STRING,
+           |  `yg_name` STRING,
+           |  `bg_name` STRING,
+           |  `yg_cid` STRING,
+           |  `bg_cid` STRING,
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT,
+           |  `update_time` DATETIME,
+           |  `create_time` DATETIME)
+           |PARTITIONED BY (
+           |  `ds` STRING)
+           |LIFECYCLE 30
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}`.`$target_write_debtor_relation` (
+           |  `id` STRING,
+           |  `yg_name` STRING,
+           |  `bg_name` STRING,
+           |  `yg_cid` STRING,
+           |  `bg_cid` STRING,
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT,
+           |  `update_time` DATETIME,
+           |  `create_time` DATETIME)
+           |""".stripMargin)
+    }
+
+
     def inc(): Unit = {
       val yesterday_ds = atDaysAfter(-1, ds)
       company_split()
@@ -212,27 +426,39 @@ object Inc_eci_debtor_relation {
            |        ,'' as ys_bg_cid
            |        ,'' as yg_reg_status
            |        ,'' as yg_province_code
+           |        ,'' as yg_province_name
            |        ,'' as yg_city_code
+           |        ,'' as yg_city_name
            |        ,'' as yg_county_code
+           |        ,'' as yg_county_name
            |        ,'' as yg_reg_location
            |        ,'' as yg_estiblish_time
            |        ,'' as yg_category_code
+           |        ,'' as yg_category_first
+           |        ,'' as yg_category_second
+           |        ,'' as yg_category_third
            |        ,'' as yg_reg_capital
            |        ,'' as yg_phones
            |        ,'' as yg_emails
            |        ,'' as bg_reg_status
            |        ,'' as bg_province_code
+           |        ,'' as bg_province_name
            |        ,'' as bg_city_code
+           |        ,'' as bg_city_name
            |        ,'' as bg_county_code
+           |        ,'' as bg_county_name
            |        ,'' as bg_reg_location
            |        ,'' as bg_estiblish_time
            |        ,'' as bg_category_code
+           |        ,'' as bg_category_first
+           |        ,'' as bg_category_second
+           |        ,'' as bg_category_third
            |        ,'' as bg_reg_capital
            |        ,'' as bg_phones
            |        ,'' as bg_emails
            |        ,CASE (zhixing_result = 2 OR( zhixing_result IS NULL AND curr_result = '胜')) WHEN TRUE THEN 0 ELSE 1 END AS deleted
            |        ,1 as flag
-           |FROM    winhc_eci.inc_ods_creditor_info
+           |FROM    ${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.inc_ods_creditor_info
            |LATERAL VIEW explode(company_split(ys_bg)) a AS ys_bg_xjk
            |LATERAL VIEW explode(company_split(ys_yg)) b AS ys_yg_xjk
            |WHERE   ds = $ds
@@ -279,21 +505,33 @@ object Inc_eci_debtor_relation {
 
               val yg_reg_status = yg_map("reg_status")
               val yg_province_code = yg_map("province_code")
+              val yg_province_name = yg_map("province_name")
               val yg_city_code = yg_map("city_code")
+              val yg_city_name = yg_map("city_name")
               val yg_county_code = yg_map("county_code")
+              val yg_county_name = yg_map("county_name")
               val yg_reg_location = yg_map("reg_location")
               val yg_estiblish_time = yg_map("estiblish_time")
               val yg_category_code = yg_map("category_code")
+              val yg_category_first = yg_map("category_first")
+              val yg_category_second = yg_map("category_second")
+              val yg_category_third = yg_map("category_third")
               val yg_reg_capital = yg_map("reg_capital")
               val yg_phones = yg_map("phones")
               val yg_emails = yg_map("emails")
               val bg_reg_status = bg_map("reg_status")
               val bg_province_code = bg_map("province_code")
+              val bg_province_name = bg_map("province_name")
               val bg_city_code = bg_map("city_code")
+              val bg_city_name = bg_map("city_name")
               val bg_county_code = bg_map("county_code")
+              val bg_county_name = bg_map("county_name")
               val bg_reg_location = bg_map("reg_location")
               val bg_estiblish_time = bg_map("estiblish_time")
               val bg_category_code = bg_map("category_code")
+              val bg_category_first = yg_map("category_first")
+              val bg_category_second = yg_map("category_second")
+              val bg_category_third = yg_map("category_third")
               val bg_reg_capital = bg_map("reg_capital")
               val bg_phones = bg_map("phones")
               val bg_emails = bg_map("emails")
@@ -319,21 +557,33 @@ object Inc_eci_debtor_relation {
                 , ys_bg_cid
                 , yg_reg_status
                 , yg_province_code
+                , yg_province_name
                 , yg_city_code
+                , yg_city_name
                 , yg_county_code
+                , yg_county_name
                 , yg_reg_location
                 , yg_estiblish_time
                 , yg_category_code
+                , yg_category_first
+                , yg_category_second
+                , yg_category_third
                 , yg_reg_capital
                 , yg_phones
                 , yg_emails
                 , bg_reg_status
                 , bg_province_code
+                , bg_province_name
                 , bg_city_code
+                , bg_city_name
                 , bg_county_code
+                , bg_county_name
                 , bg_reg_location
                 , bg_estiblish_time
                 , bg_category_code
+                , bg_category_first
+                , bg_category_second
+                , bg_category_third
                 , bg_reg_capital
                 , bg_phones
                 , bg_emails
@@ -352,7 +602,7 @@ object Inc_eci_debtor_relation {
       spark.createDataFrame(inc_rdd, schema)
         .createOrReplaceTempView("inc_tmp_creditor_info")
 
-      val cols = getColumns("winhc_eci_dev.ads_creditor_info")
+      val cols = getColumns(s"winhc_eci_dev.$target_ads_creditor_info")
 
      //overwrite the full judgment-document creditor relation table
       sql(
@@ -385,21 +635,33 @@ object Inc_eci_debtor_relation {
            |        ,ys_bg_cid AS bg_cid
            |        ,yg_reg_status
            |        ,yg_province_code
+           |        ,yg_province_name
            |        ,yg_city_code
+           |        ,yg_city_name
            |        ,yg_county_code
+           |        ,yg_county_name
            |        ,yg_reg_location
            |        ,yg_estiblish_time
            |        ,yg_category_code
+           |        ,yg_category_first
+           |        ,yg_category_second
+           |        ,yg_category_third
            |        ,yg_reg_capital
            |        ,yg_phones
            |        ,yg_emails
            |        ,bg_reg_status
            |        ,bg_province_code
+           |        ,bg_province_name
            |        ,bg_city_code
+           |        ,bg_city_name
            |        ,bg_county_code
+           |        ,bg_county_name
            |        ,bg_reg_location
            |        ,bg_estiblish_time
            |        ,bg_category_code
+           |        ,bg_category_first
+           |        ,bg_category_second
+           |        ,bg_category_third
            |        ,bg_reg_capital
            |        ,bg_phones
            |        ,bg_emails
@@ -416,9 +678,6 @@ object Inc_eci_debtor_relation {
 
       val eci_cols = getColumns(s"winhc_eci_dev.$target_ads_eci_debtor_relation")
 
-      println(eci_cols)
-
-
       val write_schema = StructType(sql(
         s"""
            |select * from winhc_eci_dev.$target_ads_eci_debtor_relation where 1==0 and ds = $ds
@@ -464,7 +723,7 @@ object Inc_eci_debtor_relation {
       write_df
         .write
         .mode(if (isWindows) "append" else "overwrite")
-        .insertInto(s"winhc_eci.$target_write_debtor_relation")
+        .insertInto(s"${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$target_write_debtor_relation")
 
     }
   }
@@ -477,7 +736,9 @@ object Inc_eci_debtor_relation {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("eci_debtor_relation", config)
-    DebtorRelation(spark, ds).inc
+    val v = DebtorRelation(spark, ds)
+//    v.prefix()
+    v.inc()
     spark.stop()
   }
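
A new private env flag now selects winhc_eci_dev versus winhc_eci wherever a project name is interpolated, and prefix() creates the three target tables (it is commented out in main, so it runs only on demand). The selection idiom, extracted into a helper for clarity (the helper itself is hypothetical; the diff inlines the expression):

    private val env = "prod"

    // "dev" keeps reads and writes inside winhc_eci_dev; anything else targets winhc_eci.
    private def targetProject: String = if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"

    write_df.write
      .mode(if (isWindows) "append" else "overwrite")
      .insertInto(s"$targetProject.$target_write_debtor_relation")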
 

+ 330 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/creditor_info_add_other.scala

@@ -0,0 +1,330 @@
+package com.winhc.bigdata.spark.jobs.chance
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/13 11:14
+ * @Description: supplement the full document-creditor table with company dimension fields
+ */
+object creditor_info_add_other {
+
+  case class add_cols_other(s: SparkSession, project: String) extends LoggingUtils with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+    def prefix(ds: String, targetTable: String): Unit = {
+      company_split()
+      area_code()
+      tyc_split()
+      code2Name()
+
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS winhc_eci_dev.$targetTable
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`$targetTable` (
+           |  `id` BIGINT,
+           |  `case_id` BIGINT,
+           |  `case_no` STRING,
+           |  `case_type` STRING,
+           |  `case_reason` STRING,
+           |  `case_stage` STRING,
+           |  `case_amt` DOUBLE,
+           |  `ys_yg` STRING,
+           |  `ys_bg` STRING,
+           |  `judge_date` DATETIME,
+           |  `zhixing_date` STRING,
+           |  `zhixing_result` STRING,
+           |  `curr_stage` STRING,
+           |  `curr_date` STRING,
+           |  `curr_result` STRING,
+           |  `ys_yg_cid` STRING COMMENT 'first-instance plaintiff cid',
+           |  `ys_bg_cid` STRING COMMENT 'first-instance defendant cid',
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT
+           |  )
+           |""".stripMargin)
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$targetTable
+           |SELECT  id
+           |        ,case_id
+           |        ,case_no
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_stage
+           |        ,case_amt
+           |        ,ys_yg_xjk AS ys_yg
+           |        ,ys_bg_xjk AS ys_bg
+           |        ,judge_date
+           |        ,zhixing_date
+           |        ,zhixing_result
+           |        ,curr_stage
+           |        ,curr_date
+           |        ,curr_result
+           |        ,'' AS ys_yg_cid
+           |        ,'' AS ys_bg_cid
+           |        ,'' AS yg_reg_status
+           |        ,'' AS yg_province_code
+           |        ,'' AS yg_province_name
+           |        ,'' AS yg_city_code
+           |        ,'' AS yg_city_name
+           |        ,'' AS yg_county_code
+           |        ,'' AS yg_county_name
+           |        ,'' AS yg_reg_location
+           |        ,'' AS yg_estiblish_time
+           |        ,'' AS yg_category_code
+           |        ,'' AS yg_category_first
+           |        ,'' AS yg_category_second
+           |        ,'' AS yg_category_third
+           |        ,'' AS yg_reg_capital
+           |        ,'' AS yg_phones
+           |        ,'' AS yg_emails
+           |        ,'' AS bg_reg_status
+           |        ,'' AS bg_province_code
+           |        ,'' AS bg_province_name
+           |        ,'' AS bg_city_code
+           |        ,'' AS bg_city_name
+           |        ,'' AS bg_county_code
+           |        ,'' AS bg_county_name
+           |        ,'' AS bg_reg_location
+           |        ,'' AS bg_estiblish_time
+           |        ,'' AS bg_category_code
+           |        ,'' AS bg_category_first
+           |        ,'' AS bg_category_second
+           |        ,'' AS bg_category_third
+           |        ,'' AS bg_reg_capital
+           |        ,'' AS bg_phones
+           |        ,'' AS bg_emails
+           |        ,CASE WHEN zhixing_result = 2 OR (zhixing_result IS NULL AND curr_result = '胜') THEN 0 ELSE 1 END AS deleted
+           |FROM    winhc_eci_dev.ods_creditor_info
+           |LATERAL VIEW explode(company_split(ys_bg)) a AS ys_bg_xjk
+           |LATERAL VIEW explode(company_split(ys_yg)) b AS ys_yg_xjk
+           |WHERE   ds = '$ds'
+           |AND     yg_type = '企业'
+           |AND     bg_type = '企业'
+           |AND     LENGTH(ys_yg_xjk) > 4
+           |AND     LENGTH(ys_bg_xjk) > 4
+           |""".stripMargin)
+    }
+
+    def suffix(targetTable: String): Unit = {
+      val cols = getColumns(s"winhc_eci_dev.$targetTable").map("tmp." + _).mkString(",")
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$targetTable
+           |SELECT  $cols
+           |FROM    (
+           |            SELECT  a.*
+           |                    ,row_number() OVER (PARTITION BY a.id,a.ys_yg_cid,a.ys_bg_cid ORDER BY id DESC) c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    winhc_eci_dev.$targetTable
+           |                        WHERE   ys_yg_cid IS NOT NULL
+           |                        AND     ys_bg_cid IS NOT NULL
+           |                        AND     ys_yg_cid <> ''
+           |                        AND     ys_bg_cid <> ''
+           |                    ) AS a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin)
+
+    }
+
+    def calc(pre: String, table: String): Unit = {
+      val fields = Seq(
+        "reg_status", "province_code", "city_code", "county_code", "province_name", "city_name", "county_name", "reg_location", "estiblish_time", "category_code",
+        "category_first", "category_second", "category_third", "reg_capital", "phones", "emails"
+      )
+
+      val org_name = s"ys_$pre".stripSuffix("_") // "yg_" -> "ys_yg"
+
+      val cid = "ys_" + pre + "cid"
+      val fs = fields.map(pre + _).toSet ++ Set(cid)
+
+      val str = getColumns(s"winhc_eci_dev.$table").map(f => {
+        if (fs.contains(f)) {
+          if(f.endsWith("cid")){
+            s"tt2.${f.substring(6)} as $f"
+          }else{
+            s"tt2.${f.substring(3)} as $f"
+          }
+        } else {
+          s"tt1.$f"
+        }
+      }).mkString(",")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$table
+           |SELECT  $str
+           |FROM    (
+           |            SELECT  *
+           |            FROM    winhc_eci_dev.$table
+           |            WHERE   $org_name IS NOT NULL
+           |        ) AS tt1
+           |LEFT JOIN (
+           |              SELECT  cast(tmp.cid as string) cid
+           |                      ,tmp.name
+           |                      ,tmp.history_names
+           |                      ,cast(tmp.current_cid as string) current_cid
+           |                      ,cast(tmp.company_type as string) company_type
+           |                      ,get_province_code(tmp.area_code) as province_code
+           |                      ,get_province_name(tmp.area_code) as province_name
+           |                      ,get_city_code(tmp.area_code) as city_code
+           |                      ,get_city_name(tmp.area_code) as city_name
+           |                      ,get_county_code(tmp.area_code) as county_code
+           |                      ,get_county_name(tmp.area_code) as county_name
+           |                      ,tmp.credit_code
+           |                      ,cast(tmp.reg_status as string) as reg_status
+           |                      ,tmp.reg_location
+           |                      ,date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time
+           |                      ,cast(tmp.lat as string) lat
+           |                      ,cast(tmp.lng as String) lng
+           |                      ,cast(tmp.category_code as string) category_code
+           |                      ,get_category_first(cast(tmp.category_code as string)) category_first
+           |                      ,get_category_second(cast(tmp.category_code as string)) category_second
+           |                      ,get_category_third(cast(tmp.category_code as string)) category_third
+           |                      ,tmp.reg_capital
+           |                      ,cast(tmp.reg_capital_amount as string) reg_capital_amount
+           |                      ,CONCAT_WS(',',tyc_split(tmp.phones)) phones
+           |                      ,CONCAT_WS(',',tyc_split(tmp.emails)) emails
+           |              FROM    (
+           |                          SELECT  cid
+           |                                  ,name
+           |                                  ,history_names
+           |                                  ,current_cid
+           |                                  ,company_type
+           |                                  ,credit_code
+           |                                  ,reg_status
+           |                                  ,area_code
+           |                                  ,reg_location
+           |                                  ,estiblish_time
+           |                                  ,lat
+           |                                  ,lng
+           |                                  ,category_code
+           |                                  ,reg_capital
+           |                                  ,reg_capital_amount
+           |                                  ,phones
+           |                                  ,emails
+           |                                  ,update_time
+           |                                  ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC ) AS num
+           |                          FROM    (
+           |                                      SELECT  cid
+           |                                              ,name
+           |                                              ,history_names
+           |                                              ,current_cid
+           |                                              ,company_type
+           |                                              ,credit_code
+           |                                              ,reg_status
+           |                                              ,area_code
+           |                                              ,reg_location
+           |                                              ,estiblish_time
+           |                                              ,lat
+           |                                              ,lng
+           |                                              ,category_code
+           |                                              ,reg_capital
+           |                                              ,reg_capital_amount
+           |                                              ,phones
+           |                                              ,emails
+           |                                              ,update_time
+           |                                      FROM    winhc_eci_dev.ads_company
+           |                                      WHERE   ds = 20200604
+           |                                      UNION ALL
+           |                                      SELECT  cid
+           |                                              ,name
+           |                                              ,history_names
+           |                                              ,current_cid
+           |                                              ,company_type
+           |                                              ,credit_code
+           |                                              ,reg_status
+           |                                              ,area_code
+           |                                              ,reg_location
+           |                                              ,estiblish_time
+           |                                              ,lat
+           |                                              ,lng
+           |                                              ,category_code
+           |                                              ,reg_capital
+           |                                              ,reg_capital_amount
+           |                                              ,phones
+           |                                              ,emails
+           |                                              ,update_time
+           |                                      FROM    winhc_eci_dev.inc_ads_company
+           |                                      WHERE   ds > 20200604
+           |                                  ) AS t
+           |                      ) AS tmp
+           |              WHERE   tmp.num = 1
+           |          ) AS tt2
+           |ON      tt1.$org_name = tt2.name
+           |""".stripMargin)
+
+    }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val map = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "3800"
+    )
+    val Array(ds) = args
+    //    val ds = "20200721"
+    val spark = SparkUtils.InitEnv("add_cols_other", map)
+
+    val table = "xjk_tmp_ads_cre_info_test_v2"
+
+    val v = add_cols_other(spark, project)
+    v.prefix(ds, table)
+
+    for (pre <- Seq("yg_", "bg_")) {
+      v.calc(pre, table)
+    }
+
+    v.suffix(table)
+    spark.stop()
+  }
+}
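
Note: the projection rewrite inside calc() is the subtlest part of this file; the sketch below mirrors its behavior with illustrative names (ProjectionRewriteSketch is not in the codebase):

object ProjectionRewriteSketch {
  // For pre = "yg_", columns owned by the joined company replace the "yg_*" fields,
  // and "ys_yg_cid" is filled from tt2.cid; everything else stays on tt1.
  def projection(allCols: Seq[String], pre: String, joinedFields: Set[String]): String = {
    val cid = s"ys_${pre}cid"
    allCols.map { f =>
      if (f == cid) s"tt2.${f.substring(6)} as $f" // "ys_yg_cid" -> "cid"
      else if (joinedFields.contains(f)) s"tt2.${f.substring(pre.length)} as $f" // strip "yg_"/"bg_"
      else s"tt1.$f"
    }.mkString(",")
  }

  def main(args: Array[String]): Unit = {
    // prints: tt1.id,tt2.cid as ys_yg_cid,tt2.phones as yg_phones
    println(projection(Seq("id", "ys_yg_cid", "yg_phones"), "yg_", Set("yg_phones")))
  }
}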

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala

@@ -111,7 +111,7 @@ object eci_good_news {
            |        ,null AS defendant
            |        ,rel_bg_name AS company_name
            |        ,detail_cid AS cid
-           |        ,json_add_str(detail_label,CONCAT_WS(',',get_json_kv('reg_capital',rel_bg_reg_capital),get_json_kv('province',rel_bg_province_code),get_json_kv('city',rel_bg_city_code),get_json_kv('county',rel_bg_county_code))) AS tags
+           |        ,json_add_str(detail_label,CONCAT_WS(',',get_json_kv('reg_capital',rel_bg_reg_capital),get_json_kv('province',rel_bg_province_code),get_json_kv('city',rel_bg_city_code),get_json_kv('county',rel_bg_county_code),get_json_kv('estiblish_time',rel_bg_estiblish_time),get_json_kv('category_code',rel_bg_category_code))) AS tags
            |        ,detail_rowkey AS biz_id
            |        ,get_table_type(detail_table_name) AS type
            |        ,get_chance_dynamic_type(detail_table_name) AS dynamic_type
@@ -131,14 +131,14 @@ object eci_good_news {
 
 
   def main(args: Array[String]): Unit = {
-    //    val Array(ds) = args
+    val Array(ds) = args
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("eci_good_news", config)
-    eci_good_news_handle(spark, "20200707").company_ip()
+    eci_good_news_handle(spark, ds).company_ip()
     spark.stop()
   }
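
Note: `val Array(ds) = args` throws a MatchError when the job is submitted without arguments; a guarded variant could look like this (sketch; the usage string is illustrative):

def parseDs(args: Array[String]): String = args match {
  case Array(ds) if ds.matches("\\d{8}") => ds // expects yyyyMMdd
  case _ =>
    println("usage: eci_good_news <ds: yyyyMMdd>")
    sys.exit(-1)
}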
 

+ 74 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala

@@ -0,0 +1,74 @@
+package com.winhc.bigdata.spark.jobs.increment
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author yyn
+ * @Date 2020/7/15
+ * @Description Writes the latest partition of incremental ADS tables (optionally with their list tables) to Phoenix
+ */
+object inc_phx_cid_ads extends LoggingUtils {
+  var config = mutable.Map(
+    "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
+  )
+  val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+  def main(args: Array[String]): Unit = {
+    var hasList: Boolean = false
+    if (args.length < 1) {
+      println("请输入:1、表名;2、是否有list表 ")
+      sys.exit(-1)
+    } else if (args.length == 2) {
+      hasList = args(1).toBoolean
+    }
+    val sourceTable = args(0)
+    val adsTable = s"inc_ads_$sourceTable"
+    val phxTable = sourceTable.toUpperCase
+    val adsColumns: Seq[String] = spark.table(adsTable).schema.map(_.name).filter(!_.equals("ds"))
+    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
+    // process the main table
+    // last partition of the incremental ads table
+    val lastDsIncAds = BaseUtil.getPartion(adsTable, spark)
+    val df1 = sql(s"""SELECT ${adsColumns.mkString(",")} FROM ${adsTable} WHERE ds=${lastDsIncAds}""")
+    if (hasList) {
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cids").drop("flag")
+        .withColumnRenamed("new_cids", "cids")
+        .createOrReplaceTempView(s"tmp")
+      sql(s"""SELECT id AS ROWKEY,* FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx main table written! " + new Date().toString)
+      // process the list table
+      val adsListColumns: Seq[String] = spark.table(adsTable + "_list").schema.map(_.name).filter(!_.equals("ds"))
+      val lastDsIncAdsList = BaseUtil.getPartion(adsTable + "_list", spark)
+      val df2 = sql(s"""SELECT ${adsListColumns.mkString(",")} FROM ${adsTable}_list WHERE ds=${lastDsIncAdsList}""")
+      df2.columns.foldLeft(df2) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp2")
+      sql(s"""SELECT * FROM tmp2""").repartition(200).save2PhoenixByJDBC(s"${phxTable}_LIST")
+      println(s"${this.getClass.getSimpleName} phx list table written! " + new Date().toString)
+    } else {
+      df1.columns.foldLeft(df1) {
+        (currentDF, column) =>
+          currentDF.withColumn(column, col(column).cast("String"))
+      }.drop("cid").drop("flag")
+        .withColumnRenamed("new_cid", "cid")
+        .createOrReplaceTempView(s"tmp")
+      sql(s"""SELECT * FROM tmp""").repartition(200).save2PhoenixByJDBC(s"${phxTable}")
+      println(s"${this.getClass.getSimpleName} phx single table written! " + new Date().toString)
+    }
+    spark.stop()
+  }
+}
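
Note: the cast-every-column-to-string fold appears three times in this file; extracted once it reads as follows (illustrative helper, not in the codebase). Phoenix columns are written as STRING, hence the blanket cast:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

def allColumnsToString(df: DataFrame): DataFrame =
  df.columns.foldLeft(df)((acc, c) => acc.withColumn(c, col(c).cast("string")))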

+ 163 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/increment/script/ads_company_land_transfer_ods.sql

@@ -0,0 +1,163 @@
+INSERT OVERWRITE TABLE winhc_eci_dev.ads_company_land_transfer PARTITION (ds='20200604')
+SELECT -- only a current land-use-right holder exists
+    CONCAT_WS("_",NVL(B.new_cid,A.now_cid),id) AS rowkey,
+--    NVL(B.new_cid,A.pre_cid)   AS pre_cid ,-- no previous land-use-right holder
+    NVL(B.new_cid,A.now_cid)   AS new_cid,
+    A.now_cid AS cid,
+    'now' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.now_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NULL -- no previous land-use-right holder
+AND A.now_cid is NOT NULL -- but a current land-use-right holder exists
+UNION -- only a previous land-use-right holder exists
+SELECT
+    CONCAT_WS("_",NVL(B.new_cid,A.pre_cid),id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,-- no current land-use-right holder
+    'pre' AS  type,
+    id                     ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL -- a previous land-use-right holder exists
+AND A.now_cid is NULL -- no current land-use-right holder
+UNION -- previous and current land-use-right holders are the same person or company (cid)
+SELECT
+    CONCAT_WS("_",NVL(B.new_cid,A.pre_cid),id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,-- same as pre_cid
+    'bothsame' AS  type,
+    id                     ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.pre_cid=A.now_cid -- previous and current land-use-right holders are identical
+UNION -- both holders exist but differ (cid); the record is split in two, this is the first row (previous holder)
+SELECT
+    CONCAT_WS("_",NVL(B.new_cid,A.pre_cid),id) AS rowkey,
+    NVL(B.new_cid,A.pre_cid)   AS new_cid ,-- first of the two rows
+    A.pre_cid AS cid,
+--    NVL(B.new_cid,A.now_cid)   AS cid,
+    'bothone' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.pre_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.now_cid is NOT NULL AND A.pre_cid!=A.now_cid -- both holders exist but differ
+UNION -- both holders exist but differ (cid); the record is split in two, this is the second row (current holder)
+SELECT
+    CONCAT_WS("_",NVL(B.new_cid,A.now_cid),id) AS rowkey,
+--    NVL(B.new_cid,A.pre_cid)   AS cid ,
+    NVL(B.new_cid,A.now_cid)   AS new_cid,-- second of the two rows
+    A.now_cid AS cid,
+    'bothtwo' AS  type,
+    id                      ,
+    mark               ,
+    num                ,
+    location           ,
+    aministrative_area ,
+    user_pre           ,
+    user_now           ,
+    area               ,
+    use_for            ,
+    use_type           ,
+    years_of_use       ,
+    situation          ,
+    level              ,
+    merchandise_type   ,
+    merchandise_price  ,
+    merchandise_time,
+    url                ,
+    create_time,
+    update_time,
+    deleted
+FROM winhc_eci_dev.ods_company_land_transfer AS A
+LEFT JOIN winhc_eci_dev.company_map AS B
+ON A.now_cid=B.cid
+WHERE A.ds='20200604'
+AND A.pre_cid is NOT NULL AND A.now_cid is NOT NULL AND A.pre_cid!=A.now_cid -- both holders exist but differ
+;
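
Note: the five UNION branches implement a single fan-out rule per source row; the same routing as a plain function over a simplified record (illustrative, only the routing fields are modeled):

case class Transfer(id: Long, preCid: Option[String], nowCid: Option[String])

def fanOut(t: Transfer): Seq[(String, String)] = (t.preCid, t.nowCid) match {
  case (None, Some(now))                    => Seq((now, "now"))
  case (Some(pre), None)                    => Seq((pre, "pre"))
  case (Some(pre), Some(now)) if pre == now => Seq((pre, "bothsame"))
  case (Some(pre), Some(now))               => Seq((pre, "bothone"), (now, "bothtwo"))
  case (None, None)                         => Nil // filtered out by the WHERE clauses
}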

+ 192 - 81
src/main/scala/com/winhc/bigdata/spark/model/CompanyBidScore.scala

@@ -1,12 +1,21 @@
 package com.winhc.bigdata.spark.model
 
-import java.util.Date
-
-import com.winhc.bigdata.calc.{DimScore, DimScoreV2}
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import java.util
+import java.util.{Collections, Date}
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.EsRestUtils.getRestClient
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsRestUtils, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
 import org.apache.commons.lang3.StringUtils
-import org.apache.spark.broadcast.Broadcast
+import org.apache.http.entity.ContentType
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Row, SparkSession}
+import org.elasticsearch.client.RestClient
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
@@ -19,91 +28,79 @@ import scala.collection.mutable
 object CompanyBidScore {
 
   val tabMapping: Map[String, (String, String, String, String)] =
-    Map("ads_company_bid_list" -> ("1", "publish_time", "资产权益", "招投标") //招投标
+    Map("company_bid_list" -> ("306", "publish_time", "资产权益", "招投标") //招投标
     )
 
   def main(args: Array[String]): Unit = {
 
-    val (sourceTable, flag, time, kind, project) = valid(args)
+    val (namespace, sourceTable, flag, time, kind, project) = valid(args)
 
-    var config = mutable.Map.empty[String, String]
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
 
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
 
     println(s"company ${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    // sample row used to exercise the incremental path locally
+    spark.sql(
+      """
+        |select "24416401" as new_cid,1111L as id,'2020-07-18' as publish_time
+        |""".stripMargin).createOrReplaceTempView("inc_view")
+
+    new CompanyBidScore(spark, sourceTable, "inc_view", flag, time, kind, project, "1", namespace).calc()
+//    new CompanyBidScore(spark, sourceTable, "", flag, time, kind, project, "0", namespace).calc()
 
-    new CompanyBidScore(spark, sourceTable, flag, time, kind, project).calc()
 
     println(s"company ${this.getClass.getSimpleName} calc end! " + new Date().toString)
     spark.stop()
   }
 
   def valid(args: Array[String]) = {
-    if (args.length != 1) {
-      println("请输入要计算的table!!!! ")
+    println(args.mkString(", "))
+    if (args.length != 2) {
+      println("please enter namespace, table!!!! ")
       sys.exit(-1)
     }
-    val Array(sourceTable) = args
+    val Array(namespace, sourceTable) = args
 
     val (flag, time, kind, project) = tabMapping.getOrElse(sourceTable, ("", "", "", ""))
     if (flag.isEmpty || time.isEmpty || kind.isEmpty || project.isEmpty) {
-      println("输入表不存在!!!   ")
+      println("table not found!!!   ")
       sys.exit(-1)
     }
-    (sourceTable, flag, time, kind, project)
+    (namespace, sourceTable, flag, time, kind, project)
   }
 }
 
-case class CompanyBidScore(s: SparkSession, sourceTable: String,
-                           flag: String, time: String, kind: String, project: String
-                          ) extends LoggingUtils {
+case class CompanyBidScore(s: SparkSession, sourceTable: String, tableView: String,
+                           flag: String, time: String, kind: String, project: String, tp: String = "0", namespace: String
+                          ) extends LoggingUtils with Logging {
 
   @(transient@getter) val spark: SparkSession = s
 
   import spark.implicits._
 
-  def calc(): Unit = {
-
-    val ods_company = "new_ods_company"
-    val company_category = "const_company_category_code"
-
-    //    //所属行业
-    //    val code2Name: Broadcast[Map[String, String]] = spark.sparkContext.broadcast(sql(
-    //      s"""
-    //         |select category_code,category_str_big
-    //         |from $company_category
-    //      """.stripMargin).collect().map(r => {
-    //      (r.getString(0), r.getString(1))
-    //    }).toMap)
-    //
-    //    spark.udf.register("industry_name", (code: String) => {
-    //      code2Name.value.getOrElse(code, null)
-    //    })
-    //
-    //    val industry = sql(
-    //      s"""
-    //         |select category_code,cast(cid as string) as ncid,
-    //         |       industry_name(category_code) AS industry_name
-    //         |from $ods_company where cid is not null
-    //         |""".stripMargin)
-    //
-    //    industry.show(100)
-    //
-    //
-    //    industry.createOrReplaceTempView("t1")
-
-    val industry2 = sql(
-      s"""
-         |select a.category_code,cast(a.cid as string) as ncid,
-         |       b.category_str_big AS industry_name
-         |from $ods_company  a
-         |left join const_company_category_code b on a.category_code = b.category_code
-         |where cid is not null
-         |""".stripMargin)
-    industry2.createOrReplaceTempView("t1")
+  def calc() = {
+
+    val ads_company = s"$namespace.ads_company"
+    val company_category = s"$namespace.const_company_category_code"
+    val ads_company_tb = s"$namespace.ads_$sourceTable"
+    val inc_ads_company_tb = s"$namespace.inc_ads_$sourceTable"
 
-    //    注意线上是否分区
-    //     ds = '${BaseUtil.getPartion(sourceTable, spark)}' AND
+    val adsCompanyPar = BaseUtil.getPartion(ads_company, spark)
+//    val adsPar = BaseUtil.getPartion(ads_company_tb, spark)
+
+    var ds = ""
+    var appsql2 = ""
+    var tb = ads_company_tb
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(inc_ads_company_tb, spark)
+    } else {
+      ds = BaseUtil.getPartion(ads_company_tb, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
 
     val df = sql(
       s"""
@@ -111,45 +108,100 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String,
          |FROM    (
          |        SELECT
          |                *
-         |                ,COUNT(ncid) OVER(PARTITION BY ncid ) AS cnt1
-         |                ,row_number() OVER(PARTITION BY ncid ORDER BY $time DESC ) AS num
-         |        FROM    $sourceTable
-         |        WHERE
-         |
-         |             ncid IS NOT NULL
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE new_cid IS NOT NULL
+         |        ${appsql2}
          |        ) a
          |WHERE   num =1
-         |""".stripMargin).createOrReplaceTempView("t2")
-    //      .join(industry, Seq("ncid"), "left")
-    //      .select("cid", "id", "cnt1", "industry_name", "ncid")
+         |""".stripMargin)
 
-    val df2 = sql(
-      """
-        |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.ncid = t1.ncid
-        |""".stripMargin)
-    df2.show(100)
+    df.createOrReplaceTempView("t2")
+
+    if (tp.equals("0")) {
+      sql(
+        s"""
+           |select a.category_code,cast(a.cid as string) as new_cid,
+           |       b.category_str_big AS industry_name
+           |from $ads_company  a
+           |left join $company_category b on a.category_code = b.category_code
+           |where a.cid is not null and a.ds=${adsCompanyPar}
+           |""".stripMargin).createOrReplaceTempView("t1")
+
+      sql(
+        """
+          |select t2.*,t1.industry_name,category_code from t2 left join t1 on t2.new_cid = t1.new_cid
+          |""".stripMargin).map(r => {
+        trans(r, flag, kind, project)
+      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+        "score", "total", "extraScore")
+        .createOrReplaceTempView(s"tmp_view")
 
-    df2.map(r => {
-      trans(r, flag, kind, project)
-    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
-      "score", "total", "extraScore")
-      .createOrReplaceTempView(s"${sourceTable}_tmp_view")
+    } else {
+      df.mapPartitions(iter => {
+        trans2(iter, flag, kind, project)
+      }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+        "score", "total", "extraScore")
+        .createOrReplaceTempView(s"tmp_view")
+    }
 
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table ${ads_company_tb}_score partition(ds=$ds)
+         |select id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+         |from tmp_view
+         |""".stripMargin)
 
-    sql(s"select * from ${sourceTable}_tmp_view").show(10)
-    sql(s"insert overwrite table ${sourceTable}_score  select * from ${sourceTable}_tmp_view")
+    // sync to HBase
+    if ("1".equals(tp)) { // only the incremental run syncs; the full (stock) computation skips HBase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from tmp_view
+           |""".stripMargin)
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
   }
 
+  // full (stock) logic
   def trans(r: Row, flag: String, kind: String, prpject: String) = {
     val id = r.getAs[Long]("id")
-    val cid = r.getAs[Long]("ncid").toString
+    val cid = r.getAs[String]("new_cid")
     val cnt1 = r.getAs[Long]("cnt1")
     val industry_name = r.getAs[String]("industry_name")
     flag match {
-      case "1" => tenderScore(id, cid, cnt1, kind, prpject, industry_name)
+      case "306" => tenderScore(id, cid, cnt1, kind, prpject, industry_name)
     }
   }
 
+  // incremental logic
+  def trans2(iter: Iterator[Row], flag: String, kind: String, prpject: String) = {
+    val restClient = getRestClient()
+    val df = iter.map(r => {
+      try {
+        val id = r.getAs[Long]("id")
+        val cid = r.getAs[String]("new_cid")
+        val cnt1 = r.getAs[Long]("cnt1")
+        val m1 = EsQuery2.queryCompanyForCid(restClient, cid)
+        val industry_name = m1("category_first")
+        flag match {
+          case "306" => tenderScore(id, cid, cnt1, kind, prpject, industry_name)
+        }
+      } catch {
+        case e: Exception => {
+          logWarning(r.toString())
+          logError(e.getMessage, e)
+          null
+        }
+      }
+    })
+    df
+  }
+
  // bidding
   def tenderScore(id: Long, cid: String, cnt1: Long, kind: String, project: String, industry_name: String) = {
     var score = 0f
@@ -196,3 +248,62 @@ case class CompanyBidScore(s: SparkSession, sourceTable: String,
   }
 
 }
+
+object EsQuery2 {
+
+  def main(args: Array[String]): Unit = {
+    val client = EsRestUtils.getRestClient()
+    val map = queryCompanyForCid(client, "23537076")
+    println(map)
+  }
+
+  def queryCompanyForCid(restClient: RestClient, cid: String) = {
+    val query = ""
+    val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+
+    val indexResponse = restClient.performRequest(
+      "GET",
+      s"/winhc-company/company/_search/?q=_id:$cid",
+      Collections.emptyMap[String, String](),
+      entity)
+    val en = indexResponse.getEntity
+    val res = EntityUtils.toString(en)
+    import scala.collection.JavaConverters._
+    val list = getIndexResult2(res)
+    if (list.nonEmpty) {
+      val id = list.head("_id").asInstanceOf[String]
+      val source: util.Map[String, Any] = list.head("_source").asInstanceOf[util.Map[String, Any]]
+      val province_code = source.get("province_code").asInstanceOf[String]
+      val city_code = source.get("city_code").asInstanceOf[String]
+      val county_code = source.get("county_code").asInstanceOf[String]
+      val reg_capital = source.get("reg_capital").asInstanceOf[String]
+      val category_first = source.get("category_first").asInstanceOf[String]
+      val category_second = source.get("category_second").asInstanceOf[String]
+      val category_third = source.get("category_third").asInstanceOf[String]
+      val estiblish_time = source.get("estiblish_time").asInstanceOf[String]
+      val phones = source.get("phones").asInstanceOf[util.List[String]].asScala.mkString(",")
+
+      Map(
+        "id" -> id,
+        "province_code" -> province_code,
+        "city_code" -> city_code,
+        "county_code" -> county_code,
+        "reg_capital" -> reg_capital,
+        "estiblish_time" -> estiblish_time,
+        "phones" -> phones,
+        "category_first" -> category_first,
+        "category_second" -> category_second,
+        "category_third" -> category_third
+      )
+    } else {
+      Map.empty[String, String]
+    }
+  }
+
+  def getIndexResult2(json: String) = {
+    import scala.collection.JavaConverters._
+    JSON.parseObject(json).getJSONObject("hits").getJSONArray("hits").toArray().map(m => m.asInstanceOf[util.Map[String, Any]]).map(_.asScala).toList
+  }
+}
+
+
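
Note: trans2 opens one REST client per partition and reuses it for every row, with failed lookups mapped to null for the caller to filter; the pattern, extracted (mkClient stands in for EsRestUtils.getRestClient, assumed to return an open RestClient). The client is never closed in this diff, which may be worth a follow-up:

import org.apache.spark.sql.Row
import org.elasticsearch.client.RestClient

def enrichPartition[T](rows: Iterator[Row], mkClient: () => RestClient)
                      (lookup: (RestClient, Row) => T): Iterator[T] = {
  val client = mkClient() // built once per partition, not once per row
  rows.map(r => lookup(client, r))
}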

+ 125 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyEmploymentScore.scala

@@ -0,0 +1,125 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.atMonthsBefore
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Recruitment score
+ */
+case class CompanyEmploymentScore(s: SparkSession, sourceTable: String, tableView: String = "",
+                                  flag: String, time: String, kind: String, project: String,
+                                  tp: String = "0", namespace: String = ""
+                                 ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    //val targetTable = "ads_company_total_score"
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // distinguish partitioned-table input from temp-view input
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,SUM(CASE WHEN cast(start_date as string) >= '${atMonthsBefore(12)}' THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid ) AS cnt2
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // only the incremental run syncs; the full (stock) computation skips HBase
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, prpject: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    val cnt2 = r.getAs[Long]("cnt2")
+    flag match {
+      case "302" => employmentScore(id, cid, cnt1,cnt2, kind, prpject)
+    }
+  }
+
+  // recruitment
+  def employmentScore(id: Long, cid: String, cnt1: Long, cnt2: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt2 > 0) {
+      score = 5f
+      ty = "近一年内有招聘信息"
+    } else if (cnt1 > 0) {
+      score = 4f
+      ty = "有招聘信息,但是近1年内无招聘信息"
+    } else {
+      score = 3f
+      ty = "无招聘信息"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
+
+object CompanyEmploymentScore {
+  def main(args: Array[String]): Unit = {
+    var config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEmploymentScore(spark, "company_employment", "", "302", "start_date", "经营情况", "招聘", "0", "winhc_eci_dev").calc()
+    spark.stop()
+  }
+}
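
Note: the tiers of employmentScore, restated as a pure function with the same thresholds and labels (illustrative helper for quick verification):

def employmentTier(cntTotal: Long, cntLastYear: Long): (Float, String) =
  if (cntLastYear > 0) (5f, "近一年内有招聘信息")
  else if (cntTotal > 0) (4f, "有招聘信息,但是近1年内无招聘信息")
  else (3f, "无招聘信息")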

+ 49 - 3
src/main/scala/com/winhc/bigdata/spark/model/CompanyIntellectualsScore.scala

@@ -23,7 +23,9 @@ object CompanyIntellectualsScore {
       "company_patent_list" -> ("3;4", "pub_date", "资产权益", "实用新型、外观设计专利;发明专利"), //专利
       "company_icp" -> ("5", "examine_date", "资产权益", "网站"), //网站
       "company_tm" -> ("6", "app_date", "资产权益", "商标"), //商标
-      "company_land_announcement" -> ("7", "commit_time", "资产权益", "购地信息") //购地信息
+      "company_land_announcement" -> ("7", "commit_time", "资产权益", "购地信息"), //购地信息
+      "company_land_publicity" -> ("8", "publication_start_date", "资产权益", "土地公示"), //地块公示
+      "company_employment" -> ("208", "start_date", "经营情况", "招聘") //招聘
     )
 
   def main(args: Array[String]): Unit = {
@@ -40,7 +42,6 @@ object CompanyIntellectualsScore {
 
    // patents are split into two parts
     if (flag.contains(";")) {
-
       flag.split(";").foreach(f => {
         new CompanyIntellectualsScore(spark, sourceTable, "", f, time, kind, project, "0", namespace + ".").calc()
       })
@@ -77,7 +78,11 @@ object CompanyIntellectualsScore {
       //sys.exit(0)
       return
     }
-
+    // recruitment
+    if (flag.equals("302")) {
+      new CompanyEmploymentScore(spark, sourceTable, tableView, flag, time, kind, project, "1", namespace).calc()
+      return
+    }
    // patents are split into two parts
     if (flag.contains(";")) {
       flag.split(";").foreach(f => {
@@ -187,6 +192,7 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
       case "5" => webSiteScore(id, cid, cnt1, kind, prpject)
       case "6" => tradeMarkScore(id, cid, cnt1, kind, prpject)
       case "7" => immovableScore(id, cid, cnt1, kind, prpject)
+      case "8" => land_publicity(id, cid, cnt1, kind, prpject)
     }
   }
 
@@ -344,4 +350,44 @@ case class CompanyIntellectualsScore(s: SparkSession, sourceTable: String, table
       score, total, extraScore)
   }
 
+  // land parcel publicity
+  def land_publicity(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 15f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 7f
+      ty = "无"
+    } else if (cnt1 <= 2) {
+      score = 12f
+      ty = "≤2"
+    } else {
+      score = 15f
+      ty = ">2"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // recruitment (not wired into trans; incremental recruitment scoring goes through CompanyEmploymentScore)
+  def company_employment(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 3f
+      ty = "无招聘信息"
+    } else if (cnt1 <= 2) {
+      score = 12f
+      ty = "有招聘信息,但是近1年内无招聘信息"
+    } else {
+      score = 15f
+      ty = "近一年内有招聘信息"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
 }
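
Note: the land_publicity tiers, restated for quick verification (same thresholds; illustrative helper name):

def landPublicityTier(cnt: Long): (Float, String) =
  if (cnt == 0) (7f, "无")
  else if (cnt <= 2) (12f, "≤2")
  else (15f, ">2")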

+ 49 - 4
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,13 +1,15 @@
 package com.winhc.bigdata.spark.udf
 
+import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
 import com.winhc.bigdata.spark.utils.BaseUtil
 import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
-
-import scala.annotation.meta.getter
 import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
+import scala.annotation.meta.getter
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/7/10 13:49
@@ -18,7 +20,50 @@ trait BaseFunc {
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
 
-  def cleanup(): Unit ={
+  def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
+    val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
+      s"""
+         |select category_code,
+         |       cate_first,
+         |       cate_second,
+         |       cate_third
+         |from winhc_eci_dev.ods_category_code
+         |where ds = '20200604'
+      """.stripMargin).collect().map(r => {
+      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
+    }).toMap)
+
+    val areaCode2Name = spark.sparkContext.broadcast(spark.sql(
+      s"""
+         |select area_code,province,city,district
+         |from winhc_eci_dev.ods_area_code where ds = '20200604'
+      """.stripMargin).collect().map(r => {
+      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
+    }).toMap)
+
+    spark.udf.register("get_category_first", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 0)
+    })
+    spark.udf.register("get_category_second", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 1)
+    })
+    spark.udf.register("get_category_third", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 2)
+    })
+
+    spark.udf.register("get_province_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 0)
+    })
+    spark.udf.register("get_city_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 1)
+    })
+    spark.udf.register("get_county_name", (code: String) => {
+      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 2)
+    })
+    (categoryCode2Name, areaCode2Name)
+  }
+
+  def cleanup(): Unit = {
     //清理特殊字符
     spark.udf.register("cleanup", (col: String) => {
       BaseUtil.cleanup(col)
@@ -40,7 +85,7 @@ trait BaseFunc {
       if (StringUtils.isNotBlank(value)) {
         "\"" + key + "\":\"" + value + "\""
       } else {
-        "\"" + key + "\":" + value
+        "\"" + key + "\":\"\""
       }
     })
   }
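
Note: the six UDFs registered in code2Name delegate to CompanyIndexSave2EsHelper.get_seq_by_index; its assumed behavior is a positional lookup in the broadcast code-to-names map, roughly:

import org.apache.spark.broadcast.Broadcast

def getSeqByIndex(b: Broadcast[Map[String, Seq[String]]], code: String, i: Int): String =
  b.value.get(code).flatMap(_.lift(i)).orNull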

+ 31 - 5
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.utils
 
 import java.text.SimpleDateFormat
-import java.util.regex.{Matcher, Pattern}
+import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
@@ -83,12 +83,38 @@ object BaseUtil {
     ""
   }
 
-  def replaceChar(s: String): String = {
-    if (StringUtils.isNotBlank(s)) s.replaceAll("、", ",").replaceAll(";", ",").replaceAll(",", ",").replaceAll(" ", ",")
-    else ""
+  private val replace_char = "[^\\u4e00-\\u9fa5a-zA-Z\\(\\)()]+".r
+
+  def replaceChar(s: String): String = {
+    // normalize every separator variant to "," and drop fragments of length <= 1
+    if (StringUtils.isNotBlank(s)) {
+      s.replaceAll("、", ",")
+        .replaceAll(";", ",")
+        .replaceAll(",", ",")
+        .replaceAll(" ", ",")
+        .replaceAll("。", ",")
+        .replaceAll(";", ",")
+        .replaceAll(":", ",")
+        .replaceAll("\\s+", ",")
+        .split(",")
+        .filter(_.length > 1)
+        .mkString(",")
+    } else {
+      ""
+    }
+  }
 
   def main(args: Array[String]): Unit = {
-    println(atMonthsBefore(0,"yyyy-MM-dd HH:mm:ss"))
+    println(replaceChar(",x,"))
+    println(replaceChar("华为信息科技公司,。百度科技公司"))
+    println(replaceChar("2015)深南法蛇民初第883-887受理郑委,曹   连云,庄忠杰,曹元洪,曹硕"))
   }
 }
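
Note: expected behavior of the rewritten replaceChar, derived from the code above (separators normalized to "," and fragments of length <= 1 dropped):

import com.winhc.bigdata.spark.utils.BaseUtil

assert(BaseUtil.replaceChar(",x,") == "") // the single-char fragment "x" is dropped
assert(BaseUtil.replaceChar("华为信息科技公司,。百度科技公司") == "华为信息科技公司,百度科技公司")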

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/ChangeExtractUtils.scala

@@ -35,12 +35,13 @@ object ChangeExtractUtils {
     if (StringUtils.isNotBlank(value)) {
       "\"" + value + "\""
     } else {
-      null
+      "\"\""
     }
   }
 
 
   def main(args: Array[String]): Unit = {
     val name = get_ip_tags("a", null, "b", null)
+    println(name)
   }
 }
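
Note: with this change the quoting rule is: blank values serialize as an empty JSON string rather than a bare null token, keeping the produced tag string valid JSON. Extracted (illustrative helper):

import org.apache.commons.lang3.StringUtils

def quote(value: String): String =
  if (StringUtils.isNotBlank(value)) "\"" + value + "\"" else "\"\""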