Browse Source

fix: 调整v8索引

许家凯 3 years ago
parent
commit
e847b54b1e
1 changed files with 20 additions and 25 deletions
  1. 20 25
      src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

+ 20 - 25
src/main/scala/com/winhc/bigdata/spark/ng/jobs/CompanyIndexJob.scala

@@ -80,16 +80,7 @@ case class CompanyIndexJob(s: SparkSession,
   }
 
   private val target_tab = "winhc_ng.out_company_v8_index"
-  private val target_tab_simp = "winhc_ng.out_company_v8_index_simp"
-  val target_tab_simp_cols = Seq("id"
-    , "cname"
-    , "history_name"
-    , "new_cid"
-    , "company_tm"
-    , "app_info"
-    , "company_score_weight"
-    , "deleted"
-  )
+
 
   private val org_prefix = "ads"
 
@@ -187,6 +178,20 @@ case class CompanyIndexJob(s: SparkSession,
     calc(c, h, s, t, a, i, insert_ds)
   }
 
+  def getCompanyRank(): String = {
+    val last_ds = getLastPartitionsOrElse("winhc_ng.out_company_rank", "0")
+    sql(
+      s"""
+         |select company_id,
+         |       rank
+         |from winhc_ng.out_company_rank
+         |where ds = '$last_ds'
+         |""".stripMargin)
+      .createTempView("company_rank_tmp")
+    "company_rank_tmp"
+  }
+
+
   private def calc(company_tab: String
                    , holder_tab: String
                    , staff_tab: String
@@ -203,6 +208,7 @@ case class CompanyIndexJob(s: SparkSession,
     val base_tm = get_company_tm_tab(company_tm_tab, "tm_name", "company_tm")
     val base_app_info = getTab(company_app_info_tab, "filter_name", "app_info")
     val base_icp = getTab(company_icp_tab, "web_name", "icp")
+    val companyRank = getCompanyRank()
 
     var all_tab = addField(base_company, base_holder, "company_id", "holder", "holder")
 
@@ -214,6 +220,8 @@ case class CompanyIndexJob(s: SparkSession,
 
     all_tab = addField(all_tab, base_icp, "company_id", "icp", "icp")
 
+    all_tab = addField(all_tab, companyRank, "company_id", "rank", "company_rank")
+
     val df = sql(
       s"""
          |SELECT  id
@@ -248,6 +256,7 @@ case class CompanyIndexJob(s: SparkSession,
          |        ,geo
          |        ,logo
          |        ,reg_number
+         |        ,company_rank
          |        ,company_score_weight
          |        ,COALESCE(deleted,'0') AS deleted
          |from    $all_tab
@@ -266,21 +275,7 @@ case class CompanyIndexJob(s: SparkSession,
          |    out_company_index
          |""".stripMargin)
 
-
-    tab_verify(target_tab_simp_cols, target_tab_simp)
-
-    sql(
-      s"""
-         |INSERT OVERWRITE TABLE $target_tab_simp PARTITION(ds='$target_ds')
-         |SELECT ${getColumns(target_tab_simp).diff(Seq("ds")).mkString(",")}
-         |FROM
-         |    $target_tab
-         |WHERE
-         |    ds='$target_ds'
-         |""".stripMargin)
-
     addEmptyPartitionOrSkip(target_tab, target_ds)
-    addEmptyPartitionOrSkip(target_tab_simp, target_ds)
   }
 
   private def tab_verify(out_f: Seq[String], tab: String, ignore_f: Seq[String] = Seq("ds")): Unit = {
@@ -352,7 +347,7 @@ case class CompanyIndexJob(s: SparkSession,
          |                ,cate_third_code
          |                ,reg_status
          |                ,reg_status_std
-         |                ,get_company_org_type_std(name,company_org_type) AS company_org_type_std
+         |                ,get_company_org_type_std(name,company_org_type,credit_code) AS company_org_type_std
          |                ,company_type
          |                ,credit_code
          |                ,reg_capital