
feat: add summary recomputation and index switching

许家凯 4 years ago
parent commit 033dcc2313

+ 17 - 12
src/main/scala/com/winhc/bigdata/spark/ng/utils/ReimportSummary.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.ng.utils
 
 import com.winhc.bigdata.spark.ng.jobs.{args_company_job, general_handler}
-import com.winhc.bigdata.spark.utils.{BaseUtil, ElasticSearchIndexUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{ElasticSearchIndexUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -33,19 +33,19 @@ case class ReimportSummary(s: SparkSession
       .explode_calc()
   }
 
-  def calc(): Unit = {
+  def calc(ds: String): Unit = {
     for (e <- tabs) {
       re_explode(e)
     }
 
-    //    sql(s"ALTER TABLE winhc_ng.out_es_summary DROP IF EXISTS PARTITION(ds>'0')")
-    //    sql(s"ALTER TABLE winhc_ng.out_es_summary_person DROP IF EXISTS PARTITION(ds>'0')")
+    dropAllPartitions("winhc_ng.out_es_summary")
+    dropAllPartitions("winhc_ng.out_es_summary_person")
 
-    create_index()
+    create_index(ds)
   }
 
 
-  def create_index(): Unit = {
+  def create_index(ds: String): Unit = {
     val template =
       s"""
          |{
@@ -69,16 +69,15 @@ case class ReimportSummary(s: SparkSession
          |}
          |""".stripMargin
 
-    val yesterday = BaseUtil.getYesterday()
-    es.put(s"out_es_summary_person_v$yesterday", template)
-    es.put(s"out_es_summary_v$yesterday", template)
+    es.put(s"out_es_summary_person_v$ds", template)
+    es.put(s"out_es_summary_v$ds", template)
   }
 
-
 }
 
 object ReimportSummary {
   def main(args: Array[String]): Unit = {
+    val Array(ds) = args
     val project = "winhc_ng"
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
@@ -86,8 +85,14 @@ object ReimportSummary {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getSimpleName + ":reimport", config)
-    val tns = Seq("company_court_open_announcement")
-    ReimportSummary(spark, tns).calc()
+    val tns = Seq(
+      "company_court_open_announcement"
+      ,"company_court_announcement"
+      ,"company_send_announcement"
+      ,"company_court_register"
+      ,"company_land_mortgage"
+    )
+    ReimportSummary(spark, tns).calc(ds)
     spark.stop()
   }
 }
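
The dropAllPartitions helper called above is not itself part of this commit. Judging from the commented-out statements it replaces, it presumably wraps the same ALTER TABLE ... DROP IF EXISTS PARTITION(ds>'0') call via the sql helper available through LoggingUtils. A minimal sketch under that assumption:

    // Hypothetical sketch -- dropAllPartitions is not shown in this commit.
    // Based on the commented-out SQL it replaces, it presumably drops every
    // ds partition of the given table in a single statement.
    def dropAllPartitions(tab: String): Unit = {
      // ds>'0' matches all date-string partitions, so this empties the table
      sql(s"ALTER TABLE $tab DROP IF EXISTS PARTITION(ds>'0')")
    }

Note that main now reads the partition date with val Array(ds) = args, so a run passes the target date (e.g. 20210429) as the job's single program argument; the versioned index names out_es_summary_v$ds are derived from it instead of from BaseUtil.getYesterday().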

+ 67 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/SwitchSummaryIndex.scala

@@ -0,0 +1,67 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.{Alias, ElasticSearchIndexUtils, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/4/29 16:31
+ */
+
+case class SwitchSummaryIndex(s: SparkSession
+                             ) extends LoggingUtils {
+  @(transient@getter) val spark: SparkSession = s
+
+  private lazy val es = ElasticSearchIndexUtils()
+
+  private val company_summary_alias = "company_summary"
+  private val person_summary_alias = "company_summary_person"
+
+  def switch(ds: String): Unit = {
+    val company_summary = "out_es_summary_v" + ds
+    val person_summary = "out_es_summary_person_v" + ds
+
+    val size = es.catIndices().filter(a => a.equals(company_summary) || a.equals(person_summary)).size
+    if (size != 2) {
+      throw new RuntimeException("target indices not found!")
+    }
+
+    val ali: Seq[Alias] = es.catAliases()
+      .filter(a => a.alias.equals(company_summary_alias) || a.alias.equals(person_summary_alias))
+
+    val add_alias = Seq(
+      Alias(alias = company_summary_alias, index = company_summary)
+      , Alias(alias = person_summary_alias, index = person_summary)
+    )
+
+    import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper.CaseClass2JsonEnhancer
+    println(
+      s"""
+         |remove alias: ${ali.toJson()}
+         |add alias: ${add_alias.toJson()}
+         |""".stripMargin)
+
+    es.updateAliases(add_alias, ali)
+  }
+}
+
+
+object SwitchSummaryIndex {
+  def main(args: Array[String]): Unit = {
+    val Array(ds) = args
+    val project = "winhc_ng"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    SwitchSummaryIndex(spark).switch(ds)
+
+    spark.stop()
+  }
+}
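
updateAliases is an internal helper whose implementation sits outside this diff. On the Elasticsearch side, an alias switch of this shape is a single atomic POST /_aliases request that pairs the remove actions (the current aliases found by catAliases) with the add actions, so readers of company_summary never observe a moment without a backing index. A hypothetical illustration of the request body for ds = 20210429 (the new index names follow the pattern in the code above; the old index date is invented for the example):

    // Hypothetical illustration of the body updateAliases would presumably send.
    // Bundling remove and add into one _aliases call makes the switch atomic.
    val aliasSwapBody =
      """
        |{
        |  "actions": [
        |    {"remove": {"index": "out_es_summary_v20210401",        "alias": "company_summary"}},
        |    {"remove": {"index": "out_es_summary_person_v20210401", "alias": "company_summary_person"}},
        |    {"add":    {"index": "out_es_summary_v20210429",        "alias": "company_summary"}},
        |    {"add":    {"index": "out_es_summary_person_v20210429", "alias": "company_summary_person"}}
        |  ]
        |}
        |""".stripMargin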

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/utils/ElasticSearchIndexUtils.scala

@@ -75,7 +75,6 @@ case class ElasticSearchIndexUtils() extends Logging {
     }
     logInfo(
       s"""
-         |
          |- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
          |ElasticSearch action:
          |