Parcourir la source

公司动态通用程序init

许家凯 il y a 4 ans
Parent
commit
2a7365f911

+ 90 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -0,0 +1,90 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 16:52
+ * @Description: 企业动态
+ */
+object CompanyDynamic {
+
+  case class CompanyDynamicUtil(s: SparkSession,
+                                project: String, //project the tables live in
+                                tableName: String, //source table name (no pre/suffix)
+                                ds: String //partition (date) to process
+                               ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    val targetTab = "target_tab"
+
+    /**
+     * Reads the change-extract rows of partition `ds` for `tableName`,
+     * converts each row through the table's dynamic handler, and writes the
+     * result into `targetTab`.
+     */
+    def calc(): Unit = {
+      // Resolve the per-table handler by naming convention. Typed against the
+      // CompanyDynamicHandle trait instead of the original structural type,
+      // which dispatched every call reflectively.
+      val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.$tableName")
+
+      val types = handle.org_type()
+      // Hoisted out of the per-row closure; fills the create_time column.
+      val createTime = new java.text.SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(new java.util.Date())
+      val rdd = sql(
+        s"""
+           |SELECT  *
+           |FROM    winhc_eci_dev.ads_change_extract
+           |WHERE   ds = '$ds'
+           |AND     tn = '$tableName'
+           |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+           |""".stripMargin)
+        .rdd.map(r => {
+        val rowkey = r.getAs[String]("rowkey")
+        val cid = r.getAs[String]("cid")
+        val new_data = r.getAs[Map[String, String]]("data")
+        val old_data = r.getAs[Map[String, String]]("old_data")
+        val biz_data = r.getAs[Map[String, String]]("biz_data")
+        val fields = r.getAs[String]("fields")
+        val res = handle.handle(rowkey, cid, fields.split(","), old_data, new_data)
+        // 10 values to match the 10-column schema below. The original emitted
+        // only 9 (create_time missing), which fails in createDataFrame at runtime.
+        Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, createTime)
+      })
+
+      // ListMap keeps insertion order; a default immutable Map of more than
+      // 4 entries iterates in hash order, silently scrambling the positional
+      // mapping between Row values and schema columns.
+      val schema = getSchema(scala.collection.immutable.ListMap(
+        "cid" -> StringType
+        , "info_type" -> StringType
+        , "rta_desc" -> StringType
+        , "change_content" -> StringType
+        , "change_time" -> StringType
+        , "biz_id" -> StringType
+        , "sub_info_type" -> StringType
+        , "info_risk_level" -> StringType
+        , "winhc_suggest" -> StringType
+        , "create_time" -> StringType
+      ))
+      // insertInto resolves columns by position — schema order must equal the
+      // target table's column order (TODO confirm against target_tab DDL).
+      spark.createDataFrame(rdd, schema).write
+        .mode(if (isWindows) "append" else "overwrite")
+        .insertInto(targetTab)
+    }
+  }
+
+  /**
+   * Entry point. Usage: CompanyDynamic <project> <tableName> <ds>
+   * The original parsed only two args and then ignored tableName, hard-coding
+   * the placeholders "table_name"/"ds" — every run queried a non-existent
+   * handler/partition. The parsed values are now actually used.
+   */
+  def main(args: Array[String]): Unit = {
+    val Array(project, tableName, ds) = args
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("CompanyDynamic", config)
+    CompanyDynamicUtil(spark, "winhc_eci_dev", tableName, ds).calc()
+    spark.stop()
+  }
+}

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 17:05
+ * @Description:
+ */
+trait CompanyDynamicHandle {
+
+  // Registry: handler class simple name -> sub_info_type code.
+  // NOTE(review): get_sub_info_type does a strict lookup here, so every
+  // concrete handler must be registered or it throws NoSuchElementException.
+  private val table_2_sub_info_type_map = Map("CompanyDynamicHandleTest" -> "MyTest")
+
+  // Registry: handler class simple name -> info_type code (same strict-lookup
+  // caveat as above, via get_info_type).
+  private val table_2_info_type = Map("CompanyDynamicHandleTest" -> "0")
+
+  /**
+   * Turns one change-extract record into a company-dynamic tuple.
+   *
+   * @param rowkey        row key of the changed record
+   * @param cid           company id
+   * @param change_fields names of the fields that changed
+   * @param old_map       field values before the change
+   * @param new_map       field values after the change
+   * @return (info_type, rta_desc, change_content, change_time, biz_id,
+   *         sub_info_type, info_risk_level, winhc_suggest)
+   *         NOTE(review): the last element (winhc_suggest) is the same fixed
+   *         suggestion text for every record — confirm this is intended.
+   */
+  def handle(rowkey: String, cid: String, change_fields: Seq[String], old_map: Map[String, String], new_map: Map[String, String]): (String, String, String, String, String, String, String, String) = {
+    (get_info_type()
+      , get_rta_desc(old_map, new_map)
+      , get_change_content(old_map, new_map)
+      , get_change_time(new_map)
+      , get_biz_id(rowkey)
+      , get_sub_info_type()
+      , get_info_risk_level(old_map, new_map)
+      , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
+    )
+
+  }
+
+
+  /**
+   * Change types of the source table this handler consumes ("insert" and/or
+   * "update"). Defaults to insert-only.
+   *
+   * @return
+   */
+  def org_type(): Seq[String] = Seq("insert")
+
+
+  /**
+   * Information description: human-readable summary of the change.
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String
+
+  /**
+   * Information type (major category), looked up by the concrete handler's
+   * class name; throws if the class is not registered in table_2_info_type.
+   *
+   * @return
+   */
+  def get_info_type(): String = table_2_info_type(getClass.getSimpleName)
+
+  /**
+   * Changed content.
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String
+
+  /**
+   * Time of the change.
+   *
+   * @param new_map field values after the change
+   * @return
+   */
+  def get_change_time(new_map: Map[String, String]): String
+
+  /**
+   * Business id derived from the record's row key.
+   *
+   * @param rowkey
+   * @return
+   */
+  def get_biz_id(rowkey: String): String
+
+  /**
+   * Sub information type (minor category), looked up by the concrete
+   * handler's class name; throws if the class is not registered in
+   * table_2_sub_info_type_map.
+   *
+   * @return
+   */
+  def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)
+
+  /**
+   * Risk level of the change.
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String
+
+}
+
+// Sample/stub handler wired into the registries of CompanyDynamicHandle.
+// Every member is left as ??? and throws scala.NotImplementedError when
+// invoked — usable only for class-loading/wiring tests, never in production.
+case class CompanyDynamicHandleTest() extends CompanyDynamicHandle {
+  /**
+   * Information description (stub — not implemented).
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = ???
+
+
+  /**
+   * Changed content (stub — not implemented).
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String]): String = ???
+
+  /**
+   * Time of the change (stub — not implemented).
+   *
+   * @param new_map field values after the change
+   * @return
+   */
+  override def get_change_time(new_map: Map[String, String]): String = ???
+
+  /**
+   * Business id (stub — not implemented).
+   *
+   * @param rowkey
+   * @return
+   */
+  override def get_biz_id(rowkey: String): String = ???
+
+
+  /**
+   * Risk level (stub — not implemented).
+   *
+   * @param old_map field values before the change
+   * @param new_map field values after the change
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = ???
+}

+ 7 - 5
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -1,10 +1,8 @@
 package com.winhc.bigdata.spark.utils
 
-import java.io.PrintWriter
-
-import com.winhc.bigdata.spark.utils.BaseUtil.getPartitions
 import org.apache.commons.lang3.StringUtils
 import org.apache.log4j.Logger
+import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.annotation.meta.getter
@@ -132,7 +130,8 @@ trait LoggingUtils {
       default
     }
   }
- def getHeadPartitionsOrElse(t: String, default: String): String = {
+
+  def getHeadPartitionsOrElse(t: String, default: String): String = {
     val ps = getPartitions(t)
     if (ps.nonEmpty) {
       ps.head
@@ -141,7 +140,10 @@ trait LoggingUtils {
     }
   }
 
-  def getColumns(t:String): Seq[String] ={
+  // Column names of table `t`, resolved through the active SparkSession.
+  def getColumns(t: String): Seq[String] = {
     spark.table(t).columns.seq
   }
+  // Builds a StructType from name -> type pairs. NOTE(review): column order
+  // follows the Map's iteration order, which for a default immutable Map of
+  // more than 4 entries is hash order, NOT insertion order — pass a ListMap
+  // whenever positional order matters (e.g. createDataFrame + insertInto).
+  def getSchema(map: Map[String, DataType]): StructType = {
+    StructType(map.map(e => StructField(e._1, e._2)).toArray)
+  }
 }

+ 20 - 0
src/main/scala/com/winhc/bigdata/spark/utils/ReflectUtils.scala

@@ -0,0 +1,20 @@
+package com.winhc.bigdata.spark.utils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/27 17:01
+ * @Description:
+ */
+object ReflectUtils {
+  /**
+   * Instantiates `clazzName` by reflection and casts the instance to T.
+   *
+   * @param clazzName fully-qualified class name to load via Class.forName
+   * @param initargs  constructor arguments (must match the chosen constructor)
+   * @return the new instance, unchecked-cast to T
+   *         NOTE(review): uses getConstructors.head, but the JDK gives no
+   *         guarantee about constructor order — only safe for classes with a
+   *         single public constructor.
+   */
+  def getClazz[T](clazzName: String, initargs: Any*): T = {
+    Class.forName(clazzName)
+      .getConstructors.head.newInstance(initargs.asInstanceOf[Seq[Object]]: _*)
+      .asInstanceOf[T]
+  }
+
+  // Ad-hoc smoke test: loads a test class through a structural type, so the
+  // call to p is dispatched reflectively. Not part of any production flow;
+  // fails unless com.winhc.bigdata.spark.test.XjkTest is on the classpath.
+  def main(args: Array[String]): Unit = {
+    val handle = getClazz[ {def p()}
+    ]("com.winhc.bigdata.spark.test.XjkTest")
+    handle.p
+  }
+}