|
@@ -2,9 +2,10 @@ package com.winhc.bigdata.spark.udf
|
|
|
|
|
|
import com.alibaba.fastjson.{JSON, JSONArray, JSONPath}
|
|
|
import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
|
|
|
+import com.winhc.bigdata.spark.implicits.AreaCode2NameHelper._
|
|
|
import com.winhc.bigdata.spark.utils.BaseUtil._
|
|
|
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
|
|
|
-import org.apache.commons.lang3.StringUtils
|
|
|
+import com.winhc.bigdata.spark.utils.{AreaCode2Name, BaseUtil, LoggingUtils, area_code_org}
|
|
|
+import org.apache.commons.lang3.{StringEscapeUtils, StringUtils}
|
|
|
import org.apache.spark.broadcast.Broadcast
|
|
|
import org.apache.spark.sql.SparkSession
|
|
|
import org.json4s._
|
|
@@ -29,6 +30,35 @@ trait BaseFunc extends LoggingUtils {
|
|
|
})
|
|
|
}*/
|
|
|
|
|
|
+
|
|
|
+ def json_parse_udf(): Unit = {
|
|
|
+ /**
|
|
|
+ *
|
|
|
+ * @param json_array
|
|
|
+ * @param json_path "$.name"
|
|
|
+ * @return
|
|
|
+ */
|
|
|
+ def json_2_array(json_array: String, json_path: String): Seq[String] = {
|
|
|
+ try {
|
|
|
+ if (StringUtils.isEmpty(json_array)) {
|
|
|
+ return Seq.empty
|
|
|
+ }
|
|
|
+ if (!is_json_str(json_array)) {
|
|
|
+ return Seq.empty
|
|
|
+ }
|
|
|
+ JSONPath.eval(JSON.parse(json_array), json_path).asInstanceOf[JSONArray].toArray[String](Array()).toSeq.distinct.diff(Seq(""))
|
|
|
+ } catch {
|
|
|
+ case e: Exception => {
|
|
|
+ println(json_array)
|
|
|
+ Seq.empty
|
|
|
+ }
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ spark.udf.register("json_2_array", json_2_array _)
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
def addEmptyPartitionOrSkip(tab: String, ds: String): Unit = {
|
|
|
sql(
|
|
|
s"""
|
|
@@ -81,6 +111,30 @@ trait BaseFunc extends LoggingUtils {
|
|
|
spark.udf.register("json_2_array", json_2_array _)
|
|
|
}
|
|
|
|
|
|
+
|
|
|
+ def areaCode2Name_pro(): Unit = {
|
|
|
+ val area_code_df = spark.sql(
|
|
|
+ s"""
|
|
|
+ |select area_code,province,city,district
|
|
|
+ |from winhc_eci_dev.ods_area_code where ds = '20200604'
|
|
|
+ """.stripMargin).collect()
|
|
|
+
|
|
|
+ val array = area_code_df.map(f => area_code_org(f.getString(0), f.getString(1), f.getString(2), f.getString(3)))
|
|
|
+ val map = AreaCode2Name.getAreaCodeTree(array)
|
|
|
+ val broad_map = spark.sparkContext.broadcast(map)
|
|
|
+
|
|
|
+ spark.udf.register("get_area_name_pro", (areaCode: String) => {
|
|
|
+ val t = broad_map.value.get_area_name(areaCode)
|
|
|
+ Seq(t._1, t._2, t._3)
|
|
|
+ })
|
|
|
+
|
|
|
+ spark.udf.register("get_area_name_pro_detail", (provinceCode: String, cityCode: String, countyCode: String) => {
|
|
|
+ val t = broad_map.value.get_area_name(provinceCode, cityCode, countyCode)
|
|
|
+ Seq(t._1, t._2, t._3)
|
|
|
+ })
|
|
|
+ }
|
|
|
+
|
|
|
+
|
|
|
def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
|
|
|
val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
|
|
|
s"""
|
|
@@ -139,7 +193,6 @@ trait BaseFunc extends LoggingUtils {
|
|
|
def unescapeHtml4(): Unit = {
|
|
|
//清理html字符
|
|
|
spark.udf.register("unescapeHtml4", (col: String) => {
|
|
|
- import org.apache.commons.lang3.StringEscapeUtils
|
|
|
StringEscapeUtils.unescapeHtml4(col)
|
|
|
})
|
|
|
}
|