Browse Source

Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
parent
commit
4918ad85ea

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/implicits/BaseHelper.scala

@@ -16,7 +16,7 @@ object BaseHelper {
     def isRegCapital(): Boolean = pattern matches str
     def isRegCapital(): Boolean = pattern matches str
 
 
 
 
-    def getOrNull(): String = if (str == null) "null" else "\"" + str + "\""
+    def getOrNull(): String = if (str == null) "\"\"" else "\"" + str + "\""
 
 
 
 
     def toRegCapital(): Double = str match {
     def toRegCapital(): Double = str match {

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyAnnualReport.scala

@@ -238,7 +238,7 @@ object CompanyAnnualReport {
           , "rowkey"
           , "rowkey"
           , "cid" +: writCols)
           , "cid" +: writCols)
 
 
-      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("rowkey")).calc
+      CompanyIncSummary(spark, project, "company_annual_report", "new_cid", Seq("new_cid","report_year")).calc
 
 
     }
     }
 
 
@@ -407,11 +407,11 @@ object CompanyAnnualReport {
 
 
     if (all_flag) {
     if (all_flag) {
       //存量
       //存量
-      //      CompanyAnnualReportHandle(spark, project).main_table_all()
+     /* CompanyAnnualReportHandle(spark, project).main_table_all()
       for (elem <- sublist_map) {
       for (elem <- sublist_map) {
         println("xjk:" + elem._1)
         println("xjk:" + elem._1)
         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
         CompanyAnnualReportHandle(spark, project).sublist_all(elem._1, elem._2.split(","))
-      }
+      }*/
     } else {
     } else {
       //增量
       //增量
       CompanyAnnualReportHandle(spark, project).main_table_inc()
       CompanyAnnualReportHandle(spark, project).main_table_inc()

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -352,7 +352,8 @@ object ChangeExtract {
     , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
     , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
     , Args(tableName = "company_finance", primaryFields = "round")
     , Args(tableName = "company_finance", primaryFields = "round")
     , Args(tableName = "company_dishonest_info", primaryFields = "case_no")
     , Args(tableName = "company_dishonest_info", primaryFields = "case_no")
-    //    , Args(tableName = "company_holder",  primaryFields = "amount")
+    , Args(tableName = "company_holder", primaryFields = "amount")
+    , Args(tableName = "company_annual_report_out_investment", primaryFields = "main_id")
   )
   )
 
 
 
 

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_abnormal_info.scala

@@ -2,7 +2,7 @@
 package com.winhc.bigdata.spark.jobs.chance.table
 package com.winhc.bigdata.spark.jobs.chance.table
 
 
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
 import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
-import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
 
 
 /**
 /**
  * @Author: XuJiakai
  * @Author: XuJiakai
@@ -16,5 +16,5 @@ case class company_abnormal_info(equCols: Seq[String]) extends CompanyChangeHand
 
 
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "经营异常", Array("put_department", "remove_department", "put_reason", "put_date", "remove_date", "remove_reason"))
   override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "经营异常", Array("put_department", "remove_department", "put_reason", "put_date", "remove_date", "remove_reason"))
 
 
-  override def getBizTime(newMap: Map[String, String]): String = newMap("put_date")
+  override def getBizTime(newMap: Map[String, String]): String = DateUtils.getNotNullStr(newMap("put_date"),newMap("update_time"))
 }
 }

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_annual_report_out_investment.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/21 13:41
+ * @Description: 年报对外投资
+ */
+case class company_annual_report_out_investment(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "对外投资发生变更"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = s"新增一家对外投资:${newMap.getOrElse("out_investment_name", "")}"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "对外投资", Array("out_investment_cid", "out_investment_name", "reg_number", "credit_code"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_holder.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/19 14:10
+ * @Description: 股东
+ */
+case class company_holder(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "股东发生更新"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = "新增股东"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "股东信息", Array("holder_id", "holder_type", "amount", "capital", "capital_actual"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+}

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_illegal_info.scala

@@ -13,7 +13,7 @@ case class company_illegal_info(equCols: Seq[String]) extends CompanyChangeHandl
 
 
   override def getInsertTitle(newMap: Map[String, String]): String = "新增一条严重违法"
   override def getInsertTitle(newMap: Map[String, String]): String = "新增一条严重违法"
 
 
-  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String =  ChangeExtractUtils.getTags(newMap, "严重违法", Array("put_reason", "put_date", "put_department", "remove_reason", "remove_date", "remove_department", "type"))
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "严重违法", Array("put_reason", "put_date", "put_department", "remove_reason", "remove_date", "remove_department", "type"))
 
 
-  override def getBizTime(newMap: Map[String, String]): String = DateUtils.getMaxDate(newMap.getOrElse("put_date", ""), newMap.getOrElse("remove_date", ""))
+  override def getBizTime(newMap: Map[String, String]): String = DateUtils.getNotNullStr(newMap("put_date"), newMap("remove_date"), newMap("update_time"))
 }
 }

+ 28 - 19
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -203,18 +203,20 @@ object CompanyDynamic {
     , Args(tableName = "company_brief_cancel_announcement", bName = 1)
     , Args(tableName = "company_brief_cancel_announcement", bName = 1)
     , Args(tableName = "company_liquidating_info", bName = 1)
     , Args(tableName = "company_liquidating_info", bName = 1)
 
 
-    , Args(tableName = "company_zxr_final_case", bName = 1)//终本案件
-    , Args(tableName = "company_license_creditchina", bName = 1)//行政许可-信用中国
-    , Args(tableName = "company_license_entpub", bName = 1)//行政许可-企业公示
-    , Args(tableName = "company_license", bName = 1)//行政许可
-    , Args(tableName = "company_check_info", bName = 1)//抽查检查
-    , Args(tableName = "company_court_announcement_list", bName = 1)//法院公告
-    , Args(tableName = "company_court_open_announcement_list", bName = 1)//开庭公告
-    , Args(tableName = "company_court_register_list", bName = 1)//立案信息
-    , Args(tableName = "company_double_random_check_info", bName = 1)//双随机抽查
-    , Args(tableName = "company_judicial_sale_combine_list", bName = 1)//司法拍卖
-    , Args(tableName = "company_tax_contravention", bName = 1)//税收违法
-    , Args(tableName = "wenshu_detail_combine", bName = 1)//裁判文书
+    , Args(tableName = "company_zxr_final_case", bName = 1) //终本案件
+    , Args(tableName = "company_license_creditchina", bName = 1) //行政许可-信用中国
+    , Args(tableName = "company_license_entpub", bName = 1) //行政许可-企业公示
+    , Args(tableName = "company_license", bName = 1) //行政许可
+    , Args(tableName = "company_check_info", bName = 1) //抽查检查
+    , Args(tableName = "company_court_announcement_list", bName = 1) //法院公告
+    , Args(tableName = "company_court_open_announcement_list", bName = 1) //开庭公告
+    , Args(tableName = "company_court_register_list", bName = 1) //立案信息
+    , Args(tableName = "company_double_random_check_info", bName = 1) //双随机抽查
+    , Args(tableName = "company_judicial_sale_combine_list", bName = 1) //司法拍卖
+    , Args(tableName = "company_tax_contravention", bName = 1) //税收违法
+    , Args(tableName = "wenshu_detail_combine", bName = 1) //裁判文书
+    , Args(tableName = "company_holder", bName = 1) //裁判文书
+    , Args(tableName = "company_annual_report_out_investment", bName = 1) //裁判文书
   )
   )
 
 
   private case class Args(project: String = "winhc_eci_dev"
   private case class Args(project: String = "winhc_eci_dev"
@@ -228,7 +230,7 @@ object CompanyDynamic {
       println(
       println(
         s"""
         s"""
            |Please enter the legal parameters !
            |Please enter the legal parameters !
-           |<project> <ds> <tableNames>
+           |<project> <tableNames> <ds>
            |""".stripMargin)
            |""".stripMargin)
       sys.exit(-99)
       sys.exit(-99)
     }
     }
@@ -250,13 +252,20 @@ object CompanyDynamic {
     val cd = CompanyDynamicUtil(spark, project, ds)
     val cd = CompanyDynamicUtil(spark, project, ds)
     cd.init()
     cd.init()
 
 
-    val ts = tableNames.split(",").toSet
+    if (tableNames.equals("all")) {
+      startArgs.foreach(e => {
+        cd.calc(e.tableName, e.bName)
+      })
+    } else {
+      val ts = tableNames.split(",").toSet
+
+      startArgs.filter(e => {
+        ts.contains(e.tableName)
+      }).foreach(e => {
+        cd.calc(e.tableName, e.bName)
+      })
+    }
 
 
-    startArgs.filter(e => {
-      ts.contains(e.tableName)
-    }).foreach(e => {
-      cd.calc(e.tableName, e.bName)
-    })
     spark.stop()
     spark.stop()
   }
   }
 }
 }

