12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697 |
- package com.winhc.bigdata.spark.udf
- import org.apache.commons.lang3.StringUtils
- import org.apache.spark.sql.Row
- import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
- import org.apache.spark.sql.types._
/**
 * Aggregates plaintiff (yg_name) / defendant (bg_name) name pairs for a group,
 * keeping the pair with the earliest business date.
 * @author π
 * @date 2020/10/26 15:15
 */
class NameAggs(max: Int) extends UserDefinedAggregateFunction {

  // Accepted values of the `flag` input column; rows with any other flag are skipped.
  val flags = Seq("0", "1", "2", "4", "8")
  // Separator used to pack "yg_name<sep>bg_name" into a single map value.
  val split = "\u0001"

  override def inputSchema: StructType = StructType(Array[StructField](
    StructField("yg_name", DataTypes.StringType)
    , StructField("bg_name", DataTypes.StringType)
    , StructField("flag", DataTypes.StringType)
    , StructField("bus_date", DataTypes.StringType)
  ))

  // Buffer slot 0: bus_date -> names for rows where BOTH names are present.
  // Buffer slot 1: bus_date -> names for rows where only one name is present.
  override def bufferSchema: StructType = StructType(
    Array[StructField](
      StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
      , StructField("t2", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
    )
  )

  override def dataType: DataType = DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer.update(0, Map[String, String]())
    buffer.update(1, Map[String, String]())
  }

  /**
   * Folds one input row into the buffer. Rows with no usable name or an
   * unrecognized flag are ignored, and accumulation stops once `max`
   * entries have been collected across both buffer maps.
   */
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val yg_name = input.getString(0)
    val bg_name = input.getString(1)
    val flag = input.getString(2)
    val bus_date = input.getString(3)
    val usable =
      (StringUtils.isNotBlank(yg_name) || StringUtils.isNotBlank(bg_name)) && flags.contains(flag)
    if (usable) {
      val map0 = buffer.getMap[String, String](0).toMap
      val map1 = buffer.getMap[String, String](1).toMap
      // BUG FIX: the original tested `buffer.size >= max`, but `Row.size` is
      // the number of buffer FIELDS (always 2) — so max <= 2 dropped every
      // row and max > 2 never capped anything. Cap on the number of
      // accumulated entries instead, which is the evident intent of `max`.
      if (map0.size + map1.size < max) {
        val packed = s"$yg_name$split$bg_name"
        if (StringUtils.isNotBlank(yg_name) && StringUtils.isNotBlank(bg_name)) {
          buffer.update(0, map0 + (bus_date -> packed))
        } else {
          buffer.update(1, map1 + (bus_date -> packed))
        }
      }
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    buffer1(0) = buffer1.getAs[Map[String, String]](0) ++ buffer2.getAs[Map[String, String]](0)
    buffer1(1) = buffer1.getAs[Map[String, String]](1) ++ buffer2.getAs[Map[String, String]](1)
  }

  /**
   * Returns Map("yg_name" -> ..., "bg_name" -> ...). Entries where both
   * names were present (slot 0) take priority over partial entries (slot 1);
   * within a slot the lexicographically smallest bus_date wins. Empty
   * strings are returned when nothing was accumulated.
   * (Removed the leftover debug `println` from the original.)
   */
  override def evaluate(buffer: Row): Any = {
    val m0 = buffer.getAs[Map[String, String]](0)
    val m1 = buffer.getAs[Map[String, String]](1)
    val chosen = if (m0.nonEmpty) Some(m0) else if (m1.nonEmpty) Some(m1) else None
    chosen match {
      case Some(m) =>
        val earliest = m.keySet.toSeq.sorted.head
        // limit = -1 keeps trailing empty strings, so a blank bg_name survives
        // the split. NOTE(review): a \u0001 inside a name would still produce
        // >2 fields and a MatchError, exactly as in the original.
        val Array(yg, bg) = m(earliest).split(s"$split", -1)
        Map("yg_name" -> yg, "bg_name" -> bg)
      case None =>
        Map("yg_name" -> "", "bg_name" -> "")
    }
  }
}
|