Browse Source

Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
2f066dce3b

+ 231 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelationPre2.scala

@@ -0,0 +1,231 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+import scala.collection.mutable
+
+/**
+ * @Description:司法案件预处理
+ * @author π
+ * @date 2020/9/17 14:45
+ */
+object JudicialCaseRelationPre2 {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelationPre2(spark, project).calc()
+    spark.stop()
+  }
+}
+
+case class JudicialCaseRelationPre2(s: SparkSession, project: String
+                                   ) extends LoggingUtils with CompanyMapping with BaseFunc {
+  override protected val spark: SparkSession = s
+
+  def precalc(): Unit = {
+    prepareFunctions(spark)
+    //文书预处理
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='wenshu')
+         |SELECT  a.judicase_id
+         |        ,'0' as flag
+         |        ,title
+         |        ,case_type
+         |        ,case_reason
+         |        ,case_no
+         |        ,court_name
+         |        ,concat_ws('',case_stage(case_no)) as case_stage
+         |        --,'裁判文书' lable
+         |        --,concat_ws('|','民事判决日期',judge_date,case_id) as detail
+         |        ,regexp_replace(yg_name,'\n',',') as yg_name
+         |        ,regexp_replace(bg_name,'\n',',') as bg_name
+         |        ,judge_date as date
+         |        ,case_id as detail_id
+         |        ,case_amt
+         |FROM    (
+         |            SELECT  *
+         |            FROM $project.xjk_ads_judicial_case_relation1_tmp
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    $project.ods_justicase
+         |            WHERE   ds = '20200830'
+         |        ) b
+         |ON      a.id = b.case_id
+         |""".stripMargin).show(10, false)
+
+
+
+    //法院公告预处理
+    sql(
+      s"""
+         |insert ${if (isWindows) "INTO" else "OVERWRITE"} table $project.ads_judicial_case_relation_pre partition(ds='20200924',tn='court_open_announcement')
+         |select
+         |  judicase_id
+         |  ,flag
+         |  ,title
+         |  ,case_type
+         |  ,case_reason
+         |  ,case_no
+         |  ,court_name
+         |  ,case_stage
+         |  ,yg_name
+         |  ,bg_name
+         |  ,date
+         |  ,detail_id
+         |  ,case_amt
+         |from (
+         |      select
+         |      md5(cleanup(case_no)) as judicase_id
+         |      ,"1" as flag
+         |      ,concat_ws('',plaintiff,'与',defendant,case_reason) as title
+         |      ,concat_ws('',case_type(case_no)) as case_type
+         |      ,case_reason
+         |      ,case_no
+         |      ,court as court_name
+         |      ,concat_ws('',case_stage(case_no)) as case_stage
+         |      ,plaintiff as yg_name
+         |      ,defendant as bg_name
+         |      ,start_date as date
+         |      ,rowkey as detail_id
+         |      ,0.0 as case_amt
+         |      ,row_number() over(partition by rowkey order by update_time desc) num
+         |      from $project.inc_ads_company_court_open_announcement
+         |      where length(case_no) > 0 and ds > '0'
+         |   )
+         |where num = 1
+         |""".stripMargin).show(10, false)
+
+
+
+    //tmp_xf_judicial_case_relation_open_counrt
+    //tmp_xf_judicial_case_relation_wenshu
+
+  }
+
+  def calc(): Unit = {
+    prepareFunctions(spark)
+    //预处理数据
+    //precalc()
+
+    //替换司法案件id
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_replace
+         |SELECT  COALESCE(b.judicase_id,a.judicase_id) judicase_id
+         |        ,a.flag
+         |        ,a.title
+         |        ,a.case_type
+         |        ,a.case_reason
+         |        ,a.case_no
+         |        ,a.court_name
+         |        ,a.case_stage
+         |        ,case_label(a.flag) lable
+         |        ,concat_ws('|',a.flag,a.date,a.detail_id) as detail
+         |        ,a.yg_name
+         |        ,a.bg_name
+         |        ,a.date
+         |        ,a.detail_id
+         |        ,a.case_amt
+         |FROM    (
+         |  select * from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='court_open_announcement'
+         |) a
+         |LEFT JOIN (
+         |  select case_no,max(judicase_id) judicase_id
+         |  from $project.ads_judicial_case_relation_pre
+         |  where ds = '20200924' and tn ='wenshu' and  length(trim(case_no)) > 0
+         |  group by case_no
+         |) b
+         |ON  CLEANUP(a.case_no) = CLEANUP(b.case_no)
+         |union all
+         |SELECT   judicase_id
+         |        ,flag
+         |        ,title
+         |        ,case_type
+         |        ,case_reason
+         |        ,case_no
+         |        ,court_name
+         |        ,case_stage
+         |        ,case_label(flag) lable
+         |        ,concat_ws('|',flag,date,detail_id) as detail
+         |        ,yg_name
+         |        ,bg_name
+         |        ,date
+         |        ,detail_id
+         |        ,case_amt
+         |from $project.ads_judicial_case_relation_pre where ds = '20200924' and tn ='wenshu' and length(trim(case_no)) > 0
+         |""".stripMargin).show(10, false)
+
+    //司法案件主表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_r1
+         |SELECT  judicase_id
+         |        ,max(first_title) title
+         |        ,max(case_type) case_type
+         |        ,max(case_reason) case_reason
+         |        ,concat_ws(',',collect_set(case_no)) case_no
+         |        ,concat_ws(',',collect_set(court_name)) court_name
+         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
+         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
+         |        ,concat_ws(',',collect_set(detail)) detail
+         |        ,max(first_yg_name) AS yg_name
+         |        ,max(first_bg_name) AS bg_name
+         |        ,max(case_amt) AS case_amt
+         |FROM    (
+         |        SELECT  * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
+         |        FROM    (
+         |                SELECT  *
+         |                FROM    $project.tmp_xf_judicial_case_relation_replace
+         |                )
+         |        )
+         |GROUP BY judicase_id
+         |""".stripMargin).show(10, false)
+
+    //明细表
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.tmp_xf_judicial_case_relation_r2
+         |SELECT  md5(concat_ws('',judicase_id,CLEANUP(case_no),case_stage)) id
+         |        ,judicase_id
+         |        ,max(first_title) title
+         |        ,max(case_type) case_type
+         |        ,max(case_reason) case_reason
+         |        ,case_no
+         |        ,max(court_name) court_name
+         |        ,case_stage
+         |        ,concat_ws(',',max(case_type),collect_set(lable)) lable
+         |        ,concat_ws(',',collect_set(detail)) detail
+         |        ,max(first_yg_name) yg_name
+         |        ,max(first_bg_name) bg_name
+         |FROM    (
+         |        SELECT  * ,first_value(yg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_yg_name
+         |                ,first_value(bg_name)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_bg_name
+         |                ,first_value(title)OVER (PARTITION BY judicase_id ORDER BY date ASC ) AS first_title
+         |        FROM    (
+         |                SELECT  *
+         |                FROM    $project.tmp_xf_judicial_case_relation_replace
+         |                )
+         |)
+         |GROUP BY judicase_id
+         |         ,case_no
+         |         ,case_stage
+         |""".stripMargin).show(10, false)
+
+  }
+
+}

