xufei 4 years ago
parent
commit
2fd7edd5b4

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -32,7 +32,7 @@ object args_company_job {
     , args_company_job("company_court_open_announcement", Seq("case_no", "start_date")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no,split_date(cast(start_date as String)) )))"
       , is_super_filter = false
-      , where = "case_no_trim(case_no) is not null and is_json_str(defendant_info) and is_json_str(plaintiff_info) and is_json_str(litigant_info)"
+      , where = "is_json_str(defendant_info) and is_json_str(plaintiff_info) and is_json_str(litigant_info)"
       , explode_args = Seq(
         explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
         , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
@@ -98,8 +98,8 @@ object args_company_job {
     )
 
 
-    , args_company_job("company_court_announcement", Seq("case_no", "announcement_type", "publish_date", "court_name")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,announcement_type,split_date(cast(publish_date as String)),court_name )))"
+    , args_company_job("company_court_announcement", Seq("case_no", "announcement_type", "publish_date", "court_name", "bltn_no")
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,announcement_type,split_date(cast(publish_date as String)),court_name,bltn_no )))"
       , is_super_filter = false
       , where = "is_json_str(plaintiff_info) and is_json_str(litigant_info)"
       , explode_args = Seq(

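The second hunk widens the dedup key for company_court_announcement: bltn_no now feeds into the MD5 rowkey, so two announcements that share case_no, announcement_type, publish_date and court_name but carry different bulletin numbers no longer collapse into one row. A minimal REPL-style sketch of that effect, using a plain MD5 over raw strings (cleanup/split_date are the project's own UDFs, and the sample values below are made up, not taken from this commit):

    import java.security.MessageDigest

    // Stand-in for md5(cleanup(concat_ws('', ...))): plain MD5 over the concatenated fields.
    def rowkey(fields: String*): String =
      MessageDigest.getInstance("MD5")
        .digest(fields.mkString("").getBytes("UTF-8"))
        .map("%02x".format(_)).mkString

    // Hypothetical sample rows: identical except for bltn_no.
    val a = rowkey("（2021）京01民终123号", "3", "2021-04-22", "北京市第一中级人民法院", "B001")
    val b = rowkey("（2021）京01民终123号", "3", "2021-04-22", "北京市第一中级人民法院", "B002")
    println(a == b) // false: the bulletin number keeps the two rows apart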
+ 174 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/company_back_update.scala

@@ -0,0 +1,174 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.is_json_str
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import scala.collection.mutable
+
+/**
+ * @Description: offline processing of suspected-problem data flowed back from production
+ * @author π
+ * @date 2021/4/22 10:59
+ */
+case class job_params(tableName: String
+                      , rowkey: String = "rowkey" // primary key (rowkey)
+                      , names: String = "" // explode expression for extracting names
+                     )
+
+object job_params {
+  val tab_args = Seq(
+    job_params(tableName = "company_court_open_announcement"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_court_announcement"
+      , names = "split_names(CONCAT_WS('&',plaintiff_info,litigant_info),'$.name')"
+    )
+    , job_params(tableName = "company_send_announcement"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_court_register"
+      , names = "split_names(CONCAT_WS('&',litigant_info,plaintiff_info,defendant_info),'$.name')"
+    )
+    , job_params(tableName = "company_zxr_final_case"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_equity_info"
+      , names = "split_names(CONCAT_WS('&', pledgor_info, pledgee_info),'$.pledgor&$.pledgee')"
+    )
+    , job_params(tableName = "company_zxr"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_dishonest_info"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "company_zxr_restrict"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "zxr_evaluate_results"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "zxr_evaluate"
+      , names = "split(CONCAT_WS('&',name),'&')"
+    )
+    , job_params(tableName = "restrictions_on_exit"
+      , names = "split(CONCAT_WS('&',executed_person_keyno),'&')"
+    )
+    , job_params(tableName = "company_punishment_info"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "company_punishment_info_creditchina"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+    , job_params(tableName = "bankruptcy_open_case"
+      , names = "split_names(CONCAT_WS('&',applicant_info, respondent_info),'$.name')"
+    )
+    , job_params(tableName = "company_judicial_assistance"
+      , names = "split(CONCAT_WS('&',company_name),'&')"
+    )
+  )
+
+  def get_args_company_job(tn: String): job_params = {
+    tab_args.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException(s"$tn is not found"))
+  }
+
+}
+
+case class company_back_update(s: SparkSession
+                               , project: String // project that owns the tables
+                               , job_params: job_params // job parameters
+                              ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  override protected val spark: SparkSession = s
+  val tn: String = job_params.tableName
+  val rowkey: String = job_params.rowkey
+  val names: String = job_params.names
+  val ads_tab = s"$project.ads_$tn"
+  val inc_ads_tab = s"$project.inc_ads_$tn"
+
+  // backflow result table
+  val company_back_update = s"$project.tmp_xf_company_back_update"
+  // suspected problem company names (mapping table)
+  val companyid_name_mapping = s"$project.tmp_xf_companyid_name_mapping_tmp"
+
+  val inter_cols: Seq[String] = getColumns(ads_tab).intersect(getColumns(inc_ads_tab))
+  val lastDs: String = BaseUtil.getYesterday()
+
+  register()
+
+
+  private def register(): Unit = {
+    prepareFunctions(spark)
+  }
+
+
+  def calc(): Unit = {
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $company_back_update PARTITION(ds='$lastDs',tn='$tn')
+         |SELECT  $rowkey
+         |FROM    (
+         |            SELECT  $rowkey
+         |            FROM    (
+         |                        SELECT  $rowkey
+         |                                ,name
+         |                        FROM    (
+         |                                    SELECT  $rowkey
+         |                                            ,name
+         |                                    FROM    (
+         |                                                SELECT  $rowkey
+         |                                                        ,$names names
+         |                                                FROM    $ads_tab
+         |                                                WHERE   ds > 0
+         |                                                UNION ALL
+         |                                                SELECT  $rowkey
+         |                                                        ,$names names
+         |                                                FROM    $inc_ads_tab
+         |                                                WHERE   ds > 0
+         |                                            )
+         |                                    LATERAL VIEW explode(names) b AS name
+         |                                )
+         |                        WHERE   LENGTH(cleanup(name)) > 5
+         |                    ) a
+         |            JOIN    (
+         |                        SELECT  name
+         |                        FROM    $companyid_name_mapping
+         |                        GROUP BY name
+         |                    ) b
+         |            ON      name_cleanup(a.name) = name_cleanup(b.name)
+         |        )
+         |GROUP BY rowkey
+         |""".stripMargin)
+
+    sql(
+      s"""
+         |ALTER TABLE $company_back_update ADD IF NOT EXISTS PARTITION(ds='$lastDs',tn='$tn')
+         |""".stripMargin)
+
+  }
+
+
+}
+
+object company_back_update {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println(args.mkString(","))
+      println("usage: company_back_update <project> <tn>")
+      sys.exit(-1)
+    }
+    val Array(project, tn) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = company_back_update(s = spark, project = project, job_params.get_args_company_job(tn))
+    re.calc()
+    spark.stop()
+  }
+}

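For reference, the new entry point takes exactly two arguments, the ODPS project and the table name, and resolves the per-table explode expression through job_params.get_args_company_job. A hypothetical local invocation ("winhc_ng" is a placeholder project name, not taken from this commit):

    // Placeholder project name; tn must be one of the tables listed in job_params.tab_args.
    company_back_update.main(Array("winhc_ng", "company_court_open_announcement"))
    // Writes matched rowkeys into winhc_ng.tmp_xf_company_back_update, partition (ds=<yesterday>, tn=<table>).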
+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -67,6 +67,10 @@ trait CompanyMapping {
       nameCleanup(name)
     })
 
+    spark.udf.register("split_names", (json_array: String, json_path: String) => {
+      split_names(json_array, json_path)
+    })
+
   }
 
   def prepare(spark: SparkSession): Unit = {

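Once prepareFunctions has registered it, split_names is callable from Spark SQL, which is how the names expressions in job_params use it. A quick ad-hoc check might look like this (the inline JSON is made-up sample data, assuming the registration above):

    // Assumes prepareFunctions(spark) has already run.
    spark.sql(
      """
        |SELECT split_names('[{"name":"甲"}]&[{"name":"乙"},{"name":"甲"}]', '$.name') AS names
        |""".stripMargin).show(false)
    // -> [甲, 乙]  (deduplicated; order not guaranteed, since results are collected into a Set)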
+ 40 - 3
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -1,17 +1,18 @@
 package com.winhc.bigdata.spark.utils
 
 import cn.hutool.core.util.StrUtil
-import com.alibaba.fastjson.JSON
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
 import com.winhc.bigdata.spark.implicits.RegexUtils._
 import org.apache.commons.lang3.StringUtils
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
-
 import java.security.MessageDigest
 import java.text.SimpleDateFormat
 import java.util.regex.Pattern
 import java.util.{Calendar, Date, Locale}
 
+import scala.collection.mutable
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/3 18:49
@@ -482,11 +483,47 @@ object BaseUtil {
     re
   }
 
+  /**
+   * Splits an '&'-joined list of JSON arrays and extracts the values matched by json_path.
+   * json_path may itself be '&'-joined to supply one path per JSON array; otherwise the
+   * first (single) path is applied to every array. Results are deduplicated via a Set.
+   */
+  def split_names(json_array: String, json_path: String): Seq[String] = {
+    try {
+      if (StringUtils.isEmpty(json_array)) {
+        return Seq.empty
+      }
+
+      val set = mutable.Set[String]()
+      val jsonArr = json_array.split("&", -1)
+      val pathArr = json_path.split("&", -1)
+      var path = pathArr.head
+      for (i <- 0 until jsonArr.length) {
+        if (pathArr.length > 1) {
+          path = pathArr(i)
+        }
+        parse(jsonArr(i), path, set)
+      }
+
+      set.toSeq
+    } catch {
+      case _: Exception =>
+        // Malformed input: log the offending payload and fall back to an empty result.
+        println(json_array)
+        Seq.empty
+    }
+  }
+
+  def parse(json: String, json_path: String, set: mutable.Set[String]): Unit = {
+    val jsonArray = JSONPath.eval(JSON.parse(json), json_path).asInstanceOf[JSONArray].toArray[String](Array())
+    for (o <- jsonArray) {
+      set.add(o)
+    }
+  }
+
   def main(args: Array[String]): Unit = {
+    val json_array = "[{\"name\": \"史某某\", \"litigant_id\": \"\"}, {\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]&[{\"name\": \"丁某某\", \"litigant_id\": \"\"}, {\"name\": \"史某某\", \"litigant_id\": \"\"}]&[{\"name\": \"杨某\", \"litigant_id\": \"\"}, {\"name\": \"陈某某\", \"litigant_id\": \"\"}]"
+    val json_path = "$.name"
+    println(split_names(json_array,json_path))
 //    println(nameCleanup("小米科技.;有,@限公  司  雷军"))
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
-        println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
+      //  println(case_no_trim_v2("(2019)年中国贸仲京裁字第0394号号"))
     //    val seq = Seq("1", "3", "2", "7").mkString("\001")
     //    println(sortString(seq))
     //println(caseStage("(2019)鄂初7号"))