Parcourir la source

feat: cid和cids程序加入自动建表

许家凯 il y a 4 ans
Parent
commit
8b389deca5

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -20,10 +20,14 @@ object CompanyForCid {
       "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
 //  winhc_eci_dev ods_company_own_tax tax_balance,tax_category,tax_num,new_cid
+  // winhc_eci_dev ods_company_change new_cid,change_item,change_time
+  // winhc_eci_dev ods_company_illegal_info new_cid,put_reason,put_date,put_department
 //  winhc_eci_dev company_check_info check_org,check_date,new_cid
 //  winhc_eci_dev company_tax_contravention taxpayer_number,case_info,new_cid
 //  winhc_eci_dev company_double_random_check_info check_task_num,new_cid
 //  winhc_eci_dev company_double_random_check_result_info main_id,check_item,new_cid
+//  winhc_eci_dev ods_company_finance new_cid,round,money
+//  winhc_eci_dev ods_company_dishonest_info new_cid,case_no
 
   def main(args: Array[String]): Unit = {
     val Array(space, sourceTable, cols) = args

+ 4 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncrForCid.scala

@@ -16,6 +16,10 @@ object CompanyIncrForCid {
   //winhc_eci_dev company_app_info icon_oss_path,brief,name,new_cid
   //winhc_eci_dev ads_company_tm app_date,tm_name,reg_no,new_cid
   //winhc_eci_dev company_wechat title,public_num,new_cid
+  // winhc_eci_dev company_change new_cid,change_item,change_time
+  // winhc_eci_dev company_illegal_info new_cid,put_reason,put_date,put_department
+  //  winhc_eci_dev company_finance new_cid,round,money
+  // winhc_eci_dev company_dishonest_info new_cid,case_no
   def main(args: Array[String]): Unit = {
     val Array(project, tableName, dupliCols) = args
     println(

+ 12 - 7
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -12,7 +12,7 @@ import scala.annotation.meta.getter
  * cid转换
  */
 
-case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils  with CompanyMapping{
+case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   val rowKeyMapping =
@@ -35,15 +35,16 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
     //rowkey前缀匹配
     val rowKeyPre = rowKeyMapping.getOrElse(sourceTable,"new_cid")
 
-   val ddl =  spark.table(odsTable).schema.filter(s=>{!"ds".equals(s.name)}).map(s=>{
-
+    val ddl = spark.table(odsTable).schema.filter(s => {
+      !"ds".equals(s.name)
+    }).map(s => {
       val name = s.name
       val dataType = s.dataType
       s"$name ${DataTypeUtils.getDataType(dataType)} COMMENT '${s.getComment().getOrElse("")}'\n"
     }).mkString(",")
 
 
-    sql(
+    println(
       s"""
          |CREATE TABLE IF NOT EXISTS ${adsTable}
          |(
@@ -54,6 +55,10 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |COMMENT 'TABLE COMMENT'
          |PARTITIONED BY (ds STRING COMMENT '分区')
          |""".stripMargin)
+    if (!spark.catalog.tableExists("adsTable")) {
+      return
+    }
+
 
     //替换字段
     sql(
@@ -78,12 +83,12 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |        ) d
          |WHERE   num =1  AND cols is not null AND trim(cols) <> ''
          |""".stripMargin)
-//      .createOrReplaceTempView(s"t2")
+    //      .createOrReplaceTempView(s"t2")
 
-//    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
+    //    sql(s"select rowkey,new_cid,${columns.mkString(",")} from t2").show(10)
 
     //写表
-//    sql(s"insert into table ${adsTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
+    //    sql(s"insert into table ${adsTable} partition (ds=${ds}) select rowkey,new_cid,${columns.mkString(",")} from t2")
 
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }

+ 41 - 5
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidUtils.scala

@@ -16,7 +16,7 @@ case class CompanyIncrForCidUtils(s: SparkSession,
                                   project: String, //表所在工程名
                                   tableName: String, //表名(不加前后辍)
                                   dupliCols: Seq[String] // 去重列
-                                 ) extends LoggingUtils with CompanyMapping{
+                                 ) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
  //主键字段
   val rowKeyMapping =
@@ -33,6 +33,44 @@ case class CompanyIncrForCidUtils(s: SparkSession,
     val inc_ods_company_tb = s"${project}.inc_ods_$tableName" //增量ods表
     val inc_ads_company_tb = s"${project}.inc_ads_$tableName" //增量ads表
 
+
+
+    //table字段
+    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
+      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
+    })
+
+    val colsSet = columns.toSet
+
+    val ddl = spark.table(inc_ods_company_tb).schema.filter(s => colsSet.contains(s.name)).map(s => {
+      val name = s.name
+      val dataType = s.dataType
+      s"$name ${DataTypeUtils.getDataType(dataType)} COMMENT '${s.getComment().getOrElse("")}'\n"
+    }).mkString(",")
+
+    println(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${inc_ads_company_tb}
+         |(
+         |    rowkey  STRING COMMENT 'FIELD'
+         |    ,flag STRING COMMENT 'FIELD'
+         |    ,new_cid STRING COMMENT 'FIELD'
+         |    ,cid STRING COMMENT 'FIELD'
+         |    ,$ddl
+         |)
+         |COMMENT 'TABLE COMMENT'
+         |PARTITIONED BY (ds STRING COMMENT '分区')
+         |""".stripMargin)
+
+    if (!spark.catalog.tableExists(inc_ads_company_tb)) {
+      return
+    }
+
+
+
+
+
+
     //存量表ads最新分区
     val remainDs = BaseUtil.getPartion(ads_company_tb, spark)
 
@@ -72,10 +110,8 @@ case class CompanyIncrForCidUtils(s: SparkSession,
          |firstDsIncOds:$firstDsIncOds
          |""".stripMargin)
 
-    //table字段
-    val columns: Seq[String] = spark.table(ads_company_tb).schema.map(_.name).filter(s => {
-      !s.equals("ds") && !s.equals("cid") && !s.equals("new_cid") && !s.equals("rowkey")
-    })
+
+
 
     //rowkey前缀匹配
     val rowKeyPre = rowKeyMapping.getOrElse(tableName,"new_cid")