Explorar el Código

feat: LoggingUtils更新

许家凯 hace 4 años
padre
commit
1d38829dfa
Se han modificado 1 ficheros con 21 adiciones y 8 borrados
  1. 21 8
      src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

+ 21 - 8
src/main/scala/com/winhc/bigdata/spark/utils/LoggingUtils.scala

@@ -122,8 +122,21 @@ trait LoggingUtils extends Logging {
     }
   }
 
+  def getPartitions(t: String, expression: String = null): Seq[String] = {
+    var list = sql(s"show partitions $t").collect().map(_.getString(0))
+    if (expression != null) {
+      list = list.filter(s => s.split("/").contains(expression))
+    }
+    list.map(r => {
+      if (r.contains("/")) {
+        r.split("/").find(s => s.contains("ds")).map(s => s.split("=")(1)).orNull
+      } else {
+        r.split("=")(1)
+      }
+    })
+  }
 
-  def getPartitions(t: String): Seq[String] = {
+  /*def getPartitions(t: String): Seq[String] = {
     val sql_s = s"show partitions " + t
     sql(sql_s).collect.toList.map(r => r.getString(0)).map(r => {
       if (r.contains("/")) {
@@ -132,10 +145,10 @@ trait LoggingUtils extends Logging {
         r.split("=")(1)
       }
     })
-  }
+  }*/
 
-  def getSecondLastPartitionOrElse(t: String, default: String): String = {
-    val ps = getPartitions(t)
+  def getSecondLastPartitionOrElse(t: String, default: String, expression: String = null): String = {
+    val ps = getPartitions(t, expression)
     if (ps.length >= 2) {
       ps(ps.length - 2)
     } else {
@@ -143,8 +156,8 @@ trait LoggingUtils extends Logging {
     }
   }
 
-  def getLastPartitionsOrElse(t: String, default: String): String = {
-    val ps = getPartitions(t)
+  def getLastPartitionsOrElse(t: String, default: String, expression: String = null): String = {
+    val ps = getPartitions(t, expression)
     if (ps.nonEmpty) {
       ps.last
     } else {
@@ -152,8 +165,8 @@ trait LoggingUtils extends Logging {
     }
   }
 
-  def getHeadPartitionsOrElse(t: String, default: String): String = {
-    val ps = getPartitions(t)
+  def getHeadPartitionsOrElse(t: String, default: String, expression: String = null): String = {
+    val ps = getPartitions(t, expression)
     if (ps.nonEmpty) {
       ps.head
     } else {