@@ -0,0 +1,200 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.aliyun.odps.utils.StringUtils
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.hadoop.hbase.client.{Get, Put, Table}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, TableName}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.mutable
+import scala.collection.mutable.Set
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/26 10:41
+ * @Description: Company name mapping computation
+ */
+object CompanyNameMappingPro extends Logging {
+  val f_bytes: Array[Byte] = Bytes.toBytes("f")
+  val cname_bytes: Array[Byte] = Bytes.toBytes("name")
+  val current_cid_bytes: Array[Byte] = Bytes.toBytes("current_cid")
+
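+  /**
+   * Follows the current_cid redirect chain in HBase until it reaches a
+   * company with no further redirect, tracking visited ids to break cycles.
+   * Returns (null, null) if the id is missing or the chain loops.
+   */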
+  private def getCurrentIdAndName(table: Table, cid: String): (String, String) = {
+    var current_cid: String = cid
+    var name: String = null
+    val mutableSet = Set(cid)
+    while (true) {
+      val result = table.get(new Get(Bytes.toBytes(current_cid)))
+      if (result.isEmpty) {
+        return (null, null)
+      }
+      val tmp_name = Bytes.toString(CellUtil.cloneValue(result.getColumnCells(f_bytes, cname_bytes).get(0)))
+      // the f:current_cid cell is absent for companies without a redirect,
+      // so check for it explicitly instead of swallowing an exception
+      val current_cid_cells = result.getColumnCells(f_bytes, current_cid_bytes)
+      val tmp_current_cid: String =
+        if (current_cid_cells.isEmpty) null
+        else Bytes.toString(CellUtil.cloneValue(current_cid_cells.get(0)))
+      if (StringUtils.isEmpty(tmp_current_cid)) {
+        return (current_cid, tmp_name)
+      }
+
+      if (mutableSet.contains(tmp_current_cid)) {
+        return (null, null)
+      }
+      mutableSet += tmp_current_cid
+      current_cid = tmp_current_cid
+      name = tmp_name
+    }
+    (current_cid, name)
+  }
+
+  def main(args: Array[String]): Unit = {
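+    // spark.hadoop.odps.cupid.vpc.domain.list whitelists the VPC endpoints
+    // (MongoDB and HBase/ZooKeeper domains and ports) that this MaxCompute
+    // Spark job is allowed to reach.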
+    val map = mutable.Map[String, String](
+      "spark.hadoop.odps.cupid.vpc.domain.list" -> "{\"regionId\":\"cn-shanghai\",\"vpcs\":[{\"vpcId\":\"vpc-11hby9xee\",\"zones\":[{\"urls\":[{\"domain\":\"dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com\",\"port\":2181}]}]}]}"
+    )
+
+    val hbaseKVTable = "company_name_kv"
+    val inputTable = "new_ods_company"
+    val resultTable = "company_name_mapping_pro"
+    val tmpResultTable = "company_name_mapping_pro_tmp"
+
+    val spark = SparkUtils.InitEnv("CompanyNameMapping", map)
+    import spark._
+    /**
+     * Write to HBase so names can be looked up by id
+     */
+    val jobConf = SparkUtils.HBaseOutputJobConf(hbaseKVTable)
+
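+    // HBase layout: rowkey = cid, f:name = company name, and f:current_cid =
+    // the id this company was redirected to (written only when present).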
+    val df = sql(s"select cid,name,current_cid from $inputTable")
+    df.rdd.map(row => {
+      val id = row(0).asInstanceOf[Long].toString
+      val name = row(1).asInstanceOf[String]
+      val current_cid = row(2).asInstanceOf[Long].toString
+      val put = new Put(Bytes.toBytes(id))
+      put.addColumn(f_bytes, cname_bytes, Bytes.toBytes(name))
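+      // a null current_cid unboxes to 0L via asInstanceOf[Long], so "0" marks
+      // companies without a redirect; no f:current_cid cell is written for them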
+ if (!"0".equals(current_cid))
|
|
|
+ put.addColumn(f_bytes, current_cid_bytes, Bytes.toBytes(current_cid))
|
|
|
+ (new ImmutableBytesWritable, put)
|
|
|
+ }).saveAsHadoopDataset(jobConf)
|
|
|
+
|
|
|
+ logInfo("save hbase success!")
|
|
|
+
|
|
|
+
|
|
|
+    val df_old = sql(
+      s"""
+         |SELECT cid
+         |       ,name
+         |       ,current_cid
+         |FROM $inputTable
+         |WHERE current_cid IS NOT NULL AND cid IS NOT NULL
+         |""".stripMargin)
+
+    val hbaseContext = new HBaseContext(spark.sparkContext, jobConf)
+
+    /**
+     * Query HBase to resolve each company's latest id and name
+     */
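+    // hbaseMapPartitions supplies a per-partition HBase connection, so one
+    // Table instance serves all lookups within a partition.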
+    val tmp_rdd = df_old.rdd.hbaseMapPartitions(hbaseContext, (f, con) => {
+      val table = con.getTable(TableName.valueOf(hbaseKVTable))
+      // f.map is lazy over the partition iterator, so materialize the results
+      // before closing the shared Table instance
+      val rdd_par = f.map(record => {
+        try {
+          val cid = record.getLong(0).toString
+          val name = record.getString(1)
+          val current_cid = record.getAs[Long](2).toString
+          val (res_cid, res_name) = getCurrentIdAndName(table, current_cid)
+          Row(cid, name, current_cid, res_cid, res_name)
+        } catch {
+          case e: Exception =>
+            logWarning(record.toString())
+            logError(e.getMessage, e)
+            null
+        }
+      }).toList
+      table.close()
+      rdd_par.iterator
+    }).filter(_ != null)
+
+    val schema = StructType(Array(
+      StructField("cid", StringType),
+      StructField("cname", StringType),
+      StructField("current_cid", StringType),
+      StructField("new_cid", StringType),
+      StructField("new_cname", StringType)))
+
+    val tmp_df = spark.createDataFrame(tmp_rdd, schema)
+
+    tmp_df.createTempView(tmpResultTable) // register the temp view
+
+    logInfo("new_cid add success")
+
+    /**
+     * Build each id's list of former (and current) names
+     */
+    import spark.implicits._
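+    // Group the rename history by new_cid, then fan each group back out so
+    // every historical cid (and the new_cid itself) is paired with the group's
+    // full cid -> name map.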
+    val id_other = sql(
+      s"""
+         |SELECT new_cid
+         |       ,concat_ws(
+         |           ';'
+         |           ,collect_set(concat_ws(':', cid, cname))
+         |       ) other_id_name
+         |FROM $tmpResultTable
+         |GROUP BY new_cid
+         |""".stripMargin)
+      .flatMap(r => {
+        val new_cid = r.getAs[String]("new_cid")
+        val other_id_name = r.getAs[String]("other_id_name")
+        val pairs = other_id_name.split(";").map(_.split(":"))
+        val other_id_name_map = pairs.map(p => p(0) -> p(1)).toMap
+
+        pairs.map(p => (p(0), other_id_name_map)).:+((new_cid, other_id_name_map)).toSeq
+      }).filter(_ != null).toDF("cid", "other_id_name")
+
+    id_other.printSchema()
+
+    /**
+     * Complete the data by adding companies whose names never changed
+     */
+    var tmp_df2 = sql(
+      s"""
+         |SELECT cid
+         |       ,name as cname
+         |       ,current_cid
+         |FROM $inputTable
+         |WHERE current_cid IS NULL AND cid IS NOT NULL
+         |""".stripMargin)
+      .toDF("cid", "cname", "current_cid")
+    tmp_df2 = tmp_df2.withColumn("new_cid", tmp_df2.col("cid"))
+    tmp_df2 = tmp_df2.withColumn("new_cname", tmp_df2.col("cname"))
+
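+    // tmp_df (renamed companies) and tmp_df2 (never-renamed companies) share
+    // the same five-column layout, so their union covers the whole id space.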
+    val tmp_df3 = tmp_df.union(tmp_df2)
+
+    /**
+     * Produce the result table
+     */
+    val resultDf =
+      tmp_df3.join(id_other, Seq("cid"), "left").select(
+        "cid"
+        , "cname"
+        // , s"current_cid"
+        , "other_id_name"
+        , "new_cid"
+        , "new_cname"
+      )
+    resultDf.printSchema()
+    resultDf.write.mode("append").insertInto(resultTable)
+
+ logInfo("CompanyNameMapping success")
|
|
|
+ spark.stop()*/
|
|
|
+ }
|
|
|
+}
|