Jelajahi Sumber

Merge remote-tracking branch 'origin/master'

许家凯 4 tahun lalu
induk
melakukan
9d64faa7f4

+ 5 - 24
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyForCid.scala

@@ -16,39 +16,20 @@ object CompanyForCid {
       "ods_company_tm" -> Seq("reg_no", "new_cid"), //商标
       "ods_company_wechat" -> Seq("public_num", "new_cid"), //微信公众号
       "ods_company_app_info" -> Seq("name", "new_cid"), //产品信息
-      "ods_company_own_tax" -> Seq("own_tax_amount","tax_category","tax_num", "new_cid"), //产品信息
+      "ods_company_own_tax" -> Seq("tax_balance","tax_category","tax_num", "new_cid"), //产品信息
       "ods_company_mortgage_info" -> Seq("reg_date","reg_num","amount", "new_cid") //产品信息
     )
+//  Example args (space sourceTable comma-separated-cols): winhc_eci_dev ods_company_own_tax tax_balance,tax_category,tax_num,new_cid
 
   def main(args: Array[String]): Unit = {
-    val (space, sourceTable, cols) = valid(args)
-    //    var config = mutable.Map.empty[String, String]
+    require(args.length == 3, "usage: CompanyForCid <space> <sourceTable> <col1,col2,...>") // fail fast with a usage hint instead of a bare scala.MatchError from the pattern below
+    val Array(space, sourceTable, cols) = args // cols arrives as one comma-separated string
     var config = mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev"
     )
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-//    CompanyForCidUtils(spark, space, sourceTable, cols).calc()
+    CompanyForCidUtils(spark, space, sourceTable, cols.split(",")).calc() // split cols on "," and run the calc for that table
     spark.stop()
   }
 
-  def valid(args: Array[String]): (String, String, Seq[String]) = {
-    println(args.toSeq.mkString(" "))
-    if (args.length == 1) {
-
-    } else if (args.length == 2) {
-      val Array(sourceTable, cols) = args
-      return (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols.split(";").toSeq)
-    } else {
-      println("请输入要计算的table!!!! ")
-      sys.exit(-1)
-    }
-    val Array(sourceTable) = args
-
-    val cols: Seq[String] = tabMapping.getOrElse(sourceTable.split("\\.")(1), Seq())
-    if (cols.isEmpty) {
-      println("输入表不存在,请配置计算规则!!!   ")
-      sys.exit(-1)
-    }
-    (sourceTable.split("\\.")(0), sourceTable.split("\\.")(1), cols)
-  }
 }

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -202,6 +202,8 @@ object ChangeExtract {
   // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
   // winhc_eci_dev company_abnormal_info rowkey 20200717 remove_reason
 
+  // winhc_eci_dev company_own_tax rowkey 20200729 tax_balance,tax_category,tax_num
+
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones
   def main(args: Array[String]): Unit = {

+ 11 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -231,3 +231,14 @@ case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHand
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("put_date") // business time is the new record's "put_date" value
 }
+
+// Tax arrears (欠税): change handler that builds titles, label and business time from the new record.
+case class company_own_tax(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"${newMap("name")}欠税公告发生变更") // update title built from the record's "name"
+  // Insert title, also built from "name". getValueOrNull is defined outside this view — presumably yields null when "name" is null (confirm).
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("name"), s"新增${newMap("name")}欠税公告")
+  // Label produced by ChangeExtractUtils.get_ip_tags from the tag "欠税公告" (tax-arrears announcement), name, publish_date and tax_num.
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.get_ip_tags("欠税公告", newMap("name"), newMap("publish_date"), newMap("tax_num"))
+  // Business time is the new record's "publish_date" value.
+  override def getBizTime(newMap: Map[String, String]): String = newMap("publish_date")
+}

