Browse Source

Merge remote-tracking branch 'origin/master'

xufei 4 years ago
parent
commit
d8caf0b058

+ 7 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -78,6 +78,8 @@ object CompanyIncCompany2Es {
     ,"HUMAN_PID"
     ,"STATUS"
     ,"CREATE_TIME"
+    ,"UPDATE_TIME"
+    ,"DELETED"
   )
 
   case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
@@ -101,7 +103,7 @@ object CompanyIncCompany2Es {
         sys.exit(-999)
       }
 
-      val companyCols = spark.table("ads_company").columns
+      val companyCols = spark.table(s"${project}.ads_company").columns
         .filter(!_.equals("ds"))
         .seq
 
@@ -175,7 +177,7 @@ object CompanyIncCompany2Es {
         sys.exit(-999)
       }
 
-      val companyCols = spark.table("ads_company_human_relation").columns.filter(_!="rowkey")
+      val companyCols = spark.table(s"${project}.ads_company_human_relation").columns
         .filter(!_.equals("ds"))
         .seq
 
@@ -183,14 +185,14 @@ object CompanyIncCompany2Es {
       // 去除数据本身重复
       val df = sql(
         s"""
-           |SELECT  ${companyCols.mkString(",")}
+           |SELECT ${companyCols.mkString(",")}
            |FROM    (
            |            SELECT  CONCAT_WS("_",new_cid,hid) AS rowkey,a.*
            |                    ,row_number() OVER (PARTITION BY a.cid,a.hid,a.human_pid ORDER BY update_time DESC) c
            |            FROM    (
-           |                        SELECT  *
+           |                        SELECT  *,cid as new_cid
            |                        FROM    $project.inc_ods_company_human_relation
-           |                        WHERE   ds > $start_partition and ds <= $end_partition and cid is not null
+           |                        WHERE   ds > $start_partition and ds <= $end_partition and cid is not null and hid is not null
            |                    ) as a
            |        ) AS tmp
            |WHERE   tmp.c = 1

+ 14 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
@@ -14,17 +15,23 @@ import scala.collection.mutable
  */
 object CompanyIndexSave2Es {
 
-  case class CompanyIndexSave2Es_all_inc(s: SparkSession, project: String) extends LoggingUtils {
+  case class CompanyIndexSave2Es_all_inc(s: SparkSession, project: String) extends LoggingUtils with BaseFunc {
     @(transient@getter) val spark: SparkSession = s
 
+    val tmp_table = "xjk_company_test_0721_step2"
 
     def calc(): Unit = {
       import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
-
       val all_company_max_ds = getLastPartitionsOrElse(s"${project}.ads_company", "0")
+      val code = code2Name()
 
-    /*  println(
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS winhc_eci_dev.$tmp_table
+           |""".stripMargin)
+      sql(
         s"""
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.$tmp_table AS
            |SELECT  ${companyIndexFields.map(f => if (f.eq("estiblish_time")) "date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time" else "tmp." + f).mkString(",")}
            |FROM    (
            |            SELECT  ${companyIndexFields.mkString(",")},update_time
@@ -43,12 +50,12 @@ object CompanyIndexSave2Es {
            |        ) AS tmp
            |WHERE   tmp.num = 1
            |""".stripMargin)
-      */
       sql(
         s"""
-           |select * from winhc_test_dev.xjk_tmp_company_all
+           |SELECT  *
+           |FROM    winhc_eci_dev.$tmp_table
            |""".stripMargin)
-        .companyIndexSave2Es()
+        .companyIndexSave2Es(code._1, code._2)
     }
   }
 
@@ -57,7 +64,7 @@ object CompanyIndexSave2Es {
     val map = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.debug.maxToStringFields" -> "200",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "2"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "3800"
     )
 
     val spark = SparkUtils.InitEnv("CompanyIndexSave2Es", map)

+ 273 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/chance/Inc_eci_debtor_relation.scala

@@ -3,10 +3,9 @@ package com.winhc.bigdata.spark.jobs.chance
 import java.sql.Timestamp
 import java.util.NoSuchElementException
 
