
feat: jobs for the new pipeline

许家凯 3 years ago
commit ab99fc6e84

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -27,7 +27,7 @@ object args_company_job {
     , args_company_job("company_court_open_announcement", Seq("case_no", "start_date")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no_trim(case_no),split_date(cast(start_date as String)) )))"
       , is_super_filter = false
-      , where = "case_no_trim(case_no) is not null"
+      , where = "case_no_trim(case_no) is not null and is_json_str(defendant_info) and is_json_str(plaintiff_info) and is_json_str(litigant_info)"
       , explode_args = Seq(
         explode_args("plaintiff_info", "$.litigant_id", "plaintiff_info_id_explode")
         , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -48,6 +48,7 @@ case class general_handler(s: SparkSession,
   private def reg_udf(): Unit = {
     cleanup()
     case_no_trim_udf()
+    json_verify()

     def replace_rowkey(old_rowkey: String, new_rowkey: String): String = {
       if (StringUtils.isEmpty(old_rowkey)) {

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/ng/utils/CompanySummaryNg_new.scala

@@ -33,7 +33,7 @@ case class CompanySummaryNg_new(s: SparkSession,
                                 , args: Seq[company_summary_args]
                                ) extends LoggingUtils {
   @(transient@getter) val spark: SparkSession = s
-  private val target_tab = "winhc_ng.out_es_summary_test"
+  private val target_tab = "winhc_ng.out_es_summary"
   init()

   private def init() {

+ 1 - 11
src/main/scala/com/winhc/bigdata/spark/ng/utils/explode_tab.scala

@@ -1,9 +1,9 @@
 package com.winhc.bigdata.spark.ng.utils

 import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
-import com.google.gson.{JsonElement, JsonParser}
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.is_json_str
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
@@ -25,16 +25,6 @@ case class explode_tab(s: SparkSession,

   init()

-  private def is_json_str(str: String): Boolean = {
-    var jsonElement: JsonElement = null
-    try jsonElement = new JsonParser().parse(str)
-    catch {
-      case e: Exception =>
-        return false
-    }
-    if (jsonElement == null) return false
-    true
-  }

   private def init(): Unit = {
     /**
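The private gson-based helper removed above gives way to the shared BaseUtil.is_json_str import; presumably that utility mirrors the deleted code. A minimal sketch under that assumption (the wrapper object name is hypothetical, and the actual BaseUtil source may differ):

import com.google.gson.{JsonElement, JsonParser}

// Sketch of the shared helper, mirroring the code deleted from explode_tab:
// true only if the string parses to a non-null gson JsonElement.
object BaseUtilSketch {
  def is_json_str(str: String): Boolean = {
    try {
      val parsed: JsonElement = new JsonParser().parse(str)
      parsed != null
    } catch {
      case _: Exception => false
    }
  }
}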

+ 18 - 8
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_2_es.scala

@@ -15,9 +15,9 @@ import scala.collection.mutable

 case class export_2_es_args(tn: String, out_fields: Seq[String])

-case class export_2_es(s: SparkSession,
-                       export_args: export_2_es_args
-                      ) extends LoggingUtils with BaseFunc {
+case class export_company_index_2_es(s: SparkSession,
+                                     export_args: export_2_es_args
+                                    ) extends LoggingUtils with BaseFunc {
   @(transient@getter) val spark: SparkSession = s

   private val target_tab = s"winhc_ng.out_index_es_${export_args.tn}"
@@ -125,18 +125,28 @@ case class export_2_es(s: SparkSession,
   }
 }

-object export_2_es {
+object export_company_index_2_es {
+  val as = Seq(
+    export_2_es_args("company_court_open_announcement"
+      , "rowkey,defendant_info,plaintiff_info,start_date,case_no,case_reason".split(","))
+  )
+
+
+  def run(spark: SparkSession, tn: String): Unit = {
+    export_company_index_2_es(spark, export_args = as.find(p => p.tn.equals(tn)).get)
+      .calc()
+  }
+
+
   def main(args: Array[String]): Unit = {
+    val Array(tn) = args
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_ng",
       "spark.debug.maxToStringFields" -> "200",
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv(this.getClass.getName, config)
-
-    export_2_es(spark, export_args = export_2_es_args("company_court_open_announcement", "rowkey,defendant_info,plaintiff_info,start_date,case_no,case_reason".split(",")))
-      .calc()
-
+    run(spark, tn)

     spark.stop()
   }
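With the rename, the export table is chosen by a program argument and resolved against the `as` config. A hedged usage sketch, assuming a live SparkSession named `spark` (not code from this commit):

// Programmatic equivalent of main(Array("company_court_open_announcement")):
// looks up the export_2_es_args entry for the table and writes to
// winhc_ng.out_index_es_company_court_open_announcement.
export_company_index_2_es.run(spark, "company_court_open_announcement")

// Note: as.find(p => p.tn.equals(tn)).get throws NoSuchElementException for a
// table name with no entry in `as`, so only configured tables can be exported.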

+ 7 - 3
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,8 +1,8 @@
 package com.winhc.bigdata.spark.udf

 import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
 import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
 import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
@@ -16,7 +16,7 @@ import scala.annotation.meta.getter
  * @Date: 2020/7/10 13:49
  * @Description:
  */
-trait BaseFunc extends LoggingUtils{
+trait BaseFunc extends LoggingUtils {
   @(transient@getter) protected val spark: SparkSession
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r

@@ -27,7 +27,7 @@ trait BaseFunc extends LoggingUtils{
      })
    }*/

-  def addEmptyPartitionOrSkip(tab:String,ds:String): Unit ={
+  def addEmptyPartitionOrSkip(tab: String, ds: String): Unit = {
     sql(
       s"""
          |ALTER TABLE $tab ADD IF NOT EXISTS PARTITION(ds='$ds')
@@ -109,6 +109,10 @@ trait BaseFunc extends LoggingUtils{
     })
   }

+  def json_verify(): Unit = {
+    spark.udf.register("is_json_str", is_json_str _)
+  }
+
   def cleanup(): Unit = {
     // strip special characters
     spark.udf.register("cleanup", (col: String) => {