فهرست منبع

Merge remote-tracking branch 'origin/master'

xufei 4 سال پیش
والد
کامیت
c09bc8d333

+ 0 - 183
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.chance
 
 import com.winhc.bigdata.spark.utils.BaseUtil.cleanup
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.internal.Logging
 
@@ -70,186 +69,4 @@ trait CompanyChangeHandle extends Serializable with Logging {
       null
     }
   }
-}
-
-//土地公示
-case class company_land_publicity(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = {
-    val str = ChangeExtractUtils.getTags(newMap, "地块公示", Array("project_name", "location", "application_name", "area", "final_price", "land_num"))
-    str
-  }
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("publication_start_date")
-
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"${newMap("project_name")}地块公示发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"新增${newMap("project_name")}地块公示")
-}
-
-
-case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
-  override def getCid(rowkey: String, newMap: Map[String, String]): String = rowkey
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = "1"
-
-  override def getBizTime(newMap: Map[String, String]): String = "业务时间"
-
-  override def getUpdateTitle(newMap: Map[String, String]): String = "更新一家公司"
-
-  override def getInsertTitle(newMap: Map[String, String]): String = "新增一家公司"
-}
-
-//商标
-case class company_tm(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("tm_name"), s"${newMap("tm_name")}商标发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("tm_name"), s"新增${newMap("tm_name")}商标")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("商标", newMap("tm_name"), newMap("app_date"), newMap("reg_no"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("app_date")
-}
-
-//专利
-case class company_patent_list(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}专利发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}专利")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("专利", newMap("title"), newMap("app_date"), newMap("app_number"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("app_date")
-}
-
-//资质证书
-case class company_certificate(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cert_no"), s"${newMap("cert_no")}资质证书发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("cert_no"), s"新增${newMap("cert_no")}资质证书")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("资质证书", newMap("cert_no"), newMap("start_date"), newMap("type"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
-}
-
-//作品著作权
-case class company_copyright_works_list(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"${newMap("name")}作品著作权发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"新增${newMap("name")}作品著作权")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("作品著作权", newMap("name"), newMap("reg_time"), newMap("reg_num"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("reg_time")
-}
-
-//软件著作权
-case class company_copyright_reg_list(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("full_name"), s"${newMap("full_name")}软件著作权发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("full_name"), s"新增${newMap("full_name")}软件著作权")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("软件著作权", newMap("full_name"), newMap("reg_time"), newMap("reg_num"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("reg_time")
-}
-
-//网站
-case class company_icp(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"${newMap("domain")}网站备案发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("domain"), s"新增${newMap("domain")}网站备案")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("网站备案", newMap("domain"), newMap("examine_date"), newMap("liscense"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("examine_date")
-}
-
-
-//购地信息
-case class company_land_announcement(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"${newMap("project_name")}购地信息发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("project_name"), s"新增${newMap("project_name")}购地信息")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "购地信息", Array("project_name", "project_loc", "area", "tran_price", "e_number"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("contract_date")
-}
-
-//招聘
-case class company_employment(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}招聘信息发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}招聘信息")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "招聘", Array("title", "city->employment_city", "employ_num", "start_date"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
-}
-
-//招投标
-case class company_bid_list(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}招投标信息发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}招投标信息")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "招投标", Array("publish_time", "title", "purchaser", "province", "abs"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
-}
-
-//土地转让
-case class company_land_transfer(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"${newMap("title")}土地转让信息发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"新增${newMap("location")}土地转让信息")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "土地转让", Array("merchandise_time", "user_pre", "user_now", "location", "area", "merchandise_price", "aministrative_area"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("merchandise_time")
-}
-
-//环保处罚
-case class company_env_punishment(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"${newMap("title")}环保处罚信息发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"新增${newMap("punish_number")}环保处罚信息")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "环保处罚", Array("name", "department", "punish_number", "punish_basis", "law_break", "reason", "content", "deleted"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
-}
-
-case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = "经营异常发生变更"
-
-  override def getInsertTitle(newMap: Map[String, String]): String = "新增一条经营异常"
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "经营异常", Array("put_department", "remove_department", "put_reason", "put_date", "remove_date", "remove_reason"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("put_date")
-}
-
-//欠税
-case class company_own_tax(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"${newMap("name")}欠税公告发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"新增${newMap("name")}欠税公告")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("欠税公告", newMap("name"), newMap("publish_date"), newMap("tax_num"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_date")
-}
-
-//被执行人
-case class company_zxr_list(equCols: Seq[String]) extends CompanyChangeHandle {
-  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"${newMap("name")}被执行人发生变更")
-
-  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"新增${newMap("name")}被执行人")
-
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap,"成为被执行人", Array("case_create_time", "case_no", "exec_money"))
-
-  override def getBizTime(newMap: Map[String, String]): String = newMap("case_create_time")
 }

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -174,6 +174,7 @@ object CompanyDynamic {
            |SELECT ${cols.mkString(",")}
            |FROM
            |    company_dynamic_tmp$tableName
+           |WHERE id IS NOT NULL
            |""".stripMargin)
     }
   }

+ 86 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic2Es.scala

@@ -0,0 +1,86 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/15 14:27
+ * @Description:
+ */
+object CompanyDynamic2Es {
+
+  def main(args: Array[String]): Unit = {
+    val Array(project, ds) = args
+
+    println(
+      s"""
+         |project: $project
+         |ds: $ds
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv("CompanyDynamic2Es", config)
+    CompanyDynamic2Es(s = spark, project = project).save(ds)
+    spark.stop()
+  }
+
+}
+
+case class CompanyDynamic2Es(s: SparkSession,
+                             project: String
+                            ) extends LoggingUtils with Logging {
+  private val env = "prod"
+
+  @(transient@getter) val spark: SparkSession = s
+
+  def save(ds: String): Unit = {
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE ${getEnvProjectName(env, "winhc_eci_dev")}.ads_company_dynamic_2_es   PARTITION(ds,tn)
+         |SELECT  t4.id
+         |        ,t4.cid
+         |        ,t4.cname
+         |        ,t4.info_type
+         |        ,t4.rta_desc
+         |        ,t4.change_content
+         |        ,t4.change_time
+         |        ,t4.biz_id
+         |        ,t4.sub_info_type
+         |        ,t4.info_risk_level
+         |        ,t4.winhc_suggest
+         |        ,t4.create_time
+         |        ,t4.ds
+         |        ,t4.tn
+         |FROM    (
+         |            SELECT  DISTINCT t2.new_cid AS cid
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    ${getEnvProjectName(env, "winhc_eci_dev")}.ods_radar_rta
+         |                        WHERE   ds = '$ds'
+         |                    ) AS t1
+         |            JOIN    (
+         |                        SELECT  *
+         |                        FROM    winhc_eci_dev.base_company_mapping
+         |                        WHERE   ds = '$ds'
+         |                    ) AS t2
+         |            ON      t1.comp_name = t2.cname
+         |        ) AS t3
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    ${getEnvProjectName(env, "winhc_eci_dev")}.ads_company_dynamic
+         |            WHERE   ds = '$ds'
+         |        ) AS t4
+         |ON      t3.cid = t4.cid
+         |""".stripMargin)
+  }
+}

+ 6 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandleUtils.scala

@@ -10,6 +10,11 @@ import com.winhc.bigdata.spark.utils.DateUtils
  */
 object CompanyDynamicHandleUtils {
   def getDynamicId(cid: String, rta_desc: String, biz_id: String, change_time: String): String = {
-    s"$cid-${SecureUtil.md5(rta_desc + biz_id)}-${DateUtils.toUnixTimestamp(date = change_time)}"
+    val timestamp = DateUtils.toUnixTimestamp(date = change_time)
+    // 过滤1970年以前的数据
+    if (timestamp < 0)
+      null
+    else
+      s"${cid}_${9999999999L - timestamp}_${SecureUtil.md5(rta_desc + biz_id)}"
   }
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala

@@ -16,7 +16,7 @@ case class company_annual_report_out_guarantee() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "对外担保"
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增一条对外担保"
 
   /**
    * 变更内容
@@ -25,7 +25,7 @@ case class company_annual_report_out_guarantee() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("credito_term") + new_map("guarantee_way")
+//  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("credito_term") + new_map("guarantee_way")
   /**
    * 风险等级
    *

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -71,4 +71,11 @@ case class company_bid_list() extends CompanyDynamicHandle {
       super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map, cname, "该企业发布或参与招投标行为")
     }
   }
+
+  /**
+   * 信息类型,大类
+   *
+   * @return
+   */
+  override protected def get_info_type(): String = "company_bid"
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_holder.scala

@@ -26,9 +26,9 @@ case class company_holder() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     if (old_map == null) {
-      "insert"
+      "新增股东"
     } else {
-      "update"
+      "股东出资发生变化"
     }
   }
 

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala

@@ -19,8 +19,8 @@ case class company_public_announcement2_list()extends CompanyDynamicHandle {
   override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     s"""票号:${new_map("bill_num")}
        |申请人:${new_map("cname")}
