Browse Source

fix: 司法案件fix bugs

许家凯 4 years ago
parent
commit
0f07c5ac80

+ 47 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation_CaseAgg.scala

@@ -33,7 +33,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
   is_id_card_udf()
   is_id_card_udf()
 
 
   def etl(ds: String): Unit = {
   def etl(ds: String): Unit = {
-    AsyncExtract.startAndWait(spark, Seq(
+    /*AsyncExtract.startAndWait(spark, Seq(
       ("dishonest etl...", () => {
       ("dishonest etl...", () => {
         etl_company_dishonest_info(ds)
         etl_company_dishonest_info(ds)
         true
         true
@@ -48,9 +48,9 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         etl_company_zxf(ds)
         etl_company_zxf(ds)
         true
         true
       })
       })
-    ))
+    ))*/
 
 
-    etc_dwd_judicial_case(ds)
+    //    etc_dwd_judicial_case(ds)
     relationByGroup()
     relationByGroup()
   }
   }
 
 
@@ -262,7 +262,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |WHERE   case_no_trim(case_no) is  not null
          |WHERE   case_no_trim(case_no) is  not null
          |UNION ALL
          |UNION ALL
          |SELECT  null as id
          |SELECT  null as id
-         |        , 1 as main_case_no
+         |        , 0 as main_case_no
          |        ,case_no_trim(connect_case_no) as case_no
          |        ,case_no_trim(connect_case_no) as case_no
          |        ,id as rowkey
          |        ,id as rowkey
          |        ,${getStrToMap(other_cols)} as case_attribute
          |        ,${getStrToMap(other_cols)} as case_attribute
@@ -366,14 +366,15 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
     spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
     spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
     spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
 
 
+    //---        winhc_eci_dev.xjk_test_ttttt
     sql(
     sql(
       s"""
       s"""
          | SELECT  *
          | SELECT  *
          | FROM    $org_tab
          | FROM    $org_tab
          | WHERE   ds = '$dwd_last_ds'
          | WHERE   ds = '$dwd_last_ds'
-         | AND     case_no IS NOT NULL
-         | AND     case_no <> ''
-         | AND     match_case_no(case_no)
+         |--- AND     case_no IS NOT NULL
+         |--- AND     case_no <> ''
+         |--- AND     match_case_no(case_no)
          |""".stripMargin)
          |""".stripMargin)
       .repartition(500)
       .repartition(500)
       //      .cache()
       //      .cache()
@@ -396,10 +397,25 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t1
          |        ) AS t1
          |WHERE   length(t1.party) > 4
          |WHERE   length(t1.party) > 4
          |GROUP BY case_no,party
          |GROUP BY case_no,party
+         |UNION ALL
+         |SELECT  case_no,null as party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('$separation',id,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,main_case_no
+         |                    ,case_attribute
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW explode(split(concat_ws('\\n',case_attribute['yg_name'],case_attribute['bg_name']) ,'\\n')) t AS party
+         |            WHERE   tn in ('company_dishonest_info','company_dishonest_info_person','company_zxr','company_zxr_person')
+         |        ) AS t1
+         |GROUP BY case_no
          |""".stripMargin).rdd
          |""".stripMargin).rdd
+      //(tn = 'wenshu_detail' and main_case_no = 1) and
       .flatMap(r => {
       .flatMap(r => {
-        val case_no = r.getAs[String]("case_no")
         val party = r.getAs[String]("party")
         val party = r.getAs[String]("party")
+        val case_no = r.getAs[String]("case_no")
         val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
         val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
         val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
         val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
         if (connect_case_id.length < 2) {
         if (connect_case_id.length < 2) {
@@ -426,17 +442,25 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ,t2.tn AS tn_2
          |        ,t2.tn AS tn_2
          |        ,1 as connect_type
          |        ,1 as connect_type
          |        ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
          |        ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
-         |FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
-         |FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+         |FROM    (
+         |select * from dwd_judicial_case_tmp where main_case_no = 1 and tn = 'wenshu_detail'
+         |) AS t1
+         |FULL JOIN (
+         |
+         |select * from dwd_judicial_case_tmp where main_case_no = 0
+         |UNION ALL
+         |select * from dwd_judicial_case_tmp where main_case_no = 1 and tn <> 'wenshu_detail'
+         |
+         |) AS t2
          |ON      t1.case_no = t2.case_no
          |ON      t1.case_no = t2.case_no
          |AND     t1.id <> t2.id
          |AND     t1.id <> t2.id
-         |AND     case_equ(t1.case_attribute , t2.case_attribute)
+         |AND     case_equ(t1.case_attribute , t2.case_attribute,t1.tn,t2.tn)
          |""".stripMargin)
          |""".stripMargin)
       .createTempView("connect_tmp_2")
       .createTempView("connect_tmp_2")
 
 
     sql(
     sql(
       s"""
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation
+         |--- INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation
          |SELECT  id_1
          |SELECT  id_1
          |        ,id_2
          |        ,id_2
          |        ,case_no_1
          |        ,case_no_1
@@ -471,6 +495,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
          |        ) AS t2
          |        ) AS t2
          |WHERE   t2.num = 1
          |WHERE   t2.num = 1
          |""".stripMargin)
          |""".stripMargin)
+      .show(500, false)
 
 
   }
   }
 
 
@@ -482,9 +507,16 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
     s"str_to_map(concat_ws('\002',$str),'\002','\001')"
     s"str_to_map(concat_ws('\002',$str),'\002','\001')"
   }
   }
 
 
-  private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
+  private def getVal(map: Map[String, String], key: String): String = {
+    val v = map.getOrElse(key, "")
+    if (v == null) {
+      ""
+    } else {
+      v
+    }
+  }
 
 
-  private def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def case_equ(m1: Map[String, String], m2: Map[String, String], tn1: String, tn2: String): Boolean = {
     try {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
@@ -501,6 +533,7 @@ case class JudicialCaseRelation_CaseAgg(s: SparkSession,
         println("error")
         println("error")
         println(m1)
         println(m1)
         println(m2)
         println(m2)
+        throw new RuntimeException(ex)
       }
       }
         false
         false
     }
     }