Prechádzať zdrojové kódy

feat: 添加读取mysql

许家凯 3 rokov pred
rodič
commit
8f986c3dee

+ 2 - 0
src/main/resources/env.yaml

@@ -27,6 +27,7 @@ env:
     es.eci.nodes: es-cn-oew22t8bw002iferu.public.elasticsearch.aliyuncs.com
     zk.address: hb-proxy-pub-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-proxy-pub-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix-pub.hbase.rds.aliyuncs.com:8765
+    mysql.prism1.address: rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com
 
 ---
 env:
@@ -36,3 +37,4 @@ env:
     es.eci.nodes: es-cn-oew22t8bw002iferu.elasticsearch.aliyuncs.com
     zk.address: hb-uf6m8e1nu4ivp06m5-master1-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master2-001.hbase.rds.aliyuncs.com:2181,hb-uf6m8e1nu4ivp06m5-master3-001.hbase.rds.aliyuncs.com:2181
     phoenix.address: http://hb-uf6m8e1nu4ivp06m5-proxy-phoenix.hbase.rds.aliyuncs.com:8765
+    mysql.prism1.address: rm-uf61r3m23ba1p5z3d.mysql.rds.aliyuncs.com

+ 21 - 0
src/main/scala/com/winhc/bigdata/spark/config/MysqlConfig.scala

@@ -0,0 +1,21 @@
+package com.winhc.bigdata.spark.config
+
+import com.winhc.bigdata.spark.const.EnvConst
+
+import java.util.Properties
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/31 14:37
+ */
+object MysqlConfig {
+  def getMysqlConfig(): (String, Properties) = {
+    val url: String = "jdbc:mysql://" + EnvConst.getEnv.getValue("mysql.prism1.address") + ":3306/prism1?autoReconnect=true&useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&serverTimezone=GMT%2B8"
+    val properties: Properties = new Properties()
+    properties.setProperty("dbtable", "prism1")
+    properties.setProperty("user", "wenshu")
+    properties.setProperty("password", "wenshu_168")
+    properties.setProperty("driver", "com.mysql.cj.jdbc.Driver")
+    (url, properties)
+  }
+}

+ 17 - 0
src/main/scala/com/winhc/bigdata/spark/implicits/MysqlHelper.scala

@@ -0,0 +1,17 @@
+package com.winhc.bigdata.spark.implicits
+
+import com.winhc.bigdata.spark.config.MysqlConfig
+import org.apache.spark.sql.{DataFrame, SparkSession}
+
+/**
+ * @author: XuJiakai
+ * @date: 2021/5/31 14:35
+ */
+object MysqlHelper {
+  implicit class SparkSessionEnhancer(sparkSession: SparkSession) extends Serializable {
+    def read(table: String): DataFrame = {
+      val tuple = MysqlConfig.getMysqlConfig()
+      sparkSession.read.jdbc(tuple._1, table, tuple._2)
+    }
+  }
+}