Browse Source

案源机会优化

xufei 4 years ago
parent
commit
f24c6b611f

+ 14 - 1
src/main/scala/com/winhc/bigdata/spark/const/CaseChanceConst.scala

@@ -20,10 +20,16 @@ object CaseChanceConst {
     , "company_employment" -> "3"
     , "company_land_announcement" -> "3"
     , "company_land_publicity" -> "3"
+    , "company_dishonest_info" -> "3"//失信人
+    , "company_zxr_restrict" -> "3"//限高
+    , "company_zxr_list" -> "3"//被执
+    , "company_court_open_announcement_list" -> "3"//开庭
+    , "wenshu_detail_combine" -> "3"//文书
+    , "company_holder" -> "3"//股东
+    , "company_equity_info_list" -> "3"//出质人
     , "" -> "4"
   )
 
-
   //利好消息祥细type
   val CHANCE_DYNAMIC_TYPE = Map(
     /*"" -> "3-1" //企业增资
@@ -44,5 +50,12 @@ object CaseChanceConst {
     , "company_certificate" -> "10" //资质证书   X
     , "company_copyright_works_list" -> "11" //作品著作权
     , "company_copyright_reg_list" -> "12" //软件著作权
+    , "company_dishonest_info" -> "13" //失信人
+    , "company_zxr_restrict" -> "14" //限高
+    , "company_zxr_list" -> "15" //被执
+    , "company_court_open_announcement_list" -> "16" //开庭
+    , "wenshu_detail_combine" -> "17" //文书
+    , "company_holder" -> "18" //股东
+    , "company_equity_info_list" -> "19" //出质人
   )
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyCourtAnnouncement.scala

@@ -61,7 +61,7 @@ case class CompanyCourtAnnouncement(s: SparkSession, project: String, //表所
     if (runOld) {
       adsListDs = getFirstPartion(inc_ads_company_tb_list, spark)
     }
-    val ads_eci_debtor_relation = s"${project}.ads_eci_debtor_relation" //债权全量表
+    val ads_eci_debtor_relation = s"winhc_eci.ads_eci_debtor_relation_v2" //债权全量表
     val debtorRelationDs = getPartion(ads_eci_debtor_relation, spark)
 
     //结果表导入生产表

+ 25 - 8
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -37,7 +37,7 @@ object ChangeExtract {
 
   case class ChangeExtractHandle(s: SparkSession,
                                  project: String, //表所在工程名
-                                 tableName: String, //表名(不加前后辍)
+                                 tableName1: String, //表名(不加前后辍)
                                  primaryKey: String, //此维度主键
                                  inc_ds: String, //需要计算的分区
                                  primaryFields: Seq[String] //主要字段,该字段任意一个不同 则认为发生变化
@@ -48,10 +48,25 @@ object ChangeExtract {
     val target_eci_change_extract = "ads_change_extract"
 
     val updateTimeMapping = Map(
-      "wenshu_detail_combine" -> "update_date" // judgment documents: sort-time column
+      "wenshu_detail_combine" -> "update_date", // judgment documents: sort-time column
+      "company_equity_info_list" -> "reg_date" // equity pledges: presumably the sort-time column as well (the old comment was a copy of the line above) — TODO confirm
     )
+    // Maps a handler name to the underlying table name when the two differ
+    val tabMapping =
+      Map("company_holder_v2" -> "company_holder"// the shareholder add/remove handler works on the company_holder table
+      )
+
+    /**
+     * Resolves the table name to use for a given handler name; names that
+     * have no entry in tabMapping are returned unchanged.
+     *
+     * @param s handler/table name as passed to ChangeExtractHandle
+     * @return the mapped table name, or s itself when no mapping exists
+     */
+    def trans(s: String): String = tabMapping.getOrElse(s, s)
 
     def calc(isCopy: Boolean = true): Unit = {
+      val tableName = trans(tableName1)
       val cols = primaryFields.filter(!_.equals(primaryKey)).seq
 
       val ds = inc_ds.replace("-", "")
@@ -63,7 +78,7 @@ object ChangeExtract {
 
       val lastDs_ads_all = getLastPartitionsOrElse(s"$project.ads_$tableName", "0")
 
-      val handle = ReflectUtils.getClazz[CompanyChangeHandle](s"com.winhc.bigdata.spark.jobs.chance.table.$tableName", cols)
+      val handle = ReflectUtils.getClazz[CompanyChangeHandle](s"com.winhc.bigdata.spark.jobs.chance.table.$tableName1", cols)
       //      val handle = getHandleClazz(tableName, cols)
 
       val update_time = BaseUtil.nowDate()
@@ -264,14 +279,14 @@ object ChangeExtract {
       ))
 
       spark.createDataFrame(rdd, schema)
-        .createOrReplaceTempView(s"tmp_change_extract_view$tableName") //
+        .createOrReplaceTempView(s"tmp_change_extract_view$tableName1") //
 
       sql(
         s"""
-           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName')
+           |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.$target_eci_change_extract PARTITION(ds='$ds',tn='$tableName1')
            |SELECT *
            |FROM
-           |    tmp_change_extract_view$tableName
+           |    tmp_change_extract_view$tableName1
            |""".stripMargin)
     }
   }
