许家凯, 5 years ago
parent commit e85368defd

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyNameMappingPro.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.aliyun.odps.utils.StringUtils
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{HBaseUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.{Get, Put, Table}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.spark.HBaseContext
@@ -73,7 +73,7 @@ object CompanyNameMappingPro extends Logging {
     /**
      * Write to HBase, for querying
      */
-    val jobConf = SparkUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
 
      val df = sql(s"select cid,name,current_cid from $inputTable")
      df.rdd.map(row => {

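The hunk above is cut off inside df.rdd.map. For context, a minimal sketch (not part of this commit) of how such a map typically finishes and hands the rows to the JobConf returned by HBaseUtils.HBaseOutputJobConf; the column family "f" and the choice of cid as rowkey are assumptions:

    // Sketch only: assumed completion of the write path; "f" and the rowkey layout are illustrative.
    import org.apache.hadoop.hbase.client.Put
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.util.Bytes

    df.rdd.map(row => {
      val cid = row.getAs[String]("cid")
      val put = new Put(Bytes.toBytes(cid))
      put.addColumn(Bytes.toBytes("f"), Bytes.toBytes("name"), Bytes.toBytes(row.getAs[String]("name")))
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf) // consumes the JobConf built above
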
+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
 
 import scala.collection.mutable
 
@@ -19,7 +19,7 @@ object TestSpark2AliPhoenix {
     val sparkTableName = "test_spark"
 
     val driver = "org.apache.phoenix.queryserver.client.Driver"
-    val url = SparkUtils.PhoenixUrl
+    val url = PhoenixUtil.getPhoenixUrl
 
     //    sparkSession.sql("select * from const_company_category_code").show()
 

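The hunk stops before driver and url are used. A minimal sketch (not from this commit) of a plain JDBC round trip through the Phoenix Query Server thin client, reusing the driver, url and sparkTableName values defined above; the query itself is illustrative:

    // Sketch only: exercise the thin-client connection defined above.
    import java.sql.DriverManager

    Class.forName(driver)
    val conn = DriverManager.getConnection(url)
    try {
      val stmt = conn.createStatement()
      val rs = stmt.executeQuery(s"select * from $sparkTableName limit 10") // illustrative query
      while (rs.next()) {
        println(rs.getString(1))
      }
      rs.close()
      stmt.close()
    } finally {
      conn.close()
    }
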
+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Hbase.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{HBaseUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -29,7 +29,7 @@ object TestSpark2Hbase extends Logging {
     import spark._
     val df = sql(s"select cid,name,current_cid from $inputTable")
 
-    val jobConf = SparkUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
     val df1 = spark.createDataFrame(Seq(("1", "2", "3"))).toDF("col0", "col1", "col2")
     df1.rdd.map(row => {
       val id = row(0).asInstanceOf[String]

+ 3 - 4
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Phoenix.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.SparkUtils
-
+import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
 import org.apache.phoenix.spark._
 
 /**
@@ -13,12 +12,12 @@ object TestSpark2Phoenix {
   def main(args: Array[String]): Unit = {
     val spark = SparkUtils.InitEnv("testSpark2Phoenix")
 
-    val df1 = spark.sqlContext.phoenixTableAsDataFrame("\"company_abnormal_info\"", Seq("rowkey", "ncid"), zkUrl = Some(SparkUtils.PhoenixOptions("\"company_abnormal_info\"")("zkUrl")))
+    val df1 = spark.sqlContext.phoenixTableAsDataFrame("\"company_abnormal_info\"", Seq("rowkey", "ncid"), zkUrl = Some(PhoenixUtil.getPhoenixOptions("\"company_abnormal_info\"")("zkUrl")))
     df1.show()
 
     val df = spark.createDataFrame(Seq(("xjkxjk", "ncid_value", "ncname_value"))).toDF("\"rowkey\"", "\"ncid\"", "\"ncname\"")
 
-    df.saveToPhoenix(SparkUtils.PhoenixOptions("\"company_abnormal_info\""))
+    df.saveToPhoenix(PhoenixUtil.getPhoenixOptions("\"company_abnormal_info\""))
 
     spark.stop()
   }

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2PhoenixJDBC.scala

@@ -3,7 +3,7 @@ package com.winhc.bigdata.spark.test
 import java.util
 import java.util.Properties
 
-import com.winhc.bigdata.spark.utils.SparkUtils
+import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.types.{DataType, StringType, StructField, StructType, UserDefinedType, VarcharType}
 
@@ -26,7 +26,7 @@ object TestSpark2PhoenixJDBC {
       "spark.hadoop.odps.spark.local.partition.amt" -> "100"
     )
     val sparkSession = SparkUtils.InitEnv("scala spark on Phoenix5.x test", map)
-    val DB_PHOENIX_URL = SparkUtils.PhoenixUrl
+    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixUrl
 
     // JDBC connection properties
     val SQL_QUERY = " ( SELECT ID,NCID,CID,TITLE FROM COMPANY_BID_LIST limit 10 )  events  "

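The hunk ends before the actual read. A minimal sketch (not taken from this commit) of how SQL_QUERY, DB_PHOENIX_URL and the already-imported java.util.Properties are commonly combined into a Spark JDBC read; the "driver" and "fetchsize" keys are standard Spark JDBC options assumed here:

    // Sketch only: read the subquery above through the Phoenix thin-client JDBC driver.
    val connProps = new Properties()
    connProps.put("driver", "org.apache.phoenix.queryserver.client.Driver")
    connProps.put("fetchsize", "1000") // assumed tuning value, not from the commit

    val events = sparkSession.read.jdbc(DB_PHOENIX_URL, SQL_QUERY, connProps)
    events.printSchema()
    events.show()
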
+ 31 - 0
src/main/scala/com/winhc/bigdata/spark/utils/HBaseUtils.scala

@@ -0,0 +1,31 @@
+package com.winhc.bigdata.spark.utils
+
+import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+import org.apache.hadoop.hbase.mapred.TableOutputFormat
+import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
+import org.apache.hadoop.mapred.JobConf
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/3 19:03
+ * @Description:
+ */
+object HBaseUtils {
+  def HBaseOutputJobConf(outputTable: String): JobConf = {
+    val config = HBaseConfiguration.create()
+    var zkAddress: String = null
+    if (isWindows) {
+      zkAddress = "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com"
+      import com.alibaba.dcm.DnsCacheManipulator
+      DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
+    } else {
+      zkAddress = "hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com"
+    }
+    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
+
+    val jobConf = new JobConf(config)
+    jobConf.setOutputFormat(classOf[TableOutputFormat])
+    jobConf.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
+    jobConf
+  }
+}

+ 10 - 0
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixUtil.scala

@@ -30,4 +30,14 @@ object PhoenixUtil {
        |)
        |""".stripMargin
 
+
+  def getPhoenixOptions(tableName: String): Map[String, String] = {
+    if (isWindows) {
+      import com.alibaba.dcm.DnsCacheManipulator
+      DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
+      Map("table" -> tableName, "zkUrl" -> "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
+    } else {
+      Map("table" -> tableName, "zkUrl" -> "hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
+    }
+  }
 }

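getPhoenixOptions is consumed above through the implicit phoenixTableAsDataFrame / saveToPhoenix helpers. As an alternative, and assuming the phoenix-spark connector is on the classpath, the same option map can be passed to the DataSource API; a minimal sketch reusing the "company_abnormal_info" table from TestSpark2Phoenix, everything else illustrative:

    // Sketch only: consume PhoenixUtil.getPhoenixOptions through the DataSource API.
    val options = PhoenixUtil.getPhoenixOptions("\"company_abnormal_info\"")

    val df = spark.read
      .format("org.apache.phoenix.spark")
      .option("table", options("table"))
      .option("zkUrl", options("zkUrl"))
      .load()

    df.write
      .format("org.apache.phoenix.spark")
      .mode("overwrite") // the connector upserts rows and expects SaveMode.Overwrite
      .option("table", options("table"))
      .option("zkUrl", options("zkUrl"))
      .save()
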
+ 0 - 35
src/main/scala/com/winhc/bigdata/spark/utils/SparkUtils.scala

@@ -1,45 +1,11 @@
 package com.winhc.bigdata.spark.utils
 
 import com.winhc.bigdata.spark.utils.BaseUtil._
-import org.apache.hadoop.hbase.mapred.TableOutputFormat
-import org.apache.hadoop.hbase.{HBaseConfiguration, HConstants}
-import org.apache.hadoop.mapred.JobConf
 import org.apache.spark.sql.SparkSession
 
 import scala.collection.mutable
 
 object SparkUtils {
-
-
-
-  def PhoenixOptions(tableName: String): Map[String, String] = {
-    if (isWindows) {
-      import com.alibaba.dcm.DnsCacheManipulator
-      DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
-      Map("table" -> tableName, "zkUrl" -> "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
-    } else {
-      Map("table" -> tableName, "zkUrl" -> "hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com:2181")
-    }
-  }
-
-  def HBaseOutputJobConf(outputTable: String): JobConf = {
-    val config = HBaseConfiguration.create()
-    var zkAddress: String = null
-    if (isWindows) {
-      zkAddress = "hb-proxy-pub-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com"
-      import com.alibaba.dcm.DnsCacheManipulator
-      DnsCacheManipulator.setDnsCache("hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com", "47.101.251.157")
-    } else {
-      zkAddress = "hb-uf6as8i6h85k02092-001.hbase.rds.aliyuncs.com"
-    }
-    config.set(HConstants.ZOOKEEPER_QUORUM, zkAddress);
-
-    val jobConf = new JobConf(config)
-    jobConf.setOutputFormat(classOf[TableOutputFormat])
-    jobConf.set(TableOutputFormat.OUTPUT_TABLE, outputTable)
-    jobConf
-  }
-
   def InitEnv(appName: String): SparkSession = {
     InitEnv(appName, null)
   }
@@ -71,5 +37,4 @@ object SparkUtils {
     }
     spark.getOrCreate()
   }
-
 }