|
@@ -57,14 +57,18 @@ object CompanyDynamic {
|
|
|
|
|
|
//表名(不加前后辍)
|
|
|
def calc(tableName: String
|
|
|
- , bName: Boolean = false //是否补充cname字段
|
|
|
+ , bName: Int = 0 //是否补充cname字段
|
|
|
): Unit = {
|
|
|
val handle = getClazz[CompanyDynamicHandle](s"com.winhc.bigdata.spark.jobs.dynamic.tables.$tableName")
|
|
|
|
|
|
val types = handle.org_type()
|
|
|
+ val colsExclusiveSome = spark.table(s"${project}.ads_change_extract").columns.filter(s => {
|
|
|
+ !s.equals("cid") && !s.equals("data") && !s.equals("old_data") && !s.equals("ds") && !s.equals("tn")
|
|
|
+ }).seq
|
|
|
val rdd = sql(
|
|
|
bName match {
|
|
|
- case false =>
|
|
|
+//默认:无需补全cname字段
|
|
|
+ case 0 =>
|
|
|
s"""
|
|
|
|SELECT *,null AS cname
|
|
|
|FROM ${project}.ads_change_extract
|
|
@@ -72,7 +76,8 @@ object CompanyDynamic {
|
|
|
|AND tn = '$tableName'
|
|
|
|AND TYPE in (${types.map("'" + _ + "'").mkString(",")})
|
|
|
|""".stripMargin
|
|
|
- case true =>
|
|
|
+//需根据cid补全cname字段数据
|
|
|
+ case 1 =>
|
|
|
s"""
|
|
|
|SELECT A.*,B.cname AS cname
|
|
|
|FROM(
|
|
@@ -88,6 +93,42 @@ object CompanyDynamic {
|
|
|
|) AS B
|
|
|
|ON A.cid = B.cid
|
|
|
|""".stripMargin
|
|
|
+//既需根据cid补全cname字段数据;还需在data字段中根据其中的cid添加对应cname数据
|
|
|
+ case 2 =>
|
|
|
+ s"""
|
|
|
+ |SELECT cid, ${colsExclusiveSome.mkString(",")},old_data, cname, str_to_map(replace(concat_ws(',',data),'},{',',')) AS data
|
|
|
+ |FROM(
|
|
|
+ | SELECT new_cid AS cid, ${colsExclusiveSome.mkString(",")},to_json(old_data) AS old_data, COLLECT_SET(cname)[0] AS cname, COLLECT_SET(to_json(data)) AS data
|
|
|
+ | FROM(
|
|
|
+ | SELECT new_cid, ${colsExclusiveSome.map("A." + _).mkString(",")}, A.cid, IF(A.data IS NULL, STR_TO_MAP(CONCAT('cname:',cname),',',':'), A.data) AS data, old_data, IF(A.new_cid=A.cid,B.cname,null) AS cname
|
|
|
+ | FROM(
|
|
|
+ | SELECT cid AS new_cid, cid, ${colsExclusiveSome.mkString(",")},data, old_data
|
|
|
+ | FROM(
|
|
|
+ | SELECT *
|
|
|
+ | FROM ${project}.ads_change_extract
|
|
|
+ | WHERE ds = '$ds'
|
|
|
+ | AND tn = '$tableName'
|
|
|
+ | AND TYPE in (${types.map("'" + _ + "'").mkString(",")})
|
|
|
+ | )
|
|
|
+ | UNION ALL
|
|
|
+ | SELECT cid AS new_cid, data['cid'] AS cid, ${colsExclusiveSome.mkString(",")},null AS data, old_data
|
|
|
+ | FROM(
|
|
|
+ | SELECT *
|
|
|
+ | FROM ${project}.ads_change_extract
|
|
|
+ | WHERE ds = '$ds'
|
|
|
+ | AND tn = '$tableName'
|
|
|
+ | AND TYPE in (${types.map("'" + _ + "'").mkString(",")})
|
|
|
+ | )
|
|
|
+ | ) AS A
|
|
|
+ | LEFT JOIN (
|
|
|
+ | SELECT cid,cname FROM $project.base_company_mapping
|
|
|
+ | WHERE ds = '${getLastPartitionsOrElse(project + ".base_company_mapping", "0")}'
|
|
|
+ | ) AS B
|
|
|
+ | ON A.cid = B.cid
|
|
|
+ | )
|
|
|
+ | GROUP BY new_cid, ${colsExclusiveSome.mkString(",")},to_json(old_data)
|
|
|
+ |)
|
|
|
+ |""".stripMargin
|
|
|
})
|
|
|
.rdd.flatMap(r => {
|
|
|
val rowkey = r.getAs[String]("rowkey")
|
|
@@ -145,7 +186,7 @@ object CompanyDynamic {
|
|
|
sys.exit(-99)
|
|
|
}
|
|
|
|
|
|
- val Array(project, ds, tableName, bName_table) = if (args.length == 4) args else args :+ ""
|
|
|
+ val Array(project, tableName, ds, bName_table) = if (args.length == 4) args else args :+ "0"
|
|
|
|
|
|
println(
|
|
|
s"""
|
|
@@ -162,15 +203,17 @@ object CompanyDynamic {
|
|
|
val cd = CompanyDynamicUtil(spark, project, ds)
|
|
|
|
|
|
cd.init()
|
|
|
+/*
|
|
|
|
|
|
for (e <- tableName.split(",")) {
|
|
|
if (e.length > 2) {
|
|
|
- cd.calc(e)
|
|
|
+ cd.calc(e, bName_table.toInt)
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- for (e <- bName_table.split(",")) {
|
|
|
- cd.calc(e, bName = true)
|
|
|
+*/
|
|
|
+ for (e <- tableName.split(",")) {
|
|
|
+ cd.calc(e, bName_table.toInt)
|
|
|
}
|
|
|
spark.stop()
|
|
|
}
|