许家凯 4 роки тому
батько
коміт
9427944984

+ 4 - 0
src/main/resources/env.yaml

@@ -6,6 +6,7 @@ env:
   name: dev-local
   config:
     es.nodes: es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com
+    es.eci.nodes: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com
     zk.address: hb-proxy-pub-uf63a7d09rpl8mcvm-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-proxy-pub-uf63a7d09rpl8mcvm-001.hbase.rds.aliyuncs.com:8765
 
@@ -14,6 +15,7 @@ env:
   name: dev-remote
   config:
     es.nodes: es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com
+    es.eci.nodes: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com
     zk.address: hb-uf63a7d09rpl8mcvm-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-uf63a7d09rpl8mcvm-001.hbase.rds.aliyuncs.com:8765
 
@@ -22,6 +24,7 @@ env:
   name: prod-local
   config:
     es.nodes: es-cn-0pp0r32zf000ipovd.public.elasticsearch.aliyuncs.com
+    es.eci.nodes: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com
     zk.address: hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix-pub.hbase.rds.aliyuncs.com:8765
 
@@ -30,5 +33,6 @@ env:
   name: prod-remote
   config:
     es.nodes: es-cn-0pp0r32zf000ipovd.elasticsearch.aliyuncs.com
+    es.eci.nodes: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com
     zk.address: hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix.hbase.rds.aliyuncs.com:8765

+ 67 - 27
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.ng.jobs
 
 import com.alibaba.fastjson.JSON
 import com.winhc.bigdata.spark.config.EsConfig
-import com.winhc.bigdata.spark.ng.utils.explode_tab
+import com.winhc.bigdata.spark.ng.utils.{StartAndEndDsUtils, explode_tab}
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
@@ -23,19 +23,19 @@ case class general_handler(s: SparkSession,
                           ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
 
-  val md5_fields: Seq[String] = job_args.md5_fields
-  val tn: String = job_args.tableName
-  val is_super_filter: Boolean = job_args.is_super_filter
-  val where: String = job_args.where
+  private val md5_fields: Seq[String] = job_args.md5_fields
+  private val tn: String = job_args.tableName
+  private val is_super_filter: Boolean = job_args.is_super_filter
+  private val where: String = job_args.where
 
 
-  val ods_tab = s"$project.ods_$tn"
-  val inc_ods_tab = s"$project.inc_ods_$tn"
+  private val ods_tab = s"$project.ods_$tn"
+  private val inc_ods_tab = s"$project.inc_ods_$tn"
 
-  val ads_tab = s"$project.ads_$tn"
-  val inc_ads_tab = s"$project.inc_ads_$tn"
+  private val ads_tab = s"$project.ads_$tn"
+  private val inc_ads_tab = s"$project.inc_ads_$tn"
 
-  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+  private val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
 
   verify()
@@ -184,13 +184,13 @@ case class general_handler(s: SparkSession,
   }
 
 
-  val rowkey_f = get_rowkey_udf()
+  private val rowkey_f = get_rowkey_udf()
 
 
-  val clean_up = get_clean_up()
+  private val clean_up = get_clean_up()
 
 
-  val up = get_partition_order_by()
+  private val up = get_partition_order_by()
 
 
   def all(): Unit = {
@@ -232,12 +232,12 @@ case class general_handler(s: SparkSession,
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
-    explode_calc(ads_tab, inc_ods_ds)
+    explode_calc()
   }
 
   def inc(): Unit = {
-    var org_ds = ""
-    var target_ds = ""
+    var org_ds: String = null
+    var target_ds: String = null
 
     val inc_ods_ds = getLastPartitionsOrElse(inc_ods_tab, null)
     val inc_ads_ds = getLastPartitionsOrElse(inc_ads_tab, null)
@@ -294,7 +294,7 @@ case class general_handler(s: SparkSession,
          |""".stripMargin)
 
     addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
-    explode_calc(inc_ads_tab, target_ds)
+    explode_calc()
   }
 
 
@@ -307,33 +307,73 @@ case class general_handler(s: SparkSession,
   }
 
 
-  private def explode_calc(org_tab: String, ds: String): Unit = {
+  def explode_calc(): Unit = {
     if (job_args.explode_args == null) {
       return
     }
 
+    val ads_explode = s"winhc_ng.ads_${tn}_explode"
+    val inc_ads_explode = s"winhc_ng.inc_ads_${tn}_explode"
+
+
     val all_date_tmp_view = s"insert_${tn}"
     val all_date_explode_tmp_view = s"${all_date_tmp_view}_explode"
 
-    val explode_tab_name = s"${org_tab}_explode"
 
-    sql(
-      s"""
-         |select * from $org_tab where ds = '$ds'
-         |""".stripMargin)
-      .createTempView(all_date_tmp_view)
+    val args = StartAndEndDsUtils(spark).get_start_and_end_args(ads_tab, inc_ads_tab, ads_explode, inc_ads_explode)
+
+    if (args.inc) {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $inc_ads_tab
+           |                        WHERE   ds > ${args.inc_tab_gt_ds}
+           |                    )
+           |        )
+           |WHERE   num = 1
+           |""".stripMargin)
+        .createTempView(all_date_tmp_view)
+    } else {
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY $up ) AS num
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    $ads_tab
+           |                        WHERE   ds > 0
+           |                        UNION ALL
+           |                        SELECT  *
+           |                        FROM    $inc_ads_tab
+           |                        WHERE   ds > 0
+           |                    )
+           |        )
+           |WHERE   num = 1
+           |""".stripMargin)
+        .createTempView(all_date_tmp_view)
+    }
+
+    // Incremental runs write to the inc explode table; full rebuilds write to the stock explode table.
+    val explode_tab_name =
+      if (args.inc) inc_ads_explode
+      else ads_explode
 
     explode_tab(spark, all_date_tmp_view, job_args.explode_args)
       .calc(all_date_explode_tmp_view)
 
     if (!spark.catalog.tableExists(explode_tab_name)) {
       //表不存在
-
     }
-
     sql(
       s"""
-         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='$ds')
+         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='${args.target_ds}')
          |SELECT ${getColumns(explode_tab_name).diff(Seq("ds")).mkString(",")}
          |FROM
          |    $all_date_explode_tmp_view

+ 32 - 276
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -1,11 +1,7 @@
 package com.winhc.bigdata.spark.ng.utils
 
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.{col, struct, to_json}
+import com.winhc.bigdata.spark.utils.SparkUtils
 
-import scala.annotation.meta.getter
 import scala.collection.mutable
 
 /**
@@ -13,249 +9,10 @@ import scala.collection.mutable
  * @Date: 2020/10/10 10:23
  * @Description: winhc_ng空间下摘要增强版
  */
