
Merge remote-tracking branch 'origin/master'

# Conflicts:
#	src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
lyb 4 years ago
parent commit 1afbaa6502
53 files changed, 1625 additions and 320 deletions
  1. +1 -1    src/main/java/com/winhc/bigdata/calc/DimScoreV2.java
  2. +1 -1    src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala
  3. +112 -6  src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala
  4. +104 -0  src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala
  5. +149 -0  src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelation.scala
  6. +2 -1    src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  7. +5 -3    src/main/scala/com/winhc/bigdata/spark/jobs/company_staff.scala
  8. +5 -5    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  9. +3 -2    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala
  10. +4 -2   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  11. +8 -0   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala
  12. +1 -10  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala
  13. +1 -1   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala
  14. +1 -11  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala
  15. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala
  16. +2 -5   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala
  17. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala
  18. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala
  19. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala
  20. +0 -9   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala
  21. +1 -1   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala
  22. +2 -5   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala
  23. +5 -15  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala
  24. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala
  25. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala
  26. +2 -5   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala
  27. +2 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala
  28. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala
  29. +5 -15  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala
  30. +4 -14  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala
  31. +4 -14  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala
  32. +1 -1   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala
  33. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala
  34. +2 -5   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala
  35. +2 -5   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala
  36. +5 -15  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala
  37. +5 -4   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  38. +2 -2   src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala
  39. +184 -66 src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala
  40. +2 -1   src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala
  41. +122 -0 src/main/scala/com/winhc/bigdata/spark/model/CompanyAnnualReport.scala
  42. +321 -0 src/main/scala/com/winhc/bigdata/spark/model/CompanyCommonScoreV1.scala
  43. +124 -0 src/main/scala/com/winhc/bigdata/spark/model/CompanyEnvPunishment.scala
  44. +156 -0 src/main/scala/com/winhc/bigdata/spark/model/CompanyEquityInfo.scala
  45. +126 -0 src/main/scala/com/winhc/bigdata/spark/model/CompanyPunishmentInfo.scala
  46. +10 -1  src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala
  47. +20 -1  src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
  48. +8 -5   src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala
  49. +2 -0   src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala
  50. +19 -4  src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala
  51. +7 -1   src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala
  52. +55 -0  src/main/scala/com/winhc/bigdata/spark/utils/RowkeyRuleUtils.scala
  53. +9 -20  src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

+ 1 - 1
src/main/java/com/winhc/bigdata/calc/DimScoreV2.java

@@ -330,7 +330,7 @@ public class DimScoreV2 {
         put("著作权","311");
         put("网站","312");
         //经营风险
-        put("解散清算","401");
+        put("清算信息","401");
         put("简易注销","402");
         put("严重违法行为","403");
         put("减资记录","404");

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/implicits/DataFrame2HBaseHelper.scala

@@ -21,7 +21,7 @@ object DataFrame2HBaseHelper {
 
       val stringDf = df.select(df.columns.map(column => col(column).cast("string")): _*)
       stringDf.rdd.map(row => {
-        val id = row.getAs[String](rowkeyFieldName)
+        val id = row.getAs[String](rowkeyFieldName.toLowerCase())
         val put = new Put(Bytes.toBytes(id))
         for (f <- fields) {
           val v = row.getAs[String](f.toLowerCase)

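Note on the change above: the per-field reads already lowercase their names (row.getAs[String](f.toLowerCase)), so lowercasing rowkeyFieldName makes the rowkey lookup consistent with them. A minimal usage sketch, assuming the save2HBase(tableName, rowkeyFieldName, fields) helper this commit calls elsewhere; the HBase table name and data below are made up:

import org.apache.spark.sql.SparkSession
import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._

object Save2HBaseSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("save2hbase-sketch").getOrCreate()
    import spark.implicits._
    // Toy frame standing in for an ads table; Hive/ODPS-sourced columns are lowercase.
    val df = Seq(("1234_2019", "1234", "Some Co.")).toDF("rowkey", "cid", "cname")
    // The rowkey field may now be passed upper-case: the helper lowercases it before
    // row.getAs, matching the per-field reads that already lowercase their names.
    df.save2HBase("SOME_HBASE_TABLE", "ROWKEY", Seq("cid", "cname"))
    spark.stop()
  }
}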
+ 112 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -18,6 +18,103 @@ import scala.collection.mutable
  */
 object CompanyAnnualReport {
 
+  case class sublist_summary(s: SparkSession,
+                             project: String, //表所在工程名
+                             tableName: String //表名,无前辍
+                            ) extends LoggingUtils with Logging {
+    @(transient@getter) val spark: SparkSession = s
+
+    def all(): Unit = {
+      val inc_ads_table_name = s"inc_ads_$tableName"
+      val ads_table_name = s"ads_$tableName"
+
+      val tableExists = spark.catalog.tableExists(inc_ads_table_name)
+      println(
+        s"""
+           |CREATE TABLE IF NOT EXISTS winhc_eci_dev.xjk_company_annual_report_out_investment_summary as 
+           |SELECT  split(rowkey,'_')[0] AS cid
+           |        ,COUNT(1) AS company_annual_report_out_investment
+           |FROM    (
+           |            SELECT  *
+           |                    ,DENSE_RANK() OVER(PARTITION BY split(rowkey,'_')[0] ORDER BY split(rowkey,'_')[1] DESC ) AS year_num
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |                        FROM    (
+           |                                    SELECT  *
+           |                                    FROM    winhc_eci_dev.ads_company_annual_report_out_investment
+           |                                    WHERE   ds = '${getLastPartitionsOrElse(ads_table_name, "0")}'
+           |                                    ${
+          tableExists match {
+            case true => {
+              s"""
+                 |UNION ALL
+                 |SELECT  *
+                 |FROM    winhc_eci_dev.inc_ads_company_annual_report_out_investment
+                 |WHERE   ds > '${getLastPartitionsOrElse(ads_table_name, "0")}'
+                 |""".stripMargin
+            }
+            case false => {
+              ""
+            }
+          }
+        }
+           |                                ) AS t1
+           |                    ) AS t2
+           |            WHERE   t2.num = 1
+           |        ) AS t3
+           |WHERE   t3.year_num = 1
+           |GROUP BY split(rowkey,'_')[0]
+           |""".stripMargin)
+
+    }
+
+
+    def inc(ds: String): Unit = {
+      val inc_ads_table_name = s"inc_ads_$tableName"
+      val ads_table_name = s"ads_$tableName";
+      import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
+      sql(
+        s"""
+           |SELECT  split(rowkey,'_')[0] AS cid
+           |        ,COUNT(1) AS company_annual_report_out_investment
+           |FROM    (
+           |            SELECT  *
+           |                    ,DENSE_RANK() OVER(PARTITION BY split(rowkey,'_')[0] ORDER BY split(rowkey,'_')[1] DESC ) AS year_num
+           |            FROM    (
+           |                        SELECT  *
+           |                                ,ROW_NUMBER() OVER(PARTITION BY rowkey ORDER BY ds DESC ) AS num
+           |                        FROM    (
+           |                                    SELECT  t1.*
+           |                                    FROM    (
+           |                                                SELECT  *
+           |                                                FROM    winhc_eci_dev.$ads_table_name
+           |                                                WHERE   ds = '${getLastPartitionsOrElse(ads_table_name, "0")}'
+           |                                            ) AS t1
+           |                                    JOIN    (
+           |                                                SELECT  DISTINCT split(rowkey,'_')[0] AS cid
+           |                                                FROM    winhc_eci_dev.$inc_ads_table_name
+           |                                                WHERE   ds = '$ds'
+           |                                            ) AS t2
+           |                                    ON      t2.cid = split(t1.rowkey,'_')[0]
+           |                                    UNION ALL
+           |                                    SELECT  *
+           |                                    FROM    winhc_eci_dev.$inc_ads_table_name
+           |                                    WHERE   ds = '$ds'
+           |                                ) AS t3
+           |                    ) AS t4
+           |            WHERE   t4.num = 1
+           |        ) AS t5
+           |WHERE   t5.year_num = 1
+           |GROUP BY split(rowkey,'_')[0]
+           |""".stripMargin)
+        .save2HBase("COMPANY_SUMMARY", "cid", Seq("company_annual_report_out_investment"))
+
+    }
+
+  }
+
+
   case class CompanyAnnualReportUtils(s: SparkSession,
                                       project: String //表所在工程名
                                      ) extends LoggingUtils with Logging with BaseFunc {
@@ -238,7 +335,8 @@ object CompanyAnnualReport {
           , "rowkey"
           , "cid" +: writCols)
 
-      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("new_cid","report_year")).calc
+      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("new_cid", "report_year")).calc
+
 
     }
 
@@ -373,6 +471,13 @@ object CompanyAnnualReport {
         .save2HBase(tableName.toUpperCase
           , "rowkey"
           , "cid" +: writeCols)
+
+
+      //todo 年报对外投资需要输出摘要
+
+      if (tableName.equals("company_annual_report_out_investment")) {
+        sublist_summary(s = spark, project = "winhc_eci_dev", tableName = tableName).inc(inc_ods_end_ds)
+      }
     }
   }
 
@@ -405,13 +510,14 @@ object CompanyAnnualReport {
 
     val all_flag = false
 
+
     if (all_flag) {
       //存量
-     /* CompanyAnnualReportHandle(spark, project).main_table_all()
-      for (elem <- sublist_map) {
-        println("xjk:" + elem._1)
-        CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
-      }*/
+      /* CompanyAnnualReportHandle(spark, project).main_table_all()
+       for (elem <- sublist_map) {
+         println("xjk:" + elem._1)
+         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
+       }*/
     } else {
       //增量
       CompanyAnnualReportHandle(spark, project).main_table_inc()

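The new sublist_summary job leans on a two-level window: ROW_NUMBER() over ds keeps only the newest partition's copy of each rowkey, then DENSE_RANK() over the year suffix of the rowkey keeps only the latest annual report per company before counting. A toy, local-mode sketch of just that pattern (made-up rowkeys of the form cid_year; column names follow the job):

import org.apache.spark.sql.SparkSession

object AnnualReportSummarySketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("summary-sketch").getOrCreate()
    import spark.implicits._
    // rowkey = cid + "_" + report_year; the same rowkey can appear in several ds partitions
    Seq(
      ("100_2018", "20200801"),
      ("100_2019", "20200801"),
      ("100_2019", "20200901"), // newer copy of the same report
      ("200_2019", "20200901")
    ).toDF("rowkey", "ds").createOrReplaceTempView("reports")
    spark.sql(
      """
        |SELECT  split(rowkey,'_')[0] AS cid
        |        ,COUNT(1) AS company_annual_report_out_investment
        |FROM (
        |  SELECT *, DENSE_RANK() OVER (PARTITION BY split(rowkey,'_')[0] ORDER BY split(rowkey,'_')[1] DESC) AS year_num
        |  FROM (
        |    SELECT *, ROW_NUMBER() OVER (PARTITION BY rowkey ORDER BY ds DESC) AS num FROM reports
        |  ) t1
        |  WHERE num = 1
        |) t2
        |WHERE year_num = 1
        |GROUP BY split(rowkey,'_')[0]
        |""".stripMargin).show() // cid 100 -> 1 (only its 2019 report survives), cid 200 -> 1
    spark.stop()
  }
}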
+ 104 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/GraphX4Judicase.scala

@@ -0,0 +1,104 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.BaseFunc
+import com.winhc.bigdata.spark.utils.BaseUtil.{BKDRHash, isWindows}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.graphx._
+import org.apache.spark.internal.Logging
+import org.apache.spark.rdd.RDD
+import org.apache.spark.sql.functions.col
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+case class GraphX4Judicase(s: SparkSession,
+                           project: String, //表所在工程名
+                           tableName: String, //表名(不加前后辍)
+                           fromCol: String, //边的起点列名
+                           toCol: String //边的终点列名
+                    ) extends LoggingUtils with Logging with BaseFunc {
+  @(transient@getter) val spark: SparkSession = s
+  justicase_ops()
+
+  def calc(): Unit = {
+//    val allCols = getColumns(s"$project.ods_$tableName").filter(_ != "ds").toSeq
+//    val ods_ds = BaseUtil.getPartion(s"$project.ods_$tableName", spark)
+//    val inc_ods_ds = BaseUtil.getPartion(s"$project.inc_ods_$tableName", spark)
+    val srcAllCols = getColumns(s"$project.xjk_ads_judicial_case_relation1").filter(_ != "ds").toSeq
+    val desAllCols = getColumns(s"$project.ods_justicase").filter(_ != "ds").toSeq
+    val dfRelations = sql(
+      s"""
+         |SELECT  *
+         |FROM    $project.$tableName
+         |WHERE   ${toCol} IS NOT NULL AND ${fromCol} IS NOT NULL
+         |""".stripMargin)
+    val edgeRDD: RDD[Edge[Long]] = dfRelations .select(srcAllCols.map(column => col(column).cast("string")): _*) .rdd.map(r => {
+      val case_no_from = r.getAs[String](fromCol)
+      val case_no_to = r.getAs[String](toCol)
+      val from = case_no_from.toLong
+      val to = case_no_to.toLong
+      Edge(from, to)
+    })
+    // 根据边构造图
+    val graph = Graph.fromEdges(edgeRDD, defaultValue = 0)
+
+    // 将同一连通分量中各个边聚合,经过处理形成打平的(case_no->司法案件id)并与原表join补全信息
+    val tripleRDD = graph.connectedComponents().vertices
+      .map(tp => (tp._2, tp._1)) //尝试N次明确必须这样交换,否则得到的不是极大连通子图
+      .map(r => (r._1, Set(r._2)))
+      .reduceByKey(_ ++ _)
+      .flatMap(r => {
+        val judicase_id = BKDRHash(r._2.toSeq.sorted.mkString(","))
+        var mp: Map[Long, Map[String, String]] = Map()
+        r._2.map(r => {
+          mp = mp ++ Map(r -> Map("judicase_id" -> judicase_id.toString))
+        })
+        mp
+      })
+      .map(r => {
+      Row(r._1.toString, r._2("judicase_id"), "1")
+    })
+    val schemaJust = StructType(Array(
+      StructField("id", StringType),
+      StructField("judicase_id", StringType),
+      StructField("flag", StringType)
+    ))
+    //仅包含这3个字段的表在后面融入全量时再实例其他属性
+    val dfEdgelets = spark.createDataFrame(tripleRDD, schemaJust).createOrReplaceTempView(s"tmp_edgelets_$tableName")
+    //将图结果融入全量数据中,case_no对应的司法案件号以图为准
+    sql(
+/*      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.${tableName}_graphx PARTITION(ds='20200903')
+         |SELECT IF(B.judicase_id IS NOT NULL,B.judicase_id,A.case_id) AS judicase_id
+         |,IF(B.judicase_id IS NOT NULL,B.flag,A.flag) AS flag
+         |,${desAllCols.mkString(",")}
+         |FROM(
+         |  SELECT  '0' AS flag, *
+         |  FROM    $project.ods_justicase
+         |  WHERE   ds='20200830'
+         |) A
+         |LEFT JOIN
+         |(
+         |  SELECT id, judicase_id, flag FROM tmp_edgelets_$tableName
+         |) B
+         |ON A.case_id=B.id
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")*/
+      s"""INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.xjk_ads_judicial_case_relation1_tmp
+         |SELECT id, judicase_id, flag
+         |FROM tmp_edgelets_$tableName
+         |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+  }
+}
+
+object GraphX4Judicase {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "2000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    spark.sparkContext.setLogLevel("Warn")
+    GraphX4Judicase(spark, "winhc_eci_dev", "xjk_ads_judicial_case_relation1", "id_2", "id_1").calc()
+  }
+}

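GraphX4Judicase clusters case records by building a graph from the relation pairs and taking connected components; the (vertexId, componentId) output is re-keyed by component so each cluster collapses into one set, which the job then hashes (BKDRHash) into a judicase_id. A self-contained toy sketch of that grouping step, with made-up numeric case ids and the hash left out:

import org.apache.spark.graphx.{Edge, Graph}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

object ConnectedCasesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("cc-sketch").getOrCreate()
    val sc = spark.sparkContext
    // Relation edges chain cases 1-2-3 into one cluster; 10-11 form a second cluster.
    val edges: RDD[Edge[Long]] = sc.parallelize(Seq(Edge(1L, 2L, 0L), Edge(2L, 3L, 0L), Edge(10L, 11L, 0L)))
    val graph = Graph.fromEdges(edges, defaultValue = 0)
    val clusters = graph.connectedComponents().vertices // (vertexId, componentId)
      .map { case (vid, cid) => (cid, Set(vid)) }        // key by component, not by vertex
      .reduceByKey(_ ++ _)                               // one set of member ids per component
      .map { case (_, members) => members.toSeq.sorted.mkString(",") } // stable key a hash could turn into a judicase_id
    clusters.collect().foreach(println)                  // prints "1,2,3" and "10,11"
    spark.stop()
  }
}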
+ 149 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/JudicialCaseRelation.scala

@@ -0,0 +1,149 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.udf.CompanyMapping
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description:司法案件输出表
+ * @author π
+ * @date 2020/9/9 9:56
+ */
+object JudicialCaseRelation {
+  def main(args: Array[String]): Unit = {
+    if (args.length != 1) {
+      println("请输入 project:项目 !!!")
+      sys.exit(-1)
+    }
+    val Array(project) = args
+    println(
+      s"""
+         |project: $project
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    JudicialCaseRelation(spark, project).calc()
+    spark.stop()
+  }
+}
+
+case class JudicialCaseRelation(s: SparkSession, project: String
+                               ) extends LoggingUtils with CompanyMapping {
+  override protected val spark: SparkSession = s
+
+  def calc(): Unit = {
+
+    prepareFunctions(spark)
+
+    val t1 = s"$project.xjk_ads_judicial_case_relation1_tmp" //司法案件关联id表
+    val t2 = s"$project.ods_justicase" //司法案件源表
+    val t3 = s"$project.tmp_xf_judicial_case_relation_1" //司法案件主表
+    val t4 = s"$project.base_company_mapping" //公司name和cid映射
+    val t5 = s"$project.tmp_xf_judicial_case_relation_2" //企业司法案件
+    val t6 = s"$project.tmp_xf_judicial_case_relation_3" //司法案件明细
+
+    val t2_ds = BaseUtil.getPartion(t2, spark)
+    val t4_ds = BaseUtil.getPartion(t4, spark)
+    //mapping映射表
+    sql(
+      s"""
+         |SELECT  a.judicase_id
+         |        ,b.*
+         |FROM    (
+         |            SELECT  *
+         |            FROM $t1
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    $t2
+         |            WHERE   ds = '$t2_ds'
+         |        ) b
+         |ON      a.id = b.case_id
+         |""".stripMargin).repartition(1024).createOrReplaceTempView("mapping")
+
+    //司法案件主表
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $t3
+         |SELECT  a.*
+         |        ,b.yg_name
+         |        ,b.bg_name
+         |FROM    (
+         |            SELECT  judicase_id
+         |                    ,max(title) title
+         |                    ,max(case_type) case_type
+         |                    ,max(case_reason)case_reason
+         |                    ,concat_ws('\n',collect_set(case_no)) case_no
+         |                    ,concat_ws('\n',collect_set(court_name)) court_name
+         |                    ,max(case_stage) case_stage
+         |                    ,max(concat_ws(' ',case_type,'裁判文书')) lable
+         |                    ,concat_ws('\n',collect_set(concat_ws('',case_type,case_stage,'|民事判决日期:',judge_date))) apps
+         |            FROM    mapping
+         |            GROUP BY judicase_id
+         |        ) a
+         |JOIN    (
+         |            SELECT  judicase_id
+         |                    ,yg_name
+         |                    ,bg_name
+         |            FROM    (
+         |                        SELECT  *
+         |                                ,ROW_NUMBER() OVER (PARTITION BY judicase_id ORDER BY judge_date DESC ) num
+         |                        FROM    mapping
+         |                    )
+         |            WHERE   num = 1
+         |        ) b
+         |ON      a.judicase_id = b.judicase_id
+         |""".stripMargin)
+
+    //企业司法案件表
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $t5
+         |SELECT  concat_ws('_',b.new_cid,judicase_id) AS rowkey
+         |        ,b.new_cid AS cid
+         |        ,a.*
+         |FROM    (
+         |            SELECT  name_judge(name,yg_name,bg_name) AS name_type
+         |                    ,*
+         |            FROM    (
+         |                        SELECT  *
+         |                        FROM    $t3
+         |                        LATERAL VIEW explode(split(concat_ws('\n',yg_name,bg_name) ,'\n')) t AS name
+         |                    )
+         |            WHERE   LENGTH(name) > 4
+         |        ) a
+         |JOIN    (
+         |            SELECT  *
+         |            FROM    $t4
+         |            WHERE   ds = '$t4_ds'
+         |        ) b
+         |ON      cleanup(a.name) = cleanup(b.cname)
+         |""".stripMargin)
+
+    //司法案件明细
+    sql(
+      s"""
+         |INSERT OVERWRITE TABLE $t6
+         |SELECT  judicase_id
+         |        ,case_type
+         |        ,case_stage
+         |        ,max(case_no) case_no
+         |        ,max(case_reason)case_reason
+         |        ,max(yg_name) yg_name
+         |        ,max(bg_name) bg_name
+         |        ,max(court_name) court_name
+         |        ,concat_ws('\n',collect_set(concat_ws(':','民事判决日期',judge_date,case_id))) apps
+         |FROM    mapping
+         |GROUP BY judicase_id
+         |         ,case_type
+         |         ,case_stage
+         |""".stripMargin)
+
+  }
+}

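The enterprise-case step above flattens the newline-joined plaintiff/defendant names into one row per party with LATERAL VIEW explode, then joins the cleaned party name against base_company_mapping to attach a cid. A toy, local-mode sketch of just the explode step (made-up names; the cleanup/name_judge UDFs from the job are omitted):

import org.apache.spark.sql.SparkSession

object ExplodePartiesSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("explode-sketch").getOrCreate()
    import spark.implicits._
    // yg_name / bg_name carry several parties separated by '\n', as in the case main table
    Seq((1L, "Alpha Trading Ltd\nBeta Holdings Ltd", "Gamma Logistics Ltd"))
      .toDF("judicase_id", "yg_name", "bg_name")
      .createOrReplaceTempView("t3")
    spark.sql(
      """
        |SELECT  judicase_id, name
        |FROM    t3
        |LATERAL VIEW explode(split(concat_ws('\n', yg_name, bg_name), '\n')) t AS name
        |""".stripMargin).show(truncate = false) // one row per party
    spark.stop()
  }
}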
+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -311,6 +311,7 @@ object ChangeExtract {
     , Args(tableName = "company_zxr_list", primaryFields = "case_no,exec_money")
 
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
+    , Args(tableName = "company_land_mortgage", primaryFields = "land_num,source_url")
     , Args(tableName = "company_employment", primaryFields = "title,url_path")
     , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
     , Args(tableName = "company_icp", primaryFields = "domain")
@@ -368,7 +369,7 @@ object ChangeExtract {
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
 

+ 5 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/company_staff.scala

@@ -2,7 +2,7 @@ package com.winhc.bigdata.spark.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyStaffAggs}
-import com.winhc.bigdata.spark.utils.{LoggingUtils, MaxComputer2Phoenix, SparkUtils}
+import com.winhc.bigdata.spark.utils.{CompanyIncSummary, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
@@ -260,7 +260,9 @@ object company_staff {
         , "staff_type"
         , "create_time"
         , "update_time"
-        , "deleted"), "winhc_eci_dev.ads_company_staff", "COMPANY_STAFF", ds, "rowkey").syn()
+        , "deleted"), "winhc_eci_dev.inc_ads_company_staff", "COMPANY_STAFF", ds, "rowkey").syn()
+
+      CompanyIncSummary(spark, project, "company_staff", "new_cid", Seq("new_cid", "hid")).calc
     }
 
     def inc_bulk_save(startDs: String): Unit = {
@@ -308,7 +310,7 @@ object company_staff {
     val e = CompanyStaffUtil(spark, project)
     e.init()
 
-//    e.inc_bulk_save("20200603")
+    //    e.inc_bulk_save("20200603")
 
     if (args.length == 1) {
       val Array(ds) = args

+ 5 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -22,7 +22,7 @@ import scala.collection.mutable
  * @Description: 企业动态
  */
 object CompanyDynamic {
-  val env = "dev"
+  val env = "prod"
   val targetTab = "ads_company_dynamic"
 
   case class CompanyDynamicUtil(s: SparkSession,
@@ -38,7 +38,8 @@ object CompanyDynamic {
         s"""
            |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
            |(
-           |    cid  STRING COMMENT '公司id'
+           |    id  STRING COMMENT '唯一标示,cid-sub_info_type-change_time'
+           |    ,cid  STRING COMMENT '公司id'
            |    ,cname  STRING COMMENT '公司name'
            |    ,info_type STRING COMMENT '变更分类,大类'
            |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
@@ -182,19 +183,18 @@ object CompanyDynamic {
     , Args(tableName = "company_equity_info")
     , Args(tableName = "company_staff", bName = 1)
     , Args(tableName = "company", bName = 0)
-    , Args(tableName = "company_zxr_list", bName = 0)
-    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
     , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
+    , Args(tableName = "company_land_mortgage", bName = 1)
     , Args(tableName = "company_env_punishment", bName = 1)
     , Args(tableName = "company_punishment_info", bName = 1)
     , Args(tableName = "company_punishment_info_creditchina", bName = 1)
-    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_public_announcement2_list", bName = 2)
+    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_mortgage_info", bName = 1)
     , Args(tableName = "company_stock_announcement", bName = 1)
     , Args(tableName = "company_finance", bName = 1)

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicForDayCount.scala

@@ -27,7 +27,8 @@ case class CompanyDynamicForDayCount(s: SparkSession,
       s"""
          |CREATE TABLE IF NOT EXISTS ${getEnvProjectName(env, project)}.$targetTab
          |(
-         |    cid  STRING COMMENT '公司id'
+         |    id  STRING COMMENT '唯一标示,cid-sub_info_type-change_time'
+         |    ,cid  STRING COMMENT '公司id'
          |    ,cname  STRING COMMENT '公司name'
          |    ,info_type STRING COMMENT '变更分类,大类'
          |    ,rta_desc STRING COMMENT '变更信息描述,变更标题'
@@ -93,7 +94,7 @@ case class CompanyDynamicForDayCount(s: SparkSession,
       val cname = r.getAs[String]("cname")
       val new_map = Map("cnt" -> (cnt + ""))
 
-      val result = handle.handle(cid + biz_date, biz_date, cid, null, null, new_map, cname)
+      val result = handle.handle(cid , biz_date, cid, null, null, new_map, cname)
       if (result == null) {
         None
       }

+ 4 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -146,7 +146,9 @@ trait CompanyDynamicHandle {
     , "" -> "2" //法院公告(原告/申请人)
     , "" -> "2" //开庭公告(原告)
     , "" -> "2" //工商变更
+    , "company_annual_report_out_guarantee" -> "2" //年报对外担保
     , "" -> "2" //退出对外投资
+    , "company_employment" -> "2" //招聘
     , "" -> "2" //对外投资企业注销/吊销/经营异常
     , "" -> "2" //分支机构注销/吊销/经营异常
     , "" -> "2" //新闻舆论(中立、消极)
@@ -205,7 +207,7 @@ trait CompanyDynamicHandle {
       , rta_desc
       , get_change_content(old_map, new_map)
       , get_change_time(bizDate, new_map)
-      , get_biz_id(rowkey)
+      , get_biz_id(rowkey, new_map)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
       , if (suggestion == null) null else suggestion
@@ -265,7 +267,7 @@ trait CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  protected def get_biz_id(rowkey: String): String = rowkey
+  protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String = rowkey
 
   /**
    * 子信息类型,小类

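get_biz_id now receives the record's new_map in addition to the rowkey, so a handle can key the dynamic on a business field instead of the rowkey (wenshu_detail_combine does this with case_id later in this commit). A standalone sketch of the idea; it deliberately does not extend the real CompanyDynamicHandle trait and only illustrates the widened signature:

object GetBizIdSketch {
  // Same shape as the trait's default: fall back to the rowkey when no field is used.
  def get_biz_id(rowkey: String, new_map: Map[String, String]): String =
    new_map.getOrElse("case_id", rowkey) // hypothetical: prefer case_id when present

  def main(args: Array[String]): Unit = {
    println(get_biz_id("1234_abc", Map("case_no" -> "case-1", "case_id" -> "987654"))) // prints 987654
    println(get_biz_id("1234_abc", Map.empty))                                         // prints 1234_abc
  }
}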
+ 8 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/bankruptcy_open_case.scala

@@ -22,4 +22,12 @@ case class bankruptcy_open_case() extends CompanyDynamicHandle {
        |申请人:${new_map("applicant")}
        |公开日期:${new_map("public_date")}""".stripMargin
   }
+  /**
+   * 风险等级
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "4"
 }

+ 1 - 10
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala

@@ -8,7 +8,7 @@ import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
  * @Description
  */
 //年报-对外担保
-case class company_annual_report_out_guarantee()extends CompanyDynamicHandle {
+case class company_annual_report_out_guarantee() extends CompanyDynamicHandle {
   /**
    * 信息描述
    *
@@ -26,15 +26,6 @@ case class company_annual_report_out_guarantee()extends CompanyDynamicHandle {
    * @return
    */
   override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = new_map("credito_term") + new_map("guarantee_way")
-
-  /**
-   * 变更时间
-   *
-   * @param new_map
-   * @return
-   */
-//  override def get_change_time(new_map: Map[String, String]): String = new_map("biz_date")
-
   /**
    * 风险等级
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -32,7 +32,7 @@ case class company_bid_list() extends CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  override def get_biz_id(rowkey: String): String = rowkey.split("_")(1)
+  override def get_biz_id(rowkey: String, new_map: Map[String, String]): String = rowkey.split("_")(1)
 
   /**
    * 风险等级

+ 1 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala

@@ -16,19 +16,9 @@ case class company_brief_cancel_announcement() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("announcement_term")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "进行了简易注销"
   }
-
   /**
    * 变更时间
    *

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_check_info.scala

@@ -26,10 +26,8 @@ case class company_check_info() extends CompanyDynamicHandle {
    * @return
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
-    s"""
-       |日期:${new_map("check_date")}
-       |结果:${new_map("check_result")}
-       |""".stripMargin
+    s"""日期:${new_map("check_date")}
+       |结果:${new_map("check_result")}""".stripMargin
   }
 
   /**

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_announcement_list.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 import org.apache.commons.lang3.StringUtils
 
@@ -27,12 +26,10 @@ case class company_court_announcement_list() extends CompanyDynamicHandle {
    * @return
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
-    s"""
-       |案号:${new_map("case_no")}万元
+    s"""案号:${new_map("case_no")}万元
        |上诉人:${new_map("plaintiff")}
        |被上诉人:${new_map("litigant")}
-       |刊登日期:${new_map("publish_date")}
-       |""".stripMargin
+       |刊登日期:${new_map("publish_date")}""".stripMargin
   }
 
   /**

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_open_announcement_list.scala

@@ -28,11 +28,9 @@ case class company_court_open_announcement_list() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |开庭时间:${new_map("start_date")}
+    s"""开庭时间:${new_map("start_date")}
        |案号:${new_map("case_no")}
-       |案由:${new_map("case_reason")}
-       |""".stripMargin
+       |案由:${new_map("case_reason")}""".stripMargin
   }
 
   /**

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_court_register_list.scala

@@ -28,11 +28,9 @@ case class company_court_register_list() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |立案日期:${new_map("filing_date")}
+    s"""立案日期:${new_map("filing_date")}
        |上诉人:${new_map("plaintiff")}
-       |被上诉人:${new_map("defendant")}
-       |""".stripMargin
+       |被上诉人:${new_map("defendant")}""".stripMargin
   }
 
   /**

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_double_random_check_info.scala

@@ -25,12 +25,10 @@ case class company_double_random_check_info() extends CompanyDynamicHandle {
    * @return
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
-    s"""
-       |任务编号:${new_map("check_task_num")}
+    s"""任务编号:${new_map("check_task_num")}
        |任务名称:${new_map("check_task_name")}
        |抽查机关:${new_map("check_department")}
-       |完成日期:${new_map("check_date")}
-       |""".stripMargin
+       |完成日期:${new_map("check_date")}""".stripMargin
   }
 
   /**

+ 0 - 9
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 
 /**
@@ -18,14 +17,6 @@ case class company_employment() extends CompanyDynamicHandle {
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = "新增"+new_map.getOrElse("cnt", "")+"条招聘信息"
 
 
-  /**
-    * 风险等级
-    *
-    * @param old_map
-    * @param new_map
-    * @return
-    */
-  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示"
 
   /**
     * 变更内容

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala

@@ -47,7 +47,7 @@ case class company_equity_info() extends CompanyDynamicHandle {
           , get_rta_desc(old_map, new_map)
           , get_change_content(old_map, new_map, cname)
           , get_change_time(bizDate, new_map)
-          , get_biz_id(rowkey)
+          , get_biz_id(rowkey,new_map)
           , get_sub_info_type()
           , t._3
           , null

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_judicial_sale_combine_list.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 
 /**
@@ -27,11 +26,9 @@ case class company_judicial_sale_combine_list() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |标题:${new_map("title")}
+    s"""标题:${new_map("title")}
        |起拍价:${new_map("initial_price")}
-       |拍卖时间:${new_map("start_time")}
-       |""".stripMargin
+       |拍卖时间:${new_map("start_time")}""".stripMargin
   }
 
   /**

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala

@@ -16,22 +16,12 @@ case class company_land_mortgage() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("land_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""抵押土地评估金额:${new_map("evaluate_amount")}万元\n
-       |抵押金额:${new_map("mortgage_amount")}万元\n
-       |抵押开始时间:${new_map("start_date")}\n
-       |抵押结束时间:${new_map("end_date")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""抵押土地评估金额:${new_map("evaluate_amount")}万元
+       |抵押金额:${new_map("mortgage_amount")}万元
+       |抵押开始时间:${new_map("start_date")}
+       |抵押结束时间:${new_map("end_date")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala

@@ -16,20 +16,10 @@ case class company_land_transfer() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""转让价格:${new_map("merchandise_price")}万元\n
-       |成交时间:${new_map("merchandise_time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""转让价格:${new_map("merchandise_price")}万元
+       |成交时间:${new_map("merchandise_time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license.scala

@@ -26,13 +26,11 @@ case class company_license() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |许可证编号:${new_map("license_number")}
+    s"""许可证编号:${new_map("license_number")}
        |有效期自:${new_map("start_date")}
        |有效期至:${new_map("end_date")}
        |许可机关:${new_map("department")}
-       |许可内容:${new_map("scope")}
-       |""".stripMargin
+       |许可内容:${new_map("scope")}""".stripMargin
   }
 
   /**

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_creditchina.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 
 /**
@@ -27,13 +26,11 @@ case class company_license_creditchina() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |许可证编号:${new_map("licence_number")}
+    s"""许可证编号:${new_map("licence_number")}
        |有效期自:${new_map("decision_date")}
        |有效期至:${new_map("end_date")}
        |许可机关:${new_map("department")}
-       |许可内容:${new_map("resume")}
-       |""".stripMargin
+       |许可内容:${new_map("resume")}""".stripMargin
   }
 
   /**

+ 2 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_license_entpub.scala

@@ -26,13 +26,11 @@ case class company_license_entpub() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |许可证编号:${new_map("license_number")}
+    s"""许可证编号:${new_map("license_number")}
        |有效期自:${new_map("start_date")}
        |有效期至:${new_map("end_date")}
        |许可机关:${new_map("department")}
-       |许可内容:${new_map("scope")}
-       |""".stripMargin
+       |许可内容:${new_map("scope")}""".stripMargin
   }
 
   /**

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala

@@ -16,20 +16,10 @@ case class company_mortgage_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("reg_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""被担保债权数额:${new_map("amount")}\n
-       |债务人履行债务期限:${new_map("term")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""被担保债权数额:${new_map("amount")}
+       |债务人履行债务期限:${new_map("term")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala

@@ -16,22 +16,12 @@ case class company_public_announcement2_list()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("bill_type") + new_map("bill_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""票号:${new_map("bill_num")}\n
-       |申请人:${new_map("cname")}\n
-       |票面金额:${new_map("start_date")}\n
-       |公告日期:${new_map("end_date")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""票号:${new_map("bill_num")}
+       |申请人:${new_map("cname")}
+       |票面金额:${new_map("start_date")}
+       |公告日期:${new_map("end_date")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 4 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala

@@ -16,21 +16,11 @@ case class company_punishment_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("punish_number")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""决定日期:${new_map("decision_date")}\n
-       |违法行为类型:${new_map("type")}\n
-       |行政处罚内容:${new_map("content")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""决定日期:${new_map("decision_date")}
+       |违法行为类型:${new_map("type")}
+       |行政处罚内容:${new_map("content")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 4 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala

@@ -16,21 +16,11 @@ case class company_punishment_info_creditchina() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("punish_number")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""决定日期:${new_map("decision_date")}\n
-       |违法行为类型:${new_map("type")}\n
-       |行政处罚结果:${new_map("result")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""决定日期:${new_map("decision_date")}
+       |违法行为类型:${new_map("type")}
+       |行政处罚结果:${new_map("result")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_send_announcement_list.scala

@@ -46,7 +46,7 @@ case class company_send_announcement_list()extends CompanyDynamicHandle {
    * @return
    */
   override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = {
-    if(new_map("defendant_cids").contains("cid"))//
+    if(new_map("defendant_cids").contains("cid"))//
       {
         "3"
       }

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala

@@ -16,20 +16,10 @@ case class company_stock_announcement()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""公告名称:${new_map("title")}\n
-       |公告日期:${new_map("time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""公告名称:${new_map("title")}
+       |公告日期:${new_map("time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_tax_contravention.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import org.apache.commons.lang3.StringUtils
 
 /**
@@ -27,10 +26,8 @@ case class company_tax_contravention() extends CompanyDynamicHandle {
    * @return
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
-    s"""
-       |案件性质:${new_map("case_type")}
-       |发布日期:${new_map("publish_time")}
-       |""".stripMargin
+    s"""案件性质:${new_map("case_type")}
+       |发布日期:${new_map("publish_time")}""".stripMargin
   }
 
   /**

+ 2 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_final_case.scala

@@ -1,6 +1,5 @@
 package com.winhc.bigdata.spark.jobs.dynamic.tables
 
-import com.winhc.bigdata.spark.implicits.MapHelper._
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
 
 /**
@@ -27,12 +26,10 @@ case class company_zxr_final_case() extends CompanyDynamicHandle {
    */
   override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
 
-    s"""
-       |案号:${new_map("case_no")}
+    s"""案号:${new_map("case_no")}
        |执行法院:${new_map("court_name")}
        |立案日期:${new_map("case_create_time")}
-       |终本日期:${new_map("case_final_time")}
-       |""".stripMargin
+       |终本日期:${new_map("case_final_time")}""".stripMargin
   }
 
   /**

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala

@@ -16,22 +16,12 @@ case class company_zxr_restrict()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("case_no")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""案号:${new_map("case_no")}\n
-       |限消令对象:${new_map("name")}\n
-       |立案日期:${new_map("case_create_time")}\n
-       |发布日期:${new_map("update_time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""案号:${new_map("case_no")}
+       |限消令对象:${new_map("name")}
+       |立案日期:${new_map("case_create_time")}
+       |发布日期:${new_map("update_time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 5 - 4
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -35,12 +35,10 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
         t2 = "被告"
       }
     }
-    s"""
-       |案由:${new_map("case_reason_level3")}
+    s"""案由:${new_map("case_reason_level3")}
        |案号:${new_map("case_no")}
        |诉讼身份:${t2}
-       |发布日期:${new_map("judge_date")}
-       |""".stripMargin
+       |发布日期:${new_map("judge_date")}""".stripMargin
   }
 
   /**
@@ -65,4 +63,7 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
     t2
   }
 
+  override protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String = {
+    new_map("case_id")
+  }
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/inc_company_equity_info.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.jobs
 import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, CompanyIncSummary, LoggingUtils, MaxComputer2Phoenix, SparkUtils}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 
@@ -172,7 +172,7 @@ object inc_company_equity_info {
         , "CONCAT_WS('_',cid,main_id)").syn()
       import com.winhc.bigdata.spark.implicits.DataFrame2HBaseHelper._
 
-      //todo 没有写出摘要
+      CompanyIncSummary(spark, project, "company_equity_info_list", "cid", Seq("cid", "main_id")).calc
 
       val outFields = getColumns("winhc_eci_dev.inc_ads_company_equity_info").map(_.toUpperCase)
       sql(

+ 184 - 66
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelation.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.jobs.judicial
 
+import com.winhc.bigdata.spark.implicits.RegexUtils.RichRegex
 import com.winhc.bigdata.spark.udf.BaseFunc
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
 import com.winhc.bigdata.spark.utils.{LoggingUtils, SparkUtils, case_connect_utils}
@@ -8,6 +9,7 @@ import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
 import scala.collection.mutable
+import scala.collection.mutable.ArrayBuffer
 
 /**
  * @Author: XuJiakai
@@ -19,22 +21,16 @@ case class JudicialCaseRelation(s: SparkSession,
                                ) extends LoggingUtils with Logging with BaseFunc {
   @(transient@getter) val spark: SparkSession = s
   val table_id_map = Map("justicase" -> "case_id")
+  val pat = ".*\\d+.*".r
 
-
-  def getStrToMap(cols: Seq[String]): String = {
-    val set = cols.toSet
-    val str = set.map(e => {
-      s"concat_ws('\001','$e',cast($e as string))"
-    }).mkString(",")
-    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
-  }
-
+  import spark.implicits._
 
   def all(tableName: String): Unit = {
     val table_id = table_id_map(tableName)
     val ods_table_name = s"ods_$tableName"
     val ods_last_ds = getLastPartitionsOrElse(ods_table_name, "0")
-    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+    //    val other_cols = getColumns(ods_table_name).diff(Seq("ds", "case_no", "connect_case_no", table_id))
+    val other_cols = Seq("yg_name", "court_name", "case_no", "bg_name")
 
     sql(
       s"""
@@ -64,21 +60,190 @@ case class JudicialCaseRelation(s: SparkSession,
 
 
   def inc(tableName: String, ds: String): Unit = {
+  }
 
 
-  }
+  def relationByGroup(): Unit = {
+    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+    spark.udf.register("case_equ", case_equ _)
+    spark.udf.register("str_sort", (v1: String, v2: String) => Seq(v1, v2).filter(_ != null).sorted.mkString(""))
+    spark.udf.register("match_case_no", (case_no: String) => pat matches case_no)
+
+    sql(
+      s"""
+         | SELECT  *
+         | FROM    winhc_eci_dev.dwd_judicial_case
+         | WHERE   ds = '$dwd_last_ds'
+         | AND     case_no IS NOT NULL
+         | AND     case_no <> ''
+         | AND     match_case_no(case_no)
+         |""".stripMargin)
+      .cache()
+      .createOrReplaceTempView("dwd_judicial_case_tmp")
+
+    sql(
+      s"""
+         |SELECT  case_no,party,collect_set(id) as connect_case_id
+         |FROM    (
+         |            SELECT  concat_ws('_',id,tn) as id
+         |                    ,case_no
+         |                    ,tn
+         |                    ,main_case_no
+         |                    ,case_attribute
+         |                    ,party
+         |            FROM    dwd_judicial_case_tmp
+         |            LATERAL VIEW explode(split(concat_ws('\\n',case_attribute['yg_name'],case_attribute['bg_name']) ,'\\n')) t AS party
+         |        ) AS t1
+         |WHERE   length(t1.party) > 4
+         |GROUP BY case_no,party
+         |""".stripMargin).rdd
+      .flatMap(r => {
+        val case_no = r.getAs[String]("case_no")
+        val party = r.getAs[String]("party")
+        val connect_case_id = r.getAs[Seq[String]]("connect_case_id")
+        val list = ArrayBuffer[(String, String, String, String, String, String, Int)]()
+        if (connect_case_id.length < 2) {
+          val e_1 = connect_case_id.head.split("_")
+          list.append((e_1(0), null, case_no, null, e_1(1), null, 2))
+        }
+        for (i <- 0 to connect_case_id.length - 2) {
+          val e_1 = connect_case_id(i).split("_")
+          val e_2 = connect_case_id(i + 1).split("_")
+          list.append((e_1(0), e_2(0), case_no, case_no, e_1(1), e_2(1), 2))
+        }
+        list
+      })
+      .toDF("id_1", "id_2", "case_no_1", "case_no_2", "tn_1", "tn_2", "connect_type")
+      .createOrReplaceTempView("connect_tmp_1")
+
+    sql(
+      s"""
+         |SELECT  t1.id AS id_1
+         |        ,t2.id AS id_2
+         |        ,t1.case_no AS case_no_1
+         |        ,t2.case_no AS case_no_2
+         |        ,t1.tn AS tn_1
+         |        ,t2.tn AS tn_2
+         |        ,1 as connect_type
+         |        ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
+         |FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
+         |FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+         |ON      t1.case_no = t2.case_no
+         |AND     t1.id <> t2.id
+         |AND     case_equ(t1.case_attribute , t2.case_attribute)
+         |""".stripMargin)
+      .createOrReplaceTempView("connect_tmp_2")
+
+    sql(
+      s"""
+         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation1
+         |SELECT  id_1
+         |        ,id_2
+         |        ,case_no_1
+         |        ,case_no_2
+         |        ,tn_1
+         |        ,tn_2
+         |        ,connect_type
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY connect_type) AS num
+         |            FROM    (
+         |                        SELECT  id_1
+         |                                ,id_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,str_sort(concat_ws('',id_1,tn_1),concat_ws('',id_2,tn_2)) AS xjk_sorted
+         |                        FROM    connect_tmp_1
+         |                        UNION ALL
+         |                        SELECT  id_1
+         |                                ,id_2
+         |                                ,case_no_1
+         |                                ,case_no_2
+         |                                ,tn_1
+         |                                ,tn_2
+         |                                ,connect_type
+         |                                ,xjk_sorted
+         |                        FROM    connect_tmp_2
+         |                    ) AS t1
+         |        ) AS t2
+         |WHERE   t2.num = 1
+         |""".stripMargin)
 
-  private def getVal(map: Map[String, String], key: String): String = {
-    map.getOrElse(key, "")
   }
 
-  def sort(v1: String, v2: String): String = {
-    val seq = Seq(v1, v2)
-    seq.filter(_ != null).sorted.mkString("")
+  /* def relation(): Unit = {
+     spark.udf.register("case_equ", case_equ _)
+     spark.udf.register("str_sort", sort _)
+     spark.udf.register("match_case_no", match_case_no _)
+     val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
+     val ignoreCaseNo = JudicialCaseRelation.getTopCaseNo()
+     sql(
+       s"""
+          | SELECT  *
+          | FROM    winhc_eci_dev.dwd_judicial_case
+          | WHERE   ds = '$dwd_last_ds'
+          | AND     case_no IS NOT NULL
+          | AND     case_no <> ''
+          | AND     match_case_no(case_no)
+          | ${
+         ignoreCaseNo.isEmpty match {
+           case true => ""
+
+           case false => s"AND case_no not in (${ignoreCaseNo.map(ss => "\"" + ss + "\"").mkString(",")})"
+
+         }
+       }
+          |""".stripMargin)
+       .cache()
+       .createOrReplaceTempView("dwd_judicial_case_tmp")
+     sql(
+       s"""
+          |--- INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
+          | SELECT  id_1
+          |         ,id_2
+          |         ,case_no_1
+          |         ,case_no_2
+          |         ,tn_1
+          |         ,tn_2
+          |         ,connect_type
+          | FROM    (
+          |             SELECT  *
+          |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
+          |             FROM    (
+          |                        SELECT  t1.id AS id_1
+          |                                ,t2.id AS id_2
+          |                                ,t1.case_no AS case_no_1
+          |                                ,t2.case_no AS case_no_2
+          |                                ,t1.tn AS tn_1
+          |                                ,t2.tn AS tn_2
+          |                                ,1 as connect_type
+          |                                ,str_sort(concat_ws('',t1.id,t1.tn),concat_ws('',t2.id,t2.tn)) as xjk_sorted
+          |                        FROM    (select * from dwd_judicial_case_tmp where main_case_no = 1) AS t1
+          |                        FULL JOIN (select * from dwd_judicial_case_tmp where main_case_no = 0) AS t2
+          |                        ON      t1.case_no = t2.case_no
+          |                        AND     t1.id <> t2.id
+          |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
+          |                     ) AS t1
+          |         ) AS t2
+          | WHERE   t2.num = 1
+          |""".stripMargin)
+   }*/
+
+
+  def getStrToMap(cols: Seq[String]): String = {
+    val set = cols.toSet
+    val str = set.map(e => {
+      s"concat_ws('\001','$e',cast($e as string))"
+    }).mkString(",")
+    s"str_to_map(concat_ws('\002',$str),'\002','\001')"
   }
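  // Illustration (not part of the diff): for cols = Seq("yg_name", "bg_name") the generated
  // expression should look roughly like
  //   str_to_map(concat_ws('\002',
  //     concat_ws('\001','yg_name',cast(yg_name as string)),
  //     concat_ws('\001','bg_name',cast(bg_name as string))), '\002', '\001')
  // i.e. every column becomes a "name\001value" entry, entries are joined with \002, and
  // str_to_map rebuilds them into the MAP<STRING,STRING> presumably consumed as
  // case_attribute by case_equ and the LATERAL VIEW above.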
 
-  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
+  private def getVal(map: Map[String, String], key: String): String = map.getOrElse(key, "")
 
+  def case_equ(m1: Map[String, String], m2: Map[String, String]): Boolean = {
     try {
       val current_case_party_list_org: Seq[String] = getVal(m1, "yg_name").split("\n") ++ getVal(m1, "bg_name").split("\n")
       val connect_case_party_list_org: Seq[String] = getVal(m2, "yg_name").split("\n") ++ getVal(m2, "bg_name").split("\n")
@@ -100,54 +265,6 @@ case class JudicialCaseRelation(s: SparkSession,
     }
   }
 
-  def relation(): Unit = {
-    spark.udf.register("case_equ", case_equ _)
-    spark.udf.register("str_sort", sort _)
-    val dwd_last_ds = getLastPartitionsOrElse("winhc_eci_dev.dwd_judicial_case", "0")
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE winhc_eci_dev.xjk_ads_judicial_case_relation3
-         | SELECT  id_1
-         |         ,id_2
-         |         ,case_no_1
-         |         ,case_no_2
-         |         ,tn_t1
-         |         ,tn_t2
-         | FROM    (
-         |             SELECT  *
-         |                     ,ROW_NUMBER() OVER(PARTITION BY xjk_sorted ORDER BY xjk_sorted) AS num
-         |             FROM    (
-         |                        SELECT  t1.id AS id_1
-         |                                ,t2.id AS id_2
-         |                                ,t1.case_no AS case_no_1
-         |                                ,t2.case_no AS case_no_2
-         |                                ,t1.tn AS tn_t1
-         |                                ,t2.tn AS tn_t2
-         |                                ,concat(concat(t1.id,t1.tn),concat(t2.id,t2.tn)) as xjk_sorted
-         |                        FROM    (
-         |                                    SELECT  *
-         |                                    FROM    winhc_eci_dev.dwd_judicial_case
-         |                                    WHERE   ds = '$dwd_last_ds'
-         |                                    AND     case_no IS NOT NULL
-         |                                    AND     case_no <> ''
-         |                                    AND     case_no RLIKE '\\d+'
-         |                                ) AS t1
-         |                        FULL JOIN (
-         |                                      SELECT  *
-         |                                      FROM    winhc_eci_dev.dwd_judicial_case
-         |                                      WHERE   ds = '$dwd_last_ds'
-         |                                      AND     case_no IS NOT NULL
-         |                                      AND     case_no <> ''
-         |                                      AND     case_no RLIKE '\\d+'
-         |                                  ) AS t2
-         |                        ON      t1.case_no = t2.case_no
-         |                        AND     t1.id <> t2.id
-         |                        AND     case_equ(t1.case_attribute , t2.case_attribute)
-         |                     ) AS t1
-         |         ) AS t2
-         | WHERE   t2.num = 1
-         |""".stripMargin)
-  }
 }
 
 object JudicialCaseRelation {
@@ -160,7 +277,8 @@ object JudicialCaseRelation {
     val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
     val jcr = JudicialCaseRelation(spark, project = "winhc_eci_dev")
     //    jcr.all("justicase")
-    jcr.relation()
+    //    jcr.relation()
+    jcr.relationByGroup()
     spark.stop()
   }
 }
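Note on the dedup in relationByGroup (a sketch, not part of the diff): xjk_sorted is an order-insensitive key over the (id, tn) pair, so ROW_NUMBER() OVER (PARTITION BY xjk_sorted ORDER BY connect_type) keeps one row per unordered pair and prefers connect_type = 1 when the same pair is produced by both connect_tmp_1 and connect_tmp_2. A minimal stand-in for the registered str_sort UDF, mirroring the sort(v1, v2) helper this commit removes from case_connect_utils.scala:

  def strSort(v1: String, v2: String): String =
    Seq(v1, v2).filter(_ != null).sorted.mkString("")

  strSort("1_a", "2_b") == strSort("2_b", "1_a")  // true, so both orientations fall into one partition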

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/message/IntellectualMessage.scala

@@ -2,6 +2,7 @@ package com.winhc.bigdata.spark.jobs.message
 
 import java.util.Date
 
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamic.env
 import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandleUtils
 import com.winhc.bigdata.spark.udf.MapAggs
 import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
@@ -53,7 +54,7 @@ case class IntellectualMessage(s: SparkSession, project: String,
   val t6 = s"company_copyright_works_list" //软著作权
   val res_tb = s"$project.tmp_xf_ads_intellectual_message" //聚合结果表
   //  val res_tb_res = s"$project.tmp_xf_ads_intellectual_message_res" //转换输出结果表
-  val res_tb_res = s"$project.ads_company_dynamic" //转换输出结果表
+  val res_tb_res = s"${getEnvProjectName(env, project)}.ads_company_dynamic" //转换输出结果表
 
   val tn = "intellectual"
 

+ 122 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyAnnualReport.scala

@@ -0,0 +1,122 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: annual report score
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyAnnualReport {
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyAnnualReport(spark, "company_annual_report",
+      "", "206", "update_time", "经营情况", "年报信息", "0", "winhc_eci_dev").calc()
+    spark.stop()
+  }
+}
+
+
+case class CompanyAnnualReport(s: SparkSession, sourceTable: String, tableView: String = "",
+                               flag: String, time: String, kind: String, project: String,
+                               tp: String = "0", namespace: String = ""
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // partitioned vs. non-partitioned source table
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL AND report_year = '${BaseUtil.getYear(-1)}'
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    getScore(id, cid, cnt1, kind, project)
+  }
+
+  // annual report score
+  def getScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 10f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 0f
+      ty = "前一年未公示年报"
+    } else {
+      score = 10f
+      ty = "前一年正常公示年报"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
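For reference, a hedged sketch of an incremental run of the job above: tp = "1" makes calc() read from tableView instead of the ads partition and triggers the HBase sync; the view name here is a placeholder.

  CompanyAnnualReport(spark, sourceTable = "company_annual_report",
    tableView = "inc_view_company_annual_report", // placeholder incremental view
    flag = "206", time = "update_time", kind = "经营情况", project = "年报信息",
    tp = "1", namespace = "winhc_eci_dev").calc()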

+ 321 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyCommonScoreV1.scala

@@ -0,0 +1,321 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{AsyncExtract, BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: liquidation info, simplified deregistration, serious violations, abnormal operations, tax violations, tax arrears announcements, public notices for assertion of claims, judicial auctions
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyCommonScoreV1 {
+
+  private case class Params(tableName: String // table name
+                            , tableView: String = "" // incremental data view
+                            , flag: String // dimension code
+                            , time: String = "update_time" // dedup / ordering field
+                            , kind: String // major category
+                            , project: String // sub category
+                            , tp: String = "0" // incremental = 1, full (stock) = 0
+                            , namespace: String = "winhc_eci_dev" // workspace
+                           )
+
+  private val startParams = Seq(
+    Params(tableName = "company_liquidating_info", tableView = "", flag = "401", time = "update_time", kind = "经营风险", project = "清算信息", tp = "0")
+    , Params(tableName = "company_brief_cancel_announcement", tableView = "", flag = "402", time = "update_time", kind = "经营风险", project = "简易注销", tp = "0")
+    , Params(tableName = "company_illegal_info", tableView = "", flag = "403", time = "update_time", kind = "经营风险", project = "严重违法行为", tp = "0")
+    , Params(tableName = "company_abnormal_info", tableView = "", flag = "409", time = "update_time", kind = "经营风险", project = "经营异常", tp = "0")
+    , Params(tableName = "company_tax_contravention", tableView = "", flag = "412", time = "update_time", kind = "经营风险", project = "税收违法", tp = "0")
+    , Params(tableName = "company_own_tax", tableView = "", flag = "413", time = "update_time", kind = "经营风险", project = "欠税公告", tp = "0")
+    , Params(tableName = "company_public_announcement2_list", tableView = "", flag = "414", time = "update_time", kind = "经营风险", project = "公示催告", tp = "0")
+    , Params(tableName = "company_judicial_sale_combine_list", tableView = "", flag = "506", time = "update_time", kind = "法律风险", project = "司法拍卖", tp = "0")
+    // incremental data is already full data: Params(tableName = "bankruptcy_open_case", tableView = "", flag = "505", time = "update_time", kind = "法律风险", project = "破产重整", tp = "0", namespace = "winhc_eci")
+  )
+
+  def main(args: Array[String]): Unit = {
+
+    if (args.length != 2) {
+      println(
+        s"""
+           |Please enter valid parameters!
+           |<project> <tableNames>
+           |""".stripMargin)
+      sys.exit(-1)
+    }
+
+    val Array(project, tableNames) = args
+
+    println(
+      s"""
+         |project: $project
+         |tableNames: $tableNames
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+
+    var start = startParams
+    if (!tableNames.equals("all")) {
+      val set = tableNames.split(",").toSet
+      start = start.filter(a => set.contains(a.tableName))
+    }
+
+    val a = start.map(e => (e.tableName, () => {
+      CompanyCommonScoreV1(spark, e.tableName, e.tableView, e.flag, e.time, e.kind, e.project, e.tp, e.namespace).calc()
+      true
+    }))
+
+    AsyncExtract.startAndWait(spark, a)
+
+    // liquidation info
+    //CompanyCommonScoreV1(spark, "company_liquidating_info","", "401", "update_time", "经营风险", "清算信息", "0", s"$project").calc()
+    // simplified deregistration
+    //CompanyCommonScoreV1(spark, "company_brief_cancel_announcement","", "402", "update_time", "经营风险", "简易注销", "0", s"$project").calc()
+    // serious violations
+    //CompanyCommonScoreV1(spark, "company_illegal_info","", "403", "update_time", "经营风险", "严重违法行为", "0", s"$project").calc()
+    // abnormal operations
+    //CompanyCommonScoreV1(spark, "company_abnormal_info","", "409", "update_time", "经营风险", "经营异常", "0", s"$project").calc()
+    // tax violations
+    //CompanyCommonScoreV1(spark, "company_tax_contravention", "", "412", "update_time", "经营风险", "税收违法", "0", s"$project").calc()
+    // tax arrears announcements
+    //CompanyCommonScoreV1(spark, "company_own_tax", "", "413", "update_time", "经营风险", "欠税公告", "0", s"$project").calc()
+    // public notice for assertion of claims
+    //CompanyCommonScoreV1(spark, "company_public_announcement2_list", "", "414", "update_time", "经营风险", "公示催告", "0", s"$project").calc()
+    // judicial auction
+    //CompanyCommonScoreV1(spark, "company_judicial_sale_combine_list", "", "506", "update_time", "法律风险", "司法拍卖", "0", s"$project").calc()
+    // bankruptcy reorganization
+    //CompanyCommonScoreV1(spark, "bankruptcy_open_case", "", "505", "update_time", "法律风险", "破产重整", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyCommonScoreV1(s: SparkSession, sourceTable: String, tableView: String = "",
+                                flag: String, time: String, kind: String, project: String,
+                                tp: String = "0", namespace: String = ""
+                               ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // partitioned vs. non-partitioned source table
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(new_cid) OVER(PARTITION BY new_cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY new_cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   new_cid IS NOT NULL
+         |        ${appsql2}
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view_$sourceTable")
+
+    sql(s"select * from t1_view_$sourceTable").show(20, false)
+
+    sql(s"insert overwrite table $targetTable " +
+      s"partition (ds='${ds}')  select * from t1_view_$sourceTable")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view_$sourceTable
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = r.getAs[Long]("id")
+    val cid = r.getAs[Long]("new_cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    flag match {
+      case "401" => getLiquidatingScore(id, cid, cnt1, kind, project)
+      case "402" => getBriefCancelScore(id, cid, cnt1, kind, project)
+      case "403" => getIllegalInfoScore(id, cid, cnt1, kind, project)
+      case "409" => getAbnormalInfoScore(id, cid, cnt1, kind, project)
+      case "412" => getTaxContraventionScore(id, cid, cnt1, kind, project)
+      case "413" => getCompanyOwnTaxScore(id, cid, cnt1, kind, project)
+      case "414" => getPublicAnnouncementScore(id, cid, cnt1, kind, project)
+      case "506" => getJudicialSaleScore(id, cid, cnt1, kind, project)
+    }
+  }
+
+  // liquidation info
+  def getLiquidatingScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // simplified deregistration
+  def getBriefCancelScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // serious violations
+  def getIllegalInfoScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 10f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 10f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // abnormal operations
+  def getAbnormalInfoScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // tax violations
+  def getTaxContraventionScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 10f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 10f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // tax arrears announcement
+  def getCompanyOwnTaxScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 10f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 10f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // public notice for assertion of claims
+  def getPublicAnnouncementScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 10f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 10f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // judicial auction
+  def getJudicialSaleScore(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 0f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
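A small sketch of how the tableNames argument narrows startParams inside main() above (values are illustrative):

  val tableNames = "company_abnormal_info,company_own_tax"
  val set = tableNames.split(",").toSet
  val selected = startParams.filter(a => set.contains(a.tableName))
  // selected.map(_.flag) == Seq("409", "413"), i.e. only 经营异常 and 欠税公告 are scored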

+ 124 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyEnvPunishment.scala

@@ -0,0 +1,124 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import cn.hutool.core.util.StrUtil
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: environmental protection penalties
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyEnvPunishment {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEnvPunishment(spark, "company_env_punishment",
+      "", "411", "update_time", "经营风险", "环保处罚", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyEnvPunishment(s: SparkSession, sourceTable: String, tableView: String = "",
+                                flag: String, time: String, kind: String, project: String,
+                                tp: String = "0", namespace: String = ""
+                              ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // partitioned vs. non-partitioned source table
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT
+         |        new_cid,concat_ws(',' , collect_set(content)) content
+         |FROM    $tb
+         |WHERE   new_cid IS NOT NULL
+         |${appsql2}
+         |GROUP BY new_cid
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = -1
+    val cid = r.getAs[Long]("new_cid").toString
+    val content = r.getAs[String]("content")
+    getScore(id, cid, kind, project, content)
+  }
+
+  // environmental protection penalty
+  def getScore(id: Long, cid: String, kind: String, project: String, content: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (StringUtils.isNotBlank(content)) {
+      val bool1 = StrUtil.containsAny(content, "停业关闭", "暂扣", "吊销", "许可证","执照")
+      if (bool1) {
+        score = 0f
+        ty = "责令停产停业关闭、暂扣或吊销许可证、执照的行政处罚"
+      } else {
+        score = 2f
+        ty = "警告、罚款、停产整顿、没收违法所得、行政拘留等其他行政处罚"
+      }
+    }
+
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}
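A quick check of the keyword rule in getScore above (hutool's StrUtil.containsAny returns true when the string contains any of the candidate substrings):

  import cn.hutool.core.util.StrUtil
  val severe = StrUtil.containsAny("吊销排污许可证", "停业关闭", "暂扣", "吊销", "许可证", "执照")
  // severe == true -> score = 0f; any other non-blank content falls into the 2f branch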

+ 156 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyEquityInfo.scala

@@ -0,0 +1,156 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Description: equity pledge
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyEquityInfo {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "301", "main_id", "资产权益", "股权出质(质权人)", "0", s"$project").calc()
+    CompanyEquityInfo(spark, "company_equity_info_list",
+      "", "408", "main_id", "经营风险", "股权出质(出质人)", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+case class CompanyEquityInfo(s: SparkSession, sourceTable: String, tableView: String = "",
+                             flag: String, time: String, kind: String, project: String,
+                             tp: String = "0", namespace: String = ""
+                            ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // partitioned vs. non-partitioned source table
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    var appsql1 = ""
+    if(flag.equals("301")){
+      appsql1 = s"AND  type = 2"
+    }else if(flag.equals("408")){
+      appsql1 = s"AND  type = 1"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT  *
+         |FROM    (
+         |        SELECT
+         |                *
+         |                ,COUNT(cid) OVER(PARTITION BY cid ) AS cnt1
+         |                ,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY $time DESC ) AS num
+         |        FROM    $tb
+         |        WHERE   cid IS NOT NULL
+         |        ${appsql2} $appsql1
+         |        ) a
+         |WHERE   num =1
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert ${if (flag.equals("408")) "INTO" else "OVERWRITE"} table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = -1
+    val cid = r.getAs[Long]("cid").toString
+    val cnt1 = r.getAs[Long]("cnt1")
+    flag match {
+      case "301" => getScore1(id, cid, cnt1, kind, project)
+      case "408" => getScore2(id, cid, cnt1, kind, project)
+    }
+  }
+
+  // equity pledge (pledgee)
+  def getScore1(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 2f
+      ty = "无"
+    } else if (cnt1 < 3) {
+      score = 4f
+      ty = "质权数量<3"
+    } else {
+      score = 5f
+      ty = "质权数量≥3"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+  // equity pledge (pledgor)
+  def getScore2(id: Long, cid: String, cnt1: Long, kind: String, project: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (cnt1 == 0) {
+      score = 5f
+      ty = "无"
+    } else {
+      score = 1f
+      ty = "有"
+    }
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}

+ 126 - 0
src/main/scala/com/winhc/bigdata/spark/model/CompanyPunishmentInfo.scala

@@ -0,0 +1,126 @@
+package com.winhc.bigdata.spark.model
+
+import java.util.Date
+
+import cn.hutool.core.util.StrUtil
+import com.winhc.bigdata.calc.DimScoreV2
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, Maxcomputer2Hbase, SparkUtils}
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.{Row, SparkSession}
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+
+/**
+ * @Description: administrative penalties
+ * @author π
+ * @date 2020/9/3 16:52
+ */
+object CompanyPunishmentInfo {
+  def main(args: Array[String]): Unit = {
+    val project = "winhc_eci_dev"
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> s"$project",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
+    CompanyPunishmentInfo(spark, "company_punishment_info",
+      "", "410", "update_time", "经营风险", "行政处罚", "0", s"$project").calc()
+    spark.stop()
+  }
+}
+
+
+case class CompanyPunishmentInfo(s: SparkSession, sourceTable: String, tableView: String = "",
+                                 flag: String, time: String, kind: String, project: String,
+                                 tp: String = "0", namespace: String = ""
+                                ) extends LoggingUtils {
+
+  @(transient@getter) val spark: SparkSession = s
+
+  import spark.implicits._
+
+  def calc(): Unit = {
+    println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
+    val adsTable = namespace + ".ads_" + sourceTable
+    val incAdsTable = namespace + ".inc_ads_" + sourceTable
+    val targetTable = namespace + ".ads_" + sourceTable + "_score"
+    var ds = ""
+
+    // partitioned vs. non-partitioned source table
+    var appsql2 = ""
+    var tb = adsTable
+    if ("1".equals(tp)) {
+      tb = tableView
+      ds = BaseUtil.getPartion(incAdsTable, spark)
+    } else {
+      ds = BaseUtil.getPartion(adsTable, spark)
+      appsql2 = s"AND  ds = ${ds}"
+    }
+
+    val df = sql(
+      s"""
+         |SELECT
+         |        new_cid,concat_ws(',' , collect_set(content)) content
+         |FROM    $tb
+         |WHERE   new_cid IS NOT NULL
+         |${appsql2}
+         |GROUP BY new_cid
+         |""".stripMargin)
+
+    df.map(r => {
+      trans(r, flag, kind, project)
+    }).toDF("id", "cid", "kind", "kind_code", "project", "project_code", "type",
+      "score", "total", "extraScore")
+      .createOrReplaceTempView(s"t1_view")
+
+    sql("select * from t1_view").show(20, false)
+
+    sql(s"insert overwrite table ${targetTable} " +
+      s"partition (ds='${ds}')  select * from t1_view")
+
+    // sync to HBase
+    if ("1".equals(tp)) { // full (stock) runs do not need the HBase sync
+      val dataFrame = sql(
+        s"""
+           |select
+           |CONCAT_WS('_',cid,project_code) AS rowkey,
+           |id,cid,kind,kind_code,project,project_code,type,score,total,extraScore
+           |from t1_view
+           |""".stripMargin)
+
+      Maxcomputer2Hbase(dataFrame, "COMPANY_SCORE").syn()
+    }
+    println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
+  }
+
+  def trans(r: Row, flag: String, kind: String, project: String) = {
+    val id = -1
+    val cid = r.getAs[Long]("new_cid").toString
+    val content = r.getAs[String]("content")
+    getScore(id, cid, kind, project, content)
+  }
+
+  // administrative penalty
+  def getScore(id: Long, cid: String, kind: String, project: String, content: String) = {
+    var score = 0f
+    val total = 5f
+    val extraScore = 0f
+    var ty = ""
+    if (StringUtils.isNotBlank(content)) {
+      val bool1 = StrUtil.containsAny(content, "停业关闭", "暂扣", "吊销", "许可证", "执照")
+      if (bool1) {
+        score = 0f
+        ty = "责令停产停业关闭、暂扣或吊销许可证、执照的行政处罚"
+      } else {
+        score = 2f
+        ty = "警告、罚款、停产整顿、没收违法所得、行政拘留等其他行政处罚"
+      }
+    }
+
+    (id, cid, kind, DimScoreV2.newsEventMap.get(kind), project, DimScoreV2.newsEventMap.get(project), ty,
+      score, total, extraScore)
+  }
+
+}

+ 10 - 1
src/main/scala/com/winhc/bigdata/spark/udf/CompanyMapping.scala

@@ -5,10 +5,11 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.spark.broadcast.Broadcast
 import org.apache.spark.sql.SparkSession
 import com.winhc.bigdata.spark.utils.BaseUtil._
+import com.winhc.bigdata.spark.utils.RowkeyRuleUtils._
 
 trait CompanyMapping {
 
-  def prepareFunctions(spark: SparkSession): Unit ={
+  def prepareFunctions(spark: SparkSession): Unit = {
     import spark._
     // clean up special characters
     spark.udf.register("cleanup", (col: String) => {
@@ -22,6 +23,14 @@ trait CompanyMapping {
     spark.udf.register("case_no", (col: String) => {
       caseNo(col)
     })
+
+    spark.udf.register("rowkey_trans", (col: String, tab: String) => {
+      rowkey_trans(col, tab)
+    })
+
+    spark.udf.register("name_judge", (name: String, yg_name: String, bg_name: String) => {
+      nameJudge(name, yg_name, bg_name)
+    })
   }
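A hedged usage sketch for the two UDFs registered above; the view name and columns are placeholders:

  prepareFunctions(spark)
  spark.sql(
    """
      |SELECT  rowkey_trans(pub_number, 'company_patent') AS pub_no
      |        ,name_judge(name, yg_name, bg_name) AS party_side
      |FROM    some_tmp_view
      |""".stripMargin).show(10, false)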
 
   def prepare(spark: SparkSession): Unit = {

+ 20 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -66,6 +66,12 @@ object BaseUtil {
     atDaysAfter(-1, nowDate("yyyyMMdd"))
   }
 
+  def getYear(n: Int): String = {
+    val cal = Calendar.getInstance(Locale.CHINA)
+    cal.add(Calendar.YEAR, n)
+    cal.get(Calendar.YEAR).toString
+  }
+
   def atDaysAfter(n: Int, time: String, pattern: String = "yyyyMMdd"): String = {
     import java.text.SimpleDateFormat
     val newtime: Date = new SimpleDateFormat("yyyyMMdd").parse(time)
@@ -115,6 +121,7 @@ object BaseUtil {
       ""
     }
   }
+
   def BKDRHash(str: String): Long = {
     val seed: Long = 1313131313 // 31 131 1313 13131 131313 etc..
     var hash: Long = 0
@@ -125,7 +132,19 @@ object BaseUtil {
     return hash
   }
 
+  def nameJudge(name: String, yg_name: String, bg_name: String): String = {
+    if (StringUtils.isNotBlank(name)) {
+      if (StringUtils.isNotBlank(yg_name) && yg_name.contains(name)) {
+        return "y"
+      } else if (StringUtils.isNotBlank(bg_name) && bg_name.contains(name)) {
+        return "b"
+      }
+    }
+    ""
+  }
+
   def main(args: Array[String]): Unit = {
-    println(getYesterday())
+    println(getYear(-1))
   }
+
 }
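Expected behaviour of the two new helpers, for reference (getYear depends on the current date):

  BaseUtil.getYear(-1)                          // e.g. "2019" when run in 2020
  BaseUtil.nameJudge("张三", "张三,李四", "王五")   // "y" -- name found on the plaintiff (yg) side
  BaseUtil.nameJudge("王五", "张三,李四", "王五")   // "b" -- name found on the defendant (bg) side
  BaseUtil.nameJudge("赵六", "张三,李四", "王五")   // ""  -- no match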

+ 8 - 5
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -25,7 +25,8 @@ case class CompanyIncSummary(s: SparkSession,
   private val f_bytes: Array[Byte] = Bytes.toBytes("F")
   private val name_bytes: Array[Byte] = Bytes.toBytes(tableName.toUpperCase)
   val updateTimeMapping = Map(
-    "wenshu_detail_combine" -> "update_date" //文书排序时间
+    "wenshu_detail_combine" -> "update_date", //文书排序时间
+    "company_equity_info_list" -> "ds" //文书排序时间
   )
 
   def calc(): Unit = {
@@ -37,14 +38,16 @@ case class CompanyIncSummary(s: SparkSession,
       .last
 
 
-    val ads_table_cols = spark.table(ads_table).columns.filter(l => {
+    val ads_table_cols = spark.table(ads_table).columns
+    /*  .filter(l => {
       !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
-    }).toList.sorted
+    }).toList.sorted*/
 
 
-    val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
+    val inc_ads_table_cols = spark.table(inc_ads_table).columns
+    /*  .filter(l => {
       !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag") && !l.equals("new_cids") && !l.equals("cids") && !l.equals("cid") && !l.equals("new_litigant_cids")
-    }).toList.sorted
+    }).toList.sorted*/
 
 
     val new_cols = ads_table_cols.intersect(inc_ads_table_cols)

+ 2 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidWithoutMD5Utils.scala

@@ -148,6 +148,8 @@ case class CompanyIncrForCidWithoutMD5Utils(s: SparkSession,
       s"CONCAT_WS('_',new_cid,${cols_md5.mkString(",")})"
     ).syn()
 
+    CompanyIncSummary(spark, project, tableName, "new_cid", dupliCols).calc
+
     println(s"${this.getClass.getSimpleName} calc end! " + new Date().toString)
   }
 }

+ 19 - 4
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -23,7 +23,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
 
   val tabMapping =
     Map("company_court_open_announcement" -> ("litigant_cids", ";") //开庭公告
-      ,"company_send_announcement" -> ("litigant_cids",",")//送达公告
+      , "company_send_announcement" -> ("litigant_cids", ",") //送达公告
     )
 
   val funMap =
@@ -32,6 +32,12 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       "company_court_announcement.litigant" -> "replace_char(litigant)"
     )
 
+  val rowkey_mapping: Map[String, String] =
+    Map(
+      "company_patent.pub_number" -> s"rowkey_trans(pub_number,'$mainTableName')",//专利
+      "company_patent.app_number" -> s"rowkey_trans(app_number,'$mainTableName')"
+    )
+
   // column transformation for dedup fields
   def trans(s: String): String = {
     val key = mainTableName + "." + s
@@ -42,6 +48,15 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
     res
   }
 
+  def trans2(s: String) = {
+    val key = mainTableName + "." + s
+    var res = s
+    if (rowkey_mapping.contains(key)) {
+      res = rowkey_mapping(key)
+    }
+    res
+  }
+
   def calc(): Unit = {
     println(s"${this.getClass.getSimpleName} calc start! " + new Date().toString)
 
@@ -88,7 +103,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       runDs = BaseUtil.atDaysAfter(1, lastDsIncAds)
     }
 
-    val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans)
+    val cols_md5 = dupliCols.filter(!_.equals("new_cid")).map(trans).map(trans2)
 
     //增量ods和增量ads最后一个分区相等,跳出
     if (lastDsIncOds.equals(lastDsIncAds)) {
@@ -97,7 +112,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
       val l1 = sql(s"show partitions $inc_ads_company_tb").collect.toList.map(_.getString(0).split("=")(1)).sorted
       if (l1.size > 1) {
         runDs = BaseUtil.atDaysAfter(1, l1(l1.size - 2))
-      }else{
+      } else {
         runDs = firstDsIncOds
       }
       //sys.exit(-1)
@@ -159,7 +174,7 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                    ,new_cid
          |                    ,cid
          |                    ,${sublistTableFieldName.mkString(",")}
-         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).mkString(",")})) ORDER BY update_time DESC ) num
+         |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.map(trans).map(trans2).mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
          |                        SELECT  "0" AS $f
          |                                ,CAST(new_cid AS STRING) AS new_cid
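For reference, a sketch of what the trans → trans2 chain yields when mainTableName is "company_patent" (the dupliCols values below are assumed for illustration):

  val cols_md5 = Seq("new_cid", "pub_number", "app_number")
    .filter(!_.equals("new_cid")).map(trans).map(trans2)
  // cols_md5 == Seq("rowkey_trans(pub_number,'company_patent')",
  //                 "rowkey_trans(app_number,'company_patent')")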

+ 7 - 1
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -118,7 +118,13 @@ trait LoggingUtils extends Logging {
 
   def getPartitions(t: String): Seq[String] = {
     val sql_s = s"show partitions " + t
-    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
+    sql(sql_s).collect.toList.map(r => r.getString(0)).map(r => {
+      if (r.contains("/")) {
+        r.split("/").find(s => s.contains("ds")).map(s => s.split("=")(1)).orNull
+      } else {
+        r.split("=")(1)
+      }
+    })
   }
 
   def getSecondLastPartitionOrElse(t: String, default: String): String = {
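Behaviour sketch of the reworked partition parsing (partition strings are illustrative): single-level and ds/tn multi-level partitions both resolve to the ds value.

  Seq("ds=20200904", "ds=20200904/tn=company_dynamic").map(r =>
    if (r.contains("/")) r.split("/").find(_.contains("ds")).map(_.split("=")(1)).orNull
    else r.split("=")(1)
  )
  // -> List("20200904", "20200904")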

+ 55 - 0
src/main/scala/com/winhc/bigdata/spark/utils/RowkeyRuleUtils.scala

@@ -0,0 +1,55 @@
+package com.winhc.bigdata.spark.utils
+
+import cn.hutool.crypto.SecureUtil
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Description: rowkey rule field transformation
+ * @author π
+ * @date 2020/9/4 9:28
+ */
+object RowkeyRuleUtils {
+  val md5Map =
+    Map("company_patent" -> "1"
+    )
+
+  val reg = "^CN.*".r
+
+  def rowkey_trans(s: String, name: String): String = {
+    var res = ""
+    if (md5Map.contains(name)) {
+      res = md5Map(name)
+    }
+    val r = res match {
+      case "1" => patent(s)
+      case _ => s
+    }
+    r
+  }
+
+  // patent rowkey rule
+  def patent(s: String): String = {
+    if (StringUtils.isBlank(s)) {
+      return ""
+    }
+    var r = s
+    val flag = reg.pattern.matcher(s.toUpperCase()).matches()
+    if (flag) {
+      r = s.substring(2)
+    }
+    r
+  }
+
+  def main(args: Array[String]): Unit = {
+
+    val r1 ="CN211281060U"
+    val r2 ="2019218036260"
+    val re1 = rowkey_trans(r1, "company_patent")
+    val re2 = rowkey_trans(r2, "company_patent")
+    val md5 = SecureUtil.md5(re1 + re2)
+    val md52 = SecureUtil.md5(r1 + r2)
+    println(md5)
+    println(md52)
+    println(re1 + " " + re2)
+  }
+}

+ 9 - 20
src/main/scala/com/winhc/bigdata/spark/utils/case_connect_utils.scala

@@ -95,26 +95,15 @@ object case_connect_utils {
     0
   }
 
-  def sort(v1: String, v2: String): String = {
-    val seq = Seq(v1, v2)
-    seq.filter(_ != null).sorted.mkString("")
-  }
-
   def main(args: Array[String]): Unit = {
-    for(e<-Seq(("a","b"),("b","a"))){
-      println(sort(e._1,e._2))
-    }
-
-
-
-//    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
-//    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
-//
-//    val current_case_no = ""
-//    val connect_case_no = ""
-//    val current_court_name = ""
-//    val connect_court_name = ""
-//
-//    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
+    //    val current_case_party_list: Seq[String] = Seq("张三", "张二", "张一", "张四")
+    //    val connect_case_party_list: Seq[String] = Seq("张三", "张二")
+    //
+    //    val current_case_no = ""
+    //    val connect_case_no = ""
+    //    val current_court_name = ""
+    //    val connect_court_name = ""
+    //
+    //    println(isConnect(current_case_party_list, connect_case_party_list, current_case_no, connect_case_no, current_court_name, connect_court_name))
   }
 }