
Merge branch 'master' of http://139.224.213.4:3000/bigdata/Spark_Max

晏永年 4 years ago
parent commit 90071e2176

+ 4 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -292,7 +292,8 @@ object ChangeExtract {
     , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
     , Args(tableName = "company_bid_list", primaryFields = "title")
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
-    , Args(tableName = "company_employment", primaryFields = "source")
+    , Args(tableName = "company_employment", primaryFields = "title,cid,url_path")
+    , Args(tableName = "company_brief_cancel_announcement_result", primaryFields = "cid,main_id")
     , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
     , Args(tableName = "company_icp", primaryFields = "domain")
     , Args(tableName = "company_punishment_info", primaryFields = "punish_number")
@@ -321,6 +322,8 @@ object ChangeExtract {
     //公司名称,法人ID:人标识或公司标识,公司类型,注册地址,营业期限终止日期,经营范围,登记机关,企业状态                 ,注册资本,实收资本金额(单位:分),注销日期,注销原因
     , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,actual_capital_amount,cancel_date,cancel_reason")
     , Args(tableName = "company_illegal_info",  primaryFields = "remove_reason")
+    , Args(tableName = "company_finance",  primaryFields = "round")
+    , Args(tableName = "company_dishonest_info",  primaryFields = "case_no")
   )
 
   private case class Args(project: String = "winhc_eci_dev"
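
The primaryFields in each Args row name the columns whose values decide whether a record counts as changed. A rough illustration of that idea, assuming ChangeExtract compares the old and new row field by field (this is not its actual implementation):

// Illustration only: assumed semantics of primaryFields, not ChangeExtract's real logic.
def changedFields(oldRow: Map[String, String],
                  newRow: Map[String, String],
                  primaryFields: Seq[String]): Seq[String] =
  primaryFields.filter(f => oldRow.get(f) != newRow.get(f))

// e.g. the updated company_employment registration now tracks title, cid and url_path:
changedFields(
  Map("title" -> "Java工程师", "cid" -> "1", "url_path" -> "/a"),
  Map("title" -> "Java工程师", "cid" -> "1", "url_path" -> "/b"),
  Seq("title", "cid", "url_path"))
// -> List("url_path"), so the row would be picked up as a change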

+ 24 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_brief_cancel_announcement_result.scala

@@ -0,0 +1,24 @@
+
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/7/9 16:44
+ * @Description:
+ */
+
+
+//
+
+case class company_brief_cancel_announcement_result(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"${newMap("title")}简易注销信息发生变更")
+
+  override def getInsertTitle(newMap: Map[String, String]): String = getValueOrNull(newMap("title"), s"新增${newMap("title")}简易注销信息")
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "简易注销", Array("cname", "brief_cancel_result"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("start_date")
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_dishonest_info.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.{ChangeExtractUtils, DateUtils}
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/12 18:50
+ * @Description: 企业失信被执
+ */
+case class company_dishonest_info(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "企业失信被执发生变化"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = s"新增1条企业失信信息:${newMap.getOrElse("court","")}"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "企业失信被执", Array("name", "case_no", "court", "gist_dd", "gist_unit"))
+
+  override def getBizTime(newMap: Map[String, String]): String = DateUtils.getNotNullStr(newMap("pub_date"),newMap("reg_time"),newMap("appro_time"),newMap("update_time"))
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_finance.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/12 17:42
+ * @Description: 融资历史
+ */
+case class company_finance(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "融资历史发生变化"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = s"获得了${newMap.getOrElse("round", "")}融资,由${newMap.getOrElse("inverstors", "")}投资"
+
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "融资历史", Array("company_name", "finance_time", "money", "round", "inverstors"))
+
+  override def getBizTime(newMap: Map[String, String]): String = newMap("report_date")
+}
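
A minimal usage sketch for the new handle (repo context assumed; the sample row and the equCols value are invented for illustration):

import com.winhc.bigdata.spark.jobs.chance.table.company_finance

val handle = company_finance(equCols = Seq("round")) // equCols value is a guess
val row = Map(
  "company_name" -> "示例公司",
  "round" -> "A轮",
  "inverstors" -> "某投资机构", // column name as spelled in the schema
  "report_date" -> "2020-08-12"
)
handle.getInsertTitle(row) // 获得了A轮融资,由某投资机构投资
handle.getBizTime(row)     // 2020-08-12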

+ 3 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -184,6 +184,7 @@ object CompanyDynamic {
     , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 0)
     , Args(tableName = "company_land_publicity", bName = 1)
+    , Args(tableName = "company_employment", bName = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 1)
     , Args(tableName = "company_land_transfer", bName = 1)
@@ -194,6 +195,8 @@ object CompanyDynamic {
     , Args(tableName = "company_public_announcement2_list", bName = 1)
     , Args(tableName = "company_mortgage_info", bName = 1)
     , Args(tableName = "company_stock_announcement", bName = 1)
+    , Args(tableName = "company_finance", bName = 1)
+    , Args(tableName = "company_dishonest_info", bName = 1)
     , Args(tableName = "company_send_announcement_list", bName = 1)
     , Args(tableName = "company_annual_report_out_guarantee", bName = 1)
   )

+ 9 - 7
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -16,14 +16,14 @@ trait CompanyDynamicHandle {
     , "company_land_mortgage" -> "land_mortgage" //土地抵押
     , "company_bid_list" -> "tender_es" //中标信息ES
     , "" -> "enterprise_shixin" //失信
-    , "" -> "enterprise_zhixing" //被执
+    , "company_dishonest_info" -> "enterprise_zhixing" //被执
     , "" -> "shareholder_shixin" //股东失信
     , "" -> "shareholder_zhixing" //股东被执
     , "" -> "tender_qichacha" //中标信息企查查
     , "company_abnormal_info" -> "eci_exception" //经营异常
     , "" -> "eci_zscq" //知识产权
     , "" -> "eci_wenshu" //裁判文书
-    , "company_court_announcement_list" -> "court_announcement" //法院公告
+    , "" -> "court_announcement" //法院公告
     , "" -> "" //对外投资
     , "company_punishment_info" -> "punishment_info" //行政处罚
     , "company_punishment_info_creditchina" -> "punishment_info_creditchina" //行政处罚-信用中国
@@ -37,7 +37,7 @@ trait CompanyDynamicHandle {
     , "company_tax_contravention" -> "tax_illegal" //税收违法
     , "company_own_tax" -> "tax_owenotice" //欠税公告
     , "company_judicial_sale_combine_list" -> "judicial" //司法拍卖
-    , "" -> "recruit" //招聘信息
+    , "company_employment" -> "recruit" //招聘信息
     , "" -> "liquidation_information" //清算信息
     , "" -> "investor_equity_change" //大股东变更
     , "" -> "actual_controller_change" //实际控制人变更
@@ -48,6 +48,7 @@ trait CompanyDynamicHandle {
     , "company_annual_report_out_guarantee" -> "company_annual_report_out_guarantee" //年报-对外担保
 
     , "company_staff" -> "company_staff" //主要成员
+    , "company_finance" -> "company_finance" //融资动态
     , "company_check_info" -> "spot_check" //抽查检查
     , "company_double_random_check_info" -> "company_double_random_check_info" //双随机抽查
     , "company_court_register" -> "company_court_register" //立案信息
@@ -56,7 +57,7 @@ trait CompanyDynamicHandle {
   private val table_2_info_type = Map(
     "CompanyDynamicHandleTest" -> "0"
     , "company" -> "1" //工商信息
-    , "" -> "2" // 企业失信被执
+    , "company_dishonest_info" -> "2" // 企业失信被执
     , "" -> "3" // 企业股东失信被执
     , "company_abnormal_info" -> "4" // 经营异常
     , "" -> "5" // 知识产权
@@ -70,7 +71,7 @@ trait CompanyDynamicHandle {
     , "company_land_transfer" -> "11-3" // 土地信息-土地转让
     , "company_land_mortgage" -> "11-4" // 土地信息-土地抵押
     , "company_bid_list" -> "12" // 中标信息
-    , "" -> "13" // 招聘信息
+    , "company_employment" -> "13" // 招聘信息
     , "company_punishment_info" -> "14-1" // 行政处罚
     , "company_punishment_info_creditchina" -> "14-2" // 行政处罚-信用中国
     , "company_public_announcement2_list" -> "15" // 公示催告
@@ -89,7 +90,7 @@ trait CompanyDynamicHandle {
     , "" -> "28" // 股东信息
     , "" -> "29" // 最终受益人
     , "company_staff" -> "30" // 主要成员
-    , "" -> "31" // 融资动态
+    , "company_finance" -> "31" // 融资动态
     , "company_stock_announcement" -> "32" // 企业公告
     , "company_check_info" -> "33" // 抽查检查
     , "" -> "34" // 行政许可
@@ -105,7 +106,7 @@ trait CompanyDynamicHandle {
 
   //风险等级映射
   private val info_risk_level_map = Map(
-    "" -> "4" //企业失信被执情况
+    "company_dishonest_info" -> "4" //企业失信被执情况
     , "" -> "4" //股东失信被执情况
     , "" -> "4" //股权冻结
     , "" -> "4" //司法拍卖
@@ -137,6 +138,7 @@ trait CompanyDynamicHandle {
     , "" -> "2" //对外投资企业注销/吊销/经营异常
     , "" -> "2" //分支机构注销/吊销/经营异常
     , "" -> "2" //新闻舆论(中立、消极)
+    , "company_finance" -> "2" //融资
     , "" -> "1" //增资
     , "" -> "1" //裁判文书(原告)
     , "" -> "1" //裁判文书(申请执行人)

+ 35 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement_result.scala

@@ -0,0 +1,35 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+case class company_brief_cancel_announcement_result()  extends CompanyDynamicHandle {
+  /**
+    * 信息描述
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+
+
+  /**
+    * 风险等级
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "高风险"
+
+  /**
+    * 变更内容
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("cname->公司名称", "brief_cancel_result->简易注销结果", "announcement_apply_date->公告申请日期"))
+
+}

+ 53 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_dishonest_info.scala

@@ -0,0 +1,53 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/12 18:50
+ * @Description: 企业失信被执
+ */
+case class company_dishonest_info() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"新增1条企业失信信息:${new_map.getOrElse("court", "")}"
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "name->失信人名称"
+    , "age->年龄"
+    , "sexy->性别"
+    , "case_no->案号"
+    , "focus_num->关注数"
+    , "card_num"
+    , "legal_name->法人、负责人姓名"
+    , "legal_name_id->法人标识"
+    , "legal_name_type"
+    , "staff->法定负责人/主要负责人信息"
+    , "area->省份地区"
+    , "court->法院"
+    , "gist_dd->执行依据文号"
+    , "reg_time->立案时间"
+    , "gist_unit->做出执行的依据单位"
+    , "duty->生效法律文书确定的义务"
+    , "performance->履行情况"
+    , "unperform_part->未履行"
+    , "performed_part->已履行"
+    , "action_content->失信被执行人行为具体情形"
+    , "pub_date->发布时间"
+    , "lawsuit_url"
+    , "appro_time->与官网核准的时间"
+  ))
+}
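
The Seq passed to toJson is a list of "source_key->显示名" specs; entries without "->" (such as "card_num") presumably keep their original key. A standalone illustration of that spec format, as an assumption about MapHelper.toJson's behaviour rather than its actual code:

// Illustration of the "key->label" spec idea; MapHelper.toJson in the repo may differ.
def renameBySpec(row: Map[String, String], spec: Seq[String]): Map[String, String] =
  spec.flatMap { s =>
    val Array(key, label) = if (s.contains("->")) s.split("->", 2) else Array(s, s)
    row.get(key).map(label -> _)
  }.toMap

renameBySpec(
  Map("case_no" -> "(2020)xx执123号", "court" -> "某某人民法院"),
  Seq("case_no->案号", "court->法院", "card_num"))
// -> Map(案号 -> (2020)xx执123号, 法院 -> 某某人民法院)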

+ 38 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_employment.scala

@@ -0,0 +1,38 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.implicits.MapHelper._
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+
+/**
+  * 招聘信息
+  * @Author ydw
+  */
+case class company_employment() extends CompanyDynamicHandle {
+  /**
+    * 信息描述
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map.getOrElse("name", null)
+
+
+  /**
+    * 风险等级
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示"
+
+  /**
+    * 变更内容
+    *
+    * @param old_map
+    * @param new_map
+    * @return
+    */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq("city->地区", "description->招聘描述", "source->招聘来源", "employ_num->招聘人数", "start_date->职位发布日期", "ori_salary->薪资", "education->学历", "experience->工作经验", "title->职位"))
+}

+ 35 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_finance.scala

@@ -0,0 +1,35 @@
+package com.winhc.bigdata.spark.jobs.dynamic.tables
+
+import com.winhc.bigdata.spark.jobs.dynamic.CompanyDynamicHandle
+import com.winhc.bigdata.spark.implicits.MapHelper._
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/12 17:43
+ * @Description: 融资历史
+ */
+case class company_finance() extends CompanyDynamicHandle {
+  /**
+   * 信息描述
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = s"获得了${new_map.getOrElse("round", "")}融资,由${new_map.getOrElse("inverstors", "")}投资"
+
+  /**
+   * 变更内容
+   *
+   * @param old_map
+   * @param new_map
+   * @return
+   */
+  override protected def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String): String = new_map.toJson(Seq(
+    "company_name->公司名称"
+    , "finance_time->融资时间"
+    , "money->融资金额"
+    , "round->轮次"
+    , "inverstors->投资人"
+  ))
+}

+ 41 - 5
src/main/scala/com/winhc/bigdata/spark/utils/DateUtils.scala

@@ -14,19 +14,55 @@ object DateUtils {
     fm.parse(date).getTime / 1000
   }
 
+  /**
+   * 获取第一个不为空的字符串
+   * @param date
+   * @return
+   */
+  def getNotNullStr(date: String*): String = date.filter(_ != null).head
+
+  /**
+   * 获取最小的一个日期,如果有异常情况则反回第一个日期
+   *
+   * @param date
+   * @return
+   */
+  def getMinDate(date: String*): String = {
+    val notNullDate = date.filter(_ != null)
+    if (notNullDate.map(_.length).distinct.length != 1) {
+      return notNullDate.head
+    }
+    var minDate: String = notNullDate.head
+    for (i <- 1 until notNullDate.length) {
+      minDate = getMaxDateBy2(minDate, notNullDate(i), true)
+    }
+    minDate
+  }
 
   /**
    * 获取最大的一个日期,如果有异常情况则反回第一个日期
    *
-   * @param date1
-   * @param date2
+   * @param date
    */
-  def getMaxDate(date1: String, date2: String): String = {
+  def getMaxDate(date: String*): String = {
+    val notNullDate = date.filter(_ != null)
+    if (notNullDate.map(_.length).distinct.length != 1) {
+      return notNullDate.head
+    }
+    var maxDate: String = notNullDate.head
+    for (i <- 1 until notNullDate.length) {
+      maxDate = getMaxDateBy2(maxDate, notNullDate(i))
+    }
+    maxDate
+  }
+
+  private def getMaxDateBy2(date1: String, date2: String, reverse: Boolean = false): String = {
     try {
       if (date1.length != date2.length) {
         return date1
       }
-      date1.compareTo(date2) match {
+      val re = if (reverse) -1 else 1
+      Integer.signum(date1.compareTo(date2)) * re match {
         case -1 => date2
         case 1 => date1
         case _ => date1
@@ -37,7 +73,7 @@ object DateUtils {
   }
 
   def main(args: Array[String]): Unit = {
-    println(getMaxDate("2003-10-12 10:00:00", "2003-11-12 00:00:01"))
+    println(getNotNullStr(null,"2003-10-12 10:00:00", null, "2003-11-12 00:00:02"))
   }
 
 }
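
Worked examples of the intended semantics of the new helpers (sample dates are made up; repo context assumed):

import com.winhc.bigdata.spark.utils.DateUtils

// First non-null string:
DateUtils.getNotNullStr(null, "2003-10-12 10:00:00", null, "2003-11-12 00:00:02")
// -> 2003-10-12 10:00:00

// Maximum / minimum over same-length date strings:
DateUtils.getMaxDate("2003-10-12 10:00:00", "2003-12-01 00:00:00", "2003-11-12 00:00:01")
// -> 2003-12-01 00:00:00
DateUtils.getMinDate("2020-01-03", "2020-01-01", "2020-01-02")
// -> 2020-01-01

// Dates of different lengths (mixed formats) fall back to the first non-null date:
DateUtils.getMaxDate("2020-01-03", "2020-01-01 00:00:00")
// -> 2020-01-03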

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -19,8 +19,8 @@ object SparkUtils {
       .config("spark.sql.crossJoin.enabled", true)
       .config("spark.hadoop.odps.cupid.smartnat.enable", true)
       .config("odps.exec.dynamic.partition.mode", "nonstrict")
-//      .config("spark.hadoop.odps.project.name", "winhc_eci_dev")
-      .config("spark.hadoop.odps.project.name", "winhc_test_dev")
+      .config("spark.hadoop.odps.project.name", "winhc_eci_dev")
+//      .config("spark.hadoop.odps.project.name", "winhc_test_dev")
       .config("spark.hadoop.odps.access.id", "LTAI4G4n7pAW8tUbJVkkZQPD")
       .config("spark.hadoop.odps.access.key", "uNJOBskzcDqHq1TYG3m2rebR4c1009")
       .config("spark.sql.catalogImplementation", "odps")