+ 16 - 8
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -24,14 +24,14 @@ trait CompanyDynamicHandle {
     , "" -> "eci_zscq" //知识产权
     , "" -> "eci_zscq" //知识产权
     , "wenshu_detail_combine" -> "eci_wenshu" //裁判文书
     , "wenshu_detail_combine" -> "eci_wenshu" //裁判文书
     , "company_court_announcement_list" -> "court_announcement" //法院公告
     , "company_court_announcement_list" -> "court_announcement" //法院公告
-    , "" -> "" //对外投资
+    , "company_annual_report_out_investment" -> "company_annual_report_out_investment" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
     , "company_punishment_info" -> "punishment_info" //行政处罚
     , "company_punishment_info_creditchina" -> "punishment_info_creditchina" //行政处罚-信用中国
     , "company_punishment_info_creditchina" -> "punishment_info_creditchina" //行政处罚-信用中国
     , "company_mortgage_info" -> "eci_chattel" //动产抵押
     , "company_mortgage_info" -> "eci_chattel" //动产抵押
     , "company_env_punishment" -> "env_punishment" //环保处罚
     , "company_env_punishment" -> "env_punishment" //环保处罚
     , "" -> "judicial_assistance" //股权冻结
     , "" -> "judicial_assistance" //股权冻结
     , "company_public_announcement2_list" -> "company_public_announcement2_list" //公示催告
     , "company_public_announcement2_list" -> "company_public_announcement2_list" //公示催告
-    , "" -> "serious_violation" //严重违法
+    , "company_illegal_info" -> "serious_violation" //严重违法
     , "company_brief_cancel_announcement" -> "company_brief_cancel_announcement" //简易注销
     , "company_brief_cancel_announcement" -> "company_brief_cancel_announcement" //简易注销
     , "company_equity_info" -> "stock_pledge" //股权出质
     , "company_equity_info" -> "stock_pledge" //股权出质
     , "company_tax_contravention" -> "tax_illegal" //税收违法
     , "company_tax_contravention" -> "tax_illegal" //税收违法
@@ -57,6 +57,7 @@ trait CompanyDynamicHandle {
     , "company_license_creditchina" -> "company_license_creditchina" //行政许可-信用中国
     , "company_license_creditchina" -> "company_license_creditchina" //行政许可-信用中国
     , "company_license_entpub" -> "company_license_entpub" //行政许可-企业公示
     , "company_license_entpub" -> "company_license_entpub" //行政许可-企业公示
     , "company_license" -> "company_license" //行政许可
     , "company_license" -> "company_license" //行政许可
+    , "company_holder" -> "company_holder" //行政许可
   )
   )
 
 
   private val table_2_info_type = Map(
   private val table_2_info_type = Map(
@@ -68,7 +69,7 @@ trait CompanyDynamicHandle {
     , "" -> "5" // 知识产权
     , "" -> "5" // 知识产权
     , "wenshu_detail_combine" -> "6" // 裁判文书
     , "wenshu_detail_combine" -> "6" // 裁判文书
     , "company_court_announcement_list" -> "7" // 法院公告
     , "company_court_announcement_list" -> "7" // 法院公告
-    , "" -> "8" // 对外投资
+    , "company_annual_report_out_investment" -> "8" // 对外投资
     , "company_mortgage_info" -> "9" // 动产抵押
     , "company_mortgage_info" -> "9" // 动产抵押
     , "company_judicial_sale_combine_list" -> "10" // 司法拍卖
     , "company_judicial_sale_combine_list" -> "10" // 司法拍卖
     , "company_land_publicity" -> "11-1" // 土地信息-地块公示
     , "company_land_publicity" -> "11-1" // 土地信息-地块公示
@@ -82,7 +83,7 @@ trait CompanyDynamicHandle {
     , "company_public_announcement2_list" -> "15" // 公示催告
     , "company_public_announcement2_list" -> "15" // 公示催告
     , "company_env_punishment" -> "16" // 环保处罚
     , "company_env_punishment" -> "16" // 环保处罚
     , "company_equity_info" -> "17" // 股权出质
     , "company_equity_info" -> "17" // 股权出质
-    , "" -> "18" // 严重违法
+    , "company_illegal_info" -> "18" // 严重违法
     , "company_brief_cancel_announcement" -> "19" // 简易注销
     , "company_brief_cancel_announcement" -> "19" // 简易注销
     , "company_own_tax" -> "20" // 欠税公告
     , "company_own_tax" -> "20" // 欠税公告
     , "company_tax_contravention" -> "21" // 税收违法
     , "company_tax_contravention" -> "21" // 税收违法
@@ -92,7 +93,7 @@ trait CompanyDynamicHandle {
     , "" -> "25" // 实际控制人变更
     , "" -> "25" // 实际控制人变更
     , "company_court_open_announcement_list" -> "26" // 开庭公告
     , "company_court_open_announcement_list" -> "26" // 开庭公告
     , "" -> "27" // 新闻信息
     , "" -> "27" // 新闻信息
-    , "" -> "28" // 股东信息
+    , "company_holder" -> "28" // 股东信息
     , "" -> "29" // 最终受益人
     , "" -> "29" // 最终受益人
     , "company_staff" -> "30" // 主要成员
     , "company_staff" -> "30" // 主要成员
     , "company_finance" -> "31" // 融资动态
     , "company_finance" -> "31" // 融资动态
@@ -120,7 +121,7 @@ trait CompanyDynamicHandle {
     , "" -> "4" //司法拍卖
     , "" -> "4" //司法拍卖
     , "company_liquidating_info" -> "4" //清算信息
     , "company_liquidating_info" -> "4" //清算信息
     , "company_brief_cancel_announcement" -> "4" //简易注销
     , "company_brief_cancel_announcement" -> "4" //简易注销
-    , "" -> "4" //严重违法
+    , "company_illegal_info" -> "4" //严重违法
     , "" -> "3" //裁判文书(被告)
     , "" -> "3" //裁判文书(被告)
     , "" -> "3" //裁判文书(被执行人)
     , "" -> "3" //裁判文书(被执行人)
     , "" -> "3" //法院公告(被告/被执行人)
     , "" -> "3" //法院公告(被告/被执行人)
@@ -146,11 +147,12 @@ trait CompanyDynamicHandle {
     , "" -> "2" //对外投资企业注销/吊销/经营异常
     , "" -> "2" //对外投资企业注销/吊销/经营异常
     , "" -> "2" //分支机构注销/吊销/经营异常
     , "" -> "2" //分支机构注销/吊销/经营异常
     , "" -> "2" //新闻舆论(中立、消极)
     , "" -> "2" //新闻舆论(中立、消极)
+    , "company_holder" -> "2" //股东信息
     , "company_finance" -> "2" //融资
     , "company_finance" -> "2" //融资
     , "" -> "1" //增资
     , "" -> "1" //增资
     , "" -> "1" //裁判文书(原告)
     , "" -> "1" //裁判文书(原告)
     , "" -> "1" //裁判文书(申请执行人)
     , "" -> "1" //裁判文书(申请执行人)
-    , "" -> "1" //对外投资
+    , "company_annual_report_out_investment" -> "1" //对外投资
     , "" -> "1" //分支机构
     , "" -> "1" //分支机构
     , "" -> "1" //购地信息
     , "" -> "1" //购地信息
     , "" -> "1" //地块公示
     , "" -> "1" //地块公示
@@ -245,7 +247,13 @@ trait CompanyDynamicHandle {
    * @param new_map
    * @param new_map
    * @return
    * @return
    */
    */
-  protected def get_change_time(bizDate: String, new_map: Map[String, String]): String = bizDate
+  protected def get_change_time(bizDate: String, new_map: Map[String, String]): String = {
+    var res = bizDate
+    if (bizDate.length == 10) {
+      res = res.concat(" 00:00:00")
+    }
+    res
+  }
 
 
   /**
   /**
    * 业务id
    * 业务id

+ 31 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_investment.scala

@@ -0,0 +1,31 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/21 13:44
+ * @Description:
+ */
+case class company_annual_report_out_investment() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"新增一家对外投资:${new_map.getOrElse("out_investment_name", "")}"
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(
+    Seq("out_investment_cid", "out_investment_name", "reg_number", "credit_code"
+    )
+  )
+}

+ 45 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_holder.scala

@@ -0,0 +1,45 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/19 14:15
+ * @Description:
+ */
+case class company_holder() extends CompanyDynamicHandle {
+
+
+  /**
+   * 来源表的变更类型,insert or update
+   *
+   * @return
+   */
+  override def org_type(): Seq[String] = Seq("insert", "update")
+
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"股东及出资信息发生变化"
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "holder_id"
+    , "holder_type"
+    , "amount"
+    , "capital"
+    , "capital_actual"
+    , "percent"
+  ))
+}