
Reorganize packages

许家凯 4 years ago
commit cf827cf81e

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/EsUtils.scala

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.spark.utils
+package com.winhc.bigdata.spark.config
 
 import com.winhc.bigdata.spark.const.EnvConst
 
@@ -9,7 +9,7 @@ import scala.collection.mutable
  * @Date: 2020/6/5 09:53
  * @Description:
  */
-object EsUtils {
+object EsConfig {
 
   def getEsConfigMap: mutable.Map[String, String] = {
     val map = mutable.Map(
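
As a hedged illustration of the rename (not part of the commit), a caller that previously imported EsUtils now pulls EsConfig from the new config package; the project name and extra ODPS settings below are assumptions borrowed from the CompanyIncCompany2Es hunk further down.

import com.winhc.bigdata.spark.config.EsConfig
import com.winhc.bigdata.spark.utils.SparkUtils

import scala.collection.mutable

object EsConfigUsageSketch {
  def main(args: Array[String]): Unit = {
    // Merge the ES settings exposed by EsConfig with job-specific ODPS settings
    val config = EsConfig.getEsConfigMap ++ mutable.Map(
      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",          // assumed project name
      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
    )
    val spark = SparkUtils.InitEnv("EsConfigUsageSketch", config)
    spark.stop()
  }
}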

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/HBaseUtils.scala

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.spark.utils
+package com.winhc.bigdata.spark.config
 
 import com.winhc.bigdata.spark.const.EnvConst
 import org.apache.hadoop.conf.Configuration
@@ -11,7 +11,7 @@ import org.apache.hadoop.mapred.JobConf
  * @Date: 2020/6/3 19:03
  * @Description:
  */
-object HBaseUtils {
+object HBaseConfig {
   def getHbaseConf(): Configuration = {
     val config = HBaseConfiguration.create()
     val zkAddress: String = EnvConst.getEnv().getValue("zk.address")
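
A minimal sketch (table name and row key are assumptions) of consuming the renamed HBaseConfig.getHbaseConf() directly, in the same way HbaseUtil.scala below builds its lazy connection configuration.

import com.winhc.bigdata.spark.config.HBaseConfig
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes

object HBaseConfigUsageSketch {
  def main(args: Array[String]): Unit = {
    // getHbaseConf() reads zk.address from EnvConst, as shown in the hunk above
    val conn = ConnectionFactory.createConnection(HBaseConfig.getHbaseConf())
    val table = conn.getTable(TableName.valueOf("company_name_kv"))   // assumed HBase table name
    val result = table.get(new Get(Bytes.toBytes("some_row_key")))    // assumed row key
    println(s"row found: ${!result.isEmpty}")
    table.close()
    conn.close()
  }
}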

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/PhoenixUtil.scala

@@ -1,4 +1,4 @@
-package com.winhc.bigdata.spark.utils
+package com.winhc.bigdata.spark.config
 
 import java.util.Properties
 
@@ -9,7 +9,7 @@ import com.winhc.bigdata.spark.const.{BaseConst, EnvConst}
  * @Date: 2020/6/3 18:09
  * @Description:
  */
-object PhoenixUtil {
+object PhoenixConfig {
   private val DB_PHOENIX_DRIVER = BaseConst.DB_PHOENIX_DRIVER
   private val DB_PHOENIX_USER = ""
   private val DB_PHOENIX_PASS = ""
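
For context, a hedged read-side counterpart to the JDBC write path shown in PhoenixHelper below, using the URL and Properties now exposed by PhoenixConfig; the Phoenix table name is an assumption taken from TestSpark2Phoenix.

import com.winhc.bigdata.spark.config.PhoenixConfig
import com.winhc.bigdata.spark.utils.SparkUtils

object PhoenixConfigUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.InitEnv("PhoenixConfigUsageSketch")
    // Read through the Phoenix query server with the JDBC URL and Properties from PhoenixConfig
    val df = spark.read.jdbc(
      PhoenixConfig.getPhoenixJDBCUrl,
      "\"company_abnormal_info\"",        // assumed Phoenix table, quoted as in TestSpark2Phoenix
      PhoenixConfig.getPhoenixProperties)
    df.show(10)
    spark.stop()
  }
}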

+ 3 - 5
src/main/scala/com/winhc/bigdata/spark/implicits/PhoenixHelper.scala

@@ -1,7 +1,6 @@
 package com.winhc.bigdata.spark.implicits
 
-import com.winhc.bigdata.spark.const.BaseConst
-import com.winhc.bigdata.spark.utils.PhoenixUtil
+import com.winhc.bigdata.spark.config.PhoenixConfig
 import org.apache.spark.sql.DataFrame
 
 /**
@@ -19,7 +18,7 @@ object PhoenixHelper {
      */
     def save2Phoenix(tableName: String): Unit = {
       val tmpTable = "tmp_" + tableName
-      df.sparkSession.sql(PhoenixUtil.getPhoenixTempView(tmpTable, tableName))
+      df.sparkSession.sql(PhoenixConfig.getPhoenixTempView(tmpTable, tableName))
       df.write
         .mode("append")
         .insertInto(tmpTable)
@@ -33,8 +32,7 @@ object PhoenixHelper {
     def save2PhoenixByJDBC(tableName: String): Unit = {
       df.write
         .mode("append")
-        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName, PhoenixUtil.getPhoenixProperties)
-      //        .jdbc(PhoenixUtil.getPhoenixJDBCUrl, tableName + BaseConst.PHOENIX_TABLE_NAME_FLAG, PhoenixUtil.getPhoenixProperties)
+        .jdbc(PhoenixConfig.getPhoenixJDBCUrl, tableName, PhoenixConfig.getPhoenixProperties)
     }
   }
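
A hedged usage sketch of the implicit helper after the import cleanup (the demo DataFrame and Phoenix table name are assumptions); save2PhoenixByJDBC now resolves its URL and Properties through PhoenixConfig instead of PhoenixUtil.

import com.winhc.bigdata.spark.implicits.PhoenixHelper._
import com.winhc.bigdata.spark.utils.SparkUtils

object PhoenixHelperUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.InitEnv("PhoenixHelperUsageSketch")
    val df = spark.createDataFrame(Seq(("rk_1", "v_1"))).toDF("ROWKEY", "VALUE")
    // Appends the DataFrame over JDBC via PhoenixConfig.getPhoenixJDBCUrl / getPhoenixProperties
    df.save2PhoenixByJDBC("TEST_PHX_TABLE")   // assumed Phoenix table name
    spark.stop()
  }
}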
 

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIncCompany2Es.scala

@@ -1,7 +1,8 @@
 package com.winhc.bigdata.spark.jobs
 
+import com.winhc.bigdata.spark.config.{EsConfig, HBaseConfig}
 import com.winhc.bigdata.spark.const.BaseConst
-import com.winhc.bigdata.spark.utils.{BaseUtil, EsUtils, HBaseUtils, LoggingUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, SparkUtils}
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -113,7 +114,7 @@ object CompanyIncCompany2Es {
       import spark.implicits._
       //写出到hbase
       import org.apache.spark.sql.functions.col
-      val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY")
+      val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")
       val stringDf = df.select(companyCols.map(column => col(column).cast("string")): _*)
       stringDf.rdd.map(row => {
         val id = row.getAs[String]("cid")
@@ -150,7 +151,7 @@ object CompanyIncCompany2Es {
 
     val Array(project, bizDate) = args
 
-    val config = EsUtils.getEsConfigMap ++ mutable.Map(
+    val config = EsConfig.getEsConfigMap ++ mutable.Map(
       "spark.hadoop.odps.project.name" -> project,
       "spark.hadoop.odps.spark.local.partition.amt" -> "10"
     )
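
A hedged end-to-end sketch of the HBase write pattern this job keeps using after the rename: HBaseConfig.HBaseOutputJobConf builds the output JobConf and rows go out through saveAsHadoopDataset. The demo DataFrame, column family "F", and qualifier "NAME" are illustrative assumptions.

import com.winhc.bigdata.spark.config.HBaseConfig
import com.winhc.bigdata.spark.utils.SparkUtils
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.util.Bytes

object HBaseWriteSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkUtils.InitEnv("HBaseWriteSketch")
    val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY")   // output JobConf for the COMPANY table
    val df = spark.createDataFrame(Seq(("1", "foo"), ("2", "bar"))).toDF("cid", "name")
    df.rdd.map(row => {
      val put = new Put(Bytes.toBytes(row.getAs[String]("cid")))
      put.addColumn(Bytes.toBytes("F"), Bytes.toBytes("NAME"), Bytes.toBytes(row.getAs[String]("name")))
      (new ImmutableBytesWritable, put)
    }).saveAsHadoopDataset(jobConf)
    spark.stop()
  }
}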

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyIndexSave2Es.scala

@@ -1,7 +1,8 @@
 package com.winhc.bigdata.spark.jobs
 
+import com.winhc.bigdata.spark.config.EsConfig
 import com.winhc.bigdata.spark.utils.CompanyEsUtils.getEsDoc
-import com.winhc.bigdata.spark.utils.{EsUtils, SparkUtils}
+import com.winhc.bigdata.spark.utils.SparkUtils
 
 /**
  * @Author: XuJiakai
@@ -20,7 +21,7 @@ object CompanyIndexSave2Es {
   }
 
   def main(args: Array[String]): Unit = {
-    val map = EsUtils.getEsConfigMap
+    val map = EsConfig.getEsConfigMap
 
     val company_name_mapping = "winhc_eci_dev.company_name_mapping_pro_v2"
 

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyNameMappingPro.scala

@@ -1,7 +1,8 @@
 package com.winhc.bigdata.spark.jobs
 
 import com.aliyun.odps.utils.StringUtils
-import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, HBaseUtils, SparkUtils}
+import com.winhc.bigdata.spark.config.HBaseConfig
+import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, SparkUtils}
 import org.apache.hadoop.hbase.client.{Get, Put, Table}
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.spark.HBaseContext
@@ -43,7 +44,7 @@ object CompanyNameMappingPro extends Logging {
     /**
      * 写hbase,供查询
      */
-    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseKVTable)
 
     /*  val df = sql(s"select cid,name,current_cid from $inputTable where cid is not null")
       df.rdd.map(row => {

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/jobs/CompanyNameMappingPro_stage_01.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.jobs
 
-import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, HBaseUtils, SparkUtils}
+import com.winhc.bigdata.spark.config.HBaseConfig
+import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, SparkUtils}
 import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.spark.HBaseContext
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
@@ -28,7 +29,7 @@ object CompanyNameMappingPro_stage_01 extends Logging {
     /**
      * 写hbase,供查询
      */
-    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseKVTable)
 
     val df_old = sql(
       s"""

+ 6 - 6
src/main/scala/com/winhc/bigdata/spark/jobs/increment/CommonTableOps.scala

@@ -2,11 +2,11 @@ package com.winhc.bigdata.spark.jobs.increment
 
 import java.util.Date
 
-import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils, PhoenixUtil}
+import com.winhc.bigdata.spark.config.PhoenixConfig
+import com.winhc.bigdata.spark.utils.{BaseUtil, LoggingUtils}
 import org.apache.spark.sql.SparkSession
 
 import scala.annotation.meta.getter
-
 import com.winhc.bigdata.spark.implicits.PhoenixHelper._
 
 /**
@@ -74,8 +74,8 @@ case class CommonTableOps(s: SparkSession, sourceTable: String, dupCols: Seq[Str
     println(s"${this.getClass.getSimpleName} ads end! " + new Date().toString)
 
     // 写入PHX的表
-    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
-    val connProp = PhoenixUtil.getPhoenixProperties
+    val DB_PHOENIX_URL = PhoenixConfig.getPhoenixJDBCUrl
+    val connProp = PhoenixConfig.getPhoenixProperties
     df1.persist()
     sql(s"""SELECT ${adsColumns.filter(!_.equals("ROWKEY")).mkString(",")},CONCAT_WS("_",c.cid,c.id) AS ROWKEY FROM t2""")
       .save2PhoenixByJDBC(s"${phxTable}")
@@ -145,8 +145,8 @@ case class CommonTableOps(s: SparkSession, sourceTable: String, dupCols: Seq[Str
     println(s"${this.getClass.getSimpleName} ads end! " + new Date().toString)
 
     // 写入PHX的LIST表
-    val DB_PHOENIX_URL = PhoenixUtil.getPhoenixJDBCUrl
-    val connProp = PhoenixUtil.getPhoenixProperties
+    val DB_PHOENIX_URL = PhoenixConfig.getPhoenixJDBCUrl
+    val connProp = PhoenixConfig.getPhoenixProperties
     df1.persist()
     sql(s"SELECT ${adsListColumns.mkString(",")} FROM t2")
       .save2PhoenixByJDBC(s"${phxListTable}")

+ 36 - 4
src/main/scala/com/winhc/bigdata/spark/test/TestFlow.scala

@@ -2,6 +2,8 @@ package com.winhc.bigdata.spark.test
 
 import com.winhc.bigdata.spark.utils.SparkUtils
 
+import scala.collection.mutable
+
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/22 14:32
@@ -9,13 +11,43 @@ import com.winhc.bigdata.spark.utils.SparkUtils
  */
 object TestFlow {
   def main(args: Array[String]): Unit = {
-    val spark = SparkUtils.InitEnv("test touch flow")
 
-    val Array(test1, test2, test3) = args
+
+
+
+
+/*
+
+   val spark = SparkUtils.InitEnv("test touch flow")
+
+    val tableName = "company_icp"
+    val project = "winhc_eci_dev"
+
+    val ads_table = s"${project}.ads_$tableName" //存量ads表
+    val inc_ads_table = s"${project}.inc_ads_$tableName"
+    val ads_table_cols = spark.table(ads_table).columns.filter(l => {
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+    }).toList.sorted
+
+    val inc_ads_table_cols = spark.table(inc_ads_table).columns.filter(l => {
+      !l.equals("ds") && !l.equals("rowkey") && !l.equals("flag")
+    }).toList.sorted
+
+    val new_cols = (ads_table_cols ::: inc_ads_table_cols).distinct.sorted
+
+    if (new_cols.size != inc_ads_table_cols.size || new_cols.size != ads_table_cols.size) {
+      println(ads_table_cols)
+      println(inc_ads_table_cols)
+      println("cols not equals!")
+      sys.exit(-99)
+    }
+*/
+
+   /* val Array(test1, test2, test3) = args
 
     println(test1, test2, test3)
     val df1 = spark.createDataFrame(Seq((test1, test2, test3))).toDF("col0", "col1", "col2")
-    df1.show
-    spark.stop()
+    df1.show*/
+//    spark.stop()
   }
 }

+ 7 - 6
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2AliPhoenix.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
+import com.winhc.bigdata.spark.config.PhoenixConfig
+import com.winhc.bigdata.spark.utils.SparkUtils
 
 import scala.collection.mutable
 
@@ -19,7 +20,7 @@ object TestSpark2AliPhoenix {
     val sparkTableName = "test_spark"
 
     val driver = "org.apache.phoenix.queryserver.client.Driver"
-    val url = PhoenixUtil.getPhoenixJDBCUrl
+    val url = PhoenixConfig.getPhoenixJDBCUrl
 
     //    sparkSession.sql("select * from const_company_category_code").show()
 
@@ -38,10 +39,10 @@ object TestSpark2AliPhoenix {
     val querySql = "select * from " + sparkTableName + " limit 100"
     sparkSession.sql(querySql).show
 
-//        sparkSession.createDataFrame(Seq(("rowkey", "2", null))).toDF("k", "s", "time")
-//          .write
-//          .mode("append")
-//          .insertInto(sparkTableName)
+    //        sparkSession.createDataFrame(Seq(("rowkey", "2", null))).toDF("k", "s", "time")
+    //          .write
+    //          .mode("append")
+    //          .insertInto(sparkTableName)
 
     sparkSession.stop()
   }

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Hbase.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.{HBaseUtils, SparkUtils}
+import com.winhc.bigdata.spark.config.HBaseConfig
+import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -26,7 +27,7 @@ object TestSpark2Hbase extends Logging {
     val spark = SparkUtils.InitEnv("TestSpark2Hbase", map)
     import spark._
 
-    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseKVTable)
     val df1 = spark.createDataFrame(Seq(("1", "2", "3"))).toDF("col0", "col1", "col2")
     df1.rdd.map(row => {
       val id = row(0).asInstanceOf[String]

+ 4 - 3
src/main/scala/com/winhc/bigdata/spark/test/TestSpark2Phoenix.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.{PhoenixUtil, SparkUtils}
+import com.winhc.bigdata.spark.config.PhoenixConfig
+import com.winhc.bigdata.spark.utils.SparkUtils
 import org.apache.phoenix.spark._
 
 /**
@@ -12,12 +13,12 @@ object TestSpark2Phoenix {
   def main(args: Array[String]): Unit = {
     val spark = SparkUtils.InitEnv("testSpark2Phoenix")
 
-    val df1 = spark.sqlContext.phoenixTableAsDataFrame("\"company_abnormal_info\"", Seq("rowkey", "ncid"), zkUrl = Some(PhoenixUtil.getPhoenixOptions("\"company_abnormal_info\"")("zkUrl")))
+    val df1 = spark.sqlContext.phoenixTableAsDataFrame("\"company_abnormal_info\"", Seq("rowkey", "ncid"), zkUrl = Some(PhoenixConfig.getPhoenixOptions("\"company_abnormal_info\"")("zkUrl")))
     df1.show()
 
     val df = spark.createDataFrame(Seq(("xjkxjk", "ncid_value", "ncname_value"))).toDF("\"rowkey\"", "\"ncid\"", "\"ncname\"")
 
-    df.saveToPhoenix(PhoenixUtil.getPhoenixOptions("\"company_abnormal_info\""))
+    df.saveToPhoenix(PhoenixConfig.getPhoenixOptions("\"company_abnormal_info\""))
 
     spark.stop()
   }

+ 3 - 2
src/main/scala/com/winhc/bigdata/spark/test/TestSpark4Hbase.scala

@@ -1,6 +1,7 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, HBaseUtils, SparkUtils}
+import com.winhc.bigdata.spark.config.HBaseConfig
+import com.winhc.bigdata.spark.utils.{CompanyNameMappingUtil, SparkUtils}
 import org.apache.hadoop.hbase.TableName
 import org.apache.hadoop.hbase.spark.HBaseContext
 import org.apache.hadoop.hbase.spark.HBaseRDDFunctions._
@@ -17,7 +18,7 @@ object TestSpark4Hbase {
     val spark = SparkUtils.InitEnv("4hbase")
     import spark.implicits._
     val hbaseKVTable = "company_name_kv"
-    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseKVTable)
+    val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseKVTable)
     val hbaseContext = new HBaseContext(spark.sparkContext, jobConf)
     val rdd = spark.createDataset(Seq("3190263436", "3190295172", "3190295172")).rdd.hbaseMapPartitions(hbaseContext, (f, con) => {
       val table = con.getTable(TableName.valueOf(hbaseKVTable))

+ 0 - 4
src/main/scala/com/winhc/bigdata/spark/test/YamlTest.scala

@@ -1,7 +1,5 @@
 package com.winhc.bigdata.spark.test
 
-import com.winhc.bigdata.spark.const.EnvConst
-
 /**
  * @Author: XuJiakai
  * @Date: 2020/6/29 13:51
@@ -9,7 +7,5 @@ import com.winhc.bigdata.spark.const.EnvConst
  */
 object YamlTest {
   def main(args: Array[String]): Unit = {
-    val env = EnvConst.getEnv("dev-remote")
-    println(env.getValue("es.node"))
   }
 }

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/CompanyIncSummary.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
+import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -78,7 +79,7 @@ case class CompanyIncSummary(s: SparkSession,
          |""".stripMargin).cache().createOrReplaceTempView("inc_tmp_view")
 
 
-    val jobConf = HBaseUtils.HBaseOutputJobConf("COMPANY_SUMMARY")
+    val jobConf = HBaseConfig.HBaseOutputJobConf("COMPANY_SUMMARY")
     sql(
       s"""
          |SELECT  ${cidField} as cid

+ 2 - 2
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -1,6 +1,6 @@
 package cn.oyohotels.utils
 
-import com.winhc.bigdata.spark.utils.HBaseUtils
+import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase._
 import org.apache.hadoop.hbase.client._
 import org.apache.hadoop.hbase.util.Bytes
@@ -15,7 +15,7 @@ object HbaseUtil {
   val lrs = HbaseUtil.getClass.getResource("/").getPath
 
   lazy val conf = {
-    val myConf = HBaseUtils.getHbaseConf()
+    val myConf = HBaseConfig.getHbaseConf()
     myConf
   }
 

+ 2 - 1
src/main/scala/com/winhc/bigdata/spark/utils/Maxcomputer2Hbase.scala

@@ -1,5 +1,6 @@
 package com.winhc.bigdata.spark.utils
 
+import com.winhc.bigdata.spark.config.HBaseConfig
 import org.apache.hadoop.hbase.client.Put
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable
 import org.apache.hadoop.hbase.util.Bytes
@@ -18,7 +19,7 @@ case class Maxcomputer2Hbase(dataFrame: DataFrame
   lazy val f_bytes: Array[Byte] = Bytes.toBytes("F")
 
   def syn(): Unit = {
-    val jobConf = HBaseUtils.HBaseOutputJobConf(hbaseTable.toUpperCase)
+    val jobConf = HBaseConfig.HBaseOutputJobConf(hbaseTable.toUpperCase)
     import org.apache.spark.sql.functions.col
     //df字段转化string
     val columns: Array[String] = dataFrame.columns