
Test performance

xufei committed 2 years ago (961e6a911c)

+ 11 - 0
pom.xml

@@ -233,6 +233,17 @@
             <scope>test</scope>
             <classifier>tests</classifier>
         </dependency>
+
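+        <!-- Added for the Hologres sink being prototyped and for the Java Kafka test jobs in this commit -->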
+        <dependency>
+            <groupId>com.alibaba.hologres</groupId>
+            <artifactId>holo-client</artifactId>
+            <version>1.2.16.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-connector-kafka_2.12</artifactId>
+            <version>1.12.0</version>
+        </dependency>
     </dependencies>
 
         <build>

+ 32 - 0
src/main/java/com/winhc/bigdata/flink/java/jobs/TestJob2.java

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.flink.java.jobs;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.winhc.bigdata.flink.java.utils.KafkaSourceUtils;
+import org.apache.flink.api.common.functions.FilterFunction;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+
+/**
+ * @author π
+ * @Description: Test job that reads JSON strings from Kafka, parses them, filters out empty objects and prints the result; a Hologres sink is commented out for now.
+ * @date 2021/10/18 16:39
+ */
+public class TestJob2 {
+    public static void main(String[] args) throws Exception {
+        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
+        //env.setParallelism(1);
+        FlinkKafkaConsumer<String> kafkaSource = KafkaSourceUtils.getKafkaSource("flink_test", "xjk_test");
+        DataStreamSource<String> kafkaDS = env.addSource(kafkaSource);
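+        // parse each raw Kafka record into a JSONObject (the payload is logged for debugging)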
+        SingleOutputStreamOperator<JSONObject> jsonObjDS = kafkaDS.map(json->{
+            System.out.println("json: "+json);
+            return JSON.parseObject(json);
+        }).name("map name");
+        SingleOutputStreamOperator<JSONObject> filterDS = jsonObjDS.filter((FilterFunction<JSONObject>) value -> !value.isEmpty()).name("filter name");
+        filterDS.print();
+        //filterDS.addSink(new HoloSink()).name("Holo Sink");
+        env.execute("BaseJob");
+    }
+}

+ 17 - 0
src/main/java/com/winhc/bigdata/flink/java/utils/BaseUtils.java

@@ -0,0 +1,17 @@
+package com.winhc.bigdata.flink.java.utils;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/10/19 14:15
+ */
+public class BaseUtils {
+
+    public static Boolean isWindows() {
+        return System.getProperty("os.name").contains("Windows");
+    }
+
+    public static void main(String[] args) {
+        System.out.println(isWindows());
+    }
+}

+ 65 - 0
src/main/java/com/winhc/bigdata/flink/java/utils/KafkaSourceUtils.java

@@ -0,0 +1,65 @@
+package com.winhc.bigdata.flink.java.utils;
+
+import org.apache.flink.api.common.serialization.SimpleStringSchema;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
+import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
+import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+
+import java.util.Properties;
+
+public class KafkaSourceUtils {
+
+    //private static String KAFKA_SERVER = "47.101.221.131:9092";
+    private static String KAFKA_SERVER = null;
+    private static Properties properties = new Properties();
+    private static String DEFAULT_TOPIC = "test";
+
+    static {
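+        // judging by the hard-coded addresses: use the external test broker when developing on Windows, the internal cluster otherwise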
+        if (BaseUtils.isWindows()) {
+            KAFKA_SERVER = "47.101.221.131:9092";
+        } else {
+            KAFKA_SERVER = "192.168.4.239:9092,192.168.4.241:9092,192.168.4.240:9092";
+        }
+        properties.setProperty("bootstrap.servers", KAFKA_SERVER);
+    }
+
+    /**
+     * Builds a Kafka source (FlinkKafkaConsumer) for the given topic.
+     *
+     * @param topic   Kafka topic
+     * @param groupId consumer group id
+     */
+    public static FlinkKafkaConsumer<String> getKafkaSource(String topic, String groupId) {
+
+        // add the consumer group to the shared connection properties
+        properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
+
+        // build the Kafka source
+        return new FlinkKafkaConsumer<String>(topic, new SimpleStringSchema(), properties);
+    }
+
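+    /**
+     * Builds a plain string producer that writes to the given topic using the shared broker list.
+     */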
+    public static FlinkKafkaProducer<String> getKafkaSink(String topic) {
+        return new FlinkKafkaProducer<String>(topic, new SimpleStringSchema(), properties);
+    }
+
+    public static <T> FlinkKafkaProducer<T> getKafkaSinkBySchema(KafkaSerializationSchema<T> kafkaSerializationSchema) {
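+        // Kafka brokers reject transaction timeouts above transaction.max.timeout.ms (15 minutes by default),
+        // so the producer transaction timeout is lowered to 5 minutes for EXACTLY_ONCE delivery.
+        // Note: this mutates the shared static Properties instance.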
+        properties.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 5 * 60 * 1000 + "");
+        return new FlinkKafkaProducer<T>(DEFAULT_TOPIC,
+                kafkaSerializationSchema,
+                properties,
+                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
+    }
+
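+    /**
+     * Builds the Kafka connector options fragment of a Flink SQL DDL statement.
+     * The returned string ends with a closing ")" but does not open the WITH clause itself,
+     * so the caller is presumably expected to prepend something like
+     * "CREATE TABLE t (...) WITH (" before it (assumption; no caller is included in this commit).
+     */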
+    public static String getKafkaDDL(String topic, String groupId) {
+        return "  'connector' = 'kafka'," +
+                "  'topic' = '" + topic + "'," +
+                "  'properties.bootstrap.servers' = '" + KAFKA_SERVER + "'," +
+                "  'properties.group.id' = '" + groupId + "'," +
+                "  'scan.startup.mode' = 'latest-offset'," +
+                "  'format' = 'json'" +
+                ")";
+    }
+
+}

+ 4 - 1
src/main/scala/com/winhc/bigdata/flink/config/ArgsCompanyJob.scala

@@ -1,7 +1,7 @@
 package com.winhc.bigdata.flink.config
 
 import com.fasterxml.jackson.databind.JsonNode
-import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo}
+import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo, HologresInfo}
 import com.winhc.bigdata.flink.utils.BaseUtils.cleanup
 import com.winhc.bigdata.flink.utils.{BaseUtils, PreUDF}
 
@@ -13,6 +13,7 @@ case class ArgsCompanyJob(
                            //                             tn: String,
                            hbaseInfo: HbaseInfo,
                            esInfo: ElasticSearchInfo,
+                           holoInfo: HologresInfo,
                            md5_fields: Seq[String],
                            rowkey_udf: Function2[String, JsonNode, String] = null
 
@@ -22,6 +23,7 @@ object ArgsCompanyJob {
   val job_args = Map(
     "company_holder" -> ArgsCompanyJob(HbaseInfo("NG_RT_COMPANY_HOLDER", "F"),
       ElasticSearchInfo("winhc_rt_index_company_holder", "_doc"),
+      HologresInfo("ng_rt_company_holder", "public"),
       null,
       (companyId, j) => {
         val row = PreUDF.company_holder_rowkey(j.get("holder_type").asInt(), j.get("holder_id").asText(), j.get("holder_name").asText())
@@ -30,6 +32,7 @@ object ArgsCompanyJob {
     "company_staff" -> ArgsCompanyJob(
       HbaseInfo("NG_RT_COMPANY_STAFF", "F"),
       ElasticSearchInfo("winhc_rt_index_company_staff", "_doc"),
+      HologresInfo("ng_rt_company_staff", "public"),
       Seq("staff_name")
     )
   )

+ 20 - 0
src/main/scala/com/winhc/bigdata/flink/constant/HoloPutConst.scala

@@ -0,0 +1,20 @@
+package com.winhc.bigdata.flink.constant
+
+/**
+ * Columns to write to Hologres for each dimension table (tn).
+ *
+ * @author: π
+ * @date: 2021/10/21 14:39
+ */
+case class HoloPutConst(tn: String, needFields: Seq[String])
+
+object HoloPutConst {
+
+  private val seq = Seq(
+    HoloPutConst(tn = "company_holder", Seq("rowkey", "company_id", "deleted"))
+    ,HoloPutConst(tn = "company_staff", Seq("rowkey", "company_id", "deleted"))
+  )
+//
+//  def get_args_company_job(tn: String): args_company_job = {
+//    tab_md5_fields.find(p => tn.equals(p.tableName)).getOrElse(throw new NullPointerException("tn is not found"))
+//  }
+}
+

+ 3 - 1
src/main/scala/com/winhc/bigdata/flink/event/MetaInfo.scala

@@ -6,7 +6,7 @@ package com.winhc.bigdata.flink.event
  */
 
 
-case class MetaInfo(company: Option[CompanyInfo], es: Option[ElasticSearchInfo], hbase: Option[HbaseInfo])
+case class MetaInfo(company: Option[CompanyInfo], es: Option[ElasticSearchInfo], hbase: Option[HbaseInfo], holo: Option[HologresInfo])
 
 case class CompanyInfo(companyId: String, companyName: String)
 
@@ -14,5 +14,7 @@ case class ElasticSearchInfo(_index: String, _type: String)
 
 case class HbaseInfo(table: String, cf: String)
 
+case class HologresInfo(table: String, schema: String)
+
 
 

+ 2 - 2
src/main/scala/com/winhc/bigdata/flink/func/HbaseAsyncFunction.scala

@@ -79,8 +79,8 @@ case class HbaseAsyncFunction() extends RichAsyncFunction[UpdateEntity, UpdateEn
           val gs = v.data.keys.map(key => new Get(key.getBytes)).toList.asJava
           val ht = connection.getTable(TableName.valueOf(BaseUtils.hbaseTableName(k))).asInstanceOf[HTable]
           val rs = ht.get(gs)
-          val rss = rs.map(_.toJson).map(parse(_).asInstanceOf[JObject])
-          val rowkeys = rs.map(r => new String(r.getRow))
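+          // drop rows that were not found in HBase (null or empty results) so rowkeys and parsed JSON stay aligned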
+          val rss = rs.filter(r=>r!=null && !r.isEmpty).map(_.toJson).map(parse(_).asInstanceOf[JObject])
+          val rowkeys = rs.filter(r=>r!=null && !r.isEmpty).map(r => new String(r.getRow))
           val zrs=rowkeys zip rss toMap
 
           v.old=zrs

+ 1 - 0
src/main/scala/com/winhc/bigdata/flink/func/HbaseSinkFunction.scala

@@ -51,6 +51,7 @@ class HbaseSinkFunction() extends RichSinkFunction[PutCollection] with Checkpoin
         val (k, v) = t
         val table = connection.getTable(TableName.valueOf(k))
         table.put(SealedPut.toPuts(v.values()))
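+        // close the per-table handle after writing to avoid leaking resources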
+        table.close()
       })
   }
 

+ 42 - 10
src/main/scala/com/winhc/bigdata/flink/jobs/TestJob1.scala

@@ -1,6 +1,6 @@
 package com.winhc.bigdata.flink.jobs
 
-import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo, UpdateEntity}
+import com.winhc.bigdata.flink.event.{ElasticSearchInfo, HbaseInfo, HologresInfo, UpdateEntity}
 import com.winhc.bigdata.flink.func.{HbaseAsyncFunction, HbaseSinkFunction}
 import com.winhc.bigdata.flink.implicits._
 import com.winhc.bigdata.flink.java.constant.EnvConst
@@ -12,10 +12,9 @@ import org.apache.commons.lang3.StringUtils
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.functions.AggregateFunction
 import org.apache.flink.api.scala._
-import org.apache.flink.configuration.Configuration
 import org.apache.flink.connector.kafka.source.KafkaSource
 import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.scala.{AsyncDataStream, StreamExecutionEnvironment}
+import org.apache.flink.streaming.api.scala.{AsyncDataStream, DataStream, StreamExecutionEnvironment}
 import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
 import org.apache.flink.streaming.api.windowing.time.Time
 import org.apache.flink.streaming.api.windowing.windows.TimeWindow
@@ -25,8 +24,8 @@ import org.json4s.JsonAST.{JNull, JObject, JValue}
 import org.json4s.{JNothing, JString}
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.TimeUnit
-
 import com.alibaba.dcm.DnsCacheManipulator
+import scala.collection.immutable
 
 
 /**
@@ -38,23 +37,25 @@ object TestJob1 {
     DnsCacheManipulator.loadDnsCacheConfig()
 
     val env = StreamExecutionEnvironment.getExecutionEnvironment
+    //env.getCheckpointConfig.setCheckpointTimeout(60000*5)
 
     env.getConfig.setGlobalJobParameters(EnvConst.createParameterTool(args))
     val kafkaSource: KafkaSource[String] = KafkaSourceBuilder.buildSourceFunction("flink_test")
 
     val source = env.fromSource(kafkaSource, WatermarkStrategy.forMonotonousTimestamps(), "Kafka Source")
 
-    val allSource = source.transform_and_validation()
+    val allSource: DataStream[UpdateEntity] = source.transform_and_validation().name("transform_and_validation")
     allSource.getSideOutput(OutputTags.ExceptionTag).map(e => e.toJson()).print()
     allSource.getSideOutput(OutputTags.TransformErrorTag).map(e => e.toJson()).print()
     allSource.getSideOutput(OutputTags.ValidationErrorTag).map(e => e.toJson()).print()
-    val asyncDataStream = AsyncDataStream.unorderedWait(allSource, new HbaseAsyncFunction, 10, TimeUnit.SECONDS)
+    //val asyncDataStream = AsyncDataStream.unorderedWait(allSource, new HbaseAsyncFunction, 10, TimeUnit.SECONDS)
 
-    val afterOutput = asyncDataStream.process(new ProcessFunction[UpdateEntity, (String, Map[String, JObject])] {
+    val afterOutput: DataStream[(String, Map[String, JObject])] = allSource.process(new ProcessFunction[UpdateEntity, (String, Map[String, JObject])] {
       override def processElement(ue: UpdateEntity, ctx: ProcessFunction[UpdateEntity, (String, Map[String, JObject])]#Context, out: Collector[(String, Map[String, JObject])]): Unit = {
         val company = ue.company
         if (company != null) {
           ctx.output(OutputTags.HBASE_SINK_TAG, ("NG_RT_COMPANY", "F", ue.companyId(), company))
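+          // also route the company record to the Hologres side output as (table, schema, rowkey, value)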
+          ctx.output(OutputTags.HOLO_SINK_TAG, ("ng_rt_company", "public", ue.companyId(), company))
         }
         for (table <- ue.dims.values) {
           val meta = table.metaInfo
@@ -78,9 +79,18 @@ object TestJob1 {
               })
             case _ =>
           }*/
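+          // mirror the HBase/ES routing: emit each dimension row to the Hologres side output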
+          meta.holo match {
+            case Some(hi) =>
+              val HologresInfo(h0, h1) = hi
+              data.foreach(t => {
+                val (rowkey, jo) = t
+                ctx.output(OutputTags.HOLO_SINK_TAG, (h0, h1, rowkey, jo))
+              })
+            case _ =>
+          }
         }
       }
-    })
+    }).name("afterOutput")
     val hbaseSource = afterOutput.getSideOutput(OutputTags.HBASE_SINK_TAG)
     hbaseSource
       .map(t => {
@@ -106,7 +116,7 @@ object TestJob1 {
             (table, sp)
           case _ => throw new RuntimeException("流数据错误")
         }
-      })
+      }).name("hbaseMap")
       .keyBy(t => {
         val row = new String(t._2.getRowKey, StandardCharsets.UTF_8)
         s"${t._1}${StringUtils.left(row, 2)}"
@@ -133,7 +143,29 @@ object TestJob1 {
       }
     })
 
-      .addSink(new HbaseSinkFunction)
+      .addSink(new HbaseSinkFunction).name("HbaseSinkFunction")
+
+//    val holoSource: DataStream[(String, String, String, JObject)] = afterOutput.getSideOutput(OutputTags.HOLO_SINK_TAG)
+//    holoSource.map(t => {
+//      val (table, pb, row, value) = t
+//
+//      value match {
+//        case JObject(x) =>
+//          val x1: immutable.Seq[(String, JValue)] = x
+//          x.foreach(t => {
+//            val (k, v) = t
+//            val stringv = v match {
+//              case JNull => ""
+//              case JNothing => ""
+//              case x: JObject => x.toJson()
+//              case JString(x) => x
+//              case x: JValue => x.toString
+//            }
+//          })
+//          (table, sp)
+//        case _ => throw new RuntimeException("流数据错误")
+//      }
+//    })
 
     env.execute("flink start")
   }

+ 1 - 1
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/OutputTags.scala

@@ -16,6 +16,6 @@ object OutputTags {
   val ValidationErrorTag = new OutputTag[Map[String, Seq[JObject]]]("validation")
   val HBASE_SINK_TAG=new OutputTag[(String,String,String,JObject)]("hbase")
   val ELASTICSEARCH_SINK_TAG=new OutputTag[(String,String,String,JObject)]("es")
-  val HOLO_SINK_TAG=new OutputTag[(String,JObject)]("holo")
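+  // tuple layout: (table, schema, rowkey, value)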
+  val HOLO_SINK_TAG=new OutputTag[(String,String,String,JObject)]("holo")
 
 }

+ 8 - 5
src/main/scala/com/winhc/bigdata/flink/jobs/company_job_step/gs_01_kafka_transform_and_validation.scala

@@ -13,6 +13,7 @@ import org.json4s.jackson.JsonMethods._
 import org.slf4j.LoggerFactory
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 
 /**
  * @author ZhangJi
@@ -35,18 +36,20 @@ case class gs_01_kafka_transform_and_validation(source: DataStream[String]) {
               val meta = c.getMeta
               val data = c.getData
               val base = c.getCompany
-              val joMap = data.asScala
+              val joMap: Map[String, mutable.Buffer[JObject]] = data.asScala
                 .map(e => (e._1, e._2.asScala.map(fromJsonNode(_).asInstanceOf[JObject])))
                 .toMap
 
-              val validationResult = RowDataUtils.validateDims(joMap)
+/*              val validationResult = RowDataUtils.validateDims(joMap)
               val error = validationResult.error.filter(t => t._2.nonEmpty)
-              if (error.nonEmpty) ctx.output(ValidationErrorTag, error)
-              val dims = validationResult
-                .success
+              if (error.nonEmpty) ctx.output(ValidationErrorTag, error)*/
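+              // dimension validation is disabled here, presumably for the performance test this commit is about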
+              val dims = joMap
                 .filter(t => t._2.nonEmpty)
                 .map(t => (t._1, RowBaseDimEntity.ofJObject(t, (meta.getCompanyId, meta.getCompanyName))))
               if (dims.nonEmpty) {
+
                 val entity = RowDataUtils.validateCompany(base) match {
                   case Right(c) => UpdateEntity(fromJsonNode(c).asInstanceOf[JObject], dims)
                   case Left(c) =>

+ 22 - 3
src/main/scala/com/winhc/bigdata/flink/utils/RowDataUtils.scala

@@ -206,7 +206,26 @@ object RowDataUtils {
           |  "type": ["null","object"],
           |  "properties": {
           |    "capital_actual": {
-          |      "type": ["null","string"]
+          |      "type": ["null","array"],
+          |      "items": [
+          |        {
+          |          "type": ["object"],
+          |          "properties": {
+          |            "amomon": {
+          |              "type": ["string"]
+          |            },
+          |            "paymet": {
+          |              "type": ["null","string"]
+          |            },
+          |            "time": {
+          |              "type": ["null","string"]
+          |            }
+          |          },
+          |          "required": [
+          |            "amomon"
+          |          ]
+          |        }
+          |      ]
           |    },
           |    "amount": {
           |      "type": ["null","number"]
@@ -355,9 +374,9 @@ object RowDataUtils {
   def generateMetaInfo(dim: String, companyId: String, companyName: String): MetaInfo = {
     val m = ArgsCompanyJob.job_args.get(dim).orNull
     if (m == null) {
-      MetaInfo(Some(CompanyInfo(companyId, companyName)), None, None)
+      MetaInfo(Some(CompanyInfo(companyId, companyName)), None, None, None)
     } else {
-      MetaInfo(Some(CompanyInfo(companyId, companyName)), Some(m.esInfo), Some(m.hbaseInfo))
+      MetaInfo(Some(CompanyInfo(companyId, companyName)), Some(m.esInfo), Some(m.hbaseInfo), Some(m.holoInfo))
     }
   }