@@ -308,7 +323,7 @@ object ChangeExtract {
     , Args(tableName = "company_land_publicity", primaryFields = "title,location,use_for")
     , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
     , Args(tableName = "company_bid_list", primaryFields = "title")
-    , Args(tableName = "company_zxr_list", primaryFields = "case_no,exec_money")
+    , Args(tableName = "company_zxr_list", primaryFields = "status")
 
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
     , Args(tableName = "company_land_mortgage", primaryFields = "land_num,source_url")
@@ -346,13 +361,15 @@ object ChangeExtract {
     , Args(tableName = "company_own_tax", primaryFields = "tax_balance,tax_category,tax_num")
 
     , Args(tableName = "company_equity_info", primaryKey = "id", primaryFields = "reg_number", isCopy = false)
+    , Args(tableName = "company_equity_info_list", primaryFields = "reg_number")
     , Args(tableName = "company_staff", primaryFields = "staff_type")
     //公司名称,法人ID:人标识或公司标识,公司类型,注册地址,营业期限终止日期,经营范围,登记机关,企业状态                 ,注册资本,注销日期,注销原因
     , Args(tableName = "company", primaryKey = "cid", primaryFields = "name,legal_entity_id,company_org_type,reg_location,to_time,business_scope,reg_institute,reg_status,reg_capital,cancel_date,cancel_reason")
     , Args(tableName = "company_illegal_info", primaryFields = "remove_reason")
     , Args(tableName = "company_finance", primaryFields = "round")
-    , Args(tableName = "company_dishonest_info", primaryFields = "case_no")
+    , Args(tableName = "company_dishonest_info", primaryFields = "status")
     , Args(tableName = "company_holder", primaryFields = "amount")
+    , Args(tableName = "company_holder_v2", primaryFields = "deleted")
     , Args(tableName = "company_annual_report_out_investment", primaryFields = "main_id")
   )
 

+ 41 - 5
src/main/scala/com/winhc/bigdata/spark/jobs/chance/eci_good_news.scala

@@ -33,6 +33,12 @@ object eci_good_news {
       , "company_certificate" //资质证书
     )
 
+    val update_tn_list = Seq( // tables whose TYPE = 'update' rows are read from the change-extract table
+      "company_zxr_restrict" // consumption-restriction orders
+      , "company_zxr_list" // enforcement (judgment-debtor) records
+      , "company_dishonest_info" // dishonest-debtor records
+    )
+
     val target_ads_case_chance = "ads_case_chance_good_news"
     val target_ads_case_chance_element = "ads_case_chance_element_good_news"
 
@@ -46,9 +52,9 @@ object eci_good_news {
       chance_dynamic_type()
       val now_time = BaseUtil.atMonthsBefore(0)
 
-      val eci_debtor_rel_ds = getLastPartitionsOrElse("winhc_eci_dev.ads_eci_debtor_relation", "0")
+      val eci_debtor_rel_ds = getLastPartitionsOrElse("winhc_eci.ads_eci_debtor_relation_v2", "0")
 
-      val relation_cols = getColumns("winhc_eci_dev.ads_eci_debtor_relation").filter(!_.equals("ds"))
+      val relation_cols = getColumns("winhc_eci.ads_eci_debtor_relation_v2").filter(!_.equals("ds"))
 
       val good_news_cols = getColumns(s"winhc_eci_dev.$source_ads_change_extract").filter(!_.equals("ds"))
 
@@ -57,17 +63,47 @@ object eci_good_news {
            |SELECT  *
            |FROM    (
            |            SELECT  ${relation_cols.map(n => s"$n as rel_$n").mkString(",")}
-           |            FROM    winhc_eci_dev.ads_eci_debtor_relation
+           |            FROM    winhc_eci.ads_eci_debtor_relation_v2
            |            WHERE   ds = '$eci_debtor_rel_ds'
            |            and deleted = 0
            |        ) AS t1
            |JOIN (
            |              SELECT  ${good_news_cols.map(n => s"$n as detail_$n").mkString(",")}
            |              FROM    winhc_eci_dev.$source_ads_change_extract
-           |              WHERE   ds = '${ds}'
+           |              WHERE   ds >= '${ds}'
            |              AND     tn in (${change_tn_list.map("'" + _ + "'").mkString(",")})
            |              AND     TYPE = 'insert'
            |              AND     months_between('$now_time',to_date(biz_date)) < 3
+           |              UNION ALL
+           |              SELECT  ${good_news_cols.map(n => s"$n as detail_$n").mkString(",")}
+           |              FROM    winhc_eci_dev.$source_ads_change_extract
+           |              WHERE   ds >= '${ds}'
+           |              AND     tn in (${update_tn_list.map("'" + _ + "'").mkString(",")})
+           |              AND     TYPE = 'update'
+           |              AND     months_between('$now_time',to_date(biz_date)) < 3
+           |              UNION ALL
+           |              SELECT  ${good_news_cols.map(n => s"$n as detail_$n").mkString(",")}
+           |              FROM    winhc_eci_dev.$source_ads_change_extract
+           |              WHERE   ds >= '${ds}'
+           |              AND     tn in ('company_court_open_announcement_list','wenshu_detail_combine')
+           |              AND     TYPE = 'insert'
+           |              AND     data['case_no'] like concat('%','恢','%')
+           |              AND     months_between('$now_time',to_date(biz_date)) < 3
+           |              UNION ALL
+           |              SELECT  ${good_news_cols.map(n => s"$n as detail_$n").mkString(",")}
+           |              FROM    winhc_eci_dev.$source_ads_change_extract
+           |              WHERE   ds >= '${ds}'
+           |              AND     tn in ('company_holder')
+           |              AND     TYPE = 'insert'
+           |              AND     months_between('$now_time',to_date(biz_date)) < 3
+           |              UNION ALL
+           |              SELECT  ${good_news_cols.map(n => s"$n as detail_$n").mkString(",")}
+           |              FROM    winhc_eci_dev.$source_ads_change_extract
+           |              WHERE   ds >= '${ds}'
+           |              AND     tn in ('company_equity_info_list')
+           |              AND     TYPE = 'insert'
+           |              AND     data['type'] = '1'
+           |              AND     months_between('$now_time',to_date(biz_date)) < 3
            |          ) AS t2
            |ON      t1.rel_bg_cid = t2.detail_cid
            |WHERE t1.rel_bg_name not like concat('%','银行','%') AND t1.rel_bg_name not like concat('%','保险','%')
@@ -151,7 +187,7 @@ object eci_good_news {
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
     )
     val spark = SparkUtils.InitEnv("eci_good_news", config)
     eci_good_news_handle(spark, ds).company_ip()

+ 18 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_equity_info_list.scala

@@ -0,0 +1,18 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+// Change handler for the flattened equity-pledge table (company_equity_info_list)
+case class company_equity_info_list(equCols: Seq[String]) extends CompanyChangeHandle {
+  // Company id is read from the record; Map.apply throws NoSuchElementException when "cid" is absent
+  override def getCid(rowkey: String, newMap: Map[String, String]): String = newMap("cid")
+  // Inserts and updates share the same title text; plain literals, nothing to interpolate
+  override def getUpdateTitle(newMap: Map[String, String]): String = "股权出质信息发生变更"
+
+  override def getInsertTitle(newMap: Map[String, String]): String = "股权出质信息发生变更"
+  // Tags are built from the pledgor/pledgee fields of the new record
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String  = ChangeExtractUtils.getTags(newMap, "股权出质", Array("pledgor", "pledgee", "cid", "pledgor_id", "pledgee_type", "pledgee_id", "pledgor_type"))
+  // Business time is the record's reg_date field; throws NoSuchElementException when it is absent
+  override def getBizTime(newMap: Map[String, String]): String = newMap("reg_date")
+}

+ 19 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/chance/table/company_holder_v2.scala

@@ -0,0 +1,19 @@
+package com.winhc.bigdata.spark.jobs.chance.table
+
+import com.winhc.bigdata.spark.jobs.chance.CompanyChangeHandle
+import com.winhc.bigdata.spark.utils.ChangeExtractUtils
+
+/**
+ * @Author: π
+ * @Date: 2020/12/10
+ * @Description: shareholder added / removed
+ */
+case class company_holder_v2(equCols: Seq[String]) extends CompanyChangeHandle {
+  override def getUpdateTitle(newMap: Map[String, String]): String = "股东移除" // title text means "shareholder removed"
+  // NOTE(review): every update is titled as a removal — presumably because this handler is registered to compare only the "deleted" field; confirm
+  override def getInsertTitle(newMap: Map[String, String]): String = "新增股东" // title text means "new shareholder"
+  // Tags are built from the holder identity and capital fields of the new record
+  override def getLabel(oldMap: Map[String, String], newMap: Map[String, String]): String = ChangeExtractUtils.getTags(newMap, "股东信息", Array("holder_id", "holder_type", "amount", "capital", "capital_actual"))
+  // Business time is the record's update_time; Map.apply throws NoSuchElementException if the key is missing
+  override def getBizTime(newMap: Map[String, String]): String = newMap("update_time")
+}

+ 0 - 362
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNewTmp2.scala

@@ -1,362 +0,0 @@
-package com.winhc.bigdata.spark.jobs.judicial
-
-import java.util.NoSuchElementException
-
-import com.winhc.bigdata.spark.udf._
-import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
-import com.winhc.bigdata.spark.utils.{BaseUtil, EsRestUtils, LoggingUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
-import org.apache.spark.broadcast.Broadcast
-import org.apache.spark.sql.types.StructType
-import org.apache.spark.sql.{Row, SparkSession}
-
-import scala.collection.mutable
-import scala.collection.mutable.ListBuffer
-
-object JudicialCaseRelationPreNewTmp2 {
-  def main(args: Array[String]): Unit = {
-    val project = "winhc_eci_dev"
-    val ds = ""
-
-    val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> s"$project",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10000"
-    )
-    val spark: SparkSession = SparkUtils.InitEnv(this.getClass.getSimpleName, config)
-    JudicialCaseRelationPreNewTmp2(spark, project, ds).calc()
-    spark.stop()
-  }
-}
-
-case class JudicialCaseRelationPreNewTmp2(s: SparkSession, project: String, ds: String
-                                         ) extends LoggingUtils with CompanyMapping with BaseFunc with CourtRank {
-  override protected val spark: SparkSession = s
-
-  def get_seq_by_index(area_code: Broadcast[Map[String, Seq[String]]], code: String, index: Int): String = {
-    val c = area_code.value.getOrElse(code, null)
-    if (c == null) "" else if (c(index) != null) c(index).trim else ""
-  }
-
-  private def get_area_code(code: String, area_code: Broadcast[Map[String, Seq[String]]]) = {
-    val listbuffer = ListBuffer[String]()
-    if (StringUtils.isNotBlank(code) && code.trim.length == 6) {
-      val c = code.trim
-      listbuffer.append(
-        c.substring(0, 2), get_seq_by_index(area_code, c, 0),
-        c.substring(2, 4), get_seq_by_index(area_code, c, 1),
-        c.substring(4, 6), get_seq_by_index(area_code, c, 2))
-    } else {
-      listbuffer.append("", "", "", "", "", "")
-    }
-    listbuffer.toSeq
-  }
-
-
-  private def get_category_code(code: String, category_code: Broadcast[Map[String, Seq[String]]]) = {
-    val listbuffer = ListBuffer[String]()
-    if (StringUtils.isNotBlank(code)) {
-      val c = code.trim
-      (c, get_seq_by_index(category_code, c, 0), get_seq_by_index(category_code, c, 1), get_seq_by_index(category_code, c, 2))
-      listbuffer.append(
-        get_seq_by_index(category_code, c, 0),
-        get_seq_by_index(category_code, c, 1),
-        get_seq_by_index(category_code, c, 2)
-      )
-    } else {
-      listbuffer.append("", "", "")
-    }
-    listbuffer.toSeq
-  }
-
-  def calc(): Unit = {
-    prepareFunctions(spark)
-    map_2_json()
-    case_no_trim_udf()
-    registerCourtRank()
-    val (m1, m2) = code2Name()
-
-    spark.udf.register("category_code", (code: String) => {
-      get_category_code(code, m1)
-    })
-    spark.udf.register("area_code", (code: String) => {
-      get_area_code(code, m2)
-    })
-
-    spark.udf.register("name_aggs", new NameAggs(1000))
-    spark.udf.register("case_reason", new CaseReasonAggs(1000))
-    //预处理数据
-    val t2 = s"ads_judicial_case_relation_pre"
-    var t2_ds = ds
-    var t1_ds = ds
-    if (StringUtils.isBlank(ds)) {
-      t2_ds = BaseUtil.getPartion(t2, "wenshu", spark)
-      t1_ds = t2_ds
-    }
-
-    val t3 = "ads_judicial_case_relation_replace" //司法案件id交换表
-    val t4 = "ads_judicial_case_incr_mapping"
-    val t5 = s"base_company_mapping" //公司name和cid映射
-    val t6 = s"ads_judicial_case_relation_replace_cids" //公司name和cid映射
-    val eci_debtor_relation = "ads_eci_debtor_relation_xf"
-    val deadbeat_company = "ads_deadbeat_company"
-
-
-    //println(schema)
-
-    val t5_ds = BaseUtil.getPartion(t5, spark) //映射表分区
-
-    //    sql(
-    //      """
-    //        |insert overwrite table winhc_eci_dev.tmp_xf_deadbeat_company_deleted
-    //        |SELECT rowkey,cid,name,card_num,publish_date,deleted,tn,flag from (
-    //        |SELECT *,case when tn ='company_zxr' then 7 when tn ='company_dishonest_info' then 3 when tn ='company_zxr_restrict' then 5 else '' end as flag
-    //        |,ROW_NUMBER() OVER(PARTITION BY rowkey,tn ORDER BY publish_date DESC) AS num
-    //        |from ads_deadbeat_company
-    //        |where ds > '0' and deleted = 1 and tn <> 'company_zxr_final_case'
-    //        |)
-    //        |where num = 1;
-    //        |""".stripMargin)
-
-    //    sql(
-    //      s"""
-    //         |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE winhc_eci_dev.ads_judicial_case_relation_r1
-    //         |SELECT
-    //         |    concat_ws('',x.judicase_id,${t1_ds.substring(2)}) judicase_id,
-    //         |    title       ,
-    //         |    case_type   ,
-    //         |    case_reason ,
-    //         |    case_no     ,
-    //         |    court_name  ,
-    //         |    case_stage  ,
-    //         |    lable       ,
-    //         |    detail      ,
-    //         |    -- name_aggs['yg_name'] yg_name,
-    //         |    -- name_aggs['bg_name'] bg_name,
-    //         |    yg_name,
-    //         |    bg_name,
-    //         |    case_amt    ,
-    //         |    date        ,
-    //         |    court_level ,
-    //         |    0 as deleted     ,
-    //         |    cids
-    //         |FROM
-    //         |(
-    //         |SELECT  judicase_id
-    //         |        ,max(title) title
-    //         |        ,concat_ws(',',collect_set(case_type)) case_type
-    //         |        ,case_reason(case_reason,date,flag) case_reason
-    //         |        ,concat_ws(',',collect_set(case_no)) case_no
-    //         |        ,concat_ws(',',collect_set(court_name)) court_name
-    //         |        ,last_stage(concat_ws(' ',collect_set(case_stage))) case_stage
-    //         |        ,trim_black(concat_ws(',',max(case_type),collect_set(lable))) lable
-    //         |        ,null as detail
-    //         |        ,max(case_amt) AS case_amt
-    //         |        ,max(date) AS date
-    //         |        ,trim_black(concat_ws(',',collect_set(court_level))) court_level
-    //         |        ,concat_ws(',',collect_set(cids)) cids
-    //         |        -- ,name_aggs(yg_name,bg_name,flag,date) name_aggs
-    //         |        ,concat_ws(',',collect_set(yg_name)) yg_name
-    //         |        ,concat_ws(',',collect_set(bg_name)) bg_name
-    //         |FROM    (
-    //         |        SELECT  a.*
-    //         |        FROM    (
-    //         |                   SELECT  judicase_id,flag,title,case_type,case_reason,case_no,court_name,case_stage,lable,yg_name,bg_name,date,case_amt,cids,detail_id
-    //         |                   ,court_level(court_name) court_level
-    //         |                   FROM    $project.$t6
-    //         |                   WHERE   ds = '$t1_ds'
-    //         |                ) a
-    //         |                left join
-    //         |                (
-    //         |                select rowkey,flag from winhc_eci_dev.tmp_xf_deadbeat_company_deleted
-    //         |                )b on a.detail_id = b.rowkey and a.flag = b.flag
-    //         |                where b.rowkey is null
-    //         |        )
-    //         |GROUP BY judicase_id
-    //         |)x
-    //         |""".stripMargin).show(20, false)
-
-    //ads_eci_debtor_relation_xf
-
-    //生成债权表
-    //    val df = sql(
-    //      s"""
-    //        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"} TABLE $project.tmp_xf_yg_bg_name
-    //        |SELECT md5(concat_ws('',yg_name,bg_name)) id,yg_name,bg_name
-    //        |FROM (
-    //        |SELECT
-    //        |yg_name_x as yg_name
-    //        |,bg_name_x as bg_name
-    //        |FROM winhc_eci_dev.ads_judicial_case_relation_r1
-    //        |LATERAL VIEW explode(split(yg_name,',')) a AS yg_name_x
-    //        |LATERAL VIEW explode(split(bg_name,',')) b AS bg_name_x
-    //        |WHERE compare_name(yg_name,bg_name)
-    //        |AND (lable like '%被执行人%' or lable like '%限制高消费%'  or lable like '%失信人%')
-    //        |AND  LENGTH(cleanup(yg_name_x)) > 4
-    //        |AND  LENGTH(cleanup(bg_name_x)) > 4
-    //        |AND size(split(yg_name,',')) < 500
-    //        |AND size(split(bg_name,',')) < 500
-    //        |)
-    //        |GROUP BY yg_name,bg_name
-    //        |""".stripMargin)
-
-
-//    sql(
-//      """
-//        |INSERT OVERWRITE  TABLE winhc_eci_dev.tmp_xf_base_company_mapping
-//        |SELECT
-//        |a.cid,a.cname,a.new_cid,id,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
-//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,b.company_type,
-//        |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
-//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,b.update_time,b.deleted
-//        |from (
-//        |SELECT * from winhc_eci_dev.base_company_mapping where ds = '20201125'
-//        |)a
-//        |JOIN
-//        |(
-//        |    SELECT
-//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
-//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
-//        |credit_code,score,category_code,lat2,lng2,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
-//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
-//        | from
-//        |(
-//        |SELECT
-//        |*,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY update_time DESC) AS num,split(verify,',')[0] lng2,split(verify,',')[1] lat2
-//        |from (
-//        |SELECT
-//        |verify(lng,lat) verify,
-//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
-//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
-//        |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
-//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
-//        |from winhc_eci_dev.ads_company where ds ='20200604'
-//        |UNION ALL
-//        |SELECT
-//        |verify(lng,lat) verify,
-//        |id,cid,base,name,name_en,name_alias,history_names,legal_entity_id,legal_entity_type,reg_number,company_org_type,reg_location,estiblish_time,from_time,
-//        |to_time,business_scope,reg_institute,approved_time,reg_status,reg_capital,org_number,org_approved_institute,current_cid,parent_cid,company_type,
-//        |credit_code,score,category_code,lat,lng,area_code,reg_capital_amount,reg_capital_currency,actual_capital_amount,actual_capital_currency,reg_status_std,
-//        |social_security_staff_num,cancel_date,cancel_reason,revoke_date,revoke_reason,emails,phones,wechat_public_num,logo,crawled_time,create_time,update_time,deleted
-//        |from winhc_eci_dev.inc_ads_company where ds >'20200604'
-//        |)
-//        |)
-//        |where num = 1
-//        |)b on a.cid =b.cid
-//        |""".stripMargin)
-
-    //公司基本信息全量表
-    sql(
-      s"""
-        |INSERT ${if (isWindows) "INTO" else "OVERWRITE"}  TABLE $project.tmp_xf_base_company_mapping_new
-        |select
-        |cid,new_cid,cname,reg_status,area_code,
-        |a1[0]  province_code,a1[1]  province_name,
-        |a1[2]  city_code,a1[3]  city_name,
-        |a1[4]  county_code,a1[5]  county_name,
-        |reg_location,estiblish_time,category_code,
-        |a2[0]  category_first,a2[1]  category_second,a2[2]  category_third,
-        |reg_capital,phones,emails,company_type
-        |from
-        |(
-        |select
-        | area_code(area_code) a1
-        | ,category_code(category_code) a2
-        | ,*
-        |from ${project}.tmp_xf_base_company_mapping
-        |)
-        |""".stripMargin).show(100,false)
-
-    //生成债权表
-    sql(
-      """
-        |insert overwrite table winhc_eci_dev.ads_eci_debtor_relation_xf PARTITION (ds='20201127')
-        |select e.* from (
-        |SELECT
-        |c.id,c.yg_name,d.bg_name,c.new_cid yg_cid,d.new_cid bg_cid,
-        |c.reg_status yg_reg_status,
-        |c.province_code yg_province_code,
-        |c.province_name yg_province_name,
-        |c.city_code yg_city_code,
-        |c.city_name yg_city_name,
-        |c.county_code yg_county_code,
-        |c.county_name yg_county_name,
-        |c.reg_location yg_reg_location,
-        |c.estiblish_time yg_estiblish_time,
-        |c.category_code yg_category_code,
-        |c.category_first yg_category_first,
-        |c.category_second yg_category_second,
-        |c.category_third yg_category_third,
-        |c.reg_capital yg_reg_capital,
-        |c.phones yg_phones,
-        |c.emails yg_emails,
-        |d.reg_status bg_reg_status,
-        |d.province_code bg_province_code,
-        |d.province_name bg_province_name,
-        |d.city_code bg_city_code,
-        |d.city_name bg_city_name,
-        |d.county_code bg_county_code,
-        |d.county_name bg_county_name,
-        |d.reg_location bg_reg_location,
-        |d.estiblish_time bg_estiblish_time,
-        |d.category_code bg_category_code,
-        |d.category_first bg_category_first,
-        |d.category_second bg_category_second,
-        |d.category_third bg_category_third,
-        |d.reg_capital bg_reg_capital,
-        |d.phones bg_phones,
-        |d.emails bg_emails,
-        |0 as deleted
-        |FROM    (
-        |            SELECT  a.id
-        |                    ,a.yg_name
-        |                    ,b.*
-        |            FROM    (
-        |                        SELECT  id
-        |                                ,yg_name
-        |                        FROM    tmp_xf_yg_bg_name
-        |                    ) a
-        |            JOIN    (
-        |                        SELECT  *
-        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
-        |                        FROM    tmp_xf_base_company_mapping_new
-        |                        WHERE   length(cleanup(cname)) > 4
-        |                        AND     company_type NOT IN ('2','8')
-        |                    ) b
-        |            ON      cleanup(a.yg_name) = cleanup(b.cname)
-        |        )c
-        |JOIN    (
-        |            SELECT  a.id
-        |                    ,a.bg_name
-        |                    ,b.*
-        |            FROM    (
-        |                        SELECT  id
-        |                                ,bg_name
-        |                        FROM    tmp_xf_yg_bg_name
-        |                    ) a
-        |            JOIN    (
-        |                        SELECT  *
-        |                                ,ROW_NUMBER() OVER(PARTITION BY CLEANUP(cname) ORDER BY estiblish_time DESC) AS num
-        |                        FROM    tmp_xf_base_company_mapping_new
-        |                        WHERE   length(cleanup(cname)) > 4
-        |                        AND     company_type NOT IN ('2','8')
-        |                    ) b
-        |            ON      cleanup(a.bg_name) = cleanup(b.cname)
-        |        )d
-        |ON      c.id = d.id
-        |)e
-        |JOIN
-        |(
-        |SELECT * from (
-        |SELECT cid
-        |,ROW_NUMBER() OVER(PARTITION BY cid ORDER BY publish_date DESC) AS num
-        |from ads_deadbeat_company
-        |where ds > '0' and deleted = 0 and tn <> 'company_zxr_final_case'
-        |)
-        |where num = 1
-        |)f
-        |on e.bg_cid = f.cid
-        |""".stripMargin)
-
-  }
-
-}

+ 23 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -271,7 +271,7 @@ object BaseUtil {
     }
     val a1 = ygname.split(",", -1).filter(StringUtils.isNotBlank(_)).toSet
     val a2 = bgname.split(",", -1).filter(StringUtils.isNotBlank(_)).toSet
-    if(a1.size == 0 || a2.size == 0){
+    if (a1.size == 0 || a2.size == 0) {
       return false
     }
     (a1 & a2).size == 0
@@ -406,6 +406,27 @@ object BaseUtil {
     (province, city, district)
   }
 
+  private val LONGITUDE = "^[\\-\\+]?(0?\\d{1,2}\\.\\d{1,15}|1[0-7]?\\d{1}\\.\\d{1,15}|180\\.0{1,15})$" // optionally signed longitude up to 180; a fractional part of 1-15 digits is mandatory
+  private val LATITUDE = "^[\\-\\+]?([0-8]?\\d{1}\\.\\d{1,15}|90\\.0{1,15})$" // optionally signed latitude up to 90; a fractional part of 1-15 digits is mandatory
+
+  /**
+   * Normalises a coordinate pair into the string "longitude,latitude".
+   * If the two arguments were passed in swapped order they are swapped back.
+   * The declared result type is String; with the former Unit type the built
+   * value was silently discarded and callers could never read it.
+   *
+   * @return "lng,lat"; "," when either input is blank; "" when neither order is valid
+   */
+  def verify(longitude: String, latitude: String): String = {
+    // true when lng/lat match the LONGITUDE/LATITUDE patterns respectively
+    def isPair(lng: String, lat: String): Boolean =
+      Pattern.matches(LONGITUDE, lng) && Pattern.matches(LATITUDE, lat)
+    if (StringUtils.isBlank(longitude) || StringUtils.isBlank(latitude)) ","
+    else if (isPair(longitude, latitude)) s"$longitude,$latitude"
+    else if (isPair(latitude, longitude)) s"$latitude,$longitude"
+    else ""
+  }
+
   def main(args: Array[String]): Unit = {
     //    println(title("xx", null, "reason"))
     //    println(parseAddress("大石桥市人民法院"))
@@ -415,7 +436,7 @@ object BaseUtil {
     //println(caseStage("(2019)鄂初7号"))
     val yg_name = ",,"
     val bg_name = "张三,,小米,"
-    println(compareName(yg_name,bg_name))
+    println(compareName(yg_name, bg_name))
   }
 
 }