Browse Source

feat: 添加摘要重算、索引切换、删除多余索引等发布上线

许家凯 4 years ago
parent
commit
843d884e9e

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/ng/utils/ReimportSummary.scala

@@ -90,7 +90,6 @@ object ReimportSummary {
       ,"company_court_announcement"
       ,"company_send_announcement"
       ,"company_court_register"
-      ,"company_land_mortgage"
     )
     ReimportSummary(spark, tns).calc(ds)
     spark.stop()

+ 25 - 5
src/main/scala/com/winhc/bigdata/spark/ng/utils/SwitchSummaryIndex.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.ng.utils
 
+import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper._
 import com.winhc.bigdata.spark.utils.{Alias, ElasticSearchIndexUtils, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 
@@ -23,9 +24,10 @@ case class SwitchSummaryIndex(s: SparkSession
   def switch(ds: String): Unit = {
     val company_summary = "out_es_summary_v" + ds
     val person_summary = "out_es_summary_person_v" + ds
+    val allIndices = es.catIndices()
 
-    val size = es.catIndices().filter(a => a.equals(company_summary) || a.equals(person_summary)).size
-    if (size != 2) {
+    val size = allIndices.filter(a => a.equals(company_summary) || a.equals(person_summary)).size
+    if (size < 2) {
       throw new RuntimeException("target indices is not found !")
     }
 
@@ -36,15 +38,33 @@ case class SwitchSummaryIndex(s: SparkSession
       Alias(alias = company_summary_alias, index = company_summary)
       , Alias(alias = person_summary_alias, index = person_summary)
     )
-
-    import com.winhc.bigdata.spark.implicits.CaseClass2JsonHelper.CaseClass2JsonEnhancer
     println(
       s"""
          |remove alias: ${ali.toJson()}
          |add alias: ${add_alias.toJson()}
          |""".stripMargin)
 
-    es.updateAliases(add_alias,ali)
+    es.updateAliases(add_alias, ali)
+
+    deletedOtherIndices(allIndices, add_alias, ali)
+  }
+
+  def deletedOtherIndices(allIndices: Seq[String], add_alias: Seq[Alias], remove: Seq[Alias]): Unit = {
+    val index = add_alias.map(_.index) ++ remove.map(_.index)
+    val other = allIndices.filter(i => i.startsWith("out_es_summary_")).filterNot(i => index.contains(i))
+    println(
+      s"""
+         |
+         |deleted index: ${other.mkString(",")}
+         |""".stripMargin)
+
+    if(other.size==2){
+      for (elem <- other) {
+        es.delete(elem)
+      }
+    }else{
+      throw new RuntimeException("删除的目标索引大于两个,请手动排查!!!")
+    }
   }
 }
 

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/utils/ElasticSearchIndexUtils.scala

@@ -61,6 +61,8 @@ case class ElasticSearchIndexUtils() extends Logging {
     }
   }
 
+  def delete(index: String): Unit = action("DELETE", index, null)
+
 
   def get(url: String, body: String): String = action("GET", url, body)
 
@@ -100,7 +102,6 @@ case class ElasticSearchIndexUtils() extends Logging {
 object ElasticSearchIndexUtils {
 
   def main(args: Array[String]): Unit = {
-    val l = ElasticSearchIndexUtils().catIndices()
-    println(l.mkString("\n"))
+    val es = ElasticSearchIndexUtils()
   }
 }