@@ -0,0 +1,126 @@
+package com.winhc.bigdata.spark.jobs
+
+import com.aliyun.odps.utils.StringUtils
+import com.winhc.bigdata.spark.utils.SparkUtils
+import org.apache.hadoop.hbase.client.{Get, Put, Table}
+import org.apache.hadoop.hbase.io.ImmutableBytesWritable
+import org.apache.hadoop.hbase.spark.HBaseContext
+import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
+import org.apache.hadoop.hbase.util.Bytes
+import org.apache.hadoop.hbase.{CellUtil, TableName}
+import org.apache.spark.internal.Logging
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.types.{StringType, StructField, StructType}
+
+import scala.collection.mutable
+import scala.collection.mutable.Set
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/5/26 10:41
+ * @Description: Compute the company name mapping relationship
+ */
+object CompanyNameMapping extends Logging {
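+  // Column family and qualifiers of the HBase KV table: row key = cid,
+  // "name" = company name, "current_cid" = the cid this record points to (absent on terminal records)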
+  val f_bytes: Array[Byte] = Bytes.toBytes("f")
+  val name_bytes: Array[Byte] = Bytes.toBytes("name")
+  val current_cid_bytes: Array[Byte] = Bytes.toBytes("current_cid")
+
+
+  def getCurrentIdAndName(table: Table, cid: String): (String, String) = {
+    var current_cid: String = cid
+    var name: String = null
+    val mutableSet = Set(cid)
+    while (true) {
+      val result = table.get(new Get(Bytes.toBytes(current_cid)))
+      if (result.isEmpty) {
+        return (null, null)
+      }
+      val tmp_name = Bytes.toString(CellUtil.cloneValue(result.getColumnCells(f_bytes, name_bytes).get(0)))
+      // the current_cid column is absent on terminal records
+      val tmp_current_cid: String =
+        if (result.containsColumn(f_bytes, current_cid_bytes))
+          Bytes.toString(CellUtil.cloneValue(result.getColumnCells(f_bytes, current_cid_bytes).get(0)))
+        else null
+      if (StringUtils.isEmpty(tmp_current_cid)) {
+        return (current_cid, tmp_name)
+      }
+      // cycle detection: give up if this cid was already visited
+      if (mutableSet.contains(tmp_current_cid)) {
+        return (null, null)
+      }
+      mutableSet += tmp_current_cid
+      current_cid = tmp_current_cid
+      name = tmp_name
+    }
+    (current_cid, name)
+  }
+
+
+  def main(args: Array[String]): Unit = {
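+    // ODPS Cupid VPC domain whitelist so the Spark job can reach the HBase
+    // (and MongoDB) endpoints inside the VPC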
+    val map = mutable.Map[String, String](
+      "spark.hadoop.odps.cupid.vpc.domain.list" -> "{\"regionId\":\"cn-shanghai\",\"vpcs\":[{\"vpcId\":\"vpc-11hby9xee\",\"zones\":[{\"urls\":[{\"domain\":\"dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com\",\"port\":3717},{\"domain\":\"hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com\",\"port\":2181}]}]}]}"
+    )
+
+    val hbaseKVTable = "company_name_kv"
+    val resultTable = "company_name_mapping_2"
+    val inputTable = "new_ods_company"
+
+    val spark = SparkUtils.InitEnv("CompanyNameMapping", map)
+    import spark._
+    val df = sql(s"select cid,name,current_cid from $inputTable")
+
+    val jobConf = SparkUtils.HBaseOutputJobConf(hbaseKVTable)
+    df.rdd.map(row => {
+      val id = row(0).asInstanceOf[Long].toString
+      val name = row(1).asInstanceOf[String]
+      // a null current_cid is unboxed to 0L, so "0" marks rows without a current_cid
+      val current_cid = row(2).asInstanceOf[Long].toString
+      val put = new Put(Bytes.toBytes(id))
+      put.addColumn(f_bytes, name_bytes, Bytes.toBytes(name))
+      if (!"0".equals(current_cid))
+        put.addColumn(f_bytes, current_cid_bytes, Bytes.toBytes(current_cid))
+      (new ImmutableBytesWritable, put)
+    }).saveAsHadoopDataset(jobConf)
+
+    logInfo("save hbase success!")
+
+    val df_old = sql(s"select cid,name,current_cid from $inputTable where current_cid is not null")
+
+    val hbaseContext = new HBaseContext(spark.sparkContext, jobConf)
+
+    val res_rdd = df_old.rdd.hbaseMapPartitions(hbaseContext, (f, con) => {
+      val table = con.getTable(TableName.valueOf(hbaseKVTable))
+      // materialize the partition (toList) before closing the table: the iterator
+      // returned by map is lazy, so closing the table first would break every lookup
+      val rdd_par = f.map(record => {
+        try {
+          val cid = record.getLong(0).toString
+          val name = record.getString(1)
+          val current_cid = record.getLong(2).toString
+          val (res_cid, res_name) = getCurrentIdAndName(table, current_cid)
+          Row(cid, name, current_cid, res_cid, res_name)
+        } catch {
+          case e: Exception =>
+            logWarning(record.toString())
+            logError(e.getMessage, e)
+            null
+        }
+      }).toList
+      table.close()
+      rdd_par.iterator
+    }).filter(_ != null)
+
+    val schema = StructType(Array(
+      StructField("cid", StringType),
+      StructField("name", StringType),
+      StructField("current_cid", StringType),
+      StructField("res_cid", StringType),
+      StructField("res_name", StringType)))
+
+    val res_df = spark.createDataFrame(res_rdd, schema)
+    res_df.write.mode("append").insertInto(resultTable)
+    logInfo("CompanyNameMapping success")
+    spark.stop()
+  }
+}