
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent commit 5170670c88

+ 26 - 11
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -32,7 +32,7 @@ object args_company_job {
     , args_company_job("company_court_open_announcement", Seq("case_no", "start_date")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no,split_date(cast(start_date as String)) )))"
       , is_super_filter = false
-      , where = "case_no_trim(case_no) is not null and is_json_str(defendant_info) and is_json_str(plaintiff_info) and is_json_str(litigant_info)"
+      , where = "is_json_str(defendant_info) and is_json_str(plaintiff_info) and is_json_str(litigant_info)"
       , explode_args = Seq(
         explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
         , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
@@ -99,8 +99,8 @@ object args_company_job {
     )
 
 
-    , args_company_job("company_court_announcement", Seq("case_no", "announcement_type", "publish_date", "court_name")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,announcement_type,split_date(cast(publish_date as String)),court_name )))"
+    , args_company_job("company_court_announcement", Seq("case_no", "announcement_type", "publish_date", "court_name", "bltn_no")
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,announcement_type,split_date(cast(publish_date as String)),court_name,bltn_no )))"
       , is_super_filter = false
       , where = "is_json_str(plaintiff_info) and is_json_str(litigant_info)"
       , explode_args = Seq(
@@ -177,6 +177,8 @@ object args_company_job {
 
     , args_company_job("bankruptcy_open_case", Seq("applicant", "respondent", "public_date")
       , rowkey_udf = "md5(cleanup(concat_ws('',applicant,respondent,split_date(cast(public_date as String)))))"
+      , id_user_defined_rowkey = true
+
       , is_super_filter = false
       , where = "is_json_str(applicant_info) and is_json_str(respondent_info) "
       , explode_args = Seq(
@@ -187,12 +189,14 @@ object args_company_job {
     )
 
     , args_company_job("bankruptcy_open_announcement", Seq("title", "case_no")
-      , rowkey_udf = "md5(cleanup(concat_ws('',title,case_no)))"
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',title,case_no))) )"
+
       , is_super_filter = false
     )
 
     , args_company_job("bankruptcy_judgment_document", Seq("title", "case_no")
-      , rowkey_udf = "md5(cleanup(concat_ws('',title,case_no)))"
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',title,case_no))) )"
+
       , is_super_filter = false
     )
 
@@ -206,30 +210,41 @@ object args_company_job {
       )
     )
 
-    , args_company_job("company_punishment_info", Seq("punish_number")
-      , rowkey_udf = "md5(cleanup(concat_ws('',punish_number)))"
+    , args_company_job("company_punishment_info", Seq("company_name", "punish_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('',company_name, punish_number)))"
       , is_super_filter = false
     )
 
-    , args_company_job("company_punishment_info_creditchina", Seq("punish_number")
-      , rowkey_udf = "md5(cleanup(concat_ws('',punish_number)))"
+    , args_company_job("company_punishment_info_creditchina", Seq("company_name", "punish_number")
+      , rowkey_udf = "md5(cleanup(concat_ws('',company_name, punish_number)))"
       , is_super_filter = false
     )
 
     , args_company_job("company_brief_cancel_announcement", Seq("company_id")
       , rowkey_udf = "md5(cleanup(concat_ws('',company_id)))"
+      , id_user_defined_rowkey = true
       , is_super_filter = false
     )
 
     , args_company_job("company_brief_cancel_announcement_objection", Seq("main_id", "objection_date", "objection_content")
-      , rowkey_udf = "md5(cleanup(concat_ws('',main_id, split_date(cast(objection_date as String)), objection_content)))"
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(objection_date as String)), objection_content))) )"
       , is_super_filter = false
     )
 
     , args_company_job("company_brief_cancel_announcement_result", Seq("main_id", "announcement_apply_date", "brief_cancel_result")
-      , rowkey_udf = "md5(cleanup(concat_ws('',main_id, split_date(cast(announcement_apply_date as String)), brief_cancel_result)))"
+      , rowkey_udf = "concat_ws('_', main_id, md5(cleanup(concat_ws('',main_id, split_date(cast(announcement_apply_date as String)), brief_cancel_result))) )"
       , is_super_filter = false
     )
+
+    , args_company_job("company_lawsuit", Seq("case_no", "title")
+      , rowkey_udf = "md5(cleanup(concat_ws('', case_no, title)))"
+      , is_super_filter = false
+      , where = "is_json_str(litigant_info) and is_json_str(defendant_info) and is_json_str(plaintiff_info)"
+      , explode_args = Seq(
+        explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
+        , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
+      )
+    )
   )
 
   def get_args_company_job(tn: String): args_company_job = {

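For context: every rowkey_udf above follows the pattern md5(cleanup(concat_ws('', key_fields...))). A minimal plain-Scala sketch of that scheme, with a hypothetical stand-in for the cleanup UDF (its real definition is not part of this commit), shows why widening the key fields (bltn_no for company_court_announcement, company_name for the punishment tables) changes dedup granularity:

    import java.security.MessageDigest

    // Sketch only: cleanup here is a guess at the real UDF's behavior
    // (strip whitespace and ASCII punctuation); the true definition lives elsewhere.
    object RowkeySketch {
      def cleanup(s: String): String = s.replaceAll("""[\s\p{Punct}]""", "")

      def md5(s: String): String =
        MessageDigest.getInstance("MD5")
          .digest(s.getBytes("UTF-8"))
          .map("%02x".format(_)).mkString

      // concat_ws('') skips nulls and joins with no separator.
      def rowkey(fields: Seq[String]): String =
        md5(cleanup(fields.filter(_ != null).mkString("")))

      def main(args: Array[String]): Unit = {
        val base = Seq("(2021)京01民初1号", "公告", "2021-04-22", "北京市第一中级人民法院")
        // Two announcements differing only in bltn_no used to collide on one
        // rowkey; with bltn_no in the key they no longer do.
        println(rowkey(base) == rowkey(base :+ "BLTN-001")) // false
      }
    }
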
+ 213 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/company_back_update.scala

@@ -0,0 +1,213 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.is_json_str
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import scala.collection.mutable
+
+/**
+ * @Description: offline processing of suspected problem data flowing back
+ * @author π
+ * @date 2021/4/22 10:59
+ */
+case class job_params(tableName: String
+                      , rowkey: String = "rowkey" // primary key (rowkey)
+                      , names: String = "" // explode expression for extracting names
+                     )
+
+object job_params {
+  val tab_args = Seq(
+    job_params(tableName = "company_court_open_announcement"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_court_announcement"
+      , names = "split_names(CONCAT_WS('&',plaintiff_info,litigant_info),'$.name')"
+    )
+    , job_params(tableName = "company_send_announcement"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_court_register"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_zxr_final_case"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_equity_info"
+      , names = "split_names(CONCAT_WS('&', pledgor_info, pledgee_info),'$.pledgor&$.pledgee')"
+    )
+    , job_params(tableName = "company_zxr"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_dishonest_info"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_zxr_restrict"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "zxr_evaluate_results"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "zxr_evaluate"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "restrictions_on_exit"
+      , names = "split(CONCAT_WS('&',executed_person_keyno),'&')"
+    )
+    , job_params(tableName = "company_punishment_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_punishment_info_creditchina"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "bankruptcy_open_case"
+      , names = "split_names(CONCAT_WS('&',applicant_info, respondent_info),'$.name')"
+    )
+    , job_params(tableName = "company_judicial_assistance"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+
+    )
+    , job_params(tableName = "company_land_mortgage"
+      , names = "split(CONCAT_WS('&',mortgagor,mortgagee),'&')"
+    )
+    , job_params(tableName = "company_public_announcement"
+      , names = "split(CONCAT_WS('&',pay_bank,gather_name,drawer,owner,apply_name),'&')"
+    )
+    , job_params(tableName = "company_abnormal_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_illegal_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "auction_tracking"
+      , names = "split_names(CONCAT_WS('&',company_info),'$.company_name')"
+    )
+    , job_params(tableName = "company_brief_cancel_announcement"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_lawsuit"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_check_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_double_random_check_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_mortgage_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_own_tax"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_tax_contravention"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+  )
+
+  def get_args_company_job(tn: String): job_params = {
+    tab_args.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException("tn is not found"))
+  }
+
+}
+
+case class company_back_update(s: SparkSession
+                               , project: String // name of the project the tables live in
+                               , job_params: job_params // job parameters
+                              ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  override protected val spark: SparkSession = s
+  val tn: String = job_params.tableName
+  val rowkey: String = job_params.rowkey
+  val names: String = job_params.names
+  val ads_tab = s"$project.ads_$tn"
+  val inc_ads_tab = s"$project.inc_ads_$tn"
+
+  // backflow result table
+  val company_back_update = s"$project.tmp_xf_company_back_update"
+  // suspected problem company names
+  //val companyid_name_mapping = s"$project.tmp_xf_companyid_name_mapping_tmp"
+  // problem data from former company names
+  val companyid_name_mapping = s"$project.tmp_xf_spider_rizhi_name"
+
+  val inter_cols: Seq[String] = getColumns(ads_tab).intersect(getColumns(inc_ads_tab))
+  val lastDs: String = BaseUtil.getYesterday()
+
+  register()
+
+
+  private def register(): Unit = {
+    prepareFunctions(spark)
+  }
+
+
+  def calc() = {
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $company_back_update PARTITION(ds='$lastDs',tn='$tn')
+         |SELECT  $rowkey
+         |FROM    (
+         |            SELECT  $rowkey
+         |            FROM    (
+         |                        SELECT  $rowkey
+         |                                ,name
+         |                        FROM    (
+         |                                    SELECT  $rowkey
+         |                                            ,name
+         |                                    FROM    (
+         |                                                SELECT  $rowkey
+         |                                                        ,$names names
+         |                                                FROM    $ads_tab
+         |                                                WHERE   ds > 0
+         |                                                UNION ALL
+         |                                                SELECT  $rowkey
+         |                                                        ,$names names
+         |                                                FROM    $inc_ads_tab
+         |                                                WHERE   ds > 0
+         |                                            )
+         |                                    LATERAL VIEW explode(names) b AS name
+         |                                )
+         |                        WHERE   LENGTH(cleanup(name)) > 5
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  name
+         |                        FROM    $companyid_name_mapping
+         |                        GROUP BY name
+         |                    ) b
+         |            ON      name_cleanup(a.name) = name_cleanup(b.name)
+         |        )
+         |GROUP BY rowkey
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |ALTER TABLE $company_back_update ADD IF NOT EXISTS PARTITION(ds='$lastDs',tn='$tn')
+         |""".stripMargin)
+
+  }
+
+
+}
+
+object company_back_update {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println(args.mkString(","))
+      println("usage: company_back_update <project> <tn>")
+      sys.exit(-1)
+    }
+    val Array(project, tn) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = company_back_update(s = spark, project = project, job_params.get_args_company_job(tn))
+    re.calc()
+    spark.stop()
+  }
+}

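calc() above explodes each record's names, joins them against the problem-name list, and emits distinct rowkeys. A self-contained local sketch of that join-and-dedup pattern (table names, columns, and sample rows below are invented; the real query additionally filters LENGTH(cleanup(name)) > 5 and joins on name_cleanup):

    import org.apache.spark.sql.SparkSession

    object BackflowPatternSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("sketch").getOrCreate()
        import spark.implicits._

        // Invented sample data: rowkeys with their extracted name lists,
        // plus the list of suspected problem names.
        Seq(("rk1", Seq("甲公司", "乙公司")), ("rk2", Seq("丙公司")))
          .toDF("rowkey", "names").createOrReplaceTempView("source")
        Seq("乙公司").toDF("name").createOrReplaceTempView("problem_names")

        spark.sql(
          """
            |SELECT rowkey
            |FROM (
            |  SELECT rowkey, name
            |  FROM source
            |  LATERAL VIEW explode(names) t AS name
            |) a
            |JOIN problem_names b ON a.name = b.name
            |GROUP BY rowkey
            |""".stripMargin).show() // only rk1 survives
        spark.stop()
      }
    }
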
+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -238,13 +238,16 @@ object export_company_index_2_es {
     , export_2_es_args("bankruptcy_open_case"
       , "rowkey,case_no,case_type,agency_court,applicant,applicant_info,respondent,respondent_info,public_date,deleted".split(","))
     , export_2_es_args("auction_tracking"
-      , "rowkey,auction_items_id,company_info,case_no,auction_title,initial_price,start_time,apply_count,url,deleted".split(","))
+      , "rowkey,auction_items_id,company_info,case_no,auction_title,initial_price,start_time,apply_count,url,deleted".split(",")
+      , handles = Seq(field_handle(field_name = "initial_price", handle = "round(initial_price, 2)")))
     , export_2_es_args("company_punishment_info"
-      , "rowkey,company_id,company_name,person_name,pid,type,department_name,decision_date,publish_date,deleted".split(","))
+      , "rowkey,company_id,punish_number,company_name,person_name,pid,type,department_name,decision_date,publish_date,deleted".split(","))
     , export_2_es_args("company_punishment_info_creditchina"
       , "rowkey,company_id,company_name,punish_number,punish_name,person_name,pid,decision_date,status,deleted".split(","))
     , export_2_es_args("company_brief_cancel_announcement"
       , "rowkey,company_id,deleted".split(","))
+    , export_2_es_args("company_lawsuit"
+      , "rowkey,case_id,doc_id,case_no,doc_type,case_type,case_reason_level2,case_reason_level3,case_reason_level4,case_reason,case_reason_levelnum,case_stage,case_amt,court_name,court_province,court_city,court_level,judge_date,judge_year,judge_result,title,spider_date,update_date,plaintiff_info,defendant_info,litigant_info,all_lawyer,pub_date,pub_year,deleted".split(","))
   )
 
 

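The new handles entry post-processes initial_price with a SQL expression at export time. A standalone check of that expression's semantics (the input value is invented; Spark's round is HALF_UP):

    import org.apache.spark.sql.SparkSession

    object RoundHandleCheck {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("round").getOrCreate()
        // round(x, 2) keeps two decimal places, rounding half up: 123.456 -> 123.46
        spark.sql("SELECT round(123.456, 2) AS initial_price").show()
        spark.stop()
      }
    }
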
+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -67,6 +67,10 @@ trait CompanyMapping {
       nameCleanup(name)
     })
 
+    spark.udf.register("split_names", (json_array: String, json_path: String) => {
+      split_names(json_array, json_path)
+    })
+
   }
 
   def prepare(spark: SparkSession): Unit = {

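Once prepareFunctions has run, split_names is callable from SQL. A standalone sketch with an invented input (two JSON arrays joined by '&', one shared path), matching the call sites in company_back_update:

    import org.apache.spark.sql.SparkSession
    import com.winhc.bigdata.spark.utils.BaseUtil.split_names

    object SplitNamesUdfSketch {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[1]").appName("udf").getOrCreate()
        spark.udf.register("split_names", (ja: String, jp: String) => split_names(ja, jp))
        // Duplicates across the two arrays collapse via the internal Set;
        // output order is therefore unspecified.
        spark.sql(
          """SELECT split_names('[{"name":"甲"}]&[{"name":"乙"},{"name":"甲"}]', '$.name') AS names
            |""".stripMargin).show(false) // e.g. [甲, 乙]
        spark.stop()
      }
    }
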
+ 40 - 3
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,17 +1,18 @@
 package com.winhc.bigdata.spark.utils
 
 import cn.hutool.core.util.StrUtil
-import com.alibaba.fastjson.JSON
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
 import com.winhc.bigdata.spark.implicits.RegexUtils._
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
-
 import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
 
+import scala.collection.mutable
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/3 18:49
@@ -482,11 +483,47 @@ object BaseUtil {
     re
   }
 
+  def split_names(json_array: String, json_path: String): Seq[String] = {
+    try {
+      if (StringUtils.isEmpty(json_array)) {
+        return Seq.empty
+      }
+
+      val set = mutable.Set[String]()
+      val jsonArr = json_array.split("&", -1)
+      val pathArr = json_path.split("&", -1)
+      // A single path applies to every array; multiple '&'-joined paths are
+      // matched positionally to the '&'-joined JSON arrays.
+      var path = pathArr.head
+      for (i <- 0 until jsonArr.length) {
+        if (pathArr.length > 1) {
+          path = pathArr(i)
+        }
+        parse(jsonArr(i), path, set)
+      }
+
+      set.toSeq
+    } catch {
+      case e: Exception => {
+        // swallow malformed input, log the offending value, return no names
+        println(json_array)
+        Seq.empty
+      }
+    }
+  }
+
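+  // Evaluate json_path against one '&'-segment of the input and add every match to set.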
+  def parse(json: String, json_path: String, set: mutable.Set[String]): Unit = {
+    val jsonArray = JSONPath.eval(JSON.parse(json), json_path).asInstanceOf[JSONArray].toArray[String](Array())
+    for (o <- jsonArray) {
+      set.add(o)
+    }
+  }
+
   def main(args: Array[String]): Unit = {
+    val json_array = "[{\"name\": \"史某某\", \"litigant_id\": \"\"}, {\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]&[{\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"史某某\", \"litigant_id\": \"\"}]&[{\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]"
+    val json_path = "$.name"
+    println(split_names(json_array,json_path))
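+    // Expected: the four distinct names (史某某, 丁某某, 杨某, 陈某某);
+    // order is unspecified because results pass through a mutable.Set.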
 //    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
-        println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
+      //  println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
     //println(caseStage("(2019)鄂初7号"))