Prechádzať zdrojové kódy

feat: 执行人、终本索引添加转化字段

许家凯 3 rokov pred
rodič
commit
b0fddad65f

+ 37 - 5
src/main/scala/com/winhc/bigdata/spark/ng/utils/export_company_index_2_es.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.ng.utils
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.StringUtils
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -13,7 +14,9 @@ import scala.collection.mutable
  * @date: 2021/2/24 17:32
  */
 
-case class export_2_es_args(tn: String, out_fields: Seq[String])
/** Binds an output field name to the SQL expression that computes it in the
  * export SELECT (e.g. field_name = "exec_amount", handle = "parse_double(exec_amount)").
  */
case class field_handle(field_name: String, handle: String)
+
/** Arguments describing one table export to Elasticsearch.
  *
  * @param tn         source table name
  * @param out_fields column names emitted by the export SELECT
  * @param handles    optional per-field SQL expressions that replace the bare
  *                   column reference in the SELECT; defaults to Nil (instead
  *                   of null) so consumers can map/fold it directly — callers
  *                   that still pass null remain supported by the null check
  *                   at the use site.
  */
case class export_2_es_args(tn: String, out_fields: Seq[String], handles: Seq[field_handle] = Nil)
 
 case class export_company_index_2_es(s: SparkSession,
                                      export_args: export_2_es_args
@@ -50,6 +53,25 @@ case class export_company_index_2_es(s: SparkSession,
            |LIFECYCLE 15
            |""".stripMargin)
     }
+
+
+    def parse_double(v: String): String = {
+      if (StringUtils.isEmpty(v)) {
+        return null
+      }
+      val v_2 = v.trim
+      try {
+        v_2.toDouble
+        v_2
+      } catch {
+        case ex: Exception => {
+          null
+        }
+      }
+    }
+
+    spark.udf.register("parse_double", parse_double _)
+
   }
 
   private def get_all_tab(): Unit = {
@@ -115,10 +137,15 @@ case class export_company_index_2_es(s: SparkSession,
     val target_ds = BaseUtil.getYesterday()
     val target_cols = getColumns(target_tab).diff(Seq("ds"))
 
+    val map: Map[String, String] = if (export_args.handles != null)
+      export_args.handles.map(f => (f.field_name, f.handle)).toMap
+    else
+      Map.empty
+
     sql(
       s"""
          |INSERT OVERWRITE TABLE $target_tab PARTITION(ds='$target_ds')
-         |SELECT ${target_cols.mkString(",")}
+         |SELECT ${target_cols.map(f => s"${map.getOrElse(f, f)} as $f").mkString(",")}
          |FROM
          |    export_all_tab
          |""".stripMargin)
@@ -128,7 +155,6 @@ case class export_company_index_2_es(s: SparkSession,
 object export_company_index_2_es {
   val as = Seq(
 
-
     export_2_es_args("company_court_open_announcement"
       , "rowkey,defendant_info,plaintiff_info,start_date,case_no,case_reason".split(","))
 
@@ -187,8 +213,12 @@ object export_company_index_2_es {
       , "rowkey,defendant_info,plaintiff_info,court,start_date,title,deleted".split(","))
     , export_2_es_args("company_court_register"
       , "rowkey,case_no,case_reason,defendant_info,plaintiff_info,filing_date,deleted".split(","))
+
     , export_2_es_args("company_zxr_final_case"
-      , "rowkey,name,keyno,court_name,case_final_time,case_create_time,case_no,exec_amount,status,deleted".split(","))
+      , "rowkey,name,keyno,court_name,case_final_time,case_create_time,case_no,exec_amount,no_exec_amount,status,deleted".split(",")
+      , handles = Seq(field_handle(field_name = "exec_amount", handle = "parse_double(exec_amount)"),field_handle(field_name = "no_exec_amount", handle = "parse_double(no_exec_amount)")))
+
+
     , export_2_es_args("company_tax_contravention"
       , "rowkey,company_id,company_name,case_type,case_info,department,publish_time,deleted".split(","))
     , export_2_es_args("company_own_tax"
@@ -202,7 +232,9 @@ object export_company_index_2_es {
 
 
     , export_2_es_args("company_zxr"
-      , "rowkey,name,keyno,card,court,case_create_time,case_no,exec_money,status,deleted".split(","))
+      , "rowkey,name,keyno,card,court,case_create_time,case_no,exec_money,status,deleted".split(",")
+      , handles = Seq(field_handle(field_name = "exec_money", handle = "parse_double(exec_money)")))
+
     , export_2_es_args("bankruptcy_open_case"
       , "rowkey,case_no,case_type,agency_court,applicant,applicant_info,respondent,respondent_info,public_date,deleted".split(","))
     , export_2_es_args("auction_tracking"