
feat: jobs for the new pipeline

许家凯 3 years ago
parent
commit
40641be892

+ 14 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -1,5 +1,7 @@
 package com.winhc.bigdata.spark.ng.jobs
 
+import com.winhc.bigdata.spark.ng.utils.explode_args
+
 /**
  * @author: XuJiakai
  * @date: 2021/1/14 19:21
@@ -10,6 +12,8 @@ case class args_company_job(tableName: String
                             , is_super_filter: Boolean = true // whether to require every dedup/primary-key field to be non-null
                             , where: String = "" // filter condition on ODS-layer data
                             , id_user_defined_rowkey: Boolean = false // whether to read the user-defined rowkey from the ODS layer
+                            , explode_args: Seq[explode_args] = null // configure this if the table needs to be exploded
+                            , export_2_es_fields: Seq[String] = null
                            )
 
 object args_company_job {
@@ -19,6 +23,16 @@ object args_company_job {
     , args_company_job("company_holder", Seq("holder_name"), rowkey_udf = "concat_ws('_',company_id,md5(cleanup(concat_ws('',holder_switch_rowkey(holder_type,holder_id,holder_name)))))", where = " holder_name is not null and trim(holder_name) <> '' AND ( not (holder_type = 2 AND length(holder_id) <> 32)) ")
     , args_company_job("company_icp", Seq("liscense", "domain"))
     , args_company_job("company_tm", Seq("reg_no"))
+
+    , args_company_job("company_court_open_announcement", Seq("case_no", "start_date")
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no_trim(case_no),split_date(cast(start_date as String)) )))"
+      , is_super_filter = false
+      , where = "case_no_trim(case_no) is not null"
+      , explode_args = Seq(
+        explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
+        , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
+      )
+    )
   )
 
   def get_args_company_job(tn: String): args_company_job = {
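
For orientation, a minimal sketch (hypothetical snippet, not part of this commit) of how the new explode_args field is meant to be consumed: get_args_company_job resolves the per-table config, and a non-null explode_args is what drives the LATERAL VIEW explode added in general_handler and explode_tab below.

    // hypothetical usage, assuming the explode_args case class introduced in explode_tab.scala below
    val job = args_company_job.get_args_company_job("company_court_open_announcement")
    Option(job.explode_args).getOrElse(Seq.empty).foreach { a =>
      // e.g. "plaintiff_info -> plaintiff_info_id_explode via $.litigant_id"
      println(s"${a.org_field} -> ${a.alias} via ${a.json_path}")
    }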

+ 71 - 22
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -1,9 +1,10 @@
 package com.winhc.bigdata.spark.ng.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.ng.utils.explode_tab
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{DateUtils, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
@@ -46,6 +47,7 @@ case class general_handler(s: SparkSession,
 
   private def reg_udf(): Unit = {
     cleanup()
+    case_no_trim_udf()
 
     def replace_rowkey(old_rowkey: String, new_rowkey: String): String = {
       if (StringUtils.isEmpty(old_rowkey)) {
@@ -65,6 +67,9 @@ case class general_handler(s: SparkSession,
 
     spark.udf.register("holder_switch_rowkey", company_holder_rowkey _)
 
+    spark.udf.register("split_date", DateUtils.splitDate _)
+
+
   }
 
   private def get_rowkey_udf(): String = {
@@ -72,38 +77,46 @@ case class general_handler(s: SparkSession,
       return job_args.rowkey_udf
     }
 
-    md5_fields.isEmpty match {
-      case true => s"company_id"
-      case false => s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    if (md5_fields.isEmpty) {
+      s"company_id"
+    } else {
+      s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
     }
   }
 
   private def get_clean_up(): String = {
     s"""
-       |company_id <> '0'
-       |AND company_id is not null
-       |AND trim(company_id) <> ''
        |${
-      StringUtils.isEmpty(where) match {
-        case true => ""
-        case false => s"AND ($where)"
-      }
+      if (inter_cols.contains("company_id"))
+        s"""
+           |AND company_id <> '0'
+           |AND company_id is not null
+           |AND trim(company_id) <> ''
+           |""".stripMargin
+      else
+        s""
     }
        |${
-      is_super_filter match {
-        // every dedup field must be non-null
-        case true => s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
-        case false => ""
-      }
+      if (StringUtils.isEmpty(where))
+        ""
+      else
+        s"AND ($where)"
+    }
+       |${
+      if (is_super_filter)
+        s"AND ${md5_fields.map(" " + _ + " is not null ").mkString("AND")}"
+      else
+        ""
     }
        |AND trim(concat_ws('',${md5_fields.mkString(",")})) <> ''
        |""".stripMargin
   }
 
   private def get_partition_order_by(): String = {
-    inter_cols.contains("update_time") match {
-      case true => " ds DESC,update_time DESC "
-      case false => " ds DESC "
+    if (inter_cols.contains("update_time")) {
+      " ds DESC,update_time DESC "
+    } else {
+      " ds DESC "
     }
   }
 
@@ -146,17 +159,18 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $ods_tab
          |                        WHERE   ds > 0
-         |                        AND     $clean_up
+         |                        $clean_up
          |                        UNION ALL
          |                        SELECT  $rowkey_f as rowkey
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > 0
-         |                        AND     $clean_up
+         |                        $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
+    explode_calc(ads_tab, inc_ods_ds)
   }
 
   def inc(): Unit = {
@@ -211,13 +225,14 @@ case class general_handler(s: SparkSession,
          |                                ,${inter_cols.mkString(",")}
          |                        FROM    $inc_ods_tab
          |                        WHERE   ds > $org_ds
-         |                        AND     $clean_up
+         |                        $clean_up
          |                    ) AS t1
          |        ) AS t2
          |WHERE   t2.num = 1
          |""".stripMargin)
 
     addEmptyPartitionOrSkip(inc_ads_tab, target_ds)
+    explode_calc(inc_ads_tab, target_ds)
   }
 
 
@@ -228,6 +243,40 @@ case class general_handler(s: SparkSession,
     else
       inc()
   }
+
+
+  private def explode_calc(org_tab: String, ds: String): Unit = {
+    if (job_args.explode_args == null) {
+      return
+    }
+
+    val all_date_tmp_view = s"insert_${tn}"
+    val all_date_explode_tmp_view = s"${all_date_tmp_view}_explode"
+
+    val explode_tab_name = s"${org_tab}_explode"
+
+    sql(
+      s"""
+         |select * from $org_tab where ds = '$ds'
+         |""".stripMargin)
+      .createTempView(all_date_tmp_view)
+
+    explode_tab(spark, all_date_tmp_view, job_args.explode_args)
+      .calc(all_date_explode_tmp_view)
+
+    if (!spark.catalog.tableExists(explode_tab_name)) {
+      // table does not exist (no handling here yet)
+
+    }
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $explode_tab_name PARTITION(ds='$ds')
+         |SELECT ${getColumns(explode_tab_name).diff(Seq("ds")).mkString(",")}
+         |FROM
+         |    $all_date_explode_tmp_view
+         |""".stripMargin)
+  }
 }
 
 object general_handler {
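
To see why the calling queries now interpolate $clean_up directly after WHERE ds > 0 (the leading AND was dropped in this commit): every chunk of the generated filter now carries its own AND or collapses to an empty string. For a job like company_icp (md5 fields liscense and domain, empty where, is_super_filter = true, assuming the ODS table exposes company_id), get_clean_up produces roughly the following, blank lines omitted:

    AND company_id <> '0'
    AND company_id is not null
    AND trim(company_id) <> ''
    AND  liscense is not null AND domain is not null
    AND trim(concat_ws('',liscense,domain)) <> ''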

+ 304 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -0,0 +1,304 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.{col, struct, to_json}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/10/10 10:23
+ * @Description: enhanced summary builder for the winhc_ng project space
+ */
+case class GroupByInfoNg(field: String, value_alias: Seq[(String, String)])
+
+case class company_summary_args(table_name: String
+                                , companyIdField: String
+                                , field_prefix: String = null // alias to use when there is no groupByInfo; defaults to table_name
+                                , distinctField: String = "rowkey"
+                                , groupByInfo: GroupByInfoNg = null
+                                , where: String = ""
+                                , sortField: String = "ds"
+                               ){
+  def winhc_hash(): String ={
+    BaseUtil.cleanup(s"${table_name}_${companyIdField}").replaceAll("[\\(\\)() ]","")
+  }
+}
+
+case class CompanySummaryNg_new(s: SparkSession,
+                                project: String // name of the project the tables live in
+                                , args: Seq[company_summary_args]
+                               ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+  private val target_tab = "winhc_ng.out_es_summary_test"
+  init()
+
+  private def init() {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS $target_tab
+         |(
+         |    company_id  STRING COMMENT 'company id'
+         |    ,summary STRING COMMENT 'summary info shown in the grid, JSON format'
+         |    ,detail STRING COMMENT 'detailed summary info for individual dimensions'
+         |)
+         |COMMENT 'out es summary,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+         |PARTITIONED BY (ds STRING COMMENT 'partition')
+         |LIFECYCLE 15
+         |""".stripMargin)
+  }
+
+
+  private def get_table_data(arg: company_summary_args): String = {
+    val ds = getLastPartitionsOrElse(target_tab, null)
+
+    val tab = arg.table_name
+    val companyIdField = arg.companyIdField
+    val distinctField = arg.distinctField
+
+    val sortField = arg.sortField
+    val where = if (StringUtils.isEmpty(arg.where)) {
+      s""
+    } else {
+      s"AND   ${arg.where}"
+    }
+
+
+    val ads_table = s"${project}.ads_$tab" // base (full) ads table
+    val inc_ads_table = s"${project}.inc_ads_$tab"
+
+    val new_cols = getColumns(ads_table).intersect(getColumns(inc_ads_table))
+
+    val ads_last_ds = getLastPartitionsOrElse(ads_table, "0")
+    val tab_tmp_view = s"${arg.winhc_hash()}_data"
+
+    if (ds == null) {
+      sql(
+        s"""
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $ads_table
+           |WHERE   ds = '$ads_last_ds'
+           |UNION ALL
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $inc_ads_table
+           |WHERE   ds > $ads_last_ds
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    } else {
+      sql(
+        s"""
+           |SELECT  ${new_cols.map(getCastCols(_, "org_tab.")).mkString(",")}
+           |FROM    (
+           |            SELECT  DISTINCT $companyIdField as xjk_cid
+           |            FROM    $inc_ads_table
+           |            WHERE   ds = $ds
+           |        ) id_table
+           |JOIN (
+           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $inc_ads_table
+           |              WHERE   ds > '$ads_last_ds'
+           |              AND     ds < '$ds'
+           |              UNION ALL
+           |              SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |                      ,$companyIdField as xjk_cid
+           |              FROM    $ads_table
+           |              WHERE   ds = '$ads_last_ds'
+           |          ) org_tab
+           |ON      id_table.xjk_cid = org_tab.xjk_cid
+           |UNION ALL
+           |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+           |FROM    $inc_ads_table
+           |WHERE   ds = $ds
+           |""".stripMargin)
+        .createOrReplaceTempView(tab_tmp_view)
+    }
+
+    val distinct_tab = s"${tab_tmp_view}_distinct"
+    sql(
+      s"""
+         |SELECT  ${new_cols.map(getCastCols(_, "")).mkString(",")}
+         |FROM    (
+         |            SELECT  tmp.*
+         |                    ,ROW_NUMBER() OVER(PARTITION BY $distinctField ORDER BY $sortField DESC ) c
+         |            FROM    $tab_tmp_view AS tmp
+         |        ) tmp2
+         |WHERE   tmp2.c = 1
+         |$where
+         |""".stripMargin)
+      .createOrReplaceTempView(distinct_tab)
+    distinct_tab
+  }
+
+  private def get_tab_summary(arg: company_summary_args): String = {
+    val tab_hash = arg.winhc_hash()
+
+    val tab = arg.table_name
+    val companyIdField = arg.companyIdField
+
+    val result_tab = s"${tab_hash}_summary_tab"
+
+    val all_tab = get_table_data(arg)
+
+    val func_name = s"xjk_func_${tab_hash}"
+
+    val view = if (arg.groupByInfo == null) {
+      if (arg.field_prefix != null) {
+        s"arr[0] as ${arg.field_prefix}"
+      } else {
+        s"arr[0] as $tab"
+      }
+
+    } else {
+      arg.groupByInfo.value_alias.indices.map(i => {
+        s"arr[$i] as ${arg.groupByInfo.value_alias(i)._2}"
+      }).mkString(",")
+    }
+
+    // register the group-count UDF
+    if (arg.groupByInfo != null) {
+      val fieldSeq = arg.groupByInfo.value_alias.map(r => {
+        (s"${r._1}", r._2)
+      })
+
+      def getResArr(group_val: String, num: Long): Seq[Long] = {
+        val res = scala.collection.mutable.ArrayBuffer[Long]()
+        for (i <- fieldSeq) {
+          if (i._1.equals(group_val)) {
+            res += num
+          } else {
+            res += 0
+          }
+        }
+        res
+      }
+
+      spark.udf.register(func_name, getResArr _)
+    }
+
+    val groupKey_show = if (arg.groupByInfo == null) {
+      s",array(count(1)) as arr"
+    } else {
+      s",$func_name(cast(${arg.groupByInfo.field} as STRING),count(1)) as arr"
+    }
+
+    val groupKey = if (arg.groupByInfo == null) {
+      s""
+    } else {
+      s",${arg.groupByInfo.field}"
+    }
+
+    sql(
+      s"""
+         |SELECT  company_id
+         |        ,${view}
+         |FROM    (
+         |        SELECT  $companyIdField as company_id
+         |                $groupKey_show
+         |        FROM    $all_tab
+         |        GROUP BY $companyIdField ${groupKey}
+         |)
+         |""".stripMargin)
+      .createOrReplaceTempView(result_tab)
+
+    if (arg.groupByInfo != null) {
+      sql(
+        s"""
+           |SELECT  company_id
+           |        ,${arg.groupByInfo.value_alias.map(_._2).map(f => s"sum($f) as $f").mkString(",")}
+           |FROM    $result_tab
+           |GROUP BY company_id
+           |""".stripMargin)
+        .createOrReplaceTempView(result_tab)
+    }
+    result_tab
+  }
+
+
+  def calc(): Unit = {
+    val summary_tab = "summary_tab_xjk"
+    val summary_tabs = args.map(get_tab_summary).seq
+    merge_table(spark, summary_tabs, "company_id").calc(summary_tab)
+
+    val cols = getColumns(summary_tab).diff(Seq("company_id"))
+
+    sql(
+      s"""
+         |select * from $summary_tab
+         |""".stripMargin)
+      .withColumn("summary", to_json(struct(cols.map(col): _*))).createTempView("xjk_tmp_summary_tab")
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='${BaseUtil.getYesterday()}')
+         |SELECT company_id,summary,null as detail
+         |FROM
+         |    xjk_tmp_summary_tab
+         |""".stripMargin)
+  }
+
+  private def getCastCols(name: String, pre: String): String = {
+    val list = List("cid", "new_cid", "ncid")
+    if (list.contains(name)) {
+      return s"CAST(${pre}${name} as BIGINT) $name"
+    }
+    pre + name
+  }
+}
+
+object CompanySummaryNg_new {
+
+  private def get_default_summary_args(tableName: String): company_summary_args = {
+    company_summary_args(
+      table_name = tableName
+      , companyIdField = "split(rowkey,'_')[0]"
+      , where = "instr(rowkey,'_') != 0"
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"${tableName}_del_0")
+        , ("1", s"${tableName}_del_1")
+      ))
+    )
+  }
+
+  private val start_args = Seq(
+    get_default_summary_args("company_holder")
+    , get_default_summary_args("company_staff")
+    , get_default_summary_args("company_icp")
+    , get_default_summary_args("company_tm")
+    , get_default_summary_args("company_app_info")
+
+    , company_summary_args(table_name = "company_court_open_announcement_explode"
+      , companyIdField = "plaintiff_info_id_explode"
+      , where = "plaintiff_info_id_explode is not null and length(plaintiff_info_id_explode) = 32 and deleted <> 9"
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_court_open_announcement_deleted_0_plaintiff")
+        , ("1", s"company_court_open_announcement_deleted_1_plaintiff")
+      ))
+    )
+    , company_summary_args(table_name = "company_court_open_announcement_explode"
+      , companyIdField = "defendant_info_id_explode"
+      , where = "defendant_info_id_explode is not null and length(defendant_info_id_explode) = 32 and deleted <> 9"
+      , groupByInfo = GroupByInfoNg(field = "deleted", value_alias = Seq(
+        ("0", s"company_court_open_announcement_deleted_0_defendant")
+        , ("1", s"company_court_open_announcement_deleted_1_defendant")
+      ))
+    )
+
+  )
+
+  def main(args: Array[String]): Unit = {
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
+    )
+
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    CompanySummaryNg_new(s = spark, project = "winhc_ng", args = start_args).calc()
+    spark.stop()
+  }
+}
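
For orientation, a hypothetical row of winhc_ng.out_es_summary_test (counts invented; the JSON keys come from the value_alias entries configured in start_args). Each per-dimension count becomes a field of the summary JSON built by to_json(struct(...)):

    company_id : 0123456789abcdef0123456789abcdef    (hypothetical)
    summary    : {"company_holder_del_0":12,"company_holder_del_1":3,
                  "company_court_open_announcement_deleted_0_plaintiff":1,
                  "company_court_open_announcement_deleted_1_defendant":0, ...}
    detail     : null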

+ 104 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/explode_tab.scala

@@ -0,0 +1,104 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
+import com.google.gson.{JsonElement, JsonParser}
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/2/24 10:18
+ */
+case class explode_args(org_field: String, json_path: String, alias: String, is_filter_null: Boolean = false)
+
+case class explode_tab(s: SparkSession,
+                       org_table: String, // source table to be exploded
+                       args: Seq[explode_args]
+                      ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  init()
+
+  private def is_json_str(str: String): Boolean = {
+    var jsonElement: JsonElement = null
+    try jsonElement = new JsonParser().parse(str)
+    catch {
+      case e: Exception =>
+        return false
+    }
+    if (jsonElement == null) return false
+    true
+  }
+
+  private def init(): Unit = {
+    /**
+     *
+     * @param json_array
+     * @param json_path "$.name"
+     * @return
+     */
+    def json_2_array(json_array: String, json_path: String): Seq[String] = {
+      try {
+        if (StringUtils.isEmpty(json_array)) {
+          return Seq.empty
+        }
+        if (!is_json_str(json_array)) {
+          return Seq.empty
+        }
+        JSONPath.eval(JSON.parse(json_array), json_path).asInstanceOf[JSONArray].toArray[String](Array()).toSeq.distinct.diff(Seq(""))
+      } catch {
+        case e: Exception => {
+          println(json_array)
+          Seq.empty
+        }
+      }
+    }
+
+    spark.udf.register("json_2_array", json_2_array _)
+  }
+
+
+  def calc(tmpView: String): Unit = {
+    val explode_args = args.map(f => {
+      if (f.is_filter_null) {
+        s"LATERAL VIEW explode(json_2_array(${f.org_field},'${f.json_path}')) t_${f.org_field} AS ${f.alias}"
+      } else {
+        s"LATERAL VIEW OUTER explode(json_2_array(${f.org_field},'${f.json_path}')) t_${f.org_field} AS ${f.alias}"
+      }
+    }).mkString("\n")
+
+    sql(
+      s"""
+         | SELECT  *
+         | FROM    $org_table
+         | $explode_args
+         |""".stripMargin)
+      .createTempView(tmpView)
+  }
+}
+
+object explode_tab {
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getName, config)
+
+    val ags = Seq(
+      explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
+      , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
+    )
+
+    explode_tab(spark, "winhc_ng.ads_company_court_open_announcement", ags).calc("abc")
+    spark.sql("select * from abc").show()
+    spark.stop()
+  }
+}
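
A small worked example (input value hypothetical) of the exploding performed above. For a cell such as

    plaintiff_info = [{"litigant_id":"a1","name":"x"},{"litigant_id":"a2"}]

json_2_array(plaintiff_info, '$.litigant_id') should yield ["a1","a2"], and the generated LATERAL VIEW OUTER explode(...) AS plaintiff_info_id_explode then emits one output row per id; with is_filter_null = true the non-OUTER variant is used instead, so rows whose array is empty or unparseable are dropped.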

+ 143 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_2_es.scala

@@ -0,0 +1,143 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/2/24 17:32
+ */
+
+case class export_2_es_args(tn: String, out_fields: Seq[String])
+
+case class export_2_es(s: SparkSession,
+                       export_args: export_2_es_args
+                      ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  private val target_tab = s"winhc_ng.out_index_es_${export_args.tn}"
+
+
+  private val cols = (export_args.out_fields :+ "deleted").distinct
+
+  init()
+
+  private def init(): Unit = {
+    if (spark.catalog.tableExists(target_tab)) {
+      val source_tab_cols = getColumns(target_tab).diff(Seq("ds"))
+      if (source_tab_cols.diff(cols).nonEmpty || cols.diff(source_tab_cols).nonEmpty) {
+        sql(
+          s"""
+             |DROP TABLE IF EXISTS $target_tab
+             |""".stripMargin)
+      }
+    }
+
+    if (!spark.catalog.tableExists(target_tab)) {
+      sql(
+        s"""
+           |CREATE TABLE IF NOT EXISTS $target_tab
+           |(
+           |    ${cols.map(f => s"${f} STRING COMMENT '${f}'").mkString(",")}
+           |)
+           |COMMENT 'out index es company,create by ${BaseUtil.nowDate(pattern = "yyyy-MM-dd HH:mm:ss")}'
+           |PARTITIONED BY (ds STRING COMMENT 'partition')
+           |LIFECYCLE 15
+           |""".stripMargin)
+    }
+  }
+
+  private def get_all_tab(): Unit = {
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    winhc_ng.ads_${export_args.tn}
+         |                        WHERE   ds > 0
+         |                        UNION ALL
+         |                        SELECT  *
+         |                        FROM    winhc_ng.inc_ads_${export_args.tn}
+         |                        WHERE   ds > 0
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin)
+      .createTempView("export_all_tab")
+  }
+
+  private def get_inc_tab(start_ds: String): Unit = {
+    sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    winhc_ng.inc_ads_${export_args.tn}
+         |                        WHERE   ds > $start_ds
+         |                    )
+         |        )
+         |WHERE   num = 1
+         |""".stripMargin).createTempView("export_all_tab")
+  }
+
+
+  def calc(): Unit = {
+    val last_ds = getLastPartitionsOrElse(target_tab, null)
+
+    if (last_ds == null) {
+      // all
+      get_all_tab()
+    } else {
+      // inc
+      if (last_ds.equals(BaseUtil.getYesterday())) {
+        sql(
+          s"""
+             |ALTER TABLE $target_tab DROP IF EXISTS PARTITION(ds='${last_ds}')
+             |""".stripMargin)
+        calc()
+        return
+      }
+
+      get_inc_tab(last_ds)
+    }
+
+    val target_ds = BaseUtil.getYesterday()
+    val target_cols = getColumns(target_tab).diff(Seq("ds"))
+
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$target_ds')
+         |SELECT ${target_cols.mkString(",")}
+         |FROM
+         |    export_all_tab
+         |""".stripMargin)
+  }
+}
+
+object export_2_es {
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getName, config)
+
+    export_2_es(spark, export_args = export_2_es_args("company_court_open_announcement", "rowkey,defendant_info,plaintiff_info,start_date,case_no,case_reason".split(",")))
+      .calc()
+
+
+    spark.stop()
+  }
+}
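
For the arguments used in main above, init() would issue roughly the following DDL (derived from the template, wrapped here for readability; the table comment carries the actual run timestamp):

    CREATE TABLE IF NOT EXISTS winhc_ng.out_index_es_company_court_open_announcement
    (
        rowkey STRING COMMENT 'rowkey',defendant_info STRING COMMENT 'defendant_info',
        plaintiff_info STRING COMMENT 'plaintiff_info',start_date STRING COMMENT 'start_date',
        case_no STRING COMMENT 'case_no',case_reason STRING COMMENT 'case_reason',
        deleted STRING COMMENT 'deleted'
    )
    COMMENT 'out index es company,create by <timestamp>'
    PARTITIONED BY (ds STRING COMMENT 'partition')
    LIFECYCLE 15

On the first run (no target partition yet) calc() merges the full ads_ and inc_ads_ history; afterwards it only dedupes inc_ads_ partitions newer than the last export, and if the latest export partition is already yesterday's, that partition is dropped and rebuilt.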

+ 75 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/merge_table.scala

@@ -0,0 +1,75 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/2/23 16:20
+ */
+case class merge_table(s: SparkSession,
+                       tables: Seq[String], // names of the tables to merge
+                       join_key: String
+                      ) extends LoggingUtils with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+
+  init()
+
+  private def init(): Unit = {
+    if (tables.length <= 1) {
+      throw new RuntimeException(s"table.length = ${tables.length},Unable to merge the table !")
+    }
+
+    val tab_cols = tables.map(getColumns)
+    var int_cols = tab_cols.head
+
+    for (i <- 0 until tab_cols.length - 1) {
+      val table_2 = tab_cols(i + 1)
+      int_cols = int_cols.intersect(table_2)
+    }
+
+    if (int_cols.size != 1 || !int_cols.head.equals(join_key)) {
+      throw new RuntimeException("Unable to merge the table !")
+    }
+  }
+
+  def calc(tempView: String): Unit = {
+    var df = sql(
+      s"""
+         |select * from ${tables.head}
+         |""".stripMargin)
+
+    for (i <- 0 until tables.length - 1) {
+      val df_2 = sql(
+        s"""
+           |select * from ${tables(i + 1)}
+           |""".stripMargin)
+      df = df.join(df_2, Seq(join_key), "full")
+    }
+    df.createTempView(tempView)
+  }
+}
+
+object merge_table {
+
+  def main(args: Array[String]): Unit = {
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_ng",
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("inc_compaaaaaany_equity_info", config)
+
+    spark.sql("select 2 as id,'v' as v").createTempView("a")
+    spark.sql("select 1 as id,'v2' as v2").createTempView("b")
+    spark.sql("select 1 as id,'v3' as v3").createTempView("c")
+    merge_table(spark, Seq("a", "b", "c"), "id").calc("abc")
+    spark.sql("select * from abc").show()
+    spark.stop()
+  }
+}
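
The demo in main should print roughly the following (row order may differ), showing that the temp views are combined by successive full outer joins on the join key:

    +---+----+----+----+
    | id|   v|  v2|  v3|
    +---+----+----+----+
    |  1|null|  v2|  v3|
    |  2|   v|null|null|
    +---+----+----+----+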

+ 16 - 3
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -10,6 +10,19 @@ import org.apache.commons.lang3.StringUtils
  * @Description:
  */
 object DateUtils {
+
+  def splitDate(date: String): String = {
+    try {
+      if (StringUtils.isEmpty(date)) {
+        null
+      } else {
+        date.split(" ")(0)
+      }
+    } catch {
+      case ex: Exception => null
+    }
+  }
+
   private def addZero(str: String): String = {
     if (str.length == 2) {
       str
@@ -44,7 +57,7 @@ object DateUtils {
       p = "yyyy-MM-dd"
     }
     val fm = new SimpleDateFormat(p)
-    fm.parse(date).getTime +  ""
+    fm.parse(date).getTime + ""
   }
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
@@ -163,8 +176,8 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-        println(DateUtils.toMillisTimestamp(date = "2001-06-05 00:00:00"))
-//    println(getNotNullStr(null, null))
+    println(DateUtils.toMillisTimestamp(date = "2001-06-05 00:00:00"))
+    //    println(getNotNullStr(null, null))
     //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }