浏览代码

cids兼容list表

xufei 4 年之前
父节点
当前提交
0085215bd0

+ 3 - 3
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncrForCidsUtils.scala

@@ -133,16 +133,16 @@ case class CompanyIncrForCidsUtils(s: SparkSession,
          |                    ,ROW_NUMBER() OVER (PARTITION BY cleanup(CONCAT_WS('',${dupliCols.mkString(",")})) ORDER BY update_time DESC ) num
          |            FROM    (
          |                        SELECT  a.new_cid
-         |                                ,${columns.mkString(",")}
+         |                                ,${sublistTableFieldName.mkString(",")}
          |                        FROM    mapping a
          |                        JOIN    (
          |                                    SELECT  new_cid AS cid
-         |                                            ,${columns.mkString(",")}
+         |                                            ,${sublistTableFieldName.mkString(",")}
          |                                    FROM    ${inc_ads_company_tb_list}
          |                                    WHERE   ds >= ${runDs}
          |                                    UNION ALL
          |                                    SELECT  new_cid AS cid
-         |                                            ,${columns.mkString(",")}
+         |                                            ,${sublistTableFieldName.mkString(",")}
          |                                    FROM    ${ads_company_tb_list}
          |                                    WHERE   ds >= ${remainDs}
          |                                ) b

+ 33 - 2
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -1,11 +1,17 @@
 package cn.oyohotels.utils
 
+import java.{lang, util}
+
 import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.client._
+import org.apache.hadoop.hbase.filter.{BinaryPrefixComparator, CompareFilter, RowFilter}
 import org.apache.hadoop.hbase.util.Bytes
 import org.slf4j.LoggerFactory
 
+import scala.collection.mutable
+import scala.collection.mutable.ListBuffer
+
 
 object HbaseUtil {
   final val FAMILY_NAME = "F"
@@ -42,8 +48,33 @@ object HbaseUtil {
     m
   }
 
+  def getRowDataScan(tb: Table, prefix: String, family: String = FAMILY_NAME) = {
+    val scan = new Scan()
+    val prefixFilter = new RowFilter(CompareFilter.CompareOp.EQUAL, new BinaryPrefixComparator(prefix.getBytes()));
+    scan.setFilter(prefixFilter)
+    var ms = ListBuffer[Map[String, String]]()
+    try {
+      val scanner = tb.getScanner(scan)
+      import scala.collection.JavaConversions._
+      import collection.JavaConverters._
+      for (res <- scanner) {
+        //System.out.println(res)
+//        val row: Array[Byte] = res.getRow
+        var r: Map[String, String] = res.rawCells().map(c1 => (Bytes.toString(CellUtil.cloneQualifier(c1)), Bytes.toString(CellUtil.cloneValue(c1)))).toMap
+        //        println(r)
+        r += ("ROWKEY" ->Bytes.toString(res.getRow))
+        ms.+=(r)
+      }
+    } catch {
+      case e: Throwable => e.printStackTrace()
+    }
+    ms.toList
+  }
+
   def main(args: Array[String]): Unit = {
-    val row = getRowData(getTable("COMPANY_TEST"), "2346619464")
-    println(row)
+    val rows = getRowDataScan(getTable("COMPANY_SCORE"), "23402373")
+    for(r <- rows){
+      println(r)
+    }
   }
 }