Forráskód Böngészése

feat: 支持使用ods层rowkey

许家凯 3 éve
szülő
commit
bd57383171

+ 22 - 9
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -50,11 +50,11 @@ case class general_handler(s: SparkSession,
     case_no_trim_udf()
     json_verify()
 
-    def replace_rowkey(old_rowkey: String, new_rowkey: String): String = {
-      if (StringUtils.isEmpty(old_rowkey)) {
+    def replace_rowkey(use_user_defined_rowkey: String, new_rowkey: String): String = {
+      if (StringUtils.isEmpty(use_user_defined_rowkey)) {
         new_rowkey
       } else {
-        old_rowkey
+        use_user_defined_rowkey
       }
     }
 
@@ -66,6 +66,8 @@ case class general_handler(s: SparkSession,
       }
     }
 
+    spark.udf.register("use_user_defined_rowkey", replace_rowkey _)
+
     spark.udf.register("holder_switch_rowkey", company_holder_rowkey _)
 
     spark.udf.register("split_date", DateUtils.splitDate _)
@@ -74,15 +76,26 @@ case class general_handler(s: SparkSession,
   }
 
   private def get_rowkey_udf(): String = {
-    if (StringUtils.isNotEmpty(job_args.rowkey_udf)) {
-      return job_args.rowkey_udf
+    def get_row(): String ={
+      if (StringUtils.isNotEmpty(job_args.rowkey_udf)) {
+        return job_args.rowkey_udf
+      }
+
+      if (md5_fields.isEmpty) {
+        s"company_id"
+      } else {
+        s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+      }
     }
 
-    if (md5_fields.isEmpty) {
-      s"company_id"
-    } else {
-      s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
+    if(job_args.id_user_defined_rowkey){
+      //使用ods层rowkey
+      s" use_user_defined_rowkey(rowkey,${get_row()}) "
+    }else{
+      get_row()
     }
+
+
   }
 
   private def get_clean_up(): String = {