Forráskód Böngészése

使用全量mapping表获取cname,并扩充动态建议参数

晏永年 4 éve
szülő
commit
682cd4a368

+ 16 - 20
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -24,9 +24,8 @@ object CompanyDynamic {
 
   case class CompanyDynamicUtil(s: SparkSession,
                                 project: String, //表所在工程名
-
                                 ds: String, //此维度主键
-                                bName:Boolean = false //是否补充cname字段
+                                bName: Boolean = false //是否补充cname字段
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
@@ -64,16 +63,16 @@ object CompanyDynamic {
       val rdd = sql(
         bName match {
           case false =>
-        s"""
-           |SELECT  *,null AS cname
-           |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
-           |WHERE   ds = '$ds'
-           |AND     tn = '$tableName'
-           |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
-           |""".stripMargin
+            s"""
+               |SELECT  *,null AS cname
+               |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
+               |WHERE   ds = '$ds'
+               |AND     tn = '$tableName'
+               |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |""".stripMargin
           case true =>
             s"""
-               |SELECT A.*,TMP.name AS cname
+               |SELECT A.*,B.cname AS cname
                |FROM(
                |  SELECT  *
                |  FROM    ${getEnvProjectName(env, project)}.ads_change_extract
@@ -82,13 +81,10 @@ object CompanyDynamic {
                |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
                |) AS A
                |LEFT JOIN (
-               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.ads_company B
-               |    WHERE B.ds>'0'
-               |    UNION ALL
-               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.inc_ads_company C
-               |    WHERE C.ds>'0'
-               |) AS TMP
-               |ON A.cid = TMP.cid
+               |    SELECT cid,cname FROM ${getEnvProjectName(env, project)}.base_company_mapping
+               |    WHERE ds=(SELECT MAX(ds) FROM ${getEnvProjectName(env, project)}.base_company_mapping WHERE ds>'0')
+               |) AS B
+               |ON A.cid = B.cid
                |""".stripMargin
         })
         .rdd.flatMap(r => {
@@ -100,8 +96,8 @@ object CompanyDynamic {
         val fields = r.getAs[String]("fields")
         val cname = r.getAs[String]("cname")
         val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data, cname)
-        if(result == null){
-          null
+        if (result == null) {
+          None
         }
         else {
           result.map(res => Row(cid, if (cname == null) null else cname, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
@@ -138,7 +134,7 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName, ds, bName) = if (args.length>=4) args else args:+"false"
+    val Array(project, tableName, ds, bName) = if (args.length >= 4) args else args :+ "false"
 
     println(
       s"""

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -14,7 +14,7 @@ trait CompanyDynamicHandle {
     , "" -> "land_purchase" //土地购买
     , "" -> "land_transfer" //土地转让
     , "" -> "land_mortgage" //土地抵押
-    , "" -> "tender_es" //中标信息ES
+    , "company_bid_list" -> "tender_es" //中标信息ES
     , "" -> "enterprise_shixin" //失信
     , "" -> "enterprise_zhixing" //被执
     , "" -> "shareholder_shixin" //股东失信
@@ -56,7 +56,7 @@ trait CompanyDynamicHandle {
     , "" -> "9" // 动产抵押
     , "" -> "10" // 司法拍卖
     , "" -> "11" // 土地信息
-    , "" -> "12" // 中标信息
+    , "company_bid_list" -> "12" // 中标信息
     , "" -> "13" // 招聘信息
     , "" -> "14" // 行政处罚
     , "" -> "15" // 公示催告
@@ -101,7 +101,7 @@ trait CompanyDynamicHandle {
    *         info_risk_level
    *         winhc_suggest
    */
-  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
+  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String], cname: String = null, suggestion: String = null): Seq[(String, String, String, String, String, String, String, String)] = {
     Seq((get_info_type()
       , get_rta_desc(old_map, new_map)
       , get_change_content(old_map, new_map)
@@ -109,7 +109,7 @@ trait CompanyDynamicHandle {
       , get_biz_id(rowkey)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
-      , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
+      , if(suggestion == null) "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。" else suggestion
     ))
   }