@@ -0,0 +1,368 @@
+package com.winhc.bigdata.spark.ng.jobs
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.SparkSession
+import scala.annotation.meta.getter
+import scala.collection.mutable
+ * @Author: XuJiakai
+ * @Date: 2020/10/10 10:23
+ * @Description: winhc_ng空间下摘要增强版
+ */
+case class GroupByInfoNg(field: String, value_alias: Seq[(String, String)])
+case class CompanySummaryNg(s: SparkSession,
+ project: String, //表所在工程名
+ tableName: String, //表名(不加前辍)
+ companyIdField: String, // 公司id fieldName,例如:split(rowkey,'_')
+ distinctField: String = "rowkey", //去重主键
+ groupByInfo: GroupByInfoNg = null, //group的其它条件
+ where: String = "", //where条件,例如:deleted = 0
+ sortField: String = "ds"
+ ) extends LoggingUtils {
+ @(transient@getter) val spark: SparkSession = s
+ private val f_bytes: Array[Byte] = Bytes.toBytes("F")
+ private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
+ val ads_table = s"${project}.ads_$tableName" //存量ads表
+ val inc_ads_table = s"${project}.inc_ads_$tableName"
+ val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
+ private def create_or_replace_summary(target_tab: String): Unit = {
+ val ddl = groupByInfo == null match {
+ case true => s"$tableName BIGINT"
+ case false => groupByInfo.value_alias.map(r => {
+ s"${r._2} BIGINT"
+ }).mkString(",")
+ }
+ val d =
+ s"""
+ |(
+ | company_id string
+ | ,${ddl}
+ |)
+ |COMMENT 'summary tmp,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+ |""".stripMargin
+ if (spark.catalog.tableExists(target_tab)) {
+ sql(
+ s"""
+ |DROP TABLE IF EXISTS $target_tab
+ |""".stripMargin)
+ }
+ sql(d)
+ }
+ def calc(is_inc: Boolean = true, target_tab: String = ""): Unit = {
+ val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+ val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
+ val wh = StringUtils.isEmpty(where) match {
+ case true => s""
+ case false => s"AND $where"
+ }
+ val tmp_tab = "inc_tmp_view"
+ is_inc match {
+ case true => {
+ sql(
+ s"""
+ |SELECT ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+ |FROM (
+ | SELECT DISTINCT $companyIdField as xjk_cid
+ | FROM $inc_ads_table
+ | WHERE ds = $inc_ads_last_ds
+ | ) id_table
+ |JOIN (
+ | SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ | ,$companyIdField as xjk_cid
+ | FROM $inc_ads_table
+ | WHERE ds > '$ads_last_ds'
+ | AND ds < '$inc_ads_last_ds'
+ | SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ | ,$companyIdField as xjk_cid
+ | FROM $ads_table
+ | WHERE ds = '$ads_last_ds'
+ | ) org_tab
+ |ON id_table.xjk_cid = org_tab.xjk_cid
+ |SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ |FROM $inc_ads_table
+ |WHERE ds = $inc_ads_last_ds
+ |""".stripMargin)
+ .createOrReplaceTempView(tmp_tab)
+ }
+ case false => {
+ sql(
+ s"""
+ |SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ |FROM $ads_table
+ |WHERE ds = '$ads_last_ds'
+ |SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ |FROM $inc_ads_table
+ |WHERE ds > $ads_last_ds
+ |""".stripMargin)
+ .createOrReplaceTempView(tmp_tab)
+ }
+ }
+ val distinct_tab = s"${tmp_tab}_distinct"
+ sql(
+ s"""
+ |SELECT ${new_cols.map(getCastCols(_, "")).mkString(",")}
+ |FROM (
+ | SELECT tmp.*
+ | ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
+ | FROM $tmp_tab AS tmp
+ | ) tmp2
+ |WHERE tmp2.c = 1
+ |$wh
+ |""".stripMargin)
+ .createOrReplaceTempView(distinct_tab)
+ val view = groupByInfo == null match {
+ case true => s"arr[0] as $tableName"
+ case false => groupByInfo.value_alias.indices.map(i => {
+ s"arr[$i] as ${groupByInfo.value_alias(i)._2}"
+ }).mkString(",")
+ }
+ //注册函数
+ if (groupByInfo != null) {
+ val fieldSeq = groupByInfo.value_alias.map(r => {
+ (s"${r._1}", r._2)
+ })
+ def getResArr(group_val: String, num: Long): Seq[Long] = {
+ val res = scala.collection.mutable.ArrayBuffer[Long]()
+ for (i <- fieldSeq) {
+ if (i._1.equals(group_val)) {
+ res += num
+ } else {
+ res += 0
+ }
+ }
+ res.toSeq
+ }
+ spark.udf.register("xjk_func", getResArr _)
+ }
+ val groupKey_show = groupByInfo == null match {
+ case true => s",array(count(1)) as arr"
+ case false => s",xjk_func(cast(${groupByInfo.field} as STRING),count(1)) as arr"
+ }
+ val groupKey = groupByInfo == null match {
+ case true => s""
+ case false => s",${groupByInfo.field}"
+ }
+ sql(
+ s"""
+ |SELECT company_id
+ | ,${view}
+ |FROM (
+ | SELECT $companyIdField as company_id
+ | $groupKey_show
+ | FROM $distinct_tab
+ | GROUP BY $companyIdField ${groupKey}
+ |)
+ |""".stripMargin)
+ .createOrReplaceTempView("summary_tab")
+ if (groupByInfo != null) {
+ sql(
+ s"""
+ |SELECT company_id
+ | ,${groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
+ |FROM summary_tab
+ |GROUP BY company_id
+ |""".stripMargin)
+ .createOrReplaceTempView("summary_tab")
+ }
+ if (StringUtils.isEmpty(target_tab)) {
+ import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+ val writF = getColumns("summary_tab").diff(Seq("company_id"))
+ sql(
+ s"""
+ | summary_tab
+ |""".stripMargin)
+ .save2HBase("NG_COMPANY_SUMMARY", "company_id", writF)
+ /*create_or_replace_summary(target_tab)
+ sql(
+ s"""
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab
+ | summary_tab
+ |""".stripMargin)*/
+ } else {
+ create_or_replace_summary(target_tab)
+ sql(
+ s"""
+ |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_tab
+ | summary_tab
+ |""".stripMargin)
+ }
+ }
+ private def getCastCols(name: String, pre: String): String = {
+ val list = List("cid", "new_cid", "ncid")
+ if (list.contains(name)) {
+ return s"CAST(${pre}${name} as BIGINT) $name"
+ }
+ pre + name
+ }
+object CompanySummaryNg {
+ def run(spark: SparkSession, tab: String, target_tab: String = null): Unit = {
+ var csp: CompanySummaryNg = null
+ val project = "winhc_ng"
+ tab match {
+ case "company_zxr_final_case" => {
+ //终本案件
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_zxr_final_case"
+ , companyIdField = "split(rowkey,'_')[0]"
+ , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "status", value_alias = Seq(
+ ("0", "company_zxr_final_case_0")
+ , ("1", "company_zxr_final_case_1")
+ ))
+ )
+ }
+ case "company_dishonest_info" => {
+ //失信人
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_dishonest_info"
+ , companyIdField = "split(rowkey,'_')[0]"
+ , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "status", value_alias = Seq(
+ ("0", "company_dishonest_info_0")
+ , ("1", "company_dishonest_info_1")
+ ))
+ )
+ }
+ case "company_zxr_list" => {
+ //执行人
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_zxr_list"
+ , companyIdField = "split(rowkey,'_')[0]"
+ , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "status", value_alias = Seq(
+ ("0", "company_zxr_list_0")
+ , ("1", "company_zxr_list_1")
+ ))
+ )
+ }
+ case "company_zxr_restrict" => {
+ //限制高消费
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_zxr_restrict"
+ , companyIdField = "split(rowkey,'_')[0]"
+ , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "status", value_alias = Seq(
+ ("0", "company_zxr_restrict_0")
+ , ("1", "company_zxr_restrict_1")
+ ))
+ )
+ }
+ case "company_land_mortgage" => {
+ //土地抵押
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_land_mortgage"
+ , companyIdField = "split(rowkey,'_')[0]"
+ // , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "type", value_alias = Seq(
+ ("mortgagor", "company_land_mortgage_mortgagor")
+ , ("mortgagee", "company_land_mortgage_mortgagee")
+ , ("bothsame", "company_land_mortgage_bothsame")
+ , ("bothone", "company_land_mortgage_bothone")
+ , ("bothtwo", "company_land_mortgage_bothtwo")
+ ))
+ )
+ }
+ case "company_land_transfer" => {
+ //土地转让
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = "company_land_transfer"
+ , companyIdField = "split(rowkey,'_')[0]"
+ // , where = "deleted = 0"
+ , groupByInfo = GroupByInfoNg(field = "type", value_alias = Seq(
+ ("pre", "company_land_transfer_pre")
+ , ("now", "company_land_transfer_now")
+ , ("bothsame", "company_land_transfer_bothsame")
+ , ("bothone", "company_land_transfer_bothone")
+ , ("bothtwo", "company_land_transfer_bothtwo")
+ ))
+ )
+ }
+ case _ =>
+ csp = CompanySummaryNg(s = spark
+ , project = project
+ , tableName = tab
+ , companyIdField = "split(rowkey,'_')[0]"
+ ,where = "instr(rowkey,'_') != 0 and deleted = 0"
+ )
+ }
+ if (target_tab == null)
+ csp.calc()
+ else
+ csp.calc(is_inc = false, target_tab = target_tab)
+ }
+ def main(args: Array[String]): Unit = {
+ val Array(tab) = args
+ println(
+ s"""
+ |tab: $tab
+ |""".stripMargin)
+ val config = mutable.Map(
+ "spark.hadoop.odps.project.name" -> "winhc_ng",
+ "spark.debug.maxToStringFields" -> "200",
+ "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+ )
+ val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+// run(spark, tab,"winhc_ng.tmp_xjk_summary")
+ spark.stop()
+ }