-       |票面金额:${new_map("start_date")}
-       |公告日期:${new_map("end_date")}""".stripMargin
+       |票面金额:${new_map("bill_amt")}元
+       |公告日期:${new_map("publish_date")}""".stripMargin
   }
   /**
    * 变更时间

+ 47 - 82
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala

@@ -20,50 +20,75 @@ case class JudicialCaseRelation(s: SparkSession,
                                 project: String //表所在工程名
                                ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
-  val table_id_map = Map("justicase" -> "case_id")
-  val pat = ".*\\d+.*".r
+  private val table_id_map = Map("justicase" -> "case_id")
+  private val pat = ".*\\d+.*".r
 
   import spark.implicits._
 
-  def all(tableName: String): Unit = {
+  def etl(): Unit = {
+    val ds = "20200913"
+    etl_wenshu(ds)
+    relationByGroup()
+  }
+
+  private def etl_wenshu(ds: String): Unit = {
+    def tableName = "justicase"
+
     val table_id = table_id_map(tableName)
-    val ods_table_name = s"ods_$tableName"
-    val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
-    //    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
-    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name")
+    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name") ++ Seq(table_id,"ds","connect_case_no")
+
+    val ods_end_ds = getLastPartitionsOrElse(s"winhc_eci_dev.ods_$tableName", "0")
+    val tmp_tab = s"all_${tableName}_tmp_$ods_end_ds"
 
     sql(
       s"""
          |SELECT  *
-         |FROM    winhc_eci_dev.$ods_table_name lateral view explode(split(connect_case_no,'\n')) t as single_connect_case_no
-         |WHERE   ds = '$ods_last_ds'
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $table_id ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  ${other_cols.mkString(",")}
+         |                        FROM    winhc_eci_dev.ods_$tableName
+         |                        WHERE   ds = '$ods_end_ds'
+         |                        UNION ALL
+         |                        SELECT  ${other_cols.mkString(",")}
+         |                        FROM    winhc_eci_dev.inc_ods_$tableName
+         |                        WHERE   ds > $ods_end_ds
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(s"all_case_tmp_$tableName")
+      .createOrReplaceTempView(tmp_tab)
+
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ods_last_ds',tn='$tableName')
+         |SELECT  *
+         |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
+         |""".stripMargin)
+      .cache()
+      .createOrReplaceTempView(s"explode_$tmp_tab")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
          |SELECT  $table_id as id
          |        , 1 as main_case_no
          |        ,case_no
          |        ,${getStrToMap(other_cols)} as case_attribute
-         |FROM    all_case_tmp_$tableName
+         |FROM    explode_$tmp_tab
          |UNION ALL
          |SELECT  $table_id as id
          |        , 0 as main_case_no
          |        ,single_connect_case_no as case_no
          |        ,${getStrToMap(other_cols)} as case_attribute
-         |FROM    all_case_tmp_$tableName
+         |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
          |""".stripMargin)
   }
 
 
-  def inc(tableName: String, ds: String): Unit = {
-  }
-
-
-  def relationByGroup(): Unit = {
+  private def relationByGroup(): Unit = {
     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
     spark.udf.register("case_equ", case_equ _)
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
@@ -174,66 +199,7 @@ case class JudicialCaseRelation(s: SparkSession,
 
   }
 
-  /* def relation(): Unit = {
-     spark.udf.register("case_equ", case_equ _)
-     spark.udf.register("str_sort", sort _)
-     spark.udf.register("match_case_no", match_case_no _)
-     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
-     val ignoreCaseNo = JudicialCaseRelation.getTopCaseNo()
-     sql(
-       s"""
-          | SELECT  *
-          | FROM    winhc_eci_dev.dwd_judicial_case
-          | WHERE   ds = '$dwd_last_ds'
-          | AND     case_no IS NOT NULL
-          | AND     case_no <> ''
-          | AND     match_case_no(case_no)
-          | ${
-         ignoreCaseNo.isEmpty match {
-           case true => ""
-
-           case false => s"AND case_no not in (${ignoreCaseNo.map(ss => "\"" + ss + "\"").mkString(",")})"
-
-         }
-       }
-          |""".stripMargin)
-       .cache()
-       .createOrReplaceTempView("dwd_judicial_case_tmp")
-     sql(
-       s"""
-          |--- INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
-          | SELECT  id_1
-          |         ,id_2
-          |         ,case_no_1
-          |         ,case_no_2
-          |         ,tn_1
-          |         ,tn_2
-          |         ,connect_type
-          | FROM    (
-          |             SELECT  *
-          |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
-          |             FROM    (
-          |                        SELECT  t1.id AS id_1
-          |                                ,t2.id AS id_2
-          |                                ,t1.case_no AS case_no_1
-          |                                ,t2.case_no AS case_no_2
-          |                                ,t1.tn AS tn_1
-          |                                ,t2.tn AS tn_2
-          |                                ,1 as connect_type
-          |                                ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
-          |                        FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
-          |                        FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
-          |                        ON      t1.case_no = t2.case_no
-          |                        AND     t1.id <> t2.id
-          |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
-          |                     ) AS t1
-          |         ) AS t2
-          | WHERE   t2.num = 1
-          |""".stripMargin)
-   }*/
-
-
-  def getStrToMap(cols: Seq[String]): String = {
+  private def getStrToMap(cols: Seq[String]): String = {
     val set = cols.toSet
     val str = set.map(e => {
       s"concat_ws('\001','$e',cast($e as string))"
@@ -243,7 +209,7 @@ case class JudicialCaseRelation(s: SparkSession,
 
   private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
 
-  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
@@ -276,9 +242,8 @@ object JudicialCaseRelation {
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
-    //    jcr.all("justicase")
-    //    jcr.relation()
-    jcr.relationByGroup()
+    jcr.etl()
+//    jcr.relationByGroup()
     spark.stop()
   }
 }