Browse Source

更改日期

xufei 4 years ago
parent
commit
e786613096

+ 2 - 2
src/main/java/com/winhc/bean/NodeParam.java

@@ -25,8 +25,8 @@ public class NodeParam {
     public String toNodeParam(MergeParam mergeParam) {
         HashMap<String, String> map = new HashMap<>(param);
         map.put("bizdate", mergeParam.getBizdate().replace("-", ""));
-        map.put("beginDateTime", "20210128");
-        map.put("endDateTime", "20210129");
+        map.put("beginDateTime", mergeParam.getCurrentPartition());
+        map.put("endDateTime", mergeParam.getAfterPartition());
         return map.entrySet().stream()
                 .filter(e -> !e.getKey().startsWith("_"))
                 .filter(e -> StringUtils.isNotEmpty(e.getValue()))

+ 6 - 1
src/main/java/com/winhc/kafka/produce/KafkaProduce.java

@@ -7,6 +7,10 @@ import org.springframework.kafka.support.SendResult;
 import org.springframework.stereotype.Service;
 import org.springframework.util.concurrent.ListenableFuture;
 
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
 /**
  * @author π
  * @Description:
@@ -19,8 +23,9 @@ public class KafkaProduce {
 
     private final KafkaTemplate<String, String> kafkaTemplate;
 
-    public void produce(String topic, String message) {
+    public void produce(String topic, String message) throws InterruptedException, ExecutionException, TimeoutException {
         ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
+        //send.get(10,TimeUnit.SECONDS);
         send.addCallback(o -> {
             //System.out.println("消息发送成功:" + message);
         }, throwable -> {

+ 5 - 3
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -30,6 +30,8 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeoutException;
 
 /**
  * @author π
@@ -59,7 +61,7 @@ public class PersonMergeIncremnetTask {
     //TODO 启动合并任务
     @Scheduled(cron = "00 10 00 * * ?")
     //@Scheduled(cron = "*/20 * * * * ?")
-    //@Scheduled(cron = "00 55 11 * * ?")
+    //@Scheduled(cron = "00 13 15 * * ?")
     public void mergePersonScheduled() throws UnsupportedEncodingException {
         log.info("start  mergePersonScheduled !");
         long start = System.currentTimeMillis();
@@ -77,7 +79,7 @@ public class PersonMergeIncremnetTask {
         log.info("mergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
     }
 
-    private void sendKafka(MergeParam param) {
+    private void sendKafka(MergeParam param) throws InterruptedException, ExecutionException, TimeoutException {
         //加载文件发送kafka
         loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
         loadCSVSendKafka(param.getPathPre(), param.getDeletedPath(), param.getTopic(), "1");
@@ -159,7 +161,7 @@ public class PersonMergeIncremnetTask {
         }
     }
 
-    private void loadCSVSendKafka(String pre, String path, String topic, String flag) {
+    private void loadCSVSendKafka(String pre, String path, String topic, String flag) throws InterruptedException, ExecutionException, TimeoutException {
         log.info("loadCSVSendKafka | flag:{} | start !", flag);
         long start = System.currentTimeMillis();
         CsvData data = CsvUtil.getReader().read(FileUtil.file(pre + path));