소스 검색

feat: 个人索引

许家凯 3 년 전
부모
커밋
3ef49679b4
1개의 변경된 파일: 340개의 추가작업 그리고 0개의 삭제된 파일
      src/main/scala/com/winhc/bigdata/spark/ng/utils/PersonSummaryNg_new.scala

+ 340 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/PersonSummaryNg_new.scala

@@ -0,0 +1,340 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, struct, to_json}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/10/10 10:23
+ * @Description: winhc_ng空间下摘要增强版
+ */
+
+case class PersonSummaryNg_new(s: SparkSession,
+                               project: String //表所在工程名
+                               , args: Seq[company_summary_args]
+                              ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+  private val target_tab = "winhc_ng.out_es_summary_person"
+  init()
+
+  private def init() {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    company_id  STRING COMMENT '人id'
+         |    ,summary STRING COMMENT '格子中的摘要信息,json格式'
+         |    ,detail STRING COMMENT '个别维度详细的摘要信息'
+         |)
+         |COMMENT 'out es person summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 15
+         |""".stripMargin)
+  }
+
+
+  private def get_table_data(arg: company_summary_args): String = {
+    val ds = getLastPartitionsOrElse(target_tab, null)
+
+    val tab = arg.table_name
+    val companyIdField = arg.companyIdField
+    val distinctField = arg.distinctField
+
+    val sortField = arg.sortField
+    val where = if (StringUtils.isEmpty(arg.where)) {
+      s""
+    } else {
+      s"AND   ${arg.where}"
+    }
+
+
+    val ads_table = s"${project}.ads_$tab" //存量ads表
+    val inc_ads_table = s"${project}.inc_ads_$tab"
+
+    val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
+
+    val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
+
+    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+    val tab_tmp_view = s"${arg.winhc_hash()}_data"
+
+    if (ds == null) {
+      sql(
+        s"""
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $ads_table
+           |WHERE   ds = '$ads_last_ds'
+           |UNION ALL
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $inc_ads_table
+           |WHERE   ds > $ads_last_ds
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    } else {
+      sql(
+        s"""
+           |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+           |FROM    (
+           |            SELECT  DISTINCT $companyIdField as xjk_cid
+           |            FROM    $inc_ads_table
+           |            WHERE   ds = $inc_ads_last_ds
+           |        ) id_table
+           |JOIN (
+           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $inc_ads_table
+           |              WHERE   ds > '$ads_last_ds'
+           |              AND     ds < '$inc_ads_last_ds'
+           |              UNION ALL
+           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $ads_table
+           |              WHERE   ds = '$ads_last_ds'
+           |          ) org_tab
+           |ON      id_table.xjk_cid = org_tab.xjk_cid
+           |UNION ALL
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $inc_ads_table
+           |WHERE   ds = $inc_ads_last_ds
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    }
+
+    val distinct_tab = s"${tab_tmp_view}_distinct"
+    sql(
+      s"""
+         |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+         |FROM    (
+         |            SELECT  tmp.*
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
+         |            FROM    $tab_tmp_view AS tmp
+         |        ) tmp2
+         |WHERE   tmp2.c = 1
+         |$where
+         |""".stripMargin)
+      .createOrReplaceTempView(distinct_tab)
+    distinct_tab
+  }
+
+  private def get_tab_summary(arg: company_summary_args): String = {
+    val tab_hash = arg.winhc_hash()
+
+    val tab = arg.table_name
+    val companyIdField = arg.companyIdField
+
+    val result_tab = s"${tab_hash}_summary_tab"
+
+    val all_tab = get_table_data(arg)
+
+    val func_name = s"xjk_func_${tab_hash}"
+
+    val view = if (arg.groupByInfo == null) {
+      if (arg.field_prefix != null) {
+        s"arr[0] as ${arg.field_prefix}"
+      } else {
+        s"arr[0] as $tab"
+      }
+
+    } else {
+      arg.groupByInfo.value_alias.indices.map(i => {
+        s"arr[$i] as ${arg.groupByInfo.value_alias(i)._2}"
+      }).mkString(",")
+    }
+
+    //注册函数
+    if (arg.groupByInfo != null) {
+      val fieldSeq = arg.groupByInfo.value_alias.map(r => {
+        (s"${r._1}", r._2)
+      })
+
+      def getResArr(group_val: String, num: Long): Seq[Long] = {
+        val res = scala.collection.mutable.ArrayBuffer[Long]()
+        for (i <- fieldSeq) {
+          if (i._1.equals(group_val)) {
+            res += num
+          } else {
+            res += 0
+          }
+        }
+        res
+      }
+
+      spark.udf.register(func_name, getResArr _)
+    }
+
+    val groupKey_show = if (arg.groupByInfo == null) {
+      s",array(count(1)) as arr"
+    } else {
+      s",$func_name(cast(${arg.groupByInfo.field} as STRING),count(1)) as arr"
+    }
+
+    val groupKey = if (arg.groupByInfo == null) {
+      s""
+    } else {
+      s",${arg.groupByInfo.field}"
+    }
+
+    sql(
+      s"""
+         |SELECT  company_id
+         |        ,${view}
+         |FROM    (
+         |        SELECT  $companyIdField as company_id
+         |                $groupKey_show
+         |        FROM    $all_tab
+         |        GROUP BY $companyIdField ${groupKey}
+         |)
+         |""".stripMargin)
+      .createOrReplaceTempView(result_tab)
+
+    if (arg.groupByInfo != null) {
+      sql(
+        s"""
+           |SELECT  company_id
+           |        ,${arg.groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
+           |FROM    $result_tab
+           |GROUP BY company_id
+           |""".stripMargin)
+        .createOrReplaceTempView(result_tab)
+    }
+    result_tab
+  }
+
+
+  def calc(): Unit = {
+    val summary_tab = "summary_tab_xjk"
+    val summary_tabs = args.map(get_tab_summary).seq
+    val merge = merge_table(spark, summary_tabs, "company_id")
+    merge.calc(summary_tab)
+    val cols = getColumns(summary_tab).diff(Seq("company_id"))
+
+    sql(
+      s"""
+         |select * from $summary_tab
+         |""".stripMargin)
+      .withColumn("summary", to_json(struct(cols.map(col): _*))).createTempView("xjk_tmp_summary_tab")
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |SELECT company_id,summary,null as detail
+         |FROM
+         |    xjk_tmp_summary_tab
+         |""".stripMargin)
+    merge.drop()
+  }
+
+  private def getCastCols(name: String, pre: String): String = {
+    val list = List("cid", "new_cid", "ncid")
+    if (list.contains(name)) {
+      return s"CAST(${pre}${name} as BIGINT) $name"
+    }
+    pre + name
+  }
+}
+
object PersonSummaryNg_new {

