package com.winhc.bigdata.spark.jobs

import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
import com.winhc.bigdata.spark.const.BaseConst
import com.winhc.bigdata.spark.udf.BaseFunc
import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes
import org.apache.spark.sql.SparkSession

import scala.annotation.meta.getter
import scala.collection.mutable
/**
 * @Author: XuJiakai
 * @Date: 2020/6/29 10:28
 * @Description: write incremental company base-info data to the ads table and to ES
 */
object CompanyIncCompany2Es {
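  // HBase column qualifiers written for each company row (COMPANY table);
  // values are read from the DataFrame via the lower-cased field name.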
  val outFields = Seq(
    "CID"
    , "BASE"
    , "NAME"
    , "NAME_EN"
    , "NAME_ALIAS"
    , "HISTORY_NAMES"
    , "LEGAL_ENTITY_ID"
    , "LEGAL_ENTITY_TYPE"
    , "REG_NUMBER"
    , "COMPANY_ORG_TYPE"
    , "REG_LOCATION"
    , "ESTIBLISH_TIME"
    , "FROM_TIME"
    , "TO_TIME"
    , "BUSINESS_SCOPE"
    , "REG_INSTITUTE"
    , "APPROVED_TIME"
    , "REG_STATUS"
    , "REG_CAPITAL"
    , "ORG_NUMBER"
    , "ORG_APPROVED_INSTITUTE"
    , "CURRENT_CID"
    , "PARENT_CID"
    , "COMPANY_TYPE"
    , "CREDIT_CODE"
    , "SCORE"
    , "CATEGORY_CODE"
    , "LAT"
    , "LNG"
    , "AREA_CODE"
    , "REG_CAPITAL_AMOUNT"
    , "REG_CAPITAL_CURRENCY"
    , "ACTUAL_CAPITAL_AMOUNT"
    , "ACTUAL_CAPITAL_CURRENCY"
    , "REG_STATUS_STD"
    , "SOCIAL_SECURITY_STAFF_NUM"
    , "CANCEL_DATE"
    , "CANCEL_REASON"
    , "REVOKE_DATE"
    , "REVOKE_REASON"
    , "EMAILS"
    , "PHONES"
    , "WECHAT_PUBLIC_NUM"
    , "LOGO"
    , "CRAWLED_TIME"
    , "CREATE_TIME"
    , "UPDATE_TIME"
    , "DELETED"
  )
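  // HBase column qualifiers written for each company-human relation row (COMPANY_HUMAN_RELATION table).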
  val outFields_Human = Seq(
    "NEW_CID"
    , "CID"
    , "ID"
    , "COMPANY_NAME"
    , "HUMAN_NAME"
    , "HID"
    , "HUMAN_PID"
    , "STATUS"
    , "CREATE_TIME"
    , "UPDATE_TIME"
    , "DELETED"
  )
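  /**
   * Writes incremental company records from inc_ods_company to inc_ads_company,
   * to the COMPANY table in HBase, and to the company index in ES.
   */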
  case class Company2Es(s: SparkSession, project: String, bizDate: String) extends LoggingUtils with BaseFunc {
    @(transient @getter) val spark: SparkSession = s
    def calc(): Unit = {
      val code = code2Name()
      val partition = bizDate.replaceAll("\\-", "")
      if (partition.length != 8) {
        println("bizDate is invalid!")
        sys.exit(-99)
      }
      // Incremental window: (last ads partition, last ods partition]
      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company", spark)
      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last

      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company", spark)
      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last

      if (start_partition.equals(end_partition)) {
        println("start_partition == end_partition")
        sys.exit(-999)
      }
      val companyCols = spark.table(s"${project}.ads_company").columns
        .filter(!_.equals("ds"))
        .seq
      // Read the incremental data,
      // deduplicating by cid and keeping only the latest record (by update_time)
      val df = sql(
        s"""
           |SELECT ${companyCols.mkString(",")}
           |FROM (
           |        SELECT  a.*
           |                ,row_number() OVER (PARTITION BY a.cid ORDER BY update_time DESC) c
           |        FROM    (
           |                    SELECT  *
           |                    FROM    $project.inc_ods_company
           |                    WHERE   ds > $start_partition AND ds <= $end_partition AND cid IS NOT NULL
           |                ) AS a
           |     ) AS tmp
           |WHERE tmp.c = 1
           |""".stripMargin)
      df.cache().createOrReplaceTempView("tmp_company_inc")

      // Write to the ads table
      sql(
        s"""
           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company PARTITION(ds='$end_partition')
           |SELECT ${companyCols.mkString(",")}
           |FROM
           |    tmp_company_inc
           |""".stripMargin)
      // Write to HBase (COMPANY table)
      import org.apache.spark.sql.functions.col
      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")
      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
      stringDf.rdd.map(row => {
        val id = row.getAs[String]("cid")
        val put = new Put(Bytes.toBytes(id))
        for (f <- outFields) {
          val v = row.getAs[String](f.toLowerCase)
          if (v != null) {
            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
          }
        }
        (new ImmutableBytesWritable, put)
      }).filter(_ != null)
        .saveAsHadoopDataset(jobConf)
      // Write to ES
      import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper._
      stringDf.companyIndexSave2Es(code._1, code._2)
    }
  }
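  /**
   * Writes incremental company-human relation records from inc_ods_company_human_relation
   * to inc_ads_company_human_relation and to the COMPANY_HUMAN_RELATION table in HBase.
   */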
  case class Company_Human_Relation2HBase(s: SparkSession, project: String, bizDate: String) extends LoggingUtils {
    @(transient @getter) val spark: SparkSession = s
    def calc(): Unit = {
      val partition = bizDate.replaceAll("\\-", "")
      if (partition.length != 8) {
        println("bizDate is invalid!")
        sys.exit(-99)
      }
      // Incremental window: (last ads partition, last ods partition]
      val inc_ods_partitions = BaseUtil.getPartitions(s"${project}.inc_ods_company_human_relation", spark)
      val end_partition = if (inc_ods_partitions.isEmpty) partition else inc_ods_partitions.last

      val inc_ads_partitions = BaseUtil.getPartitions(s"${project}.inc_ads_company_human_relation", spark)
      val start_partition = if (inc_ads_partitions.isEmpty) "0" else inc_ads_partitions.last

      if (start_partition.equals(end_partition)) {
        println("start_partition == end_partition")
        sys.exit(-999)
      }
      val companyCols = spark.table(s"${project}.ads_company_human_relation").columns
        .filter(!_.equals("ds"))
        .seq
      // Read the incremental data,
      // deduplicating by (cid, hid, human_pid) and keeping only the latest record (by update_time)
      val df = sql(
        s"""
           |SELECT ${companyCols.mkString(",")}
           |FROM (
           |        SELECT  CONCAT_WS("_", new_cid, hid) AS rowkey
           |                ,a.*
           |                ,row_number() OVER (PARTITION BY a.cid, a.hid, a.human_pid ORDER BY update_time DESC) c
           |        FROM    (
           |                    SELECT  *, cid AS new_cid
           |                    FROM    $project.inc_ods_company_human_relation
           |                    WHERE   ds > $start_partition AND ds <= $end_partition AND cid IS NOT NULL AND hid IS NOT NULL
           |                ) AS a
           |     ) AS tmp
           |WHERE tmp.c = 1
           |""".stripMargin)
      df.cache().createOrReplaceTempView("tmp_company_human_relation_inc")

      // Write to the ads table
      sql(
        s"""
           |INSERT ${if (BaseUtil.isWindows) "INTO" else "OVERWRITE"} TABLE ${project}.inc_ads_company_human_relation PARTITION(ds='$end_partition')
           |SELECT ${companyCols.mkString(",")}
           |FROM
           |    tmp_company_human_relation_inc
           |""".stripMargin)
      // Write to HBase (COMPANY_HUMAN_RELATION table)
      import org.apache.spark.sql.functions.col
      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY_HUMAN_RELATION")
      val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
      stringDf.rdd.map(row => {
        val id = row.getAs[String]("rowkey")
        val put = new Put(Bytes.toBytes(id))
        for (f <- outFields_Human) {
          val v = row.getAs[String](f.toLowerCase)
          if (v != null) {
            put.addColumn(BaseConst.F_BYTES, Bytes.toBytes(f), Bytes.toBytes(v))
          }
        }
        (new ImmutableBytesWritable, put)
      }).filter(_ != null)
        .saveAsHadoopDataset(jobConf)
    }
  }
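  // Entry point. Expects two arguments: the ODPS project name and bizDate
  // (dashes are stripped; an 8-character yyyyMMdd partition value is expected).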
  def main(args: Array[String]): Unit = {
    if (args.length != 2) {
      println("please enter project and bizDate!")
      sys.exit(-99)
    }
    val Array(project, bizDate) = args

    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> project,
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark = SparkUtils.InitEnv("company2Es", config)

    Company2Es(spark, project, bizDate).calc()
    Company_Human_Relation2HBase(spark, project, bizDate).calc()
    spark.stop()
  }
}