浏览代码

fix: v8公司索引更新

许家凯 3 年之前
父节点
当前提交
00f478d994

+ 104 - 13
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.ng.jobs
 
 import com.winhc.bigdata.spark.config.EsConfig
+import com.winhc.bigdata.spark.ng.utils.DomainEntity
 import com.winhc.bigdata.spark.udf.{BaseFunc, CompanyIndexFunc}
 import com.winhc.bigdata.spark.utils._
 import org.apache.commons.lang3.StringUtils
@@ -22,6 +23,32 @@ case class CompanyIndexJob(s: SparkSession,
   reg_urd()
 
   def reg_urd(): Unit = {
+
+    def icp_web_site(icp_domain: String, web_site: String, site_url: String): String = {
+      def parse_array(str: String): Seq[String] = {
+        if (StringUtils.isEmpty(str)) {
+          Seq.empty
+        } else {
+          str.replaceAll("[;\t\n;,。]", ",").split(',').filter(StringUtils.isNotBlank).distinct
+        }
+      }
+
+      if (StringUtils.isEmpty(icp_domain) && StringUtils.isBlank(web_site) && StringUtils.isBlank(site_url)) {
+        null
+      } else {
+        val url_seq = parse_array(icp_domain) ++ parse_array(web_site) ++ parse_array(site_url)
+        val u = url_seq.distinct
+        if (u.isEmpty) {
+          null
+        } else {
+          u.map(s => DomainEntity(s)).groupBy(d => d.hashCode()).map(s => s._2.max).map(d => d.domain).mkString(",")
+        }
+      }
+    }
+
+    spark.udf.register("icp_domain_trim", icp_web_site _)
+
+
     company_org_type_udf()
     cleanup()
     area_code()
@@ -79,7 +106,7 @@ case class CompanyIndexJob(s: SparkSession,
     spark.udf.register("get_amount", RegCapitalAmount.getAmount _)
   }
 
-  private val target_tab = "winhc_ng.out_company_v8_index"
+  private val target_tab = "winhc_ng.out_company_v8_index_test"
 
 
   private val org_prefix = "ads"
@@ -99,11 +126,12 @@ case class CompanyIndexJob(s: SparkSession,
     val t = get_row_number("company_tm", Seq("rowkey"))
     val a = get_row_number("company_app_info", Seq("rowkey"))
     val i = get_row_number("company_icp", Seq("rowkey"))
+    val f = get_row_number("company_finance", Seq("rowkey"))
 
     val default_ds = getLastPartitionsOrElse(s"winhc_ng.${org_prefix}_company", "0")
     val target_ds = getLastPartitionsOrElse(s"winhc_ng.inc_${org_prefix}_company", default_ds)
 
-    calc(c, h, s, t, a, i, target_ds)
+    calc(c, h, s, t, a, i, f, target_ds)
   }
 
   private def inc(): Unit = {
@@ -174,8 +202,9 @@ case class CompanyIndexJob(s: SparkSession,
     val t = get_alt_tab("company_tm", Seq("rowkey"))
     val a = get_alt_tab("company_app_info", Seq("rowkey"))
     val i = get_alt_tab("company_icp", Seq("rowkey"))
+    val f = get_alt_tab("company_finance", Seq("rowkey"))
 
-    calc(c, h, s, t, a, i, insert_ds)
+    calc(c, h, s, t, a, i, f, insert_ds)
   }
 
   def getCompanyRank(): String = {
@@ -198,30 +227,51 @@ case class CompanyIndexJob(s: SparkSession,
                    , company_tm_tab: String
                    , company_app_info_tab: String
                    , company_icp_tab: String
+                   , company_finance: String
                    , target_ds: String
                   ): Unit = {
 
     val base_company = get_company_tab(company_tab)
     val base_holder = get_company_holder_tab(holder_tab)
+    val base_holder_history = get_company_holder_tab(holder_tab, is_history = true)
     val base_staff = get_staff_tab(staff_tab)
+    val base_staff_history = get_staff_tab(staff_tab, is_history = true)
 
     val base_tm = get_company_tm_tab(company_tm_tab, "tm_name", "company_tm")
     val base_app_info = getTab(company_app_info_tab, "filter_name", "app_info")
     val base_icp = getTab(company_icp_tab, "web_name", "icp")
+
+    val base_icp_domain = getIcpDomainTab(company_icp_tab, "icp_domain")
+
+    //    val base_icp_domain = getTab(company_icp_tab, "domain", "icp_domain")
+    //    val base_icp_site_url = getTab(company_icp_tab, "site_url", "icp_site_url")
+    //    val base_icp_web_site = getTab(company_icp_tab, "web_site", "icp_web_site")
+
+    val base_company_finance = getSingleValTab(company_finance, "round", "finance_round", "coalesce(finance_time,report_date)")
     val companyRank = getCompanyRank()
 
     var all_tab = addField(base_company, base_holder, "company_id", "holder", "holder")
 
+    all_tab = addField(all_tab, base_holder_history, "company_id", "holder", "holder_history")
+
     all_tab = addField(all_tab, base_staff, "company_id", "staff", "staff")
 
+    all_tab = addField(all_tab, base_staff_history, "company_id", "staff", "staff_history")
+
     all_tab = addField(all_tab, base_tm, "company_id", "company_tm", "company_tm")
 
     all_tab = addField(all_tab, base_app_info, "company_id", "app_info", "app_info")
 
     all_tab = addField(all_tab, base_icp, "company_id", "icp", "icp")
 
+    all_tab = addField(all_tab, base_icp_domain, "company_id", "icp_domain", "icp_domain")
+    //    all_tab = addField(all_tab, base_icp_site_url, "company_id", "icp_site_url", "icp_site_url")
+    //    all_tab = addField(all_tab, base_icp_web_site, "company_id", "icp_web_site", "icp_web_site")
+
     all_tab = addField(all_tab, companyRank, "company_id", "rank", "company_rank")
 
+    all_tab = addField(all_tab, base_company_finance, "company_id", "finance_round", "finance_round")
+
     val df = sql(
       s"""
          |SELECT  id
@@ -232,7 +282,11 @@ case class CompanyIndexJob(s: SparkSession,
          |        ,legal_entity_type
          |        ,legal_entity_name
          |        ,holder
+         |        ,holder_history
          |        ,staff
+         |        ,staff_history
+         |        ,finance_round
+         |        ,upper(org_number) as org_number
          |        ,province_code
          |        ,city_code
          |        ,county_code
@@ -244,20 +298,21 @@ case class CompanyIndexJob(s: SparkSession,
          |        ,reg_status_std
          |        ,company_org_type_std
          |        ,company_type
-         |        ,credit_code
+         |        ,upper(credit_code) as credit_code
          |        ,reg_capital
          |        ,reg_capital_amount
          |        ,reg_location
          |        ,company_tm
          |        ,icp
+         |        ,icp_domain
          |        ,app_info
          |        ,phones
          |        ,emails
          |        ,geo
          |        ,logo
-         |        ,reg_number
+         |        ,upper(reg_number) as reg_number
          |        ,company_rank
-         |        ,company_score_weight
+         |        ,if(finance_round is not null,company_score_weight+2,company_score_weight) as company_score_weight
          |        ,COALESCE(deleted,'0') AS deleted
          |from    $all_tab
          |""".stripMargin)
@@ -307,6 +362,7 @@ case class CompanyIndexJob(s: SparkSession,
          |        ,legal_entity_type
          |        ,legal_entity_name
          |        ,province_code
+         |        ,org_number
          |        ,city_code
          |        ,county_code
          |        ,estiblish_time
@@ -341,6 +397,7 @@ case class CompanyIndexJob(s: SparkSession,
          |                ,province_code
          |                ,city_code
          |                ,county_code
+         |                ,org_number
          |                ,to_millis_timestamp(estiblish_time) AS estiblish_time
          |                ,cate_first_code
          |                ,cate_second_code
@@ -366,8 +423,9 @@ case class CompanyIndexJob(s: SparkSession,
     alias_tab
   }
 
-  private def get_company_holder_tab(holder_tab: String): String = {
-    val alias_tab = "alias_base_" + holder_tab
+  private def get_company_holder_tab(holder_tab: String, is_history: Boolean = false): String = {
+    val flag = if (is_history) "1" else "0"
+    val alias_tab = "alias_base_" + holder_tab + "_" + flag
     sql(
       s"""
          |SELECT  company_id
@@ -379,7 +437,7 @@ case class CompanyIndexJob(s: SparkSession,
          |FROM    (
          |        select  *
          |        from    $holder_tab
-         |        where   deleted = 0
+         |        where   deleted = $flag
          |)
          |GROUP BY company_id
          |""".stripMargin)
@@ -387,8 +445,9 @@ case class CompanyIndexJob(s: SparkSession,
     alias_tab
   }
 
-  private def get_staff_tab(staff_tab: String): String = {
-    val alias_tab = "alias_base_" + staff_tab
+  private def get_staff_tab(staff_tab: String, is_history: Boolean = false): String = {
+    val flag = if (is_history) "1" else "0"
+    val alias_tab = "alias_base_" + staff_tab + "_" + flag
     sql(
       s"""
          |SELECT  company_id
@@ -400,7 +459,7 @@ case class CompanyIndexJob(s: SparkSession,
          |FROM    (
          |        select *
          |        from   $staff_tab
-         |        where  deleted = 0
+         |        where  deleted = $flag
          |)
          |GROUP BY company_id
          |""".stripMargin)
@@ -422,10 +481,42 @@ case class CompanyIndexJob(s: SparkSession,
     alias_tab
   }
 
-  private def getTab(tab: String, name: String, alias_name: String): String = {
+  private def getSingleValTab(tab: String, name: String, alias_name: String, order_by: String): String = {
     val alias_tab = "alias_base_" + tab
     sql(
       s"""
+         |SELECT  company_id,$name as $alias_name
+         |FROM    (
+         |            SELECT  *
+         |                    ,ROW_NUMBER() OVER(PARTITION BY company_id ORDER BY $order_by DESC) AS xjk_num_abc
+         |            FROM    $tab
+         |            WHERE   $name is not null and trim($name) <> '' and deleted = 0 and $order_by is not null
+         |        )
+         |WHERE   xjk_num_abc = 1
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+
+  private def getIcpDomainTab(tab: String, alias_name: String): String = {
+    val alias_tab = "alias_base_" + tab + "_" + alias_name
+    sql(
+      s"""
+         | SELECT  company_id
+         |         ,concat_ws(',',collect_set(icp_domain_trim(domain,site_url,web_site))) AS $alias_name
+         | FROM    $tab
+         | where deleted = 0
+         | GROUP BY company_id
+         |""".stripMargin)
+      .createTempView(alias_tab)
+    alias_tab
+  }
+
+  private def getTab(tab: String, name: String, alias_name: String): String = {
+    val alias_tab = "alias_base_" + tab + "_" + alias_name
+    sql(
+      s"""
          | SELECT  company_id
          |         ,concat_ws(',',collect_set($name)) AS $alias_name
          | FROM    $tab

+ 49 - 0
src/main/scala/com/winhc/bigdata/spark/ng/utils/DomainEntity.scala

@@ -0,0 +1,49 @@
+package com.winhc.bigdata.spark.ng.utils
+
+import com.winhc.bigdata.spark.utils.BaseUtil
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/11/17 10:08
+ */
+case class DomainEntity(domain: String) extends Comparable[DomainEntity] {
+  override def compareTo(o: DomainEntity): Int = domain.length.compareTo(o.domain.length)
+
+  override def hashCode(): Int = {
+    val d = BaseUtil.trim_url_domain(domain)
+    if (d == null) {
+      domain.hashCode
+    } else d.hashCode()
+  }
+
+  override def equals(obj: Any): Boolean =
+    obj match {
+      case d: DomainEntity => {
+        var thisDomain = BaseUtil.trim_url_domain(domain)
+        var otherDomain = BaseUtil.trim_url_domain(d.domain)
+        if (thisDomain == null) {
+          thisDomain = domain
+        }
+        if (otherDomain == null) {
+          otherDomain = d.domain
+        }
+        thisDomain.equals(otherDomain)
+      }
+      case _ =>
+        false
+    }
+}
+
+
+object DomainEntity {
+  def main(args: Array[String]): Unit = {
+    val seq = "https://网络.中国,www.yuxianweb.com,baijinggame.cn,www.baijinggame.cn,baijinggame.com,www.baijinggame.com,yuxianweb.cn,www.yuxianweb.cn"
+
+   val ss =  seq.replaceAll("[;\t\n]",",").split(',').filter(StringUtils.isNotEmpty).distinct.mkString(",")
+    println(ss)
+
+    val s = seq.split(",").map(s => DomainEntity(s)).groupBy(d => d.hashCode()).map(s => s._2.max).map(d => d.domain).mkString(",")
+    println(s)
+  }
+}