@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.aliyun.odps.utils.StringUtils
-import com.winhc.bigdata.spark.utils.{HBaseUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, HBaseUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.{Get, Put, Table}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.spark.HBaseContext
@@ -25,67 +25,40 @@ object CompanyNameMappingPro extends Logging {
   val cname_bytes: Array[Byte] = Bytes.toBytes("name")
   val current_cid_bytes: Array[Byte] = Bytes.toBytes("current_cid")
 
-  private def getCurrentIdAndName(table: Table, cid: String): (String, String) = {
-    var current_cid: String = cid
-    var name: String = null
-    val mutableSet = Set(cid)
-    while (true) {
-      val result = table.get(new Get(Bytes.toBytes(current_cid)))
-      if (result.isEmpty) {
-        return (null, null)
-      }
-      val tmp_name = Bytes.toString(CellUtil.cloneValue(result.getColumnCells(f_bytes, cname_bytes) get (0)))
-      var tmp_current_cid: String = null
-      try {
-        tmp_current_cid = Bytes.toString(CellUtil.cloneValue(result.getColumnCells(f_bytes, current_cid_bytes) get (0)))
-      } catch {
-        case e: Exception => {
-        }
-      }
-      if (StringUtils.isEmpty(tmp_current_cid)) {
-        return (current_cid, tmp_name)
-      }
-
-      if (mutableSet.contains(tmp_current_cid)) {
-        return (null, null)
-      }
-      mutableSet += tmp_current_cid
-      current_cid = tmp_current_cid
-      name = tmp_name
-    }
-    (current_cid, name)
-  }
-
-
   def main(args: Array[String]): Unit = {
-
-    import com.winhc.bigdata.spark.utils.BaseUtil.getExecutorConfigOrExit
-    val map = getExecutorConfigOrExit(args)
+    val map = mutable.Map[String, String](
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+// import com.winhc.bigdata.spark.utils.BaseUtil.getExecutorConfigOrExit
+// val map = getExecutorConfigOrExit(args)
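+    // Local-run configuration: "spark.hadoop.odps.spark.local.partition.amt" is assumed to cap how many ODPS
+    // partitions are read when the job runs locally; getExecutorConfigOrExit (commented out above) presumably
+    // builds the executor config from the program arguments for cluster submissions.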
 
     val hbaseKVTable = "company_name_kv"
-    val inputTable = "new_ods_company"
-    val resultTable = "company_name_mapping_pro"
-    val tmpResultTable = "company_name_mapping_pro_tmp"
+    val inputTable = "ods_company"
+    val resultTable = "company_name_mapping_pro_v2"
+    val tmpResultTable = "company_name_mapping_pro_tmp_2"
 
 
-    val spark = SparkUtils.InitEnv("CompanyNameMapping", map)
+    val spark = SparkUtils.InitEnv("CompanyNameMapping",map)
     import spark._
     /**
      * Write to HBase, for lookups by company id
      */
     val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
 
-    val df = sql(s"select cid,name,current_cid from $inputTable")
-    df.rdd.map(row => {
-      val id = row(0).asInstanceOf[Long].toString
-      val name = row(1).asInstanceOf[String]
-      val current_cid = row(2).asInstanceOf[Long].toString
-      val put = new Put(Bytes.toBytes(id))
-      put.addColumn(f_bytes, cname_bytes, Bytes.toBytes(name))
-      if (!"0".equals(current_cid))
-        put.addColumn(f_bytes, current_cid_bytes, Bytes.toBytes(current_cid))
-      (new ImmutableBytesWritable, put)
-    }).saveAsHadoopDataset(jobConf)
+    /* val df = sql(s"select cid,name,current_cid from $inputTable where cid is not null")
+    df.rdd.map(row => {
+      val id = row(0).asInstanceOf[Long].toString
+      val name = row(1).asInstanceOf[String]
+      val current_cid = row(2).asInstanceOf[Long].toString
+      val put = new Put(Bytes.toBytes(id))
+      if (name != null) {
+        put.addColumn(f_bytes, cname_bytes, Bytes.toBytes(name))
+      }
+      if (!"0".equals(current_cid)) {
+        put.addColumn(f_bytes, current_cid_bytes, Bytes.toBytes(current_cid))
+      }
+      (new ImmutableBytesWritable, put)
+    }).saveAsHadoopDataset(jobConf)*/
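+    // The bulk load of (cid -> name, current_cid) into HBase is disabled for this run, presumably because
+    // company_name_kv was already populated by an earlier execution and only the mapping phase below is needed.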
 
     logInfo("save hbase success!")
 
@@ -97,10 +70,9 @@ object CompanyNameMappingPro extends Logging {
         | ,current_cid
         |FROM $inputTable
         |WHERE current_cid IS NOT NULL AND cid IS NOT NULL
-        |""".stripMargin)
+        |""".stripMargin).repartition(500)
 
     val hbaseContext = new HBaseContext(spark.sparkContext, jobConf)
-
     /**
      * Query HBase to find each company's latest company id
      */
@@ -111,7 +83,7 @@ object CompanyNameMappingPro extends Logging {
           val cid = record.getLong(0).toString
           val name = record.getString(1)
           val current_cid = record.getAs[Long](2).toString
-          val (res_cid, res_name) = getCurrentIdAndName(table, current_cid)
+          val (res_cid, res_name) = CompanyNameMappingUtil.getCurrentIdAndName(table, current_cid)
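+          // getCurrentIdAndName walks the current_cid chain in HBase until a row with no current_cid is reached,
+          // returning (null, null) for missing rows or cyclic chains (the same logic as the private helper removed
+          // above, now hosted in CompanyNameMappingUtil).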
           Row(cid, name, current_cid, res_cid, res_name)
         } catch {
           case e: Exception => {
@@ -121,7 +93,7 @@ object CompanyNameMappingPro extends Logging {
             null
           }
         })
-      table.close()
+// table.close()
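+      // Note: rdd_par is presumably the lazily evaluated per-partition iterator, so an eager table.close() here
+      // would run before the HBase gets above are actually executed; the Table is therefore left open.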
       rdd_par
     }).filter(_ != null)
 
@@ -134,7 +106,8 @@ object CompanyNameMappingPro extends Logging {
 
     val tmp_df = spark.createDataFrame(tmp_rdd, schema)
 
-    tmp_df.createTempView(tmpResultTable) // register a temp view
+    tmp_df.write.mode("overwrite").insertInto(tmpResultTable)
+// tmp_df.createTempView(tmpResultTable) // register a temp view
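+    // insertInto with mode("overwrite") writes into the pre-existing table named by tmpResultTable; unlike the
+    // temp view it replaces, the target table must already exist and its column order must match tmp_df's schema.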
 
     logInfo("new_cid add success")
 
@@ -153,14 +126,26 @@ object CompanyNameMappingPro extends Logging {
         |GROUP BY new_cid
         |""".stripMargin)
       .flatMap(r => {
-        val new_cid = r.getAs[String]("new_cid")
-        val other_id_name = r.getAs[String]("other_id_name")
-        val other_id_name_map = other_id_name.split(";").map(str => (str.split(":")(0)) -> str.split(":")(1)).toMap
+        try {
+          val new_cid = r.getAs[String]("new_cid")
+          val other_id_name = r.getAs[String]("other_id_name")
+          val other_id_name_map = other_id_name.split(";").map(str => (str.split(":")(0)) -> str.split(":")(1)).toMap
 
-        other_id_name.split(";").map(str => (str.split(":")(0), other_id_name_map)).:+((new_cid, other_id_name_map)).toSeq
+          other_id_name
+            .split(";")
+            .map(str => (str.split(":")(0), other_id_name_map)).:+((new_cid, other_id_name_map)).toSeq
+        } catch {
+          case e: Exception => {
+            logError(e.getMessage)
+            logError(r.getAs[String]("new_cid"))
+            logError(r.getAs[String]("other_id_name"))
+            null
+          }
+        }
       }).filter(_ != null).toDF("cid", "other_id_name")
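+      // other_id_name packs "cid:name" pairs separated by ';'; rows that fail to parse are logged and mapped to
+      // null. The trailing filter(_ != null) removes null elements rather than a null collection, so the catch
+      // branch would likely need to return Seq.empty to avoid a NullPointerException when flatMap flattens it.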
 
     id_other.printSchema()
+    id_other.show()
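+    // show() triggers an action here and prints a sample of the cid -> other_id_name rows before the join below.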
 
     /**
      * Backfill: add the companies whose name has never changed
@@ -186,7 +171,6 @@ object CompanyNameMappingPro extends Logging {
     tmp_df3.join(id_other, Seq("cid"), "left").select(
       "cid"
       , "cname"
-      // , s"current_cid"
       , "other_id_name"
       , "new_cid"
       , "new_cname"