
Write company summaries to HBase; update company base info to ads, HBase and ES

许家凯 4 years ago
parent
commit
bd41fa12b4

+ 163 - 0
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -0,0 +1,163 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.winhc.bigdata.spark.const.BaseConst
+import com.winhc.bigdata.spark.utils.{BaseUtil, EsUtils, HBaseUtils, LoggingUtils, SparkUtils}
+import org.apache.hadoop.hbase.client.Put
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.spark.sql.SparkSession
+
+import scala.annotation.meta.getter
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/29 10:28
+ * @Description: Write incremental company base info to ads and es
+ */
+object CompanyIncCompany2Es {
+  val outFields = Seq(
+    "CID"
+    , "BASE"
+    , "NAME"
+    , "NAME_EN"
+    , "NAME_ALIAS"
+    , "HISTORY_NAMES"
+    , "LEGAL_ENTITY_ID"
+    , "LEGAL_ENTITY_TYPE"
+    , "REG_NUMBER"
+    , "COMPANY_ORG_TYPE"
+    , "REG_LOCATION"
+    , "ESTIBLISH_TIME"
+    , "FROM_TIME"
+    , "TO_TIME"
+    , "BUSINESS_SCOPE"
+    , "REG_INSTITUTE"
+    , "APPROVED_TIME"
+    , "REG_STATUS"
+    , "REG_CAPITAL"
+    , "ORG_NUMBER"
+    , "ORG_APPROVED_INSTITUTE"
+    , "CURRENT_CID"
+    , "PARENT_CID"
+    , "COMPANY_TYPE"
+    , "CREDIT_CODE"
+    , "SCORE"
+    , "CATEGORY_CODE"
+    , "LAT"
+    , "LNG"
+    , "AREA_CODE"
+    , "REG_CAPITAL_AMOUNT"
+    , "REG_CAPITAL_CURRENCY"
+    , "ACTUAL_CAPITAL_AMOUNT"
+    , "ACTUAL_CAPITAL_CURRENCY"
+    , "REG_STATUS_STD"
+    , "SOCIAL_SECURITY_STAFF_NUM"
+    , "CANCEL_DATE"
+    , "CANCEL_REASON"
+    , "REVOKE_DATE"
+    , "REVOKE_REASON"
+    , "EMAILS"
+    , "PHONES"
+    , "WECHAT_PUBLIC_NUM"
+    , "LOGO"
+    , "CRAWLED_TIME"
+    , "CREATE_TIME"
+    , "UPDATE_TIME"
+    , "DELETED"
+  )
+
+  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
+    @(transient@getter) val spark: SparkSession = s
+
+    def calc() {
+      val partition = bizDate.replaceAll("\\-", "")
+      if (partition.length != 8) {
+        println("biz date is error!")
+        sys.exit(-99)
+      }
+      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company", spark)
+      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last
+
+      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company", spark)
+      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last
+
+      if (start_partition.equals(end_partition)) {
+        println("start_partition == end_partition")
+        sys.exit(-999)
+      }
+
+      val companyCols = spark.table("ads_company").columns
+        .filter(!_.equals("ds"))
+        .seq
+
+      // read data
+      val df = sql(
+        s"""
+           |SELECT  ${companyCols.mkString(",")}
+           |FROM    $project.inc_ods_company
+           |WHERE   ds > $start_partition and ds <= $end_partition
+           |""".stripMargin)
+
+      df.createOrReplaceTempView("tmp_company_inc")
+
+      // write to ads
+      sql(
+        s"""
+           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company PARTITION(ds='$end_partition')
+           |SELECT ${companyCols.mkString(",")}
+           |FROM
+           |    tmp_company_inc
+           |""".stripMargin)
+
+      import spark.implicits._
+      // write to HBase
+      import org.apache.spark.sql.functions.col
+      val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY")
+      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
+      stringDf.rdd.map(row => {
+        val id = row.getAs[String]("cid")
+        val put = new Put(Bytes.toBytes(id))
+        for (f <- outFields) {
+          val v = row.getAs[String](f.toLowerCase)
+          if (v != null) {
+            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
+          }
+        }
+        (new ImmutableBytesWritable, put)
+      }).filter(_ != null).saveAsHadoopDataset(jobConf)
+
+      // write to ES
+      import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
+      import org.elasticsearch.spark._
+      stringDf.map(r => {
+        val cid = r.getAs[String]("cid")
+        val cname = r.getAs[String]("name")
+        val history_names = r.getAs[String]("history_names")
+        val current_cid = r.getAs[String]("current_cid")
+        val company_type = r.getAs[String]("company_type")
+        getEsDoc(cid, cname, history_names, current_cid, company_type)
+      }).rdd.saveToEsWithMeta("winhc-company/company")
+
+    }
+  }
+
+  def main(args: Array[String]): Unit = {
+    if (args.length != 2) {
+      println("please enter project and bizDate!")
+      sys.exit(-99)
+    }
+
+    val Array(project, bizDate) = args
+
+    val config = EsUtils.getEsConfigMap ++ mutable.Map(
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+
+    val spark = SparkUtils.InitEnv("company2Es", config)
+
+    Company2Es(spark, project, bizDate).calc
+    spark.stop()
+  }
+}
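A minimal usage sketch for the new job (argument values are illustrative only; the job expects exactly a project name and a business date that collapses to an 8-digit partition):

// Illustrative invocation; "winhc_eci_dev" and "2020-06-29" are example values.
CompanyIncCompany2Es.main(Array("winhc_eci_dev", "2020-06-29"))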

+ 2 - 29
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,9 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
+import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
 import com.winhc.bigdata.spark.utils.{EsUtils, SparkUtils}
-import org.apache.commons.lang3.StringUtils
-
-import scala.collection.JavaConverters._
 
 /**
  * @Author: XuJiakai
@@ -11,28 +9,6 @@ import scala.collection.JavaConverters._
  * @Description:
  */
 object CompanyIndexSave2Es {
-  val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
-
-  case class CompanyName(show: String, value: String)
-
-  case class CompanyDoc(cname: CompanyName, current_id: String = null, history_name: Seq[CompanyName] = null, company_type: String)
-
-  def getEsDoc(cid: String, cname: String, other_id_name: scala.collection.Map[String, String], new_cid: String, company_type: String): (String, CompanyDoc) = {
-    var history_name: Seq[CompanyName] = null
-    if (other_id_name != null) {
-      history_name = other_id_name
-        .filterKeys(!_.equals(new_cid))
-        .filterKeys(!_.equals(cid))
-        .values
-        .map(getCompanyName)
-        .toSeq
-      if (history_name.isEmpty) {
-        history_name = null
-      }
-    }
-    (cid, CompanyDoc(getCompanyName(cname), if (cid.equals(new_cid)) null else new_cid, history_name, company_type))
-  }
-
   private def getOtherIdName(str: String): scala.collection.Map[String, String] = {
     if (str == null) {
       return null
@@ -43,18 +19,15 @@ object CompanyIndexSave2Es {
     }).toMap
   }
 
-
-  private def getCompanyName(name: String): CompanyName = if (StringUtils.isEmpty(name)) null else CompanyName(name, pattern replaceAllIn(name, ""))
-
   def main(args: Array[String]): Unit = {
     val map = EsUtils.getEsConfigMap
 
     val company_name_mapping = "winhc_eci_dev.company_name_mapping_pro_v2"
 
     val spark = SparkUtils.InitEnv("CompanyIndexSave2Es", map)
+    val df = spark.sql(s"select cid,cname,other_id_name,new_cid,company_type from $company_name_mapping")
     import org.elasticsearch.spark._
     import spark.implicits._
-    val df = spark.sql(s"select cid,cname,other_id_name,new_cid,company_type from $company_name_mapping")
     df.map(r => {
       val cid = r.getLong(0).toString
       val cname = r.getString(1)

+ 6 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/CompanySummaryInc.scala

@@ -12,10 +12,15 @@ import scala.collection.mutable
 object CompanySummaryInc {
   //winhc_eci_dev company_icp new_cid liscense,domain,new_cid
   def main(args: Array[String]): Unit = {
+    if (args.length != 4) {
+      println("please enter project,tableName,cidField,dupliCols")
+      sys.exit(-99)
+    }
     val Array(project, tableName, cidField, dupliCols) = args
 
     val config = mutable.Map(
-      "spark.hadoop.odps.project.name" -> project
+      "spark.hadoop.odps.project.name" -> project,
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
     val spark = SparkUtils.InitEnv("CompanySummaryInc", config)
     CompanyIncSummary(spark, project, tableName, cidField, dupliCols.split(",").seq).calc
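A usage sketch for the updated entry point, following the example arguments in the comment at the top of the file (all values are illustrative):

// project, tableName, cidField, dupliCols (comma-separated), per the comment above.
CompanySummaryInc.main(Array("winhc_eci_dev", "company_icp", "new_cid", "liscense,domain,new_cid"))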

+ 6 - 0
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -13,6 +13,12 @@ import org.apache.spark.sql.SparkSession
 object BaseUtil {
   def isWindows: Boolean = System.getProperty("os.name").contains("Windows")
 
+  def getPartitions(t: String, @transient spark: SparkSession): Seq[String] = {
+    import spark._
+    val sql_s = s"show partitions " + t
+    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
+  }
+
   def getPartion(t: String, @transient spark: SparkSession) = {
     import spark._
     val sql_s = s"show partitions " + t

+ 52 - 0
src/main/scala/com/winhc/bigdata/spark/utils/CompanyEsUtils.scala

@@ -0,0 +1,52 @@
+package com.winhc.bigdata.spark.utils
+
+import org.apache.commons.lang3.StringUtils
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/29 10:51
+ * @Description:
+ */
+object CompanyEsUtils {
+  val pattern = "[^\\u4e00-\\u9fa50-9a-zA-Z]".r
+
+  case class CompanyName(show: String, value: String)
+
+  case class CompanyDoc(cname: CompanyName, current_id: String = null, history_name: Seq[CompanyName] = null, company_type: String)
+
+
+  def getEsDoc(cid: String, cname: String, historyName: String, currentId: String, company_type: String): (String, CompanyDoc) = {
+    var history_name: Seq[CompanyName] = null
+    if (StringUtils.isNoneEmpty(historyName)) {
+      history_name = historyName.split("\t;\t")
+        .filter(!cname.equals(_))
+        .filter(StringUtils.isNoneEmpty(_))
+        .map(getCompanyName)
+        .toSeq
+
+      if (history_name.isEmpty) {
+        history_name = null
+      }
+    }
+    (cid, CompanyDoc(getCompanyName(cname), if (cid.equals(currentId)) null else currentId, history_name, company_type))
+  }
+
+
+  def getEsDoc(cid: String, cname: String, other_id_name: scala.collection.Map[String, String], new_cid: String, company_type: String): (String, CompanyDoc) = {
+    var history_name: Seq[CompanyName] = null
+    if (other_id_name != null) {
+      history_name = other_id_name
+        .filterKeys(!_.equals(new_cid))
+        .filterKeys(!_.equals(cid))
+        .values
+        .map(getCompanyName)
+        .toSeq
+      if (history_name.isEmpty) {
+        history_name = null
+      }
+    }
+    (cid, CompanyDoc(getCompanyName(cname), if (cid.equals(new_cid)) null else new_cid, history_name, company_type))
+  }
+
+  private def getCompanyName(name: String): CompanyName = if (StringUtils.isEmpty(name)) null else CompanyName(name, pattern replaceAllIn(name, ""))
+}
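For context, a sketch of the new string-based getEsDoc overload (all sample values are hypothetical): history names arrive as a "\t;\t"-separated string, the current name and empty entries are dropped, and current_id is nulled when cid equals currentId.

// Hypothetical sample values; history names are separated by "\t;\t".
val (docId, doc) = CompanyEsUtils.getEsDoc(
  cid = "100008680",
  cname = "示例公司A",
  historyName = "示例公司B\t;\t示例公司C",
  currentId = "100008680",
  company_type = "1"
)
// docId == "100008680"; doc.current_id is null because cid equals currentId, and each
// history CompanyName keeps the raw name plus a copy stripped of non-CJK, non-alphanumeric characters.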

+ 15 - 17
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -4,6 +4,7 @@ import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
 import org.apache.spark.sql.SparkSession
+import org.apache.spark.sql.functions.col
 
 import scala.annotation.meta.getter
 
@@ -27,14 +28,11 @@ case class CompanyIncSummary(s: SparkSession,
     val ads_table = s"${project}.ads_$tableName" //存量ads表
     val inc_ads_table = s"${project}.inc_ads_$tableName"
 
-
     val partition = sql(s"show partitions $ads_table").collect.toList
       .map(_.getString(0).split("=")(1))
       .last
 
 
-    val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY_SUMMARY")
-
     val ads_table_cols = spark.table(ads_table).columns.filter(l => {
       !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
     }).toList.sorted
@@ -79,29 +77,29 @@ case class CompanyIncSummary(s: SparkSession,
          |WHERE   tmp2.c = 1
          |""".stripMargin).cache().createOrReplaceTempView("inc_tmp_view")
 
+
+    val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY_SUMMARY")
     sql(
       s"""
-         |CREATE TABLE IF NOT EXISTS winhc_eci_dev.xjk_tmp_count_company_icp AS
          |SELECT  ${cidField} as cid
          |        ,COUNT(1) as num
          |FROM    inc_tmp_view
          |GROUP BY $cidField
          |""".stripMargin)
-
-    /* .rdd.map(row => {
-     val id = row(0).asInstanceOf[String]
-     val num = row(1).asInstanceOf[String]
-     val put = new Put(Bytes.toBytes(id))
-     if (!"0".equals(num)) {
-       put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
-       (new ImmutableBytesWritable, put)
-     } else {
-       return null
-     }
-   }).filter(_ != null).saveAsHadoopDataset(jobConf)*/
+      .select(Seq("cid", "num").map(column => col(column).cast("string")): _*)
+      .rdd
+      .filter(r => {
+        r.get(1) != null && !"0".equals(r.getString(1))
+      }).map(row => {
+      val id = row.getString(0)
+      val num = row.getString(1)
+      val put = new Put(Bytes.toBytes(id))
+      put.addColumn(f_bytes, name_bytes, Bytes.toBytes(num))
+      (new ImmutableBytesWritable, put)
+    }).filter(_ != null).saveAsHadoopDataset(jobConf)
   }
 
-  def getCastCols(name: String, pre: String): String = {
+  private def getCastCols(name: String, pre: String): String = {
     val list = List("cid", "new_cid", "ncid")
     if (list.contains(name)) {
       return s"CAST(${pre}${name} as BIGINT) $name"

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -43,7 +43,7 @@ object HbaseUtil {
   }
 
   def main(args: Array[String]): Unit = {
-    val row = getRowData(getTable("COMPANY_SCORE_V3"), "100008680_101")
+    val row = getRowData(getTable("company_name_kv"), "100008680")
     println(row)
   }
 }