
add: 股权出质 (equity pledge) incremental processing
alter: CompanyDynamic (企业动态) changed to use flatMap

许家凯 4 years ago
commit da36b407bb

+ 37 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.spark.implicits
+
+import com.winhc.bigdata.spark.config.HBaseConfig
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.DataFrame
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/29 13:51
+ * @Description:
+ */
+object DataFrame2HBaseHelper {
+
+  implicit class DataFrameEnhancer(df: DataFrame) {
+
+    def save2HBase(tableName: String, rowkeyFieldName: String, fields: Seq[String], f_bytes: Array[Byte] = Bytes.toBytes("F")): Unit = {
+      import org.apache.spark.sql.functions.col
+      val jobConf = HBaseConfig.HBaseOutputJobConf(tableName)
+
+      val stringDf = df.select(df.columns.map(column => col(column).cast("string")): _*)
+      stringDf.rdd.map(row => {
+        val id = row.getAs[String](rowkeyFieldName)
+        val put = new Put(Bytes.toBytes(id))
+        for (f <- fields) {
+          val v = row.getAs[String](f.toLowerCase)
+          if (v != null) {
+            put.addColumn(f_bytes, Bytes.toBytes(f), Bytes.toBytes(v))
+          }
+        }
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null)
+        .saveAsHadoopDataset(jobConf)
+    }
+  }
+}

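The implicit above adds save2HBase to any DataFrame. A minimal usage sketch (table and column names are illustrative only, and an existing SparkSession named spark is assumed); note that values are looked up by the lower-cased field name while the HBase qualifier is written exactly as passed, so upper-cased field names over lower-cased DataFrame columns work as in the job further down:

    import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._

    // Hypothetical example: write two columns to HBase, using "id" as the rowkey.
    val df = spark.sql("SELECT id, name, reg_status FROM winhc_eci_dev.some_table WHERE ds = '20200729'")
    df.save2HBase("SOME_HBASE_TABLE", "id", Seq("NAME", "REG_STATUS"))
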
+ 9 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -3,6 +3,7 @@ package com.winhc.bigdata.spark.jobs.dynamic
 import java.util.Date
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.time.DateFormatUtils
@@ -29,7 +30,6 @@ object CompanyDynamic {
     @(transient@getter) val spark: SparkSession = s
 
     private val env = "dev"
-    var cleanFlag = false
     val targetTab = "xjk_tmp_company_dynamic"
 
     def init(): Unit = {
@@ -38,6 +38,7 @@ object CompanyDynamic {
            |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
            |(
            |    cid  STRING COMMENT '公司id'
+           |    ,cname  STRING COMMENT '公司name'
            |    ,info_type STRING COMMENT '变更分类,大类'
            |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
            |    ,change_content STRING COMMENT '变更内容'
@@ -49,7 +50,7 @@ object CompanyDynamic {
            |    ,create_time STRING COMMENT '创建时间'
            |)
            |COMMENT '企业动态输出表'
-           |PARTITIONED BY (ds STRING COMMENT '分区')
+           |PARTITIONED BY (ds STRING COMMENT '分区',tn STRING COMMENT '表名')
            |LIFECYCLE 30
            |""".stripMargin)
     }
@@ -67,19 +68,20 @@ object CompanyDynamic {
            |AND     tn = '$tableName'
            |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
            |""".stripMargin)
-        .rdd.map(r => {
+        .rdd.flatMap(r => {
         val rowkey = r.getAs[String]("rowkey")
         val cid = r.getAs[String]("cid")
         val new_data = r.getAs[Map[String, String]]("data")
         val old_data = r.getAs[Map[String, String]]("old_data")
         val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
-        val res = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data)
-        Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
+        val result = handle.handle(rowkey, biz_date, cid, if (fields == null) null else fields.split(","), old_data, new_data)
+        result.map(res => Row(cid, null, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
       })
 
       val schema = getSchema(ListMap(
         "cid" -> StringType
+        , "cname" -> StringType
         , "info_type" -> StringType
         , "rta_desc" -> StringType
         , "change_content" -> StringType
@@ -93,19 +95,11 @@ object CompanyDynamic {
       spark.createDataFrame(rdd, schema)
         .createOrReplaceTempView("company_dynamic_tmp")
 
-      if (!cleanFlag) {
-        sql(
-          s"""
-             |alter table ${getEnvProjectName(env, project)}.$targetTab drop if exists partition(ds='$ds')
-             |""".stripMargin)
-        cleanFlag = true
-      }
-
-      val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds"))
+      val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
 
       sql(
         s"""
-           |INSERT INTO TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
            |SELECT ${cols.mkString(",")}
            |FROM
            |    company_dynamic_tmp

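The switch from map to flatMap works together with the new Seq return type of handle (next file): each source record can now yield zero, one, or many dynamics, and flatMap flattens them into individual Rows instead of producing one nested collection per record. A self-contained illustration of the difference (demo values only, assuming a SparkSession named spark):

    val nums = spark.sparkContext.parallelize(Seq(1, 2, 3))
    nums.map(n => Seq(n, n)).count()      // 3 elements, each an un-flattened Seq
    nums.flatMap(n => Seq(n, n)).count()  // 6 elements, the Seqs are flattened
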
+ 3 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -101,8 +101,8 @@ trait CompanyDynamicHandle {
    *         info_risk_level
    *         winhc_suggest
    */
-  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String) = {
-    (get_info_type()
+  def handle(rowkey: String, bizDate: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): Seq[(String, String, String, String, String, String, String, String)] = {
+    Seq((get_info_type()
       , get_rta_desc(old_map, new_map)
       , get_change_content(old_map, new_map)
       , get_change_time(bizDate, new_map)
@@ -110,8 +110,7 @@ trait CompanyDynamicHandle {
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
       , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
-    )
-
+    ))
   }
 
 

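Wrapping the tuple in Seq keeps every existing handler single-row, while a future handler can return several entries, or Seq.empty to emit nothing for a record; the flatMap in CompanyDynamic then expands or drops the result accordingly. A simplified sketch of that calling pattern (two-field tuples and names are purely illustrative, not the real eight-field contract):

    // Hypothetical handler body: one entry per changed field, nothing when there are none.
    def demoHandle(changedFields: Seq[String]): Seq[(String, String)] =
      if (changedFields == null || changedFields.isEmpty) Seq.empty
      else changedFields.map(f => (f, s"$f changed"))

    Seq(Seq("state"), null, Seq("pledgee", "pledgor")).flatMap(demoHandle)  // 3 entries in total
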
+ 168 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -0,0 +1,168 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/29 11:18
+ * @Description: incremental data processing for equity pledge (股权出质)
+ */
+object inc_company_equity_info {
+
+
+  case class IncCompanyEquityInfoUtils(s: SparkSession,
+                                       project: String, // project that owns the tables
+                                       ds: String // partition date
+                                      ) extends LoggingUtils with Logging with BaseFunc {
+    @(transient@getter) val spark: SparkSession = s
+
+
+    def calc(): Unit = {
+      cleanup()
+      val cols = getColumns(s"winhc_eci_dev.ads_company_equity_info").diff(Seq("ds", "rowkey", "id"))
+
+//      val startPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ads_company_equity_info", "0")
+      val startPart = "20200725"
+      val endPart = getLastPartitionsOrElse(s"winhc_eci_dev.inc_ods_company_equity_info", BaseUtil.getYesterday())
+
+      if (startPart.equals(endPart)) {
+        println("start partition = end partition!")
+        sys.exit(-1)
+      }
+
+      sql(
+        s"""
+           |SELECT  tmp.id
+           |        ,tmp.cid
+           |        ,tmp.base
+           |        ,tmp.reg_number
+           |        ,tmp.pledgor
+           |        ,tmp.certif_number_l
+           |        ,tmp.equity_amount
+           |        ,tmp.pledgee
+           |        ,tmp.certif_number_r
+           |        ,tmp.reg_date
+           |        ,tmp.state
+           |        ,tmp.pub_date
+           |        ,tmp.change_situation
+           |        ,tmp.cancel_date
+           |        ,tmp.cancel_reason
+           |        ,tmp.pledgor_type
+           |        ,tmp.pledgor_id
+           |        ,tmp.pledgee_type
+           |        ,tmp.pledgee_id
+           |        ,tmp.create_time
+           |        ,tmp.update_time
+           |        ,tmp.deleted
+           |FROM    (
+           |            SELECT  a.*
+           |                    ,row_number() OVER (PARTITION BY a.reg_number,a.cid,a.pledgor,a.pledgee ORDER BY update_time DESC) c
+           |            FROM    (
+           |                        SELECT  *
+           |                        FROM    winhc_eci_dev.inc_ods_company_equity_info
+           |                        WHERE   ds > $startPart
+           |                    ) as a
+           |        ) AS tmp
+           |WHERE   tmp.c = 1
+           |""".stripMargin).createOrReplaceTempView("tmp_company_equity_info_all")
+
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.inc_ads_company_equity_info PARTITION(ds='$endPart')
+           |SELECT  md5(cleanup(CONCAT_WS('',reg_number,cid,pledgor,pledgee))) as id
+           |        ,${cols.mkString(",")}
+           |FROM    tmp_company_equity_info_all
+           |""".stripMargin)
+
+      sql(
+        s"""
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.inc_ads_company_equity_info_list PARTITION(ds='$endPart')
+           |SELECT  CONCAT_WS('_',pledgee_id,id) as rowkey
+           |        ,pledgee_id AS cid
+           |        ,id AS main_id
+           |        ,pledgee AS cname
+           |        ,2 AS type
+           |        ,state
+           |        ,deleted
+           |FROM    tmp_company_equity_info_all
+           |WHERE   pledgee_id IS NOT NULL
+           |AND     pledgee_type == 2
+           |
+           |UNION ALL
+           |
+           |SELECT  CONCAT_WS('_',pledgor_id,id) as rowkey
+           |        ,pledgor_id AS cid
+           |        ,id AS main_id
+           |        ,pledgor AS cname
+           |        ,1 AS type
+           |        ,state
+           |        ,deleted
+           |FROM    tmp_company_equity_info_all
+           |WHERE   pledgor_id IS NOT NULL
+           |AND     pledgor_type == 2
+           |
+           |UNION ALL
+           |
+           |SELECT  CONCAT_WS('_',cid,id) as rowkey
+           |        ,cid AS cid
+           |        ,id AS main_id
+           |        ,null AS cname
+           |        ,0 AS type
+           |        ,state
+           |        ,deleted
+           |FROM    tmp_company_equity_info_all
+           |WHERE   cid IS NOT NULL
+           |""".stripMargin)
+
+      val writeCols = getColumns("winhc_eci_dev.inc_ads_company_equity_info_list").diff(Seq("ds", "rowkey"))
+
+      MaxComputer2Phoenix(spark
+        , writeCols
+        , "inc_ads_company_equity_info_list"
+        , "COMPANY_EQUITY_INFO_LIST"
+        , endPart
+        , "CONCAT_WS('_',cid,main_id)").syn()
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+
+      val outFields = getColumns("winhc_eci_dev.inc_ads_company_equity_info").map(_.toUpperCase)
+      sql(
+        s"""
+           |SELECT  *
+           |FROM    winhc_eci_dev.inc_ads_company_equity_info
+           |WHERE   ds = '$endPart'
+           |""".stripMargin)
+        .save2HBase("COMPANY_EQUITY_INFO", "id", outFields)
+    }
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val Array(project, ds) = args
+
+    println(
+      s"""
+         |project: $project
+         |ds: $ds
+         |""".stripMargin)
+
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.debug.maxToStringFields" -> "200",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
+    )
+    val spark = SparkUtils.InitEnv("inc_company_equity_info", config)
+    IncCompanyEquityInfoUtils(spark, project, ds).calc()
+    spark.stop()
+  }
+
+}

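For reference, the row_number() subquery in this job keeps, for each (reg_number, cid, pledgor, pledgee) group, only the record with the latest update_time. A roughly equivalent DataFrame sketch of that dedup step (assuming the same input table and a SparkSession named spark; not part of this commit):

    import org.apache.spark.sql.expressions.Window
    import org.apache.spark.sql.functions._

    val w = Window.partitionBy("reg_number", "cid", "pledgor", "pledgee")
                  .orderBy(col("update_time").desc)

    val deduped = spark.table("winhc_eci_dev.inc_ods_company_equity_info")
      .where(col("ds") > "20200725")              // same start partition as above
      .withColumn("c", row_number().over(w))
      .where(col("c") === 1)
      .drop("c")
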
+ 6 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -62,6 +62,10 @@ object BaseUtil {
     DateFormatUtils.format(c.getTime.getTime, pattern)
   }
 
+  def getYesterday(): String = {
+    atDaysAfter(-1, nowDate("yyyyMMdd"))
+  }
+
   def atDaysAfter(n: Int, time: String, pattern: String = "yyyyMMdd"): String = {
     import java.text.SimpleDateFormat
     val newtime: Date = new SimpleDateFormat("yyyyMMdd").parse(time)
@@ -104,7 +108,7 @@ object BaseUtil {
           sb.append(a).append(",")
         }
         sb.substring(0, sb.lastIndexOf(",")) toString()
-      }else{
+      } else {
         ""
       }
     } else {
@@ -113,6 +117,6 @@ object BaseUtil {
   }
 
   def main(args: Array[String]): Unit = {
-    println(atDaysAfter(-1, nowDate("yyyyMMdd")))
+    println(getYesterday())
   }
 }