许家凯 · 4 years ago
Commit 3bb5d4b7f9

+ 28 - 22
src/main/scala/com/winhc/bigdata/spark/const/EnvConst.scala

@@ -14,36 +14,42 @@ import scala.collection.mutable
  * @Description:
  */
 object EnvConst {
-  private val yaml = new Yaml().loadAll(getClass.getResourceAsStream("/env.yaml"))
-    .iterator()
-  var envName: String = null
-  var envs: List[EnvConst] = List()
+  private lazy val env: (String, List[EnvConst]) = {
+    var envName: String = null
+    var envs: List[EnvConst] = List()
 
-  import scala.collection.JavaConversions._
-  import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+    val yaml = new Yaml().loadAll(getClass.getResourceAsStream("/env.yaml")).iterator()
 
-  if (isWindows) {
-    import com.alibaba.dcm.DnsCacheManipulator
-    DnsCacheManipulator.loadDnsCacheConfig();
-  }
+    import com.winhc.bigdata.spark.utils.BaseUtil.isWindows
+
+    import scala.collection.JavaConversions._
+
+    if (isWindows) {
+      import com.alibaba.dcm.DnsCacheManipulator
+      DnsCacheManipulator.loadDnsCacheConfig();
+    }
 
-  private var i = 0
-  while (yaml.hasNext) {
-    val o = yaml.next()
+    var i = 0
+    while (yaml.hasNext) {
+      val o = yaml.next()
 
-    if (i == 0) {
-      val m1 = o.asInstanceOf[util.HashMap[String, util.HashMap[String, String]]]
-      envName = mapAsScalaMap(m1)("profile")("activate")
-    } else {
-      val m2 = o.asInstanceOf[util.HashMap[String, util.HashMap[String, Object]]]
-      val name = m2("env")("name").asInstanceOf[String]
-      val config = mapAsScalaMap(m2("env")("config").asInstanceOf[util.HashMap[String, String]])
+      if (i == 0) {
+        val m1 = o.asInstanceOf[util.HashMap[String, util.HashMap[String, String]]]
+        envName = mapAsScalaMap(m1)("profile")("activate")
+      } else {
+        val m2 = o.asInstanceOf[util.HashMap[String, util.HashMap[String, Object]]]
+        val name = m2("env")("name").asInstanceOf[String]
+        val config = mapAsScalaMap(m2("env")("config").asInstanceOf[util.HashMap[String, String]])
 
-      envs = envs :+ EnvConst(name, config)
+        envs = envs :+ EnvConst(name, config)
+      }
+      i += 1
     }
-    i += 1
+    (envName, envs)
   }
 
+  lazy val envName = env._1
+  lazy val envs = env._2
 
   def getEnv(envName: String): EnvConst = {
     val map = envs.map(e => {
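(End of hunk.) This change moves the YAML parsing out of eager object initialization into a single lazy tuple, so loading the EnvConst object no longer touches /env.yaml (or the Windows DNS cache config) until envName or envs is first read. A minimal sketch of that behaviour, assuming only what the diff shows (the two lazy vals and getEnv); the sketch object itself is hypothetical:

    // Sketch only: nothing is read from /env.yaml until the first access below.
    object EnvConstLazySketch {
      def main(args: Array[String]): Unit = {
        val active = com.winhc.bigdata.spark.const.EnvConst.envName // first access triggers the YAML parse
        val env    = com.winhc.bigdata.spark.const.EnvConst.getEnv(active)
        println(s"active profile: $active, resolved env: $env")
      }
    }
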

+ 1 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/PhoenixHelper.scala

@@ -16,6 +16,7 @@ object PhoenixHelper {
      *
      * @param tableName
      */
+    @deprecated(message = "Only usable when the catalog is in-memory")
     def save2Phoenix(tableName: String): Unit = {
       val tmpTable = "tmp_" + tableName
       df.sparkSession.sql(PhoenixConfig.getPhoenixTempView(tmpTable, tableName))
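(End of hunk.) save2Phoenix appears to be exposed as an extension on DataFrame via an implicit class in PhoenixHelper (the same import shows up in the Odps2PhoenixUtils hunk below). A hedged usage sketch under that assumption; the table name and wrapper are hypothetical:

    import com.winhc.bigdata.spark.implicits.PhoenixHelper._
    import org.apache.spark.sql.DataFrame

    object PhoenixWriteSketch {
      // Assumption: PhoenixHelper adds save2Phoenix to DataFrame through an implicit class.
      // "COMPANY_TEST_TABLE" is a hypothetical Phoenix table name.
      def writeToPhoenix(df: DataFrame): Unit =
        df.save2Phoenix("COMPANY_TEST_TABLE") // deprecated by this commit: in-memory catalog only
    }
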

+ 68 - 0
src/main/scala/com/winhc/bigdata/spark/test/ExportSql.scala

@@ -0,0 +1,68 @@
+package com.winhc.bigdata.spark.test
+
+import java.io.{File, PrintWriter}
+
+import com.winhc.bigdata.spark.utils.SparkUtils
+
+import scala.collection.mutable
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/30 09:55
+ * @Description:
+ */
+object ExportSql {
+
+
+  def getSql(tableName: String, cols: Seq[String]): String = {
+    val cs = cols.map(f => {
+      if ("deleted".equals(f)) {
+        """
+          |CASE _update_type    WHEN "DELETE" THEN 1
+          |                             ELSE deleted
+          |                     END as deleted
+          |""".stripMargin
+      } else {
+        f
+      }
+    }).seq
+
+    val temp =
+      s"""
+         |INSERT OVERWRITE TABLE winhc_eci_dev.${tableName} PARTITION(ds)
+         |SELECT  ${cs.mkString(",")}
+         |FROM    winhc_eci_dev.$tableName
+         |WHERE   ds > 0
+         |;
+         |""".stripMargin
+    temp
+  }
+
+  def getCols(catalog: org.apache.spark.sql.catalog.Catalog, tableName: String): Seq[String] = {
+    catalog.listColumns(tableName).collect().map(_.name).seq
+  }
+
+
+  def main(args: Array[String]): Unit = {
+    val config = mutable.Map(
+      "spark.hadoop.odps.project.name" -> "winhc_eci_dev",
+      "spark.hadoop.odps.spark.local.partition.amt" -> "10"
+    )
+    val spark = SparkUtils.InitEnv("exportSql", config)
+
+    val catalog = spark.catalog
+    val tables = catalog.listTables.collect().map(_.name).filter(_.startsWith("inc_ods")).seq
+
+
+    val allSql = tables.map(t => {
+      getSql(t, getCols(catalog, t))
+    }).seq.mkString("")
+
+    val writer = new PrintWriter(new File("C:\\Users\\x\\Desktop\\公司\\all_sql.txt"))
+
+    writer.write(allSql)
+    writer.close()
+
+    spark.stop()
+  }
+}
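
To make the generated statement concrete, here is a sketch of what getSql emits for a hypothetical table with columns id, name and deleted (the table and column names are illustrative, not taken from the repository); only the deleted column is rewritten into a CASE expression:

    import com.winhc.bigdata.spark.test.ExportSql

    object ExportSqlSketch {
      def main(args: Array[String]): Unit = {
        println(ExportSql.getSql("inc_ods_company_demo", Seq("id", "name", "deleted")))
        // Prints roughly:
        //   INSERT OVERWRITE TABLE winhc_eci_dev.inc_ods_company_demo PARTITION(ds)
        //   SELECT  id,name,
        //           CASE _update_type WHEN "DELETE" THEN 1 ELSE deleted END as deleted
        //   FROM    winhc_eci_dev.inc_ods_company_demo
        //   WHERE   ds > 0
        //   ;
      }
    }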

+ 3 - 7
src/main/scala/com/winhc/bigdata/spark/utils/BaseUtil.scala

@@ -13,22 +13,18 @@ import org.apache.spark.sql.SparkSession
 object BaseUtil {
   def isWindows: Boolean = System.getProperty("os.name").contains("Windows")
 
-  def getPartitions(t: String, @transient spark: SparkSession):Seq[String] = {
+  def getPartitions(t: String, @transient spark: SparkSession): Seq[String] = {
     import spark._
     val sql_s = s"show partitions " + t
     sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).seq
   }
 
   def getPartion(t: String, @transient spark: SparkSession) = {
-    import spark._
-    val sql_s = s"show partitions " + t
-    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).last
+    getPartitions(t, spark).last
   }
 
   def getFirstPartion(t: String, @transient spark: SparkSession) = {
-    import spark._
-    val sql_s = s"show partitions " + t
-    sql(sql_s).collect.toList.map(_.getString(0).split("=")(1)).head
+    getPartitions(t, spark).head
   }
 
   def atMonthsBefore(n: Int, pattern: String = "yyyy-MM-dd"): String = {
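
With this refactor, getPartion and getFirstPartion are thin wrappers over getPartitions, so all three calls share one "show partitions" implementation. A short usage sketch; the table name is hypothetical:

    import com.winhc.bigdata.spark.utils.BaseUtil
    import org.apache.spark.sql.SparkSession

    object PartitionSketch {
      // Sketch only: "winhc_eci_dev.inc_ods_company_demo" is a hypothetical partitioned table.
      def show(spark: SparkSession): Unit = {
        val parts = BaseUtil.getPartitions("winhc_eci_dev.inc_ods_company_demo", spark)
        println(s"first=${parts.head} latest=${parts.last} total=${parts.size}")
      }
    }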

+ 1 - 1
src/main/scala/com/winhc/bigdata/spark/utils/HbaseUtil.scala

@@ -43,7 +43,7 @@ object HbaseUtil {
   }
 
   def main(args: Array[String]): Unit = {
-    val row = getRowData(getTable("company_name_kv"), "100008680")
+    val row = getRowData(getTable("COMPANY_TEST"), "2346619464")
     println(row)
   }
 }

+ 2 - 6
src/main/scala/com/winhc/bigdata/spark/utils/Odps2PhoenixUtils.scala

@@ -29,13 +29,9 @@ case class Odps2PhoenixUtils(s: SparkSession,
          |""".stripMargin)
 
 
-    val fields = spark.table(orgTableName).schema.map(schemaType => {
+    val fields = spark.table(orgTableName).schema.filter(f => !ignoreCols.contains(f.name)).map(schemaType => {
       val name = schemaType.name
-
-      if (ignoreCols.contains(name))
-        return null
-      else
-        return s"CAST($name as string) as ${renameCols.getOrElse(name, name)}"
+      s"CAST($name as string) as ${renameCols.getOrElse(name, name)}"
     }).filter(_ != null).seq
 
     import com.winhc.bigdata.spark.implicits.PhoenixHelper._
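
The rewritten block builds the CAST projection in one pass, filtering on the field name, instead of aborting the lambda with return (in Scala a return inside a closure exits the enclosing method, not just the closure). A standalone sketch of the same filter-and-rename logic with hypothetical inputs:

    // Standalone sketch of the projection logic above; inputs are hypothetical.
    object ProjectionSketch {
      def main(args: Array[String]): Unit = {
        val columns    = Seq("id", "company_name", "rowkey")          // hypothetical schema column names
        val ignoreCols = Seq("rowkey")                                // hypothetical
        val renameCols = Map("company_name" -> "COMPANY_NAME")        // hypothetical
        val fields = columns
          .filter(c => !ignoreCols.contains(c))
          .map(c => s"CAST($c as string) as ${renameCols.getOrElse(c, c)}")
        fields.foreach(println)
        // CAST(id as string) as id
        // CAST(company_name as string) as COMPANY_NAME
      }
    }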