
fix: adjust rowkey primary key

许家凯 3 years ago
parent
commit
002865d1f5

+ 8 - 6
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -38,10 +38,11 @@ object args_company_job {
         , explode_args("defendant_info", "$.litigant_id", "defendant_info_id_explode")
       )
     )
+
     , args_company_job("company_dishonest_info", Seq("case_no", "name")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,get_fixed_val(keyno,name) )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,name )))"
       , is_super_filter = false
-      , where = "case_no_trim(case_no) is not null and get_fixed_val(keyno,name) is not null"
+      , where = "case_no_trim(case_no) is not null and name is not null"
     )
     , args_company_job("company_zxr_restrict", Seq("case_no", "person_name")
       , rowkey_udf = "md5(cleanup(concat_ws('',case_no,person_name )))"
@@ -63,8 +64,9 @@ object args_company_job {
     , args_company_job("company_land_mortgage", Seq("land_mark", "land_num", "other_item_num", "use_right_num")
       , is_super_filter = false
     )
+
     , args_company_job("company_judicial_assistance", Seq("executed_person", "execute_notice_num", "type_state", "company_id")
-      , rowkey_udf = "md5(cleanup(concat_ws('',get_fixed_val(executed_person_id,executed_person),execute_notice_num,type_state,company_id )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',executed_person,execute_notice_num,type_state,company_id )))"
       , is_super_filter = false
     )
     , args_company_job("company_public_announcement", Seq("bill_num")
@@ -82,16 +84,16 @@ object args_company_job {
     )
 
     , args_company_job("zxr_evaluate_results", Seq("case_no", "asset_name", "name", "publish_time")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,asset_name,get_fixed_val(keyno,name),publish_time )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,asset_name,name,publish_time )))"
       , is_super_filter = false
     )
     , args_company_job("zxr_evaluate", Seq("case_no", "asset_name", "name")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,asset_name,get_fixed_val(keyno,name) )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,asset_name,name )))"
       , is_super_filter = false
     )
 
     , args_company_job("restrictions_on_exit", Seq("case_no", "limited_person", "executed_person_name", "publish_time")
-      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,limited_person,get_fixed_val(executed_person_keyno,executed_person_name),publish_time )))"
+      , rowkey_udf = "md5(cleanup(concat_ws('',case_no,limited_person,executed_person_name,publish_time )))"
       , is_super_filter = false
     )
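
The change above drops the project's get_fixed_val(id, name) fallback from the rowkey expressions, so each rowkey is now derived from the raw name column alone. Below is a minimal sketch of how such a rowkey expression evaluates in Spark SQL, with cleanup stubbed as a hypothetical whitespace-stripping UDF (the real cleanup UDF is project code not shown in this diff):

import org.apache.spark.sql.SparkSession

object RowkeySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("rowkey-sketch").getOrCreate()

    // Hypothetical stand-in for the project's cleanup UDF (not shown in this diff).
    spark.udf.register("cleanup", (s: String) => if (s == null) null else s.replaceAll("\\s+", ""))

    // Same shape as the new rowkey_udf for company_dishonest_info.
    spark.sql(
      """SELECT md5(cleanup(concat_ws('', case_no, name))) AS rowkey
        |FROM VALUES ('(2020)京0105执123号', '张三') AS t(case_no, name)
        |""".stripMargin).show(false)

    spark.stop()
  }
}

Note that concat_ws('', ...) skips null arguments rather than propagating null, which is why the accompanying where clause still filters on case_no_trim(case_no) is not null and name is not null: without it, rows differing only in which field is null could hash to the same rowkey.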
 

+ 3 - 7
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -101,16 +101,12 @@ case class general_handler(s: SparkSession,
           val arr = mutable.ArrayBuffer[String]()
           for (i <- 0 until jSONArray.size()) {
             val jSONObject = jSONArray.getJSONObject(i)
-            val id = jSONObject.getString(key + "_id")
-            if(StringUtils.isNoneEmpty(id) && id.length==32){
-              arr.append(id)
-            }else{
-              arr.append(jSONObject.getString(key))
-            }
+            arr.append(jSONObject.getString(key))
           }
           arr.distinct.sorted.mkString("、")
         }
-        val str = get_seq("pledgee", pledgee_info) +" "+ get_seq("pledgor", pledgor_info)
+
+        val str = get_seq("pledgee", pledgee_info) + " " + get_seq("pledgor", pledgor_info)
         str
       } catch {
         case ex: Exception => {
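
The general_handler change mirrors the rowkey fix: get_seq no longer prefers a 32-character <key>_id over the display name. A self-contained sketch of the simplified logic (the sample JSON is illustrative, not taken from the project):

import com.alibaba.fastjson.JSON

import scala.collection.mutable

object GetSeqSketch {
  // Post-commit behavior: always read the plain `key` field, no id fallback.
  def get_seq(key: String, json: String): String = {
    val jSONArray = JSON.parseArray(json)
    val arr = mutable.ArrayBuffer[String]()
    for (i <- 0 until jSONArray.size()) {
      arr.append(jSONArray.getJSONObject(i).getString(key))
    }
    arr.distinct.sorted.mkString("、")
  }

  def main(args: Array[String]): Unit = {
    // Duplicates collapse via distinct; the result is sorted and joined with "、".
    val pledgee_info = """[{"pledgee":"乙公司"},{"pledgee":"甲公司"},{"pledgee":"甲公司"}]"""
    println(get_seq("pledgee", pledgee_info)) // prints: 乙公司、甲公司
  }
}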

+ 29 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.udf
 
+import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
 import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
 import com.winhc.bigdata.spark.utils.BaseUtil._
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
@@ -51,6 +52,34 @@ trait BaseFunc extends LoggingUtils {
     spark.udf.register("is_id_card", is_id_card _)
   }
 
+
+  def json_2_array_udf(): Unit = {
+    /**
+     * Extracts the distinct, non-empty string values matched by a JSONPath
+     * expression from a JSON array string.
+     *
+     * @param json_array the JSON array string to parse
+     * @param json_path  a JSONPath expression, e.g. "$.name"
+     * @return the matched values, or Seq.empty for null/invalid input
+     */
+    def json_2_array(json_array: String, json_path: String): Seq[String] = {
+      try {
+        if (StringUtils.isEmpty(json_array)) {
+          return Seq.empty
+        }
+        if (!is_json_str(json_array)) {
+          return Seq.empty
+        }
+        JSONPath.eval(JSON.parse(json_array), json_path).asInstanceOf[JSONArray].toArray[String](Array()).toSeq.distinct.diff(Seq(""))
+      } catch {
+        case e: Exception => {
+          println(json_array)
+          Seq.empty
+        }
+      }
+    }
+
+    spark.udf.register("json_2_array", json_2_array _)
+  }
+
   def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
     val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
       s"""
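
Finally, a hedged usage sketch for the new json_2_array UDF. It assumes a job that mixes in BaseFunc (so json_2_array_udf() is in scope); the table company_court_case and its defendant_info column are hypothetical, while the '$.litigant_id' path comes from the explode_args call in this commit:

// Register the "json_2_array" SQL function defined above (BaseFunc must be mixed in).
json_2_array_udf()

spark.sql(
  """SELECT json_2_array(defendant_info, '$.litigant_id') AS litigant_ids
    |FROM company_court_case
    |""".stripMargin).show(false)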