-import com.winhc.bigdata.spark.utils.BaseUtil._
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
-import com.winhc.bigdata.spark.utils.BaseUtil.atDaysAfter
+import com.winhc.bigdata.spark.utils.BaseUtil.{atDaysAfter, _}
 import com.winhc.bigdata.spark.utils.{EsRestUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.{StructField, StructType}
@@ -22,6 +21,8 @@ import scala.collection.mutable
  */
 object Inc_eci_debtor_relation {
 
+  private val env = "prod" // "dev" routes the env-dependent tables to project winhc_eci_dev; any other value (here "prod") uses winhc_eci
+
   def parseMap(map: Map[String, AnyVal]): eci_debtor_relation = {
     val id = map("id").asInstanceOf[String]
     val yg_name = map("yg_name").asInstanceOf[String]
@@ -30,21 +31,33 @@ object Inc_eci_debtor_relation {
     val bg_cid = map("bg_cid").asInstanceOf[String]
     val yg_reg_status = map("yg_reg_status").asInstanceOf[String]
     val yg_province_code = map("yg_province_code").asInstanceOf[String]
+    val yg_province_name = map("yg_province_name").asInstanceOf[String]
     val yg_city_code = map("yg_city_code").asInstanceOf[String]
+    val yg_city_name = map("yg_city_name").asInstanceOf[String]
     val yg_county_code = map("yg_county_code").asInstanceOf[String]
+    val yg_county_name = map("yg_county_name").asInstanceOf[String]
     val yg_reg_location = map("yg_reg_location").asInstanceOf[String]
     val yg_estiblish_time = map("yg_estiblish_time").asInstanceOf[String]
     val yg_category_code = map("yg_category_code").asInstanceOf[String]
+    val yg_category_first = map("yg_category_first").asInstanceOf[String]
+    val yg_category_second = map("yg_category_second").asInstanceOf[String]
+    val yg_category_third = map("yg_category_third").asInstanceOf[String]
     val yg_reg_capital = map("yg_reg_capital").asInstanceOf[String]
     val yg_phones = map("yg_phones").asInstanceOf[String]
     val yg_emails = map("yg_emails").asInstanceOf[String]
     val bg_reg_status = map("bg_reg_status").asInstanceOf[String]
     val bg_province_code = map("bg_province_code").asInstanceOf[String]
+    val bg_province_name = map("bg_province_name").asInstanceOf[String]
     val bg_city_code = map("bg_city_code").asInstanceOf[String]
+    val bg_city_name = map("bg_city_name").asInstanceOf[String]
     val bg_county_code = map("bg_county_code").asInstanceOf[String]
+    val bg_county_name = map("bg_county_name").asInstanceOf[String]
     val bg_reg_location = map("bg_reg_location").asInstanceOf[String]
     val bg_estiblish_time = map("bg_estiblish_time").asInstanceOf[String]
     val bg_category_code = map("bg_category_code").asInstanceOf[String]
+    val bg_category_first = map("bg_category_first").asInstanceOf[String]
+    val bg_category_second = map("bg_category_second").asInstanceOf[String]
+    val bg_category_third = map("bg_category_third").asInstanceOf[String]
     val bg_reg_capital = map("bg_reg_capital").asInstanceOf[String]
     val bg_phones = map("bg_phones").asInstanceOf[String]
     val bg_emails = map("bg_emails").asInstanceOf[String]
@@ -59,21 +72,33 @@ object Inc_eci_debtor_relation {
       , bg_cid
       , yg_reg_status
       , yg_province_code
+      , yg_province_name
       , yg_city_code
+      , yg_city_name
       , yg_county_code
+      , yg_county_name
       , yg_reg_location
       , yg_estiblish_time
       , yg_category_code
+      , yg_category_first
+      , yg_category_second
+      , yg_category_third
       , yg_reg_capital
       , yg_phones
       , yg_emails
       , bg_reg_status
       , bg_province_code
+      , bg_province_name
       , bg_city_code
+      , bg_city_name
       , bg_county_code
+      , bg_county_name
       , bg_reg_location
       , bg_estiblish_time
       , bg_category_code
+      , bg_category_first
+      , bg_category_second
+      , bg_category_third
       , bg_reg_capital
       , bg_phones
       , bg_emails
@@ -90,21 +115,33 @@ object Inc_eci_debtor_relation {
                                  , bg_cid: String
                                  , yg_reg_status: String
                                  , yg_province_code: String
+                                 , yg_province_name: String
                                  , yg_city_code: String
+                                 , yg_city_name: String
                                  , yg_county_code: String
+                                 , yg_county_name: String
                                  , yg_reg_location: String
                                  , yg_estiblish_time: String
                                  , yg_category_code: String
+                                 , yg_category_first: String
+                                 , yg_category_second: String
+                                 , yg_category_third: String
                                  , yg_reg_capital: String
                                  , yg_phones: String
                                  , yg_emails: String
                                  , bg_reg_status: String
                                  , bg_province_code: String
+                                 , bg_province_name: String
                                  , bg_city_code: String
+                                 , bg_city_name: String
                                  , bg_county_code: String
+                                 , bg_county_name: String
                                  , bg_reg_location: String
                                  , bg_estiblish_time: String
                                  , bg_category_code: String
+                                 , bg_category_first: String
+                                 , bg_category_second: String
+                                 , bg_category_third: String
                                  , bg_reg_capital: String
                                  , bg_phones: String
                                  , bg_emails: String
@@ -119,21 +156,33 @@ object Inc_eci_debtor_relation {
     private var bg_cid_val = bg_cid
     private var yg_reg_status_val = yg_reg_status
     private var yg_province_code_val = yg_province_code
+    private var yg_province_name_val = yg_province_name
     private var yg_city_code_val = yg_city_code
+    private var yg_city_name_val = yg_city_name
     private var yg_county_code_val = yg_county_code
+    private var yg_county_name_val = yg_county_name
     private var yg_reg_location_val = yg_reg_location
     private var yg_estiblish_time_val = yg_estiblish_time
     private var yg_category_code_val = yg_category_code
+    private var yg_category_first_val = yg_category_first
+    private var yg_category_second_val = yg_category_second
+    private var yg_category_third_val = yg_category_third
     private var yg_reg_capital_val = yg_reg_capital
     private var yg_phones_val = yg_phones
     private var yg_emails_val = yg_emails
     private var bg_reg_status_val = bg_reg_status
     private var bg_province_code_val = bg_province_code
+    private var bg_province_name_val = bg_province_name
     private var bg_city_code_val = bg_city_code
+    private var bg_city_name_val = bg_city_name
     private var bg_county_code_val = bg_county_code
+    private var bg_county_name_val = bg_county_name
     private var bg_reg_location_val = bg_reg_location
     private var bg_estiblish_time_val = bg_estiblish_time
     private var bg_category_code_val = bg_category_code
+    private var bg_category_first_val = bg_category_first
+    private var bg_category_second_val = bg_category_second
+    private var bg_category_third_val = bg_category_third
     private var bg_reg_capital_val = bg_reg_capital
     private var bg_phones_val = bg_phones
     private var bg_emails_val = bg_emails
@@ -155,21 +204,33 @@ object Inc_eci_debtor_relation {
         , bg_cid_val
         , yg_reg_status_val
         , yg_province_code_val
+        , yg_province_name_val
         , yg_city_code_val
+        , yg_city_name_val
         , yg_county_code_val
+        , yg_county_name_val
         , yg_reg_location_val
         , yg_estiblish_time_val
         , yg_category_code_val
+        , yg_category_first_val
+        , yg_category_second_val
+        , yg_category_third_val
         , yg_reg_capital_val
         , yg_phones_val
         , yg_emails_val
         , bg_reg_status_val
         , bg_province_code_val
+        , bg_province_name_val
         , bg_city_code_val
+        , bg_city_name_val
         , bg_county_code_val
+        , bg_county_name_val
         , bg_reg_location_val
         , bg_estiblish_time_val
         , bg_category_code_val
+        , bg_category_first_val
+        , bg_category_second_val
+        , bg_category_third_val
         , bg_reg_capital_val
         , bg_phones_val
         , bg_emails_val
@@ -181,13 +242,166 @@ object Inc_eci_debtor_relation {
 
   }
 
-  val target_ads_creditor_info = "ads_creditor_info"
-  val target_ads_eci_debtor_relation = "ads_eci_debtor_relation"
-  val target_write_debtor_relation = "ads_write_eci_debtor_relation"
+  val target_ads_creditor_info = "ads_creditor_info" // unqualified name of the creditor-info table; qualified with winhc_eci_dev where it is used
+  val target_ads_eci_debtor_relation ="ads_eci_debtor_relation" // unqualified name of the ds-partitioned debtor-relation table in winhc_eci_dev
+  val target_write_debtor_relation = "ads_write_eci_debtor_relation"// unqualified name of the table the job finally inserts into; its project is chosen via env
 
   case class DebtorRelation(s: SparkSession, ds: String) extends LoggingUtils with BaseFunc with Logging {
     @(transient@getter) val spark: SparkSession = s
 
+
+    def prefix(): Unit = { // Creates, if absent, the three tables named by target_ads_creditor_info, target_ads_eci_debtor_relation and target_write_debtor_relation; no arguments, returns Unit
+      sql( // DDL 1: creditor-info table, always in project winhc_eci_dev; no partition or lifecycle clause
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`$target_ads_creditor_info` (
+           |  `id` BIGINT,
+           |  `case_id` BIGINT,
+           |  `case_no` STRING,
+           |  `case_type` STRING,
+           |  `case_reason` STRING,
+           |  `case_stage` STRING,
+           |  `case_amt` DOUBLE,
+           |  `ys_yg` STRING,
+           |  `ys_bg` STRING,
+           |  `judge_date` DATETIME,
+           |  `zhixing_date` STRING,
+           |  `zhixing_result` STRING,
+           |  `curr_stage` STRING,
+           |  `curr_date` STRING,
+           |  `curr_result` STRING,
+           |  `ys_yg_cid` STRING COMMENT '一审原告cid',
+           |  `ys_bg_cid` STRING COMMENT '一审被告cid',
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT
+           |  )
+           |""".stripMargin)
+
+      sql( // DDL 2: debtor-relation table in winhc_eci_dev, partitioned by ds and declared with LIFECYCLE 30
+        s"""
+           |CREATE TABLE IF NOT EXISTS `winhc_eci_dev`.`$target_ads_eci_debtor_relation` (
+           |  `id` STRING,
+           |  `yg_name` STRING,
+           |  `bg_name` STRING,
+           |  `yg_cid` STRING,
+           |  `bg_cid` STRING,
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT,
+           |  `update_time` DATETIME,
+           |  `create_time` DATETIME)
+           |PARTITIONED BY (
+           |  `ds` STRING)
+           |LIFECYCLE 30
+           |""".stripMargin)
+
+      sql( // DDL 3: write table; project is winhc_eci_dev when env equals "dev", otherwise winhc_eci; same columns as DDL 2 but no partition or lifecycle clause
+        s"""
+           |CREATE TABLE IF NOT EXISTS `${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}`.`$target_write_debtor_relation` (
+           |  `id` STRING,
+           |  `yg_name` STRING,
+           |  `bg_name` STRING,
+           |  `yg_cid` STRING,
+           |  `bg_cid` STRING,
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT,
+           |  `update_time` DATETIME,
+           |  `create_time` DATETIME)
+           |""".stripMargin)
+    }
+
+
     def inc(): Unit = {
       val yesterday_ds = atDaysAfter(-1, ds)
       company_split()
@@ -212,27 +426,39 @@ object Inc_eci_debtor_relation {
            |        ,'' as ys_bg_cid
            |        ,'' as yg_reg_status
            |        ,'' as yg_province_code
+           |        ,'' as yg_province_name
            |        ,'' as yg_city_code
+           |        ,'' as yg_city_name
            |        ,'' as yg_county_code
+           |        ,'' as yg_county_name
            |        ,'' as yg_reg_location
            |        ,'' as yg_estiblish_time
            |        ,'' as yg_category_code
+           |        ,'' as yg_category_first
+           |        ,'' as yg_category_second
+           |        ,'' as yg_category_third
            |        ,'' as yg_reg_capital
            |        ,'' as yg_phones
            |        ,'' as yg_emails
            |        ,'' as bg_reg_status
            |        ,'' as bg_province_code
+           |        ,'' as bg_province_name
            |        ,'' as bg_city_code
+           |        ,'' as bg_city_name
            |        ,'' as bg_county_code
+           |        ,'' as bg_county_name
            |        ,'' as bg_reg_location
            |        ,'' as bg_estiblish_time
            |        ,'' as bg_category_code
+           |        ,'' as bg_category_first
+           |        ,'' as bg_category_second
+           |        ,'' as bg_category_third
            |        ,'' as bg_reg_capital
            |        ,'' as bg_phones
            |        ,'' as bg_emails
            |        ,CASE (zhixing_result = 2 OR( zhixing_result IS NULL AND curr_result = '胜')) WHEN TRUE THEN 0 ELSE 1 END AS deleted
            |        ,1 as flag
-           |FROM    winhc_eci.inc_ods_creditor_info
+           |FROM    ${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.inc_ods_creditor_info
            |LATERAL VIEW explode(company_split(ys_bg)) a AS ys_bg_xjk
            |LATERAL VIEW explode(company_split(ys_yg)) b AS ys_yg_xjk
            |WHERE   ds = $ds
@@ -279,21 +505,33 @@ object Inc_eci_debtor_relation {
 
               val yg_reg_status = yg_map("reg_status")
               val yg_province_code = yg_map("province_code")
+              val yg_province_name = yg_map("province_name")
               val yg_city_code = yg_map("city_code")
+              val yg_city_name = yg_map("city_name")
               val yg_county_code = yg_map("county_code")
+              val yg_county_name = yg_map("county_name")
               val yg_reg_location = yg_map("reg_location")
               val yg_estiblish_time = yg_map("estiblish_time")
               val yg_category_code = yg_map("category_code")
+              val yg_category_first = yg_map("category_first")
+              val yg_category_second = yg_map("category_second")
+              val yg_category_third = yg_map("category_third")
               val yg_reg_capital = yg_map("reg_capital")
               val yg_phones = yg_map("phones")
               val yg_emails = yg_map("emails")
               val bg_reg_status = bg_map("reg_status")
               val bg_province_code = bg_map("province_code")
+              val bg_province_name = bg_map("province_name")
               val bg_city_code = bg_map("city_code")
+              val bg_city_name = bg_map("city_name")
               val bg_county_code = bg_map("county_code")
+              val bg_county_name = bg_map("county_name")
               val bg_reg_location = bg_map("reg_location")
               val bg_estiblish_time = bg_map("estiblish_time")
               val bg_category_code = bg_map("category_code")
+              val bg_category_first = bg_map("category_first") // bg_* category values are read from bg_map, like every other bg_* field here (previously copied from yg_map by mistake)
+              val bg_category_second = bg_map("category_second")
+              val bg_category_third = bg_map("category_third")
               val bg_reg_capital = bg_map("reg_capital")
               val bg_phones = bg_map("phones")
               val bg_emails = bg_map("emails")
@@ -319,21 +557,33 @@ object Inc_eci_debtor_relation {
                 , ys_bg_cid
                 , yg_reg_status
                 , yg_province_code
+                , yg_province_name
                 , yg_city_code
+                , yg_city_name
                 , yg_county_code
+                , yg_county_name
                 , yg_reg_location
                 , yg_estiblish_time
                 , yg_category_code
+                , yg_category_first
+                , yg_category_second
+                , yg_category_third
                 , yg_reg_capital
                 , yg_phones
                 , yg_emails
                 , bg_reg_status
                 , bg_province_code
+                , bg_province_name
                 , bg_city_code
+                , bg_city_name
                 , bg_county_code
+                , bg_county_name
                 , bg_reg_location
                 , bg_estiblish_time
                 , bg_category_code
+                , bg_category_first
+                , bg_category_second
+                , bg_category_third
                 , bg_reg_capital
                 , bg_phones
                 , bg_emails
@@ -352,7 +602,7 @@ object Inc_eci_debtor_relation {
       spark.createDataFrame(inc_rdd, schema)
         .createOrReplaceTempView("inc_tmp_creditor_info")
 
-      val cols = getColumns("winhc_eci_dev.ads_creditor_info")
+      val cols = getColumns(s"winhc_eci_dev.$target_ads_creditor_info")
 
       //全量覆盖写出文书债权关系表
       sql(
@@ -385,21 +635,33 @@ object Inc_eci_debtor_relation {
            |        ,ys_bg_cid AS bg_cid
            |        ,yg_reg_status
            |        ,yg_province_code
+           |        ,yg_province_name
            |        ,yg_city_code
+           |        ,yg_city_name
            |        ,yg_county_code
+           |        ,yg_county_name
            |        ,yg_reg_location
            |        ,yg_estiblish_time
            |        ,yg_category_code
+           |        ,yg_category_first
+           |        ,yg_category_second
+           |        ,yg_category_third
            |        ,yg_reg_capital
            |        ,yg_phones
            |        ,yg_emails
            |        ,bg_reg_status
            |        ,bg_province_code
+           |        ,bg_province_name
            |        ,bg_city_code
+           |        ,bg_city_name
            |        ,bg_county_code
+           |        ,bg_county_name
            |        ,bg_reg_location
            |        ,bg_estiblish_time
            |        ,bg_category_code
+           |        ,bg_category_first
+           |        ,bg_category_second
+           |        ,bg_category_third
            |        ,bg_reg_capital
            |        ,bg_phones
            |        ,bg_emails
@@ -416,9 +678,6 @@ object Inc_eci_debtor_relation {
 
       val eci_cols = getColumns(s"winhc_eci_dev.$target_ads_eci_debtor_relation")
 
-      println(eci_cols)
-
-
       val write_schema = StructType(sql(
         s"""
            |select * from winhc_eci_dev.$target_ads_eci_debtor_relation where 1==0 and ds = $ds
@@ -464,7 +723,7 @@ object Inc_eci_debtor_relation {
       write_df
         .write
         .mode(if (isWindows) "append" else "overwrite")
-        .insertInto(s"winhc_eci.$target_write_debtor_relation")
+        .insertInto(s"${if (env.equals("dev")) "winhc_eci_dev" else "winhc_eci"}.$target_write_debtor_relation")
 
     }
   }
@@ -477,7 +736,9 @@ object Inc_eci_debtor_relation {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("eci_debtor_relation", config)
-    DebtorRelation(spark, ds).inc
+    val v = DebtorRelation(spark, ds)
+//    v.prefix()
+    v.inc()
     spark.stop()
   }
 

+ 330 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/creditor_info_add_other.scala

@@ -0,0 +1,330 @@
+package com.winhc.bigdata.spark.jobs.chance
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * Back-fills company attributes onto the full creditor-info table built from judgment documents.
+ *
+ * @Author: XuJiakai
+ * @Date: 2020/7/13 11:14
+ */
+object creditor_info_add_other {
+
+  /**
+   * Three-step job over a single target table: `prefix` rebuilds it, `calc` fills in the
+   * company attributes of one party, `suffix` de-duplicates it.
+   *
+   * @param s       Spark session used to run every statement
+   * @param project project (database) that owns every table read or written here
+   */
+  case class add_cols_other(s: SparkSession, project: String) extends LoggingUtils with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+    /**
+     * Recreates `targetTable` and loads it from partition `ds` of `ods_creditor_info`.
+     *
+     * Each source row is exploded into one row per (plaintiff, defendant) pair produced by
+     * `company_split`; only enterprise-vs-enterprise cases whose exploded names are longer than
+     * four characters are kept. All company attribute columns start out as empty strings.
+     *
+     * @param ds          source partition to load
+     * @param targetTable name of the table to drop, recreate and fill
+     */
+    def prefix(ds: String, targetTable: String): Unit = {
+      // Presumably these register the SQL functions used by this job (they come from BaseFunc,
+      // which is not visible here) -- TODO confirm.
+      company_split()
+      area_code()
+      tyc_split()
+      code2Name()
+
+      sql(
+        s"""
+           |DROP TABLE IF EXISTS $project.$targetTable
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS `$project`.`$targetTable` (
+           |  `id` BIGINT,
+           |  `case_id` BIGINT,
+           |  `case_no` STRING,
+           |  `case_type` STRING,
+           |  `case_reason` STRING,
+           |  `case_stage` STRING,
+           |  `case_amt` DOUBLE,
+           |  `ys_yg` STRING,
+           |  `ys_bg` STRING,
+           |  `judge_date` DATETIME,
+           |  `zhixing_date` STRING,
+           |  `zhixing_result` STRING,
+           |  `curr_stage` STRING,
+           |  `curr_date` STRING,
+           |  `curr_result` STRING,
+           |  `ys_yg_cid` STRING COMMENT '一审原告cid',
+           |  `ys_bg_cid` STRING COMMENT '一审被告cid',
+           |  `yg_reg_status` STRING,
+           |  `yg_province_code` STRING,
+           |  `yg_province_name` STRING,
+           |  `yg_city_code` STRING,
+           |  `yg_city_name` STRING,
+           |  `yg_county_code` STRING,
+           |  `yg_county_name` STRING,
+           |  `yg_reg_location` STRING,
+           |  `yg_estiblish_time` STRING,
+           |  `yg_category_code` STRING,
+           |  `yg_category_first` STRING,
+           |  `yg_category_second` STRING,
+           |  `yg_category_third` STRING,
+           |  `yg_reg_capital` STRING,
+           |  `yg_phones` STRING,
+           |  `yg_emails` STRING,
+           |  `bg_reg_status` STRING,
+           |  `bg_province_code` STRING,
+           |  `bg_province_name` STRING,
+           |  `bg_city_code` STRING,
+           |  `bg_city_name` STRING,
+           |  `bg_county_code` STRING,
+           |  `bg_county_name` STRING,
+           |  `bg_reg_location` STRING,
+           |  `bg_estiblish_time` STRING,
+           |  `bg_category_code` STRING,
+           |  `bg_category_first` STRING,
+           |  `bg_category_second` STRING,
+           |  `bg_category_third` STRING,
+           |  `bg_reg_capital` STRING,
+           |  `bg_phones` STRING,
+           |  `bg_emails` STRING,
+           |  `deleted` BIGINT
+           |  )
+           |""".stripMargin)
+      // `deleted` is 0 when zhixing_result = 2, or when zhixing_result is NULL and
+      // curr_result = '胜'; every other row is flagged 1.
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$targetTable
+           |SELECT  id
+           |        ,case_id
+           |        ,case_no
+           |        ,case_type
+           |        ,case_reason
+           |        ,case_stage
+           |        ,case_amt
+           |        ,ys_yg_xjk AS ys_yg
+           |        ,ys_bg_xjk AS ys_bg
+           |        ,judge_date
+           |        ,zhixing_date
+           |        ,zhixing_result
+           |        ,curr_stage
+           |        ,curr_date
+           |        ,curr_result
+           |        ,'' AS ys_yg_cid
+           |        ,'' AS ys_bg_cid
+           |        ,'' AS yg_reg_status
+           |        ,'' AS yg_province_code
+           |        ,'' AS yg_province_name
+           |        ,'' AS yg_city_code
+           |        ,'' AS yg_city_name
+           |        ,'' AS yg_county_code
+           |        ,'' AS yg_county_name
+           |        ,'' AS yg_reg_location
+           |        ,'' AS yg_estiblish_time
+           |        ,'' AS yg_category_code
+           |        ,'' AS yg_category_first
+           |        ,'' AS yg_category_second
+           |        ,'' AS yg_category_third
+           |        ,'' AS yg_reg_capital
+           |        ,'' AS yg_phones
+           |        ,'' AS yg_emails
+           |        ,'' AS bg_reg_status
+           |        ,'' AS bg_province_code
+           |        ,'' AS bg_province_name
+           |        ,'' AS bg_city_code
+           |        ,'' AS bg_city_name
+           |        ,'' AS bg_county_code
+           |        ,'' AS bg_county_name
+           |        ,'' AS bg_reg_location
+           |        ,'' AS bg_estiblish_time
+           |        ,'' AS bg_category_code
+           |        ,'' AS bg_category_first
+           |        ,'' AS bg_category_second
+           |        ,'' AS bg_category_third
+           |        ,'' AS bg_reg_capital
+           |        ,'' AS bg_phones
+           |        ,'' AS bg_emails
+           |        ,CASE (zhixing_result = 2 OR( zhixing_result IS NULL AND curr_result = '胜')) WHEN TRUE THEN 0 ELSE 1 END AS deleted
+           |FROM    $project.ods_creditor_info
+           |LATERAL VIEW explode(company_split(ys_bg)) a AS ys_bg_xjk
+           |LATERAL VIEW explode(company_split(ys_yg)) b AS ys_yg_xjk
+           |WHERE   ds = '$ds'
+           |AND     yg_type = '企业'
+           |AND     bg_type = '企业'
+           |AND     LENGTH(ys_yg_xjk) > 4
+           |AND     LENGTH(ys_bg_xjk) > 4
+           |""".stripMargin)
+    }
+
+    /**
+     * Rewrites `targetTable` keeping a single row per (id, ys_yg_cid, ys_bg_cid) and dropping
+     * every row in which either cid is NULL or empty.
+     *
+     * @param targetTable table to de-duplicate in place
+     */
+    def suffix(targetTable: String): Unit = {
+      // Select the table's own columns explicitly so the helper row-number column `c` is not written back.
+      val cols = getColumns(s"$project.$targetTable").map("tmp." + _).mkString(",")
+      // ORDER BY id is constant inside a partition that already contains id, so which of the
+      // duplicate rows survives is unspecified.
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$targetTable
+           |SELECT  $cols
+           |FROM    (
+           |            SELECT  a.*
+           |                    ,row_number() OVER (PARTITION BY a.id,a.ys_yg_cid,a.ys_bg_cid ORDER BY id DESC) c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $project.$targetTable
+           |                        WHERE   ys_yg_cid IS NOT NULL
+           |                        AND     ys_bg_cid IS NOT NULL
+           |                        AND     ys_yg_cid <> ''
+           |                        AND     ys_bg_cid <> ''
+           |                    ) AS a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin)
+
+    }
+
+    /**
+     * Fills the company attribute columns of one party by left-joining `table` to the company
+     * tables on the party's name, then writes the result back over `table`.
+     *
+     * The company side is the most recently updated record per cid taken from `ads_company`
+     * (partition `fullDs`) unioned with `inc_ads_company` (partitions after `fullDs`). Rows whose
+     * party name is NULL are dropped by the WHERE clause before the join.
+     *
+     * @param pre    column prefix of the party including the trailing underscore, e.g. "yg_" or "bg_"
+     * @param table  table to enrich in place
+     * @param fullDs partition of `ads_company` used as the full snapshot; incremental partitions
+     *               strictly greater than it are merged on top
+     */
+    def calc(pre: String, table: String, fullDs: String = "20200604"): Unit = {
+      val fields = Seq(
+        "reg_status", "province_code", "city_code", "county_code", "province_name", "city_name", "county_name", "reg_location", "estiblish_time", "category_code",
+        "category_first", "category_second", "category_third", "reg_capital", "phones", "emails"
+      )
+
+      // "yg_" -> "ys_yg": the party-name column is the prefix without its trailing character.
+      val orgName = s"ys_$pre".dropRight(1)
+      val cid = s"ys_${pre}cid"
+      val enriched = fields.map(pre + _).toSet
+
+      // Columns owned by this party are taken from the company side (tt2) under their
+      // un-prefixed name; everything else is carried over unchanged from the table itself (tt1).
+      val selectList = getColumns(s"$project.$table").map { f =>
+        if (f == cid) s"tt2.cid as $f"
+        else if (enriched.contains(f)) s"tt2.${f.stripPrefix(pre)} as $f"
+        else s"tt1.$f"
+      }.mkString(",")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$table
+           |SELECT  $selectList
+           |FROM    (
+           |            SELECT  *
+           |            FROM    $project.$table
+           |            WHERE   $orgName IS NOT NULL
+           |        ) AS tt1
+           |LEFT JOIN (
+           |              SELECT  cast(tmp.cid as string) cid
+           |                      ,tmp.name
+           |                      ,tmp.history_names
+           |                      ,cast(tmp.current_cid as string) current_cid
+           |                      ,cast(tmp.company_type as string) company_type
+           |                      ,get_province_code(tmp.area_code) as province_code
+           |                      ,get_province_name(tmp.area_code) as province_name
+           |                      ,get_city_code(tmp.area_code) as city_code
+           |                      ,get_city_name(tmp.area_code) as city_name
+           |                      ,get_county_code(tmp.area_code) as county_code
+           |                      ,get_county_name(tmp.area_code) as county_name
+           |                      ,tmp.credit_code
+           |                      ,cast(tmp.reg_status as string) as reg_status
+           |                      ,tmp.reg_location
+           |                      ,date_format(tmp.estiblish_time,'yyyy-MM-dd') estiblish_time
+           |                      ,cast(tmp.lat as string) lat
+           |                      ,cast(tmp.lng as String) lng
+           |                      ,cast(tmp.category_code as string) category_code
+           |                      ,get_category_first(cast(tmp.category_code as string)) category_first
+           |                      ,get_category_second(cast(tmp.category_code as string)) category_second
+           |                      ,get_category_third(cast(tmp.category_code as string)) category_third
+           |                      ,tmp.reg_capital
+           |                      ,cast(tmp.reg_capital_amount as string) reg_capital_amount
+           |                      ,CONCAT_WS(',',tyc_split(tmp.phones)) phones
+           |                      ,CONCAT_WS(',',tyc_split(tmp.emails)) emails
+           |              FROM    (
+           |                          SELECT  cid
+           |                                  ,name
+           |                                  ,history_names
+           |                                  ,current_cid
+           |                                  ,company_type
+           |                                  ,credit_code
+           |                                  ,reg_status
+           |                                  ,area_code
+           |                                  ,reg_location
+           |                                  ,estiblish_time
+           |                                  ,lat
+           |                                  ,lng
+           |                                  ,category_code
+           |                                  ,reg_capital
+           |                                  ,reg_capital_amount
+           |                                  ,phones
+           |                                  ,emails
+           |                                  ,update_time
+           |                                  ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC ) AS num
+           |                          FROM    (
+           |                                      SELECT  cid
+           |                                              ,name
+           |                                              ,history_names
+           |                                              ,current_cid
+           |                                              ,company_type
+           |                                              ,credit_code
+           |                                              ,reg_status
+           |                                              ,area_code
+           |                                              ,reg_location
+           |                                              ,estiblish_time
+           |                                              ,lat
+           |                                              ,lng
+           |                                              ,category_code
+           |                                              ,reg_capital
+           |                                              ,reg_capital_amount
+           |                                              ,phones
+           |                                              ,emails
+           |                                              ,update_time
+           |                                      FROM    $project.ads_company
+           |                                      WHERE   ds = $fullDs
+           |                                      UNION ALL
+           |                                      SELECT  cid
+           |                                              ,name
+           |                                              ,history_names
+           |                                              ,current_cid
+           |                                              ,company_type
+           |                                              ,credit_code
+           |                                              ,reg_status
+           |                                              ,area_code
+           |                                              ,reg_location
+           |                                              ,estiblish_time
+           |                                              ,lat
+           |                                              ,lng
+           |                                              ,category_code
+           |                                              ,reg_capital
+           |                                              ,reg_capital_amount
+           |                                              ,phones
+           |                                              ,emails
+           |                                              ,update_time
+           |                                      FROM    $project.inc_ads_company
+           |                                      WHERE   ds > $fullDs
+           |                                  )
+           |                      ) AS tmp
+           |              WHERE   tmp.num = 1
+           |          ) AS tt2
+           |ON      tt1.$orgName = tt2.name
+           |""".stripMargin)
+
+    }
+  }
+
+
+  /**
+   * Job entry point: rebuilds the target table from the given partition, enriches the
+   * plaintiff ("yg_") and defendant ("bg_") columns, then de-duplicates the table.
+   *
+   * @param args exactly one element: the `ods_creditor_info` partition (`ds`) to process
+   */
+  def main(args: Array[String]): Unit = {
+    // Fail with a readable message instead of a MatchError when the argument count is wrong.
+    val ds = args match {
+      case Array(d) => d
+      case _ =>
+        println("usage: creditor_info_add_other <ds>")
+        sys.exit(-99)
+    }
+
+    val project = "winhc_eci_dev"
+    val map = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "3800"
+    )
+    val spark = SparkUtils.InitEnv("add_cols_other", map)
+
+    val table = "xjk_tmp_ads_cre_info_test_v2"
+
+    // Always release the session, even when one of the steps throws.
+    try {
+      val v = add_cols_other(spark, project)
+      v.prefix(ds, table)
+
+      for (pre <- Seq("yg_", "bg_")) {
+        v.calc(pre, table)
+      }
+
+      v.suffix(table)
+    } finally {
+      spark.stop()
+    }
+  }
+}