
Merge remote-tracking branch 'origin/master'

许家凯 3 years ago
Commit
88fc543b56

+ 218 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/lookup_tab_pid.scala

@@ -0,0 +1,218 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.is_json_str
+import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyRelationUtils, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: pid backtracking lookup (maps person ids back to the rowkeys that reference them)
+ * @author π
+ * @date 2021/3/15 10:59
+ */
+case class args_job(tableName: String
+                    , rowkey: String = "rowkey" // primary-key column
+                    , explode: String = "" // LATERAL VIEW explode clause used to extract key ids
+                    , keyno: String = "" // id column joined against person_id
+                   )
+
+object args_job {
+  val tab_args = Seq(
+    args_job(tableName = "company_court_open_announcement"
+      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
+      , keyno = "key_no"
+    )
+    , args_job(tableName = "company_court_announcement"
+      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', plaintiff_info, litigant_info)) ) key AS key_no"
+      , keyno = "key_no"
+    )
+    , args_job(tableName = "company_send_announcement"
+      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
+      , keyno = "key_no"
+    )
+    , args_job(tableName = "company_court_register"
+      , explode = "LATERAL VIEW explode(jsonall_2_array('$.litigant_id', concat_ws('\u0001', defendant_info, plaintiff_info, litigant_info)) ) key AS key_no"
+      , keyno = "key_no"
+    )
+    , args_job(tableName = "company_zxr_final_case", keyno = "keyno")
+    , args_job(tableName = "company_equity_info"
+      , explode = "LATERAL VIEW explode(jsonall_2_array('$.pledgor_id\u0001$.pledgee_id', concat_ws('\u0001', pledgor_info, pledgee_info)) ) key AS key_no"
+      , keyno = "key_no"
+    )
+    , args_job(tableName = "company_zxr", keyno = "keyno")
+    , args_job(tableName = "company_dishonest_info", keyno = "keyno")
+    , args_job(tableName = "company_zxr_restrict", keyno = "pid")
+    , args_job(tableName = "zxr_evaluate_results", keyno = "keyno")
+    , args_job(tableName = "zxr_evaluate", keyno = "keyno")
+    , args_job(tableName = "restrictions_on_exit"
+      , explode = "LATERAL VIEW explode( split(concat_ws('\u0001', limited_person_pid, executed_person_keyno),'\u0001') ) key AS keyno"
+      , keyno = "keyno"
+    )
+  )
+
+  def get_args_company_job(tn: String): args_job = {
+    tab_args.find(p => tn.equals(p.tableName))
+      .getOrElse(throw new IllegalArgumentException(s"table $tn not found"))
+  }
+
+}
+
+case class lookup_tab_pid(s: SparkSession
+                          , project: String // project that holds the tables
+                          , args_job: args_job // job arguments
+                         ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  override protected val spark: SparkSession = s
+  val tn: String = args_job.tableName
+  val rowkey: String = args_job.rowkey
+  val ads_tab = s"$project.ads_$tn"
+  val inc_ads_tab = s"$project.inc_ads_$tn"
+  val keyno: String = args_job.keyno
+
+  val tab_back_deleted = s"$project.inc_ads_company_human_relation_back_deleted"
+  val tar_tab = s"$project.tmp_xf_person_id_rowkey"
+
+  val tar_cols: Seq[String] = getColumns(tar_tab).diff(Seq("ds", "tn", "message"))
+  val sort: String = get_partition_order_by()
+  val inter_cols: Seq[String] = getColumns(ads_tab).intersect(getColumns(inc_ads_tab))
+
+  val ds: String = {
+    var par = BaseUtil.getPartion(tar_tab, tn, spark)
+    if (StringUtils.isBlank(par)) {
+      par = "0"
+    }
+    par
+  }
+  val lastDs: String = BaseUtil.getYesterday()
+
+  register()
+
+
+  private def register(): Unit = {
+
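+    // Evaluate a JSONPath expression against a JSON string and return the matches
+    // as a distinct Seq[String]; blank, non-JSON, or unparsable input yields Seq.empty.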
+    def json_2_array(json_path: String, json_array: String): Seq[String] = {
+      try {
+        if (StringUtils.isEmpty(json_array)) {
+          return Seq.empty
+        }
+        if (!is_json_str(json_array)) {
+          return Seq.empty
+        }
+        JSONPath.eval(JSON.parse(json_array), json_path).asInstanceOf[JSONArray].toArray[String](Array()).toSeq.distinct.diff(Seq(""))
+      } catch {
+        case e: Exception => {
+          println(s"json_2_array failed (${e.getMessage}): $json_array")
+          Seq.empty
+        }
+      }
+    }
+
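+    // Split a '\u0001'-joined list of JSON blobs (as built by concat_ws in the
+    // explode clauses of args_job.tab_args) and apply json_2_array to each blob.
+    // A single JSONPath is reused for all blobs; multiple paths pair with blobs by position.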
+    def jsonall_2_array(json_path: String, jsonall: String): Seq[String] = {
+      var re: Seq[String] = Seq.empty
+      if (StringUtils.isEmpty(jsonall)) return re
+      val pathArr = json_path.split("\\u0001", -1)
+      val jsonArr = jsonall.split("\\u0001", -1)
+      var path = pathArr.head
+      for (i <- 0 until jsonArr.length) {
+        if (pathArr.length > 1) {
+          path = pathArr(i)
+        }
+        re ++= json_2_array(path, jsonArr(i))
+      }
+      re.distinct.diff(Seq(""))
+    }
+
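+    // Register the UDF relied on by the LATERAL VIEW explode clauses in tab_args.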
+    spark.udf.register("jsonall_2_array", (json_path: String, s2: String) => {
+      jsonall_2_array(json_path, s2)
+    })
+
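+    // Serialize (person_id, rowkey, table) into a JSON message string; see the
+    // table_message case class added to CompanyRelationUtils in this commit.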
+    def get_table_message(person_id: String, rowkey: String, table: String): String =
+      CompanyRelationUtils.get_table_message(person_id, rowkey, table)
+
+    spark.udf.register("get_table_message", get_table_message _)
+  }
+
+
+  def calc(): Unit = {
+
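+    // 1. collect the distinct person_ids written to the back-deleted relation
+    //    table after the last processed partition ($ds)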
+    sql(
+      s"""
+         |SELECT  person_id
+         |FROM    $tab_back_deleted
+         |WHERE   ds > $ds
+         |GROUP BY person_id
+         |""".stripMargin).createOrReplaceTempView("mapping")
+
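+    // 2. merge the full (ads) and incremental (inc_ads) tables, keeping only the
+    //    most recent row per rowkey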
+    sql(
+      s"""
+         |SELECT *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY ${rowkey} ORDER BY $sort) AS num
+         |            FROM    (
+         |                        SELECT  ${inter_cols.mkString(",")}
+         |                        FROM    $ads_tab
+         |                        WHERE   ds > 0
+         |                        UNION ALL
+         |                        SELECT  ${inter_cols.mkString(",")}
+         |                        FROM    $inc_ads_tab
+         |                        WHERE   ds > 0
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin).createOrReplaceTempView("tab_tmp")
+
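+    // 3. explode each deduplicated row into its key ids, join them against the
+    //    person_ids, and overwrite today's (ds, tn) partition; tar_cols is expected
+    //    to be (person_id, rowkey), matching get_table_message's signature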
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $tar_tab PARTITION(ds='$lastDs',tn='$tn')
+         |SELECT  ${tar_cols.mkString(",")},
+         |        get_table_message(${tar_cols.mkString(",")}, '$tn') message
+         |FROM    mapping a
+         |JOIN    (
+         |            SELECT  ${rowkey},$keyno
+         |            FROM  tab_tmp
+         |            ${args_job.explode}
+         |        ) b
+         |ON      a.person_id = b.$keyno
+         |""".stripMargin).show(10, false)
+
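+    // ensure the (ds, tn) partition exists even when the join above produced no rows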
+    sql(
+      s"""
+         |ALTER TABLE $tar_tab ADD IF NOT EXISTS PARTITION(ds='$lastDs',tn='$tn')
+         |""".stripMargin)
+
+  }
+
+  private def get_partition_order_by(): String = {
+    if (getColumns(ads_tab).contains("update_time")) { // check the table's columns, not its name
+      " ds DESC,update_time DESC "
+    } else {
+      " ds DESC "
+    }
+  }
+
+}
+
+object lookup_tab_pid {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      println(args.mkString(","))
+      println("please set project tn.")
+      sys.exit(-1)
+    }
+    val Array(project, tn) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = lookup_tab_pid(s = spark, project = project, args_job.get_args_company_job(tn))
+    re.calc()
+    spark.stop()
+  }
+}

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyRelationUtils.scala

@@ -21,6 +21,8 @@ case class relation_legal_entity(start_id: String, start_name: String, end_id: S
 
 case class success_status(ds: String, status: String, topic_type: String)
 
+case class table_message(person_id: String, rowkey: String, table: String)
+
 object CompanyRelationUtils {
 
   def get_company_node(id: String, name: String, deleted: String, topic_type: String): String =
@@ -44,4 +46,6 @@ object CompanyRelationUtils {
   def get_success_status(ds: String, status: String, topic_type: String): String =
     success_status(ds, status, topic_type).toJson()
 
+  def get_table_message(person_id: String, rowkey: String, table: String): String =
+    table_message(person_id, rowkey, table).toJson()
 }