
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
commit 982e565d69

+ 5 - 24
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -16,39 +16,20 @@ object CompanyForCid {
       "ods_company_tm" -> Seq("reg_no", "new_cid"), //商标
       "ods_company_wechat" -> Seq("public_num", "new_cid"), //微信公众号
       "ods_company_app_info" -> Seq("name", "new_cid"), //产品信息
-      "ods_company_own_tax" -> Seq("own_tax_amount","tax_category","tax_num", "new_cid"), //产品信息
+      "ods_company_own_tax" -> Seq("tax_balance","tax_category","tax_num", "new_cid"), //产品信息
       "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
+//  winhc_eci_dev ods_company_own_tax tax_balance,tax_category,tax_num,new_cid
 
   def main(args: Array[String]): Unit = {
-    val (space, sourceTable, cols) = valid(args)
-    //    var config = mutable.Map.empty[String, String]
+    val Array(space, sourceTable, cols) = args
+
     var config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-//    CompanyForCidUtils(spark, space, sourceTable, cols).calc()
+    CompanyForCidUtils(spark, space, sourceTable, cols.split(",")).calc()
     spark.stop()
   }
 
-  def valid(args: Array[String]): (String, String, Seq[String]) = {
-    println(args.toSeq.mkString(" "))
-    if (args.length == 1) {
-
-    } else if (args.length == 2) {
-      val Array(sourceTable, cols) = args
-      return (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols.split(";").toSeq)
-    } else {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-
-    val cols: Seq[String] = tabMapping.getOrElse(sourceTable.split("\\.")(1), Seq())
-    if (cols.isEmpty) {
-      println("输入表不存在,请配置计算规则!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols)
-  }
 }
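With the `valid` helper gone, `main` now expects exactly three positional arguments and splits the column list on commas rather than the semicolons the old helper used. A minimal sketch of the new argument handling, reusing the values from the example comment above:

// Values taken from the in-code example comment; any other table/column set works the same way.
val args = Array("winhc_eci_dev", "ods_company_own_tax", "tax_balance,tax_category,tax_num,new_cid")
val Array(space, sourceTable, cols) = args  // throws MatchError if fewer or more than 3 args are passed
val colSeq = cols.split(",").toSeq          // Seq(tax_balance, tax_category, tax_num, new_cid)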

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -208,6 +208,8 @@ object ChangeExtract {
   // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
   // winhc_eci_dev company_abnormal_info rowkey 20200717 remove_reason
 
+  // winhc_eci_dev company_own_tax rowkey 20200729 tax_balance,tax_category,tax_num
+
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
   def main(args: Array[String]): Unit = {
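The new comment records the launch arguments for the tax-arrears table. A hedged sketch of the corresponding run, assuming `main` takes the five positional arguments the neighboring example comments document (project, table, rowkey column, partition date, compared columns):

ChangeExtract.main(Array("winhc_eci_dev", "company_own_tax", "rowkey", "20200729", "tax_balance,tax_category,tax_num"))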

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -231,3 +231,14 @@ case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHand
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("put_date")
 }
+
+// Tax arrears
+case class company_own_tax(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"${newMap("name")}欠税公告发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"新增${newMap("name")}欠税公告")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("欠税公告", newMap("name"), newMap("publish_date"), newMap("tax_num"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_date")
+}
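For illustration, a hypothetical record run through the new handle (field values invented for the example; `getValueOrNull` is the trait's existing null-guard helper, assumed to return the formatted title when name is non-null):

val handle = company_own_tax(equCols = Seq("tax_balance", "tax_category", "tax_num"))
val newMap = Map("name" -> "示例公司", "publish_date" -> "2020-07-29", "tax_num" -> "913100000000000000")
handle.getBizTime(newMap)     // "2020-07-29" -- biz time is the publish date
handle.getInsertTitle(newMap) // "新增示例公司欠税公告"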

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * Tax arrears announcement
+ */
+case class company_own_tax() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
+
+
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
+}
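`get_change_content` leans on the project's `MapHelper.toJson` implicit; assuming it emits one JSON entry per `field->label` pair, keyed by the label (the helper's exact output format is project-internal), a record would serialize roughly like this:

import com.winhc.bigdata.spark.implicits.MapHelper._  // provides the toJson extension

// Hypothetical record, trimmed to two of the mapped fields:
val new_map = Map("tax_balance" -> "12000.00", "publish_date" -> "2020-07-29")
new_map.toJson(Seq("tax_balance->欠税余额", "publish_date->发布日期"))
// assumed shape: {"欠税余额":"12000.00","发布日期":"2020-07-29"}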

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala

@@ -2,7 +2,6 @@ package com.winhc.bigdata.spark.jobs.increment
 
 import java.util.Date
 
-import com.winhc.bigdata.spark.jobs.CompanyForCid.valid
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col

+ 8 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -11,12 +12,12 @@ import scala.annotation.meta.getter
  * cid conversion
  */
 
-case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils with CompanyMapping {
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-
+    prepareFunctions(spark)
     val odsTable = s"${space}.$sourceTable"
     val adsTable = s"${space}.ads_${sourceTable.substring(4)}"
     val companyMapping = s"${space}.company_map"
@@ -25,6 +26,8 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
     val disCol = cols
 
+    val cols_md5 = disCol.filter(!_.equals("new_cid"))
+
     // replace fields
     sql(
       s"""
@@ -34,7 +37,8 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |        SELECT
          |                *
          |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY id DESC ) num
-         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT
          |                        a.*
@@ -45,7 +49,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |                WHERE   a.ds = $ds AND a.cid IS NOT NULL
          |                ) c
          |        ) d
-         |WHERE   num =1
+         |WHERE   num = 1 AND cols IS NOT NULL AND trim(cols) <> ''
          |""".stripMargin)
 //      .createOrReplaceTempView(s"t2")
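The rowkey switch is the substantive change here: instead of appending the row `id`, the key now hashes the deduplication columns, so the same business record keeps the same rowkey across reruns, and the new `cols` guard drops rows whose key fields are all empty. A minimal Scala sketch of the equivalent key computation, assuming `cleanup` (from `CompanyMapping`) only normalizes the concatenated string:

import java.security.MessageDigest

// Hex-encoded MD5, mirroring the SQL md5() call.
def md5Hex(s: String): String =
  MessageDigest.getInstance("MD5").digest(s.getBytes("UTF-8")).map("%02x".format(_)).mkString

val newCid  = "123456"                                        // hypothetical new_cid
val colVals = Seq("12000.00", "增值税", "913100000000000000")  // tax_balance, tax_category, tax_num
val rowkey  = s"${newCid}_${md5Hex(colVals.mkString(""))}"    // CONCAT_WS('_', new_cid, md5(cleanup(...)))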