Ver código fonte

Merge remote-tracking branch 'origin/master'

许家凯 4 anos atrás
pai
commit
10f949b96d

+ 63 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCidChange.scala

@@ -0,0 +1,63 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+
+/**
+ * Command-line entry point of the company cid-change job.
+ *
+ * Initialises a Spark session whose `spark.hadoop.odps.project.name` is the
+ * same project that is handed to the job, runs the job's `calc` once and
+ * then stops the session.
+ */
+object CompanyCidChange {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+
+    // Reuse `project` instead of repeating the literal so the session and the
+    // job can never point at different projects.
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      CompanyCidChange(spark, project).calc
+    } finally {
+      // Stop the session even when the job fails.
+      spark.stop()
+    }
+  }
+
+}
+
+/**
+ * Job that fills one partition of `ads_company_cid_change` from
+ * `<project>.inc_ods_company`.
+ *
+ * @param s       Spark session used to execute the SQL
+ * @param project name of the project that holds the source table
+ */
+case class CompanyCidChange(s: SparkSession,
+                            project: String // name of the project that holds the tables
+                           ) extends LoggingUtils with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  /**
+   * Runs the insert statement and returns whatever `sql` returns.
+   *
+   * The partition value comes from `BaseUtil.getPartion` on the source table
+   * (presumably its latest partition -- confirm against BaseUtil). Within that
+   * partition, rows with a non-null `cid` and `current_cid` are numbered per
+   * (cid, current_cid) pair in ascending `update_time` order and only the
+   * first row of each pair is written. `new_company_name` is always NULL,
+   * `STATUS` is always 0 and `DATE` is the first 10 characters of
+   * `update_time`.
+   */
+  def calc = {
+    val sourceTable = s"$project.inc_ods_company"
+
+    val ds = BaseUtil.getPartion(sourceTable, spark)
+
+    // `env` is fixed to "prod", so the target project below is always winhc_eci.
+    val env = "prod"
+    val writeMode = if (isWindows) "INTO" else "OVERWRITE"
+    val targetProject = if (env == "dev") "winhc_eci_dev" else "winhc_eci"
+
+    val statement =
+      s"""
+         |INSERT $writeMode TABLE $targetProject.ads_company_cid_change PARTITION(ds='$ds')
+         |SELECT  old_cid
+         |        ,old_company_name
+         |        ,new_cid
+         |        ,new_company_name
+         |        ,STATUS
+         |        ,DATE
+         |FROM    (
+         |            SELECT  cid AS old_cid
+         |                    ,name AS old_company_name
+         |                    ,current_cid AS new_cid
+         |                    ,NULL AS new_company_name
+         |                    ,0 AS STATUS
+         |                    ,SUBSTR(update_time,1,10) DATE
+         |                    ,ROW_NUMBER() OVER(PARTITION BY cid,current_cid ORDER BY update_time) num
+         |            FROM    $sourceTable
+         |            WHERE   ds = '$ds'
+         |            AND     cid IS NOT NULL
+         |            AND     current_cid IS NOT NULL
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin
+
+    sql(statement)
+  }
+}

+ 237 - 189
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -2,7 +2,8 @@ package com.winhc.bigdata.spark.jobs.judicial
 
 import com.winhc.bigdata.spark.udf.{BaseFunc, CaseReasonAggs, CompanyMapping, CourtRank, NameAggs}
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.EsQueryUtils.updateAliases
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsQueryUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -44,6 +45,8 @@ object JudicialCaseRelationPreNew {
     c match {
       case "calc" => r.calc()
       case "precalc" => r.precalc()
+      case "updateIndex" => r.updateIndex()
+      case "createIndex" => r.createIndex()
     }
 
     spark.stop()
@@ -322,186 +325,186 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
     val t5_ds = BaseUtil.getPartion(t5, spark) //映射表分区
 
     //替换司法案件id
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t3 partition (ds = '$t1_ds')
-         |SELECT
-         |   judicase_id
-         |   ,flag
-         |   ,title
-         |   ,case_type
-         |   ,case_reason
-         |   ,case_no
-         |   ,court_name
-         |   ,case_stage
-         |   ,lable
-         |   ,map_2_json(${getStrToMap(cols)}) as detail
-         |   ,yg_name
-         |   ,bg_name
-         |   ,date
-         |   ,detail_id
-         |   ,case_amt
-         |FROM
-         |(
-         |   SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
-         |           ,a.flag
-         |           ,a.title
-         |           ,a.case_type
-         |           ,a.case_reason
-         |           ,case_no_trim(a.case_no) as case_no
-         |           ,a.court_name
-         |           ,a.case_stage
-         |           ,case_label(a.flag) lable
-         |           ,a.yg_name
-         |           ,a.bg_name
-         |           ,a.date
-         |           ,a.detail_id
-         |           ,a.case_amt
-         |           ,a.bg_name as name
-         |   FROM    (
-         |     SELECT
-         |        judicase_id
-         |        ,flag
-         |        ,title
-         |        ,case_type
-         |        ,case_reason
-         |        ,case_no_trim(case_no) as case_no
-         |        ,court_name
-         |        ,case_stage
-         |        ,replace_char(yg_name) as yg_name
-         |        ,replace_char(bg_name) as bg_name
-         |        ,date
-         |        ,detail_id
-         |        ,case_amt
-         |        ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
-         |     FROM $project.$t2
-         |     WHERE ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |           and case_no_trim(case_no) is not null
-         |           and date is not null and length(date) = 19
-         |   ) a
-         |   LEFT JOIN (
-         |     SELECT case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
-         |     FROM $project.$t2
-         |     WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |     and case_no_trim(case_no) is not null
-         |     GROUP BY case_no
-         |   ) b
-         |   ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
-         |   UNION ALL
-         |   SELECT   judicase_id
-         |           ,flag
-         |           ,title
-         |           ,case_type
-         |           ,case_reason
-         |           ,case_no_trim(case_no) as case_no
-         |           ,court_name
-         |           ,case_stage
-         |           ,case_label(flag) lable
-         |           ,replace_char(yg_name) as yg_name
-         |           ,replace_char(bg_name) as bg_name
-         |           ,date
-         |           ,detail_id
-         |           ,case_amt
-         |           ,replace_char(bg_name) as name
-         |   FROM $project.$t2
-         |   WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
-         |         and case_no_trim(case_no) is not null
-         |         and date is not null and length(date) = 19
-         |)
-         |""".stripMargin).show(10, false)
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t3 partition (ds = '$t1_ds')
+             |SELECT
+             |   judicase_id
+             |   ,flag
+             |   ,title
+             |   ,case_type
+             |   ,case_reason
+             |   ,case_no
+             |   ,court_name
+             |   ,case_stage
+             |   ,lable
+             |   ,map_2_json(${getStrToMap(cols)}) as detail
+             |   ,yg_name
+             |   ,bg_name
+             |   ,date
+             |   ,detail_id
+             |   ,case_amt
+             |FROM
+             |(
+             |   SELECT  COALESCE(b.judicase_id,a.new_judicase_id) judicase_id
+             |           ,a.flag
+             |           ,a.title
+             |           ,a.case_type
+             |           ,a.case_reason
+             |           ,case_no_trim(a.case_no) as case_no
+             |           ,a.court_name
+             |           ,a.case_stage
+             |           ,case_label(a.flag) lable
+             |           ,a.yg_name
+             |           ,a.bg_name
+             |           ,a.date
+             |           ,a.detail_id
+             |           ,a.case_amt
+             |           ,a.bg_name as name
+             |   FROM    (
+             |     SELECT
+             |        judicase_id
+             |        ,flag
+             |        ,title
+             |        ,case_type
+             |        ,case_reason
+             |        ,case_no_trim(case_no) as case_no
+             |        ,court_name
+             |        ,case_stage
+             |        ,replace_char(yg_name) as yg_name
+             |        ,replace_char(bg_name) as bg_name
+             |        ,date
+             |        ,detail_id
+             |        ,case_amt
+             |        ,md5(CLEANUP(case_no_trim(case_no))) as new_judicase_id
+             |     FROM $project.$t2
+             |     WHERE ds= '$t2_ds' and tn not in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+             |           and case_no_trim(case_no) is not null
+             |           and date is not null and length(date) = 19
+             |   ) a
+             |   LEFT JOIN (
+             |     SELECT case_no_trim(case_no) as case_no,max(judicase_id) judicase_id
+             |     FROM $project.$t2
+             |     WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+             |     and case_no_trim(case_no) is not null
+             |     GROUP BY case_no
+             |   ) b
+             |   ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
+             |   UNION ALL
+             |   SELECT   judicase_id
+             |           ,flag
+             |           ,title
+             |           ,case_type
+             |           ,case_reason
+             |           ,case_no_trim(case_no) as case_no
+             |           ,court_name
+             |           ,case_stage
+             |           ,case_label(flag) lable
+             |           ,replace_char(yg_name) as yg_name
+             |           ,replace_char(bg_name) as bg_name
+             |           ,date
+             |           ,detail_id
+             |           ,case_amt
+             |           ,replace_char(bg_name) as name
+             |   FROM $project.$t2
+             |   WHERE ds = '$t2_ds' and tn in ('wenshu','zxr','zxr_person','company_dishonest_info','company_dishonest_info_person')
+             |         and case_no_trim(case_no) is not null
+             |         and date is not null and length(date) = 19
+             |)
+             |""".stripMargin).show(10, false)
 
-    //name 替换 cid
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t6 partition (ds = '$t1_ds')
-         |SELECT
-         |        a.judicase_id
-         |        ,flag
-         |        ,title
-         |        ,case_type
-         |        ,case_reason
-         |        ,case_no
-         |        ,court_name
-         |        ,case_stage
-         |        ,lable
-         |        ,detail
-         |        ,yg_name
-         |        ,bg_name
-         |        ,DATE
-         |        ,detail_id
-         |        ,case_amt
-         |        ,coalesce(b.cids,'') AS cids
-         |FROM    (
-         |        SELECT  *
-         |        FROM    $project.$t3
-         |        WHERE   ds = '$t1_ds'
-         |        ) a
-         |LEFT JOIN (
-         |        SELECT
-         |                judicase_id
-         |                ,sort(concat_ws(',',collect_set(cid)),',') cids
-         |        FROM    (
-         |                SELECT
-         |                        e.judicase_id
-         |                        ,f.new_cid cid
-         |                FROM    (
-         |                        SELECT  *
-         |                        FROM    (
-         |                                SELECT
-         |                                        yg_name AS names
-         |                                        ,judicase_id
-         |                                FROM    $project.$t3
-         |                                WHERE   ds = '$t1_ds' AND length(cleanup(yg_name)) >4
-         |                                UNION ALL
-         |                                SELECT
-         |                                        bg_name AS names
-         |                                        ,judicase_id
-         |                                FROM    $project.$t3
-         |                                WHERE   ds = '$t1_ds' AND length(cleanup(bg_name)) >4
-         |                                ) a
-         |                        LATERAL VIEW explode(split(names,',')) t AS name
-         |                        ) e
-         |                JOIN (
-         |                        SELECT
-         |                                cname
-         |                                ,concat_ws(',',collect_set(new_cid)) AS new_cid
-         |                        FROM    $project.$t5
-         |                        WHERE   ds = '$t5_ds' AND length(cleanup(cname)) >4 AND company_type not in ('2','8') AND deleted = '0'
-         |                        GROUP BY cname
-         |                     ) f
-         |                ON      cleanup(e.name) = cleanup(f.cname)
-         |                )
-         |        GROUP BY judicase_id
-         |          ) b
-         |ON      a.judicase_id = b.judicase_id
-         |""".stripMargin)
+        //name 替换 cid
+        sql(
+          s"""
+             |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t6 partition (ds = '$t1_ds')
+             |SELECT
+             |        a.judicase_id
+             |        ,flag
+             |        ,title
+             |        ,case_type
+             |        ,case_reason
+             |        ,case_no
+             |        ,court_name
+             |        ,case_stage
+             |        ,lable
+             |        ,detail
+             |        ,yg_name
+             |        ,bg_name
+             |        ,DATE
+             |        ,detail_id
+             |        ,case_amt
+             |        ,coalesce(b.cids,'') AS cids
+             |FROM    (
+             |        SELECT  *
+             |        FROM    $project.$t3
+             |        WHERE   ds = '$t1_ds'
+             |        ) a
+             |LEFT JOIN (
+             |        SELECT
+             |                judicase_id
+             |                ,sort(concat_ws(',',collect_set(cid)),',') cids
+             |        FROM    (
+             |                SELECT
+             |                        e.judicase_id
+             |                        ,f.new_cid cid
+             |                FROM    (
+             |                        SELECT  *
+             |                        FROM    (
+             |                                SELECT
+             |                                        yg_name AS names
+             |                                        ,judicase_id
+             |                                FROM    $project.$t3
+             |                                WHERE   ds = '$t1_ds' AND length(cleanup(yg_name)) >4
+             |                                UNION ALL
+             |                                SELECT
+             |                                        bg_name AS names
+             |                                        ,judicase_id
+             |                                FROM    $project.$t3
+             |                                WHERE   ds = '$t1_ds' AND length(cleanup(bg_name)) >4
+             |                                ) a
+             |                        LATERAL VIEW explode(split(names,',')) t AS name
+             |                        ) e
+             |                JOIN (
+             |                        SELECT
+             |                                cname
+             |                                ,concat_ws(',',collect_set(new_cid)) AS new_cid
+             |                        FROM    $project.$t5
+             |                        WHERE   ds = '$t5_ds' AND length(cleanup(cname)) >4 AND company_type not in ('2','8') AND deleted = '0'
+             |                        GROUP BY cname
+             |                     ) f
+             |                ON      cleanup(e.name) = cleanup(f.cname)
+             |                )
+             |        GROUP BY judicase_id
+             |          ) b
+             |ON      a.judicase_id = b.judicase_id
+             |""".stripMargin)
 
     val second_ds = getSecondLastPartitionOrElse(t6, "0")
     println(s"calc ds: $t2_ds, par ds : $t1_ds, second_ds : $second_ds")
 
     //找出增量数据
-    sql(
-      s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t4
-         |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
-         |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
-         |FROM    (
-         |            SELECT  judicase_id
-         |                    ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r1
-         |            FROM    $project.$t6
-         |            WHERE   ds = '$t1_ds'
-         |            GROUP BY judicase_id
-         |        ) a
-         |FULL JOIN (
-         |              SELECT  judicase_id
-         |                      ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r2
-         |              FROM    $project.$t6
-         |              WHERE   ds = '$second_ds'
-         |              GROUP BY judicase_id
-         |          ) b
-         |ON  r1 = r2
-         |WHERE   r1 IS NULL OR r2 IS NULL
-         |""".stripMargin)
+    //    sql(
+    //      s"""
+    //         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.$t4
+    //         |SELECT  coalesce(a.judicase_id,b.judicase_id)judicase_id
+    //         |        ,CASE WHEN a.judicase_id IS NULL THEN 1 ELSE 0 END
+    //         |FROM    (
+    //         |            SELECT  judicase_id
+    //         |                    ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r1
+    //         |            FROM    $project.$t6
+    //         |            WHERE   ds = '$t1_ds'
+    //         |            GROUP BY judicase_id
+    //         |        ) a
+    //         |FULL JOIN (
+    //         |              SELECT  judicase_id
+    //         |                      ,md5(concat_ws('', sort(concat_ws(',',collect_set(concat_ws('',flag,detail_id))),','), sort(concat_ws(',',collect_set(cids)),','))) r2
+    //         |              FROM    $project.$t6
+    //         |              WHERE   ds = '$second_ds'
+    //         |              GROUP BY judicase_id
+    //         |          ) b
+    //         |ON  r1 = r2
+    //         |WHERE   r1 IS NULL OR r2 IS NULL
+    //         |""".stripMargin)
 
     //司法案件主表
     sql(
@@ -522,7 +525,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    case_amt    ,
          |    date        ,
          |    court_level ,
-         |    y.deleted     ,
+         |    0 deleted   ,
          |    cids
          |FROM
          |(
@@ -546,16 +549,16 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids
          |                   ,court_level(court_name) court_level
          |                   FROM    $project.$t6
-         |                   WHERE   ds >= '$second_ds'
+         |                   WHERE   ds = '$t1_ds'
          |                ) a
          |        )
          |GROUP BY judicase_id
          |)x
-         |JOIN
-         |(
-         |   select *
-         |   from $project.$t4
-         |) y on x.judicase_id = y.judicase_id
+         |-- JOIN
+         |-- (
+         |--    select *
+         |--    from $project.$t4
+         |-- ) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(20, false)
 
     //明细表
@@ -576,7 +579,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |    name_aggs['yg_name'] yg_name,
          |    name_aggs['bg_name'] bg_name,
          |    last_date   ,
-         |    y.deleted
+         |    0 deleted
          |FROM
          |(
          |SELECT  md5(concat_ws('',concat_ws('',judicase_id,${t1_ds.substring(2)}),CLEANUP(case_no))) id
@@ -598,19 +601,64 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |        FROM    (
          |                   SELECT  *
          |                   FROM    $project.$t6
-         |                   WHERE   ds >= '$second_ds' AND length(lable) > 0
+         |                   WHERE   ds = '$t1_ds' AND length(lable) > 0
          |                )a
          |)
          |GROUP BY judicase_id
          |         ,case_no
          |) x
-         |JOIN
-         |(
-         |   select *
-         |   from $project.$t4
-         |) y on x.judicase_id = y.judicase_id
+         |-- JOIN
+         |-- (
+         |--    select *
+         |--    from $project.$t4
+         |-- ) y on x.judicase_id = y.judicase_id
          |""".stripMargin).show(10, false)
 
+    //建表
+    createIndex()
+
+  }
+
+  /**
+   * Creates the two date-suffixed indices through `EsQueryUtils.createTable`.
+   *
+   * The index names are `i1` and `i2` followed by the value of
+   * `BaseUtil.getYesterday()` (presumably yesterday's date -- confirm format
+   * against BaseUtil).
+   *
+   * @param i1 name prefix of the case index
+   * @param i2 name prefix of the case-detail index
+   */
+  def createIndex(i1: String = "judicial_case_v", i2: String = "judicial_case_detail_v"): Unit = {
+    val bizdate = BaseUtil.getYesterday()
+    val index1 = s"$i1$bizdate"
+    val index2 = s"$i2$bizdate"
+    val client = EsQueryUtils.getClient()
+    try {
+      // create both indices from their predefined definitions
+      EsQueryUtils.createTable(index1, EsQueryUtils.judicial_case, client)
+      EsQueryUtils.createTable(index2, EsQueryUtils.judicial_case_detail, client)
+    } finally {
+      // release the client even when one of the create calls fails
+      client.close()
+    }
+  }
+
+  /**
+   * Points the two aliases at the indices named `i1`/`i2` + `BaseUtil.getYesterday()`
+   * and then prunes surplus indices.
+   *
+   * Steps: verify both target indices exist (otherwise print a message and
+   * exit the JVM with status -1), detach every index currently returned by
+   * `getAliases` for each alias, attach the new index, and finally delete one
+   * index per family when both families hold more than two.
+   *
+   * @param pre prefix prepended to the alias names
+   * @param i1  name prefix of the case index; the alias is `pre` + `i1` without "_v"
+   * @param i2  name prefix of the case-detail index; the alias is `pre` + `i2` without "_v"
+   */
+  def updateIndex(pre: String = "winhc_", i1: String = "judicial_case_v", i2: String = "judicial_case_detail_v"): Unit = {
+    val bizdate = BaseUtil.getYesterday()
+    // NOTE(review): `replace` removes every "_v" occurrence, not only a trailing one.
+    val aliases1 = s"${pre}${i1.replace("_v","")}"
+    val aliases2 = s"${pre}${i2.replace("_v","")}"
+    val index1 = s"$i1$bizdate"
+    val index2 = s"$i2$bizdate"
+    val client = EsQueryUtils.getClient()
+
+    try {
+      if(!EsQueryUtils.tableExists(index1,client) || !EsQueryUtils.tableExists(index2,client)){
+        println(s"index not exists $index1 -- $index2 ...")
+        // sys.exit terminates the JVM, so the finally block below is not reached on this path.
+        sys.exit(-1)
+      }
+      // look up the indices currently behind each alias
+      val list1 = EsQueryUtils.getAliases(aliases1, client)
+      val list2 = EsQueryUtils.getAliases(aliases2, client)
+      // detach the old indices from the aliases
+      list1.foreach(updateAliases(_, aliases1, "remove", client))
+      list2.foreach(updateAliases(_, aliases2, "remove", client))
+      // attach the new indices to the aliases
+      updateAliases(index1, aliases1, "add", client)
+      updateAliases(index2, aliases2, "add", client)
+      // Query each family by its own prefix. The previous code cut both names at
+      // index1's first "2", which yields the wrong prefix for index2 and throws
+      // when the name contains no "2" at all.
+      val list3 = EsQueryUtils.getIndexs(i1, client)
+      val list4 = EsQueryUtils.getIndexs(i2, client)
+      // delete surplus indices
+      // NOTE(review): assumes getIndexs returns the oldest index first -- confirm.
+      if (list3.size > 2 && list4.size > 2) {
+        EsQueryUtils.deleteIndex(list3.head, client)
+        EsQueryUtils.deleteIndex(list4.head, client)
+      }
+    } finally {
+      // release the client even when one of the calls above fails
+      client.close()
+    }
   }
 
 }

+ 362 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNewTmp2.scala

@@ -0,0 +1,362 @@
+package com.winhc.bigdata.spark.jobs.judicial
+
+import java.util.NoSuchElementException
+
+import com.winhc.bigdata.spark.udf._
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsRestUtils, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.broadcast.Broadcast
+import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
+/**
+ * Command-line entry point of the temporary judicial-case relation job.
+ *
+ * Initialises a Spark session for the `winhc_eci_dev` project, runs the job's
+ * `calc()` with a blank `ds` (the job then resolves the partition itself) and
+ * stops the session afterwards.
+ */
+object JudicialCaseRelationPreNewTmp2 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val ds = ""
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    try {
+      JudicialCaseRelationPreNewTmp2(spark, project, ds).calc()
+    } finally {
+      // Stop the session even when the job fails.
+      spark.stop()
+    }
+  }
+}
+
+case class JudicialCaseRelationPreNewTmp2(s: SparkSession, project: String, ds: String
+                                         ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
+  override protected val spark: SparkSession = s
+
+  /**
+   * Looks up `code` in the broadcast map and returns the trimmed element at
+   * position `index` of its entry.
+   *
+   * Returns "" when the code is unknown, the entry is null, the entry has no
+   * element at `index`, or that element is null. An out-of-range index used to
+   * throw; it now yields "" like the other missing-value cases.
+   *
+   * @param area_code broadcast lookup table from code to its list of values
+   * @param code      key to look up
+   * @param index     position inside the entry
+   * @return the trimmed value, or "" when nothing usable is found
+   */
+  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
+    area_code.value.get(code)
+      .flatMap(Option(_)) // a key mapped to null counts as missing
+      .flatMap(_.lift(index)) // None instead of IndexOutOfBoundsException
+      .flatMap(Option(_)) // a null element counts as missing
+      .map(_.trim)
+      .getOrElse("")
+  }
+
+  /**
+   * Expands a 6-character code into six strings: each of its three
+   * 2-character segments followed by the value found at position 0, 1 and 2
+   * of the code's entry in `area_code`.
+   *
+   * A blank code, or one whose trimmed length is not 6, yields six empty strings.
+   */
+  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]) = {
+    val wellFormed = StringUtils.isNotBlank(code) && code.trim.length == 6
+    if (!wellFormed) {
+      Seq("", "", "", "", "", "")
+    } else {
+      val trimmed = code.trim
+      Seq(
+        trimmed.substring(0, 2), get_seq_by_index(area_code, trimmed, 0),
+        trimmed.substring(2, 4), get_seq_by_index(area_code, trimmed, 1),
+        trimmed.substring(4, 6), get_seq_by_index(area_code, trimmed, 2)
+      )
+    }
+  }
+
+
+  /**
+   * Returns the values at positions 0, 1 and 2 of the entry that
+   * `category_code` holds for the trimmed `code`.
+   *
+   * A blank code yields three empty strings. The lookups are done once each;
+   * an earlier version also built an unused tuple that repeated all three.
+   */
+  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]) = {
+    if (StringUtils.isNotBlank(code)) {
+      val c = code.trim
+      Seq(
+        get_seq_by_index(category_code, c, 0),
+        get_seq_by_index(category_code, c, 1),
+        get_seq_by_index(category_code, c, 2)
+      )
+    } else {
+      Seq("", "", "")
+    }
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    map_2_json()
+    case_no_trim_udf()
+    registerCourtRank()
+    val (m1, m2) = code2Name()
+
+    spark.udf.register("category_code", (code: String) => {
+      get_category_code(code, m1)
+    })
+    spark.udf.register("area_code", (code: String) => {
+      get_area_code(code, m2)
+    })
+
+    spark.udf.register("name_aggs", new NameAggs(1000))
+    spark.udf.register("case_reason", new CaseReasonAggs(1000))
+    //预处理数据
+    val t2 = s"ads_judicial_case_relation_pre"
+    var t2_ds = ds
+    var t1_ds = ds
+    if (StringUtils.isBlank(ds)) {
+      t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
+      t1_ds = t2_ds
+    }
+
+    val t3 = "ads_judicial_case_relation_replace" //司法案件id交换表
+    val t4 = "ads_judicial_case_incr_mapping"
+    val t5 = s"base_company_mapping" //公司name和cid映射
+    val t6 = s"ads_judicial_case_relation_replace_cids" //公司name和cid映射
+    val eci_debtor_relation = "ads_eci_debtor_relation_xf"
+    val deadbeat_company = "ads_deadbeat_company"
+
+
+    //println(schema)
+
+    val t5_ds = BaseUtil.getPartion(t5, spark) //映射表分区
+
+    //    sql(
+    //      """
+    //        |insert overwrite table winhc_eci_dev.tmp_xf_deadbeat_company_deleted
+    //        |SELECT rowkey,cid,name,card_num,publish_date,deleted,tn,flag from (
+    //        |SELECT *,case when tn ='company_zxr' then 7 when tn ='company_dishonest_info' then 3 when tn ='company_zxr_restrict' then 5 else '' end as flag
+    //        |,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY publish_date DESC) AS num
+    //        |from ads_deadbeat_company
+    //        |where ds > '0' and deleted = 1 and tn <> 'company_zxr_final_case'
+    //        |)
+    //        |where num = 1;
+    //        |""".stripMargin)
+
+    //    sql(
+    //      s"""
+    //         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
+    //         |SELECT
+    //         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
+    //         |    title       ,
+    //         |    case_type   ,
+    //         |    case_reason ,
+    //         |    case_no     ,
+    //         |    court_name  ,
+    //         |    case_stage  ,
+    //         |    lable       ,
+    //         |    detail      ,
+    //         |    -- name_aggs['yg_name'] yg_name,
+    //         |    -- name_aggs['bg_name'] bg_name,
+    //         |    yg_name,
+    //         |    bg_name,
+    //         |    case_amt    ,
+    //         |    date        ,
+    //         |    court_level ,
+    //         |    0 as deleted     ,
+    //         |    cids
+    //         |FROM
+    //         |(
+    //         |SELECT  judicase_id
+    //         |        ,max(title) title
+    //         |        ,concat_ws(',',collect_set(case_type)) case_type
+    //         |        ,case_reason(case_reason,date,flag) case_reason
+    //         |        ,concat_ws(',',collect_set(case_no)) case_no
+    //         |        ,concat_ws(',',collect_set(court_name)) court_name
+    //         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+    //         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
+    //         |        ,null as detail
+    //         |        ,max(case_amt) AS case_amt
+    //         |        ,max(date) AS date
+    //         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
+    //         |        ,concat_ws(',',collect_set(cids)) cids
+    //         |        -- ,name_aggs(yg_name,bg_name,flag,date) name_aggs
+    //         |        ,concat_ws(',',collect_set(yg_name)) yg_name
+    //         |        ,concat_ws(',',collect_set(bg_name)) bg_name
+    //         |FROM    (
+    //         |        SELECT  a.*
+    //         |        FROM    (
+    //         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids,detail_id
+    //         |                   ,court_level(court_name) court_level
+    //         |                   FROM    $project.$t6
+    //         |                   WHERE   ds = '$t1_ds'
+    //         |                ) a
+    //         |                left join
+    //         |                (
+    //         |                select rowkey,flag from winhc_eci_dev.tmp_xf_deadbeat_company_deleted
+    //         |                )b on a.detail_id = b.rowkey and a.flag = b.flag
+    //         |                where b.rowkey is null
+    //         |        )
+    //         |GROUP BY judicase_id
+    //         |)x
+    //         |""".stripMargin).show(20, false)
+
+    //ads_eci_debtor_relation_xf
+
+    //生成债权表
+    //    val df = sql(
+    //      s"""
+    //        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_yg_bg_name
+    //        |SELECT md5(concat_ws('',yg_name,bg_name)) id,yg_name,bg_name
+    //        |FROM (
+    //        |SELECT
+    //        |yg_name_x as yg_name
+    //        |,bg_name_x as bg_name
+    //        |FROM winhc_eci_dev.ads_judicial_case_relation_r1
+    //        |LATERAL VIEW explode(split(yg_name,',')) a AS yg_name_x
+    //        |LATERAL VIEW explode(split(bg_name,',')) b AS bg_name_x
+    //        |WHERE compare_name(yg_name,bg_name)
+    //        |AND (lable like '%被执行人%' or lable like '%限制高消费%'  or lable like '%失信人%')
+    //        |AND  LENGTH(cleanup(yg_name_x)) > 4
+    //        |AND  LENGTH(cleanup(bg_name_x)) > 4
+    //        |AND size(split(yg_name,',')) < 500
+    //        |AND size(split(bg_name,',')) < 500
+    //        |)
+    //        |GROUP BY yg_name,bg_name
+    //        |""".stripMargin)
+
+
+//    sql(
+//      """
+//        |INSERT OVERWRITE  TABLE winhc_eci_dev.tmp_xf_base_company_mapping
+//        |SELECT
+//        |a.cid,a.cname,a.new_cid,id,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,b.company_type,
+//        |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,b.update_time,b.deleted
+//        |from (
+//        |SELECT * from winhc_eci_dev.base_company_mapping where ds = '20201125'
+//        |)a
+//        |JOIN
+//        |(
+//        |    SELECT
+//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+//        |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+//        | from
+//        |(
+//        |SELECT
+//        |*,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS num,split(verify,',')[0] lng2,split(verify,',')[1] lat2
+//        |from (
+//        |SELECT
+//        |verify(lng,lat) verify,
+//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+//        |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+//        |from winhc_eci_dev.ads_company where ds ='20200604'
+//        |UNION ALL
+//        |SELECT
+//        |verify(lng,lat) verify,
+//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
+//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
+//        |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
+//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
+//        |from winhc_eci_dev.inc_ads_company where ds >'20200604'
+//        |)
+//        |)
+//        |where num = 1
+//        |)b on a.cid =b.cid
+//        |""".stripMargin)
+
+    //公司基本信息全量表
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $project.tmp_xf_base_company_mapping_new
+        |select
+        |cid,new_cid,cname,reg_status,area_code,
+        |a1[0]  province_code,a1[1]  province_name,
+        |a1[2]  city_code,a1[3]  city_name,
+        |a1[4]  county_code,a1[5]  county_name,
+        |reg_location,estiblish_time,category_code,
+        |a2[0]  category_first,a2[1]  category_second,a2[2]  category_third,
+        |reg_capital,phones,emails,company_type
+        |from
+        |(
+        |select
+        | area_code(area_code) a1
+        | ,category_code(category_code) a2
+        | ,*
+        |from ${project}.tmp_xf_base_company_mapping
+        |)
+        |""".stripMargin).show(100,false)
+
+    //生成债权表
+    sql(
+      """
+        |insert overwrite table winhc_eci_dev.ads_eci_debtor_relation_xf PARTITION (ds='20201127')
+        |select e.* from (
+        |SELECT
+        |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
+        |c.reg_status yg_reg_status,
+        |c.province_code yg_province_code,
+        |c.province_name yg_province_name,
+        |c.city_code yg_city_code,
+        |c.city_name yg_city_name,
+        |c.county_code yg_county_code,
+        |c.county_name yg_county_name,
+        |c.reg_location yg_reg_location,
+        |c.estiblish_time yg_estiblish_time,
+        |c.category_code yg_category_code,
+        |c.category_first yg_category_first,
+        |c.category_second yg_category_second,
+        |c.category_third yg_category_third,
+        |c.reg_capital yg_reg_capital,
+        |c.phones yg_phones,
+        |c.emails yg_emails,
+        |d.reg_status bg_reg_status,
+        |d.province_code bg_province_code,
+        |d.province_name bg_province_name,
+        |d.city_code bg_city_code,
+        |d.city_name bg_city_name,
+        |d.county_code bg_county_code,
+        |d.county_name bg_county_name,
+        |d.reg_location bg_reg_location,
+        |d.estiblish_time bg_estiblish_time,
+        |d.category_code bg_category_code,
+        |d.category_first bg_category_first,
+        |d.category_second bg_category_second,
+        |d.category_third bg_category_third,
+        |d.reg_capital bg_reg_capital,
+        |d.phones bg_phones,
+        |d.emails bg_emails,
+        |0 as deleted
+        |FROM    (
+        |            SELECT  a.id
+        |                    ,a.yg_name
+        |                    ,b.*
+        |            FROM    (
+        |                        SELECT  id
+        |                                ,yg_name
+        |                        FROM    tmp_xf_yg_bg_name
+        |                    ) a
+        |            JOIN    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+        |                        FROM    tmp_xf_base_company_mapping_new
+        |                        WHERE   length(cleanup(cname)) > 4
+        |                        AND     company_type NOT IN ('2','8')
+        |                    ) b
+        |            ON      cleanup(a.yg_name) = cleanup(b.cname)
+        |        )c
+        |JOIN    (
+        |            SELECT  a.id
+        |                    ,a.bg_name
+        |                    ,b.*
+        |            FROM    (
+        |                        SELECT  id
+        |                                ,bg_name
+        |                        FROM    tmp_xf_yg_bg_name
+        |                    ) a
+        |            JOIN    (
+        |                        SELECT  *
+        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
+        |                        FROM    tmp_xf_base_company_mapping_new
+        |                        WHERE   length(cleanup(cname)) > 4
+        |                        AND     company_type NOT IN ('2','8')
+        |                    ) b
+        |            ON      cleanup(a.bg_name) = cleanup(b.cname)
+        |        )d
+        |ON      c.id = d.id
+        |)e
+        |JOIN
+        |(
+        |SELECT * from (
+        |SELECT cid
+        |,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY publish_date DESC) AS num
+        |from ads_deadbeat_company
+        |where ds > '0' and deleted = 0 and tn <> 'company_zxr_final_case'
+        |)
+        |where num = 1
+        |)f
+        |on e.bg_cid = f.cid
+        |""".stripMargin)
+
+  }
+
+}

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -59,6 +59,10 @@ trait CompanyMapping {
       title(yg, bg, reason)
     })
 
+    spark.udf.register("compare_name", (yg: String, bg: String) => {
+      compareName(yg, bg)
+    })
+
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 16 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -265,6 +265,18 @@ object BaseUtil {
     Seq(replaceChar(ygname), replaceChar(bgname), reason).filter(s => StringUtils.isNotBlank(s)).mkString(",")
   }
 
+  /**
+   * Tells whether two comma-separated name lists are both non-empty and have no name in common.
+   *
+   * Blank entries (for example those produced by consecutive commas) are ignored on both sides.
+   *
+   * @param ygname comma-separated names of the first party
+   * @param bgname comma-separated names of the second party
+   * @return `true` only when each list holds at least one non-blank name and the two lists are
+   *         disjoint; `false` when either argument is blank or any name appears on both sides
+   */
+  def compareName(ygname: String, bgname: String): Boolean = {
+    // Distinct non-blank entries of a comma-separated list (limit -1 keeps trailing empties,
+    // which the blank filter then drops).
+    def distinctNames(raw: String): Set[String] =
+      raw.split(",", -1).filter(entry => StringUtils.isNotBlank(entry)).toSet
+
+    if (StringUtils.isBlank(ygname) || StringUtils.isBlank(bgname)) {
+      false
+    } else {
+      val ygSet = distinctNames(ygname)
+      val bgSet = distinctNames(bgname)
+      ygSet.nonEmpty && bgSet.nonEmpty && ygSet.intersect(bgSet).isEmpty
+    }
+  }
+
   def trimBlack(s: String): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
@@ -400,7 +412,10 @@ object BaseUtil {
     //    println(case_no_trim("(2015)怀执字第03601号号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
-    println(caseStage("(2019)鄂初7号"))
+    //println(caseStage("(2019)鄂初7号"))
+    val yg_name = ",,"
+    val bg_name = "张三,,小米,"
+    println(compareName(yg_name,bg_name))
   }
 
 }

+ 374 - 0
src/main/scala/com/winhc/bigdata/spark/utils/EsQueryUtils.scala

@@ -0,0 +1,374 @@
+package com.winhc.bigdata.spark.utils
+
+import java.util.Collections
+
+import com.alibaba.fastjson.JSON
+import com.winhc.bigdata.spark.const.EnvConst
+import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.entity.ContentType
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
+import org.elasticsearch.client.{RestClient, RestClientBuilder}
+
+/**
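+ * @Description: Helpers for creating, inspecting and deleting Elasticsearch indices and aliases through the low-level REST client, together with the index definitions used for judicial case data.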
+ * @author π
+ * @date 2020/12/5 11:14
+ */
+object EsQueryUtils extends Logging {
+  /**
+   * Builds a low-level Elasticsearch REST client that sends HTTP basic credentials.
+   *
+   * The host name is read through `EnvConst.getEnv().getValue("es.nodes")`; the port is fixed
+   * to 9200.
+   *
+   * @return a newly built [[RestClient]]; this method does not close it
+   */
+  def getClient(): RestClient = {
+    // NOTE(review): user name and password are hard-coded; consider moving them to configuration.
+    val basicAuth = new UsernamePasswordCredentials("elastic", "elastic_168")
+    val authProvider = new BasicCredentialsProvider()
+    authProvider.setCredentials(AuthScope.ANY, basicAuth)
+
+    // Click the ID of the Elasticsearch instance you created; the public address shown on its
+    // basic-information page is the HOST.
+    val esHost = new HttpHost(EnvConst.getEnv().getValue("es.nodes"), 9200)
+    val attachCredentials = new RestClientBuilder.HttpClientConfigCallback() {
+      override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
+        httpClientBuilder.setDefaultCredentialsProvider(authProvider)
+    }
+    RestClient.builder(esHost).setHttpClientConfigCallback(attachCredentials).build()
+  }
+
+  /**
+   * Creates an index with the given definition unless `tableExists` already reports it.
+   *
+   * Failures of the PUT request are printed and logged; they are not rethrown.
+   *
+   * @param index  name of the index, used as the request path
+   * @param query  JSON body sent with the PUT request (also written to the log beforehand)
+   * @param client REST client used to issue the request
+   */
+  def createTable(index: String, query: String, client: RestClient): Unit = {
+    val alreadyPresent = tableExists(index, client)
+    if (!alreadyPresent) {
+      logInfo(query)
+      try {
+        val body = new NStringEntity(query, ContentType.APPLICATION_JSON)
+        val noParams = Collections.emptyMap[String, String]()
+        val response = client.performRequest("PUT", index, noParams, body)
+        // The response body is read but its content is not used.
+        EntityUtils.toString(response.getEntity)
+        println(s"create table $index success .")
+      } catch {
+        case e: Exception =>
+          println(s"create table $index error .")
+          logError(e.getMessage, e)
+      }
+    }
+  }
+
+  /**
+   * Probes an index by issuing a GET request on its name.
+   *
+   * NOTE(review): every exception, not only "index not found", is reported as `false`, and the
+   * exception itself is not logged.
+   *
+   * @param index  name of the index, used as the request path
+   * @param client REST client used to issue the request
+   * @return `true` when the request completes and its body can be read, `false` on any exception
+   */
+  def tableExists(index: String, client: RestClient): Boolean = {
+    try {
+      val probeBody = new NStringEntity("{\"size\":1}", ContentType.APPLICATION_JSON)
+      val noParams = Collections.emptyMap[String, String]()
+      val response = client.performRequest("GET", index, noParams, probeBody)
+      println(EntityUtils.toString(response.getEntity))
+      println(s"tableExists $index true .")
+      true
+    } catch {
+      case _: Exception =>
+        // Deliberately quiet: the failure is only reflected in the printed line and the result.
+        println(s"tableExists $index false .")
+        false
+    }
+  }
+
+  /**
+   * Queries the `_alias` endpoint of the given index (with a wildcard alias name) and returns
+   * the top-level keys of the JSON response in sorted order.
+   *
+   * @param index  index name or pattern placed in the request path
+   * @param client REST client used to issue the request
+   * @return the sorted keys of the response object, or an empty list when the request or the
+   *         parsing fails (the failure is printed and logged, not rethrown)
+   */
+  def getAliases(index: String, client: RestClient): List[String] = {
+    try {
+      import scala.collection.JavaConverters._
+      val response = client.performRequest("GET", s"/$index/_alias/*")
+      val payload = JSON.parseObject(EntityUtils.toString(response.getEntity))
+      val keys = payload.keySet().asScala.toList.sorted
+      println(s"getAliases $index success .")
+      keys
+    } catch {
+      case e: Exception =>
+        println(s"getAliases $index error .")
+        logError(e.getMessage, e)
+        List.empty[String]
+    }
+  }
+
+  /**
+   * Lists the names of all indices whose name starts with the given prefix.
+   *
+   * Uses the cat-indices endpoint restricted to the `index` column (`h=index`), so the response
+   * is one index name per line.
+   *
+   * @param index  index name prefix; a trailing wildcard is appended in the request path
+   * @param client REST client used to issue the request
+   * @return the non-empty response lines in sorted order, or an empty list when the request
+   *         fails (the failure is printed and logged, not rethrown)
+   */
+  def getIndexs(index: String, client: RestClient): List[String] = {
+    try {
+      val response = client.performRequest("GET", s"_cat/indices/$index*?h=index")
+      val lines = EntityUtils.toString(response.getEntity).split("\\n", -1)
+      val names = lines.filter(_.nonEmpty).toList.sorted
+      println(s"getIndexs $index success .")
+      names
+    } catch {
+      case e: Exception =>
+        println(s"getIndexs $index error .")
+        logError(e.getMessage, e)
+        List.empty[String]
+    }
+  }
+
+  /**
+   * Sends a DELETE request for the given index and prints the response body.
+   *
+   * Failures are printed and logged; they are not rethrown.
+   *
+   * @param index  name of the index, used as the request path
+   * @param client REST client used to issue the request
+   */
+  def deleteIndex(index: String, client: RestClient): Unit = {
+    try {
+      val response = client.performRequest("DELETE", index)
+      println(EntityUtils.toString(response.getEntity))
+      println(s"deleteIndex $index success .")
+    } catch {
+      case e: Exception =>
+        println(s"deleteIndex $index error .")
+        logError(e.getMessage, e)
+    }
+  }
+
+  /**
+   * Posts a single alias action to the `_aliases` endpoint.
+   *
+   * Failures are printed and logged; they are not rethrown.
+   *
+   * @param index       index the action applies to
+   * @param aliases     alias name the action applies to
+   * @param aliasesType name of the action object in the request body (the commented-out example
+   *                    in `main` passes "add")
+   * @param client      REST client used to issue the request
+   */
+  def updateAliases(index: String, aliases: String, aliasesType: String, client: RestClient): Unit = {
+    // NOTE(review): the three values are interpolated into the JSON body without escaping.
+    val query =
+      s"""
+         |{
+         |  "actions": [
+         |    {
+         |      "$aliasesType": {
+         |        "index": "$index",
+         |        "alias": "$aliases"
+         |      }
+         |    }
+         |  ]
+         |}
+         |""".stripMargin
+
+    try {
+      val entity = new NStringEntity(query, ContentType.APPLICATION_JSON)
+      // The response is not needed here, so it is not bound to a name.
+      client.performRequest(
+        "POST",
+        "/_aliases/",
+        Collections.emptyMap[String, String](),
+        entity)
+      println(query)
+      println(s"updateAliases $index success .")
+    } catch {
+      case e: Exception =>
+        println(s"updateAliases $index error .")
+        logError(e.getMessage, e)
+    }
+  }
+
+  /**
+   * JSON index definition (settings and mappings) for judicial case detail documents.
+   *
+   * Settings: 20 shards, 0 replicas, a nested-field limit of 300 and an `ik_analyzer` analyzer
+   * backed by an `ik_max_word` tokenizer. The `judicial` mapping type declares the case-level
+   * fields plus a `detail` object holding `flag`, `detail_id`, `date` and `name`; `title`,
+   * `yg_name` and `bg_name` are analyzed text with a `keyword` sub-field.
+   */
+  lazy val judicial_case_detail: String =
+    """
+      |{
+      |  "settings": {
+      |    "number_of_shards": 20,
+      |    "number_of_replicas": 0,
+      |    "index": {
+      |      "mapping": {
+      |        "nested_fields": {
+      |          "limit": "300"
+      |        }
+      |      },
+      |      "analysis": {
+      |        "analyzer": {
+      |          "ik_analyzer": {
+      |            "tokenizer": "ik_max_tokenizer"
+      |          }
+      |        },
+      |        "tokenizer": {
+      |          "ik_max_tokenizer": {
+      |            "type": "ik_max_word"
+      |          }
+      |        }
+      |      }
+      |    }
+      |  },
+      |  "mappings": {
+      |    "judicial": {
+      |      "properties": {
+      |         "judicase_id": {
+      |          "type": "keyword"
+      |        },
+      |        "title": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },"analyzer": "ik_analyzer"
+      |        },
+      |        "case_type": {
+      |          "type": "keyword"
+      |        },
+      |        "case_reason": {
+      |          "type": "keyword"
+      |        },
+      |        "case_no": {
+      |          "type": "keyword"
+      |        },
+      |        "court_name": {
+      |          "type": "keyword"
+      |        },
+      |        "case_stage": {
+      |          "type": "keyword"
+      |        },
+      |        "lable": {
+      |          "type": "keyword"
+      |        },
+      |        "detail": {
+      |          "properties": {
+      |            "flag": {
+      |              "type": "keyword"
+      |            },
+      |            "detail_id": {
+      |              "type": "keyword"
+      |            },
+      |            "date": {
+      |              "type": "date",
+      |              "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
+      |            },
+      |            "name": {
+      |              "type": "keyword"
+      |            }
+      |          }
+      |        },
+      |         "yg_name": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },"analyzer": "ik_analyzer"
+      |        },
+      |        "bg_name": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },"analyzer": "ik_analyzer"
+      |        },
+      |        "last_date": {
+      |          "type": "date",
+      |          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
+      |        },
+      |        "deleted": {
+      |          "type": "integer"
+      |        }
+      |      }
+      |    }
+      |  }
+      |}
+      |""".stripMargin
+
+  /**
+   * JSON index definition (settings and mappings) for judicial case documents.
+   *
+   * Settings: 20 shards, 0 replicas, a nested-field limit of 300 and an `ik_analyzer` analyzer
+   * backed by an `ik_max_word` tokenizer. In the `judicial` mapping type, `title`, `case_no`,
+   * `yg_name` and `bg_name` are analyzed text with a `keyword` sub-field; `case_amt` is a double,
+   * `date` a date, `deleted` an integer, and the remaining fields are keywords.
+   */
+  lazy val judicial_case: String =
+    """
+      |{
+      |  "settings": {
+      |    "number_of_shards": 20,
+      |    "number_of_replicas": 0,
+      |    "index": {
+      |      "mapping": {
+      |        "nested_fields": {
+      |          "limit": "300"
+      |        }
+      |      },
+      |      "analysis": {
+      |        "analyzer": {
+      |          "ik_analyzer": {
+      |            "tokenizer": "ik_max_tokenizer"
+      |          }
+      |        },
+      |        "tokenizer": {
+      |          "ik_max_tokenizer": {
+      |            "type": "ik_max_word"
+      |          }
+      |        }
+      |      }
+      |    }
+      |  },
+      |  "mappings": {
+      |    "judicial": {
+      |      "properties": {
+      |        "title": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },
+      |          "analyzer": "ik_analyzer"
+      |        },
+      |        "case_type": {
+      |          "type": "keyword"
+      |        },
+      |        "case_reason": {
+      |          "type": "keyword"
+      |        },
+      |        "case_no": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },
+      |          "analyzer": "ik_analyzer"
+      |        },
+      |        "court_name": {
+      |          "type": "keyword"
+      |        },
+      |        "case_stage": {
+      |          "type": "keyword"
+      |        },
+      |        "lable": {
+      |          "type": "keyword"
+      |        },
+      |        "yg_name": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },
+      |          "analyzer": "ik_analyzer"
+      |        },
+      |        "bg_name": {
+      |          "type": "text",
+      |          "fields": {
+      |            "keyword": {
+      |              "type": "keyword",
+      |              "ignore_above": 256
+      |            }
+      |          },
+      |          "analyzer": "ik_analyzer"
+      |        },
+      |        "case_amt": {
+      |          "type": "double"
+      |        },
+      |        "date": {
+      |          "type": "date",
+      |          "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd"
+      |        },
+      |        "court_level": {
+      |          "type": "keyword"
+      |        },
+      |        "deleted": {
+      |          "type": "integer"
+      |        },
+      |        "cids": {
+      |          "type": "keyword"
+      |        }
+      |      }
+      |    }
+      |  }
+      |}
+      |""".stripMargin
+
+  /**
+   * Manual entry point: builds a client and deletes the `xf_test02` index.
+   *
+   * The client is closed in a `finally` block so its HTTP resources are released even when the
+   * call fails.
+   *
+   * @param args command-line arguments (unused)
+   */
+  def main(args: Array[String]): Unit = {
+    val client = EsQueryUtils.getClient()
+    try {
+      deleteIndex("xf_test02", client)
+      //    updateAliases("xf_test02", "xf_test", "add", client)
+      //    println(getAliases("xf_test", client))
+      //EsQueryUtils.createTable("test05", EsQueryUtils.judicial_case_detail, client)
+    } finally {
+      client.close()
+    }
+  }
+}