  /**
   * Standard spec for one dimension table: count rows per person id (`person_id`
   * column, 33-char ids only), split by `deleted` into `<table>_del_0` and
   * `<table>_del_1` output columns.
   */
  private def get_default_summary_args(tableName: String, person_id: String): company_summary_args = {
    val deletedSplit = GroupByInfoNg(
      field = "deleted",
      value_alias = Seq(
        ("0", s"${tableName}_del_0"),
        ("1", s"${tableName}_del_1")
      )
    )
    company_summary_args(
      table_name = tableName,
      companyIdField = person_id,
      where = s"$person_id is not null and length($person_id) = 33 ",
      groupByInfo = deletedSplit
    )
  }

  // All dimension specs fed to the job. The *_explode tables carry one row per
  // (case, person) pair, hence the composite distinctField, and their output
  // column aliases drop the "_explode" suffix.
  private val start_args = Seq(
    get_default_summary_args("company_zxr", "keyno"),
    get_default_summary_args("company_dishonest_info", "keyno"),
    get_default_summary_args("company_zxr_restrict", "pid"),
    get_default_summary_args("company_judicial_assistance", "fz_executed_person_id"),
    get_default_summary_args("restrictions_on_exit", "limited_person_pid"),
    get_default_summary_args("company_zxr_final_case", "keyno"),

    // Court open announcements, plaintiff / defendant roles.
    company_summary_args(
      table_name = "company_court_open_announcement_explode",
      companyIdField = "plaintiff_info_id_explode",
      distinctField = "rowkey,plaintiff_info_id_explode",
      where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_open_announcement_del_0_plaintiff"),
        ("1", "company_court_open_announcement_del_1_plaintiff")
      ))
    ),
    company_summary_args(
      table_name = "company_court_open_announcement_explode",
      companyIdField = "defendant_info_id_explode",
      distinctField = "rowkey,defendant_info_id_explode",
      where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 ",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_open_announcement_del_0_defendant"),
        ("1", "company_court_open_announcement_del_1_defendant")
      ))
    ),

    // Court announcements, plaintiff / litigant roles.
    company_summary_args(
      table_name = "company_court_announcement_explode",
      companyIdField = "plaintiff_info_id_explode",
      distinctField = "rowkey,plaintiff_info_id_explode",
      where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_announcement_del_0_plaintiff"),
        ("1", "company_court_announcement_del_1_plaintiff")
      ))
    ),
    company_summary_args(
      table_name = "company_court_announcement_explode",
      companyIdField = "litigant_info_id_explode",
      distinctField = "rowkey,litigant_info_id_explode",
      where = "litigant_info_id_explode is not null and length(litigant_info_id_explode) = 33",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_announcement_del_0_litigant"),
        ("1", "company_court_announcement_del_1_litigant")
      ))
    ),

    // Court case registrations, plaintiff / defendant roles.
    company_summary_args(
      table_name = "company_court_register_explode",
      companyIdField = "plaintiff_info_id_explode",
      distinctField = "rowkey,plaintiff_info_id_explode",
      where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33 ",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_register_del_0_plaintiff"),
        ("1", "company_court_register_del_1_plaintiff")
      ))
    ),
    company_summary_args(
      table_name = "company_court_register_explode",
      companyIdField = "defendant_info_id_explode",
      distinctField = "rowkey,defendant_info_id_explode",
      where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 ",
      groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
        ("0", "company_court_register_del_0_defendant"),
        ("1", "company_court_register_del_1_defendant")
      ))
    )
  )


  /** Boots a Spark/ODPS session for winhc_ng, runs the summary job, and stops the session. */
  def main(args: Array[String]): Unit = {

    val sessionConf = mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_ng",
      "spark.debug.maxToStringFields" -> "200",
      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
    )

    val spark = SparkUtils.InitEnv(getClass.getSimpleName, sessionConf)
    PersonSummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
    spark.stop()
  }
}