
first commit

xufei 4 years ago
commit
0626213e68
33 changed files with 2299 additions and 0 deletions
  1. pom.xml (+182, -0)
  2. src/main/java/com/winhc/Application.java (+26, -0)
  3. src/main/java/com/winhc/bean/Constant.java (+17, -0)
  4. src/main/java/com/winhc/bean/DataWorksFlowJob.java (+27, -0)
  5. src/main/java/com/winhc/bean/DataWorksFlowTask.java (+35, -0)
  6. src/main/java/com/winhc/bean/DingMsg.java (+78, -0)
  7. src/main/java/com/winhc/bean/Entry.java (+50, -0)
  8. src/main/java/com/winhc/bean/MergeParam.java (+35, -0)
  9. src/main/java/com/winhc/bean/NodeParam.java (+41, -0)
  10. src/main/java/com/winhc/bean/TaskFlowEnum.java (+43, -0)
  11. src/main/java/com/winhc/bean/TaskParam.java (+28, -0)
  12. src/main/java/com/winhc/config/DataWorksAccessProperties.java (+23, -0)
  13. src/main/java/com/winhc/config/DataWorksProjectConfiguration.java (+37, -0)
  14. src/main/java/com/winhc/config/KafkaConfig.java (+111, -0)
  15. src/main/java/com/winhc/config/Neo4jDriver.java (+28, -0)
  16. src/main/java/com/winhc/config/OkHttpConfiguration.java (+84, -0)
  17. src/main/java/com/winhc/config/OkHttpRetryInterceptor.java (+48, -0)
  18. src/main/java/com/winhc/kafka/produce/KafkaProduce.java (+33, -0)
  19. src/main/java/com/winhc/service/TouchService.java (+112, -0)
  20. src/main/java/com/winhc/task/PersonMergeIncremnetTask.java (+256, -0)
  21. src/main/java/com/winhc/utils/CompanyUtils.java (+39, -0)
  22. src/main/java/com/winhc/utils/DateUtil.java (+499, -0)
  23. src/main/java/com/winhc/utils/DingUtils.java (+92, -0)
  24. src/main/java/com/winhc/utils/JsonUtils.java (+19, -0)
  25. src/main/java/com/winhc/utils/OkHttpUtils.java (+42, -0)
  26. src/main/java/com/winhc/utils/SchemaInit.java (+81, -0)
  27. src/main/resources/META-INF/spring-devtools.properties (+2, -0)
  28. src/main/resources/application-dev.properties (+58, -0)
  29. src/main/resources/application-prd.properties (+59, -0)
  30. src/main/resources/application.properties (+60, -0)
  31. src/main/resources/dataworks-config.properties (+4, -0)
  32. src/main/resources/logback-spring.xml (+41, -0)
  33. src/main/resources/task.yaml (+9, -0)

+ 182 - 0
pom.xml

@@ -0,0 +1,182 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>neo4j-merge</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <version>2.2.6.RELEASE</version>
+        <relativePath />
+    </parent>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <java.version>1.8</java.version>
+        <start-class>com.winhc.Application</start-class>
+    </properties>
+
+    <dependencies>
+        <!-- MySQL JDBC driver -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-tomcat</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-tomcat</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-devtools</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.70</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+        </dependency>
+
+        <!-- Swagger dependencies -->
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger2</artifactId>
+            <version>2.6.1</version>
+        </dependency>
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger-ui</artifactId>
+            <version>2.6.1</version>
+        </dependency>
+
+        <!-- Neo4j graph database: officially supported Spring Boot starter -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-neo4j</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.18</version>
+        </dependency>
+
+        <!-- Spring Boot only auto-configures JdbcTemplate when spring-boot-starter-jdbc below is on the classpath; do not rely on the druid artifact above for that -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>4.5.16</version>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-mongodb</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.dataformat</groupId>
+            <artifactId>jackson-dataformat-yaml</artifactId>
+            <version>2.9.5</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-core</artifactId>
+            <version>4.0.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun</groupId>
+            <artifactId>aliyun-java-sdk-dataworks-public</artifactId>
+            <version>1.8.3</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>3.10.0</version>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+            <!--            <dependency>-->
+            <!--                <groupId>org.springframework.cloud</groupId>-->
+            <!--                <artifactId>spring-cloud-dependencies</artifactId>-->
+            <!--                <version>${spring-cloud.version}</version>-->
+            <!--                <type>pom</type>-->
+            <!--                <scope>import</scope>-->
+            <!--            </dependency>-->
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <finalName>neo4j-merge</finalName>
+
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <fork>true</fork>
+                </configuration>
+            </plugin>
+
+
+        </plugins>
+    </build>
+
+
+</project>

+ 26 - 0
src/main/java/com/winhc/Application.java

@@ -0,0 +1,26 @@
+package com.winhc;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
+//import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+
+@SpringBootApplication
+//@EnableDiscoveryClient
+@ServletComponentScan
+@EnableScheduling
+public class Application extends SpringBootServletInitializer {
+
+    @Override
+    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
+        return application.sources(Application.class);
+    }
+
+    public static void main(String[] args) {
+        SpringApplication.run(Application.class, args);
+    }
+}

+ 17 - 0
src/main/java/com/winhc/bean/Constant.java

@@ -0,0 +1,17 @@
+package com.winhc.bean;
+
+import lombok.Getter;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/1/28 19:29
+ */
+public interface Constant {
+    String flow = "inc_human_relation_merge";
+    String taskName = "inc_human_relation_merge_kafka";
+    String topic = "inc_merge_deleted_person";
+    String 新增 = "新增";
+    String 合并 = "合并";
+    String 删除 = "删除";
+}

+ 27 - 0
src/main/java/com/winhc/bean/DataWorksFlowJob.java

@@ -0,0 +1,27 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.*;
+
+import java.util.List;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 08:49
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class DataWorksFlowJob {
+    private String project;
+    private String flow;
+    private List<DataWorksFlowTask> task;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 35 - 0
src/main/java/com/winhc/bean/DataWorksFlowTask.java

@@ -0,0 +1,35 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 09:01
+ * @Description:
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DataWorksFlowTask {
+    private String taskName;
+    private List<NodeParam> params;
+
+    public Map<String, String> toNodeParam(MergeParam param) {
+        return params.stream()
+                .collect(Collectors.toMap(NodeParam::getNodeId, n->n.toNodeParam(param), (o1, o2) -> o1));
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 78 - 0
src/main/java/com/winhc/bean/DingMsg.java

@@ -0,0 +1,78 @@
+package com.winhc.bean;
+
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.SystemUtils;
+
+import java.net.InetAddress;
+import java.time.LocalDateTime;
+import java.time.format.DateTimeFormatter;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/30 13:45
+ * @Description:
+ */
+@Getter
+@Setter
+@NoArgsConstructor
+@AllArgsConstructor
+public class DingMsg {
+    private static final DateTimeFormatter dtf = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+
+    public static String msg2Md(String msg) {
+        return "#### " + msg +
+                "\n\n" + "> 时间:" + LocalDateTime.now().format(dtf) +
+                "\n\n" + "> 系统信息:" + os();
+    }
+
+    public DingMsg(String msgLevel, String project, String flow, String task, String nodeName, String status) {
+        this.msgLevel = msgLevel;
+        this.project = project;
+        this.flow = flow;
+        this.task = task;
+        this.nodeName = nodeName;
+        this.status = status;
+        LocalDateTime date = LocalDateTime.now();
+        this.date = date.format(dtf);
+        this.sysInfo = os();
+    }
+
+    private String msgLevel;
+    private String project;
+    private String flow;
+    private String task;
+    private String nodeName;
+    private String status;
+    private String date;
+    private String sysInfo;
+
+    public String toMd() {
+        StringBuilder sb = new StringBuilder();
+        return sb.append("#### ").append(msgLevel)
+                .append("\n\n").append("位置:").append(project).append(":").append(flow).append(":").append(task)
+                .append("\n\n").append("节点:").append(nodeName)
+                .append("\n\n").append("状态:").append(status)
+                .append("\n\n").append("> 时间:").append(date)
+                .append("\n\n").append("> 系统信息:").append(sysInfo)
+                .toString();
+    }
+
+
+    private static String os() {
+        String osInfo = String.format(" %s , %s , %s", SystemUtils.OS_NAME, SystemUtils.OS_ARCH, SystemUtils.OS_VERSION);
+        try {
+            InetAddress address = InetAddress.getLocalHost();
+            String addressInfo = String.format("\n\n> address: %s \n\n> hostname: %s", address.getHostAddress(), address.getHostName());
+            osInfo += addressInfo;
+        } catch (Exception e) {
+            // best effort: ignore failures when resolving local host info
+        }
+        return osInfo;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(os());
+    }
+}

+ 50 - 0
src/main/java/com/winhc/bean/Entry.java

@@ -0,0 +1,50 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.AllArgsConstructor;
+import lombok.Getter;
+import lombok.NoArgsConstructor;
+import lombok.Setter;
+import org.apache.commons.lang3.builder.EqualsBuilder;
+import org.apache.commons.lang3.builder.HashCodeBuilder;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 14:05
+ * @Description:
+ */
+@Getter
+@Setter
+@AllArgsConstructor
+@NoArgsConstructor
+public class Entry<K, V> {
+    private K key;
+    private V value;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) return true;
+
+        if (o == null || getClass() != o.getClass()) return false;
+
+        Entry<?, ?> entry = (Entry<?, ?>) o;
+
+        return new EqualsBuilder()
+                .append(key, entry.key)
+                .append(value, entry.value)
+                .isEquals();
+    }
+
+    @Override
+    public int hashCode() {
+        return new HashCodeBuilder(17, 37)
+                .append(key)
+                .append(value)
+                .toHashCode();
+    }
+}

+ 35 - 0
src/main/java/com/winhc/bean/MergeParam.java

@@ -0,0 +1,35 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.*;
+
+import java.util.Map;
+
+/**
+ * @Author: π
+ * @Date: 2021/1/28
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class MergeParam {
+    private String flow;
+    private String taskName;
+    private String topic;
+    private String bizdate;
+    private String lastPartition;
+    private String currentPartition;
+    private String afterPartition;
+    private String before7DayPartition;
+    private String pathPre;
+    private String mergePath;
+    private String deletedPath;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 41 - 0
src/main/java/com/winhc/bean/NodeParam.java

@@ -0,0 +1,41 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.*;
+import org.apache.commons.lang3.StringUtils;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/24 08:50
+ * @Description:
+ */
+@Setter
+@Getter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class NodeParam {
+    private String nodeId;
+    private Map<String, String> param;
+
+    public String toNodeParam(MergeParam mergeParam) {
+        HashMap<String, String> map = new HashMap<>(param);
+        map.put("bizdate", mergeParam.getBizdate().replace("-", ""));
+        map.put("beginDateTime", "20210128");
+        map.put("endDateTime", "20210129");
+        return map.entrySet().stream()
+                .filter(e -> !e.getKey().startsWith("_"))
+                .filter(e -> StringUtils.isNotEmpty(e.getValue()))
+                .map(e -> e.getKey() + "=" + e.getValue())
+                .collect(Collectors.joining(" "));
+    }
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}
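
For reference, a quick sketch (not part of this commit) of what NodeParam.toNodeParam produces: a space-separated "key=value" string, presumably the per-node parameter format passed on to DataWorks. The demo class name and all sample values below are made up.

    package com.winhc;

    import com.winhc.bean.MergeParam;
    import com.winhc.bean.NodeParam;

    import java.util.HashMap;
    import java.util.Map;

    public class NodeParamDemo {
        public static void main(String[] args) {
            Map<String, String> raw = new HashMap<>();
            raw.put("topic", "inc_merge_deleted_person");
            raw.put("_internal", "ignored"); // keys starting with "_" are filtered out
            raw.put("empty", "");            // empty values are filtered out

            NodeParam nodeParam = NodeParam.builder()
                    .nodeId("node_1")        // hypothetical node id
                    .param(raw)
                    .build();

            MergeParam mergeParam = MergeParam.builder()
                    .bizdate("2021-01-28")
                    .build();

            // Prints something like (order may vary):
            // bizdate=20210128 beginDateTime=20210128 endDateTime=20210129 topic=inc_merge_deleted_person
            System.out.println(nodeParam.toNodeParam(mergeParam));
        }
    }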

+ 43 - 0
src/main/java/com/winhc/bean/TaskFlowEnum.java

@@ -0,0 +1,43 @@
+package com.winhc.bean;
+
+import lombok.Getter;
+
+import java.util.Arrays;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/23 10:58
+ * @Description:
+ */
+@Getter
+@SuppressWarnings("all")
+public enum TaskFlowEnum {
+    NOT_RUN(1, "未运行"),
+    WAIT_TIME(2, "等待时间"),
+    WAIT_RESOURCE(3, "等待资源"),
+    RUNNING(4, "运行中"),
+    FAILURE(5, "运行失败"),
+    SUCCESS(6, "运行成功"),
+    CHECKING(7, "校检中"),
+
+    ;
+    private final Integer code;
+    private final String msg;
+
+    TaskFlowEnum(int code, String msg) {
+        this.code = code;
+        this.msg = msg;
+    }
+
+    public static TaskFlowEnum getTaskFlowEnumByCode(Integer code) {
+        return Arrays.stream(TaskFlowEnum.values())
+                .filter(taskFlowEnum -> taskFlowEnum.getCode().equals(code))
+                .findFirst()
+                .orElse(null);
+    }
+
+    @Override
+    public String toString() {
+        return msg;
+    }
+}
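
A minimal sketch (not part of this commit; the demo class is hypothetical) of how DataWorks node status codes are mapped onto TaskFlowEnum and used to decide whether a node has finished:

    import com.winhc.bean.TaskFlowEnum;

    public class TaskFlowEnumDemo {
        public static void main(String[] args) {
            // 5 -> FAILURE, 6 -> SUCCESS; unknown codes fall back to null
            TaskFlowEnum status = TaskFlowEnum.getTaskFlowEnumByCode(6);
            boolean finished = TaskFlowEnum.SUCCESS.equals(status) || TaskFlowEnum.FAILURE.equals(status);
            System.out.println(status + " finished=" + finished);
        }
    }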

+ 28 - 0
src/main/java/com/winhc/bean/TaskParam.java

@@ -0,0 +1,28 @@
+package com.winhc.bean;
+
+import com.winhc.utils.JsonUtils;
+import lombok.*;
+
+import java.util.Map;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:45
+ * @Description:
+ */
+@Getter
+@Setter
+@Builder
+@NoArgsConstructor
+@AllArgsConstructor
+public class TaskParam {
+    private String projectName;
+    private String flowName;
+    private String bizDate;
+    private Map<String, String> nodeParam;
+
+    @Override
+    public String toString() {
+        return JsonUtils.jsonObjToString(this);
+    }
+}

+ 23 - 0
src/main/java/com/winhc/config/DataWorksAccessProperties.java

@@ -0,0 +1,23 @@
+package com.winhc.config;
+
+import lombok.Data;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.context.annotation.PropertySource;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/8 15:56
+ * @Description:
+ */
+@Data
+//@PropertySource("classpath:dataworks-config.properties")
+@Configuration
+public class DataWorksAccessProperties {
+    @Value("${access-key-id}")
+    private String accessKeyId;
+    @Value("${access-key-secret}")
+    private String accessKeySecret;
+    @Value("${region-id}")
+    private String regionId;
+}

+ 37 - 0
src/main/java/com/winhc/config/DataWorksProjectConfiguration.java

@@ -0,0 +1,37 @@
+package com.winhc.config;
+
+import com.aliyuncs.DefaultAcsClient;
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.profile.DefaultProfile;
+import lombok.Data;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/8 15:41
+ * @Description:
+ */
+@Configuration
+@Data
+@Slf4j
+public class DataWorksProjectConfiguration {
+    @Autowired
+    private DataWorksAccessProperties dataWorksAccessProperties;
+
+    @Bean
+    public IAcsClient client() throws ClientException {
+        DefaultProfile.addEndpoint(
+                "winhc-eci",
+                "cn-shanghai",
+                "dataworks-public",
+                "dataworks." + dataWorksAccessProperties.getRegionId() + ".aliyuncs.com");
+        DefaultProfile profile = DefaultProfile.getProfile(dataWorksAccessProperties.getRegionId(), dataWorksAccessProperties.getAccessKeyId(), dataWorksAccessProperties.getAccessKeySecret());
+        IAcsClient client = new DefaultAcsClient(profile);
+        return client;
+    }
+}

+ 111 - 0
src/main/java/com/winhc/config/KafkaConfig.java

@@ -0,0 +1,111 @@
+package com.winhc.config;
+
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2020/12/24 10:06
+ */
+@Configuration
+@EnableKafka
+public class KafkaConfig {
+    @Value("${spring.kafka.bootstrap-servers}")
+    private String bootstrapServers;
+
+    @Value("${spring.kafka.consumer.enable-auto-commit}")
+    private Boolean autoCommit;
+
+    @Value("${spring.kafka.producer.retries}")
+    private Integer retries;
+
+    @Value("${spring.kafka.producer.batch-size}")
+    private Integer batchSize;
+
+    @Value("${spring.kafka.producer.buffer-memory}")
+    private Integer bufferMemory;
+
+    /**
+     * Producer configuration
+     */
+    @Bean
+    public Map<String, Object> producerConfigs() {
+        Map<String, Object> props = Maps.newHashMap();
+        props.put(ProducerConfig.ACKS_CONFIG, "-1");
+        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        props.put(ProducerConfig.RETRIES_CONFIG, retries);
+        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
+        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
+        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
+        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3000");
+        return props;
+    }
+
+    /**
+     * Producer factory
+     */
+    @Bean
+    public ProducerFactory<String, String> producerFactory() {
+        return new DefaultKafkaProducerFactory<>(producerConfigs());
+    }
+
+    /**
+     * Producer template (KafkaTemplate)
+     */
+    @Bean
+    public KafkaTemplate<String, String> kafkaTemplate() {
+        return new KafkaTemplate<>(producerFactory());
+    }
+
+    @Bean("containerFactory")
+    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
+        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
+        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
+        // concurrency must be less than or equal to the number of topic partitions
+        container.setConcurrency(1);
+        // poll timeout
+        //container.getContainerProperties().setPollTimeout(1500);
+        // listen in batch mode
+        container.setBatchListener(true);
+        // offset commit (ack) mode
+        //container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
+        return container;
+    }
+
+    private Map<String, Object> consumerProps() {
+        Map<String, Object> props = new HashMap<>(8);
+        // Kafka bootstrap servers
+        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        // whether offsets are committed automatically
+        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
+        // maximum number of records returned per poll
+        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2000);
+        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
+        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
+        // maximum time allowed between polls before the consumer is considered failed
+        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
+        // key/value deserialization
+        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return props;
+    }
+
+}
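
The "containerFactory" bean above enables batch listening, and consumerProps() sets no group id, so a listener has to supply one itself. A hedged sketch of such a consumer (this commit ships no consumer; the class name, topic wiring, group id and handling below are illustrative only):

    package com.winhc.kafka.consumer;

    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.stereotype.Component;

    import java.util.List;

    @Component
    public class DemoBatchConsumer {

        // Batch mode is enabled in containerFactory(), so the payload is a list of records.
        @KafkaListener(topics = "inc_merge_deleted_person", groupId = "demo-group",
                containerFactory = "containerFactory")
        public void onMessages(List<String> messages) {
            messages.forEach(System.out::println);
        }
    }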

+ 28 - 0
src/main/java/com/winhc/config/Neo4jDriver.java

@@ -0,0 +1,28 @@
+package com.winhc.config;
+
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
+@Component
+public class Neo4jDriver {
+
+    @Value("${spring.data.neo4j.uri}")
+    private String uri;
+    @Value("${spring.data.neo4j.username}")
+    private String username;
+    @Value("${spring.data.neo4j.password}")
+    private String password;
+
+    @Bean
+    public Driver init() {
+        Config config = Config.builder().withConnectionTimeout(100, TimeUnit.SECONDS).withMaxTransactionRetryTime(10, TimeUnit.SECONDS).build();
+        return GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
+    }
+}

+ 84 - 0
src/main/java/com/winhc/config/OkHttpConfiguration.java

@@ -0,0 +1,84 @@
+package com.winhc.config;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.ConnectionPool;
+import okhttp3.OkHttpClient;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import javax.net.ssl.SSLContext;
+import javax.net.ssl.SSLSocketFactory;
+import javax.net.ssl.TrustManager;
+import javax.net.ssl.X509TrustManager;
+import java.security.KeyManagementException;
+import java.security.NoSuchAlgorithmException;
+import java.security.SecureRandom;
+import java.security.cert.CertificateException;
+import java.security.cert.X509Certificate;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/28 14:37
+ * @Description:
+ */
+@Slf4j
+@Configuration
+public class OkHttpConfiguration {
+
+    @Bean
+    public X509TrustManager x509TrustManager() {
+        return new X509TrustManager() {
+            @Override
+            public void checkClientTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            }
+
+            @Override
+            public void checkServerTrusted(X509Certificate[] x509Certificates, String s) throws CertificateException {
+            }
+
+            @Override
+            public X509Certificate[] getAcceptedIssuers() {
+                return new X509Certificate[0];
+            }
+        };
+    }
+
+    @Bean
+    public SSLSocketFactory sslSocketFactory() {
+        try {
+            // trust any certificate (insecure; accepts all TLS peers)
+            SSLContext sslContext = SSLContext.getInstance("TLS");
+            sslContext.init(null, new TrustManager[]{x509TrustManager()}, new SecureRandom());
+            return sslContext.getSocketFactory();
+        } catch (NoSuchAlgorithmException | KeyManagementException e) {
+            log.error("failed to initialize the TLS context", e);
+        }
+        return null;
+    }
+
+    /**
+     * Connection pool holding up to 200 idle connections; idle connections are
+     * evicted after 5 minutes of inactivity.
+     */
+    @Bean
+    public ConnectionPool pool() {
+        return new ConnectionPool(200, 5, TimeUnit.MINUTES);
+    }
+
+    @Bean
+    public OkHttpClient okHttpClient() {
+        return new OkHttpClient.Builder()
+                .sslSocketFactory(sslSocketFactory(), x509TrustManager())
+                .addInterceptor(new OkHttpRetryInterceptor(3))
+                // disable OkHttp's transparent retry; retries are handled by OkHttpRetryInterceptor
+                .retryOnConnectionFailure(false)
+                .connectionPool(pool())
+                .connectTimeout(10L, TimeUnit.SECONDS)
+                .readTimeout(10L, TimeUnit.SECONDS)
+                .build();
+    }
+
+}

+ 48 - 0
src/main/java/com/winhc/config/OkHttpRetryInterceptor.java

@@ -0,0 +1,48 @@
+package com.winhc.config;
+
+import lombok.extern.slf4j.Slf4j;
+import okhttp3.Interceptor;
+import okhttp3.Request;
+import okhttp3.Response;
+
+import java.io.IOException;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/28 14:38
+ * @Description:
+ */
+@Slf4j
+public class OkHttpRetryInterceptor implements Interceptor {
+    // maximum number of retries: with maxRetry = 3 a request may be sent
+    // up to 4 times in total (1 initial attempt + 3 retries)
+    public int maxRetry;
+
+    public OkHttpRetryInterceptor(int maxRetry) {
+        this.maxRetry = maxRetry;
+    }
+
+    @Override
+    public Response intercept(Chain chain) throws IOException {
+        Request request = chain.request();
+        int retryNum = 0;
+        while (true) {
+            try {
+                Response response = chain.proceed(request);
+                while (!response.isSuccessful() && retryNum < maxRetry) {
+                    retryNum++;
+                    log.error("response code : {} ok http retry:{} request url: {} ,response body:{}", response.code(), retryNum, request.url(), response.body().string());
+                    response.close();
+                    response = chain.proceed(request);
+                }
+                return response;
+            } catch (Exception e) {
+                log.error(e.getMessage(), e);
+                log.error("ok http retry:{} request url: {} ", retryNum, request.url());
+                if (retryNum++ >= maxRetry) {
+                    throw e;
+                }
+            }
+        }
+    }
+}
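
A small usage sketch (not part of this commit) showing the interceptor plugged into a standalone OkHttpClient; the demo class and URL are placeholders:

    import com.winhc.config.OkHttpRetryInterceptor;
    import okhttp3.OkHttpClient;
    import okhttp3.Request;
    import okhttp3.Response;

    import java.io.IOException;

    public class OkHttpRetryDemo {
        public static void main(String[] args) throws IOException {
            OkHttpClient client = new OkHttpClient.Builder()
                    .addInterceptor(new OkHttpRetryInterceptor(3)) // 1 initial attempt + up to 3 retries
                    .build();
            Request request = new Request.Builder().url("https://example.com/ping").build();
            try (Response response = client.newCall(request).execute()) {
                System.out.println(response.code());
            }
        }
    }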

+ 33 - 0
src/main/java/com/winhc/kafka/produce/KafkaProduce.java

@@ -0,0 +1,33 @@
+package com.winhc.kafka.produce;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2020/12/24 10:03
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class KafkaProduce {
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    public void produce(String topic, String message) {
+        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
+        send.addCallback(o -> {
+            //System.out.println("消息发送成功:" + message);
+        }, throwable -> {
+            log.error("KafkaProduce send error | message:{} | error message:{} | end !", message, throwable.getMessage());
+            //System.out.println("消息发送失败:" + message);
+        });
+    }
+
+
+}

+ 112 - 0
src/main/java/com/winhc/service/TouchService.java

@@ -0,0 +1,112 @@
+package com.winhc.service;
+
+import com.aliyuncs.IAcsClient;
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagRequest;
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceRequest;
+import com.aliyuncs.dataworks_public.model.v20180601.SearchManualDagNodeInstanceResponse;
+import com.aliyuncs.exceptions.ClientException;
+import com.aliyuncs.http.ProtocolType;
+import com.winhc.bean.TaskFlowEnum;
+import com.winhc.bean.TaskParam;
+import com.winhc.config.DataWorksAccessProperties;
+import com.winhc.utils.JsonUtils;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Service;
+
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 11:28
+ * @Description:
+ */
+@Slf4j
+@Service
+public class TouchService {
+    @Autowired
+    private DataWorksAccessProperties dataWorksAccessProperties;
+    @Autowired
+    private IAcsClient client;
+
+    public CreateManualDagResponse touch(String projectName
+            , String flowName, String bizDate) throws ClientException {
+        log.info("触发任务:{}.{} {}", projectName, flowName, bizDate);
+        CreateManualDagRequest request = new CreateManualDagRequest();
+        request.setProjectName(projectName);
+        request.setFlowName(flowName);
+        request.setBizdate(bizDate + " 00:00:00");
+        request.setRegionId(dataWorksAccessProperties.getRegionId());
+        request.setProtocol(ProtocolType.HTTP);
+        CreateManualDagResponse response = client.getAcsResponse(request);
+        log.info("\n任务结果:\n\trequestId:{},\n\treturnCode:{},\n\treturnErrorSolution:{},\n\treturnMessage:{},\n\treturnValue:{}",
+                response.getRequestId()
+                , response.getReturnCode()
+                , response.getReturnErrorSolution()
+                , response.getReturnMessage()
+                , response.getReturnValue());
+        return response;
+    }
+
+    public CreateManualDagResponse triggerWithParam(String projectName
+            , String flowName, String bizDate, String dagPara, String nodePara) throws ClientException {
+        log.info("触发任务:{}.{} {}", projectName, flowName, bizDate);
+        CreateManualDagRequest request = new CreateManualDagRequest();
+        request.setProjectName(projectName);
+        request.setFlowName(flowName);
+        request.setBizdate(bizDate + " 00:00:00");
+        request.setRegionId(dataWorksAccessProperties.getRegionId());
+        request.setProtocol(ProtocolType.HTTP);
+        request.setDagPara(dagPara);
+        request.setNodePara(nodePara);
+        CreateManualDagResponse response = client.getAcsResponse(request);
+        log.info("\n任务结果:\n\trequestId:{},\n\treturnCode:{},\n\treturnErrorSolution:{},\n\treturnMessage:{},\n\treturnValue:{}",
+                response.getRequestId()
+                , response.getReturnCode()
+                , response.getReturnErrorSolution()
+                , response.getReturnMessage()
+                , response.getReturnValue());
+        return response;
+    }
+
+    public CreateManualDagResponse touch(TaskParam taskParam) throws ClientException {
+        log.info("触发任务:{}", taskParam.toString());
+        CreateManualDagRequest request = new CreateManualDagRequest();
+        request.setProjectName(taskParam.getProjectName());
+        request.setFlowName(taskParam.getFlowName());
+        request.setBizdate(taskParam.getBizDate() + " 00:00:00");
+        request.setRegionId(dataWorksAccessProperties.getRegionId());
+        request.setProtocol(ProtocolType.HTTP);
+        request.setNodePara(JsonUtils.jsonObjToString(taskParam.getNodeParam()));
+        CreateManualDagResponse response = client.getAcsResponse(request);
+        log.info("\n任务结果:\n\trequestId:{},\n\treturnCode:{},\n\treturnErrorSolution:{},\n\treturnMessage:{},\n\treturnValue:{}",
+                response.getRequestId()
+                , response.getReturnCode()
+                , response.getReturnErrorSolution()
+                , response.getReturnMessage()
+                , response.getReturnValue());
+        return response;
+    }
+
+
+    public Map<String, TaskFlowEnum> query(String projectName, Long dagId) throws ClientException {
+        SearchManualDagNodeInstanceRequest searchNodeInstanceListRequest
+                = new SearchManualDagNodeInstanceRequest();
+        searchNodeInstanceListRequest.setDagId(dagId);
+        searchNodeInstanceListRequest.setProjectName(projectName);
+        searchNodeInstanceListRequest.setProtocol(ProtocolType.HTTP);
+        SearchManualDagNodeInstanceResponse searchResponse = client
+                .getAcsResponse(searchNodeInstanceListRequest);
+        java.util.List<SearchManualDagNodeInstanceResponse.NodeInsInfo> nodeInsfos = searchResponse.getData();
+        for (SearchManualDagNodeInstanceResponse.NodeInsInfo nodeInsInfo : nodeInsfos) {
+            TaskFlowEnum code = TaskFlowEnum.getTaskFlowEnumByCode(nodeInsInfo.getStatus());
+            log.info("{}:{} {}", nodeInsInfo.getNodeName(), nodeInsInfo.getStatus(), code);
+        }
+        return nodeInsfos.stream()
+                .collect(Collectors.toMap(SearchManualDagNodeInstanceResponse.NodeInsInfo::getNodeName, node -> TaskFlowEnum.getTaskFlowEnumByCode(node.getStatus()), (o1, o2) -> o1));
+    }
+
+}

+ 256 - 0
src/main/java/com/winhc/task/PersonMergeIncremnetTask.java

@@ -0,0 +1,256 @@
+package com.winhc.task;
+
+
+import cn.hutool.core.io.FileUtil;
+import cn.hutool.core.text.csv.CsvData;
+import cn.hutool.core.text.csv.CsvRow;
+import cn.hutool.core.text.csv.CsvUtil;
+import com.alibaba.fastjson.JSON;
+import com.aliyuncs.dataworks_public.model.v20180601.CreateManualDagResponse;
+import com.aliyuncs.exceptions.ClientException;
+import com.winhc.bean.*;
+import com.winhc.kafka.produce.KafkaProduce;
+import com.winhc.service.TouchService;
+import com.winhc.utils.CompanyUtils;
+import com.winhc.utils.DateUtil;
+import com.winhc.utils.DingUtils;
+import com.winhc.utils.SchemaInit;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Session;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.jdbc.core.JdbcTemplate;
+import org.springframework.scheduling.annotation.EnableScheduling;
+import org.springframework.scheduling.annotation.Scheduled;
+import org.springframework.stereotype.Component;
+import java.io.IOException;
+import java.io.UnsupportedEncodingException;
+import java.util.Date;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @date 2021-01-27
+ * @des merge duplicate person nodes
+ */
+@Component
+@Slf4j
+@EnableScheduling
+@AllArgsConstructor
+public class PersonMergeIncremnetTask {
+
+    @Autowired
+    JdbcTemplate jdbcTemplate;
+
+    private final Driver driver;
+
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+    @Autowired
+    TouchService touchService;
+
+    @Autowired
+    DingUtils dingUtils;
+
+    // TODO start the merge job (scheduled daily at 00:10)
+    @Scheduled(cron = "00 10 00 * * ?")
+    //@Scheduled(cron = "*/20 * * * * ?")
+    //@Scheduled(cron = "00 55 11 * * ?")
+    public void mergePersonScheduled() throws UnsupportedEncodingException {
+        log.info("start  mergePersonScheduled !");
+        long start = System.currentTimeMillis();
+        try {
+            MergeParam param = initParams();
+            mergeAndExport(param);
+            sendKafka(param);
+            startJob(param);
+        } catch (Exception e) {
+            log.error("mergePersonScheduled error | message:{} | .", e.getMessage());
+            if (!CompanyUtils.isWindows()) {
+                dingUtils.send("mergePersonScheduled job run error : \n" + e.getMessage() + "\n!!!!!!! ");
+            }
+        }
+        log.info("mergePersonScheduled end | cost:{} | !", (System.currentTimeMillis() - start));
+    }
+
+    private void sendKafka(MergeParam param) {
+        // load the exported CSV files and publish them to Kafka
+        loadCSVSendKafka(param.getPathPre(), param.getMergePath(), param.getTopic(), "0");
+        loadCSVSendKafka(param.getPathPre(), param.getDeletedPath(), param.getTopic(), "1");
+    }
+
+    private void mergeAndExport(MergeParam param) {
+        // try-with-resources so the Neo4j session is closed even if a step fails
+        try (Session session = driver.session()) {
+            // merge duplicate person nodes
+            mergePerson(param.getLastPartition(), session);
+            // export merged persons to CSV
+            exportMergePerson2CSV(param.getLastPartition(), session, param.getMergePath());
+            // export deleted persons to CSV
+            exportDeletedPerson2CSV(param.getLastPartition(), session, param.getDeletedPath());
+            // TODO remove the labels (新增/合并/删除) that are 7 days old
+            deletedPersonLabel(Constant.新增, param, session);
+            deletedPersonLabel(Constant.合并, param, session);
+            deletedPersonLabel(Constant.删除, param, session);
+        }
+    }
+
+    public MergeParam initParams() {
+        String bizdate = DateUtil.getDateBefore(-1);
+        String currentPartition = DateUtil.getDateBefore(0).replace("-", "");
+        String afterPartition = DateUtil.getDateBefore(1).replace("-", "");
+        String before7DayPartition = DateUtil.getDateBefore(-7).replace("-", "");
+        String lastPartition = bizdate.replace("-", "");
+        String pathPre = "D:\\data\\opt\\";
+        if (!CompanyUtils.isWindows()) {
+            pathPre = "/data/opt/";
+        }
+        final String mergePath = "export/merge-person-" + lastPartition + ".csv";
+        final String deletedPath = "export/deleted-person-" + lastPartition + ".csv";
+        MergeParam param = MergeParam.builder()
+                .flow(Constant.flow)
+                .taskName(Constant.taskName)
+                .topic(Constant.topic)
+                .bizdate(bizdate)
+                .lastPartition(lastPartition)
+                .currentPartition(currentPartition)
+                .afterPartition(afterPartition)
+                .before7DayPartition(before7DayPartition)
+                .pathPre(pathPre)
+                .mergePath(mergePath)
+                .deletedPath(deletedPath)
+                .build();
+        log.info("show params : {} !", param.toString());
+        return param;
+    }
+
+    public void startJob(MergeParam param) throws InterruptedException, ClientException, IOException {
+        log.info("startJob start !");
+        long start = System.currentTimeMillis();
+        DataWorksFlowJob dataWorksFlowJob = SchemaInit.getJobs().stream().filter(j -> j.getFlow().equals(param.getFlow())).findFirst().orElseThrow(NullPointerException::new);
+        DataWorksFlowTask dataWorksFlowTask = dataWorksFlowJob.getTask().stream().filter(t -> t.getTaskName().equals(param.getTaskName())).findFirst().orElseThrow(NullPointerException::new);
+        TaskParam build = TaskParam.builder()
+                .projectName(dataWorksFlowJob.getProject())
+                .bizDate(param.getBizdate())
+                .flowName(dataWorksFlowJob.getFlow())
+                .nodeParam(dataWorksFlowTask.toNodeParam(param))
+                .build();
+        CreateManualDagResponse touch = touchService.touch(build);
+        if (touch == null) {
+            return;
+        }
+        while (true) {
+            Map<String, TaskFlowEnum> query = touchService.query(dataWorksFlowJob.getProject(), touch.getReturnValue());
+            long count = query.values()
+                    .stream()
+                    .filter(e -> !(TaskFlowEnum.SUCCESS.equals(e) || TaskFlowEnum.FAILURE.equals(e)))
+                    .count();
+
+            if (count != 0) {
+                Thread.sleep(10000);
+            } else {
+                log.info("startJob | cost:{} | end !", (System.currentTimeMillis() - start));
+                log.info("startJob end !");
+                return;
+            }
+        }
+    }
+
+    private void loadCSVSendKafka(String pre, String path, String topic, String flag) {
+        log.info("loadCSVSendKafka | flag:{} | start !", flag);
+        long start = System.currentTimeMillis();
+        CsvData data = CsvUtil.getReader().read(FileUtil.file(pre + path));
+        int i = 0;
+        for (CsvRow csvRow : data.getRows()) {
+            ++i;
+            if (i == 1) continue;
+            Map<String, String> map = new HashMap<>();
+            List<String> list = csvRow.getRawList();
+            if (list.size() < 2) continue;
+            String time = DateUtil.formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+            map.put("create_time", time);
+            map.put("update_time", time);
+            map.put("person_id", list.get(0).replaceAll("\"\"", "\""));
+            map.put("person_name", list.get(1).replaceAll("\"\"", "\""));
+            if ("0".equals(flag)) {//合并
+                map.put("company_id", list.get(2).replaceAll("\"\"", "\""));
+                map.put("deleted", "0");
+            } else { // deleted records
+                map.put("company_id", "");
+                map.put("deleted", "1");
+            }
+            String message = JSON.toJSONString(map);
+            kafkaProduce.produce(topic, message);
+        }
+        log.info("loadCSVSendKafka | flag:{} | size:{} | cost:{} | end !", flag, i - 1, (System.currentTimeMillis() - start));
+    }
+
+    private void exportDeletedPerson2CSV(String date, Session session, String DELETED_NAME_PATH) {
+        log.info("exportDeletedPerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql3 = "CALL apoc.export.csv.query('MATCH (person:删除" + date + ")\n" +
+                "RETURN person.person_id as person_id,person.name as person_name', \n" +
+                "'" + DELETED_NAME_PATH + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true})\n" +
+                "YIELD file,rows";
+        log.info("cql3 : \n {} ", cql3);
+        String res3 = CompanyUtils.runNeo4j(session, cql3);
+        log.info("exportDeletedPerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res3);
+    }
+
+    private void exportMergePerson2CSV(String date, Session session, String MERGE_NAME_PATH) {
+        log.info("exportMergePerson2CSV start!");
+        long start = System.currentTimeMillis();
+        final String cql2 = "CALL apoc.export.csv.query('MATCH (person:合并" + date + ")-[r]-(company:企业) \n" +
+                "RETURN person.person_id as person_id,person.name as person_name,company.company_id  as company_id', \n" +
+                "'" + MERGE_NAME_PATH + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true}) \n" +
+                "YIELD file,rows";
+        log.info("cql2 : \n {} ", cql2);
+        String res2 = CompanyUtils.runNeo4j(session, cql2);
+        log.info("exportMergePerson2CSV | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res2);
+    }
+
+    private void mergePerson(String date, Session session) {
+        log.info("mergePerson start!");
+        long start = System.currentTimeMillis();
+        final String cql1 = "CALL apoc.periodic.iterate( \n" +
+                "'MATCH (p:新增" + date + ")-[:投资*1..3]-(q:个人) \n" +
+                "WHERE p.name=q.name AND ID(p)<>ID(q) \n" +
+                "WITH p.name AS nn,max(ID(p)) as m_id \n" +
+                "MATCH (p:个人)-[:投资*1..3]-(q:个人) \n" +
+                "WHERE ID(p) = m_id AND p.name=q.name AND ID(p)<>ID(q) \n" +
+                "WITH p,q \n" +
+                "MATCH (q)-[r]-(x) \n" +
+                "WHERE x<>p \n" +
+                "RETURN p,q,r,x', \n" +
+                "'CALL apoc.merge.relationship(p, TYPE(r), {},{}, x) YIELD rel \n" +
+                "SET rel = r \n" +
+                "SET p:合并" + date + " \n" +
+                "SET q:删除" + date + " \n" +
+                "DELETE r', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true}\n" +
+                ") YIELD batches, total";
+        log.info("cql1 : \n {} ", cql1);
+        String res1 = CompanyUtils.runNeo4j(session, cql1);
+        log.info("mergePerson | cost:{} | result:{} | end !", (System.currentTimeMillis() - start), res1);
+    }
+
+    private void deletedPersonLabel(String labelPre, MergeParam param, Session session) {
+        String label = labelPre + param.getBefore7DayPartition();
+        log.info("deletedPersonLabel | label:{} | start!", label);
+        long start = System.currentTimeMillis();
+        final String cql4 = "CALL apoc.periodic.iterate( \n" +
+                "'MATCH (m:" + label + ") \n" +
+                "RETURN m', \n" +
+                "'REMOVE m:" + label + "', \n" +
+                "{batchSize:10000,parallel:false,retries:3,iterateList:true} \n" +
+                ") YIELD batches, total";
+        log.info("cql4 : \n {} ", cql4);
+        String res4 = CompanyUtils.runNeo4j(session, cql4);
+        log.info("deletedPersonLabel | label:{} | cost:{} | result:{} | end !", label, (System.currentTimeMillis() - start), res4);
+    }
+}
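
For orientation, a minimal sketch (sample values only, not real data) of the JSON message shape that loadCSVSendKafka publishes to the inc_merge_deleted_person topic:

    import com.alibaba.fastjson.JSON;

    import java.util.HashMap;
    import java.util.Map;

    public class MergeMessageDemo {
        public static void main(String[] args) {
            Map<String, String> map = new HashMap<>();
            map.put("create_time", "2021-01-29 00:10:00");
            map.put("update_time", "2021-01-29 00:10:00");
            map.put("person_id", "p123");    // placeholder id
            map.put("person_name", "张三");  // placeholder name
            map.put("company_id", "c456");   // empty string for deleted records
            map.put("deleted", "0");         // "0" = merged, "1" = deleted
            System.out.println(JSON.toJSONString(map));
        }
    }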

+ 39 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -0,0 +1,39 @@
+package com.winhc.utils;
+
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.neo4j.driver.Value;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Properties;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/1/11 21:31
+ */
+public class CompanyUtils {
+
+    public static String runNeo4j(Session session, String cql) {
+        String data = session.writeTransaction(tx -> {
+            Result result = tx.run(cql);
+            String r1 = result.list().get(0).values().toString();
+            return r1;
+        });
+        return data;
+    }
+
+    public static Boolean isWindows() {
+        Properties props = System.getProperties();
+        if (props.containsKey("os.name") && props.get("os.name").toString().contains("Windows")) {
+            return true;
+        }
+        return false;
+    }
+
+    public static void main(String[] args) {
+        System.out.println(CompanyUtils.isWindows());
+    }
+
+}

+ 499 - 0
src/main/java/com/winhc/utils/DateUtil.java

@@ -0,0 +1,499 @@
+package com.winhc.utils;
+
+
+import org.apache.commons.lang3.StringUtils;
+
+import java.text.ParseException;
+import java.text.SimpleDateFormat;
+import java.util.Calendar;
+import java.util.Date;
+
+/**
+ * Date utility class.
+ * 
+ */
+public class DateUtil {
+
+    /** yyyy-MM-dd */
+    public static final String FORMAT_YYYY_MM_DD = "yyyy-MM-dd";
+
+    /** yyyyMMdd */
+    public static final String FORMAT_YYYYMMDD = "yyyyMMdd";
+
+    /** yyyyMM */
+    public static final String FORMAT_YYYYMM = "yyyyMM";
+
+    /** yyMMdd */
+    public static final String FORMAT_YYMMDD = "yyMMdd";
+
+    /** yyyy-MM-dd HH:mm:ss */
+    public static final String FORMAT_YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+
+    /** yyyy-MM-dd HH:mm */
+    public static final String FORMAT_YYYY_MM_DD_HH_MM = "yyyy-MM-dd HH:mm";
+
+    /** yyyyMMddHHmmss */
+    public static final String FORMAT_YYYYMMDDHHMMSS = "yyyyMMddHHmmss";
+
+    /** HH:mm:ss */
+    public static final String FORMAT_HH_MM_SS = "HH:mm:ss";
+
+    /** HHmmss */
+    public static final String FORMAT_HHMMSS = "HHmmss";
+
+    /**
+     * Converts a date string such as "20170126" to "2017年01月26日".
+     *
+     * @param dateString date string in yyyyMMdd format, e.g. 20170126
+     * @return the formatted date string
+     */
+    public static String convertToYearMoth(String dateString) {
+        String s_nd = dateString.substring(0, 4); // year
+        String s_yf = dateString.substring(4, 6); // month
+        String s_rq = dateString.substring(6, 8); // day
+        String targetDateString = s_nd + "年" + s_yf + "月" + s_rq + "日";
+        return targetDateString;
+    }
+
+    /**
+     * Converts a date string such as "20171208" to "2017-12-08".
+     *
+     * @param dateString date string in yyyyMMdd format
+     * @return the formatted date string
+     */
+    public static String convertToYearMothDate(String dateString) {
+        String s_nd = dateString.substring(0, 4); // year
+        String s_yf = dateString.substring(4, 6); // month
+        String s_rq = dateString.substring(6, 8); // day
+        String targetDateString = s_nd + "-" + s_yf + "-" + s_rq;
+        return targetDateString;
+    }
+
+    /**
+     * Converts a time string such as "110423" to "11:04:23".
+     *
+     * @param dateString time string in HHmmss format
+     * @return the formatted time string
+     */
+    public static String convertToHHMMSS(String dateString) {
+        String s_nd = dateString.substring(0, 2); // hour
+        String s_yf = dateString.substring(2, 4); // minute
+        String s_rq = dateString.substring(4, 6); // second
+        String targetDateString = s_nd + ":" + s_yf + ":" + s_rq;
+        return targetDateString;
+    }
+
+    /**
+     * Parses a string into a date.
+     *
+     * @param dateString date string
+     * @param pattern format pattern
+     * @return the parsed Date, or null if parsing fails
+     */
+    public static final Date parseDate(String dateString, String pattern) {
+        Date date = null;
+        if (StringUtils.isEmpty(dateString))
+            return null;
+        try {
+            SimpleDateFormat sdf = new SimpleDateFormat(pattern);
+            date = sdf.parse(dateString);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return date;
+    }
+
+
+    /**
+     * Formats a Date object as a string.
+     *
+     * @param date the date to format
+     * @param pattern format pattern
+     * @return the formatted string, or null if formatting fails
+     */
+    public static final String formatDate(Date date, String pattern) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final String formatDate_YYYYMMDD(Date date) {
+        String v = "";
+        try {
+            if (date == null)
+                return v;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_YYYYMMDD);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final String formatDate_YYYY_MM_DD(Date date) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_YYYY_MM_DD);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final String formatDate_HHMMSS(Date date) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_HHMMSS);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final String formatDate_YYYY_MM_DD_HH_MM_SS(Date date) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_YYYY_MM_DD_HH_MM_SS);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    /**
+     * Converts a date string from one format to another.
+     * <p>
+     * For example, convertDate("2012-01-02", "yyyy-MM-dd", "yyyy/MM/dd") returns "2012/01/02".
+     * </p>
+     *
+     * @param source the date string to convert
+     * @param sformat the original format
+     * @param dformat the target format
+     * @return the converted date string, or the original string if conversion fails
+     */
+    public static String convertDate(String source, String sformat, String dformat) {
+        // parameter check
+        if (StringUtils.isEmpty(source) || StringUtils.isEmpty(sformat) || StringUtils.isEmpty(dformat))
+            return source;
+        // perform the conversion
+        String newString = formatDate(parseDate(source, sformat), dformat);
+        // if the conversion failed, return the original string
+        return (newString == null) ? source : newString;
+    }
+
+    /**
+     * Returns the current date as a string in yyyyMMdd format.
+     *
+     * @return current date string (yyyyMMdd)
+     */
+
+    public static String getCurrDate_YYYYMMDD() {
+        return formatDate(new Date(), FORMAT_YYYYMMDD);
+    }
+
+    /**
+     * Returns the current date as a string in yyyyMM format.
+     *
+     * @return current date string (yyyyMM)
+     */
+    public static String getCurrDate_YYYYMM() {
+        return formatDate(new Date(), FORMAT_YYYYMM);
+    }
+
+    /**
+     * Returns the current time as a string in HHmmss format.
+     *
+     * @return current time string (HHmmss)
+     */
+    public static String getCurrDate_HHMMSS() {
+        return formatDate(new Date(), FORMAT_HHMMSS);
+    }
+
+    /**
+     * Returns the current date as a string in yyyy-MM-dd format.
+     *
+     * @return current date string (yyyy-MM-dd)
+     */
+    public static String getCurrDate_YYYY_MM_DD() {
+        return formatDate(new Date(), FORMAT_YYYY_MM_DD);
+    }
+
+    /**
+     * Returns the current date and time as a string in yyyy-MM-dd HH:mm:ss format.
+     *
+     * @return current date-time string (yyyy-MM-dd HH:mm:ss)
+     */
+    public static String getCurrDate_YYYY_MM_DD_HH_MM_SS() {
+        return formatDate(new Date(), FORMAT_YYYY_MM_DD_HH_MM_SS);
+    }
+
+    /**
+     * Returns the current date and time as a string in yyyy-MM-dd HH:mm format.
+     *
+     * @return current date-time string (yyyy-MM-dd HH:mm)
+     */
+    public static String getCurrDate_YYYY_MM_DD_HH_MM() {
+        return formatDate(new Date(), FORMAT_YYYY_MM_DD_HH_MM);
+    }
+
+    /**
+     * Returns the current date and time as a string in yyyyMMddHHmmss format.
+     *
+     * @return current date-time string (yyyyMMddHHmmss)
+     */
+    public static String getCurrTime_YYYYMMDDHHMMSS() {
+        return formatDate(new Date(), FORMAT_YYYYMMDDHHMMSS);
+    }
+
+    /**
+     * Checks whether a string is a valid date for the given pattern.
+     *
+     * @param dateString the date string to check
+     * @param pattern    the expected pattern, e.g. yyyyMMddHHmmss
+     * @return true if the string parses as a date, false otherwise
+     */
+    public static boolean isDateString(String dateString, String pattern) {
+        boolean v = false;
+        try {
+            if (StringUtils.isNotEmpty(dateString)) {
+                SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);
+                Date d = dateFormat.parse(dateString);
+                if (d != null) {
+                    v = true;
+                }
+            }
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    /**
+     * Checks whether a string is not a valid date for the given pattern.
+     *
+     * @param dateString the date string to check
+     * @param pattern    the expected pattern, e.g. yyyyMMddHHmmss
+     * @return true if the string is not a date, false otherwise
+     */
+    public static boolean isNotDateString(String dateString, String pattern) {
+        return !isDateString(dateString, pattern);
+    }
+
+    /**
+     * Returns last month's month number.
+     *
+     * @return two-digit month string (MM) of the previous month
+     */
+    public static String getLastMonth() {
+        Calendar c = Calendar.getInstance();
+        c.add(Calendar.MONTH, -1);
+        SimpleDateFormat format = new SimpleDateFormat("MM");
+        return format.format(c.getTime());
+    }
+
+    /**
+     * Returns last month's year and month.
+     *
+     * @return year-month string (yyyyMM) of the previous month
+     */
+    public static String getLastYearMonth() {
+        Calendar c = Calendar.getInstance();
+        c.add(Calendar.MONTH, -1);
+        SimpleDateFormat format = new SimpleDateFormat("yyyyMM");
+        return format.format(c.getTime());
+    }
+
+    /**
+     * Returns the year that last month falls in.
+     *
+     * @return four-digit year string (yyyy) of the previous month
+     */
+    public static String getYearByLastMonth() {
+        Calendar c = Calendar.getInstance();
+        c.add(Calendar.MONTH, -1);
+        SimpleDateFormat format = new SimpleDateFormat("yyyy");
+        return format.format(c.getTime());
+    }
+
+    public final static String getBetweenStr(Date beginDate, Date endDate) {
+        if (beginDate == null || endDate == null) {
+            return null;
+        }
+        long time = endDate.getTime() - beginDate.getTime();
+        long minute = (time / (60 * 1000));
+        long second = (time / 1000 - minute * 60);
+        if (time % 1000 > 0)
+            second += 1L;
+        return minute + "\u5206" + second + "\u79d2";
+    }
+
+    /**
+     * Shifts a yyyy-MM-dd date string by the given offsets.
+     *
+     * @param date     date string in yyyy-MM-dd format
+     * @param yearNum  number of years to add (may be negative)
+     * @param monthNum number of months to add (may be negative)
+     * @param dateNum  number of days to add (may be negative)
+     * @return the shifted date in yyyy-MM-dd format, or an empty string on parse failure
+     */
+    public static String calDate(String date, int yearNum, int monthNum, int dateNum) {
+        String result = "";
+        try {
+            SimpleDateFormat sd = new SimpleDateFormat(FORMAT_YYYY_MM_DD);
+            Calendar cal = Calendar.getInstance();
+            cal.setTime(sd.parse(date));
+            cal.add(Calendar.MONTH, monthNum);
+            cal.add(Calendar.YEAR, yearNum);
+            cal.add(Calendar.DATE, dateNum);
+            result = sd.format(cal.getTime());
+        } catch (Exception e) {
+            e.printStackTrace();
+        }
+        return result;
+    }
+
+    public static Date getOnlyDate(Date date) {
+        Calendar calendar = Calendar.getInstance();
+        calendar.setTime(date);
+        calendar.set(Calendar.HOUR_OF_DAY, 0);
+        calendar.set(Calendar.MINUTE, 0);
+        calendar.set(Calendar.SECOND, 0);
+        calendar.set(Calendar.MILLISECOND, 0);
+        return calendar.getTime();
+    }
+
+    /**
+     * Computes the difference between two dates in whole days, with the time parts removed.
+     * A same-day pair is returned as 1.
+     *
+     * @param startDate the start date
+     * @param endDate   the end date
+     * @return the number of days between startDate and endDate
+     * @throws Exception if either date cannot be processed
+     */
+    public static long diffDays(Date startDate, Date endDate) throws Exception {
+        long time = removeTime(endDate).getTime();
+        long time2 = removeTime(startDate).getTime();
+        Long t = time - time2;
+        Long day = t / (24 * 60 * 60 * 1000);
+        if (day == 0L) {
+            day = day + 1;
+        }
+        return day;
+
+    }
+
+    private static Date removeTime(Date nowDate) throws ParseException {
+        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd");
+        String s = sdf.format(nowDate);
+        Date date = sdf.parse(s); // re-parsed value keeps only the yyyy-MM-dd part
+        return date;
+    }
+
+    /**
+     * Returns the date dayNum days after the given date.
+     *
+     * @param date   the base date
+     * @param dayNum number of days to add
+     * @return the shifted date
+     */
+    public static Date getDateAfter(Date date, int dayNum) {
+        Calendar now = Calendar.getInstance();
+        now.setTime(date);
+        now.add(Calendar.DATE, dayNum);
+
+        return now.getTime();
+    }
+
+    /**
+     * Returns the date one month from now.
+     *
+     * @return date string in yyyyMMdd format
+     */
+    public static String getDateAfterOneMonth() {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.MONTH, 1);
+        return formatDate(cal.getTime(), FORMAT_YYYYMMDD);
+    }
+
+    /**
+     * Returns the date one week from now.
+     *
+     * @return date string in yyyyMMdd format
+     */
+    public static String getDateAfterOneWeek() {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.DATE, 7);
+
+        return formatDate(cal.getTime(), FORMAT_YYYYMMDD);
+    }
+
+    /**
+     * Returns the date dayNum days from now.
+     *
+     * @return date string in yyyyMMdd format
+     */
+    public static String getDateAfterDayNum(Integer dayNum) {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.DATE, dayNum);
+
+        return formatDate(cal.getTime(), FORMAT_YYYYMMDD);
+    }
+
+    /**
+     * Returns the date dayNum days from today; pass a negative dayNum for a past date.
+     *
+     * @return date string in yyyy-MM-dd format
+     */
+    public static String getDateBefore(Integer dayNum) {
+        Calendar cal = Calendar.getInstance();
+        cal.add(Calendar.DATE, dayNum);
+
+        return formatDate(cal.getTime(), FORMAT_YYYY_MM_DD);
+    }
+
+    /**
+     * Returns the given date plus N minutes, formatted as yyyyMMddHHmmss.
+     *
+     * @param minuts number of minutes to add
+     * @param date   the base date
+     * @return the shifted date-time string (yyyyMMddHHmmss)
+     */
+    public static String getDateAfterMinutes(Integer minuts, Date date) {
+        SimpleDateFormat format = new SimpleDateFormat("yyyyMMddHHmmss");
+        Calendar can = Calendar.getInstance();
+        // Shift from the supplied date rather than from "now".
+        can.setTime(date);
+        can.add(Calendar.MINUTE, minuts);
+        return format.format(can.getTime());
+    }
+
+    public static void main(String[] args) throws Exception {
+        System.out.println(formatDate_YYYYMMDD(new Date()));
+    }
+
+}
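
A minimal usage sketch for the helpers above, assuming the demo class sits in com.winhc.utils (parseDate(String, String) is the parser already referenced by convertDate; output comments are illustrative):

public class DateUtilDemo {
    public static void main(String[] args) throws Exception {
        // Convert between string patterns; falls back to the input on parse failure.
        String converted = DateUtil.convertDate("2012-01-02", "yyyy-MM-dd", "yyyy/MM/dd"); // 2012/01/02

        // Whole-day difference with the time parts stripped; a same-day pair yields 1.
        long days = DateUtil.diffDays(
                DateUtil.parseDate("2024-01-01", "yyyy-MM-dd"),
                DateUtil.parseDate("2024-01-03", "yyyy-MM-dd")); // 2

        // Current timestamp in one of the predefined patterns.
        System.out.println(converted + " " + days + " " + DateUtil.getCurrDate_YYYY_MM_DD_HH_MM_SS());
    }
}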

+ 92 - 0
src/main/java/com/winhc/utils/DingUtils.java

@@ -0,0 +1,92 @@
+package com.winhc.utils;
+
+import cn.hutool.crypto.digest.HMac;
+import cn.hutool.crypto.digest.HmacAlgorithm;
+import com.winhc.bean.DingMsg;
+import com.winhc.bean.Entry;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.codec.binary.Base64;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.stereotype.Component;
+
+import java.io.UnsupportedEncodingException;
+import java.net.URLEncoder;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/28 14:39
+ * @Description:
+ */
+@Slf4j
+@Component
+public class DingUtils {
+    @Autowired
+    private OkHttpUtils client;
+    private static final String URL = "https://oapi.dingtalk.com/robot/send?access_token=2773b742b74d84599c4f05f7b42cacd6714b10c33cd4c74402649019fa7e56c8";
+    @Value("${ding-secret}")
+    private String dingSecret;
+
+    public boolean send(DingMsg msg) throws UnsupportedEncodingException {
+        return sendByBody(getMdBody(msg));
+    }
+
+    public boolean send(String msg) throws UnsupportedEncodingException {
+        return sendByBody(getMdBody(msg.replace("\\", "\\\\")));
+    }
+
+    public boolean send(String msg, String at) throws UnsupportedEncodingException {
+        String a = " ,\"at\": {\"atMobiles\": [\n" +
+                "              \"" + at + "\"\n" +
+                "          ],\n" +
+                "          \"isAtAll\": true\n" +
+                "      }}";
+        String mdBody = getMdBody( msg.replace("\\", "\\\\"));
+        mdBody = mdBody.substring(0, mdBody.length() - 1) + a;
+        return sendByBody(mdBody);
+    }
+
+    private boolean sendByBody(String body) throws UnsupportedEncodingException {
+        Entry<Long, String> sign = getSign();
+        String query = "&timestamp=" + sign.getKey() + "&sign=" + sign.getValue();
+        try {
+            String post = client.post(URL + query, body);
+            log.info(post);
+            return post.contains("ok");
+        } catch (Exception e) {
+            log.error(e.getMessage(), e);
+            return false;
+        }
+    }
+
+
+    private static String getMdBody(String msg) {
+        return String.format("{\"msgtype\":\"markdown\",\"markdown\":{\"title\":\"%s\",\"text\":\"%s\"}}", "任务通知", DingMsg.msg2Md(msg));
+    }
+
+    private static String getMdBody(DingMsg msg) {
+        return String.format("{\"msgtype\":\"markdown\",\"markdown\":{\"title\":\"%s\",\"text\":\"%s\"}}", msg.getMsgLevel(), msg.toMd());
+    }
+
+    private static String getMsgBody(String msg) {
+        return "{\"msgtype\": \"text\",\"text\": {\"content\": \"" + msg + "\"}}";
+    }
+
+    private Entry<Long, String> getSign() throws UnsupportedEncodingException {
+        Long timestamp = System.currentTimeMillis();
+        String stringToSign = timestamp + "\n" + dingSecret;
+
+
+        byte[] key = dingSecret.getBytes("UTF-8");
+        HMac mac = new HMac(HmacAlgorithm.HmacSHA256, key);
+        byte[] signData = mac.digest(stringToSign);
+
+        String sign = URLEncoder.encode(new String(Base64.encodeBase64(signData)), "UTF-8");
+        Entry<Long, String> entry = new Entry<>();
+        entry.setKey(timestamp);
+        entry.setValue(sign);
+        return entry;
+    }
+
+}
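
For reference, the signature produced by getSign above (HMAC-SHA256 over timestamp + "\n" + secret, Base64-encoded, then URL-encoded) can also be computed with the plain JDK; this is an illustrative sketch and the secret value is a placeholder:

import javax.crypto.Mac;
import javax.crypto.spec.SecretKeySpec;
import java.net.URLEncoder;
import java.util.Base64;

public class DingSignDemo {
    public static void main(String[] args) throws Exception {
        String secret = "SEC-your-robot-secret";          // placeholder, not a real secret
        long timestamp = System.currentTimeMillis();
        String stringToSign = timestamp + "\n" + secret;  // same payload as DingUtils.getSign

        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(new SecretKeySpec(secret.getBytes("UTF-8"), "HmacSHA256"));
        byte[] signData = mac.doFinal(stringToSign.getBytes("UTF-8"));

        // Base64 then URL-encode, matching the query string built in sendByBody.
        String sign = URLEncoder.encode(Base64.getEncoder().encodeToString(signData), "UTF-8");
        System.out.println("&timestamp=" + timestamp + "&sign=" + sign);
    }
}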

+ 19 - 0
src/main/java/com/winhc/utils/JsonUtils.java

@@ -0,0 +1,19 @@
+package com.winhc.utils;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/22 14:18
+ * @Description:
+ */
+public class JsonUtils {
+    public static String jsonObjToString(Object jsonObject) {
+        return JSONObject.toJSONString(jsonObject, SerializerFeature.WriteMapNullValue);
+    }
+
+    public static String jsonObjToStringNotWriteMapNull(Object jsonObject) {
+        return JSONObject.toJSONString(jsonObject, SerializerFeature.IgnoreNonFieldGetter);
+    }
+}
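
A small illustration of the difference between the two helpers (field names are arbitrary; exact key order depends on the map):

import java.util.HashMap;
import java.util.Map;

public class JsonUtilsDemo {
    public static void main(String[] args) {
        Map<String, Object> m = new HashMap<>();
        m.put("name", "winhc");
        m.put("deleted", null);

        // WriteMapNullValue keeps null-valued keys, e.g. {"name":"winhc","deleted":null}
        System.out.println(JsonUtils.jsonObjToString(m));

        // fastjson's default behaviour drops null values, e.g. {"name":"winhc"}
        System.out.println(JsonUtils.jsonObjToStringNotWriteMapNull(m));
    }
}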

+ 42 - 0
src/main/java/com/winhc/utils/OkHttpUtils.java

@@ -0,0 +1,42 @@
+package com.winhc.utils;
+
+import lombok.AllArgsConstructor;
+import okhttp3.*;
+import org.springframework.stereotype.Component;
+
+import java.io.IOException;
+import java.util.Objects;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/8/7 11:54
+ * @Description:
+ */
+@Component
+@AllArgsConstructor
+public class OkHttpUtils {
+    private OkHttpClient client;
+    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
+
+    public String post(String url, String json) throws IOException {
+        RequestBody body = RequestBody.create(JSON, json);
+        try (Response response = client.newCall((new Request.Builder()).url(url).post(body).build()).execute()) {
+            return parseBody(response);
+        } catch (Exception e) {
+            e.printStackTrace();
+            return "";
+        }
+    }
+
+    public String get(String url) throws IOException {
+        try (Response response = client.newCall(new Request.Builder().url(url).get().build()).execute()) {
+            return parseBody(response);
+        }
+    }
+
+    private static String parseBody(Response response) throws IOException {
+        return response == null ? "" : response.isSuccessful() ? Objects.requireNonNull(response.body()).string() : "";
+    }
+
+
+}
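
OkHttpUtils is normally constructed from the OkHttpClient bean defined elsewhere in the project (OkHttpConfiguration, not shown in this section); a stand-alone sketch with a placeholder client and URL, assuming the demo sits next to OkHttpUtils:

import okhttp3.OkHttpClient;
import java.util.concurrent.TimeUnit;

public class OkHttpUtilsDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in client; the real bean is built by OkHttpConfiguration / OkHttpRetryInterceptor.
        OkHttpClient client = new OkHttpClient.Builder()
                .connectTimeout(5, TimeUnit.SECONDS)
                .readTimeout(10, TimeUnit.SECONDS)
                .build();

        OkHttpUtils http = new OkHttpUtils(client);   // constructor generated by @AllArgsConstructor
        String body = http.post("https://example.com/api", "{\"ping\":true}");
        System.out.println(body.isEmpty() ? "request failed or empty body" : body);
    }
}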

+ 81 - 0
src/main/java/com/winhc/utils/SchemaInit.java

@@ -0,0 +1,81 @@
+package com.winhc.utils;
+
+import com.winhc.bean.DataWorksFlowJob;
+import com.winhc.bean.DataWorksFlowTask;
+import com.winhc.bean.NodeParam;
+import lombok.SneakyThrows;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.*;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @Author: XuJiakai
+ * @Date: 2020/6/23 14:23
+ * @Description:
+ */
+@SuppressWarnings("all")
+public class SchemaInit {
+
+    private static final String defaultFileName = "task.yaml";
+
+    public static List<DataWorksFlowJob> getJobs() throws IOException {
+        String path = "/data/opt/neo4j-merge/task.yaml";
+        if (CompanyUtils.isWindows()){
+            path = Object.class.getResource("/").getPath().substring(1) + defaultFileName;
+        }
+        return parseJobs(path);
+    }
+
+    public static List<DataWorksFlowJob> getJobs(String fileName) throws FileNotFoundException {
+        return parseJobs(fileName);
+    }
+
+    private static List<DataWorksFlowJob> parseJobs(String path) throws FileNotFoundException {
+        Yaml yml = new Yaml();
+        Reader reader = new FileReader(new File(path));
+        Map map = yml.loadAs(reader, Map.class);
+
+        List j = ((List) map.get("job"));
+        List<DataWorksFlowJob> jobs = (List<DataWorksFlowJob>) j
+                .stream()
+
+                .map(m -> {
+                    String project = ((String) ((Map<String, Object>) m).get("project"));
+                    String flow = ((String) ((Map<String, Object>) m).get("flow"));
+                    List<DataWorksFlowTask> collect1 = (List<DataWorksFlowTask>) ((List) ((Map<String, Object>) m).get("task"))
+                            .stream()
+                            .map(t -> {
+                                String taskName = ((String) ((Map<String, Object>) t).get("taskName"));
+                                List<Map<String, Object>> ll = ((List<Map<String, Object>>) ((Map<String, Object>) t).get("param"));
+                                List<NodeParam> nodeParamList = ll.stream()
+                                        .map(mm -> {
+                                            String nodeId = String.valueOf(mm.remove("_nodeId"));
+                                            Map<String, String> collect = mm.entrySet().stream().collect(Collectors.toMap(Map.Entry::getKey, e -> String.valueOf(e.getValue()), (o1, o2) -> o1));
+                                            return NodeParam.builder().nodeId(nodeId).param(collect).build();
+                                        }).collect(Collectors.toList());
+                                return new DataWorksFlowTask(taskName, nodeParamList);
+                            })
+                            .collect(Collectors.toList());
+                    DataWorksFlowJob build = DataWorksFlowJob.builder()
+                            .project(project)
+                            .flow(flow)
+                            .task(collect1)
+                            .build();
+                    return build;
+                })
+                .collect(Collectors.toList());
+
+        List<DataWorksFlowJob> collect = jobs.stream().collect(Collectors.groupingBy(m -> m.getFlow() + m.getProject())).values().stream().map(list -> new DataWorksFlowJob(list.get(0).getProject(), list.get(0).getFlow(), list.stream().flatMap(l -> l.getTask().stream()).collect(Collectors.toList()))).collect(Collectors.toList());
+        return collect;
+    }
+
+    public static void main(String[] args) throws IOException {
+        List<DataWorksFlowJob> list = SchemaInit.getJobs();
+        System.out.println(list);
+    }
+}
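
A sketch of consuming the parsed jobs (getProject/getFlow/getTask appear in the grouping step above; getTaskName is assumed to be the Lombok-generated getter on DataWorksFlowTask):

import com.winhc.bean.DataWorksFlowJob;
import com.winhc.bean.DataWorksFlowTask;
import java.util.List;

public class SchemaInitDemo {
    public static void main(String[] args) throws Exception {
        List<DataWorksFlowJob> jobs = SchemaInit.getJobs();   // reads task.yaml
        for (DataWorksFlowJob job : jobs) {
            System.out.println(job.getProject() + "/" + job.getFlow());
            for (DataWorksFlowTask task : job.getTask()) {
                System.out.println("  task: " + task.getTaskName());
            }
        }
    }
}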

+ 2 - 0
src/main/resources/META-INF/spring-devtools.properties

@@ -0,0 +1,2 @@
+restart.include.mapper=/mapper-[\\w-\\.]+jar
+restart.include.pagehelper=/pagehelper-[\\w-\\.]+jar

+ 58 - 0
src/main/resources/application-dev.properties

@@ -0,0 +1,58 @@
+#eureka.client.serviceUrl.defaultZone= http://106.14.81.247:8900/eureka/
+
+# Neo4j configuration
+spring.data.neo4j.username=neo4j
+spring.data.neo4j.password=neo4j168
+#spring.data.neo4j.uri=bolt://127.0.0.1:7687
+
+# crawler instance (public network)
+#spring.data.neo4j.uri=bolt://47.100.177.224:7687
+
+# production instance (public network)
+spring.data.neo4j.uri=bolt://139.196.165.100:7687
+
+
+#spring.datasource.url = jdbc:mysql://47.100.20.161:3306/prism1?useUnicode=true&characterEncoding=utf-8
+#spring.datasource.username = firefly
+#spring.datasource.password = firefly
+
+scheduling.enabled = true
+
+
+
+#============== kafka ===================
+# Kafka broker addresses (comma-separated list)
+spring.kafka.bootstrap-servers=47.100.177.224:9092
+#topic
+spring.kafka.topic_node_relation_union=inc_node_relation_union
+
+#spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
+#spring.kafka.topic=xf_test
+#=============== provider  =======================
+spring.kafka.producer.retries=3
+# producer batch size in bytes
+spring.kafka.producer.batch-size=16384
+spring.kafka.producer.buffer-memory=33554432
+
+# key and value serializers
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+
+#=============== consumer  =======================
+# default consumer group id
+spring.kafka.consumer.group-id=neo4j_node_relation
+
+spring.kafka.consumer.auto-offset-reset=earliest
+# commit offsets automatically
+spring.kafka.consumer.enable-auto-commit=true
+spring.kafka.consumer.auto-commit-interval=100
+
+# key and value deserializers
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+# manual offset commit (uncomment to enable)
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+
+
+#mongo
+spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997
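
Most of the Kafka keys above are standard Spring Kafka properties; spring.kafka.topic_node_relation_union is a custom key read by the application. As an illustration only (not the repository's KafkaProduce class), a component publishing to the configured topic might look like this:

import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

@Component
public class NodeRelationPublisher {
    private final KafkaTemplate<String, String> kafkaTemplate;

    @Value("${spring.kafka.topic_node_relation_union}")
    private String topic;                                   // inc_node_relation_union

    public NodeRelationPublisher(KafkaTemplate<String, String> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    public void publish(String key, String json) {
        // Relies on the String serializers configured in this properties file.
        kafkaTemplate.send(topic, key, json);
    }
}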

+ 59 - 0
src/main/resources/application-prd.properties

@@ -0,0 +1,59 @@
+#eureka.client.serviceUrl.defaultZone= http://192.168.1.1:8900/eureka/
+
+# Neo4j configuration
+spring.data.neo4j.username=neo4j
+spring.data.neo4j.password=neo4j168
+
+# crawler instance
+#spring.data.neo4j.uri=bolt://192.168.2.56:7687
+
+#neo4j_prd
+spring.data.neo4j.uri=bolt://192.168.2.57:7687
+
+
+#spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+
+# internal (VPC) address
+#spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3d.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+#spring.datasource.username = wenshu
+#spring.datasource.password = wenshu_168
+
+scheduling.enabled = true
+
+
+#============== kafka ===================
+# Kafka broker addresses (comma-separated list)
+#prod
+spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
+
+#topic
+spring.kafka.topic_node_relation_union=inc_node_relation_union
+
+#=============== provider  =======================
+spring.kafka.producer.retries=3
+# producer batch size in bytes
+spring.kafka.producer.batch-size=16384
+spring.kafka.producer.buffer-memory=33554432
+
+# key and value serializers
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+
+#=============== consumer  =======================
+# default consumer group id
+spring.kafka.consumer.group-id=neo4j_node_relation
+
+spring.kafka.consumer.auto-offset-reset=earliest
+# commit offsets automatically
+spring.kafka.consumer.enable-auto-commit=true
+spring.kafka.consumer.auto-commit-interval=100
+
+# key and value deserializers
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+# manual offset commit (uncomment to enable)
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE
+
+#mongo
+#spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641601-pub.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642555-pub.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997&maxIdleTimeMS=3000
+spring.data.mongodb.uri=mongodb://itslaw:itslaw_168@dds-uf6ff5dfd9aef3641.mongodb.rds.aliyuncs.com:3717,dds-uf6ff5dfd9aef3642.mongodb.rds.aliyuncs.com:3717/itslaw?replicaSet=mgset-6501997

+ 60 - 0
src/main/resources/application.properties

@@ -0,0 +1,60 @@
+spring.profiles.active=prd
+
+###server
+server.port=9099
+logging.level.root=DEBUG
+
+spring.application.name=neo4j-merge
+#eureka.instance.instance-id=${spring.cloud.client.ipAddress}:${server.port}
+
+# Spring Boot encoding settings
+banner.charset=UTF-8
+server.tomcat.uri-encoding=UTF-8
+spring.http.encoding.charset=UTF-8
+spring.http.encoding.enabled=true
+spring.http.encoding.force=true
+
+logging.file= ./logs/my.log
+
+
+spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
+spring.jackson.time-123456
+
+eureka.instance.preferIpAddress=true
+spring.messages.encoding=UTF-8
+
+spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+spring.datasource.username = wenshu
+spring.datasource.password = wenshu_168
+
+
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
+spring.datasource.driverClassName = com.mysql.jdbc.Driver
+# Additional Druid connection-pool settings, applied to the data source above
+# initial, minimum and maximum pool size
+spring.datasource.initialSize=5
+spring.datasource.minIdle=5
+spring.datasource.maxActive=20
+# maximum wait time for obtaining a connection, in milliseconds
+spring.datasource.maxWait=60000
+# interval between checks for idle connections to close, in milliseconds
+spring.datasource.timeBetweenEvictionRunsMillis=60000
+# minimum time a connection stays idle in the pool, in milliseconds
+spring.datasource.minEvictableIdleTimeMillis=300000
+spring.datasource.validationQuery=SELECT 1 FROM DUAL
+spring.datasource.testWhileIdle=true
+spring.datasource.testOnBorrow=false
+spring.datasource.testOnReturn=false
+# enable PSCache and set its size per connection
+spring.datasource.poolPreparedStatements=true
+spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
+# monitoring/statistics filters; removing them disables SQL stats in the console, 'wall' is the SQL firewall
+spring.datasource.filters=stat,wall,log4j
+# enable mergeSql and slow-SQL logging via connectProperties
+spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000
+
+
+ding-secret=SECe7b26876f443e77f872b8b10880e39b3c5dfaf44855f1aa3235372bb73698ab6
+access-key-id:LTAI4FynxS5nNuKyZ3LHhMX5
+access-key-secret:r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE
+region-id:cn-shanghai
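
The Druid-specific keys above (initialSize, maxActive, filters, connectionProperties, ...) are not part of Spring Boot's generic DataSource binding; they are usually bound onto a DruidDataSource explicitly. A minimal sketch of such a configuration class, assuming the project wires it this way (the class itself is not part of this commit):

import com.alibaba.druid.pool.DruidDataSource;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;

@Configuration
public class DruidDataSourceConfig {
    @Bean
    @ConfigurationProperties(prefix = "spring.datasource")
    public DataSource dataSource() {
        // Relaxed binding maps initialSize, minIdle, maxActive, filters, etc. onto the Druid setters.
        return new DruidDataSource();
    }
}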

+ 4 - 0
src/main/resources/dataworks-config.properties

@@ -0,0 +1,4 @@
+access-key-id:LTAI4FynxS5nNuKyZ3LHhMX5
+access-key-secret:r6gWoySXC8kSK4qnfKRxEuWJ5uHIiE
+region-id:cn-shanghai
+ding-secret:SECe7b26876f443e77f872b8b10880e39b3c5dfaf44855f1aa3235372bb73698ab6

+ 41 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <!-- Console appender configuration -->
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> 
+    <!-- Log pattern -->
+    <encoder>
+        <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] - %m%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+         <level>info</level>
+     </filter>
+
+  </appender>
+
+  <!-- Rolling file appender configuration -->
+  <appender name="ERROR-OUT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+          <level>info</level>
+      </filter>
+    <!-- Log file path -->
+    <file>logs/info.log</file>
+    <!-- Log pattern -->
+    <encoder>
+        <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] [%class:%line] - %m%n</pattern>
+    </encoder>
+
+    <!-- Rolling policy: create a new log file per day -->
+    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+      <!-- Rolled file name pattern -->
+      <fileNamePattern>logs/info.%d{yyyy-MM-dd}.log</fileNamePattern>
+      <!-- Keep at most 30 days of history -->
+      <maxHistory>30</maxHistory>
+    </rollingPolicy>
+  </appender>
+
+  <!-- Root level is debug; whether the console or file appender logs an event also depends on that appender's ThresholdFilter, and this root configuration applies when no filter is set -->
+  <root level="debug">
+    <appender-ref ref="STDOUT" />
+    <appender-ref ref="ERROR-OUT" />
+  </root>
+</configuration>

+ 9 - 0
src/main/resources/task.yaml

@@ -0,0 +1,9 @@
+job:
+  # backflow of the human table data
+  - project: winhc_ng
+    flow: inc_human_relation_merge
+    task:
+      - taskName: inc_human_relation_merge_kafka
+        param:
+          - _nodeId: 700004102657
+            project: winhc_ng