Explorar o código

Merge remote-tracking branch 'origin/master'

xufei hai 4 anos
pai
achega
67809d19c2

+ 11 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/company_judicial_assistance.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.PhoenixConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{CompanyCidAndNameUtils, CompanySummaryPro, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyCidAndNameUtils, CompanySummaryPro, LoggingUtils, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col
@@ -323,14 +323,14 @@ case class company_judicial_assistance(s: SparkSession,
     }
 
 
-    def inc(): Unit = {
+    def inc(ds: String): Unit = {
       sql(
         s"""
            |SELECT  cid
            |        ,current_cid AS new_cid
            |        ,name AS cname
            |FROM    winhc_eci_dev.inc_ads_company
-           |WHERE   ds > '$ads_last_ds'
+           |WHERE   ds > '$ds'
            |AND     current_cid IS NOT NULL
            |""".stripMargin)
         .createOrReplaceTempView("tmp_company_cid_change")
@@ -339,7 +339,7 @@ case class company_judicial_assistance(s: SparkSession,
         s"""
            |SELECT  ${intersect_ods_cols.mkString(",")}
            |FROM    $inc_ods_tab
-           |WHERE   ds > '$inc_ads_last_ds'
+           |WHERE   ds > '$ds'
            |""".stripMargin)
         .repartition(500)
         .createOrReplaceTempView("company_judicial_assistance_inc")
@@ -491,13 +491,14 @@ case class company_judicial_assistance(s: SparkSession,
            |WHERE   ds = '$inc_ods_last_ds'
            |""".stripMargin)
         .save2HBase("COMPANY_JUDICIAL_ASSISTANCE", "rowkey", ads_cols.diff(Seq("ds", "id")))
+
       sql(
         s"""
            |SELECT  *
            |FROM    $inc_ads_list_tab
            |WHERE   ds = '$inc_ods_last_ds'
            |""".stripMargin)
-        .select(ads_cols.diff(Seq("ds")).map(column => col(column).cast("string")): _*)
+        .select(getColumns(inc_ads_list_tab).diff(Seq("ds")).map(column => col(column).cast("string")): _*)
         .write
         .mode("append")
         .jdbc(PhoenixConfig.getPhoenixJDBCUrl, "COMPANY_JUDICIAL_ASSISTANCE_LIST", PhoenixConfig.getPhoenixProperties)
@@ -516,8 +517,12 @@ case class company_judicial_assistance(s: SparkSession,
       println("all...")
       all()
     } else {
+      var ds: String = inc_ads_last_ds
+      if (inc_ods_last_ds.equals(inc_ads_last_ds)) {
+        ds = BaseUtil.atDaysAfter(-1, inc_ads_last_ds)
+      }
       println("inc...")
-      inc()
+      inc(ds)
     }
 
   }

+ 70 - 32
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -29,6 +29,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
 
   import spark.implicits._
 
+  case_no_trim_udf()
   is_id_card_udf()
 
   def etl(ds: String): Unit = {
@@ -37,7 +38,8 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         etl_company_dishonest_info(ds)
         true
       })
-      , ("wenshu etl...", () => {
+      ,
+      ("wenshu etl...", () => {
         etl_wenshu(ds)
         true
       })
@@ -48,7 +50,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       })
     ))
 
-    etc_dwd_judicial_case(ds)
+        etc_dwd_judicial_case(ds)
     relationByGroup()
   }
 
@@ -68,7 +70,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       .withColumn("id", monotonically_increasing_id)
       .rdd.map(r =>
       (s"${r.getAs[String]("rowkey")}_${r.getAs[String]("tn")}", r)
-    ).groupByKey().flatMap(r => {
+    ).groupByKey(500).flatMap(r => {
       val li = r._2
       val id = li.last.getAs[Long]("id")
       li.map(r => {
@@ -91,7 +93,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
       StructField("tn", StringType)
     ))
 
-    spark.createDataFrame(rdd, schema).createOrReplaceTempView("all_tmp_xjk")
+    spark.createDataFrame(rdd, schema).createTempView("all_tmp_xjk")
 
 
     sql(
@@ -116,7 +118,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | ,court as connect_court_name
          | ,gist_id as connect_case_no
          | ,null as yg_name
-         | ,null as bg_name
+         | ,cname as bg_name
          | ,ds
          |""".stripMargin
 
@@ -150,7 +152,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
            |        ) AS t2
            |WHERE   t2.num = 1
            |$verify
-           |""".stripMargin).createOrReplaceTempView("xjk_tmp")
+           |""".stripMargin).createTempView("xjk_tmp")
 
       var other_cols: Seq[String] = null
       var result_tab: String = null
@@ -164,7 +166,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
              |FROM    $res_tab
              |""".stripMargin)
           .cache()
-          .createOrReplaceTempView(result_tab)
+          .createTempView(result_tab)
       } else {
         result_tab = s"xjk_tmp_xjk_tmp"
         sql(
@@ -172,7 +174,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
              |SELECT  $view
              |FROM    xjk_tmp
              |""".stripMargin)
-          .createOrReplaceTempView(result_tab)
+          .createTempView(result_tab)
       }
       other_cols = getColumns(result_tab)
 
