Merge remote-tracking branch 'origin/master'

许家凯 4 years ago
parent
commit
d319daab32

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/jobs/judicial/JudicialCaseRelationPreNew.scala

@@ -445,7 +445,7 @@ case class JudicialCaseRelationPreNew(s: SparkSession, project: String, ds: Stri
          |                JOIN (
          |                        SELECT
          |                                cname
-         |                                ,max(new_cid) AS new_cid
+         |                                ,concat_ws(',',collect_set(new_cid)) AS new_cid
          |                        FROM    $project.$t5
          |                        WHERE   ds = '$t5_ds' AND length(cleanup(cname)) >4
          |                        GROUP BY cname
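This hunk widens the per-cname aggregation: max(new_cid) kept a single id per company name, while concat_ws(',', collect_set(new_cid)) keeps every distinct id as a comma-separated string. A minimal standalone sketch of the difference, using made-up sample data rather than the project's $project.$t5 table:

    import org.apache.spark.sql.SparkSession

    object CollectSetVsMax {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().master("local[*]").appName("sketch").getOrCreate()
        import spark.implicits._

        // Toy stand-in for the real table: one cname mapped to two ids.
        Seq(("acme", "101"), ("acme", "202"), ("beta", "303"))
          .toDF("cname", "new_cid")
          .createOrReplaceTempView("t5_sample")

        // Old aggregation: acme -> 202, the other id is silently dropped.
        spark.sql("SELECT cname, max(new_cid) AS new_cid FROM t5_sample GROUP BY cname").show()

        // New aggregation: acme -> 101,202 (all distinct ids survive the join).
        spark.sql(
          """SELECT cname, concat_ws(',', collect_set(new_cid)) AS new_cid
            |FROM t5_sample GROUP BY cname""".stripMargin).show()
      }
    }

Note that collect_set gives no ordering guarantee, so the element order inside the joined string can differ between runs.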

+ 4 - 4
src/main/scala/com/winhc/bigdata/spark/udf/CaseReasonAggs.scala

@@ -14,7 +14,7 @@ import scala.collection.mutable
 
 class CaseReasonAggs(max: Int) extends UserDefinedAggregateFunction {
 
-  val flags = Seq("0", "1", "2", "4", "8")
+  //val flags = Seq("0", "1", "2", "4", "8")
   val split = "\u0001"
 
   override def inputSchema: StructType = StructType(Array[StructField](
@@ -47,9 +47,9 @@ class CaseReasonAggs(max: Int) extends UserDefinedAggregateFunction {
     if (StringUtils.isBlank(case_reason)) {
       return
     }
-    if (!flags.contains(flag)) {
-      return
-    }
+//    if (!flags.contains(flag)) {
+//      return
+//    }
 
     buffer(0) = mutable.Map(bus_date -> s"$case_reason") ++= scala.collection.mutable.Map[String, String](buffer.getMap[String, String](0).toMap.toSeq: _*)
   }
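With the flag whitelist commented out, update() now buffers a case_reason for every row instead of only rows whose flag is in ("0", "1", "2", "4", "8"). A hypothetical registration sketch; the max value 500, the column order, and the case_detail table name are assumptions, not taken from the project:

    import org.apache.spark.sql.SparkSession
    import com.winhc.bigdata.spark.udf.CaseReasonAggs

    val spark = SparkSession.builder().master("local[*]").getOrCreate()

    // Register the UDAF; rows now reach the buffer whatever their flag is.
    spark.udf.register("case_reason_aggs", new CaseReasonAggs(500))

    spark.sql(
      """SELECT case_id,
        |       case_reason_aggs(bus_date, case_reason, flag) AS case_reason
        |FROM   case_detail
        |GROUP  BY case_id""".stripMargin).show()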

+ 20 - 15
src/main/scala/com/winhc/bigdata/spark/udf/NameAggs.scala

@@ -13,7 +13,7 @@ import org.apache.spark.sql.types._
 
 class NameAggs(max: Int) extends UserDefinedAggregateFunction {
 
-  val flags = Seq("0", "1", "2", "4", "8")
+  //val flags = Seq("0", "1", "2", "4", "8")
   val split = "\u0001"
 
   override def inputSchema: StructType = StructType(Array[StructField](
@@ -26,7 +26,7 @@ class NameAggs(max: Int) extends UserDefinedAggregateFunction {
   override def bufferSchema: StructType = StructType(
     Array[StructField](
       StructField("t1", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
-      ,StructField("t2", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
+      , StructField("t2", DataTypes.createMapType(DataTypes.StringType, DataTypes.StringType))
     )
   )
 
@@ -52,9 +52,9 @@ class NameAggs(max: Int) extends UserDefinedAggregateFunction {
     if (StringUtils.isBlank(yg_name) && StringUtils.isBlank(bg_name)) {
       return
     }
-    if (!flags.contains(flag)) {
-      return
-    }
+    //    if (!flags.contains(flag)) {
+    //      return
+    //    }
     val map0 = buffer.getMap[String, String](0).toMap
     val map1 = buffer.getMap[String, String](1).toMap
     var map_new0 = scala.collection.mutable.Map[String, String](map0.toSeq: _*)
@@ -76,21 +76,26 @@ class NameAggs(max: Int) extends UserDefinedAggregateFunction {
   override def evaluate(buffer: Row): Any = {
     var yg_name = ""
     var bg_name = ""
-    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0).filter(_._2.split(s"$split",-1).length == 2)
-    val m1: Map[String, String] = buffer.getAs[Map[String, String]](1).filter(_._2.split(s"$split",-1).length == 2)
+    val m0: Map[String, String] = buffer.getAs[Map[String, String]](0)
+    //.filter(_._2.split(s"$split",-1).length == 2)
+    val m1: Map[String, String] = buffer.getAs[Map[String, String]](1)
+    //.filter(_._2.split(s"$split",-1).length == 2)
     //println("m0" + m0 + "m1" + m1)
+    //if(!m0.filter(_._2.split(s"$split",-1).size != 2).isEmpty){
+    //println("m0 : error : " + m0)
+    //}
     if (m0.isEmpty && m1.isEmpty) {
       return Map("yg_name" -> yg_name, "bg_name" -> bg_name)
-    }else if(!m0.isEmpty){
+    } else if (!m0.isEmpty) {
       val key = m0.keySet.toSeq.sorted.head
-      val Array(a, b) = m0(key).split(s"$split",-1)
-      yg_name = a
-      bg_name = b
-    }else{
+      val arr = m0(key).split(s"$split", -1)
+      yg_name = arr(0)
+      bg_name = arr(1)
+    } else {
       val key = m1.keySet.toSeq.sorted.head
-      val Array(x, y) = m1(key).split(s"$split",-1)
-      yg_name = x
-      bg_name = y
+      val arr = m1(key).split(s"$split", -1)
+      yg_name = arr(0)
+      bg_name = arr(1)
     }
     Map("yg_name" -> yg_name, "bg_name" -> bg_name)
   }
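Beyond the whitespace cleanup, two behavioral changes land here: the length-2 filter on the buffered maps is disabled, and the Array(a, b) pattern-vals are replaced by index access. The reason is that a pattern definition is a runtime match that fails hard on malformed entries; a standalone sketch with made-up strings:

    val split = "\u0001"
    val good = s"plaintiff${split}defendant"
    val bad  = "no-separator-here"

    // Pattern-val: throws scala.MatchError unless the split yields
    // exactly two elements.
    val Array(a, b) = good.split(split, -1)
    // val Array(x, y) = bad.split(split, -1)  // scala.MatchError at runtime

    // Index access never pattern-matches, but arr(1) can still throw
    // ArrayIndexOutOfBoundsException when the separator is missing; that is
    // the case the removed .filter(_._2.split(s"$split", -1).length == 2) guarded.
    val arr = bad.split(split, -1)
    val yg_name = arr(0)
    val bg_name = if (arr.length > 1) arr(1) else ""

So the new evaluate trades a MatchError for a possible ArrayIndexOutOfBoundsException on entries with fewer than two parts, entries the commented-out filter would previously have dropped.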

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -340,7 +340,7 @@ object BaseUtil {
   def sortString(s: String, split: String = "\\001"): String = {
     var r = ""
     if (StringUtils.isNotBlank(s)) {
-      r = s.split(split).sorted.mkString(split)
+      r = s.split(split).toSet.toList.sorted.mkString(split)
     }
     r
   }
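The added .toSet makes sortString deduplicate as well as sort. A quick sketch, with a comma standing in for the real "\\001" separator purely for readability:

    val s = "b,a,b"
    s.split(",").sorted.mkString(",")               // "a,b,b"  (old behavior)
    s.split(",").toSet.toList.sorted.mkString(",")  // "a,b"    (new behavior)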