package com.winhc.bigdata.spark.udf

import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
import com.winhc.bigdata.spark.implicits.RegexUtils._
import com.winhc.bigdata.spark.utils.BaseUtil
import com.winhc.bigdata.spark.utils.BaseUtil._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.json4s._
import org.json4s.jackson.JsonMethods._

import scala.annotation.meta.getter
/**
 * @Author: XuJiakai
 * @Date: 2020/7/10 13:49
 * @Description: Common Spark SQL UDF registrations shared by jobs that mix in this trait.
 */
trait BaseFunc {
  @(transient@getter) protected val spark: SparkSession

  // Splits company names on runs of characters outside the whitelist
  // (CJK ideographs, Latin letters, spaces, parentheses and dots).
  private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r

  // Matches an 18-digit mainland-China resident ID number; the month and day
  // segments may be masked as "**". The regex does not verify the checksum digit.
  private val id_card_pattern = "^[1-9]\\d{5}(18|19|20)\\d{2}((0[1-9])|(1[0-2])|\\*{2})(([0-2][1-9])|10|20|30|31|\\*{2})\\d{3}[0-9Xx]$".r
  /* def to_epoch_millis_timestamp(): Unit = {
    spark.udf.register("to_epoch_millis_timestamp", (et: String) => {
      DateUtils.toUnixTimestamp(date = et) * 1000 + 28800000L
    })
  } */
  // Normalize court case numbers; case_no_trim is provided by BaseUtil.
  def case_no_trim_udf(): Unit = {
    spark.udf.register("case_no_trim", case_no_trim _)
  }

  def is_id_card(): Unit = {
    spark.udf.register("is_id_card", (str: String) => id_card_pattern matches str)
  }
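  // Usage sketch (illustrative, not part of the original source): once is_id_card()
  // has been called, the UDF is available in SQL. The sample number below is fabricated
  // and merely fits the pattern; as noted above, the checksum digit is not validated.
  //   spark.sql("select is_id_card('110101199003071234')").show()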
  def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
    // Broadcast the category-code and area-code dimension tables
    // (note: the ds partition is hard-coded to 20200604).
    val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
      s"""
         |select category_code,
         |       cate_first,
         |       cate_second,
         |       cate_third
         |from winhc_eci_dev.ods_category_code
         |where ds = '20200604'
         |""".stripMargin).collect().map(r => {
      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
    }).toMap)

    val areaCode2Name = spark.sparkContext.broadcast(spark.sql(
      s"""
         |select area_code, province, city, district
         |from winhc_eci_dev.ods_area_code
         |where ds = '20200604'
         |""".stripMargin).collect().map(r => {
      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
    }).toMap)

    spark.udf.register("get_category_first", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 0)
    })
    spark.udf.register("get_category_second", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 1)
    })
    spark.udf.register("get_category_third", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 2)
    })
    spark.udf.register("get_province_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 0)
    })
    spark.udf.register("get_city_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 1)
    })
    spark.udf.register("get_county_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 2)
    })
    (categoryCode2Name, areaCode2Name)
  }
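  // Usage sketch (illustrative): after code2Name() has been called, the lookup UDFs
  // translate dimension codes in SQL. The source table and column names below are assumptions.
  //   spark.sql("select get_province_name(area_code), get_category_first(category_code) from t")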
  def unescapeHtml4(): Unit = {
    // Unescape HTML entities (e.g. "&amp;" -> "&").
    spark.udf.register("unescapeHtml4", (col: String) => {
      import org.apache.commons.lang3.StringEscapeUtils
      StringEscapeUtils.unescapeHtml4(col)
    })
  }
  def cleanup(): Unit = {
    // Strip special characters.
    spark.udf.register("cleanup", (col: String) => {
      BaseUtil.cleanup(col)
    })
  }
  def tyc_split(): Unit = {
    spark.udf.register("tyc_split", (name: String) => {
      if (StringUtils.isEmpty(name)) {
        null
      } else {
        name.split("\t;\t")
      }
    })
  }
  def json_utils(): Unit = {
    // Render a single key/value pair as a JSON fragment: "key":"value"
    // (an empty string value when the input value is blank).
    spark.udf.register("get_json_kv", (key: String, value: String) => {
      if (StringUtils.isNotBlank(value)) {
        "\"" + key + "\":\"" + value + "\""
      } else {
        "\"" + key + "\":\"\""
      }
    })
  }

  def json_add_kv(): Unit = {
    // Prepend a "key":"value" fragment to an existing JSON object string;
    // assumes `json`, when present, is a non-empty object such as {"a":"b"}.
    spark.udf.register("json_add_str", (json: String, addVal: String) => {
      if (StringUtils.isNotBlank(json)) {
        "{" + addVal + "," + json.substring(1)
      } else {
        "{" + addVal + "}"
      }
    })
  }
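  // Usage sketch (illustrative, fabricated literals): the two JSON UDFs compose:
  //   spark.sql("""select json_add_str('{"b":"2"}', get_json_kv('a', '1'))""")
  // yields {"a":"1","b":"2"}.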
  def map_2_json(): Unit = {
    // Serialize a Map[String, String] column to a compact JSON string via json4s.
    spark.udf.register("map_2_json", (map: Map[String, String]) => {
      compact(render(Extraction.decompose(map)(DefaultFormats)))
    })
  }
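  // Usage sketch (illustrative): with Spark's built-in map() constructor,
  //   spark.sql("select map_2_json(map('a', '1'))")
  // returns the string {"a":"1"}.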
  def company_split(): Unit = {
    // Split a company name on the non-name characters defined by `pattern` above.
    spark.udf.register("company_split", (name: String) => {
      if (StringUtils.isEmpty(name)) {
        Array("")
      } else {
        pattern.split(name)
      }
    })
  }
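  // Usage sketch (illustrative, fabricated input): characters such as '-' fall outside
  // the whitelist, so company_split("阿里巴巴(中国)有限公司-杭州分公司") would yield
  // ["阿里巴巴(中国)有限公司", "杭州分公司"].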
  def area_code(): Unit = {
    // A 6-digit administrative area code encodes province / city / county as two digits each.
    spark.udf.register("get_province_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(0, 2)
      } else {
        null
      }
    })
    spark.udf.register("get_city_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(2, 4)
      } else {
        null
      }
    })
    spark.udf.register("get_county_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(4, 6)
      } else {
        null
      }
    })
  }
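  // Example: for '310104' (Xuhui district, Shanghai), get_province_code returns "31",
  // get_city_code returns "01" and get_county_code returns "04".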
  def justicase_ops(): Unit = {
    // Derive an order-independent case-group id: sort the comma-separated case numbers
    // before hashing. Note that a null input would throw a NullPointerException here.
    spark.udf.register("get_justicase_id", (case_nos: String) => {
      BKDRHash(case_nos.split(",").sorted.mkString(","))
    })
  }
}
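
// Usage sketch (not part of the original source): a minimal driver mixing in BaseFunc.
// The object name, app name, local master and SQL literals are all illustrative assumptions.
object BaseFuncUsageExample extends BaseFunc {
  protected val spark: SparkSession = SparkSession.builder()
    .appName("BaseFuncUsageExample")
    .master("local[*]") // local master for a quick smoke test only
    .getOrCreate()

  def main(args: Array[String]): Unit = {
    // Register a couple of the UDFs, then call them from SQL.
    is_id_card()
    area_code()
    spark.sql(
      "select is_id_card('110101199003071234') as is_id, get_province_code('310104') as prov"
    ).show()
    spark.stop()
  }
}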