
Merge remote-tracking branch 'origin/master'

许家凯 3 years ago
commit 08259c29ed

+ 27 - 1
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -132,10 +132,36 @@ object args_company_job {
       , rowkey_udf = "md5(cleanup(concat_ws('',tax_balance,tax_category,company_id )))"
       , is_super_filter = false
     )
-    , args_company_job("company_check_info", Seq("check_type", "check_result", "check_org","check_date","company_id")
+    , args_company_job("company_check_info", Seq("check_type", "check_result", "check_org", "check_date", "company_id")
       , rowkey_udf = "md5(cleanup(concat_ws('',check_type,check_result,check_org,split_date(cast(check_date as String)),company_id )))"
       , is_super_filter = false
     )
+    , args_company_job("company_mortgage_info", Seq("reg_num")
+      , rowkey_udf = "md5(cleanup(concat_ws('',reg_num )))"
+      , is_super_filter = false
+      , id_user_defined_rowkey = true
+    )
+    , args_company_job("company_mortgage_pawn", Seq("main_id", "pawn_name", "detail")
+      , rowkey_udf = "md5(cleanup(concat_ws('',main_id,pawn_name,detail )))"
+      , is_super_filter = false
+    )
+    , args_company_job("company_mortgage_change", Seq("main_id", "change_date", "change_content")
+      , rowkey_udf = "md5(cleanup(concat_ws('',main_id,split_date(cast(change_date as String)),change_content )))"
+      , is_super_filter = false
+    )
+    , args_company_job("company_mortgage_people", Seq("main_id")
+      , rowkey_udf = "md5(cleanup(concat_ws('',main_id)))"
+      , is_super_filter = false
+    )
+    , args_company_job("company_double_random_check_info", Seq("check_task_num", "check_plan_num")
+      , rowkey_udf = "md5(cleanup(concat_ws('',check_task_num,check_plan_num)))"
+      , is_super_filter = false
+      , id_user_defined_rowkey = true
+    )
+    , args_company_job("company_double_random_check_result_info", Seq("main_id", "check_item", "check_result")
+      , rowkey_udf = "md5(cleanup(concat_ws('',main_id,check_item,check_result)))"
+      , is_super_filter = false
+    )
   )
 
   def get_args_company_job(tn: String): args_company_job = {
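
The entries added above are pure dedup metadata: the column Seq names the fields used for deduplication, and rowkey_udf is a Spark SQL expression that hashes the cleaned, concatenated dedup columns into the row key. As a minimal sketch (not the project's actual writer), such an expression can be applied with expr(), assuming the project UDFs cleanup and split_date are already registered on the session (e.g. by the CompanyMapping/BaseFunc traits used elsewhere in this commit):

    import org.apache.spark.sql.DataFrame
    import org.apache.spark.sql.functions.expr

    // Derive the dedup rowkey for company_mortgage_pawn rows from its declared rowkey_udf.
    // Assumes `cleanup` (and `split_date` for date-bearing tables) are registered UDFs.
    def with_rowkey(df: DataFrame): DataFrame = {
      val job = args_company_job.get_args_company_job("company_mortgage_pawn")
      df.withColumn("rowkey", expr(job.rowkey_udf))
    }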

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/ng/relation/inc_human_relation_back.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.ng.relation
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: write-back handling for deleted data
+ * @author π
+ * @date 2021/3/15 10:59
+ */
+
+case class inc_human_relation_back(s: SparkSession,
+                                   project: String, // project the tables belong to
+                                   ds: String // partition
+                                  ) extends LoggingUtils with BaseFunc with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  val inc_ods_company_human_relation_merge = "winhc_ng.inc_ods_company_human_relation_merge"
+  val inc_ads_company_human_relation_back_deleted = "winhc_ng.inc_ads_company_human_relation_back_deleted"
+
+  def calc() = {
+
+    sql(
+      s"""
+        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} table $inc_ads_company_human_relation_back_deleted PARTITION (ds='$ds')
+        |SELECT  person_name
+        |        ,person_id
+        |        ,create_time
+        |        ,update_time
+        |        ,deleted
+        |FROM    $inc_ods_company_human_relation_merge
+        |WHERE   ds = '$ds'
+        |AND     length(company_id) = 0   AND   length(person_id) = 33
+        |""".stripMargin)
+  }
+
+}
+
+object inc_human_relation_back {
+  def main(args: Array[String]): Unit = {
+    if (args.size != 2) {
+      println("please set project and ds.")
+      sys.exit(-1)
+    }
+    val Array(project, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    val re = inc_human_relation_back(s = spark, project = project, ds = ds)
+    re.calc()
+    spark.stop()
+  }
+}
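
For context, calc() copies the rows of the merge table that mark deleted back-flow records (empty company_id and a 33-character person_id) into the corresponding partition of the deleted table via INSERT OVERWRITE (INSERT INTO when running locally on Windows). A minimal invocation sketch, with a hypothetical partition value:

    // Hypothetical run: project name matches the hard-coded table prefix, ds chosen for illustration.
    inc_human_relation_back.main(Array("winhc_ng", "20210315"))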

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -183,6 +183,10 @@ object export_company_index_2_es {
       , "rowkey,company_id,company_name,legal_name,tax_category,own_tax_amount,publish_date,tax_balance,new_tax_balance,deleted".split(","))
     , export_2_es_args("company_check_info"
       , "rowkey,company_id,company_name,check_org,check_type,check_date,check_result,deleted".split(","))
+    , export_2_es_args("company_mortgage_info"
+      , "rowkey,company_id,company_name,reg_num,reg_date,publish_date,reg_department,type,deleted".split(","))
+    , export_2_es_args("company_double_random_check_info"
+      , "rowkey,company_id,company_name,check_plan_num,check_plan_name,check_task_num,check_task_name,check_type,check_department,check_date,deleted".split(","))
   )
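
The two new export_2_es_args entries only declare which columns go to the index. A minimal sketch of projecting one of those column sets before the ES write; the ADS table name and the ds filter are assumptions, not necessarily the project's actual export path:

    import org.apache.spark.sql.{DataFrame, SparkSession}

    // Select the declared company_mortgage_info columns for export (table name assumed).
    def mortgage_info_for_es(spark: SparkSession, ds: String): DataFrame = {
      val cols = "rowkey,company_id,company_name,reg_num,reg_date,publish_date,reg_department,type,deleted"
      spark.sql(s"SELECT $cols FROM winhc_ng.ads_company_mortgage_info WHERE ds = '$ds'")
    }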