+ 0 - 10
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -23,16 +23,6 @@ case class company_env_punishment()extends CompanyDynamicHandle {
        |违反法律:${new_map("law_break")}
        |执行情况:${new_map("execution")}""".stripMargin
   }
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("content")
-
   /**
    * 变更时间
    *

+ 0 - 10
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_announcement.scala

@@ -21,16 +21,6 @@ case class company_land_announcement() extends CompanyDynamicHandle {
        |项目位置:${new_map("project_loc")}
        |合同签订日期:${new_map("contract_date")}""".stripMargin
   }
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("project_loc") + new_map("supply_method")
-
   /**
    * 变更时间
    *

+ 0 - 10
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala

@@ -23,16 +23,6 @@ case class company_land_publicity() extends CompanyDynamicHandle {
        |公布日期:${new_map("publication_date")}""".stripMargin
 
   }
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("location") + new_map("use_for")
-
   /**
    * 变更时间
    *

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_liquidating_info.scala

@@ -21,7 +21,6 @@ case class company_liquidating_info() extends CompanyDynamicHandle {
        |清算成员:${new_map("member")}""".stripMargin
   }
 
-
   /**
    * 变更时间
    *

+ 16 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -31,6 +31,22 @@ trait CompanyMapping {
     spark.udf.register("name_judge", (name: String, yg_name: String, bg_name: String) => {
       nameJudge(name, yg_name, bg_name)
     })
+
+    spark.udf.register("case_type", (case_no: String) => {
+      caseType(case_no)
+    })
+
+    spark.udf.register("case_stage", (case_no: String) => {
+      caseStage(case_no)
+    })
+
+    spark.udf.register("last_stage", (stages: String) => {
+      lastStage(stages)
+    })
+    spark.udf.register("case_label", (l: String) => {
+      label(l)
+    })
+
   }
 
   def prepare(spark: SparkSession): Unit = {

+ 82 - 8
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,8 +1,10 @@
 package com.winhc.bigdata.spark.utils
 
 import java.text.SimpleDateFormat
-import java.util.regex.{Pattern}
+import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
+
+import cn.hutool.core.util.StrUtil
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
@@ -66,9 +68,9 @@ object BaseUtil {
     atDaysAfter(-1, nowDate("yyyyMMdd"))
   }
 
-  def getYear(n:Int): String ={
+  def getYear(n: Int): String = {
     val cal = Calendar.getInstance(Locale.CHINA)
-    cal.add(Calendar.YEAR,1*n)
+    cal.add(Calendar.YEAR, 1 * n)
     cal.get(Calendar.YEAR).toString
   }
 
@@ -133,18 +135,90 @@ object BaseUtil {
   }
 
   def nameJudge(name: String, yg_name: String, bg_name: String): String = {
-    if(StringUtils.isNotBlank(name)){
-      if(StringUtils.isNotBlank(yg_name)&&yg_name.contains(name)){
+    if (StringUtils.isNotBlank(name)) {
+      if (StringUtils.isNotBlank(yg_name) && yg_name.contains(name)) {
         return "y"
-      }else if(StringUtils.isNotBlank(bg_name)&&bg_name.contains(name)){
+      } else if (StringUtils.isNotBlank(bg_name) && bg_name.contains(name)) {
         return "b"
       }
     }
-     ""
+    ""
+  }
+
+  def caseType(caseNO: String): String = {
+    if (StringUtils.isNotBlank(caseNO)) {
+      if (caseNO.contains("辖")) {
+        return "管辖案件"
+      } else if (caseNO.contains("民")) {
+        return "民事案件"
+      } else if (caseNO.contains("执")) {
+        return "执行案件"
+      } else if (caseNO.contains("行")) {
+        return "行政案件"
+      } else if (caseNO.contains("刑")) {
+        return "刑事案件"
+      } else if (caseNO.contains("陪")) {
+        return "赔偿案件"
+      }
+    }
+    "其它案件"
+  }
+
+  def caseStage(caseNo: String): String = {
+    if (StringUtils.isNotBlank(caseNo)) {
+      if (StrUtil.containsAny(caseNo, "破申", "商申")) {
+        return "其它"
+      }
+      if (StrUtil.containsAny(caseNo, "监", "抗", "再", "申", "提", "再")) {
+        return "再审"
+      }
+      if (StrUtil.containsAny(caseNo, "初")) {
+        return "一审"
+      } else if (StrUtil.containsAny(caseNo, "终")) {
+        return "二审"
+      } else if (StrUtil.containsAny(caseNo, "执")) {
+        return "执行"
+      }
+    }
+    "其它"
+  }
+
+  def lastStage(s: String): String = {
+    if (StringUtils.isNotBlank(s)) {
+      val arr = s.split(" ")
+      if (StrUtil.containsAny(s, "执")) {
+        return arr.filter(StrUtil.containsAny(_, "执")).head
+      }
+      if (StrUtil.containsAny(s, "再")) {
+        return arr.filter(StrUtil.containsAny(_, "再")).head
+      }
+      if (StrUtil.containsAny(s, "二")) {
+        return arr.filter(StrUtil.containsAny(_, "二")).head
+      }
+      if (StrUtil.containsAny(s, "一")) {
+        return arr.filter(StrUtil.containsAny(_, "一")).head
+      }
+      return arr.head
+    }
+    "其它"
+  }
+
+  def label(s: String): String = {
+    var r = ""
+    if (StringUtils.isNotBlank(s)) {
+      r = s match {
+        case "0" => "裁判文书"
+        case "1" => "开庭公告"
+        case _ => ""
+      }
+    }
+    r
   }
 
   def main(args: Array[String]): Unit = {
-    println(getYear(-1))
+    println(label("1"))
+    println(label("0"))
+    println(label("2"))
   }
 
 }

+ 30 - 8
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -64,6 +64,30 @@ object HbaseUtil {
     ms.toList
   }
 
+
+  def deleteRowKeys(tb: Table, prefix: String, family: String = FAMILY_NAME) = {
+    import org.apache.hadoop.hbase.client.Delete
+    import scala.collection.JavaConverters._
+    val scan = new Scan()
+    scan.setRowPrefixFilter(prefix.getBytes())
+    val list = ListBuffer[String]()
+    val deletes: ListBuffer[Delete] = ListBuffer[Delete]()
+    try {
+      val scanner = tb.getScanner(scan)
+      import scala.collection.JavaConversions._
+      import collection.JavaConverters._
+      for (res <- scanner) {
+        list.append(Bytes.toString(res.getRow))
+        val t1: Delete = new Delete(res.getRow)
+        deletes += t1
+      }
+    } catch {
+      case e: Throwable => e.printStackTrace()
+    }
+    tb.delete(deletes.asJava)
+    list.toList
+  }
+
   def deleteRows(tb: Table, rowkeys: List[String], family: String = FAMILY_NAME): Unit = {
     import org.apache.hadoop.hbase.client.Delete
     import scala.collection.JavaConverters._
@@ -71,19 +95,17 @@ object HbaseUtil {
     for (r <- rowkeys) {
       val t: Delete = new Delete(Bytes.toBytes(r))
       deletes += t
-      //println(t)
+      println(t)
     }
     tb.delete(deletes.asJava)
   }
 
   def main(args: Array[String]): Unit = {
-//    val rows = getRowDataScan(getTable("COMPANY_SCORE"), "23402373")
-//    for (r <- rows) {
-//      println(r)
-//    }
-
-    val rows = List[String]("4")
-    deleteRows(getTable("TEST_COMPANY"), rows)
+    val tb: Table = getTable("COMPANY_PATENT_LIST_INDEX_INCLUDE")
+    val rows: List[String] = deleteRowKeys(tb, "624378817,3137567565")
+    println(rows)
+    //val rows = List[String]("4")
+    //deleteRows(tb, rows)
 
   }
 }