浏览代码

fix: 调整炸开表逻辑

许家凯 4 年之前
父节点
当前提交
1b87aab40a

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -362,7 +362,7 @@ case class general_handler(s: SparkSession,
 
     val explode_tab_name = args.inc match {
       case true => inc_ads_explode
-      case false => inc_ads_explode
+      case false => ads_explode
     }
 
     explode_tab(spark, all_date_tmp_view, job_args.explode_args)

+ 43 - 4
src/main/scala/com/winhc/bigdata/spark/ng/utils/ReimportSummary.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.ng.utils
 
 import com.winhc.bigdata.spark.ng.jobs.{args_company_job, general_handler}
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, ElasticSearchIndexUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -16,14 +16,16 @@ case class ReimportSummary(s: SparkSession
                           ) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
 
+  private lazy val es = ElasticSearchIndexUtils()
+
 
   private def re_explode(tn: String): Unit = {
     val ads_explode = s"winhc_ng.ads_${tn}_explode"
     val inc_ads_explode = s"winhc_ng.inc_ads_${tn}_explode"
 
+    dropAllPartitions(ads_explode)
+    dropAllPartitions(inc_ads_explode)
 
-    sql(s"ALTER TABLE $ads_explode DROP IF EXISTS PARTITION(ds>'0')")
-    sql(s"ALTER TABLE $inc_ads_explode DROP IF EXISTS PARTITION(ds>'0')")
 
     general_handler(s = spark
       , project = "winhc_ng"
@@ -35,7 +37,44 @@ case class ReimportSummary(s: SparkSession
     for (e <- tabs) {
       re_explode(e)
     }
+
+    //    sql(s"ALTER TABLE winhc_ng.out_es_summary DROP IF EXISTS PARTITION(ds>'0')")
+    //    sql(s"ALTER TABLE winhc_ng.out_es_summary_person DROP IF EXISTS PARTITION(ds>'0')")
+
+    create_index()
   }
+
+
+  def create_index(): Unit = {
+    val template =
+      s"""
+         |{
+         |  "mappings": {
+         |    "_doc": {
+         |      "properties": {
+         |      }
+         |    }
+         |  },
+         |  "settings": {
+         |    "number_of_shards": 3,
+         |    "number_of_replicas": 0,
+         |    "index": {
+         |      "mapping": {
+         |        "nested_fields": {
+         |          "limit": "300"
+         |        }
+         |      }
+         |    }
+         |  }
+         |}
+         |""".stripMargin
+
+    val yesterday = BaseUtil.getYesterday()
+    es.put(s"out_es_summary_person_v$yesterday", template)
+    es.put(s"out_es_summary_v$yesterday", template)
+  }
+
+
 }
 
 object ReimportSummary {
@@ -47,7 +86,7 @@ object ReimportSummary {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":reimport", config)
-    val tns = Seq("")
+    val tns = Seq("company_court_open_announcement")
     ReimportSummary(spark, tns).calc()
     spark.stop()
   }

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/ng/utils/StartAndEndDsUtils.scala

@@ -89,7 +89,7 @@ object StartAndEndDsUtils {
       "spark.debug.maxToStringFields" -> "200",
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
-    val tn = "company_send_announcement"
+    val tn = "company_court_open_announcement"
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":" + tn, config)
 
 
@@ -99,7 +99,8 @@ object StartAndEndDsUtils {
     val inc_target_tab: String = s"winhc_ng.inc_ads_${tn}_explode"
 
 
-    val a = StartAndEndDsUtils(spark).get_start_and_end_args(org_tab, inc_org_tab, target_tab, inc_target_tab)
+    val a = StartAndEndDsUtils(spark)
+      .get_start_and_end_args(org_tab, inc_org_tab, target_tab, inc_target_tab)
     println(a)
 
 

+ 15 - 0
src/main/scala/com/winhc/bigdata/spark/utils/ElasticSearchIndexUtils.scala

@@ -66,11 +66,26 @@ case class ElasticSearchIndexUtils() extends Logging {
 
   def post(url: String, body: String): String = action("POST", url, body)
 
+  def put(url: String, body: String): String = action("PUT", url, body)
+
   private def action(method: String, url: String, body: String): String = {
     val entity = body == null match {
       case true => null
       case false => new NStringEntity(body, ContentType.APPLICATION_JSON)
     }
+    logInfo(
+      s"""
+         |
+         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+         |ElasticSearch action:
+         |
+         |$method  $url
+         |$body
+         |
+         |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
+         |
+         |""".stripMargin)
+
     val res = client.performRequest(
       method,
       url,