
Write out the full company index; incremental build of the enterprise debtor-relation table

许家凯 4 years ago
parent
commit
6b045b8056

+ 172 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/CompanyIndexSave2EsHelper.scala

@@ -0,0 +1,172 @@
+package com.winhc.bigdata.spark.implicits
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.DataFrame
+import org.apache.spark.sql.functions.col
+import org.elasticsearch.spark._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/11 16:25
+ * @Description:
+ */
+object CompanyIndexSave2EsHelper {
+  val companyIndexFields = Seq(
+    "cid"
+    , "name"
+    , "history_names"
+    , "current_cid"
+    , "company_type" // company type
+
+    , "credit_code" // unified social credit code
+    , "reg_status" // registration status
+    , "area_code" // administrative area code
+    , "reg_location" // registered address
+    , "estiblish_time" // establishment date (registration time)
+    , "lat" // latitude
+    , "lng" // longitude
+    , "category_code" // industry category code
+    , "reg_capital" // registered capital (display string)
+    , "reg_capital_amount" // registered capital (numeric)
+    , "phones" // phone numbers
+    , "emails" // email addresses
+  )
+
+  implicit class DataFrameEnhancer(df: DataFrame) {
+    def companyIndexSave2Es(): Unit = {
+      df.select(companyIndexFields.map(column => col(column).cast("string")): _*)
+        .rdd.map(r => {
+        val map = companyIndexFields.map(f => {
+          (f, r.getAs[String](f))
+        }).toMap
+        getEsDoc(map)
+      })
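+        // saveToEsWithMeta on a pair RDD writes the key (cid) as the ES document _id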
+        .saveToEsWithMeta("winhc-company-v5/company")
+    }
+  }
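+
+  // Usage sketch (hypothetical caller; assumes a DataFrame exposing every column
+  // listed in companyIndexFields):
+  //   import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
+  //   spark.table("winhc_eci_dev.ads_company").companyIndexSave2Es()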
+
+  case class Geo(lat: String, lon: String)
+
+  case class CompanyName(show: String, value: String)
+
+  case class CompanyDoc(
+                         cname: CompanyName
+                         , current_id: String
+                         , history_name: Seq[CompanyName]
+                         , company_type: String
+                         , credit_code: String
+                         , reg_status: String
+                         //                         , geo: Geo
+                         , province_code: String
+                         , city_code: String
+                         , county_code: String
+                         , reg_location: String
+                         , estiblish_time: String
+                         , category_code: String
+                         , reg_capital: String
+                         , reg_capital_amount: String
+                         , phones: Seq[String]
+                         , emails: Seq[String]
+                       )
+
+  val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+
+  def getEsDoc(map: Map[String, String]): (String, CompanyDoc) = {
+    val lat = map("lat")
+    val lng = map("lng")
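+    // NOTE: geo is currently unused; the Geo field on CompanyDoc is commented out below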
+    var geo: String = null
+    if (StringUtils.isNotBlank(lat) && StringUtils.isNotBlank(lng)) {
+      geo = lat + "," + lng
+    }
+
+    val c = get_area_code(map("area_code"))
+    val province_code = c._1
+    val city_code = c._2
+    val county_code = c._3
+    val et = map("estiblish_time")
+    val time = if (StringUtils.isNotBlank(et)) et else null
+
+    val doc = CompanyDoc(
+      cname = getCompanyName(map("name"))
+      , current_id = map("current_cid")
+      , history_name = getHistoryName(map("name"), map("history_names"))
+      , company_type = map("company_type")
+      , credit_code = map("credit_code")
+      , reg_status = map("reg_status")
+      //      , geo = Geo(lat = lng, lon = lat)
+      , province_code = province_code
+      , city_code = city_code
+      , county_code = county_code
+      , reg_location = map("reg_location")
+      , estiblish_time = time
+      , category_code = map("category_code")
+      , reg_capital = map("reg_capital")
+      , reg_capital_amount = map("reg_capital_amount")
+      , phones = getSplit(map("phones"))
+      , emails = getSplit(map("emails"))
+    )
+    (map("cid"), doc)
+  }
+
+
+  private def getCompanyName(name: String): CompanyName = {
+    if (StringUtils.isEmpty(name)) null
+    else {
+      val value = pattern.replaceAllIn(name, "")
+      CompanyName(name, value)
+    }
+  }
+
+  private def getHistoryName(cname: String, names: String): Seq[CompanyName] = {
+    if (StringUtils.isEmpty(names)) {
+      null
+    } else {
+      val res = getSplit(names)
+        .filter(!cname.equals(_))
+        .filter(StringUtils.isNotEmpty(_))
+        .map(getCompanyName)
+      if (res.isEmpty) {
+        null
+      } else {
+        res
+      }
+    }
+  }
+
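+  // e.g. "440305" -> ("44", "03", "05"): province / city / county segments of a 6-digit area code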
+  private def get_area_code(code: String): (String, String, String) = {
+    if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
+      val c = code.trim
+      (c.substring(0, 2), c.substring(2, 4), c.substring(4, 6))
+    } else {
+      (null, null, null)
+    }
+  }
+
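+  // multi-value fields are "\t;\t"-joined; e.g. "a\t;\tb\t;\ta" -> deduplicated Seq("a", "b")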
+  private def getSplit(str: String): Seq[String] = {
+    if (StringUtils.isNotBlank(str)) {
+      str.split("\t;\t").filter(StringUtils.isNotBlank).toSet.toList
+    } else {
+      Seq.empty[String]
+    }
+  }
+
+  private val DATE_TIME_FORMAT = "yyyy-MM-dd"
+
+  private def validateDf(str: String): Boolean = try {
+    if (StringUtils.isNotBlank(str)) {
+      // LocalDate, not LocalDateTime: a "yyyy-MM-dd" pattern carries no time
+      // fields, so LocalDateTime.parse would reject every input
+      java.time.LocalDate.parse(str, java.time.format.DateTimeFormatter.ofPattern(DATE_TIME_FORMAT))
+      true
+    } else {
+      false
+    }
+  } catch {
+    case _: java.time.format.DateTimeParseException => false
+  }
+
+  def main(args: Array[String]): Unit = {
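+    // prints false: the trailing " 00:00:00" does not match the "yyyy-MM-dd" pattern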
+    println(validateDf("2010-03-03 00:00:00"))
+  }
+}

+ 50 - 23
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,8 +1,11 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
-import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
 
 /**
  * @Author: XuJiakai
@@ -10,33 +13,57 @@ import com.winhc.bigdata.spark.utils.SparkUtils
  * @Description:
  */
 object CompanyIndexSave2Es {
-  private def getOtherIdName(str: String): scala.collection.Map[String, String] = {
-    if (str == null) {
-      return null
+
+  case class CompanyIndexSave2Es_all_inc(s: SparkSession, project: String) extends LoggingUtils {
+    @(transient@getter) val spark: SparkSession = s
+
+
+    def calc(): Unit = {
+      import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
+
+      val all_company_max_ds = getLastPartitionsOrElse(s"${project}.ads_company", "0")
+
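+      // Kept for reference: the intended full + incremental merge, deduplicated per cid by latest update_time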
+    /*  println(
+        s"""
+           |SELECT  ${companyIndexFields.map(f => if (f.eq("estiblish_time")) "date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time" else "tmp." + f).mkString(",")}
+           |FROM    (
+           |            SELECT  ${companyIndexFields.mkString(",")},update_time
+           |                    ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC ) AS num
+           |            FROM    (
+           |                        SELECT  ${companyIndexFields.mkString(",")}
+           |                                ,update_time
+           |                        FROM    winhc_eci_dev.ads_company
+           |                        WHERE   ds = $all_company_max_ds
+           |                        UNION ALL
+           |                        SELECT  ${companyIndexFields.mkString(",")}
+           |                                ,update_time
+           |                        FROM    winhc_eci_dev.inc_ads_company
+           |                        WHERE   ds > $all_company_max_ds
+           |                    )
+           |        ) AS tmp
+           |WHERE   tmp.num = 1
+           |""".stripMargin)
+      */
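+      // For now the merged result is read from what appears to be a pre-materialized temp table: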
+      sql(
+        s"""
+           |select * from winhc_test_dev.xjk_tmp_company_all
+           |""".stripMargin)
+        .companyIndexSave2Es()
     }
-    str.split("\002").map(s => {
-      val sp = s.split("\001")
-      (sp(0), sp(1))
-    }).toMap
   }
 
   def main(args: Array[String]): Unit = {
-    val map = EsConfig.getEsConfigMap
-
-    val company_name_mapping = "winhc_eci_dev.company_name_mapping_pro_v2"
+    val project = "winhc_eci_dev"
+    val map = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2"
+    )
 
     val spark = SparkUtils.InitEnv("CompanyIndexSave2Es", map)
-    val df = spark.sql(s"select cid,cname,other_id_name,new_cid,company_type from $company_name_mapping")
-    import org.elasticsearch.spark._
-    import spark.implicits._
-    df.map(r => {
-      val cid = r.getLong(0).toString
-      val cname = r.getString(1)
-      val other_id_name = getOtherIdName(r.getString(2))
-      val new_cid = r.getString(3)
-      val company_type = r.getString(4)
-      getEsDoc(cid, cname, other_id_name, new_cid, company_type)
-    }).rdd.saveToEsWithMeta("winhc-company-v2/company")
+
+    CompanyIndexSave2Es_all_inc(spark, project).calc()
+
     spark.stop()
   }
 }

+ 9 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -98,15 +98,15 @@ object ChangeExtract {
 
       // (123_abc, insert, {a->b}, all, "new land publicity record", 1 (1 = general change, 2 = risk change))
       val schema = StructType(Array(
-        StructField("rowkey", StringType),
-        StructField("table_name", StringType),
-        StructField("type", StringType),
-        StructField("data", MapType(StringType, StringType)),
-        StructField("fields", StringType),
-        StructField("title", StringType),
-        StructField("label", StringType),
-        StructField("biz_time", StringType),
-        StructField("update_time", StringType)
+        StructField("rowkey", StringType), //表数据主建
+        StructField("table_name", StringType), //表名
+        StructField("type", StringType), // 变更类型 insert update
+        StructField("data", MapType(StringType, StringType)), //变更后数据
+        StructField("fields", StringType), //如果是更新 则显示更新字段
+        StructField("title", StringType), // 动态数据展示 ps. 新增某土地公示
+        StructField("label", StringType), // 1.一般变更,2.风险变更
+        StructField("biz_time", StringType), //业务时间
+        StructField("update_time", StringType) //处理时间
       ))
 
       val df = spark.createDataFrame(rdd, schema)

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -38,6 +38,7 @@ trait CompanyChangeHandle extends Serializable {
   }
 }
 
+// land publicity change handler
 case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
   override def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, Map[String, String], String, String, String, String) = {
     if (oldMap == null) {

+ 481 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/Inc_eci_debtor_relation.scala

@@ -0,0 +1,481 @@
+package com.winhc.bigdata.spark.jobs.chance
+
+import java.sql.Timestamp
+import java.util.NoSuchElementException
+
+import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{EsRestUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.{StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/13 16:32
+ * @Description:
+ */
+object Inc_eci_debtor_relation {
+
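+  // Naming note (inferred from the Chinese field comments elsewhere in this
+  // commit): yg = 原告 (plaintiff), bg = 被告 (defendant); ys_* appears to mean
+  // 原审 (original suit).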
+  def parseMap(map: Map[String, Any]): eci_debtor_relation = {
+    val id = map("id").asInstanceOf[String]
+    val yg_name = map("yg_name").asInstanceOf[String]
+    val bg_name = map("bg_name").asInstanceOf[String]
+    val yg_cid = map("yg_cid").asInstanceOf[String]
+    val bg_cid = map("bg_cid").asInstanceOf[String]
+    val yg_reg_status = map("yg_reg_status").asInstanceOf[String]
+    val yg_province_code = map("yg_province_code").asInstanceOf[String]
+    val yg_city_code = map("yg_city_code").asInstanceOf[String]
+    val yg_county_code = map("yg_county_code").asInstanceOf[String]
+    val yg_reg_location = map("yg_reg_location").asInstanceOf[String]
+    val yg_estiblish_time = map("yg_estiblish_time").asInstanceOf[String]
+    val yg_category_code = map("yg_category_code").asInstanceOf[String]
+    val yg_reg_capital = map("yg_reg_capital").asInstanceOf[String]
+    val yg_phones = map("yg_phones").asInstanceOf[String]
+    val yg_emails = map("yg_emails").asInstanceOf[String]
+    val bg_reg_status = map("bg_reg_status").asInstanceOf[String]
+    val bg_province_code = map("bg_province_code").asInstanceOf[String]
+    val bg_city_code = map("bg_city_code").asInstanceOf[String]
+    val bg_county_code = map("bg_county_code").asInstanceOf[String]
+    val bg_reg_location = map("bg_reg_location").asInstanceOf[String]
+    val bg_estiblish_time = map("bg_estiblish_time").asInstanceOf[String]
+    val bg_category_code = map("bg_category_code").asInstanceOf[String]
+    val bg_reg_capital = map("bg_reg_capital").asInstanceOf[String]
+    val bg_phones = map("bg_phones").asInstanceOf[String]
+    val bg_emails = map("bg_emails").asInstanceOf[String]
+    val deleted = map("deleted").asInstanceOf[Long]
+    val update_time = map("update_time").asInstanceOf[Timestamp]
+    val create_time = map("create_time").asInstanceOf[Timestamp]
+    val ds = map("ds").asInstanceOf[String]
+    val e = eci_debtor_relation(id
+      , yg_name
+      , bg_name
+      , yg_cid
+      , bg_cid
+      , yg_reg_status
+      , yg_province_code
+      , yg_city_code
+      , yg_county_code
+      , yg_reg_location
+      , yg_estiblish_time
+      , yg_category_code
+      , yg_reg_capital
+      , yg_phones
+      , yg_emails
+      , bg_reg_status
+      , bg_province_code
+      , bg_city_code
+      , bg_county_code
+      , bg_reg_location
+      , bg_estiblish_time
+      , bg_category_code
+      , bg_reg_capital
+      , bg_phones
+      , bg_emails
+      , deleted
+      , update_time
+      , create_time, ds)
+    e
+  }
+
+  case class eci_debtor_relation(id: String
+                                 , yg_name: String
+                                 , bg_name: String
+                                 , yg_cid: String
+                                 , bg_cid: String
+                                 , yg_reg_status: String
+                                 , yg_province_code: String
+                                 , yg_city_code: String
+                                 , yg_county_code: String
+                                 , yg_reg_location: String
+                                 , yg_estiblish_time: String
+                                 , yg_category_code: String
+                                 , yg_reg_capital: String
+                                 , yg_phones: String
+                                 , yg_emails: String
+                                 , bg_reg_status: String
+                                 , bg_province_code: String
+                                 , bg_city_code: String
+                                 , bg_county_code: String
+                                 , bg_reg_location: String
+                                 , bg_estiblish_time: String
+                                 , bg_category_code: String
+                                 , bg_reg_capital: String
+                                 , bg_phones: String
+                                 , bg_emails: String
+                                 , deleted: Long
+                                 , update_time: Timestamp
+                                 , create_time: Timestamp
+                                 , ds: String) extends Serializable {
+    // Only create_time is ever overwritten (setCreateTime carries yesterday's
+    // create_time forward), so a single mutable field replaces the per-column
+    // *_val copies.
+    private var create_time_val = create_time
+
+    def setCreateTime(createTime: Timestamp): Unit = {
+      create_time_val = createTime
+    }
+
+    // ds is intentionally omitted: the write schema drops the partition column
+    def toRow(): Row = {
+      Row(
+        id
+        , yg_name
+        , bg_name
+        , yg_cid
+        , bg_cid
+        , yg_reg_status
+        , yg_province_code
+        , yg_city_code
+        , yg_county_code
+        , yg_reg_location
+        , yg_estiblish_time
+        , yg_category_code
+        , yg_reg_capital
+        , yg_phones
+        , yg_emails
+        , bg_reg_status
+        , bg_province_code
+        , bg_city_code
+        , bg_county_code
+        , bg_reg_location
+        , bg_estiblish_time
+        , bg_category_code
+        , bg_reg_capital
+        , bg_phones
+        , bg_emails
+        , deleted
+        , update_time
+        , create_time_val
+      )
+    }
+
+  }
+
+  val target_ads_creditor_info = "xjk_ads_creditor_info_test"
+  val target_ads_eci_debtor_relation = "xjk_ads_eci_debtor_relation_test"
+  val target_write_debtor_relation = "xjk_write_debtor_relation_test"
+
+  case class DebtorRelation(s: SparkSession, ds: String) extends LoggingUtils with BaseFunc with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    def inc(): Unit = {
+      val yesterday_ds = atDaysAfter(-1, ds)
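+      // register the company_split UDF used by the LATERAL VIEW explode below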
+      company_split()
+      val sql1 =
+        s"""
+           |SELECT  id
+           |        ,case_id
+           |        ,case_no
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_stage
+           |        ,case_amt
+           |        ,ys_yg_xjk as ys_yg
+           |        ,ys_bg_xjk as ys_bg
+           |        ,judge_date
+           |        ,zhixing_date
+           |        ,zhixing_result
+           |        ,curr_stage
+           |        ,curr_date
+           |        ,curr_result
+           |        ,'' as ys_yg_cid
+           |        ,'' as ys_bg_cid
+           |        ,'' as yg_reg_status
+           |        ,'' as yg_province_code
+           |        ,'' as yg_city_code
+           |        ,'' as yg_county_code
+           |        ,'' as yg_reg_location
+           |        ,'' as yg_estiblish_time
+           |        ,'' as yg_category_code
+           |        ,'' as yg_reg_capital
+           |        ,'' as yg_phones
+           |        ,'' as yg_emails
+           |        ,'' as bg_reg_status
+           |        ,'' as bg_province_code
+           |        ,'' as bg_city_code
+           |        ,'' as bg_county_code
+           |        ,'' as bg_reg_location
+           |        ,'' as bg_estiblish_time
+           |        ,'' as bg_category_code
+           |        ,'' as bg_reg_capital
+           |        ,'' as bg_phones
+           |        ,'' as bg_emails
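+           |        -- deleted = 0 when zhixing_result = 2, or when there is no enforcement result and curr_result = '胜' (won)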
+           |        ,CASE (zhixing_result = 2 OR( zhixing_result IS NULL AND curr_result = '胜')) WHEN TRUE THEN 0 ELSE 1 END AS deleted
+           |        ,1 as flag
+           |FROM    winhc_eci_dev.inc_ods_creditor_info
+           |LATERAL VIEW explode(company_split(ys_bg)) a AS ys_bg_xjk
+           |LATERAL VIEW explode(company_split(ys_yg)) b AS ys_yg_xjk
+           |WHERE   ds = $ds
+           |AND     yg_type = '企业'
+           |AND     bg_type = '企业'
+           |AND     LENGTH(ys_yg_xjk) > 4
+           |AND     LENGTH(ys_bg_xjk) > 4
+           |""".stripMargin
+
+      val df = sql(sql1)
+
+      // Flatten plaintiff/defendant names from the incremental judgment documents and enrich each pair via ES lookup
+      val schema = StructType(df.schema.map(s => {
+        StructField(s.name, s.dataType, nullable = true)
+      }))
+      val inc_rdd = df
+        .rdd
+        .mapPartitions(rp => {
+          val restClient = EsRestUtils.getRestClient()
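+          // one REST client per partition; note it is never closed explicitly in this job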
+
+          val rd = rp.map(r => {
+            try {
+              val id = r.getAs[Long]("id")
+              val case_id = r.getAs[Long]("case_id")
+              val case_no = r.getAs[String]("case_no")
+              val case_type = r.getAs[String]("case_type")
+              val case_reason = r.getAs[String]("case_reason")
+              val case_stage = r.getAs[String]("case_stage")
+              val case_amt = r.getAs[Double]("case_amt")
+              val ys_yg = r.getAs[String]("ys_yg")
+              val ys_bg = r.getAs[String]("ys_bg")
+              val judge_date = r.getAs[Timestamp]("judge_date")
+              val zhixing_date = r.getAs[String]("zhixing_date")
+              val zhixing_result = r.getAs[String]("zhixing_result")
+              val curr_stage = r.getAs[String]("curr_stage")
+              val curr_date = r.getAs[String]("curr_date")
+              val curr_result = r.getAs[String]("curr_result")
+
+              val yg_map = EsRestUtils.getCidByCompanyName(restClient, ys_yg)
+              val bg_map = EsRestUtils.getCidByCompanyName(restClient, ys_bg)
+
+              val ys_yg_cid = yg_map("cid")
+              val ys_bg_cid = bg_map("cid")
+
+              val yg_reg_status = yg_map("reg_status")
+              val yg_province_code = yg_map("province_code")
+              val yg_city_code = yg_map("city_code")
+              val yg_county_code = yg_map("county_code")
+              val yg_reg_location = yg_map("reg_location")
+              val yg_estiblish_time = yg_map("estiblish_time")
+              val yg_category_code = yg_map("category_code")
+              val yg_reg_capital = yg_map("reg_capital")
+              val yg_phones = yg_map("phones")
+              val yg_emails = yg_map("emails")
+              val bg_reg_status = bg_map("reg_status")
+              val bg_province_code = bg_map("province_code")
+              val bg_city_code = bg_map("city_code")
+              val bg_county_code = bg_map("county_code")
+              val bg_reg_location = bg_map("reg_location")
+              val bg_estiblish_time = bg_map("estiblish_time")
+              val bg_category_code = bg_map("category_code")
+              val bg_reg_capital = bg_map("reg_capital")
+              val bg_phones = bg_map("phones")
+              val bg_emails = bg_map("emails")
+
+              val deleted = r.getAs[Integer]("deleted")
+              val flag = r.getAs[Integer]("flag")
+              Row(id
+                , case_id
+                , case_no
+                , case_type
+                , case_reason
+                , case_stage
+                , case_amt
+                , ys_yg
+                , ys_bg
+                , judge_date
+                , zhixing_date
+                , zhixing_result
+                , curr_stage
+                , curr_date
+                , curr_result
+                , ys_yg_cid
+                , ys_bg_cid
+                , yg_reg_status
+                , yg_province_code
+                , yg_city_code
+                , yg_county_code
+                , yg_reg_location
+                , yg_estiblish_time
+                , yg_category_code
+                , yg_reg_capital
+                , yg_phones
+                , yg_emails
+                , bg_reg_status
+                , bg_province_code
+                , bg_city_code
+                , bg_county_code
+                , bg_reg_location
+                , bg_estiblish_time
+                , bg_category_code
+                , bg_reg_capital
+                , bg_phones
+                , bg_emails
+                , deleted
+                , flag)
+            } catch {
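+              // company name not found in ES: drop the row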
+              case e: NoSuchElementException => null
+              case e: Exception => logError(e.getMessage, e)
+                null
+            }
+          }).filter(_ != null)
+          rd
+        })
+        .filter(_ != null)
+
+      spark.createDataFrame(inc_rdd, schema)
+        .createOrReplaceTempView("inc_tmp_creditor_info")
+
+      val cols = getColumns("winhc_eci_dev.ads_creditor_info")
+
+      // Overwrite the full judgment creditor-info table, preferring incremental rows (flag = 1) per (id, ys_yg_cid, ys_bg_cid)
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$target_ads_creditor_info
+           |SELECT  ${cols.map("tmp." + _).mkString(",")}
+           |FROM    (
+           |            SELECT  *
+           |                    ,row_number() OVER (PARTITION BY a.id,a.ys_yg_cid,a.ys_bg_cid ORDER BY flag DESC) c
+           |            FROM    (
+           |                        SELECT  ${cols.mkString(",")},0 as flag
+           |                        FROM    winhc_eci_dev.ads_creditor_info
+           |                        UNION ALL
+           |                        SELECT  ${cols.mkString(",")},flag
+           |                        FROM    inc_tmp_creditor_info
+           |                    ) AS a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin)
+
+
+      // Rebuild the full company debtor-relation snapshot into today's partition, one row per (yg_cid, bg_cid), preferring deleted = 0
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.$target_ads_eci_debtor_relation PARTITION(ds='$ds')
+           |SELECT  CONCAT_WS('_',ys_yg_cid,ys_bg_cid) AS id
+           |        ,ys_yg AS yg_name
+           |        ,ys_bg AS bg_name
+           |        ,ys_yg_cid AS yg_cid
+           |        ,ys_bg_cid AS bg_cid
+           |        ,yg_reg_status
+           |        ,yg_province_code
+           |        ,yg_city_code
+           |        ,yg_county_code
+           |        ,yg_reg_location
+           |        ,yg_estiblish_time
+           |        ,yg_category_code
+           |        ,yg_reg_capital
+           |        ,yg_phones
+           |        ,yg_emails
+           |        ,bg_reg_status
+           |        ,bg_province_code
+           |        ,bg_city_code
+           |        ,bg_county_code
+           |        ,bg_reg_location
+           |        ,bg_estiblish_time
+           |        ,bg_category_code
+           |        ,bg_reg_capital
+           |        ,bg_phones
+           |        ,bg_emails
+           |        ,deleted
+           |        ,now() AS update_time
+           |        ,now() AS create_time
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY ys_yg_cid,ys_bg_cid ORDER BY deleted) AS num
+           |            FROM    winhc_eci_dev.$target_ads_creditor_info
+           |        ) AS t
+           |WHERE   t.num = 1
+           |""".stripMargin)
+
+      val eci_cols = getColumns(s"winhc_eci_dev.$target_ads_eci_debtor_relation")
+
+      println(eci_cols)
+
+
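+      // the 1==0 predicate returns no rows; the query exists only to capture the table schema, minus the ds partition column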
+      val write_schema = StructType(sql(
+        s"""
+           |select * from winhc_eci_dev.$target_ads_eci_debtor_relation where 1==0 and ds = $ds
+           |""".stripMargin).schema.filter(!_.name.equals("ds")))
+
+      // Write out the delta: keep a row only if it is new or its deleted flag differs from yesterday's partition; changed rows keep their original create_time
+      val write_rdd = sql(
+        s"""
+           |select *
+           |from winhc_eci_dev.$target_ads_eci_debtor_relation
+           |where ds=$ds or ds=${yesterday_ds}
+           |""".stripMargin)
+        .rdd
+        .map(r => {
+          val id = r.getAs[String]("id")
+          val map = eci_cols.map(f => (f, r.getAs[Any](f))).toMap
+          val c = parseMap(map)
+          (id, c)
+        }).groupByKey()
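+        // group rows for the same relation id across today's and yesterday's partitions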
+        .map(x => {
+          val id = x._1
+          val list = x._2
+          if (list.size == 1) {
+            list.head.toRow
+          } else {
+            if (list.size > 2) {
+              logger.warn("list.size >2 ,id:" + id)
+            }
+            val all_map = list.map(e => (e.ds, e)).toMap
+            val today = all_map(ds)
+            val yesterday = all_map(yesterday_ds)
+            if (today.deleted == yesterday.deleted) {
+              null
+            } else {
+              today.setCreateTime(yesterday.create_time)
+              today.toRow
+            }
+          }
+        }).filter(_ != null)
+
+      val write_df = spark.createDataFrame(write_rdd, write_schema)
+
+      write_df.write.mode(if (isWindows) "append" else "overwrite").insertInto(s"winhc_eci_dev.$target_write_debtor_relation")
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    val Array(ds) = args
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("eci_debtor_relation", config)
+    DebtorRelation(spark, ds).inc()
+    spark.stop()
+  }
+
+}

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.udf
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/10 13:49
+ * @Description:
+ */
+trait BaseFunc {
+  @(transient@getter) protected val spark: SparkSession
+  private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
+
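+  // Splits "\t;\t"-joined multi-value fields into an array; returns null for empty input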
+  def tyc_split(): Unit = {
+    spark.udf.register("tyc_split", (name: String) => {
+      if (StringUtils.isEmpty(name)) {
+        null
+      } else {
+        name.split("\t;\t")
+      }
+    })
+  }
+
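+  // Splits a free-text party string into candidate company names on any run of
+  // characters outside CJK, latin letters, space, parentheses and '.',
+  // e.g. "甲公司、乙公司" -> Array("甲公司", "乙公司")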
+  def company_split(): Unit = {
+    spark.udf.register("company_split", (name: String) => {
+      if (StringUtils.isEmpty(name)) {
+        Array("")
+      } else {
+        pattern.split(name)
+      }
+    })
+  }
+
+
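+  // Slice a 6-digit administrative area code into province / city / county parts,
+  // e.g. get_province_code('440305') -> '44', get_city_code('440305') -> '03',
+  // get_county_code('440305') -> '05'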
+  def area_code(): Unit = {
+    spark.udf.register("get_province_code", (name: String) => {
+      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
+        name.trim.substring(0, 2)
+      } else {
+        null
+      }
+    })
+    spark.udf.register("get_city_code", (name: String) => {
+      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
+        name.trim.substring(2, 4)
+      } else {
+        null
+      }
+    })
+    spark.udf.register("get_county_code", (name: String) => {
+      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
+        name.trim.substring(4, 6)
+      } else {
+        null
+      }
+    })
+  }
+}

+ 20 - 8
src/main/scala/com/winhc/bigdata/spark/utils/EsRestUtils.scala

@@ -43,12 +43,12 @@ object EsRestUtils {
     restClient
   }
 
-  def getCidByCompanyName(restClient: RestClient, companyName: String): String = {
+  def getCidByCompanyName(restClient: RestClient, companyName: String): Map[String, String] = {
     val query =
       s"""
          |{
          |  "_source": {
-         |     "includes": [ "_id" ]
+         |     "includes": [ "_id","reg_status","province_code","city_code","county_code","reg_location","estiblish_time","category_code","reg_capital","phones","emails" ]
          |   },
          |  "query": {
          |    "term": {
@@ -58,8 +58,6 @@ object EsRestUtils {
          |}
          |""".stripMargin
     val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
-
-
     val indexResponse = restClient.performRequest(
       "GET",
       "/winhc-company/company/_search",
@@ -71,16 +69,30 @@ object EsRestUtils {
     val res = EntityUtils.toString(en)
     val list = getIndexResult(res)
     if (list.nonEmpty) {
-      list.head("_id").asInstanceOf[String]
+      val v = list.head
+      // _source mixes strings and arrays, so type it as Map[String, Any] instead
+      // of an unsound Map[String, String] cast
+      val source = v("_source").asInstanceOf[Map[String, Any]]
+      def str(k: String): String = source.getOrElse(k, null).asInstanceOf[String]
+      def seq(k: String): String = source.getOrElse(k, Seq.empty[String]).asInstanceOf[Seq[String]].mkString(",")
+      Map(
+        "cid" -> v("_id").asInstanceOf[String]
+        , "reg_status" -> str("reg_status")
+        , "province_code" -> str("province_code")
+        , "city_code" -> str("city_code")
+        , "county_code" -> str("county_code")
+        , "reg_location" -> str("reg_location")
+        , "estiblish_time" -> str("estiblish_time")
+        , "category_code" -> str("category_code")
+        , "reg_capital" -> str("reg_capital")
+        , "phones" -> seq("phones")
+        , "emails" -> seq("emails")
+      )
     } else {
-      ""
+      Map.empty[String, String]
     }
   }
 
-
   def main(args: Array[String]): Unit = {
     val restClient = getRestClient()
-    val id = getCidByCompanyName(restClient, "珠海格力电器股份有限公司")
+    val id = getCidByCompanyName(restClient, "长春市大轮食品商贸有限公司")
     println(id)
     restClient.close()
   }