@@ -245,26 +247,28 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(tmp_tab)
+      .createTempView(tmp_tab)
     val other_cols = getColumns(tmp_tab)
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tn')
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 1 as main_case_no
-         |        ,case_no
+         |        ,case_no_trim(case_no) as case_no
          |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
+         |WHERE   case_no_trim(case_no) is  not null
          |UNION ALL
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 0 as main_case_no
-         |        ,connect_case_no as case_no
+         |        ,case_no_trim(connect_case_no) as case_no
          |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    $tmp_tab
          |WHERE   connect_case_no is not null
+         |AND     case_no_trim(connect_case_no) is not null
          |""".stripMargin)
   }
 
@@ -277,7 +281,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          | ,case_no
          | ,gist_unit as connect_court_name
          | ,gist_dd as connect_case_no
-         | ,name as yg_name
+         | ,null as yg_name
          | ,name as bg_name
          | ,ds
          |""".stripMargin
@@ -320,7 +324,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-      .createOrReplaceTempView(tmp_tab)
+      .createTempView(tmp_tab)
 
 
     sql(
@@ -329,25 +333,27 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |FROM    $tmp_tab lateral view explode(split(connect_case_no,'\\n')) t as single_connect_case_no
          |""".stripMargin)
       //      .cache()
-      .createOrReplaceTempView(s"explode_$tmp_tab")
+      .createTempView(s"explode_$tmp_tab")
 
     sql(
       s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.dwd_judicial_case  PARTITION(ds='$ds',tn='$tableName')
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 1 as main_case_no
-         |        ,case_no
+         |        ,case_no_trim(case_no) as case_no
          |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
+         |WHERE   case_no_trim(case_no) is not null
          |UNION ALL
-         |SELECT  0 as id
+         |SELECT  null as id
          |        , 0 as main_case_no
-         |        ,single_connect_case_no as case_no
+         |        ,case_no_trim(single_connect_case_no) as case_no
          |        ,$table_id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |FROM    explode_$tmp_tab
          |WHERE   single_connect_case_no is not null
+         |AND     case_no_trim(single_connect_case_no) is not null
          |""".stripMargin)
   }
 
@@ -360,18 +366,19 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
     spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
 
+    //---        winhc_eci_dev.xjk_test_ttttt
     sql(
       s"""
          | SELECT  *
          | FROM    $org_tab
          | WHERE   ds = '$dwd_last_ds'
-         | AND     case_no IS NOT NULL
-         | AND     case_no <> ''
-         | AND     match_case_no(case_no)
+         |--- AND     case_no IS NOT NULL
+         |--- AND     case_no <> ''
+         |--- AND     match_case_no(case_no)
          |""".stripMargin)
       .repartition(500)
       //      .cache()
-      .createOrReplaceTempView("dwd_judicial_case_tmp")
+      .createTempView("dwd_judicial_case_tmp")
 
     //需要区分group by ,只用一个
     sql(
@@ -390,10 +397,25 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t1
          |WHERE   length(t1.party) > 4
          |GROUP BY case_no,party
+         |UNION ALL
+         |SELECT  case_no,null as party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',id,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,main_case_no
+         |                    ,case_attribute
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW explode(split(concat_ws('\\n',case_attribute['yg_name'],case_attribute['bg_name']) ,'\\n')) t AS party
+         |            WHERE   tn in ('company_dishonest_info','company_dishonest_info_person','company_zxr','company_zxr_person')
+         |        ) AS t1
+         |GROUP BY case_no
          |""".stripMargin).rdd
+      //(tn = 'wenshu_detail' and main_case_no = 1) and
       .flatMap(r => {
-        val case_no = r.getAs[String]("case_no")
         val party = r.getAs[String]("party")
+        val case_no = r.getAs[String]("case_no")
         val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
         val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
         if (connect_case_id.length < 2) {
@@ -408,7 +430,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         list
       })
       .toDF("id_1", "id_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
-      .createOrReplaceTempView("connect_tmp_1")
+      .createTempView("connect_tmp_1")
 
     sql(
       s"""