+ 21 - 20
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -24,9 +24,8 @@ object CompanyDynamic {
 
   case class CompanyDynamicUtil(s: SparkSession,
                                 project: String, //表所在工程名
-
                                 ds: String, //此维度主键
-                                bName:Boolean = false //是否补充cname字段
+                                bName: Boolean = false //是否补充cname字段
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
@@ -64,16 +63,16 @@ object CompanyDynamic {
       val rdd = sql(
         bName match {
           case false =>
-        s"""
-           |SELECT  *,null AS cname
-           |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
-           |WHERE   ds = '$ds'
-           |AND     tn = '$tableName'
-           |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
-           |""".stripMargin
+            s"""
+               |SELECT  *,null AS cname
+               |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
+               |WHERE   ds = '$ds'
+               |AND     tn = '$tableName'
+               |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |""".stripMargin
           case true =>
             s"""
-               |SELECT A.*,TMP.name AS cname
+               |SELECT A.*,B.cname AS cname
                |FROM(
                |  SELECT  *
                |  FROM    ${getEnvProjectName(env, project)}.ads_change_extract
@@ -82,13 +81,10 @@ object CompanyDynamic {
                |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |) AS A
                |LEFT JOIN (
-               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.ads_company B
-               |    WHERE B.ds>'0'
-               |    UNION ALL
-               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.inc_ads_company C
-               |    WHERE C.ds>'0'
-               |) AS TMP
-               |ON A.cid = TMP.cid
+               |    SELECT cid,cname FROM ${getEnvProjectName(env, project)}.base_company_mapping
+               |    WHERE ds=(SELECT MAX(ds) FROM ${getEnvProjectName(env, project)}.base_company_mapping WHERE ds>'0')
+               |) AS B
+               |ON A.cid = B.cid
                |""".stripMargin
         })
         .rdd.flatMap(r => {
@@ -99,8 +95,13 @@ object CompanyDynamic {
         val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
         val cname = r.getAs[String]("cname")
-        val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data)
-        result.map(res => Row(cid, if (cname == null) null else cname, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+        val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data, cname)
+        if (result == null) {
+          None
+        }
+        else {
+          result.map(res => Row(cid, if (cname == null) null else cname, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+        }
       })
 
       val schema = getSchema(ListMap(
@@ -133,7 +134,7 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName, ds, bName) = if (args.length>=4) args else args:+"false"
+    val Array(project, tableName, ds, bName) = if (args.length >= 4) args else args :+ "false"
 
     println(
       s"""

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -14,7 +14,7 @@ trait CompanyDynamicHandle {
     , "" -> "land_purchase" //土地购买
     , "" -> "land_transfer" //土地转让
     , "" -> "land_mortgage" //土地抵押
-    , "" -> "tender_es" //中标信息ES
+    , "company_bid_list" -> "tender_es" //中标信息ES
     , "" -> "enterprise_shixin" //失信
     , "" -> "enterprise_zhixing" //被执
     , "" -> "shareholder_shixin" //股东失信
@@ -56,7 +56,7 @@ trait CompanyDynamicHandle {
     , "" -> "9" // 动产抵押
     , "" -> "10" // 司法拍卖
     , "" -> "11" // 土地信息
-    , "" -> "12" // 中标信息
+    , "company_bid_list" -> "12" // 中标信息
     , "" -> "13" // 招聘信息
     , "" -> "14" // 行政处罚
     , "" -> "15" // 公示催告
@@ -101,7 +101,7 @@ trait CompanyDynamicHandle {
    *         info_risk_level
    *         winhc_suggest
    */
-  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): Seq[(String, String, String, String, String, String, String, String)] = {
+  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
     Seq((get_info_type()
       , get_rta_desc(old_map, new_map)
       , get_change_content(old_map, new_map)
@@ -109,7 +109,7 @@ trait CompanyDynamicHandle {
       , get_biz_id(rowkey)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
-      , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
+      , if(suggestion == null) "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。" else suggestion
     ))
   }
 

+ 11 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -27,12 +27,12 @@ case class company_bid_list() extends CompanyDynamicHandle{
   override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("abs") // change content is the new record's "abs" value; old_map is not used
 
   /**
-   * 变更时间
+   * Business id: the second "_"-separated segment of the rowkey.
    *
-   * @param new_map
+   * @param rowkey row key; must split into at least two "_"-separated segments, otherwise indexing (1) fails
    * @return
    */
-  //  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
+  override def get_biz_id(rowkey: String): String = rowkey.split("_")(1)
 
   /**
    * 风险等级
@@ -59,7 +59,13 @@ case class company_bid_list() extends CompanyDynamicHandle{
    *         info_risk_level
    *         winhc_suggest
    */
-  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): Seq[(String, String, String, String, String, String, String, String)] = {
-    super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map)
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
+    // Yields null when the record's "proxy" equals cname; otherwise delegates to the default handler with a bid-specific suggestion text.
+    val proxyName = new_map.getOrElse("proxy", null) // a record without a "proxy" key no longer throws NoSuchElementException
+    if (proxyName != null && proxyName.nonEmpty && proxyName == cname) {
+      null // null result means "emit nothing" for this record
+    } else {
+      super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map, cname, "该企业发布或参与招投标行为")
+    }
   }
 }

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+ * Dynamic-event handler for tax-arrears announcements (欠税公告).
+ */
+case class company_own_tax() extends CompanyDynamicHandle {
+  /**
+   * Information description: the new record's "name", or null when that key is absent.
+   *
+   * @param old_map previous record values (not used here)
+   * @param new_map current record values
+   * @return the "name" value or null
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+
+  /**
+   * Change content: new_map passed through the MapHelper toJson helper with a "column->label" list.
+   *
+   * @param old_map previous record values (not used here)
+   * @param new_map current record values
+   * @return the string produced by toJson — presumably the listed columns keyed by their labels; confirm against MapHelper
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
+  // NOTE(review): the "put_reason" entry is labelled "reason for listing as abnormal operation", which looks copied from the abnormal-info handler — confirm this column applies to tax arrears.
+
+  /**
+   * Risk level: always the constant "3".
+   *
+   * @param old_map previous record values (not used here)
+   * @param new_map current record values (not used here)
+   * @return "3"
+   */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
+}

+ 0 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/increment/inc_phx_cid_ads.scala

@@ -2,7 +2,6 @@ package com.winhc.bigdata.spark.jobs.increment
 
 import java.util.Date
 
-import com.winhc.bigdata.spark.jobs.CompanyForCid.valid
 import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.functions.col

+ 8 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyForCidUtils.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.utils
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.udf.CompanyMapping
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
@@ -11,12 +12,12 @@ import scala.annotation.meta.getter
  * cid转换
  */
 
-case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils {
+case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: String, cols: Seq[String]) extends LoggingUtils  with CompanyMapping{
   @(transient@getter) val spark: SparkSession = s
 
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
-
+    prepareFunctions(spark)
     val odsTable = s"${space}.$sourceTable"
     val adsTable = s"${space}.ads_${sourceTable.substring(4)}"
     val companyMapping = s"${space}.company_map"
@@ -25,6 +26,8 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
     val columns: Seq[String] = spark.table(odsTable).schema.map(_.name).filter(!_.equals("ds"))
     val disCol = cols
 
+    val cols_md5 = disCol.filter(!_.equals("new_cid"))
+
     //替换字段
     sql(
       s"""
@@ -34,7 +37,8 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |        SELECT
          |                *
          |                ,ROW_NUMBER() OVER (PARTITION BY ${disCol.mkString(",")} ORDER BY id DESC ) num
-         |                ,CONCAT_WS('_',new_cid,id) AS rowkey
+         |                ,CONCAT_WS('_',new_cid,md5(cleanup(CONCAT_WS('',${cols_md5.mkString(",")})))) AS rowkey
+         |                ,cleanup(CONCAT_WS('',${cols_md5.mkString(",")})) AS cols
          |        FROM    (
          |                SELECT
          |                        a.*
@@ -45,7 +49,7 @@ case class CompanyForCidUtils(s: SparkSession, space: String, sourceTable: Strin
          |                WHERE   a.ds = $ds AND a.cid IS NOT NULL
          |                ) c
          |        ) d
-         |WHERE   num =1
+         |WHERE   num =1  AND cols is not null AND trim(cols) <> ''
          |""".stripMargin)
 //      .createOrReplaceTempView(s"t2")