
Merge remote-tracking branch 'origin/master'

lyb · 4 years ago
Parent
Current commit
6eb59bbb02
53 files changed, with 616 additions and 304 deletions
  1. 109 0
      src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala
  2. 2 2
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  3. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company.scala
  4. 39 14
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  5. 132 0
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala
  6. 7 4
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  7. 3 11
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company.scala
  8. 2 3
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_abnormal_info.scala
  9. 4 5
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_investment.scala
  10. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala
  11. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala
  12. 0 35
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement_result.scala
  13. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala
  14. 15 15
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala
  15. 12 12
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala
  16. 15 15
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala
  17. 3 4
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_dishonest_info.scala
  18. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala
  19. 4 2
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala
  20. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala
  21. 22 30
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala
  22. 3 4
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_finance.scala
  23. 29 12
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_holder.scala
  24. 5 6
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_illegal_info.scala
  25. 3 3
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala
  26. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_announcement.scala
  27. 2 2
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala
  28. 2 2
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala
  29. 2 2
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala
  30. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala
  31. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala
  32. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala
  33. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_liquidating_info.scala
  34. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala
  35. 1 3
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala
  36. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala
  37. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala
  38. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala
  39. 2 2
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala
  40. 2 9
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_staff.scala
  41. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala
  42. 21 23
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala
  43. 11 11
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala
  44. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala
  45. 1 1
      src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  46. 74 29
      src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala
  47. 5 2
      src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala
  48. 1 1
      src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala
  49. 12 5
      src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala
  50. 7 0
      src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala
  51. 13 17
      src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala
  52. 9 0
      src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
  53. 25 1
      src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

+ 109 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -0,0 +1,109 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import org.apache.spark.sql.types.{LongType, MapType, StringType, StructField, StructType}
+import com.winhc.bigdata.spark.udf.BaseFunc
+import org.apache.spark.sql.functions.col
+
+case class JustiCase(s: SparkSession,
+                     project: String, // project the table lives in
+                     tableName: String, // table name (without prefix/suffix)
+                     fromCol: String, // column holding the edge source
+                     toCol: String // column holding the edge target
+                    ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+    val allCols = getColumns(s"$project.ods_$tableName").toSet
+    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val dfRelations = sql(
+      s"""
+         |SELECT  *
+         |FROM    $project.ods_$tableName
+         |WHERE   ds=${ods_ds} AND ${toCol} IS NOT NULL
+         |UNION
+         |SELECT  *
+         |FROM    $project.inc_ods_$tableName
+         |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
+         |""".stripMargin)
+    val edgeRDD = dfRelations.select(allCols.map(column => col(column).cast("string")).toSeq: _*).rdd.flatMap(r => {
+      val case_no_from = r.getAs[String](fromCol)
+      val case_no_tos = r.getAs[String](toCol)
+      //      val allColsMap = allCols.map(f => (f, r.getAs[String](f))).toMap
+      val from = BKDRHash(case_no_from)
+      var edges: Set[Edge[String]] = Set[Edge[String]]()
+      for (each <- case_no_tos.split(",")) {
+        val to = BKDRHash(each)
+        edges += Edge(from, to, "1")
+      }
+      edges
+    })
+    // build the graph from the edge RDD
+    val graph: Graph[String, String] = Graph.fromEdges(edgeRDD, defaultValue = "")
+
+    // compute connected components
+    val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
+
+    // aggregate the edges of each connected component, flatten to (case_no -> judicial-case id) pairs, then join back to the source table to fill in the remaining columns
+    val tripleRDD = connetedGraph.triplets.map(t => (t.srcAttr, Set((t.dstId, t.attr))))
+      .reduceByKey(_ ++ _)
+      .flatMap(r => {
+        val ss = Set((r._1, "0")) ++ r._2
+        val justicase_id = BKDRHash(ss.map(_._1).toSeq.sorted.mkString(","))
+        var mp: Map[Long, Map[String, String]] = Map()
+        ss.foreach(r => {
+          // accumulate with ++= so the result is retained
+          mp ++= Map(r._1 -> Map("justicase_id" -> justicase_id.toString))
+        })
+        mp
+      }).map(r => {
+      Row(r._1.toString, r._2("justicase_id")) // render the vertex id as a string to match the StringType schema
+    })
+    val schemaJust = StructType(Array(
+      StructField(toCol,StringType),
+      StructField("justicase_id",StringType)
+    ))
+    val dfJust = spark.createDataFrame(tripleRDD, schemaJust)
+    dfJust.join(dfRelations, "case_no") // case_no values with edges: join to fill in the remaining columns
+      .union(sql( // isolated case_no values
+        s"""
+           |SELECT  get_justicase_id(CASE_NO) AS justicase_id, *
+           |FROM    $project.ods_$tableName
+           |WHERE   ds=${ods_ds} AND ${toCol} IS NULL
+           |UNION
+           |SELECT  get_justicase_id(CASE_NO) AS justicase_id, *
+           |FROM    $project.inc_ods_$tableName
+           |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NULL
+           |""".stripMargin))//.show(100)
+      .createOrReplaceTempView(s"tmp_graphx_$tableName")
+
+    val ds = "20200802"
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_${tableName}_graphx PARTITION(ds='$inc_ods_ds')
+         |SELECT *
+         |FROM
+         |    tmp_graphx_$tableName
+         |""".stripMargin)
+  }
+}
+
+object JustiCase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    JustiCase(spark, "winhc_eci_dev", "wenshu_detail", "CONNECT_CASE_NO", "CASE_NO").calc()
+  }
+}
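For reference, a self-contained sketch of the grouping technique the new job relies on: hash each case number to a vertex id, build a graph from (case_no -> connected case_no) edges, and let GraphX connected components assign one shared group id per judicial case. The BKDR hash below only approximates the project's BaseUtil.BKDRHash; its seed and masking are assumptions.

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.sql.SparkSession

object ConnectedCaseSketch {
  // BKDR-style string hash; the real BaseUtil.BKDRHash may use a different seed/width
  def bkdrHash(s: String): Long = {
    val seed = 131L
    var hash = 0L
    for (c <- s) hash = hash * seed + c
    hash & 0x7FFFFFFFFFFFFFFFL
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-sketch").getOrCreate()
    val sc = spark.sparkContext

    // (case_no, comma-separated connected case_nos): the shape JustiCase reads from the table
    val relations = Seq(
      ("(2019)A-1", "(2018)B-2,(2018)B-3"),
      ("(2018)B-3", "(2017)C-4"),
      ("(2020)D-5", "(2020)D-6"))

    val edges = sc.parallelize(relations).flatMap { case (from, tos) =>
      tos.split(",").map(to => Edge(bkdrHash(from), bkdrHash(to), "1"))
    }

    // connectedComponents() labels every vertex with the smallest vertex id in its
    // component, which serves as the shared judicial-case group id
    Graph.fromEdges(edges, defaultValue = "").connectedComponents()
      .vertices.collect().foreach { case (vid, groupId) =>
        println(s"case-hash $vid -> group $groupId")
      }
    spark.stop()
  }
}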

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -346,8 +346,8 @@ object ChangeExtract {
 
     , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
     , Args(tableName = "company_staff", primaryFields = "staff_type")
-    // company name, legal entity id (person or company), company type, registered address, business term end date, business scope, registration authority, enterprise status, registered capital, paid-in capital amount (unit: cents), cancellation date, cancellation reason
-    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
+    // company name, legal entity id (person or company), company type, registered address, business term end date, business scope, registration authority, enterprise status, registered capital, cancellation date, cancellation reason
+    , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,cancel_date,cancel_reason")
     , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
     , Args(tableName = "company_finance", primaryFields = "round")
     , Args(tableName = "company_dishonest_info", primaryFields = "case_no")

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company.scala

@@ -13,7 +13,7 @@ import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
 case class company(equCols: Seq[String]) extends CompanyChangeHandle with Serializable {
   override def getCid(rowkey: String, newMap: Map[String, String]): String = rowkey
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = "1"
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = null
 
   override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
 

+ 39 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -3,9 +3,10 @@ package com.winhc.bigdata.spark.jobs.dynamic
 import java.util.Date
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.jobs.message.IntellectualMessage
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
-import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{AsyncExtract, LoggingUtils, SparkUtils}
 import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.types.StringType
@@ -143,7 +144,7 @@ object CompanyDynamic {
           None
         }
         else {
-          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+          result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(res._1, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
         }
       })
 
@@ -177,15 +178,15 @@ object CompanyDynamic {
   }
 
   private val startArgs = Seq(
-    Args(tableName = "company_abnormal_info", bName = 0)
+    Args(tableName = "company_abnormal_info", bName = 1)
     , Args(tableName = "company_equity_info")
-    , Args(tableName = "company_staff", bName = 0)
+    , Args(tableName = "company_staff", bName = 1)
     , Args(tableName = "company", bName = 0)
     , Args(tableName = "company_zxr_list", bName = 0)
     , Args(tableName = "bankruptcy_open_case", bName = 1)
-    , Args(tableName = "company_illegal_info", bName = 0)
+    , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
-    , Args(tableName = "company_employment", bName = 1)
+    , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
@@ -216,14 +217,16 @@ object CompanyDynamic {
     , Args(tableName = "company_judicial_sale_combine_list", bName = 1) //司法拍卖
     , Args(tableName = "company_tax_contravention", bName = 1) //税收违法
     , Args(tableName = "wenshu_detail_combine", bName = 1) //裁判文书
-    , Args(tableName = "company_holder", bName = 1) //裁判文书
+    , Args(tableName = "company_holder", bName = 1) //股东
     , Args(tableName = "company_annual_report_out_investment", bName = 1) //裁判文书
     , Args(tableName = "company_own_tax", bName = 1) //欠税公告
+    , Args(tableName = "intellectual", bName = 1, aggs = 2)//知识产权
   )
 
   private case class Args(project: String = "winhc_eci_dev"
                           , tableName: String
-                          , bName: Int = 1)
+                          , bName: Int = 1
+                          , aggs: Int = 0)
 
   def main(args: Array[String]): Unit = {
 
@@ -254,20 +257,42 @@ object CompanyDynamic {
     val cd = CompanyDynamicUtil(spark, project, ds)
     cd.init()
 
-    if (tableNames.equals("all")) {
+    var start = startArgs
+    if (!tableNames.equals("all")) {
+      val set = tableNames.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    val a = start.map(e => (e.tableName, () => {
+      e.aggs match {
+        case 1 => CompanyDynamicForDayCount(spark, project, ds).calc(e.tableName, e.bName) // day-count tables (e.g. recruitment)
+        case 2 => IntellectualMessage(spark, project).calc() // intellectual property
+        case _ => cd.calc(e.tableName, e.bName) // generic handling
+      }
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+
+    /*if (tableNames.equals("all")) {
       startArgs.foreach(e => {
         cd.calc(e.tableName, e.bName)
       })
     } else {
       val ts = tableNames.split(",").toSet
 
-      startArgs.filter(e => {
-        ts.contains(e.tableName)
-      }).foreach(e => {
+    startArgs.filter(e => {
+      ts.contains(e.tableName)
+    }).foreach(e => {
+      if(e.tableName == "company_employment"){
+        val cdf = CompanyDynamicForDayCount(spark, project, ds)
+        cdf.calc(e.tableName, e.bName)
+      }else{
         cd.calc(e.tableName, e.bName)
-      })
-    }
+      }
 
+    })*/
     spark.stop()
   }
 }
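The new dispatch hands each (tableName, () => Boolean) pair to AsyncExtract.startAndWait. That utility's internals are not part of this diff; a minimal sketch of the same start-all-then-wait idea with plain Scala futures, purely illustrative:

import java.util.concurrent.Executors
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, ExecutionContext, Future}

object AsyncRunSketch {
  // run every job concurrently on a bounded pool, then block until all have finished
  def startAndWait(jobs: Seq[(String, () => Boolean)], parallelism: Int = 4): Unit = {
    val pool = Executors.newFixedThreadPool(parallelism)
    implicit val ec: ExecutionContext = ExecutionContext.fromExecutorService(pool)
    try {
      val futures = jobs.map { case (name, job) =>
        Future {
          val ok = job() // exceptions inside a job surface through its future
          println(s"$name finished: $ok")
          ok
        }
      }
      Await.result(Future.sequence(futures), Duration.Inf)
    } finally pool.shutdown()
  }

  def main(args: Array[String]): Unit = {
    startAndWait(Seq(
      "company_staff" -> (() => true),
      "company_holder" -> (() => true)))
  }
}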

+ 132 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala

@@ -0,0 +1,132 @@
+package com.winhc.bigdata.spark.jobs.dynamic
+
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.{env, targetTab}
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.LoggingUtils
+import com.winhc.bigdata.spark.utils.ReflectUtils.getClazz
+import org.apache.commons.lang3.time.DateFormatUtils
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.types.StringType
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.immutable.ListMap
+
+case class CompanyDynamicForDayCount(s: SparkSession,
+                                     project: String, // project the table lives in
+                                     ds: String // partition date (the primary key of this dimension)
+
+                                    ) extends LoggingUtils with Logging {
+  @(transient@getter) val spark: SparkSession = s
+
+
+  def init(): Unit = {
+    sql(
+      s"""
+         |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
+         |(
+         |    cid  STRING COMMENT '公司id'
+         |    ,cname  STRING COMMENT '公司name'
+         |    ,info_type STRING COMMENT '变更分类,大类'
+         |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
+         |    ,change_content STRING COMMENT '变更内容'
+         |    ,change_time STRING COMMENT '变更时间'
+         |    ,biz_id STRING COMMENT '业务id,数据行id'
+         |    ,sub_info_type STRING COMMENT '变更小类,表名'
+         |    ,info_risk_level STRING COMMENT '变更风险等级'
+         |    ,winhc_suggest STRING COMMENT '提示信息'
+         |    ,create_time STRING COMMENT '创建时间'
+         |)
+         |COMMENT '企业动态输出表'
+         |PARTITIONED BY (ds STRING COMMENT '分区',tn STRING COMMENT '表名')
+         |LIFECYCLE 30
+         |""".stripMargin)
+  }
+
+  // tableName: table name (without prefix/suffix)
+  def calc(tableName: String
+           , bName: Int = 0 // whether to backfill the cname field
+          ): Unit = {
+    val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
+
+    val types = handle.org_type()
+    val colsExclusiveSome = spark.table(s"${project}.ads_change_extract").columns.filter(s => {
+      !s.equals("cid") && !s.equals("data") && !s.equals("old_data") && !s.equals("ds") && !s.equals("tn")
+    }).seq
+    val rdd = sql(
+      bName match {
+        // default: no need to backfill the cname field
+        case 0 =>
+          s"""
+             |SELECT  cid,count(1) AS cnt,null AS cname
+             |FROM    ${project}.ads_change_extract
+             |WHERE   ds = '$ds'
+             |AND     tn = '$tableName'
+             |AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+             |GROUP BY cid
+             |""".stripMargin
+        // backfill cname from cid via base_company_mapping
+        case 1 =>
+          s"""
+             |SELECT A.cid as cid,count(1) AS cnt,B.cname AS cname
+             |FROM(
+             |  SELECT  *
+             |  FROM    ${project}.ads_change_extract
+             |  WHERE   ds = '$ds'
+             |  AND     tn = '$tableName'
+             |  AND     TYPE in (${types.map("'" + _ + "'").mkString(",")})
+             |) AS A
+             |LEFT JOIN (
+             |    SELECT cid,cname FROM  $project.base_company_mapping
+             |    WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
+             |) AS B
+             |ON A.cid = B.cid
+             |GROUP BY A.CID,B.cname
+             |""".stripMargin
+      })
+      .rdd.flatMap(r => {
+      val cid = r.getAs[String]("cid")
+      val biz_date = ds.substring(0, 4) + "-" + ds.substring(4, 6) + "-" + ds.substring(6) + " 00:00:00"
+      val cnt = r.getAs[Long]("cnt")
+      val cname = r.getAs[String]("cname")
+      val new_map = Map("cnt" -> (cnt + ""))
+
+      val result = handle.handle(cid + biz_date, biz_date, cid, null, null, new_map, cname)
+      if (result == null) {
+        None
+      }
+      else {
+        result.map(res => Row(CompanyDynamicHandleUtils.getDynamicId(cid, res._4, res._7, res._6), res._1, res._2, res._3, res._4, res._5, res._6, res._7, res._8, res._9, res._10, DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")))
+      }
+    })
+
+    val schema = getSchema(ListMap(
+      "id" -> StringType
+      , "cid" -> StringType
+      , "cname" -> StringType
+      , "info_type" -> StringType
+      , "rta_desc" -> StringType
+      , "change_content" -> StringType
+      , "change_time" -> StringType
+      , "biz_id" -> StringType
+      , "sub_info_type" -> StringType
+      , "info_risk_level" -> StringType
+      , "winhc_suggest" -> StringType
+      , "create_time" -> StringType
+    ))
+    spark.createDataFrame(rdd, schema)
+      .createOrReplaceTempView("company_dynamic_tmp" + tableName)
+
+    val cols = getColumns(s"$project.$targetTab").filter(!_.equals("ds")).filter(!_.equals("tn"))
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${getEnvProjectName(env, project)}.$targetTab PARTITION(ds='$ds',tn='$tableName')
+         |SELECT ${cols.mkString(",")}
+         |FROM
+         |    company_dynamic_tmp$tableName
+         |""".stripMargin)
+  }
+}
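A hedged usage sketch for the class above, mirroring how CompanyDynamic dispatches the aggs = 1 tables; the project and partition values are illustrative, taken from the examples elsewhere in this commit:

// illustrative values; "winhc_eci_dev" and ds "20200802" follow examples in this commit
val cdf = CompanyDynamicForDayCount(spark, project = "winhc_eci_dev", ds = "20200802")
cdf.init()                                 // create the output table if it is missing
cdf.calc("company_employment", bName = 1)  // daily count, cname backfilled from base_company_mapping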

+ 7 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -7,6 +7,7 @@ package com.winhc.bigdata.spark.jobs.dynamic
  */
 trait CompanyDynamicHandle {
 
+  // deprecated
   private val table_2_sub_info_type_map = Map(
     "CompanyDynamicHandleTest" -> "MyTest"
     , "company" -> "eci_detail" //工商信息
@@ -60,6 +61,7 @@ trait CompanyDynamicHandle {
     , "company_holder" -> "company_holder" //行政许可
   )
 
+  // deprecated
   private val table_2_info_type = Map(
     "CompanyDynamicHandleTest" -> "0"
     , "company" -> "1" //工商信息
@@ -147,6 +149,7 @@ trait CompanyDynamicHandle {
     , "" -> "2" //对外投资企业注销/吊销/经营异常
     , "" -> "2" //分支机构注销/吊销/经营异常
     , "" -> "2" //新闻舆论(中立、消极)
+    , "company_staff" -> "2" //主要成员
     , "company_holder" -> "2" //股东信息
     , "company_finance" -> "2" //融资
     , "" -> "1" //增资
@@ -204,7 +207,7 @@ trait CompanyDynamicHandle {
       , get_biz_id(rowkey)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
-      , if (suggestion == null) "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。" else suggestion
+      , if (suggestion == null) null else suggestion
     ))
   }
 
@@ -230,7 +233,7 @@ trait CompanyDynamicHandle {
    *
    * @return
    */
-  protected def get_info_type(): String = table_2_info_type(getClass.getSimpleName)
+  protected def get_info_type(): String = getClass.getSimpleName //table_2_info_type(getClass.getSimpleName)
 
   /**
   * Change content
@@ -239,7 +242,7 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String
+  protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = null
 
   /**
   * Change time
@@ -268,7 +271,7 @@ trait CompanyDynamicHandle {
    *
    * @return
    */
-  protected def get_sub_info_type(): String = table_2_sub_info_type_map(getClass.getSimpleName)
+  protected def get_sub_info_type(): String = null //table_2_sub_info_type_map(getClass.getSimpleName)
 
   /**
   * Risk level
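With this change the trait supplies defaults for get_change_content and get_sub_info_type, so a new table handler only needs the table-specific pieces. A minimal hypothetical handler under that assumption — the trait's full abstract-method set is not visible in this diff:

// hypothetical handler; method signatures copied from the ones visible in this diff
case class my_new_table() extends CompanyDynamicHandle {
  // headline shown in the dynamic feed
  override protected def get_rta_desc(old_map: Map[String, String],
                                      new_map: Map[String, String]): String =
    s"New record: ${new_map.getOrElse("title", "")}"

  // "2" = notice level, per the numeric risk codes used by the table handlers below
  override protected def get_info_risk_level(old_map: Map[String, String],
                                             new_map: Map[String, String]): String = "2"

  // get_change_content / get_sub_info_type now default to null, so no override is needed
}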

+ 3 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company.scala

@@ -19,7 +19,7 @@ case class company() extends CompanyDynamicHandle {
     , "reg_institute" -> "登记机关"
     , "reg_status" -> "企业状态"
     , "reg_capital" -> "注册资本"
-    , "actual_capital_amount" -> "实收资本"
+//    , "actual_capital_amount" -> "实收资本"
   )
 
 
@@ -82,12 +82,12 @@ case class company() extends CompanyDynamicHandle {
         , new_map("name")
         , super.get_info_type()
         , s"${change_type_map(e)}发生变更"
-        , s"""{"变更后内容": ${new_map(e).getOrNull()},"变更事项": ${change_type_map(e).getOrNull()},"变更日期": "$bizDate","变更前内容": ${old_map(e).getOrNull()}}"""
+        , s"""{"content_after": ${new_map(e).getOrNull()},"change_item": ${change_type_map(e).getOrNull()},"change_time": "$bizDate","content_before": ${old_map(e).getOrNull()}}"""
         , bizDate
         , rowkey
         , super.get_sub_info_type()
         , v
-        , "建议"
+        , null
       )
     })
   }
@@ -101,12 +101,4 @@ case class company() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = null
 
-  /**
-   * Change content
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = null
 }

+ 2 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_abnormal_info.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
// abnormal business operations
 case class company_abnormal_info() extends CompanyDynamicHandle {
@@ -14,13 +13,13 @@ case class company_abnormal_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("put_reason", null)
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("put_department->做出决定机关", "remove_department->移出决定机关", "put_reason->列入经营异常目录原因", "put_date->列入日期", "remove_date->移出日期", "remove_reason->移出经营异常目录原因"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("put_department->做出决定机关", "remove_department->移出决定机关", "put_reason->列入经营异常目录原因", "put_date->列入日期", "remove_date->移出日期", "remove_reason->移出经营异常目录原因"))
 
 }

+ 4 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_investment.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 /**
  * @Author: XuJiakai
  * @Date: 2020/8/21 13:44
@@ -15,17 +14,17 @@ case class company_annual_report_out_investment() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"新增一家对外投资:${new_map.getOrElse("out_investment_name", "")}"
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"新增一家对外投资:${new_map.getOrElse("out_investment_name", "未知")}"
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(
+ /* override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(
     Seq("out_investment_cid", "out_investment_name", "reg_number", "credit_code"
     )
-  )
+  )*/
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -41,7 +41,7 @@ case class company_bid_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 
   /**
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala

@@ -44,5 +44,5 @@ case class company_brief_cancel_announcement() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "4"
 }

+ 0 - 35
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement_result.scala

@@ -1,35 +0,0 @@
-package com.winhc.bigdata.spark.jobs.dynamic.tables
-
-import com.winhc.bigdata.spark.implicits.MapHelper._
-import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-
-case class company_brief_cancel_announcement_result()  extends CompanyDynamicHandle {
-  /**
-    * Information description
-    *
-    * @param old_map
-    * @param new_map
-    * @return
-    */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
-
-
-  /**
-    * Risk level
-    *
-    * @param old_map
-    * @param new_map
-    * @return
-    */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
-
-  /**
-    * Change content
-    *
-    * @param old_map
-    * @param new_map
-    * @return
-    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("cname->公司名称", "brief_cancel_result->简易注销结果", "announcement_apply_date->公告申请日期"))
-
-}

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala

@@ -39,7 +39,7 @@ case class company_check_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 15 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala

@@ -42,21 +42,21 @@ case class company_court_announcement_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "announcement_type_code->公告类型"
-    , "plaintiff->原告"
-    , "bltn_no->公告号"
-    , "court_name->法院名"
-    , "deal_grade->处理等级名称"
-    , "litigant->当事人"
-    , "judge->法官"
-    , "province->省份"
-    , "judge_phone->法官电话"
-    , "case_no->案件号"
-    , "content->案件内容"
-    , "publish_page->刊登版面"
-    , "publish_date->刊登日期"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "announcement_type_code->公告类型"
+//    , "plaintiff->原告"
+//    , "bltn_no->公告号"
+//    , "court_name->法院名"
+//    , "deal_grade->处理等级名称"
+//    , "litigant->当事人"
+//    , "judge->法官"
+//    , "province->省份"
+//    , "judge_phone->法官电话"
+//    , "case_no->案件号"
+//    , "content->案件内容"
+//    , "publish_page->刊登版面"
+//    , "publish_date->刊登日期"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 12 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala

@@ -42,18 +42,18 @@ case class company_court_open_announcement_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "case_reason->案由"
-    , "case_no->案号"
-    , "start_date->开庭时间"
-    , "area->地区"
-    , "plan_date->排期日期"
-    , "judge->审判长/主审人"
-    , "litigant->当事人"
-    , "court->法院"
-    , "court_room->法庭"
-    , "content->公告内容"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "case_reason->案由"
+//    , "case_no->案号"
+//    , "start_date->开庭时间"
+//    , "area->地区"
+//    , "plan_date->排期日期"
+//    , "judge->审判长/主审人"
+//    , "litigant->当事人"
+//    , "court->法院"
+//    , "court_room->法庭"
+//    , "content->公告内容"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 15 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala

@@ -42,21 +42,21 @@ case class company_court_register_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "case_reason->案由"
-    ,"case_no->案号"
-    ,"filing_date->立案日期"
-    ,"start_time->开庭时间"
-    ,"department->承办部门"
-    ,"court->法院"
-    ,"judge->承办法官"
-    ,"assistant->法官助理"
-    ,"case_type->案件类型"
-    ,"case_status->案件状态"
-    ,"plaintiff->上诉人"
-    ,"defendant->被上诉人"
-    ,"third->第三人"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "case_reason->案由"
+//    ,"case_no->案号"
+//    ,"filing_date->立案日期"
+//    ,"start_time->开庭时间"
+//    ,"department->承办部门"
+//    ,"court->法院"
+//    ,"judge->承办法官"
+//    ,"assistant->法官助理"
+//    ,"case_type->案件类型"
+//    ,"case_status->案件状态"
+//    ,"plaintiff->上诉人"
+//    ,"defendant->被上诉人"
+//    ,"third->第三人"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val oldcname = new_map("cname")

+ 3 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_dishonest_info.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
@@ -18,14 +17,14 @@ case class company_dishonest_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"新增1条企业失信信息:${new_map.getOrElse("court", "")}"
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+  /*override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
     "name->失信人名称"
     , "age->年龄"
     , "sexy->性别"
@@ -49,5 +48,5 @@ case class company_dishonest_info() extends CompanyDynamicHandle {
     , "pub_date->发布时间"
     , "lawsuit_url"
     , "appro_time->与官网核准的时间"
-  ))
+  ))*/
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala

@@ -40,7 +40,7 @@ case class company_double_random_check_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "2"

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala

@@ -15,7 +15,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增"+new_map.getOrElse("cnt", "")+"条招聘信息"
 
 
   /**
@@ -34,5 +34,7 @@ case class company_employment() extends CompanyDynamicHandle {
     * @param new_map
     * @return
     */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_env_punishment.scala

@@ -42,5 +42,5 @@ case class company_env_punishment()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
 }

+ 22 - 30
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
@@ -32,27 +31,28 @@ case class company_equity_info() extends CompanyDynamicHandle {
     if ("1".equals(new_map("deleted"))) {
       return Seq.empty
     }
-    var list: Seq[(String, String, String)] = Seq((new_map("cid"), cname, "2")) // target company of the pledge
+
+    var map = Map(new_map("cid") -> (cname, "2")) // target company of the pledge
     if ("2".equals(new_map.getOrElse("pledgor_type", "0"))) {
-      list = list :+ (new_map("pledgor_id"), new_map("pledgor"), "3") // pledgor
+      map = map ++ Map(new_map("pledgor_id") -> (new_map("pledgor"), "3")) // pledgor
     }
     if ("2".equals(new_map.getOrElse("pledgee_type", "0"))) {
-      list = list :+ (new_map("pledgee_id"), new_map("pledgee"), "1") // pledgee
+      map = map ++ Map(new_map("pledgee_id") -> (new_map("pledgee"), "1")) // pledgee
     }
-
-    list.map(t => {
-      (t._1
-        , t._2
-        , get_info_type()
-        , get_rta_desc(old_map, new_map)
-        , get_change_content(old_map, new_map, cname)
-        , get_change_time(bizDate, new_map)
-        , get_biz_id(rowkey)
-        , get_sub_info_type()
-        , t._3
-        , "被监控企业流动资金紧张,可能存在经营困难的情况。建议立即与被监控企业书面对账,适当催促其履行债务并持续监控。"
-      )
-    }).seq
+    map.map(e => (e._1, e._2._1, e._2._2)).toSeq
+      .map(t => {
+        (t._1
+          , t._2
+          , get_info_type()
+          , get_rta_desc(old_map, new_map)
+          , get_change_content(old_map, new_map, cname)
+          , get_change_time(bizDate, new_map)
+          , get_biz_id(rowkey)
+          , get_sub_info_type()
+          , t._3
+          , null
+        )
+      }).seq
   }
 
   /**
@@ -64,25 +64,17 @@ case class company_equity_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增1条股权出质信息"
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+  /*override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+    import com.winhc.bigdata.spark.implicits.MapHelper._
     val json = new_map.toJson(Seq("equity_amount->出质股权数额", "state->出质状态", "reg_date->股权出质设立登记日期", "reg_number->质权登记编号"))
     json.substring(0, json.length - 1) +
       s""","出质股权标的企业": {"企业名称": "$cname","企业KeyNo": "${new_map("cid")}"},"出质人信息": {"证件号": "${new_map("certif_number_l")}","出质人": "${new_map("pledgor")}"},"质权人信息": {"质权人": "${new_map("pledgee")}","证件号": "${new_map("certif_number_r")}"}}"""
-  }
-
-  /**
-   * Risk level
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = null
+  }*/
 }

+ 3 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_finance.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
@@ -18,18 +17,18 @@ case class company_finance() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"获得了${new_map.getOrElse("round", "")}融资,由${new_map.getOrElse("inverstors", "")}投资"
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+  /*override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
     "company_name->公司名称"
     , "finance_time->融资时间"
     , "money->融资金额"
     , "round->轮次"
     , "inverstors->投资人"
-  ))
+  ))*/
 }

+ 29 - 12
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_holder.scala

@@ -1,12 +1,11 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
  * @Date: 2020/8/19 14:15
- * @Description:
+ * @Description: shareholders
  */
 case class company_holder() extends CompanyDynamicHandle {
 
@@ -25,21 +24,39 @@ case class company_holder() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"股东及出资信息发生变化"
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    if (old_map == null) {
+      "insert"
+    } else {
+      "update"
+    }
+  }
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "holder_id"
-    , "holder_type"
-    , "amount"
-    , "capital"
-    , "capital_actual"
-    , "percent"
-  ))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
+    var out = Map(
+      "after" -> new_map("percent")
+      , "holder_id" -> new_map("holder_id")
+      , "holder_type" -> new_map("holder_type")
+    )
+    if (old_map == null) {
+      out = out ++ Map(
+        "type" -> "insert"
+      )
+    } else {
+      out = out ++ Map(
+        "type" -> "update"
+        , "before" -> old_map("percent")
+      )
+    }
+    import com.winhc.bigdata.spark.implicits.MapHelper._
+    val keys = out.keys.toSeq
+    out.toJson(keys)
+  }
 }
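Assuming MapHelper.toJson renders a Map[String, String] as a flat JSON object (its exact output format is not shown in this diff), here is a standalone mirror of the new get_change_content logic, with a hand-rolled renderer standing in for toJson:

object HolderChangeSketch {
  // mirrors the new company_holder.get_change_content; the renderer below stands in
  // for the project's MapHelper.toJson, whose output format is an assumption here
  def changeContent(oldMap: Map[String, String], newMap: Map[String, String]): String = {
    var out = Map(
      "after" -> newMap("percent"),
      "holder_id" -> newMap("holder_id"),
      "holder_type" -> newMap("holder_type"))
    out =
      if (oldMap == null) out + ("type" -> "insert")
      else out + ("type" -> "update") + ("before" -> oldMap("percent"))
    out.map { case (k, v) => s""""$k":"$v"""" }.mkString("{", ",", "}")
  }

  def main(args: Array[String]): Unit = {
    println(changeContent(null,
      Map("percent" -> "35%", "holder_id" -> "H1", "holder_type" -> "1")))
    // {"after":"35%","holder_id":"H1","holder_type":"1","type":"insert"}
  }
}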

+ 5 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_illegal_info.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 
 /**
  * @Author: XuJiakai
@@ -27,22 +26,22 @@ case class company_illegal_info() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     if (old_map == null)
-      return s"新增严重违法记录:${new_map.getOrElse("put_reason", "")}"
+      return s"新增严重违法记录:${new_map.getOrElse("put_reason", "未知")}"
 
     if (old_map.get("remove_date") == null && new_map("remove_date") != null)
-      return s"移除严重违法记录:${new_map.getOrElse("remove_reason", "")}"
+      return s"移除严重违法记录:${new_map.getOrElse("remove_reason", "未知")}"
 
     null
   }
 
-  /**
+  /*
   * Change content
    *
    * @param old_map
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+  /*override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
     "put_reason->列入原因"
     , "put_date->列入时间"
     , "remove_date->移除时间"
@@ -50,5 +49,5 @@ case class company_illegal_info() extends CompanyDynamicHandle {
     , "remove_reason->移除原因"
     , "remove_department->移除决定机关"
     , "type->类型"
-  ))
+  ))*/
 }

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala

@@ -41,9 +41,9 @@ case class company_judicial_sale_combine_list() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "source_id->address"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "source_id->address"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "4"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_announcement.scala

@@ -42,5 +42,5 @@ case class company_land_announcement() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "利好信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "1"
 }
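Across these handlers the commit replaces human-readable Chinese risk labels with numeric codes. Read off the label -> code substitutions in this diff, the convention is:

// risk-level codes as implied by the substitutions in this commit
val riskLevelCode: Map[String, String] = Map(
  "利好信息" -> "1", // positive news
  "提示信息" -> "2", // notice
  "警示信息" -> "3", // warning
  "高风险信息" -> "4" // high risk
)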

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala

@@ -53,10 +53,10 @@ case class company_land_mortgage() extends CompanyDynamicHandle {
     if (mortType != null && !mortType.isEmpty) {
      // mortgagee
       if(mortType.equals("mortgagee") || mortType.equals("bothtwo")) {
-        "提示信息"
+        "2"
       }
      else{ // mortgagor
-        "警示信息"
+        "3"
       }
     }
     else{

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_publicity.scala

@@ -25,7 +25,7 @@ case class company_land_publicity() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("location") + new_map("user_for")
+  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("location") + new_map("use_for")
 
   /**
    * 变更时间
@@ -42,5 +42,5 @@ case class company_land_publicity() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala

@@ -51,10 +51,10 @@ case class company_land_transfer() extends CompanyDynamicHandle {
     if (mortType != null && !mortType.isEmpty) {
      // current land-use-right holder
       if(mortType.equals("now") || mortType.equals("bothtwo")) {
-        "提示信息"
+        "2"
       }
      else{ // previous land-use-right holder
-        "警示信息"
+        "3"
       }
     }
     else{

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala

@@ -42,7 +42,7 @@ case class company_license() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala

@@ -43,7 +43,7 @@ case class company_license_creditchina() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala

@@ -42,7 +42,7 @@ case class company_license_entpub() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_liquidating_info.scala

@@ -44,5 +44,5 @@ case class company_liquidating_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "4"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala

@@ -45,5 +45,5 @@ case class company_mortgage_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
 }

+ 1 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_own_tax.scala

@@ -19,11 +19,9 @@ case class company_own_tax() extends CompanyDynamicHandle {
   /**
   * Change content
    *
-   * @param old_map
-   * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map.toJson(Seq("tax_balance->欠税余额","tax_category->欠税税种","put_reason->列入经营异常目录原因","new_tax_balance->当前新发生的欠税余额","publish_date->发布日期","department->发布单位"))
 
 
   /**

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala

@@ -47,7 +47,7 @@ case class company_public_announcement2_list()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
   /**
    *
    * @param rowkey

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala

@@ -46,5 +46,5 @@ case class company_punishment_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala

@@ -46,5 +46,5 @@ case class company_punishment_info_creditchina() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "警示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "3"
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala

@@ -48,10 +48,10 @@ case class company_send_announcement_list()extends CompanyDynamicHandle {
   override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     if(new_map("defendant_cids").contains("cid"))//原告
       {
-        "警示信息"
+        "3"
       }
     else {
-        "提示信息"
+        "2"
       }
   }
   /**

+ 2 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_staff.scala

@@ -25,14 +25,7 @@ case class company_staff() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("hid","staff_type->职位"))
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("hid", "staff_type"))
+
 
-  /**
-   * Risk level
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "1"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala

@@ -45,5 +45,5 @@ case class company_stock_announcement()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 21 - 23
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala

@@ -36,31 +36,29 @@ case class company_tax_contravention() extends CompanyDynamicHandle {
   /**
   * Change content
    *
-   * @param old_map
-   * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
-      new_map.toJson(Seq(
-        "taxpayer_name->纳税人名称"
-        , "taxpayer_number->纳税人识别号"
-        , "taxpayer_code->组织机构代码"
-        , "address->注册地址"
-        , "publish_time->发布日期"
-        , "case_type->案件性质"
-        , "department->所属税务机关"
-        , "case_info->主要违法事实"
-        , "legal_person_info->法定代表人或负责人"
-//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
-//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
-//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
-        , "responsible_person_info->负有责任的财务负责人"
-//        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
-//        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
-//        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
-        , "responsible_department_info->负有直接责任的中介机构"
-      ))
-  }
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = {
+//      new_map.toJson(Seq(
+//        "taxpayer_name->纳税人名称"
+//        , "taxpayer_number->纳税人识别号"
+//        , "taxpayer_code->组织机构代码"
+//        , "address->注册地址"
+//        , "publish_time->发布日期"
+//        , "case_type->案件性质"
+//        , "department->所属税务机关"
+//        , "case_info->主要违法事实"
+//        , "legal_person_info->法定代表人或负责人"
+////        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+////        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+////        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+//        , "responsible_person_info->负有责任的财务负责人"
+////        , s"${splitInfo(new_map("legal_person_info"))._2}->性别"
+////        , s"${splitInfo(new_map("legal_person_info"))._3}->证件名称"
+////        , s"${splitInfo(new_map("legal_person_info"))._4}->证件号码"
+//        , "responsible_department_info->负有直接责任的中介机构"
+//      ))
+//  }
 
   def splitInfo(s: String) = {
     if (StringUtils.isNotBlank(s) && s.replaceAll(":", ",").split(",").size == 4) {

+ 11 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala

@@ -42,17 +42,17 @@ case class company_zxr_final_case() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
-    "name->被执行人姓名"
-    ,"identity_num->身份证号码/组织机构代码"
-    ,"sex->性别"
-    ,"case_no->案号"
-    ,"court_name->执行法院"
-    ,"case_create_time->立案时间"
-    ,"case_final_time->终本日期"
-    ,"exec_amount->执行标的(元)"
-    ,"no_exec_amount->未履行金额"
-  ))
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+//    "name->被执行人姓名"
+//    ,"identity_num->身份证号码/组织机构代码"
+//    ,"sex->性别"
+//    ,"case_no->案号"
+//    ,"court_name->执行法院"
+//    ,"case_create_time->立案时间"
+//    ,"case_final_time->终本日期"
+//    ,"exec_amount->执行标的(元)"
+//    ,"no_exec_amount->未履行金额"
+//  ))
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "4"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala

@@ -47,5 +47,5 @@ case class company_zxr_restrict()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "4"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -50,7 +50,7 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
+//  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = ""
 
   override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
     val t1 = new_map("name_type")

+ 74 - 29
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -1,8 +1,12 @@
 package com.winhc.bigdata.spark.jobs.message
 
+import java.util.Date
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
 import com.winhc.bigdata.spark.udf.MapAggs
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, DateUtils, LoggingUtils, SparkUtils}
+import org.apache.commons.lang3.time.DateFormatUtils
 import org.apache.spark.sql.SparkSession
 import org.json4s.DefaultFormats
 import org.json4s.jackson.Json
@@ -23,18 +27,18 @@ object IntellectualMessage {
 
     val config = mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
-    val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
-        IntellectualMessage(spark, project, "").calc()
-//    IntellectualMessage(spark, project, "").transForm()
+    val spark = SparkUtils.InitEnv("IntellectualMessage", config)
+    IntellectualMessage(spark, project).calc()
+//    IntellectualMessage(spark, project).transForm()
     spark.stop()
   }
 
 }
 
 
-case class IntellectualMessage(s: SparkSession, project: String, sourceTable: String,
+case class IntellectualMessage(s: SparkSession, project: String,
                                runOld: Boolean = false) extends LoggingUtils {
 
   @(transient@getter) val spark: SparkSession = s
@@ -48,21 +52,37 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
   val t5 = s"company_certificate" //证书
   val t6 = s"company_copyright_works_list" //软著作权
   val res_tb = s"$project.tmp_xf_ads_intellectual_message" //聚合结果表
-  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" //转换输出结果表
+  //  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" //转换输出结果表
+  val res_tb_res = s"$project.ads_company_dynamic" //转换输出结果表
+
+  val tn = "intellectual"
 
-  val ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
+  var ds = BaseUtil.getPartion(s"$project.inc_ads_$t1", spark)
   val remainDs = BaseUtil.getPartion(s"$project.ads_$t1", spark)
+  val mapDs = BaseUtil.getPartion(s"$project.base_company_mapping", spark)
 
   def col2Map(pkg: String, day: String): Map[String, String] = {
     Map(pkg -> day)
   }
 
   def calc(): Unit = {
+    println("start calc" + new Date())
     spark.udf.register("col2Map", col2Map _)
+    spark.udf.register("form_date", DateUtils.formatterDate _)
     spark.udf.register("MapAggs", new MapAggs())
 
     sql(
       s"""
+        |SELECT  cid as new_cid
+        |FROM    $project.ads_change_extract
+        |WHERE   ds = $ds
+        |AND     tn in ('$t1','$t2','$t3','$t4','$t5','$t6')
+        |AND     type = 'insert'
+        |GROUP BY cid
+        |""".stripMargin).cache().createOrReplaceTempView("mapping")
+
+    sql(
+      s"""
          |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $res_tb PARTITION(ds=$ds)
          |select new_cid,date,m,
          |max(coalesce(split(m[1], ',')[2],'0')) over (partition by new_cid) cnt1,
@@ -103,21 +123,48 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
 
    // convert to the output format
     transForm()
-
+    println("end calc" + new Date())
   }
 
   def transForm(): Unit = {
+    println("start transForm" + new Date())
     sql(
       s"""
+         |select a.*,b.cname from
+         |(
          |select *
          |from ${res_tb}
-         |where ds = $ds
+         |where ds = $ds and date is not null
+         |)a
+         |left join
+         |(
+         |select cid,cname
+         |from
+         |$project.base_company_mapping
+         |where ds = $mapDs
+         |)b on a.cid = b.cid
          |""".stripMargin).map(r => {
       val cid = r.getAs[String]("cid")
-      val date = r.getAs[String]("date")
+      val cname = r.getAs[String]("cname")
+      val info_type = tn
+      val change_time = r.getAs[String]("date").concat(" 00:00:00")
+      val biz_id = s"${cid}"
+      val sub_info_type = ""
+      val info_risk_level = "2"
+      val winhc_suggest = ""
+      val create_time = DateFormatUtils.format(new Date(), "yyyy-MM-dd HH:mm:ss")
       val m = r.getAs[Map[String, String]](2)
-      val tags = descStr(m)
-      val content: String = Json(DefaultFormats).write(
+      val rta_desc = descStr(m)
+      var id = "-1"
+      try {
+        id = CompanyDynamicHandleUtils.getDynamicId(cid, "", biz_id, change_time)
+      } catch {
+        case ex: Exception => logError(ex.getMessage) // keep the "-1" sentinel; such rows are filtered out below
+      }
+      val change_content: String = Json(DefaultFormats).write(
         mutable.LinkedHashMap("商标数量" -> r.getAs[String]("cnt1"),
           "专利数量" -> r.getAs[String]("cnt2"),
           "域名数量" -> r.getAs[String]("cnt3"),
@@ -126,14 +173,20 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
           "软著数量" -> r.getAs[String]("cnt6")
         )
       )
-      (cid, date, tags, content)
-    }).toDF("cid", "date", "tags", "content").createOrReplaceTempView("res")
+
+      (id, cid, cname, info_type, rta_desc, change_content, change_time, biz_id,
+        sub_info_type, info_risk_level, winhc_suggest, create_time)
+    }).toDF("id", "cid", "cname", "info_type", "rta_desc", "change_content", "change_time", "biz_id",
+      "sub_info_type", "info_risk_level", "winhc_suggest", "create_time").createOrReplaceTempView("res")
 
     sql(
       s"""
-         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${res_tb_res} PARTITION (ds=$ds)
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${res_tb_res} PARTITION (ds='$ds',tn='$tn')
          |select * from res
+         |where id <> '-1'
          |""".stripMargin)
+
+    println("end transForm" + new Date())
   }
 
   def descStr(m: Map[String, String]) = {
@@ -163,7 +216,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
       s"""
          |(
          |SELECT  new_cid
-         |        ,SUBSTR(CAST($date AS STRING),1,10) AS DATE
+         |        ,form_date(SUBSTR(CAST($date AS STRING),1,10)) AS DATE
          |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid,$date) inc_cnt
          |        ,0 del_cnt
          |        ,sum(CASE WHEN deleted = 0 THEN 1 ELSE 0 END) OVER(PARTITION BY new_cid) total_cnt
@@ -197,7 +250,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |select * from
          |(
          |SELECT  new_cid
-         |        ,SUBSTR(CAST(date AS STRING),1,10) AS DATE
+         |        ,form_date(SUBSTR(CAST(date AS STRING),1,10)) AS DATE
          |        ,count(rowkey) OVER(PARTITION BY new_cid,date) inc_cnt
          |        ,0 del_cnt
          |        ,count(rowkey) OVER(PARTITION BY new_cid) total_cnt
@@ -205,15 +258,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |        ,f
          |FROM    (
          |            SELECT  b.*,"1" f
-         |            FROM    (
-         |                        SELECT  rowkey
-         |                                ,cid as new_cid
-         |                                ,biz_date as date
-         |                        FROM    $project.ads_change_extract
-         |                        WHERE   ds = $ds
-         |                        AND     tn = '$table'
-         |                        AND     type = 'insert'
-         |                    ) a
+         |            FROM    mapping a
          |            JOIN    (
          |                        SELECT  rowkey
          |                                ,new_cid
@@ -242,7 +287,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |                                ) d
          |                        WHERE   num = 1
          |                    ) b
-         |            ON      a.rowkey = b.rowkey
+         |            ON      a.new_cid = b.new_cid
          |            UNION ALL
          |            SELECT  rowkey
          |                    ,cid new_cid
@@ -254,7 +299,7 @@ case class IntellectualMessage(s: SparkSession, project: String, sourceTable: St
          |            AND     type = 'insert'
          |        ) e
          |)
-         |where f = "0"
+         |--where f = "0"
          |)
          |""".stripMargin
     }
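
Note: the reworked transForm() now writes into ads_company_dynamic, partitioned by (ds, tn). A quick sanity-check query, assuming a SparkSession and the same project and ds values used above (a sketch, not project code):

    // inspect a few dynamic rows emitted for the 'intellectual' info_type
    spark.sql(
      s"""
         |SELECT id, cname, rta_desc, change_content, change_time
         |FROM   winhc_eci_dev.ads_company_dynamic
         |WHERE  ds = '$ds' AND tn = 'intellectual'
         |LIMIT  10
         |""".stripMargin).show(false)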

+ 5 - 2
src/main/scala/com/winhc/bigdata/spark/test/Justicase.scala

@@ -25,11 +25,14 @@ object Justicase {
         ,"""{"case_no":"4","from":"3"}"""
 
         ,"""{"case_no":"8","from":"7"}"""
+        ,"""{"case_no":"7","from":"8"}"""
+
+        ,"""{"case_no":"101","from":"150"}"""
 
         ,"""{"case_no":"14","from":"13"}"""
         ,"""{"case_no":"13","from":"12"}"""
 
-        ,"""{"case_no":"19"}"""
+        ,"""{"case_no":"19","from":"12"}"""
 
         ,"""{"case_no":"28","from":"27"}"""
         ,"""{"case_no":"27","from":"26"}"""
@@ -57,7 +60,7 @@ object Justicase {
     val connetedGraph: Graph[VertexId, String] = graph.connectedComponents()
 
    // aggregate the edges that belong to the same connected component
-    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcAttr, Set(t.dstId)))
+    val tripleGroup: RDD[(VertexId, Set[VertexId])] = connetedGraph.triplets.map(t => (t.srcId, Set(t.dstId)))
       .reduceByKey(_ ++ _)
 
    // emit each maximal connected subgraph: a starting vertex plus the set of vertices connected to it; a vertex can be the Long derived from a case number
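
Note: the one-line fix above groups by t.srcId (the source vertex itself) rather than t.srcAttr, which after connectedComponents() holds the component id, i.e. the smallest vertex id in that component. A self-contained sketch of the difference (local demo, not project code):

    import org.apache.spark.graphx.{Edge, Graph}
    import org.apache.spark.sql.SparkSession

    val spark = SparkSession.builder().master("local[*]").appName("cc-demo").getOrCreate()
    val edges = spark.sparkContext.parallelize(Seq(
      Edge(1L, 2L, ""), Edge(2L, 3L, ""), Edge(10L, 11L, ""))) // two chains: 1-2-3 and 10-11
    val cc = Graph.fromEdges(edges, defaultValue = 0L).connectedComponents()
    // (srcAttr, srcId): srcAttr is 1 for both edges of the first chain and 10 for the second,
    // while srcId distinguishes the individual source vertices 1, 2 and 10
    cc.triplets.map(t => (t.srcAttr, t.srcId)).collect().foreach(println)
    spark.stop()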

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/test/TestChangeExtract.scala

@@ -86,7 +86,7 @@ object TestChangeExtract {
   }
 
   val seq = List(
-    "company_mortgage_info"
+    "company_own_tax"
     , "company_check_info"
     , "company_court_announcement_list"
     , "company_court_open_announcement_list"

+ 12 - 5
src/main/scala/com/winhc/bigdata/spark/test/TestCompanyDynamic.scala

@@ -45,21 +45,22 @@ object TestCompanyDynamic {
     , Args(tableName = "company_annual_report_out_guarantee", bName = 1)
     , Args(tableName = "company_zxr_restrict", bName = 1)
 
+    , Args(tableName = "company_own_tax", bName = 1) //终本案件
     , Args(tableName = "company_zxr_final_case", bName = 1) //终本案件
     , Args(tableName = "company_license_creditchina", bName = 1) //行政许可-信用中国
     , Args(tableName = "company_license_entpub", bName = 1) //行政许可-企业公示
     , Args(tableName = "company_license", bName = 1) //行政许可
     , Args(tableName = "company_check_info", bName = 1) //抽查检查
-    , Args(tableName = "company_court_announcement_list", bName = 1) //法院公告
-    , Args(tableName = "company_court_open_announcement_list", bName = 1) //开庭公告
-    , Args(tableName = "company_court_register_list", bName = 1) //立案信息
+    , Args(tableName = "company_court_announcement_list", bName = 2) //法院公告
+    , Args(tableName = "company_court_open_announcement_list", bName = 2) //开庭公告
+    , Args(tableName = "company_court_register_list", bName = 2) //立案信息
     , Args(tableName = "company_double_random_check_info", bName = 1) //双随机抽查
     , Args(tableName = "company_judicial_sale_combine_list", bName = 1) //司法拍卖
     , Args(tableName = "company_tax_contravention", bName = 1) //税收违法
     , Args(tableName = "wenshu_detail_combine", bName = 1) //裁判文书
   )
   val seq = List(
-    "company_mortgage_info"
+      "company_own_tax"
     , "company_check_info"
     , "company_court_announcement_list"
     , "company_court_open_announcement_list"
@@ -76,6 +77,8 @@ object TestCompanyDynamic {
 
   def main(args: Array[String]): Unit = {
 
+    val ds = args(0)
+    println(s"$ds")
     val project = "winhc_eci_dev"
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
@@ -83,7 +86,11 @@ object TestCompanyDynamic {
       "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv("CompanyDynamic", config)
-    TestCompanyDynamic(spark, project, "20200817").calc()
+    val s = spark.sparkContext.statusTracker
+    s.getExecutorInfos.map(_.numRunningTasks())
+    TestCompanyDynamic(spark, project, ds).calc()
     spark.stop()
   }
 

+ 7 - 0
src/main/scala/com/winhc/bigdata/spark/udf/BaseFunc.scala

@@ -9,6 +9,7 @@ import org.json4s._
 import org.json4s.jackson.JsonMethods._
 
 import scala.annotation.meta.getter
+import com.winhc.bigdata.spark.utils.BaseUtil._
 
 /**
  * @Author: XuJiakai
@@ -140,4 +141,10 @@ trait BaseFunc {
       }
     })
   }
+  def justicase_ops(): Unit = {
+    // one stable id per set of related case numbers, independent of their order
+    spark.udf.register("get_justicase_id", (case_nos: String) => {
+      BKDRHash(case_nos.split(",").sorted.mkString(","))
+    })
+  }
+
 }
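
Note: once justicase_ops() has been called from a class mixing in BaseFunc, the UDF is available from SQL; a usage sketch (the table and column are hypothetical):

    justicase_ops() // registers get_justicase_id on this trait's SparkSession
    spark.sql(
      """
        |SELECT case_nos, get_justicase_id(case_nos) AS justicase_id
        |FROM   some_case_table -- hypothetical: one comma-joined case_no list per row
        |""".stripMargin).show(false)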

+ 13 - 17
src/main/scala/com/winhc/bigdata/spark/utils/AsyncExtract.scala

@@ -25,18 +25,6 @@ object AsyncExtract {
 
   def wait(spark: SparkSession): Unit = {
     val tracker = spark.sparkContext.statusTracker
-    var i = 0
-    while (!tracker.getActiveJobIds().nonEmpty && i < 100) {
-      i = i + 1
-      println("await job 。。。")
-      Thread.sleep(10000)
-    }
-    println(tracker.getActiveJobIds().mkString(","))
-    while (tracker.getActiveJobIds().nonEmpty) {
-      println(tracker.getActiveJobIds().mkString(","))
-      println("spark is not stop ! ")
-      Thread.sleep(10000)
-    }
 
     val ints = tracker.getJobIdsForGroup(null)
     println(ints.mkString(","))
@@ -46,6 +34,7 @@ object AsyncExtract {
       .filter(_ != null)
       .map(i => i.status()).exists(i => JobExecutionStatus.FAILED.equals(i))
     if (failed) {
+      println("There are failed jobs !!!")
       sys.exit(-999)
     }
   }
@@ -57,13 +46,20 @@ case class AsyncExtract(seq: Seq[(String, () => Boolean)]) extends Logging {
     val latch = new CountDownLatch(seq.length)
     seq.foreach(e => {
       asyncWatch(e._1, () => {
-        println("______________________________" + e._1 + "___________________________________")
-        val res = e._2()
-        println(s"---${e._1}---------$res-------------")
-        latch.countDown()
+        try {
+          println("______________________________" + e._1 + "___________________________________")
+          val res = e._2()
+          println(s"---${e._1}---------$res-------------")
+        } catch {
+          case ex: Exception => {
+            ex.printStackTrace()
+          }
+        } finally {
+          latch.countDown()
+        }
       })
     })
-    //    latch.await()
+    latch.await()
   }
 
   private def asyncWatch[T](name: String, f: () => Unit): Unit = {

+ 9 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -115,6 +115,15 @@ object BaseUtil {
       ""
     }
   }
+  def BKDRHash(str: String): Long = {
+    val seed: Long = 131 // 31 131 1313 13131 131313 etc.
+    var hash: Long = 0
+    for (ch <- str) {
+      hash = (hash * seed + ch) & 0x7FFFFFFF // constrain the hash to 31 bits at each step
+    }
+    hash
+  }
 
   def main(args: Array[String]): Unit = {
     println(getYesterday())
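
Note: because the get_justicase_id UDF above sorts the comma-split case numbers before joining and hashing, BKDRHash yields the same id regardless of input order, e.g. (the case numbers are made up):

    import com.winhc.bigdata.spark.utils.BaseUtil.BKDRHash

    val a = BKDRHash("(2020)京01民初1号,(2019)京01民初2号".split(",").sorted.mkString(","))
    val b = BKDRHash("(2019)京01民初2号,(2020)京01民初1号".split(",").sorted.mkString(","))
    assert(a == b) // one connected group of cases -> one stable 31-bit id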

+ 25 - 1
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -10,6 +10,30 @@ import org.apache.commons.lang3.StringUtils
  * @Description:
  */
 object DateUtils {
+  // left-pad a one-digit month or day value with a zero
+  private def addZero(str: String): String = {
+    if (str.length == 2) {
+      str
+    } else {
+      "0" + str
+    }
+  }
+
+  def formatterDate(date: String): String = {
+    try {
+      // normalize Chinese and slash separators to "-", then drop any trailing time fragment
+      var d = date.replaceAll("[年月日号/]", "-")
+      if (d.contains(" ")) {
+        d = d.split(" ")(0)
+      }
+      val ymd = d.split("-")
+      s"${ymd(0)}-${addZero(ymd(1))}-${addZero(ymd(2))}"
+    } catch {
+      case ex: Exception =>
+        println(date) // log the unparseable input and fall back to it unchanged
+        date
+    }
+  }
+
 
   def toUnixTimestamp(date: String, pattern: String = "yyyy-MM-dd HH:mm:ss"): Long = {
     var p = "yyyy-MM-dd HH:mm:ss"
@@ -115,7 +139,7 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-    println(isLegalDate("2003-10-12 10:00:00"))
+    println(formatterDate("2017/2/6 0"))
     //    println(getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }