Browse Source

补全企业动态预备表的cname字段数据

晏永年 4 years ago
parent
commit
c3cb5f8c33

+ 31 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -25,7 +25,8 @@ object CompanyDynamic {
   case class CompanyDynamicUtil(s: SparkSession,
                                 project: String, //表所在工程名
 
-                                ds: String //此维度主键
+                                ds: String, //此维度主键
+                                bName:Boolean = false //是否补充cname字段
                                ) extends LoggingUtils with Logging {
     @(transient@getter) val spark: SparkSession = s
 
@@ -61,13 +62,35 @@ object CompanyDynamic {
 
       val types = handle.org_type()
       val rdd = sql(
+        bName match {
+          case false =>
         s"""
-           |SELECT  *
-           |FROM    winhc_eci_dev.ads_change_extract
+           |SELECT  *,null AS cname
+           |FROM    ${getEnvProjectName(env, project)}.ads_change_extract
            |WHERE   ds = '$ds'
            |AND     tn = '$tableName'
            |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
-           |""".stripMargin)
+           |""".stripMargin
+          case true =>
+            s"""
+               |SELECT A.*,TMP.name AS cname
+               |FROM(
+               |  SELECT  *
+               |  FROM    ${getEnvProjectName(env, project)}.ads_change_extract
+               |  WHERE   ds = '$ds'
+               |  AND     tn = '$tableName'
+               |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+               |) AS A
+               |LEFT JOIN (
+               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.ads_company B
+               |    WHERE B.ds>'0'
+               |    UNION ALL
+               |    SELECT cid,name FROM ${getEnvProjectName(env, project)}.inc_ads_company C
+               |    WHERE C.ds>'0'
+               |) AS TMP
+               |ON A.cid = TMP.cid
+               |""".stripMargin
+        })
         .rdd.flatMap(r => {
         val rowkey = r.getAs[String]("rowkey")
         val cid = r.getAs[String]("cid")
@@ -75,8 +98,9 @@ object CompanyDynamic {
         val old_data = r.getAs[Map[String, String]]("old_data")
         val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
+        val cname = r.getAs[String]("cname")
         val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data)
-        result.map(res => Row(cid, null, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+        result.map(res => Row(cid, if (cname == null) null else cname, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
       })
 
       val schema = getSchema(ListMap(
@@ -109,7 +133,7 @@ object CompanyDynamic {
 
 
   def main(args: Array[String]): Unit = {
-    val Array(project, tableName, ds) = args
+    val Array(project, tableName, ds, bName) = if (args.length>=4) args else args:+"false"
 
     println(
       s"""
@@ -123,7 +147,7 @@ object CompanyDynamic {
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    val cd = CompanyDynamicUtil(spark, project, ds)
+    val cd = CompanyDynamicUtil(spark, project, ds, bName.toBoolean)
 
     cd.init()
 

+ 20 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -41,6 +41,25 @@ case class company_bid_list() extends CompanyDynamicHandle{
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
 
+  /**
+   *
+   * @param rowkey
+   * @param cid
+   * @param change_fields
+   * @param old_map
+   * @param new_map
+   * @return info_type
+   *         rta_desc
+   *         change_content
+   *         change_time
+   *         biz_id
+   *         sub_info_type
+   *         info_risk_level
+   *         winhc_suggest
+   */
+  override def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): Seq[(String, String, String, String, String, String, String, String)] = {
+    super.handle(rowkey, bizDate, cid, change_fields, old_map, new_map)
+  }
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -42,5 +42,5 @@ case class company_env_punishment()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
 }