@@ -420,13 +442,21 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ,t2.tn AS tn_2
          |        ,1 as connect_type
          |        ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
-         |FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
-         |FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+         |FROM    (
+         |select * from dwd_judicial_case_tmp where main_case_no = 1 and tn = 'wenshu_detail'
+         |) AS t1
+         |FULL JOIN (
+         |
+         |select * from dwd_judicial_case_tmp where main_case_no = 0
+         |UNION ALL
+         |select * from dwd_judicial_case_tmp where main_case_no = 1 and tn <> 'wenshu_detail'
+         |
+         |) AS t2
          |ON      t1.case_no = t2.case_no
          |AND     t1.id <> t2.id
-         |AND     case_equ(t1.case_attribute , t2.case_attribute)
+         |AND     case_equ(t1.case_attribute , t2.case_attribute,t1.tn,t2.tn)
          |""".stripMargin)
-      .createOrReplaceTempView("connect_tmp_2")
+      .createTempView("connect_tmp_2")
 
     sql(
       s"""
@@ -476,9 +506,16 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     s"str_to_map(concat_ws('\002',$str),'\002','\001')"
   }
 
-  private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
+  private def getVal(map: Map[String, String], key: String): String = {
+    val v = map.getOrElse(key, "")
+    if (v == null) {
+      ""
+    } else {
+      v
+    }
+  }
 
-  private def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def case_equ(m1: Map[String, String], m2: Map[String, String], tn1: String, tn2: String): Boolean = {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
@@ -495,6 +532,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         println("error")
         println(m1)
         println(m2)
+        throw new RuntimeException(ex)
       }
         false
     }

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyCidAndNameUtils.scala

@@ -27,7 +27,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |""".stripMargin)
       .repartition(500)
       .cache()
-      .createOrReplaceTempView("all_company_tmp")
+      .createTempView("all_company_tmp")
   }
 
   def addNewNameByCid(org_table_name: String, cidField: String, addName: String): String = {
@@ -54,7 +54,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 
@@ -88,7 +88,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 
@@ -121,7 +121,7 @@ case class CompanyCidAndNameUtils(s: SparkSession
          |FROM    $org_table_name
          |WHERE   $cidField IS NULL
          |""".stripMargin)
-      .createOrReplaceTempView(res_tab)
+      .createTempView(res_tab)
     res_tab
   }
 }

+ 14 - 10
src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

@@ -55,9 +55,9 @@ object case_connect_utils {
 
     for (char <- current_party.mkString("").toCharArray.map(_.toString).filter(vague_word.contains(_))) {
       for (userName <- current_party) {
-        for (splitUser <- userName.split(char)) {
+        for (splitUser <- userName.replace(char,"\001").split("\001")) {
           val all_str = connect_party.mkString("")
-          if (!all_str.contains(splitUser.substring(0, 1))) {
+          if (StringUtils.isNotEmpty(splitUser) && (!all_str.contains(splitUser.substring(0, 1)))) {
             return false
           }
         }
@@ -96,14 +96,18 @@ object case_connect_utils {
   }
 
   def main(args: Array[String]): Unit = {
-    //    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
-    //    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
-    //
-    //    val current_case_no = ""
-    //    val connect_case_no = ""
-    //    val current_court_name = ""
-    //    val connect_court_name = ""
+    val current_case_party_list: Seq[String] = Seq("乐视控股(北京)有限公司", "贾跃亭", "甘薇", "贾跃民", "武汉信用小额贷款股份有限公司")
+    val connect_case_party_list: Seq[String] = Seq("贾跃民")
+
+    val current_case_no = "(2017)鄂01民初3720号"
+    val connect_case_no = "(2017)鄂01民初3720号"
+    val current_court_name = "湖北省武汉市中级人民法院"
+    val connect_court_name = "湖北省武汉市中级人民法院"
     //
-    //    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
+    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
+    var userName = "jasdfja*sldfjk"
+    var char = "*"
+
+    println(userName.replace(char,"\001").split("\001").mkString("\t"))
   }
 }