-case class GroupByInfoNg(field: String, value_alias: Seq[(String, String)])
-
-case class company_summary_args(table_name: String
-                                , companyIdField: String
-                                , field_prefix: String = null //如果没有 groupByInfo的话可使用此别名,默认为table_name
-                                , distinctField: String = "rowkey"
-                                , groupByInfo: GroupByInfoNg = null
-                                , where: String = ""
-                                , sortField: String = "ds"
-                               ) {
-  def winhc_hash(): String = {
-    BaseUtil.cleanup(s"${table_name}_${companyIdField}").replaceAll("[\\(\\)() ]", "")
-  }
-}
-
-case class CompanySummaryNg_new(s: SparkSession,
-                                project: String //表所在工程名
-                                , args: Seq[company_summary_args]
-                               ) extends LoggingUtils {
-  @(transient@getter) val spark: SparkSession = s
-  private val target_tab = "winhc_ng.out_es_summary"
-  init()
-
-  private def init() {
-    sql(
-      s"""
-         |CREATE TABLE IF NOT EXISTS $target_tab
-         |(
-         |    company_id  STRING COMMENT '公司id'
-         |    ,summary STRING COMMENT '格子中的摘要信息,json格式'
-         |    ,detail STRING COMMENT '个别维度详细的摘要信息'
-         |)
-         |COMMENT 'out es summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
-         |PARTITIONED BY (ds STRING COMMENT '分区')
-         |LIFECYCLE 15
-         |""".stripMargin)
-  }
-
-
-  private def get_table_data(arg: company_summary_args): String = {
-    val ds = getLastPartitionsOrElse(target_tab, null)
-
-    val tab = arg.table_name
-    val companyIdField = arg.companyIdField
-    val distinctField = arg.distinctField
-
-    val sortField = arg.sortField
-    val where = if (StringUtils.isEmpty(arg.where)) {
-      s""
-    } else {
-      s"AND   ${arg.where}"
-    }
-
-
-    val ads_table = s"${project}.ads_$tab" //存量ads表
-    val inc_ads_table = s"${project}.inc_ads_$tab"
-
-    val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
-
-    val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
-
-    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
-    val tab_tmp_view = s"${arg.winhc_hash()}_data"
-
-    if (ds == null) {
-      sql(
-        s"""
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $ads_table
-           |WHERE   ds = '$ads_last_ds'
-           |UNION ALL
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $inc_ads_table
-           |WHERE   ds > $ads_last_ds
-           |""".stripMargin)
-        .createOrReplaceTempView(tab_tmp_view)
-    } else {
-      sql(
-        s"""
-           |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
-           |FROM    (
-           |            SELECT  DISTINCT $companyIdField as xjk_cid
-           |            FROM    $inc_ads_table
-           |            WHERE   ds = $inc_ads_last_ds
-           |        ) id_table
-           |JOIN (
-           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |                      ,$companyIdField as xjk_cid
-           |              FROM    $inc_ads_table
-           |              WHERE   ds > '$ads_last_ds'
-           |              AND     ds < '$inc_ads_last_ds'
-           |              UNION ALL
-           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |                      ,$companyIdField as xjk_cid
-           |              FROM    $ads_table
-           |              WHERE   ds = '$ads_last_ds'
-           |          ) org_tab
-           |ON      id_table.xjk_cid = org_tab.xjk_cid
-           |UNION ALL
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $inc_ads_table
-           |WHERE   ds = $inc_ads_last_ds
-           |""".stripMargin)
-        .createOrReplaceTempView(tab_tmp_view)
-    }
-
-    val distinct_tab = s"${tab_tmp_view}_distinct"
-    sql(
-      s"""
-         |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-         |FROM    (
-         |            SELECT  tmp.*
-         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
-         |            FROM    $tab_tmp_view AS tmp
-         |        ) tmp2
-         |WHERE   tmp2.c = 1
-         |$where
-         |""".stripMargin)
-      .createOrReplaceTempView(distinct_tab)
-    distinct_tab
-  }
-
-  private def get_tab_summary(arg: company_summary_args): String = {
-    val tab_hash = arg.winhc_hash()
-
-    val tab = arg.table_name
-    val companyIdField = arg.companyIdField
-
-    val result_tab = s"${tab_hash}_summary_tab"
-
-    val all_tab = get_table_data(arg)
-
-    val func_name = s"xjk_func_${tab_hash}"
-
-    val view = if (arg.groupByInfo == null) {
-      if (arg.field_prefix != null) {
-        s"arr[0] as ${arg.field_prefix}"
-      } else {
-        s"arr[0] as $tab"
-      }
-
-    } else {
-      arg.groupByInfo.value_alias.indices.map(i => {
-        s"arr[$i] as ${arg.groupByInfo.value_alias(i)._2}"
-      }).mkString(",")
-    }
-
-    //注册函数
-    if (arg.groupByInfo != null) {
-      val fieldSeq = arg.groupByInfo.value_alias.map(r => {
-        (s"${r._1}", r._2)
-      })
-
-      def getResArr(group_val: String, num: Long): Seq[Long] = {
-        val res = scala.collection.mutable.ArrayBuffer[Long]()
-        for (i <- fieldSeq) {
-          if (i._1.equals(group_val)) {
-            res += num
-          } else {
-            res += 0
-          }
-        }
-        res
-      }
-
-      spark.udf.register(func_name, getResArr _)
-    }
-
-    val groupKey_show = if (arg.groupByInfo == null) {
-      s",array(count(1)) as arr"
-    } else {
-      s",$func_name(cast(${arg.groupByInfo.field} as STRING),count(1)) as arr"
-    }
-
-    val groupKey = if (arg.groupByInfo == null) {
-      s""
-    } else {
-      s",${arg.groupByInfo.field}"
-    }
-
-    sql(
-      s"""
-         |SELECT  company_id
-         |        ,${view}
-         |FROM    (
-         |        SELECT  $companyIdField as company_id
-         |                $groupKey_show
-         |        FROM    $all_tab
-         |        GROUP BY $companyIdField ${groupKey}
-         |)
-         |""".stripMargin)
-      .createOrReplaceTempView(result_tab)
-
-    if (arg.groupByInfo != null) {
-      sql(
-        s"""
-           |SELECT  company_id
-           |        ,${arg.groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
-           |FROM    $result_tab
-           |GROUP BY company_id
-           |""".stripMargin)
-        .createOrReplaceTempView(result_tab)
-    }
-    result_tab
-  }
-
-
-  def calc(): Unit = {
-    val summary_tab = "summary_tab_xjk"
-    val summary_tabs = args.map(get_tab_summary).seq
-    val merge = merge_table(spark, summary_tabs, "company_id")
-    merge.calc(summary_tab)
-    val cols = getColumns(summary_tab).diff(Seq("company_id"))
-
-    sql(
-      s"""
-         |select * from $summary_tab
-         |""".stripMargin)
-      .withColumn("summary", to_json(struct(cols.map(col): _*))).createTempView("xjk_tmp_summary_tab")
-
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
-         |SELECT company_id,summary,null as detail
-         |FROM
-         |    xjk_tmp_summary_tab
-         |""".stripMargin)
-    merge.drop()
-  }
-
-  private def getCastCols(name: String, pre: String): String = {
-    val list = List("cid", "new_cid", "ncid")
-    if (list.contains(name)) {
-      return s"CAST(${pre}${name} as BIGINT) $name"
-    }
-    pre + name
-  }
-}
-
 object CompanySummaryNg_new {
 
-  private def get_default_summary_args(tableName: String): company_summary_args = {
-    company_summary_args(
+  private def get_default_summary_args(tableName: String): SummaryArgs = {
+    SummaryArgs(
       table_name = tableName
       , companyIdField = "split(rowkey,'_')[0]"
       , where = "instr(rowkey,'_') != 0"
@@ -266,8 +23,8 @@ object CompanySummaryNg_new {
     )
   }
 
-  private def get_default_summary_args(tableName: String, company_id: String): company_summary_args = {
-    company_summary_args(
+  private def get_default_summary_args(tableName: String, company_id: String): SummaryArgs = {
+    SummaryArgs(
       table_name = tableName
       , companyIdField = company_id
       , where = s"$company_id is not null and length($company_id) = 32 "
@@ -285,7 +42,7 @@ object CompanySummaryNg_new {
     , get_default_summary_args("company_tm")
     , get_default_summary_args("company_app_info")
 
-    , company_summary_args(table_name = "company_court_open_announcement_explode"
+    , SummaryArgs(table_name = "company_court_open_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
@@ -294,7 +51,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_court_open_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_court_open_announcement_explode"
+    , SummaryArgs(table_name = "company_court_open_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
@@ -304,7 +61,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_dishonest_info"
+    , SummaryArgs(table_name = "company_dishonest_info"
       , companyIdField = "keyno"
       , where = "keyno is not null and length(keyno) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -312,7 +69,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_dishonest_info_del_1")
       ))
     )
-    , company_summary_args(table_name = "company_zxr_restrict"
+    , SummaryArgs(table_name = "company_zxr_restrict"
       , companyIdField = "company_id"
       , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -321,7 +78,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_abnormal_info"
+    , SummaryArgs(table_name = "company_abnormal_info"
       , companyIdField = "company_id"
       , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -331,7 +88,7 @@ object CompanySummaryNg_new {
     )
 
     //todo
-    , company_summary_args(table_name = "company_public_announcement"
+    , SummaryArgs(table_name = "company_public_announcement"
       , companyIdField = "company_id"
       , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -339,7 +96,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_public_announcement_del_1")
       ))
     )
-    , company_summary_args(table_name = "company_illegal_info"
+    , SummaryArgs(table_name = "company_illegal_info"
       , companyIdField = "company_id"
       , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -349,7 +106,7 @@ object CompanySummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_land_mortgage"
+    , SummaryArgs(table_name = "company_land_mortgage"
       , companyIdField = "mortgagor_company_id"
       , where = "mortgagor_company_id is not null and length(mortgagor_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -359,7 +116,7 @@ object CompanySummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_land_mortgage"
+    , SummaryArgs(table_name = "company_land_mortgage"
       , companyIdField = "mortgagee_company_id"
       , where = "mortgagee_company_id is not null and length(mortgagee_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -369,7 +126,7 @@ object CompanySummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_judicial_assistance"
+    , SummaryArgs(table_name = "company_judicial_assistance"
       , companyIdField = "company_id"
       , where = "company_id is not null and length(company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -378,7 +135,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_judicial_assistance"
+    , SummaryArgs(table_name = "company_judicial_assistance"
       , companyIdField = "executed_person_id"
       , where = "executed_person_id is not null and length(executed_person_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -388,7 +145,7 @@ object CompanySummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_equity_info"
+    , SummaryArgs(table_name = "company_equity_info"
       , companyIdField = "related_company_id"
       , where = "related_company_id is not null and length(related_company_id) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -396,7 +153,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_equity_info_del_1_related")
       ))
     )
-    , company_summary_args(table_name = "company_equity_info_explode"
+    , SummaryArgs(table_name = "company_equity_info_explode"
       , companyIdField = "pledgor_keyno_explode"
       , distinctField = "rowkey,pledgor_keyno_explode"
       , where = "pledgor_keyno_explode is not null and length(pledgor_keyno_explode) = 32 "
@@ -405,7 +162,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_equity_info_del_1_pledgor")
       ))
     )
-    , company_summary_args(table_name = "company_equity_info_explode"
+    , SummaryArgs(table_name = "company_equity_info_explode"
       , companyIdField = "pledgee_keyno_explode"
       , distinctField = "rowkey,pledgee_keyno_explode"
       , where = "pledgee_keyno_explode is not null and length(pledgee_keyno_explode) = 32 "
@@ -416,7 +173,7 @@ object CompanySummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_lawsuit_explode"
+    , SummaryArgs(table_name = "company_lawsuit_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
@@ -425,7 +182,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_lawsuit_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_lawsuit_explode"
+    , SummaryArgs(table_name = "company_lawsuit_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
@@ -436,8 +193,7 @@ object CompanySummaryNg_new {
     )
 
 
-
-    , company_summary_args(table_name = "company_court_announcement_explode"
+    , SummaryArgs(table_name = "company_court_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
@@ -446,7 +202,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_court_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_court_announcement_explode"
+    , SummaryArgs(table_name = "company_court_announcement_explode"
       , companyIdField = "litigant_info_id_explode"
       , distinctField = "rowkey,litigant_info_id_explode"
       , where = "litigant_info_id_explode is not null and length(litigant_info_id_explode) = 32 "
@@ -455,7 +211,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_court_announcement_del_1_litigant")
       ))
     )
-    , company_summary_args(table_name = "company_send_announcement_explode"
+    , SummaryArgs(table_name = "company_send_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
@@ -464,7 +220,7 @@ object CompanySummaryNg_new {
         , ("1", s"company_send_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_send_announcement_explode"
+    , SummaryArgs(table_name = "company_send_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
@@ -472,7 +228,7 @@ object CompanySummaryNg_new {
         ("0", s"company_send_announcement_del_0_defendant")
         , ("1", s"company_send_announcement_del_1_defendant")
       ))
-    ), company_summary_args(table_name = "company_zxr_final_case"
+    ), SummaryArgs(table_name = "company_zxr_final_case"
       , companyIdField = "keyno"
       , where = "keyno is not null and length(keyno) = 32 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -481,7 +237,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_court_register_explode"
+    , SummaryArgs(table_name = "company_court_register_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 "
@@ -491,7 +247,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_court_register_explode"
+    , SummaryArgs(table_name = "company_court_register_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 "
@@ -503,7 +259,7 @@ object CompanySummaryNg_new {
 
     // ====================================================
 
-    , company_summary_args(table_name = "bankruptcy_open_case_explode"
+    , SummaryArgs(table_name = "bankruptcy_open_case_explode"
       , companyIdField = "applicant_info_id_explode"
       , distinctField = "rowkey,applicant_info_id_explode"
       , where = "applicant_info_id_explode is not null and length(applicant_info_id_explode) = 32 "
@@ -513,7 +269,7 @@ object CompanySummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "bankruptcy_open_case_explode"
+    , SummaryArgs(table_name = "bankruptcy_open_case_explode"
       , companyIdField = "respondent_info_id_explode"
       , distinctField = "rowkey,respondent_info_id_explode"
       , where = "respondent_info_id_explode is not null and length(respondent_info_id_explode) = 32 "
@@ -541,7 +297,7 @@ object CompanySummaryNg_new {
     , get_default_summary_args("company_brief_cancel_announcement", "company_id")
     , get_default_summary_args("company_double_random_check_info", "company_id")
 
-    , company_summary_args(table_name = "auction_tracking_explode"
+    , SummaryArgs(table_name = "auction_tracking_explode"
       , companyIdField = "company_info_id_explode"
       , distinctField = "rowkey,company_info_id_explode"
       , where = "company_info_id_explode is not null and length(company_info_id_explode) = 32 "
@@ -564,7 +320,7 @@ object CompanySummaryNg_new {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    CompanySummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
+    WinhcNgSummary_new(s = spark, project = "winhc_ng", target_tab = "winhc_ng.out_es_summary", args = start_args).calc()
     spark.stop()
   }
 }

+ 18 - 250
src/main/scala/com/winhc/bigdata/spark/ng/utils/PersonSummaryNg_new.scala

@@ -1,11 +1,7 @@
 package com.winhc.bigdata.spark.ng.utils
 
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.sql.SparkSession
-import org.apache.spark.sql.functions.{col, struct, to_json}
+import com.winhc.bigdata.spark.utils.SparkUtils
 
-import scala.annotation.meta.getter
 import scala.collection.mutable
 
 /**
@@ -13,235 +9,10 @@ import scala.collection.mutable
  * @Date: 2020/10/10 10:23
  * @Description: winhc_ng空间下摘要增强版
  */
-
-case class PersonSummaryNg_new(s: SparkSession,
-                               project: String //表所在工程名
-                               , args: Seq[company_summary_args]
-                              ) extends LoggingUtils {
-  @(transient@getter) val spark: SparkSession = s
-  private val target_tab = "winhc_ng.out_es_summary_person"
-  init()
-
-  private def init() {
-    sql(
-      s"""
-         |CREATE TABLE IF NOT EXISTS $target_tab
-         |(
-         |    company_id  STRING COMMENT '人id'
-         |    ,summary STRING COMMENT '格子中的摘要信息,json格式'
-         |    ,detail STRING COMMENT '个别维度详细的摘要信息'
-         |)
-         |COMMENT 'out es person summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
-         |PARTITIONED BY (ds STRING COMMENT '分区')
-         |LIFECYCLE 15
-         |""".stripMargin)
-  }
-
-
-  private def get_table_data(arg: company_summary_args): String = {
-    val ds = getLastPartitionsOrElse(target_tab, null)
-
-    val tab = arg.table_name
-    val companyIdField = arg.companyIdField
-    val distinctField = arg.distinctField
-
-    val sortField = arg.sortField
-    val where = if (StringUtils.isEmpty(arg.where)) {
-      s""
-    } else {
-      s"AND   ${arg.where}"
-    }
-
-
-    val ads_table = s"${project}.ads_$tab" //存量ads表
-    val inc_ads_table = s"${project}.inc_ads_$tab"
-
-    val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
-
-    val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
-
-    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
-    val tab_tmp_view = s"${arg.winhc_hash()}_data"
-
-    if (ds == null) {
-      sql(
-        s"""
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $ads_table
-           |WHERE   ds = '$ads_last_ds'
-           |UNION ALL
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $inc_ads_table
-           |WHERE   ds > $ads_last_ds
-           |""".stripMargin)
-        .createOrReplaceTempView(tab_tmp_view)
-    } else {
-      sql(
-        s"""
-           |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
-           |FROM    (
-           |            SELECT  DISTINCT $companyIdField as xjk_cid
-           |            FROM    $inc_ads_table
-           |            WHERE   ds = $inc_ads_last_ds
-           |        ) id_table
-           |JOIN (
-           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |                      ,$companyIdField as xjk_cid
-           |              FROM    $inc_ads_table
-           |              WHERE   ds > '$ads_last_ds'
-           |              AND     ds < '$inc_ads_last_ds'
-           |              UNION ALL
-           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |                      ,$companyIdField as xjk_cid
-           |              FROM    $ads_table
-           |              WHERE   ds = '$ads_last_ds'
-           |          ) org_tab
-           |ON      id_table.xjk_cid = org_tab.xjk_cid
-           |UNION ALL
-           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-           |FROM    $inc_ads_table
-           |WHERE   ds = $inc_ads_last_ds
-           |""".stripMargin)
-        .createOrReplaceTempView(tab_tmp_view)
-    }
-
-    val distinct_tab = s"${tab_tmp_view}_distinct"
-    sql(
-      s"""
-         |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
-         |FROM    (
-         |            SELECT  tmp.*
-         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
-         |            FROM    $tab_tmp_view AS tmp
-         |        ) tmp2
-         |WHERE   tmp2.c = 1
-         |$where
-         |""".stripMargin)
-      .createOrReplaceTempView(distinct_tab)
-    distinct_tab
-  }
-
-  private def get_tab_summary(arg: company_summary_args): String = {
-    val tab_hash = arg.winhc_hash()
-
-    val tab = arg.table_name
-    val companyIdField = arg.companyIdField
-
-    val result_tab = s"${tab_hash}_summary_tab"
-
-    val all_tab = get_table_data(arg)
-
-    val func_name = s"xjk_func_${tab_hash}"
-
-    val view = if (arg.groupByInfo == null) {
-      if (arg.field_prefix != null) {
-        s"arr[0] as ${arg.field_prefix}"
-      } else {
-        s"arr[0] as $tab"
-      }
-
-    } else {
-      arg.groupByInfo.value_alias.indices.map(i => {
-        s"arr[$i] as ${arg.groupByInfo.value_alias(i)._2}"
-      }).mkString(",")
-    }
-
-    //注册函数
-    if (arg.groupByInfo != null) {
-      val fieldSeq = arg.groupByInfo.value_alias.map(r => {
-        (s"${r._1}", r._2)
-      })
-
-      def getResArr(group_val: String, num: Long): Seq[Long] = {
-        val res = scala.collection.mutable.ArrayBuffer[Long]()
-        for (i <- fieldSeq) {
-          if (i._1.equals(group_val)) {
-            res += num
-          } else {
-            res += 0
-          }
-        }
-        res
-      }
-
-      spark.udf.register(func_name, getResArr _)
-    }
-
-    val groupKey_show = if (arg.groupByInfo == null) {
-      s",array(count(1)) as arr"
-    } else {
-      s",$func_name(cast(${arg.groupByInfo.field} as STRING),count(1)) as arr"
-    }
-
-    val groupKey = if (arg.groupByInfo == null) {
-      s""
-    } else {
-      s",${arg.groupByInfo.field}"
-    }
-
-    sql(
-      s"""
-         |SELECT  company_id
-         |        ,${view}
-         |FROM    (
-         |        SELECT  $companyIdField as company_id
-         |                $groupKey_show
-         |        FROM    $all_tab
-         |        GROUP BY $companyIdField ${groupKey}
-         |)
-         |""".stripMargin)
-      .createOrReplaceTempView(result_tab)
-
-    if (arg.groupByInfo != null) {
-      sql(
-        s"""
-           |SELECT  company_id
-           |        ,${arg.groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
-           |FROM    $result_tab
-           |GROUP BY company_id
-           |""".stripMargin)
-        .createOrReplaceTempView(result_tab)
-    }
-    result_tab
-  }
-
-
-  def calc(): Unit = {
-    val summary_tab = "summary_tab_xjk"
-    val summary_tabs = args.map(get_tab_summary).seq
-    val merge = merge_table(spark, summary_tabs, "company_id")
-    merge.calc(summary_tab)
-    val cols = getColumns(summary_tab).diff(Seq("company_id"))
-
-    sql(
-      s"""
-         |select * from $summary_tab
-         |""".stripMargin)
-      .withColumn("summary", to_json(struct(cols.map(col): _*))).createTempView("xjk_tmp_summary_tab")
-
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
-         |SELECT company_id,summary,null as detail
-         |FROM
-         |    xjk_tmp_summary_tab
-         |""".stripMargin)
-    merge.drop()
-  }
-
-  private def getCastCols(name: String, pre: String): String = {
-    val list = List("cid", "new_cid", "ncid")
-    if (list.contains(name)) {
-      return s"CAST(${pre}${name} as BIGINT) $name"
-    }
-    pre + name
-  }
-}
-
 object PersonSummaryNg_new {
 
-  private def get_default_summary_args(tableName: String, person_id: String): company_summary_args = {
-    company_summary_args(
+  private def get_default_summary_args(tableName: String, person_id: String): SummaryArgs = {
+    SummaryArgs(
       table_name = tableName
       , companyIdField = person_id
       , where = s"$person_id is not null and length($person_id) = 33 "
@@ -256,14 +27,13 @@ object PersonSummaryNg_new {
     get_default_summary_args("company_zxr", "keyno")
     , get_default_summary_args("company_dishonest_info", "keyno")
     , get_default_summary_args("company_zxr_restrict", "pid")
-//    , get_default_summary_args("company_judicial_assistance", "executed_person_id")
+    //    , get_default_summary_args("company_judicial_assistance", "executed_person_id")
     , get_default_summary_args("restrictions_on_exit", "limited_person_pid")
     , get_default_summary_args("company_zxr_final_case", "keyno")
     , get_default_summary_args("zxr_evaluate_results", "keyno")
 
 
-
-    , company_summary_args(table_name = "company_judicial_assistance"
+    , SummaryArgs(table_name = "company_judicial_assistance"
       , companyIdField = "executed_person_id"
       , where = "executed_person_id is not null and length(executed_person_id) = 33 "
       , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
@@ -273,7 +43,7 @@ object PersonSummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_court_open_announcement_explode"
+    , SummaryArgs(table_name = "company_court_open_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33"
@@ -282,7 +52,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_court_open_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_court_open_announcement_explode"
+    , SummaryArgs(table_name = "company_court_open_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 "
@@ -293,7 +63,7 @@ object PersonSummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_lawsuit_explode"
+    , SummaryArgs(table_name = "company_lawsuit_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33 "
@@ -302,7 +72,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_lawsuit_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_lawsuit_explode"
+    , SummaryArgs(table_name = "company_lawsuit_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 "
@@ -313,8 +83,7 @@ object PersonSummaryNg_new {
     )
 
 
-
-    , company_summary_args(table_name = "company_court_announcement_explode"
+    , SummaryArgs(table_name = "company_court_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33"
@@ -323,7 +92,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_court_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_court_announcement_explode"
+    , SummaryArgs(table_name = "company_court_announcement_explode"
       , companyIdField = "litigant_info_id_explode"
       , distinctField = "rowkey,litigant_info_id_explode"
       , where = "litigant_info_id_explode is not null and length(litigant_info_id_explode) = 33"
@@ -333,7 +102,7 @@ object PersonSummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_court_register_explode"
+    , SummaryArgs(table_name = "company_court_register_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33 "
@@ -343,7 +112,7 @@ object PersonSummaryNg_new {
       ))
     )
 
-    , company_summary_args(table_name = "company_court_register_explode"
+    , SummaryArgs(table_name = "company_court_register_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 "
@@ -352,7 +121,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_court_register_del_1_defendant")
       ))
     )
-    , company_summary_args(table_name = "company_send_announcement_explode"
+    , SummaryArgs(table_name = "company_send_announcement_explode"
       , companyIdField = "plaintiff_info_id_explode"
       , distinctField = "rowkey,plaintiff_info_id_explode"
       , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 33 "
@@ -361,7 +130,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_send_announcement_del_1_plaintiff")
       ))
     )
-    , company_summary_args(table_name = "company_send_announcement_explode"
+    , SummaryArgs(table_name = "company_send_announcement_explode"
       , companyIdField = "defendant_info_id_explode"
       , distinctField = "rowkey,defendant_info_id_explode"
       , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 33 "
@@ -372,7 +141,7 @@ object PersonSummaryNg_new {
     )
 
 
-    , company_summary_args(table_name = "company_equity_info_explode"
+    , SummaryArgs(table_name = "company_equity_info_explode"
       , companyIdField = "pledgor_keyno_explode"
       , distinctField = "rowkey,pledgor_keyno_explode"
       , where = "pledgor_keyno_explode is not null and length(pledgor_keyno_explode) = 33 "
@@ -381,7 +150,7 @@ object PersonSummaryNg_new {
         , ("1", s"company_equity_info_del_1_pledgor")
       ))
     )
-    , company_summary_args(table_name = "company_equity_info_explode"
+    , SummaryArgs(table_name = "company_equity_info_explode"
       , companyIdField = "pledgee_keyno_explode"
       , distinctField = "rowkey,pledgee_keyno_explode"
       , where = "pledgee_keyno_explode is not null and length(pledgee_keyno_explode) = 33 "
@@ -396,7 +165,6 @@ object PersonSummaryNg_new {
 
 
   def main(args: Array[String]): Unit = {
-
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.debug.maxToStringFields" -> "200",
@@ -404,7 +172,7 @@ object PersonSummaryNg_new {
     )
 
     val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
-    PersonSummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
+    WinhcNgSummary_new(s = spark, project = "winhc_ng", target_tab = "winhc_ng.out_es_summary_person", args = start_args).calc()
     spark.stop()
   }
 }

+ 54 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/ReimportSummary.scala

@@ -0,0 +1,54 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.ng.jobs.{args_company_job, general_handler}
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/4/29 10:53
+ */
+/**
+ * Rebuilds the exploded ads tables for a list of base table names.
+ *
+ * @param s    Spark session used for every statement
+ * @param tabs base table names; each is expanded to `winhc_ng.ads_<name>_explode`
+ *             and `winhc_ng.inc_ads_<name>_explode`
+ */
+case class ReimportSummary(s: SparkSession
+                           , tabs: Seq[String]
+                          ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  /**
+   * Drops every `ds > '0'` partition of both explode tables of `tn`
+   * (full table first, incremental table second) and then runs `explode_calc()` for `tn`.
+   */
+  private def re_explode(tn: String): Unit = {
+    Seq(s"winhc_ng.ads_${tn}_explode", s"winhc_ng.inc_ads_${tn}_explode")
+      .foreach(explodeTab => sql(s"ALTER TABLE $explodeTab DROP IF EXISTS PARTITION(ds>'0')"))
+
+    val handler = general_handler(s = spark
+      , project = "winhc_ng"
+      , job_args = args_company_job.get_args_company_job(tn))
+    handler.explode_calc()
+  }
+
+  /** Re-explodes every table in `tabs`, in the given order. */
+  def calc(): Unit = tabs.foreach(re_explode)
+}
+
+object ReimportSummary {
+  /**
+   * Job entry point: starts a Spark session for project `winhc_ng` and re-explodes
+   * the tables listed in `tns`.
+   *
+   * The session is stopped in a `finally` block so it is released even when
+   * the re-import fails part-way through.
+   */
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_ng"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":reimport", config)
+    try {
+      // NOTE(review): the list holds a single empty name, which looks like a placeholder
+      // to be filled in before running — confirm.
+      val tns = Seq("")
+      ReimportSummary(spark, tns).calc()
+    } finally {
+      spark.stop()
+    }
+  }
+}

+ 108 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/StartAndEndDsUtils.scala

@@ -0,0 +1,108 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/2/27 09:23
+ */
+
+/**
+ * Partition bounds produced by `StartAndEndDsUtils.get_start_and_end_args`.
+ *
+ * @param inc           `true` when the incremental path was taken, `false` for a full run
+ * @param target_ds     latest source partition found for the run
+ * @param inc_tab_gt_ds partition picked from the target tables on the incremental path, `null` on a full run;
+ *                      presumably an exclusive lower bound ("gt") for source partitions — confirm against callers
+ */
+case class StartAndEndDsArgs(
+                              inc: Boolean,
+                              target_ds: String,
+                              inc_tab_gt_ds: String
+                            )
+
+case class StartAndEndDsUtils(s: SparkSession
+                             ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  /**
+   * Decides between a full and an incremental run for a job that reads two source
+   * tables and writes two target tables, and returns the partition bounds to use.
+   *
+   * A full run is chosen when `target_tab` has no partition, or when `inc_org_tab`
+   * has none; otherwise the incremental bounds are computed.
+   *
+   * @param org_tab        full source table
+   * @param inc_org_tab    incremental source table
+   * @param target_tab     full target table
+   * @param inc_target_tab incremental target table
+   * @return the chosen mode together with its partition bounds
+   * @throws RuntimeException on a full run when neither source table has a partition
+   */
+  def get_start_and_end_args(org_tab: String, inc_org_tab: String, target_tab: String, inc_target_tab: String): StartAndEndDsArgs = {
+
+    // Full run: target partition is the newest incremental source partition,
+    // falling back to the newest full source partition.
+    def all(): StartAndEndDsArgs = {
+      val latestSource = getLastPartitionsOrElse(inc_org_tab, getLastPartitionsOrElse(org_tab, null))
+      if (latestSource == null) throw new RuntimeException("全量来源表为空 !")
+      StartAndEndDsArgs(inc = false, target_ds = latestSource, inc_tab_gt_ds = null)
+    }
+
+    // Incremental run: all three lookups happen up front, in this order.
+    def inc(): StartAndEndDsArgs = {
+      val latestIncSource = getLastPartitionsOrElse(inc_org_tab, null)
+      val latestIncTarget = getLastPartitionsOrElse(inc_target_tab, null)
+      val latestFullTarget = getLastPartitionsOrElse(target_tab, null)
+
+      if (latestIncSource == null) {
+        all()
+      } else {
+        val candidate = if (latestIncTarget == null) latestIncSource else latestIncTarget
+        val lowerBound =
+          if (candidate.equals(latestIncSource)) {
+            // The newest incremental target partition is the one about to be written (or is missing):
+            // step back to the previous incremental target partition, else to the full target's latest one.
+            val previousIncTarget = getSecondLastPartitionOrElse(inc_target_tab, null)
+            if (previousIncTarget == null) latestFullTarget else previousIncTarget
+          } else {
+            candidate
+          }
+        StartAndEndDsArgs(inc = true, target_ds = latestIncSource, inc_tab_gt_ds = lowerBound)
+      }
+    }
+
+    if (getLastPartitionsOrElse(target_tab, null) == null) all() else inc()
+  }
+}
+
+
+object StartAndEndDsUtils {
+  /**
+   * Manual check: prints the bounds computed for `company_send_announcement`
+   * and its explode tables in project `winhc_ng`.
+   *
+   * The session is stopped in a `finally` block so it is released even when
+   * a partition lookup fails.
+   */
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_ng"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val tn = "company_send_announcement"
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":" + tn, config)
+
+    try {
+      val org_tab: String = s"winhc_ng.ads_$tn"
+      val inc_org_tab: String = s"winhc_ng.inc_ads_$tn"
+      val target_tab: String = s"winhc_ng.ads_${tn}_explode"
+      val inc_target_tab: String = s"winhc_ng.inc_ads_${tn}_explode"
+
+      val a = StartAndEndDsUtils(spark).get_start_and_end_args(org_tab, inc_org_tab, target_tab, inc_target_tab)
+      println(a)
+    } finally {
+      spark.stop()
+    }
+  }
+}

+ 23 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/SummaryArgs.scala

@@ -0,0 +1,23 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.BaseUtil
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/4/28 17:26
+ */
+
+/**
+ * Describes how one dimension is split into several counters.
+ *
+ * @param field       column whose value (cast to STRING) selects the counter
+ * @param value_alias pairs of (expected column value, output column alias), one per counter
+ */
+case class GroupByInfoNg(
+                          field: String,
+                          value_alias: Seq[(String, String)]
+                        )
+
+/**
+ * Settings for summarising one table into per-id counters.
+ *
+ * @param table_name     base table name (without the `ads_` / `inc_ads_` prefix)
+ * @param companyIdField column holding the id the counters are grouped by
+ * @param field_prefix   output alias used when `groupByInfo` is absent; `table_name` is used when this is null
+ * @param distinctField  column list used to de-duplicate rows
+ * @param groupByInfo    optional split of the count into several aliased counters
+ * @param where          optional extra filter, appended with `AND`
+ * @param sortField      column that decides which duplicate is kept (highest first)
+ */
+case class SummaryArgs(
+                        table_name: String,
+                        companyIdField: String,
+                        field_prefix: String = null,
+                        distinctField: String = "rowkey",
+                        groupByInfo: GroupByInfoNg = null,
+                        where: String = "",
+                        sortField: String = "ds"
+                      ) {
+  /** Name fragment derived from table and id column, used to build view and UDF names. */
+  def winhc_hash(): String = {
+    val rawKey = s"${table_name}_${companyIdField}"
+    val cleaned = BaseUtil.cleanup(rawKey)
+    // Strip parentheses and spaces from the cleaned key.
+    cleaned.replaceAll("[\\(\\)() ]", "")
+  }
+}

+ 236 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/WinhcNgSummary_new.scala

@@ -0,0 +1,236 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, struct, to_json}
+
+import scala.annotation.meta.getter
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/4/28 17:35
+ */
+case class WinhcNgSummary_new(s: SparkSession,
+                              project: String //表所在工程名
+                              , target_tab: String
+                              , args: Seq[SummaryArgs]
+                             ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+  init()
+
+  /** Creates the partitioned output table `target_tab` if it does not exist yet. */
+  private def init(): Unit = {
+    val createDdl =
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    company_id  STRING COMMENT '公司或人的id'
+         |    ,summary STRING COMMENT '格子中的摘要信息,json格式'
+         |    ,detail STRING COMMENT '个别维度详细的摘要信息'
+         |)
+         |COMMENT 'out es summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |LIFECYCLE 15
+         |""".stripMargin
+    sql(createDdl)
+  }
+
+
+  /**
+   * Registers the de-duplicated source view for one summary dimension and returns its name.
+   *
+   * When `target_tab` has no partition yet, the view is the latest partition of the full
+   * ads table plus every later partition of the incremental table. Otherwise it is limited
+   * to the ids present in the latest incremental partition: their history (full table plus
+   * earlier incremental partitions) together with that latest incremental partition.
+   *
+   * Duplicates are then removed by keeping, per `distinctField`, the row with the highest
+   * `sortField`; the optional `where` filter is applied after that.
+   *
+   * All partition values are passed as quoted string literals, matching the STRING type of `ds`.
+   *
+   * @param arg settings of the dimension
+   * @return name of the temp view holding the de-duplicated rows
+   */
+  private def get_table_data(arg: SummaryArgs): String = {
+    val ds = getLastPartitionsOrElse(target_tab, null)
+
+    val tab = arg.table_name
+    val companyIdField = arg.companyIdField
+    val distinctField = arg.distinctField
+
+    val sortField = arg.sortField
+    val where = if (StringUtils.isEmpty(arg.where)) "" else s"AND   ${arg.where}"
+
+    val ads_table = s"${project}.ads_$tab" // full (stock) ads table
+    val inc_ads_table = s"${project}.inc_ads_$tab"
+
+    val inc_ads_last_ds = getLastPartitionsOrElse(inc_ads_table, "0")
+
+    // Only columns present in both tables can be unioned.
+    val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
+    val plain_cols = new_cols.map(getCastCols(_, "")).mkString(",")
+
+    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+    val tab_tmp_view = s"${arg.winhc_hash()}_data"
+
+    if (ds == null) {
+      sql(
+        s"""
+           |SELECT  $plain_cols
+           |FROM    $ads_table
+           |WHERE   ds = '$ads_last_ds'
+           |UNION ALL
+           |SELECT  $plain_cols
+           |FROM    $inc_ads_table
+           |WHERE   ds > '$ads_last_ds'
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    } else {
+      sql(
+        s"""
+           |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+           |FROM    (
+           |            SELECT  DISTINCT $companyIdField as xjk_cid
+           |            FROM    $inc_ads_table
+           |            WHERE   ds = '$inc_ads_last_ds'
+           |        ) id_table
+           |JOIN (
+           |              SELECT  $plain_cols
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $inc_ads_table
+           |              WHERE   ds > '$ads_last_ds'
+           |              AND     ds < '$inc_ads_last_ds'
+           |              UNION ALL
+           |              SELECT  $plain_cols
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $ads_table
+           |              WHERE   ds = '$ads_last_ds'
+           |          ) org_tab
+           |ON      id_table.xjk_cid = org_tab.xjk_cid
+           |UNION ALL
+           |SELECT  $plain_cols
+           |FROM    $inc_ads_table
+           |WHERE   ds = '$inc_ads_last_ds'
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    }
+
+    val distinct_tab = s"${tab_tmp_view}_distinct"
+    sql(
+      s"""
+         |SELECT  $plain_cols
+         |FROM    (
+         |            SELECT  tmp.*
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
+         |            FROM    $tab_tmp_view AS tmp
+         |        ) tmp2
+         |WHERE   tmp2.c = 1
+         |$where
+         |""".stripMargin)
+      .createOrReplaceTempView(distinct_tab)
+    distinct_tab
+  }
+
+  /**
+   * Registers a temp view with one row per id and one counter column per alias,
+   * and returns the view name.
+   *
+   * Without `groupByInfo` a single counter is produced, named `field_prefix`
+   * (or `table_name` when that is null). With `groupByInfo` a UDF spreads each
+   * per-value count into an array with one slot per alias, and the slots are then
+   * summed per id.
+   *
+   * @param arg settings of the dimension
+   * @return name of the temp view holding the counters
+   */
+  private def get_tab_summary(arg: SummaryArgs): String = {
+    val hash = arg.winhc_hash()
+    val idColumn = arg.companyIdField
+    val resultView = s"${hash}_summary_tab"
+
+    val sourceView = get_table_data(arg)
+
+    val udfName = s"xjk_func_${hash}"
+    val grouping = Option(arg.groupByInfo)
+
+    // Outer projection: array slots renamed to their output aliases.
+    val projection = grouping match {
+      case None =>
+        val alias = if (arg.field_prefix != null) arg.field_prefix else arg.table_name
+        s"arr[0] as $alias"
+      case Some(info) =>
+        info.value_alias.zipWithIndex
+          .map { case ((_, alias), idx) => s"arr[$idx] as $alias" }
+          .mkString(",")
+    }
+
+    // Register the UDF that places a count in the slot of the matching group value.
+    grouping.foreach { info =>
+      val expectedValues = info.value_alias.map(pair => s"${pair._1}")
+
+      def countsPerAlias(group_val: String, num: Long): Seq[Long] =
+        expectedValues.map(expected => if (expected.equals(group_val)) num else 0L)
+
+      spark.udf.register(udfName, countsPerAlias _)
+    }
+
+    val countExpr = grouping match {
+      case None => s",array(count(1)) as arr"
+      case Some(info) => s",$udfName(cast(${info.field} as STRING),count(1)) as arr"
+    }
+
+    val extraGroupKey = grouping.map(info => s",${info.field}").getOrElse("")
+
+    sql(
+      s"""
+         |SELECT  company_id
+         |        ,${projection}
+         |FROM    (
+         |        SELECT  $idColumn as company_id
+         |                $countExpr
+         |        FROM    $sourceView
+         |        GROUP BY $idColumn ${extraGroupKey}
+         |)
+         |""".stripMargin)
+      .createOrReplaceTempView(resultView)
+
+    // With grouping there is one row per (id, group value): collapse them to one row per id.
+    grouping.foreach { info =>
+      val summedColumns = info.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")
+      sql(
+        s"""
+           |SELECT  company_id
+           |        ,${summedColumns}
+           |FROM    $resultView
+           |GROUP BY company_id
+           |""".stripMargin)
+        .createOrReplaceTempView(resultView)
+    }
+    resultView
+  }
+
+
+  /**
+   * Computes the counters of every configured dimension, merges them on `company_id`,
+   * serialises all non-id columns into a JSON `summary` column and overwrites
+   * yesterday's partition of `target_tab` (`detail` is written as null).
+   *
+   * The intermediate view is registered with `createOrReplaceTempView`, so running
+   * `calc()` again in the same session does not fail on an already existing view.
+   */
+  def calc(): Unit = {
+    val summary_tab = "summary_tab_xjk"
+    val summary_tabs = args.map(get_tab_summary).seq
+    // NOTE(review): merge_table is defined elsewhere; it presumably joins the per-dimension
+    // views on company_id into `summary_tab` — confirm.
+    val merge = merge_table(spark, summary_tabs, "company_id")
+    merge.calc(summary_tab)
+    val cols = getColumns(summary_tab).diff(Seq("company_id"))
+
+    sql(
+      s"""
+         |select * from $summary_tab
+         |""".stripMargin)
+      .withColumn("summary", to_json(struct(cols.map(col): _*)))
+      .createOrReplaceTempView("xjk_tmp_summary_tab")
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |SELECT company_id,summary,null as detail
+         |FROM
+         |    xjk_tmp_summary_tab
+         |""".stripMargin)
+    merge.drop()
+  }
+
+  /**
+   * Builds the select expression for one column.
+   *
+   * `cid`, `new_cid` and `ncid` are cast to BIGINT and aliased back to their own name;
+   * any other column is returned as `pre + name`.
+   *
+   * @param name column name
+   * @param pre  qualifier put in front of the column (may be empty)
+   */
+  private def getCastCols(name: String, pre: String): String = {
+    val qualified = pre + name
+    name match {
+      case "cid" | "new_cid" | "ncid" => s"CAST($qualified as BIGINT) $name"
+      case _ => qualified
+    }
+  }
+}

+ 92 - 0
src/main/scala/com/winhc/bigdata/spark/utils/ElasticSearchIndexUtils.scala

@@ -0,0 +1,92 @@
+package com.winhc.bigdata.spark.utils
+
+import com.winhc.bigdata.spark.const.EnvConst
+import org.apache.http.HttpHost
+import org.apache.http.auth.{AuthScope, UsernamePasswordCredentials}
+import org.apache.http.entity.ContentType
+import org.apache.http.impl.client.BasicCredentialsProvider
+import org.apache.http.impl.nio.client.HttpAsyncClientBuilder
+import org.apache.http.nio.entity.NStringEntity
+import org.apache.http.util.EntityUtils
+import org.apache.spark.internal.Logging
+import org.elasticsearch.client.{RestClient, RestClientBuilder}
+
+import java.util.Collections
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/4/28 17:46
+ */
+/**
+ * One alias-to-index binding.
+ *
+ * @param alias alias name
+ * @param index name of the index the alias points to
+ */
+case class Alias(
+                  alias: String,
+                  index: String
+                )
+
+/**
+ * Small helper around the low-level Elasticsearch REST client for listing indices
+ * and listing or switching aliases on the cluster configured under `es.eci.nodes`.
+ *
+ * The client is created on first use. Call `close()` when done so its
+ * connections and I/O threads are released.
+ */
+case class ElasticSearchIndexUtils() extends Logging {
+
+  private lazy val client = getClient("es.eci.nodes")
+
+  /**
+   * Builds a REST client for the host stored under the env key `nodes`, port 9200,
+   * with basic authentication.
+   */
+  private def getClient(nodes: String): RestClient = {
+    // SECURITY(review): user name and password are hard-coded in source; they should come
+    // from configuration or a secret store. Left unchanged here to keep behaviour identical.
+    val credentialsProvider = new BasicCredentialsProvider()
+    credentialsProvider.setCredentials(AuthScope.ANY,
+      new UsernamePasswordCredentials("elastic", "elastic_168"))
+    RestClient.builder(new HttpHost(EnvConst.getEnv().getValue(nodes), 9200))
+      .setHttpClientConfigCallback(new RestClientBuilder.HttpClientConfigCallback() {
+        override def customizeHttpClient(httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder =
+          httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
+      }).build()
+  }
+
+  /** Splits a `_cat` response into trimmed, non-empty lines (an empty response yields no lines). */
+  private def nonBlankLines(response: String): Seq[String] =
+    response.split("\n").map(_.trim).filter(_.nonEmpty).toSeq
+
+  /** Names of all indices, one per `_cat/indices` line; empty when the cluster has none. */
+  def catIndices(): Seq[String] = nonBlankLines(get("_cat/indices?h=index", null))
+
+  /**
+   * All alias bindings reported by `_cat/aliases`; empty when there are none.
+   * Lines that do not contain both an alias and an index are logged and skipped.
+   */
+  def catAliases(): Seq[Alias] =
+    nonBlankLines(get("_cat/aliases?h=alias,index", null)).flatMap { line =>
+      line.split(" +") match {
+        case Array(alias, index, _*) => Some(Alias(alias, index))
+        case _ =>
+          logWarning(s"Skipping unparsable alias line: $line")
+          None
+      }
+    }
+
+
+  /**
+   * Sends one `_aliases` request containing all `add` actions followed by all `remove` actions.
+   *
+   * @param add    bindings to add (null is treated as empty)
+   * @param remove bindings to remove (null is treated as empty)
+   * @return `true` when the request succeeded, `false` when it threw (the error is logged)
+   */
+  def updateAliases(add: Seq[Alias], remove: Seq[Alias]): Boolean = {
+    val all = _aliases("add", add) ++ _aliases("remove", remove)
+    val body = all.mkString("""{"actions":[""", ",", """]}""")
+    try {
+      post("/_aliases", body)
+      true
+    } catch {
+      case e: Exception => logError(e.getMessage, e)
+        false
+    }
+  }
+
+
+  /** Performs a GET and returns the response body as text; `body` may be null. */
+  def get(url: String, body: String): String = action("GET", url, body)
+
+  /** Performs a POST and returns the response body as text; `body` may be null. */
+  def post(url: String, body: String): String = action("POST", url, body)
+
+  /** Releases the underlying REST client. Note: on an unused instance this first creates the client. */
+  def close(): Unit = client.close()
+
+  private def action(method: String, url: String, body: String): String = {
+    // A null body means "no request entity"; otherwise the body is sent as JSON.
+    val entity = if (body == null) null else new NStringEntity(body, ContentType.APPLICATION_JSON)
+    val res = client.performRequest(
+      method,
+      url,
+      Collections.emptyMap[String, String](),
+      entity
+    )
+    EntityUtils.toString(res.getEntity)
+  }
+
+  /** Renders one JSON action object per binding for the given action type ("add" or "remove"). */
+  private def _aliases(`type`: String, list: Seq[Alias]): Seq[String] =
+    if (list == null) Seq.empty
+    else list.map(s => s"""{ "${`type`}": { "index": "${s.index}", "alias": "${s.alias}" }}""")
+}
+
+object ElasticSearchIndexUtils {
+
+  /** Manual check: prints every index name, then closes the client. */
+  def main(args: Array[String]): Unit = {
+    val es = ElasticSearchIndexUtils()
+    try {
+      val l = es.catIndices()
+      println(l.mkString("\n"))
+    } finally {
+      es.close()
+    }
+  }
+}