Browse Source

公司变更动态加字段,phoenix不输出id

许家凯 4 years ago
parent
commit
1bc8911ea6

+ 6 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -25,12 +25,12 @@ object ChangeExtract {
     (map("0"), map("1"))
   }
 
-  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String)} = {
+  def getHandleClazz(tableName: String, equCols: Seq[String]): {def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String])} = {
     val clazz = s"com.winhc.bigdata.spark.jobs.chance.$tableName"
     val foo = Class.forName(clazz)
       .getConstructors.head.newInstance(equCols)
       .asInstanceOf[ {
-      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String)
+      def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String])
     }]
     foo
   }
@@ -104,7 +104,7 @@ object ChangeExtract {
           //          try {
           if (map_list.size == 1) {
             val res = handle.handle(rowkey, null, map_list.head)
-            Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time)
+            Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
           } else {
             if (map_list.size > 2) {
               logger.error("list.size greater than 2! rowkey:" + rowkey)
@@ -117,7 +117,7 @@ object ChangeExtract {
             if (res == null) {
               null
             } else {
-              Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time)
+              Row(res._1, res._2, tableName, res._3, res._4, res._5, res._6, res._7, res._8, update_time, res._9)
             }
           }
           /* } catch {
@@ -141,7 +141,8 @@ object ChangeExtract {
         StructField("title", StringType), // 动态数据展示 ps. 新增某土地公示
         StructField("label", StringType), // 1.一般变更,2.风险变更
         StructField("biz_time", StringType), //业务时间
-        StructField("update_time", StringType) //处理时间
+        StructField("update_time", StringType), //处理时间
+        StructField("old_data", MapType(StringType, StringType)) //变更前数据
       ))
 
       spark.createDataFrame(rdd, schema)

+ 9 - 8
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -25,9 +25,9 @@ trait CompanyChangeHandle extends Serializable with Logging {
    * @param newMap
   * @return rowkey,cid,类型【insert or update】,新数据,更新字段,更新标题,变更标签【1.一般变更,2.风险变更 ...】,业务时间,变更前数据
    */
-  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String) = {
+  def handle(rowkey: String, oldMap: Map[String, String], newMap: Map[String, String]): (String, String, String, Map[String, String], String, String, String, String, Map[String, String]) = {
     if (oldMap == null) {
-      (rowkey, getCid(rowkey, newMap), "insert", newMap, null, getInsertTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap))
+      (rowkey, getCid(rowkey, newMap), "insert", newMap, null, getInsertTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap), null)
     } else {
       val t = getEquAndFields(oldMap, newMap)
       if (t._1) {
@@ -35,12 +35,11 @@ trait CompanyChangeHandle extends Serializable with Logging {
       } else {
         (rowkey, getCid(rowkey, newMap), "update", newMap,
           t._2
-          , getUpdateTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap))
+          , getUpdateTitle(newMap), getLabel(oldMap, newMap), getBizTime(newMap), oldMap)
       }
     }
   }
 
-
   def getCid(rowkey: String, newMap: Map[String, String]): String = rowkey.split("_")(0)
 
   def getUpdateTitle(newMap: Map[String, String]): String
@@ -178,18 +177,20 @@ case class company_employment(equCols: Seq[String]) extends CompanyChangeHandle
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
 }
+
 //招投标
-case class company_bid_list(equCols:Seq[String])extends CompanyChangeHandle{
+case class company_bid_list(equCols: Seq[String]) extends CompanyChangeHandle {
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}招投标信息发生变更")
 
-  override def getInsertTitle(newMap: Map[String, String]): String =  getValueOrNull(newMap("title"), s"新增${newMap("title")}招投标信息")
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}招投标信息")
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap,"招投标", Array("publish_time","title", "purchaser", "province", "abs"))
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "招投标", Array("publish_time", "title", "purchaser", "province", "abs"))
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
 }
+
 //土地转让
-case class company_land_transfer(equCols:Seq[String])extends CompanyChangeHandle {
+case class company_land_transfer(equCols: Seq[String]) extends CompanyChangeHandle {
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"${newMap("title")}土地转让信息发生变更")
 
   override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("location"), s"新增${newMap("location")}土地转让信息")

+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/utils/MaxComputer2Phoenix.scala

@@ -30,7 +30,7 @@ case class MaxComputer2Phoenix(spark: SparkSession,
 
 
     val key = s"$rowkey AS rowkey"
-    val res = phoenixCols.map(s => {
+    val res = phoenixCols.filter(!_.equalsIgnoreCase("id")).map(s => {
       if ("NEW_CID".equals(s.toUpperCase())) {
         s"cast ($s as string) as CID"
       } else {
@@ -67,8 +67,6 @@ object MaxComputer2Phoenix {
     )
     val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
 
-    import sparkSession._
-
     val Array(project,table) = args
 
     val odpsTable = "winhc_eci_dev.inc_ads_company_bid_list"