Prechádzať zdrojové kódy

fix: rowkey生成可不依赖company_id

许家凯 3 rokov pred
rodič
commit
0c56df48ac

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/ng/jobs/args_company_job.scala

@@ -8,6 +8,7 @@ import com.winhc.bigdata.spark.ng.utils.explode_args
  */
 case class args_company_job(tableName: String
                             , md5_fields: Seq[String] //该维度的核心字段
+                            , rowkey_add_company_id: Boolean = true //如果存在company_id,是否将company_id作为rowkey前缀
                             , rowkey_udf: String = null //用于自定义rowkey的结构,会覆盖md5_fields的默认rowkey
                             , is_super_filter: Boolean = true //是否开启超级过滤,超级过滤后任一主键都不允许为空
                             , where: String = "" // ods层数据的过滤条件

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/ng/jobs/general_handler.scala

@@ -34,6 +34,9 @@ case class general_handler(s: SparkSession,
   val ads_tab = s"$project.ads_$tn"
   val inc_ads_tab = s"$project.inc_ads_$tn"
 
+  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
+
+
   verify()
   reg_udf()
 
@@ -97,7 +100,7 @@ case class general_handler(s: SparkSession,
       if (md5_fields.isEmpty) {
         s"company_id"
       } else {
-        if (md5_fields.contains("company_id") && job_args.verify_company_id) {
+        if (inter_cols.contains("company_id") && job_args.rowkey_add_company_id) {
           s"concat_ws('_',company_id,md5(cleanup(concat_ws('',${md5_fields.mkString(",")}))))"
         } else {
           s"md5(cleanup(concat_ws('',${md5_fields.mkString(",")})))"
@@ -153,7 +156,6 @@ case class general_handler(s: SparkSession,
 
 
   val rowkey_f = get_rowkey_udf()
-  val inter_cols = getColumns(ods_tab).intersect(getColumns(inc_ods_tab)).diff(Seq("rowkey"))
 
 
   val clean_up = get_clean_up()