feat: add person-related dimensions

许家凯, 4 years ago
parent commit c2af55b3fe

+ 132 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/human/company_judicial_human.scala

@@ -0,0 +1,132 @@
+package com.winhc.bigdata.spark.jobs.human
+
+import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyMapping}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, TableInitUtil}
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/29 11:14
+ * @Description: company-attached dimension that contains only persons
+ */
+case class company_judicial_human(s: SparkSession,
+                                  project: String // name of the project (schema) that holds the tables
+                                 ) extends LoggingUtils with TableInitUtil with BaseFunc with CompanyMapping {
+  @(transient@getter) val spark: SparkSession = s
+
+  def init(tableName: String): Unit = {
+    def ddl(target_tableName: String): Unit = {
+      if (!spark.catalog.tableExists(target_tableName)) {
+        val ddl = getDDL(org_tableName = s"$project.ods_$tableName", target_tableName = target_tableName, addOtherCols = Seq(("rowkey", "String")))
+        println(ddl)
+        sys.exit(-1)
+      }
+    }
+
+    val target_tableName = s"$project.ads_${tableName}_human"
+    val inc_target_tableName = s"$project.inc_ads_${tableName}_human"
+    ddl(target_tableName)
+    ddl(inc_target_tableName)
+
+  }
+
+  def etl(tableName: String, dupliCols: Seq[String], is_inc: Boolean = true): Unit = {
+    cleanup()
+    is_id_card()
+    init(tableName)
+    val ods_tableName = s"$project.ods_$tableName"
+    val inc_ods_tableName = s"$project.inc_ods_$tableName"
+    val target_table = s"$project.ads_${tableName}_human"
+    val targetCols = getColumns(target_table).filter(f => {
+      !f.equals("ds")
+    })
+    val intersectCols = getColumns(ods_tableName).toSet & getColumns(inc_ods_tableName).toSet
+
+    val cid_name = intersectCols.filter(f => {
+      f.equals("cid") || f.equals("cids")
+    }).max
+
+    def all(): Unit = {
+      val inc_last_ds = getLastPartitionsOrElse(inc_ods_tableName, "0")
+      val last_ds = getLastPartitionsOrElse(ods_tableName, "0")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_last_ds')
+           |SELECT  ${targetCols.mkString(",")}
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds) AS num
+           |            FROM    (
+           |                        SELECT  MD5(cleanup(concat(${dupliCols.mkString(",")}))) AS rowkey
+           |                                ,${intersectCols.mkString(",")}
+           |                        FROM    $ods_tableName
+           |                        WHERE   ds = '$last_ds'
+           |                        AND     $cid_name IS NULL
+           |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
+           |                        AND     is_id_card(card_num)
+           |                        UNION ALL
+           |                        SELECT  MD5(cleanup(concat(${dupliCols.mkString(",")}))) AS rowkey
+           |                                ,${intersectCols.mkString(",")}
+           |                        FROM    $inc_ods_tableName
+           |                        WHERE   ds > '$last_ds'
+           |                        AND     $cid_name IS NULL
+           |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
+           |                        AND     is_id_card(card_num)
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+
+    }
+
+    def inc(): Unit = {
+      val inc_last_ds = getLastPartitionsOrElse(inc_ods_tableName, "0")
+      val ads_last_ds = getLastPartitionsOrElse(target_table, "0")
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $target_table PARTITION(ds='$inc_last_ds')
+           |SELECT  ${targetCols.mkString(",")}
+           |FROM    (
+           |            SELECT  *
+           |                    ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds) AS num
+           |            FROM    (
+           |                        SELECT  MD5(cleanup(concat(${dupliCols.mkString(",")}))) AS rowkey
+           |                                ,${intersectCols.mkString(",")}
+           |                        FROM    $inc_ods_tableName
+           |                        WHERE   ds > '$ads_last_ds'
+           |                        AND     $cid_name IS NULL
+           |                        AND     trim(cleanup(concat(${dupliCols.mkString(",")}))) <> ''
+           |                        AND     is_id_card(card_num)
+           |                    ) AS t1
+           |        ) AS t2
+           |WHERE   t2.num = 1
+           |""".stripMargin)
+    }
+
+    if (is_inc)
+      inc()
+    else
+      all()
+  }
+}
+
+object company_judicial_human {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    company_judicial_human(spark, "winhc_eci_dev")
+      .etl("company_dishonest_info", Seq("name", "case_no"), is_inc = false)
+
+    spark.stop()
+  }
+}
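
A note on the dedup pattern in etl(): rows are keyed by MD5(cleanup(concat(dupliCols))) and ROW_NUMBER() OVER (PARTITION BY rowkey ORDER BY ds) keeps exactly one row per key; since ds is ordered ascending, the record from the earliest partition wins. A minimal, self-contained sketch of the same pattern on toy data (the local session and sample rows are illustrative assumptions, not part of the job):

import org.apache.spark.sql.SparkSession

object RowkeyDedupSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("dedup-sketch").getOrCreate()
    import spark.implicits._

    // two versions of rowkey k1 across partitions, one row for k2
    Seq(("k1", "20200901", "a"), ("k1", "20200902", "b"), ("k2", "20200901", "c"))
      .toDF("rowkey", "ds", "payload")
      .createOrReplaceTempView("t")

    // same window as etl(): ORDER BY ds ascending, so (k1, 20200901, a) survives
    spark.sql(
      """
        |SELECT rowkey, ds, payload
        |FROM (
        |    SELECT *, ROW_NUMBER() OVER (PARTITION BY rowkey ORDER BY ds) AS num
        |    FROM t
        |) t2
        |WHERE num = 1
        |""".stripMargin).show()

    spark.stop()
  }
}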

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -20,6 +20,8 @@ trait BaseFunc {
   @(transient@getter) protected val spark: SparkSession
   private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r
 
+  private val id_card_pattern = "^[1-9]\\d{5}(18|19|20)\\d{2}((0[1-9])|(1[0-2])|\\*{2})(([0-2][1-9])|10|20|30|31|\\*{2})\\d{3}[0-9Xx]$".r
+
 
   /* def to_epoch_millis_timestamp(): Unit = {
      spark.udf.register("to_epoch_millis_timestamp", (et: String) => {
@@ -27,6 +29,11 @@ trait BaseFunc {
      })
    }*/
 
+  def is_id_card(): Unit = {
+    import com.winhc.bigdata.spark.implicits.RegexUtils._
+    spark.udf.register("is_id_card", (str: String) => id_card_pattern matches str)
+  }
+
   def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
     val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
       s"""

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/utils/DataTypeUtils.scala

@@ -28,5 +28,4 @@ object DataTypeUtils {
       }
     }
   }
-
 }

+ 57 - 0
src/main/scala/com/winhc/bigdata/spark/utils/TableInitUtil.scala

@@ -0,0 +1,57 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/9/29 11:21
+ * @Description: derive the DDL of a target table from an existing table's schema
+ */
+trait TableInitUtil {
+  @(transient@getter) protected val spark: SparkSession
+
+  def getDDL(org_tableName: String
+             , target_tableName: String
+             , addOtherCols: Seq[(String, String)] = Seq.empty
+             , removeCols: Seq[String] = Seq.empty
+             , addPartitions: Seq[(String, String)] = Seq(("ds", "STRING"))
+            ): String = {
+    val colsSet = addOtherCols.map(_._1).toSet
+    val rmSet = removeCols.toSet
+
+    val cols = addOtherCols.map(f => {
+      s"${f._1} ${f._2} \n"
+    }) ++
+      spark.table(s"$org_tableName").schema
+        .fields
+        .filter(f => {
+          !f.name.equals("ds")
+        })
+        .filter(f => {
+          !colsSet.contains(f.name)
+        })
+        .filter(f => {
+          !rmSet.contains(f.name)
+        })
+        .map(f => {
+          val name = f.name
+          val dataType = f.dataType
+          s"$name ${DataTypeUtils.getDataType(dataType)} COMMENT '${f.getComment().getOrElse("")}'\n"
+        })
+
+    val str = addPartitions.map(f => {
+      s"${f._1} ${f._2} COMMENT '分区'"
+    }).mkString(",")
+
+    s"""
+       |CREATE TABLE IF NOT EXISTS $target_tableName
+       |(
+       |    ${cols.mkString(",")}
+       |)
+       |COMMENT 'TABLE COMMENT'
+       |PARTITIONED BY ($str)
+       |""".stripMargin
+  }
+}
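
For reference, this is how init() in company_judicial_human exercises the helper; the call below mirrors that code path, but the printed DDL is only the expected shape, since the concrete column list depends on the live ods table schema:

// inside a class that mixes in TableInitUtil
val ddl = getDDL(
  org_tableName = "winhc_eci_dev.ods_company_dishonest_info",
  target_tableName = "winhc_eci_dev.ads_company_dishonest_info_human",
  addOtherCols = Seq(("rowkey", "String"))
)
println(ddl)
// CREATE TABLE IF NOT EXISTS winhc_eci_dev.ads_company_dishonest_info_human
// (
//     rowkey String
//     ,<every source column except ds, each with its comment>
// )
// COMMENT 'TABLE COMMENT'
// PARTITIONED BY (ds STRING COMMENT 'partition')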