
Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
commit 83f0a722ef
19 changed files with 116 additions and 139 deletions
  1. +8 -6    src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala
  2. +2 -1    src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala
  3. +2 -2    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala
  4. +2 -2    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala
  5. +1 -1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala
  6. +1 -1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala
  7. +1 -11   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala
  8. +1 -1    src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala
  9. +5 -15   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala
  10. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala
  11. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala
  12. +5 -15  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala
  13. +4 -14  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala
  14. +4 -14  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala
  15. +3 -13  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala
  16. +5 -15  src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala
  17. +3 -0   src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala
  18. +2 -2   src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala
  19. +61 -0  src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

+ 8 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/JustiCase.scala

@@ -30,12 +30,12 @@ case class JustiCase(s: SparkSession,
          |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.ods_$tableName
          |WHERE   ds=${ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
+         |""".stripMargin)
+/*
          |UNION
          |SELECT  *,get_justicase_id(case_no) AS case_no_hash
          |FROM    $project.inc_ods_$tableName
          |WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL AND LENGTH(${fromCol})>1
-         |""".stripMargin)
-/*
           s"""SELECT *,get_justicase_id(case_no) AS case_no_hash FROM (
              |SELECT '(2016)闽02刑更704号' AS case_no, '(2008)厦刑初字第69号\n(2009)闽刑终字第133号\n(2012)厦刑执字第628号\n(2015)厦刑执字第485号' AS connect_case_no
              |UNION
@@ -88,10 +88,6 @@ case class JustiCase(s: SparkSession,
          |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
          |  FROM    $project.ods_$tableName
          |  WHERE   ds=${ods_ds}  AND ${toCol} IS NOT NULL
-         |  UNION
-         |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
-         |  FROM    $project.inc_ods_$tableName
-         |  WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
          |) A
          |LEFT JOIN
          |(
@@ -99,6 +95,12 @@ case class JustiCase(s: SparkSession,
          |) B
          |ON A.case_no_hash=B.case_no_hash
          |""".stripMargin)//.createOrReplaceTempView(s"tmp_graphx_$tableName")
+    /*
+         |  UNION
+         |  SELECT  get_justicase_id(CASE_NO) AS case_no_hash, '0' AS flag, *
+         |  FROM    $project.inc_ods_$tableName
+         |  WHERE   ds=${inc_ods_ds} AND ${toCol} IS NOT NULL
+    */
   }
 }
 
 

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/chance/ChangeExtract.scala

@@ -309,6 +309,7 @@ object ChangeExtract {
     , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
     , Args(tableName = "company_land_announcement", primaryFields = "e_number,project_name")
     , Args(tableName = "company_bid_list", primaryFields = "title")
     , Args(tableName = "company_bid_list", primaryFields = "title")
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
     , Args(tableName = "company_land_transfer", primaryFields = "num,location")
+    , Args(tableName = "company_land_mortgage", primaryFields = "land_num,source_url")
     , Args(tableName = "company_employment", primaryFields = "title,url_path")
     , Args(tableName = "company_employment", primaryFields = "title,url_path")
     , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
     , Args(tableName = "company_env_punishment", primaryFields = "punish_number")
     , Args(tableName = "company_icp", primaryFields = "domain")
     , Args(tableName = "company_icp", primaryFields = "domain")
@@ -366,7 +367,7 @@ object ChangeExtract {
 
 
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
     val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
       "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
-      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+      "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     )
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
     val spark = SparkUtils.InitEnv("ChangeExtract", config)
 
 

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamic.scala

@@ -182,18 +182,18 @@ object CompanyDynamic {
     , Args(tableName = "company_equity_info")
     , Args(tableName = "company_equity_info")
     , Args(tableName = "company_staff", bName = 1)
     , Args(tableName = "company_staff", bName = 1)
     , Args(tableName = "company", bName = 0)
     , Args(tableName = "company", bName = 0)
-    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_illegal_info", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
     , Args(tableName = "company_land_publicity", bName = 1)
     , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_employment", bName = 1, aggs = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_land_announcement", bName = 1)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_bid_list", bName = 2)
     , Args(tableName = "company_land_transfer", bName = 1)
     , Args(tableName = "company_land_transfer", bName = 1)
+    , Args(tableName = "company_land_mortgage", bName = 1)
     , Args(tableName = "company_env_punishment", bName = 1)
     , Args(tableName = "company_env_punishment", bName = 1)
     , Args(tableName = "company_punishment_info", bName = 1)
     , Args(tableName = "company_punishment_info", bName = 1)
     , Args(tableName = "company_punishment_info_creditchina", bName = 1)
     , Args(tableName = "company_punishment_info_creditchina", bName = 1)
-    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_public_announcement2_list", bName = 2)
     , Args(tableName = "company_public_announcement2_list", bName = 2)
+    , Args(tableName = "bankruptcy_open_case", bName = 1)
     , Args(tableName = "company_mortgage_info", bName = 1)
     , Args(tableName = "company_mortgage_info", bName = 1)
     , Args(tableName = "company_stock_announcement", bName = 1)
     , Args(tableName = "company_stock_announcement", bName = 1)
     , Args(tableName = "company_finance", bName = 1)
     , Args(tableName = "company_finance", bName = 1)

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/CompanyDynamicHandle.scala

@@ -205,7 +205,7 @@ trait CompanyDynamicHandle {
       , rta_desc
       , get_change_content(old_map, new_map)
       , get_change_time(bizDate, new_map)
-      , get_biz_id(rowkey)
+      , get_biz_id(rowkey, new_map)
       , get_sub_info_type()
       , get_info_risk_level(old_map, new_map)
       , if (suggestion == null) null else suggestion
@@ -265,7 +265,7 @@ trait CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  protected def get_biz_id(rowkey: String): String = rowkey
+  protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String = rowkey

   /**
    * 子信息类型,小类
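get_biz_id now receives the record's new_map alongside the rowkey, so a handler can derive the business id from a field of the record instead of the rowkey alone (wenshu_detail_combine below uses this to return case_id). A minimal sketch of an override under the new signature; the table and field name are illustrative and not part of this commit:

case class some_table() extends CompanyDynamicHandle {
  // Hypothetical: prefer an assumed "biz_id" field, fall back to the rowkey.
  override protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String =
    new_map.getOrElse("biz_id", rowkey)
}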

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_annual_report_out_guarantee.scala

@@ -42,5 +42,5 @@ case class company_annual_report_out_guarantee()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "提示信息"
+  override def get_info_risk_level(old_map: Map[String, String], new_map: Map[String, String]): String = "2"
 }

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_bid_list.scala

@@ -32,7 +32,7 @@ case class company_bid_list() extends CompanyDynamicHandle {
    * @param rowkey
    * @return
    */
-  override def get_biz_id(rowkey: String): String = rowkey.split("_")(1)
+  override def get_biz_id(rowkey: String, new_map: Map[String, String]): String = rowkey.split("_")(1)

   /**
    * 风险等级

+ 1 - 11
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_brief_cancel_announcement.scala

@@ -16,19 +16,9 @@ case class company_brief_cancel_announcement() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("announcement_term")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
     "进行了简易注销"
   }
-
   /**
    * 变更时间
    *

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_equity_info.scala

@@ -47,7 +47,7 @@ case class company_equity_info() extends CompanyDynamicHandle {
           , get_rta_desc(old_map, new_map)
           , get_change_content(old_map, new_map, cname)
           , get_change_time(bizDate, new_map)
-          , get_biz_id(rowkey)
+          , get_biz_id(rowkey,new_map)
           , get_sub_info_type()
           , t._3
           , null

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_mortgage.scala

@@ -16,22 +16,12 @@ case class company_land_mortgage() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("land_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""抵押土地评估金额:${new_map("evaluate_amount")}万元\n
-       |抵押金额:${new_map("mortgage_amount")}万元\n
-       |抵押开始时间:${new_map("start_date")}\n
-       |抵押结束时间:${new_map("end_date")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""抵押土地评估金额:${new_map("evaluate_amount")}万元
+       |抵押金额:${new_map("mortgage_amount")}万元
+       |抵押开始时间:${new_map("start_date")}
+       |抵押结束时间:${new_map("end_date")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_land_transfer.scala

@@ -16,20 +16,10 @@ case class company_land_transfer() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""转让价格:${new_map("merchandise_price")}万元\n
-       |成交时间:${new_map("merchandise_time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""转让价格:${new_map("merchandise_price")}万元
+       |成交时间:${new_map("merchandise_time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_mortgage_info.scala

@@ -16,20 +16,10 @@ case class company_mortgage_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("reg_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""被担保债权数额:${new_map("amount")}\n
-       |债务人履行债务期限:${new_map("term")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""被担保债权数额:${new_map("amount")}
+       |债务人履行债务期限:${new_map("term")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_public_announcement2_list.scala

@@ -16,22 +16,12 @@ case class company_public_announcement2_list()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("bill_type") + new_map("bill_num")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""票号:${new_map("bill_num")}\n
-       |申请人:${new_map("cname")}\n
-       |票面金额:${new_map("start_date")}\n
-       |公告日期:${new_map("end_date")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""票号:${new_map("bill_num")}
+       |申请人:${new_map("cname")}
+       |票面金额:${new_map("start_date")}
+       |公告日期:${new_map("end_date")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 4 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info.scala

@@ -16,21 +16,11 @@ case class company_punishment_info() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("punish_number")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""决定日期:${new_map("decision_date")}\n
-       |违法行为类型:${new_map("type")}\n
-       |行政处罚内容:${new_map("content")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""决定日期:${new_map("decision_date")}
+       |违法行为类型:${new_map("type")}
+       |行政处罚内容:${new_map("content")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 4 - 14
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_punishment_info_creditchina.scala

@@ -16,21 +16,11 @@ case class company_punishment_info_creditchina() extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("punish_number")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""决定日期:${new_map("decision_date")}\n
-       |违法行为类型:${new_map("type")}\n
-       |行政处罚结果:${new_map("result")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""决定日期:${new_map("decision_date")}
+       |违法行为类型:${new_map("type")}
+       |行政处罚结果:${new_map("result")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 3 - 13
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_stock_announcement.scala

@@ -16,20 +16,10 @@ case class company_stock_announcement()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("title")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""公告名称:${new_map("title")}\n
-       |公告日期:${new_map("time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""公告名称:${new_map("title")}
+       |公告日期:${new_map("time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 5 - 15
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/company_zxr_restrict.scala

@@ -16,22 +16,12 @@ case class company_zxr_restrict()extends CompanyDynamicHandle {
    * @param new_map
    * @return
    */
-  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = new_map("case_no")
-
-  /**
-   * 变更内容
-   *
-   * @param old_map
-   * @param new_map
-   * @return
-   */
-  override def get_change_content(old_map: Map[String, String], new_map: Map[String, String], cname: String = null): String = {
-    s"""案号:${new_map("case_no")}\n
-       |限消令对象:${new_map("name")}\n
-       |立案日期:${new_map("case_create_time")}\n
-       |发布日期:${new_map("update_time")}\n""".stripMargin
+  override def get_rta_desc(old_map: Map[String, String], new_map: Map[String, String]): String = {
+    s"""案号:${new_map("case_no")}
+       |限消令对象:${new_map("name")}
+       |立案日期:${new_map("case_create_time")}
+       |发布日期:${new_map("update_time")}""".stripMargin
   }
-
   /**
    * 变更时间
    *

+ 3 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/dynamic/tables/wenshu_detail_combine.scala

@@ -65,4 +65,7 @@ case class wenshu_detail_combine() extends CompanyDynamicHandle {
     t2
   }

+  override protected def get_biz_id(rowkey: String, new_map: Map[String, String]): String = {
+    new_map("case_id")
+  }
 }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -116,11 +116,11 @@ object BaseUtil {
     }
   }
   def BKDRHash(str: String): Long = {
-    val seed: Long = 131 // 31 131 1313 13131 131313 etc..
+    val seed: Long = 1313131313 // 31 131 1313 13131 131313 etc..
     var hash: Long = 0
     for (i <- 0 to str.length - 1) {
       hash = hash * seed + str.charAt(i)
-      hash = hash & 0x7FFFFFFF
+      hash = hash & 0x7FFFFFFFFFFFFFFFL
     }
     return hash
   }
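BKDRHash keeps its shape but widens the output: the seed grows from 131 to 1313131313 and the mask from 0x7FFFFFFF (31 bits) to 0x7FFFFFFFFFFFFFFFL, so the rolling hash now spans the full non-negative range of a Long, which plausibly reduces collisions when the value is used as an id. A standalone sketch of the updated function; the sample input is illustrative and not from this commit:

object BKDRHashSketch {
  def bkdrHash(str: String): Long = {
    val seed: Long = 1313131313L
    var hash: Long = 0L
    for (c <- str) {
      hash = hash * seed + c            // Char widens to Long
      hash = hash & 0x7FFFFFFFFFFFFFFFL // keep the value non-negative (63 bits)
    }
    hash
  }

  def main(args: Array[String]): Unit =
    println(bkdrHash("(2016)闽02刑更704号")) // sample case number, illustrative only
}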

+ 61 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrCombineUtils.scala

@@ -0,0 +1,61 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+import org.apache.spark.sql.SparkSession
+
+import scala.collection.mutable
+
+/**
+ * @Description: 自有增量数据合并到天眼查数据
+ * @author π
+ * @date 2020/8/31 14:07
+ */
+object CompanyIncrCombineUtils {
+  def main(args: Array[String]): Unit = {
+    val Array(project, source, target) = args
+
+    println(
+      s"""
+         |project:$project
+         |source:$source
+         |target:$target
+         |""".stripMargin)
+
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "1000"
+    )
+    val spark = SparkUtils.InitEnv(getClass.getSimpleName, config)
+    CompanyIncrCombineUtils(spark, source, target).calc()
+    spark.stop()
+  }
+}
+
+case class CompanyIncrCombineUtils(s: SparkSession, source: String, target: String) extends LoggingUtils {
+  override protected val spark: SparkSession = s
+
+  def calc(): Unit = {
+
+    val ds2 = BaseUtil.getPartion(s"$target", spark) //目标表数据
+
+    val cols: Seq[String] = spark.table(target).schema.map(_.name).filter(s => {
+      !s.equals("ds")
+    })
+
+    //判断目标表是否之前合并过
+    val list = sql(
+      s"""
+         |select max(ds) max_ds from $target where id = -1 and ds > '0'
+         |""".stripMargin).collect().toList.map(_.getAs[String]("max_ds"))
+
+    println(s"list: $list")
+
+    sql(
+      s"""
+         |INSERT into table $target PARTITION(ds=$ds2)
+         |SELECT ${cols.mkString(",")} from
+         |$source
+         |where ds > '${if (StringUtils.isNotBlank(list.head)) s"${list.head}" else s"0"}'
+         |""".stripMargin)
+  }
+}
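The new utility takes three positional arguments (ODPS project, source table, target table), looks up the target's newest ds partition, and appends source rows whose ds is newer than the last recorded merge marker (the max ds among rows with id = -1), or all partitions when no marker exists. A hypothetical invocation sketch; the table names are examples and not part of this commit:

// Argument order: project, source (incremental data), target (table to append into).
CompanyIncrCombineUtils.main(Array(
  "winhc_eci_dev",
  "winhc_eci_dev.inc_ods_company_example",
  "winhc_eci_dev.ods_company_example"
))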