许家凯 il y a 4 ans
Parent
commit
daef91ca81

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -171,6 +171,7 @@ object ChangeExtract {
 
   // winhc_eci_dev company_bid_list rowkey 20200717 title
   // winhc_eci_dev company_land_transfer rowkey 20200717 num,location
+  // winhc_eci_dev company_abnormal_info rowkey 20200717 remove_reason
 
 
   // winhc_eci_dev company cid 20200630 legal_entity_id,reg_location,business_scope,reg_status,reg_capital,emails,phones

+ 12 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/CompanyChangeHandle.scala

@@ -199,8 +199,9 @@ case class company_land_transfer(equCols: Seq[String]) extends CompanyChangeHand
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("merchandise_time")
 }
+
 //环保处罚
-case class company_env_punishment(equCols:Seq[String])extends CompanyChangeHandle {
+case class company_env_punishment(equCols: Seq[String]) extends CompanyChangeHandle {
   override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"${newMap("title")}环保处罚信息发生变更")
 
   override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("punish_number"), s"新增${newMap("punish_number")}环保处罚信息")
@@ -209,3 +210,13 @@ case class company_env_punishment(equCols:Seq[String])extends CompanyChangeHandl
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("publish_time")
 }
+
+case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "经营异常发生变更"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = "新增一条经营异常"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "经营异常", Array("put_department", "remove_department", "put_reason", "put_date", "remove_date", "remove_reason"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("put_date")
+}

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -11,6 +11,7 @@ import org.apache.spark.sql.types.StringType
 import org.apache.spark.sql.{Row, SparkSession}
 
 import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
 import scala.collection.mutable
 
 /**
@@ -71,13 +72,13 @@ object CompanyDynamic {
         val cid = r.getAs[String]("cid")
         val new_data = r.getAs[Map[String, String]]("data")
         val old_data = r.getAs[Map[String, String]]("old_data")
-        val biz_time = r.getAs[String]("biz_time")
+        val biz_date = r.getAs[String]("biz_date")
         val fields = r.getAs[String]("fields")
-        val res = handle.handle(rowkey, biz_time, cid, fields.split(","), old_data, new_data)
+        val res = handle.handle(rowkey, biz_date, cid, if(fields==null) null else fields.split(","), old_data, new_data)
         Row(cid, res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss"))
       })
 
-      val schema = getSchema(Map(
+      val schema = getSchema(ListMap(
         "cid" -> StringType
         , "info_type" -> StringType
         , "rta_desc" -> StringType

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -20,7 +20,7 @@ trait CompanyDynamicHandle {
     , "" -> "shareholder_shixin" //股东失信
     , "" -> "shareholder_zhixing" //股东被执
     , "" -> "tender_qichacha" //中标信息企查查
-    , "" -> "eci_exception" //经营异常
+    , "company_abnormal_info" -> "eci_exception" //经营异常
     , "" -> "eci_zscq" //知识产权
     , "" -> "eci_wenshu" //裁判文书
     , "" -> "court_announcement" //法院公告
@@ -48,7 +48,7 @@ trait CompanyDynamicHandle {
     , "" -> "1" //工商信息
     , "" -> "2" // 企业失信被执
     , "" -> "3" // 企业股东失信被执
-    , "" -> "4" // 经营异常
+    , "company_abnormal_info" -> "4" // 经营异常
     , "" -> "5" // 知识产权
     , "" -> "6" // 裁判文书
     , "" -> "7" // 法院公告

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -6,6 +6,7 @@ import org.apache.spark.sql.types.{DataType, StructField, StructType}
 import org.apache.spark.sql.{DataFrame, SparkSession}
 
 import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
 
 /**
  * π
@@ -144,7 +145,7 @@ trait LoggingUtils {
     spark.table(t).columns.seq
   }
 
-  def getSchema(map: Map[String, DataType]): StructType = {
+  def getSchema(map: ListMap[String, DataType]): StructType = {
     StructType(map.map(e => StructField(e._1, e._2)).toArray)
   }