BaseFunc.scala

package com.winhc.bigdata.spark.udf

import com.winhc.bigdata.spark.implicits.CompanyIndexSave2EsHelper
import com.winhc.bigdata.spark.implicits.RegexUtils._
import com.winhc.bigdata.spark.utils.BaseUtil
import com.winhc.bigdata.spark.utils.BaseUtil._
import org.apache.commons.lang3.StringUtils
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.sql.SparkSession
import org.json4s._
import org.json4s.jackson.JsonMethods._

import scala.annotation.meta.getter

/**
 * @Author: XuJiakai
 * @Date: 2020/7/10 13:49
 * @Description: Common Spark SQL UDF registrations shared across jobs.
 */
trait BaseFunc {
  @(transient @getter) protected val spark: SparkSession

  // Matches runs of characters other than Chinese characters, ASCII letters, spaces,
  // parentheses and dots; used by company_split as the delimiter pattern.
  private val pattern = "[^\\u4e00-\\u9fa5a-zA-Z \\(\\)().]+".r

  // 18-character mainland China ID card numbers (last character may be X);
  // "**" is accepted as a masked month or day.
  private val id_card_pattern = "^[1-9]\\d{5}(18|19|20)\\d{2}((0[1-9])|(1[0-2])|\\*{2})(([0-2][1-9])|10|20|30|31|\\*{2})\\d{3}[0-9Xx]$".r

  /* def to_epoch_millis_timestamp(): Unit = {
    spark.udf.register("to_epoch_millis_timestamp", (et: String) => {
      DateUtils.toUnixTimestamp(date = et) * 1000 + 28800000L
    })
  } */

  def case_no_trim_udf(): Unit = {
    spark.udf.register("case_no_trim", case_no_trim _)
  }

  def is_id_card(): Unit = {
    spark.udf.register("is_id_card", (str: String) => id_card_pattern matches str)
  }
  def code2Name(): (Broadcast[Map[String, Seq[String]]], Broadcast[Map[String, Seq[String]]]) = {
    // Broadcast lookup table: category code -> (first-, second-, third-level category names)
    val categoryCode2Name = spark.sparkContext.broadcast(spark.sql(
      s"""
         |select category_code,
         |       cate_first,
         |       cate_second,
         |       cate_third
         |from winhc_eci_dev.ods_category_code
         |where ds = '20200604'
         """.stripMargin).collect().map(r => {
      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
    }).toMap)

    // Broadcast lookup table: area code -> (province, city, district names)
    val areaCode2Name = spark.sparkContext.broadcast(spark.sql(
      s"""
         |select area_code, province, city, district
         |from winhc_eci_dev.ods_area_code
         |where ds = '20200604'
         """.stripMargin).collect().map(r => {
      (r.getString(0), Seq(r.getString(1), r.getString(2), r.getString(3)))
    }).toMap)

    spark.udf.register("get_category_first", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 0)
    })
    spark.udf.register("get_category_second", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 1)
    })
    spark.udf.register("get_category_third", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(categoryCode2Name, code, 2)
    })
    spark.udf.register("get_province_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 0)
    })
    spark.udf.register("get_city_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 1)
    })
    spark.udf.register("get_county_name", (code: String) => {
      CompanyIndexSave2EsHelper.get_seq_by_index(areaCode2Name, code, 2)
    })

    (categoryCode2Name, areaCode2Name)
  }
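  // Usage sketch (illustrative, not part of the original file): once code2Name() has been
  // called, the broadcast-backed UDFs can be used directly in Spark SQL, e.g.
  //   spark.sql("select get_category_first(category_code), get_province_name(area_code) from t")
  // where table `t` and its columns are hypothetical.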
  def unescapeHtml4(): Unit = {
    // Unescape HTML entities
    spark.udf.register("unescapeHtml4", (col: String) => {
      import org.apache.commons.lang3.StringEscapeUtils
      StringEscapeUtils.unescapeHtml4(col)
    })
  }

  def cleanup(): Unit = {
    // Strip special characters
    spark.udf.register("cleanup", (col: String) => {
      BaseUtil.cleanup(col)
    })
  }

  def tyc_split(): Unit = {
    // Split a "\t;\t"-delimited name list; returns null for an empty input
    spark.udf.register("tyc_split", (name: String) => {
      if (StringUtils.isEmpty(name)) {
        null
      } else {
        name.split("\t;\t")
      }
    })
  }
  def json_utils(): Unit = {
    // Render a single key/value pair as a JSON fragment: "key":"value"
    spark.udf.register("get_json_kv", (key: String, value: String) => {
      if (StringUtils.isNotBlank(value)) {
        "\"" + key + "\":\"" + value + "\""
      } else {
        "\"" + key + "\":\"\""
      }
    })
  }

  def json_add_kv(): Unit = {
    // Prepend a "key":"value" fragment to an existing JSON object string
    spark.udf.register("json_add_str", (json: String, addVal: String) => {
      if (StringUtils.isNotBlank(json)) {
        "{" + addVal + "," + json.substring(1)
      } else {
        "{" + addVal + "}"
      }
    })
  }

  def map_2_json(): Unit = {
    // Serialize a Map[String, String] to a JSON object string via json4s
    spark.udf.register("map_2_json", (map: Map[String, String]) => {
      compact(render(Extraction.decompose(map)(DefaultFormats)))
    })
  }
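  // Illustrative examples (assumption, not from the original source):
  //   get_json_kv('name', 'x')                  -> "name":"x"
  //   json_add_str('{"age":"1"}', '"name":"x"') -> {"name":"x","age":"1"}
  //   map_2_json(map('k', 'v'))                 -> {"k":"v"}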
  def company_split(): Unit = {
    // Split a company name on any run of characters outside the whitelist in `pattern`
    spark.udf.register("company_split", (name: String) => {
      if (StringUtils.isEmpty(name)) {
        Array("")
      } else {
        pattern.split(name)
      }
    })
  }

  def area_code(): Unit = {
    // A six-digit area code splits into province (chars 0-1), city (2-3) and county (4-5)
    spark.udf.register("get_province_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(0, 2)
      } else {
        null
      }
    })
    spark.udf.register("get_city_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(2, 4)
      } else {
        null
      }
    })
    spark.udf.register("get_county_code", (name: String) => {
      if (StringUtils.isNotEmpty(name) && name.trim.length == 6) {
        name.trim.substring(4, 6)
      } else {
        null
      }
    })
  }
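  // Example (illustrative assumption): for area code '330106',
  //   get_province_code -> '33', get_city_code -> '01', get_county_code -> '06'.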
  def justicase_ops(): Unit = {
    // Stable id for a group of case numbers: sort, re-join and hash with BKDRHash
    spark.udf.register("get_justicase_id", (case_nos: String) => {
      BKDRHash(case_nos.split(",").sorted.mkString(","))
    })
  }
}
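
// Usage sketch (assumption, not part of the original file): a job mixes in BaseFunc,
// supplies the SparkSession and registers only the UDFs it needs before running SQL.
// The object name, session construction and table/column names below are hypothetical.
object BaseFuncUsageDemo extends BaseFunc {
  @(transient @getter) override protected val spark: SparkSession =
    SparkSession.builder().appName("BaseFuncUsageDemo").getOrCreate()

  def main(args: Array[String]): Unit = {
    cleanup()      // registers the `cleanup` UDF
    is_id_card()   // registers the `is_id_card` UDF
    area_code()    // registers get_province_code / get_city_code / get_county_code

    spark.sql(
      """
        |select cleanup(name),
        |       is_id_card(card_no),
        |       get_province_code(area_code)
        |from hypothetical_table
        |""".stripMargin).show()
  }
}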