Browse Source

first commit

xufei 4 years ago
commit
86e426dda8
38 changed files with 2475 additions and 0 deletions
  1. 170 0
      .gitignore
  2. 181 0
      pom.xml
  3. 25 0
      src/main/java/com/winhc/Application.java
  4. 35 0
      src/main/java/com/winhc/common/enums/CompanyEnum.java
  5. 40 0
      src/main/java/com/winhc/config/DemoNeo4j.java
  6. 116 0
      src/main/java/com/winhc/config/KafkaConfig.java
  7. 71 0
      src/main/java/com/winhc/config/Neo4jConfig.java
  8. 28 0
      src/main/java/com/winhc/config/Neo4jDriver.java
  9. 28 0
      src/main/java/com/winhc/dto/CompanyQueryCondition.java
  10. 33 0
      src/main/java/com/winhc/entity/Company.java
  11. 42 0
      src/main/java/com/winhc/entity/HolderCompanyRelation.java
  12. 41 0
      src/main/java/com/winhc/entity/HolderPersonRelation.java
  13. 42 0
      src/main/java/com/winhc/entity/LegalRelation.java
  14. 28 0
      src/main/java/com/winhc/entity/Person.java
  15. 41 0
      src/main/java/com/winhc/entity/StaffRelation.java
  16. 134 0
      src/main/java/com/winhc/kafka/KafkaConsumer.java
  17. 49 0
      src/main/java/com/winhc/kafka/KafkaProduce.java
  18. 103 0
      src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4j.java
  19. 60 0
      src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java
  20. 30 0
      src/main/java/com/winhc/repository/CompanyRelationRepository.java
  21. 17 0
      src/main/java/com/winhc/repository/CompanyRepository.java
  22. 15 0
      src/main/java/com/winhc/repository/PersonRepository.java
  23. 103 0
      src/main/java/com/winhc/service/CompanyRelationService.java
  24. 13 0
      src/main/java/com/winhc/service/RelationService.java
  25. 49 0
      src/main/java/com/winhc/service/impl/CompanyNodeServiceImpl.java
  26. 52 0
      src/main/java/com/winhc/service/impl/HolderRelationV1ServiceImpl.java
  27. 53 0
      src/main/java/com/winhc/service/impl/HolderRelationV2ServiceImpl.java
  28. 53 0
      src/main/java/com/winhc/service/impl/LegalEntityRelationV1ServiceImpl.java
  29. 53 0
      src/main/java/com/winhc/service/impl/LegalEntityRelationV2ServiceImpl.java
  30. 53 0
      src/main/java/com/winhc/service/impl/StaffRelationServiceImpl.java
  31. 26 0
      src/main/java/com/winhc/utils/CompanyUtils.java
  32. 2 0
      src/main/resources/META-INF/spring-devtools.properties
  33. 68 0
      src/main/resources/application-dev.properties
  34. 60 0
      src/main/resources/application-prd.properties
  35. 54 0
      src/main/resources/application.properties
  36. 41 0
      src/main/resources/logback-spring.xml
  37. 408 0
      src/test/java/com/winhc/test/TestCreateNode.java
  38. 58 0
      src/test/java/com/winhc/test/TestJson.java

+ 170 - 0
.gitignore

@@ -0,0 +1,170 @@
+# Created by .ignore support plugin (hsz.mobi)
+### Python template
+# Byte-compiled / optimized / DLL files
+logs
+__pycache__/
+*.py[cod]
+*$py.class
+.idea
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+pip-wheel-metadata/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+Kafka-neo4j.iml
+request.rest
+

+ 181 - 0
pom.xml

@@ -0,0 +1,181 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>org.example</groupId>
+    <artifactId>Kafka-neo4j</artifactId>
+    <version>1.0-SNAPSHOT</version>
+
+
+    <parent>
+        <groupId>org.springframework.boot</groupId>
+        <artifactId>spring-boot-starter-parent</artifactId>
+        <!--		<version>1.5.10.RELEASE</version>-->
+        <version>2.2.6.RELEASE</version>
+        <relativePath /> <!-- lookup parent from repository -->
+    </parent>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <java.version>1.8</java.version>
+        <start-class>com.winhc.Application</start-class>
+<!--        <spring-cloud.version>Dalston.SR3</spring-cloud.version>-->
+<!--        <spring-cloud.version>Finchley.M8</spring-cloud.version>-->
+
+    </properties>
+
+    <dependencies>
+        <!-- 添加mysql驱动包 -->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.mybatis.spring.boot</groupId>
+            <artifactId>mybatis-spring-boot-starter</artifactId>
+            <version>1.1.1</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-test</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-web</artifactId>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.springframework.boot</groupId>
+                    <artifactId>spring-boot-starter-tomcat</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-tomcat</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-devtools</artifactId>
+            <optional>true</optional>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <!-- NOTE(review): fastjson 1.1.40 has known deserialization RCE
+                 vulnerabilities; upgrade to a patched release. -->
+            <version>1.1.40</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.4</version>
+        </dependency>
+
+<!--        <dependency>-->
+<!--            <groupId>org.apache.httpcomponents</groupId>-->
+<!--            <artifactId>httpclient</artifactId>-->
+<!--        </dependency>-->
+
+<!--        <dependency>-->
+<!--            <groupId>com.google.code.gson</groupId>-->
+<!--            <artifactId>gson</artifactId>-->
+<!--            <version>2.8.2</version>-->
+<!--        </dependency>-->
+<!--        <dependency>-->
+<!--            <groupId>org.neo4j</groupId>-->
+<!--            <artifactId>neo4j-ogm-http-driver</artifactId>-->
+<!--        </dependency>-->
+
+        <!-- swagger依赖 -->
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger2</artifactId>
+            <version>2.6.1</version>
+        </dependency>
+        <dependency>
+            <groupId>io.springfox</groupId>
+            <artifactId>springfox-swagger-ui</artifactId>
+            <version>2.6.1</version>
+        </dependency>
+
+<!--         图形数据库Neo4j 官方支持的neo4j依赖包 -->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-data-neo4j</artifactId>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <optional>true</optional>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>druid</artifactId>
+            <version>1.0.18</version>
+        </dependency>
+
+        <!--SpringBoot 使用 JDBCTemplate 引用下面这个包才会自动注入,不要引用上面那个-->
+        <dependency>
+            <groupId>org.springframework.boot</groupId>
+            <artifactId>spring-boot-starter-jdbc</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.springframework.kafka</groupId>
+            <artifactId>spring-kafka</artifactId>
+        </dependency>
+<!--        <dependency>-->
+<!--            <groupId>org.neo4j.driver</groupId>-->
+<!--            <artifactId>neo4j-java-driver-spring-boot-starter</artifactId>-->
+<!--            <version>4.1.1.0</version>-->
+<!--        </dependency>-->
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>2.6.2</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>4.5.16</version>
+        </dependency>
+    </dependencies>
+
+    <dependencyManagement>
+        <dependencies>
+<!--            <dependency>-->
+<!--                <groupId>org.springframework.cloud</groupId>-->
+<!--                <artifactId>spring-cloud-dependencies</artifactId>-->
+<!--                <version>${spring-cloud.version}</version>-->
+<!--                <type>pom</type>-->
+<!--                <scope>import</scope>-->
+<!--            </dependency>-->
+        </dependencies>
+    </dependencyManagement>
+
+    <build>
+        <finalName>kafka-neo4j</finalName>
+
+        <plugins>
+            <plugin>
+                <groupId>org.springframework.boot</groupId>
+                <artifactId>spring-boot-maven-plugin</artifactId>
+                <configuration>
+                    <fork>true</fork>
+                </configuration>
+            </plugin>
+
+
+        </plugins>
+    </build>
+
+
+</project>

+ 25 - 0
src/main/java/com/winhc/Application.java

@@ -0,0 +1,25 @@
+package com.winhc;
+
+import org.springframework.boot.SpringApplication;
+import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.boot.builder.SpringApplicationBuilder;
+import org.springframework.boot.web.servlet.ServletComponentScan;
+import org.springframework.boot.web.servlet.support.SpringBootServletInitializer;
+//import org.springframework.cloud.client.discovery.EnableDiscoveryClient;
+import org.springframework.scheduling.annotation.EnableScheduling;
+
@SpringBootApplication
//@EnableDiscoveryClient
@ServletComponentScan
@EnableScheduling
public class Application extends SpringBootServletInitializer {

    /**
     * WAR-deployment entry point: when run inside an external servlet
     * container, registers this class as the Spring Boot configuration source.
     */
    @Override
    protected SpringApplicationBuilder configure(SpringApplicationBuilder application) {
        return application.sources(Application.class);
    }

    /** Standalone entry point (embedded servlet container). */
    public static void main(String[] args) {
        SpringApplication.run(Application.class, args);
    }
}

+ 35 - 0
src/main/java/com/winhc/common/enums/CompanyEnum.java

@@ -0,0 +1,35 @@
+package com.winhc.common.enums;
+
/**
 * Enum constants shared by the graph-import pipeline: node/edge labels and the
 * topic-code-to-service-bean routing table.
 */
public class CompanyEnum {

    /**
     * Node labels / relationship names used in the graph.
     * NOTE(review): "Lable" is a typo for "Label", but the enum name is part
     * of the public API (referenced by callers), so it is kept for
     * compatibility.
     */
    public enum Lable {
        COMPANY("COMPANY1"), PERSON("PERSON1"),
        法人("法人"), 高管("高管"), 投资("投资");

        /** String written to / matched against the graph database. */
        public final String code;

        Lable(String code) {
            this.code = code;
        }
    }

    /**
     * Maps a topic-type code to the Spring bean name of the service
     * implementation that handles messages of that type.
     */
    public enum TopicType {
        COMPANY_NODE("1", "companyNodeServiceImpl"),
        HOLDER_RELATION_V1("2", "holderRelationV1ServiceImpl"),
        HOLDER_RELATION_V2("3", "holderRelationV2ServiceImpl"),
        LEGAL_ENTITY_RELATION_V1("4", "legalEntityRelationV1ServiceImpl"),
        LEGAL_ENTITY_RELATION_V2("5", "legalEntityRelationV2ServiceImpl"),
        STAFF_RELATION("6", "staffRelationServiceImpl");

        /** Message/topic type code carried by incoming records. */
        public final String CODE;
        /** Spring bean name of the handling service. */
        public final String VALUE;

        TopicType(String code, String value) {
            this.CODE = code;
            this.VALUE = value;
        }

        /**
         * Looks up a topic type by its string code.
         *
         * @param code the code to match (e.g. "1")
         * @return the matching {@link TopicType}, or {@code null} when no
         *         constant carries that code
         */
        public static TopicType fromCode(String code) {
            for (TopicType t : values()) {
                if (t.CODE.equals(code)) {
                    return t;
                }
            }
            return null;
        }
    }

}
+

+ 40 - 0
src/main/java/com/winhc/config/DemoNeo4j.java

@@ -0,0 +1,40 @@
+package com.winhc.config;
+
+import org.neo4j.driver.*;
+
+import static org.neo4j.driver.Values.parameters;
+
/**
 * Hand-run demo/smoke test for the Neo4j Bolt driver: writes a Greeting node
 * and prints the round-tripped message. Not wired into Spring.
 */
public class DemoNeo4j implements AutoCloseable {
    // Shared Bolt driver; closed via close() (try-with-resources in main).
    private final Driver driver;

    public DemoNeo4j(String uri, String user, String password) {
        driver = GraphDatabase.driver(uri, AuthTokens.basic(user, password));
    }

    @Override
    public void close() throws Exception {
        driver.close();
    }

    /**
     * Creates a {@code Greeting} node carrying {@code message} and prints the
     * string the query returns ("&lt;message&gt;, from node &lt;id&gt;").
     */
    public void printGreeting(final String message) {
        try (Session session = driver.session()) {
            String greeting = session.writeTransaction(new TransactionWork<String>() {
                @Override
                public String execute(Transaction tx) {
                    Result result = tx.run("CREATE (a:Greeting) " +
                                    "SET a.message = $message " +
                                    "RETURN a.message + ', from node ' + id(a)",
                            parameters("message", message));
                    return result.single().get(0).asString();
                }
            });
            System.out.println(greeting);
        }
    }

    // NOTE(review): a production-looking URI and credentials are hard-coded
    // and checked into source control — rotate this password and move the
    // settings to external configuration.
    public static void main(String... args) throws Exception {
        try (DemoNeo4j greeter = new DemoNeo4j("bolt://106.14.211.187:7687", "neo4j", "neo4j168")) {
            greeter.printGreeting("hello, world");
        }
    }
}

+ 116 - 0
src/main/java/com/winhc/config/KafkaConfig.java

@@ -0,0 +1,116 @@
+package com.winhc.config;
+
+import com.google.common.collect.Maps;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.kafka.annotation.EnableKafka;
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
+import org.springframework.kafka.core.DefaultKafkaProducerFactory;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.core.ProducerFactory;
+import org.springframework.kafka.listener.ContainerProperties;
+import org.springframework.stereotype.Component;
+
+import java.util.HashMap;
+import java.util.Map;
+
/**
 * Kafka configuration: producer properties, the shared {@link KafkaTemplate},
 * and the batch consumer listener container factory.
 *
 * @author π
 * @Description:
 * @date 2020/12/24 10:06
 */
//@Component
@Configuration
@EnableKafka
//@ConditionalOnProperty(value = "spring.profiles.active", havingValue = "dev", matchIfMissing = true)
public class KafkaConfig {
    // Comma-separated broker list.
    @Value("${spring.kafka.bootstrap-servers}")
    private String bootstrapServers;

    // Whether the consumer auto-commits offsets.
    @Value("${spring.kafka.consumer.enable-auto-commit}")
    private Boolean autoCommit;

    // Producer retry count on transient send failures.
    @Value("${spring.kafka.producer.retries}")
    private Integer retries;

    // Producer batch size (bytes).
    @Value("${spring.kafka.producer.batch-size}")
    private Integer batchSize;

    // Total producer buffer memory (bytes).
    @Value("${spring.kafka.producer.buffer-memory}")
    private Integer bufferMemory;

    /**
     * Producer configuration properties.
     * NOTE(review): acks=0 is fire-and-forget — the broker never acknowledges
     * sends, so messages can be silently lost; confirm this is intended.
     */
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = Maps.newHashMap();
        props.put(ProducerConfig.ACKS_CONFIG, "0");
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        props.put(ProducerConfig.RETRIES_CONFIG, retries);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, batchSize);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, bufferMemory);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // Transaction timeout — presumably unused since no transactional.id is
        // configured; TODO confirm and drop if so.
        props.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "3000");
        return props;
    }

    /**
     * Producer factory built from {@link #producerConfigs()}.
     */
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    /**
     * Shared producer template.
     */
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }

    /**
     * Batch listener container factory referenced by the @KafkaListener
     * consumers (containerFactory = "containerFactory").
     */
    @Bean("containerFactory")
    public ConcurrentKafkaListenerContainerFactory<String, String> containerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> container = new ConcurrentKafkaListenerContainerFactory<>();
        container.setConsumerFactory(new DefaultKafkaConsumerFactory<>(consumerProps()));
        // Concurrency must be <= the topic's partition count.
        container.setConcurrency(1);
        // Poll timeout (disabled):
        //container.getContainerProperties().setPollTimeout(1500);
        // Deliver records to listeners in batches, not one-by-one.
        container.setBatchListener(true);
        // Offset commit mode (disabled — listeners ack manually):
        //container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
        return container;
    }

    // Consumer properties for the container factory above.
    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>(8);
        // Broker list.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        // Auto-commit flag (externalized property).
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, autoCommit);
        // Max records fetched per poll.
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5000);
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 120000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 180000);
        // Max time allowed to process one poll before a rebalance is triggered.
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 120000);
        // Key/value deserialization.
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

}

+ 71 - 0
src/main/java/com/winhc/config/Neo4jConfig.java

@@ -0,0 +1,71 @@
+package com.winhc.config;
+
+import org.neo4j.ogm.session.SessionFactory;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+import org.springframework.data.neo4j.repository.config.EnableNeo4jRepositories;
+import org.springframework.data.neo4j.transaction.Neo4jTransactionManager;
+import org.springframework.transaction.annotation.EnableTransactionManagement;
+
+@Configuration
+@EnableNeo4jRepositories(basePackages = {"com.winhc.repository"})
+@EnableTransactionManagement
+public class Neo4jConfig {
+
+//
+//    /******Session Factory*******/
+//    @Bean("sessionFactory")
+//    public SessionFactory sessionFactory(org.neo4j.ogm.config.Configuration configuration) {
+//        // with domain entity base package(s)
+//        return new SessionFactory(configuration(),"com.winhc.entity");
+//    }
+//
+//    @Bean
+//    public org.neo4j.ogm.config.Configuration configuration() {
+//        org.neo4j.ogm.config.Configuration configuration = new org.neo4j.ogm.config.Configuration.Builder()
+//                .uri("bolt://106.14.211.187:7687")
+//                .connectionPoolSize(100)
+//                .credentials("neo4j", "neo4j168")
+//                .build();
+//        return configuration;
+//    }
+//
+//    @Bean
+//    public Neo4jTransactionManager transactionManager() {
+//        return new Neo4jTransactionManager(sessionFactory(configuration()));
+//    }
+//    @Bean
+//    public org.neo4j.ogm.config.Configuration getConfiguration() {
+//        org.neo4j.ogm.config.Configuration config = new Configuration();
+//        config.driverConfiguration().setDriverClassName(" org.neo4j.ogm.drivers.http.driver.HttpDriver")
+//                .setURI(" http: // user:password @ localhost:7474");
+//        return config;
+//    }
+//
+//    @Bean
+//    public SessionFactory getSessionFactory() {
+//        return new SessionFactory(getConfiguration(), "");
+//    }
+}
+
+
+//@Configuration
+//@EnableNeo4jRepositories("com.neo4j.repository")
+//@EnableTransactionManagement
+//public class Neo4jConfig extends Neo4jConfiguration {
+//
+//    public static final int NEO4J_PORT = 7474;
+//
+//    @Bean
+//    public SessionFactory getSessionFactory() {
+//        return new SessionFactory("com.neo4j.domain");
+//    }
+//
+//    //配置事务
+//    @Bean
+//    @Qualifier("neo4jTransactionManager")
+//    public Neo4jTransactionManager neo4jTransactionManager() throws Exception {
+//        return new Neo4jTransactionManager(getSession());
+//    }
+//
+//}

+ 28 - 0
src/main/java/com/winhc/config/Neo4jDriver.java

@@ -0,0 +1,28 @@
+package com.winhc.config;
+
+import org.neo4j.driver.AuthTokens;
+import org.neo4j.driver.Config;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.GraphDatabase;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.context.annotation.Bean;
+import org.springframework.stereotype.Component;
+
+import java.util.concurrent.TimeUnit;
+
@Component
public class Neo4jDriver {

    // Bolt URI, e.g. bolt://host:7687 (from spring.data.neo4j.uri).
    @Value("${spring.data.neo4j.uri}")
    private String uri;
    @Value("${spring.data.neo4j.username}")
    private String username;
    @Value("${spring.data.neo4j.password}")
    private String password;

    /**
     * Exposes the shared Neo4j Java driver as an application-scoped bean
     * (100 s connection timeout, 10 s max transaction retry). The driver is
     * thread-safe, so a single instance serves all consumers.
     *
     * <p>NOTE(review): {@code @Bean} inside a {@code @Component} runs in
     * "lite" mode, and the resulting bean is named "init" after this method —
     * consider moving it to a {@code @Configuration} class with a clearer name.
     */
    @Bean
    public Driver init() {
        Config config = Config.builder().withConnectionTimeout(100, TimeUnit.SECONDS).withMaxTransactionRetryTime(10, TimeUnit.SECONDS).build();
        return GraphDatabase.driver(uri, AuthTokens.basic(username, password), config);
    }
}

+ 28 - 0
src/main/java/com/winhc/dto/CompanyQueryCondition.java

@@ -0,0 +1,28 @@
+package com.winhc.dto;
+
+import io.swagger.annotations.ApiModel;
+import io.swagger.annotations.ApiModelProperty;
+import lombok.Data;
+
+import java.io.Serializable;
+
+
+
/**
 * Query/update condition describing one relationship edge between two graph
 * nodes. Serializable DTO; accessors generated by Lombok {@code @Data}.
 */
@Data
public class CompanyQueryCondition implements Serializable{
	/**
	 * Serialization version.
	 */
	private static final long serialVersionUID = 1L;
	
	// ID of the edge's start node — presumably a company/person id; TODO confirm against callers.
	private String startId;

	// ID of the edge's end node.
	private String endId;

	// Edge status value (semantics defined by the consuming services).
	private String status;

	// Edge label/type — presumably one of CompanyEnum.Lable; verify against callers.
	private String label;

	// Operation flag — semantics not visible here; verify against caller.
	private String flag;

}

+ 33 - 0
src/main/java/com/winhc/entity/Company.java

@@ -0,0 +1,33 @@
+package com.winhc.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.Id;
+import org.neo4j.ogm.annotation.NodeEntity;
+
+
+
/**
 * Graph node for a company (label "COMPANY"). Mapped by Neo4j OGM;
 * boilerplate generated by Lombok.
 */
@Data
@NoArgsConstructor
@NodeEntity(label = "COMPANY")
public class Company {


    /**
     * Neo4j-generated id — intentionally not mapped; the business key below
     * serves as the identifier. Historical alternatives kept for reference:
     */
//    @GraphId
//    private Long id;

    //@Id @Convert(UuidStringConverter.class)
    //@GeneratedValue(strategy = UuidStrategy.class)

    // Business key used as the node id.
    @Id
    private String companyId;

    /**
     * Company name property.
     */
    private String name;

}

+ 42 - 0
src/main/java/com/winhc/entity/HolderCompanyRelation.java

@@ -0,0 +1,42 @@
+package com.winhc.entity;
+
+import com.fasterxml.jackson.annotation.JsonProperty;
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
/**
 * Shareholder edge between two companies: the investing company
 * ({@code startNode}) holds shares in {@code endNode}.
 */
@RelationshipEntity
@Data
@NoArgsConstructor
public class HolderCompanyRelation {

//    @GraphId
//    private Long id;

    // Investing company.
    @StartNode
    private Company startNode;

    // Company being invested in.
    @EndNode
    private Company endNode;

    // Edge status property.
    private String status;
    // NOTE(review): @Relationship(type=...) normally annotates a related node
    // or collection, not a String property, and @RelationshipEntity here has
    // no type attribute — verify the OGM mapping actually creates TOUZI edges.
    // The @JsonProperty name ("TOUZI1") also differs from the type ("TOUZI").
    @Relationship(type = "TOUZI")
    @JsonProperty("TOUZI1")
    private String label;

    // Call addLabel() before inserting to attach multiple labels (kept for reference):
//    private Set<String> labels = new HashSet<String>();
//
//    public Set<String> getLabels() {
//        return labels;
//    }
//
//    public void addLabel(String name) {
//        this.labels.add(name);
//    }

}

+ 41 - 0
src/main/java/com/winhc/entity/HolderPersonRelation.java

@@ -0,0 +1,41 @@
+package com.winhc.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
/**
 * Shareholder edge from a natural person ({@code startNode}) to the company
 * they hold shares in ({@code endNode}).
 */
@RelationshipEntity
@Data
@NoArgsConstructor
public class HolderPersonRelation {

//    @GraphId
//    private Long id;

    // Shareholding person.
    @StartNode
    private Person startNode;

    // Company being held.
    @EndNode
    private Company endNode;

    // Edge status property.
    private String status;
    // NOTE(review): @Relationship without a type on a String property — the
    // OGM relationship-type mapping here is dubious; verify against usage.
    @Relationship
    private String label;

    // Call addLabel() before inserting to attach multiple labels (kept for reference):
//    @Labels
//    private Set<String> labels = new HashSet<String>();
//
//    public Set<String> getLabels() {
//        return labels;
//    }
//
//    public void addLabel(String name) {
//        this.labels.add(name);
//    }

}

+ 42 - 0
src/main/java/com/winhc/entity/LegalRelation.java

@@ -0,0 +1,42 @@
+package com.winhc.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
/**
 * Legal-representative (法人) edge from a person ({@code startNode}) to the
 * company they legally represent ({@code endNode}).
 */
@RelationshipEntity
@Data
@NoArgsConstructor
public class LegalRelation {

//    @GraphId
//    private Long id;

    // Legal representative.
    @StartNode
    private Person startNode;

    // Represented company.
    @EndNode
    private Company endNode;

    // Edge status property.
    private String status;
    // NOTE(review): @Relationship without a type on a String property — the
    // OGM relationship-type mapping here is dubious; verify against usage.
    @Relationship
    private String label;

    // Call addLabel() before inserting to attach multiple labels (kept for reference):
//    @Labels
//    private Set<String> labels = new HashSet<String>();
//
//    public Set<String> getLabels() {
//        return labels;
//    }
//
//    public void addLabel(String name) {
//        this.labels.add(name);
//    }


}

+ 28 - 0
src/main/java/com/winhc/entity/Person.java

@@ -0,0 +1,28 @@
+package com.winhc.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.Id;
+import org.neo4j.ogm.annotation.NodeEntity;
+
+
/**
 * Graph node for a natural person (label "PERSON"). Mapped by Neo4j OGM;
 * boilerplate generated by Lombok.
 */
@Data
@NoArgsConstructor
@NodeEntity(label = "PERSON")
public class Person {

 
	/**
	 * Neo4j-generated id — intentionally not mapped; the business key below
	 * serves as the identifier.
	 */
//	@GraphId
//	private Long id;
	@Id
	private String personId;
	
	/**
	 * Person name property.
	 */
	private String name;

}

+ 41 - 0
src/main/java/com/winhc/entity/StaffRelation.java

@@ -0,0 +1,41 @@
+package com.winhc.entity;
+
+import lombok.Data;
+import lombok.NoArgsConstructor;
+import org.neo4j.ogm.annotation.*;
+
+import java.util.HashSet;
+import java.util.Set;
+
+
/**
 * Staff/executive (高管) edge from a person ({@code startNode}) to the company
 * they work for ({@code endNode}).
 */
@RelationshipEntity
@Data
@NoArgsConstructor
public class StaffRelation {

//    @GraphId
//    private Long id;

    // Staff member / executive.
    @StartNode
    private Person startNode;

    // Employing company.
    @EndNode
    private Company endNode;

    // Edge status property.
    private String status;
    // NOTE(review): @Relationship without a type on a String property — the
    // OGM relationship-type mapping here is dubious; verify against usage.
    @Relationship
    private String label;

    // Call addLabel() before inserting to attach multiple labels (kept for reference):
//    @Labels
//    private Set<String> labels = new HashSet<String>();
//
//    public Set<String> getLabels() {
//        return labels;
//    }
//
//    public void addLabel(String name) {
//        this.labels.add(name);
//    }


}

+ 134 - 0
src/main/java/com/winhc/kafka/KafkaConsumer.java

@@ -0,0 +1,134 @@
+package com.winhc.kafka;
+
+import com.winhc.repository.CompanyRepository;
+import com.winhc.repository.PersonRepository;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.neo4j.driver.*;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.kafka.listener.ListenerExecutionFailedException;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.messaging.Message;
+import org.springframework.messaging.handler.annotation.Payload;
+import org.springframework.stereotype.Service;
+
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2020/12/24 10:03
+ */
+@Slf4j
+//@Service
+public class KafkaConsumer {
+
+    @Autowired
+    CompanyRepository companyRepository;
+    @Autowired
+    PersonRepository personRepository;
+    @Autowired
+    Driver driver;
+
+
+
+    @KafkaListener(id = "id1",
+            topicPartitions = {@TopicPartition(topic = "test",
+                    partitions = { "0" })
+                    //partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
+                    },
+            groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler1")
+    public void consumerRelation(List<ConsumerRecord<?, ?>> records, Acknowledgment ack) {
+        long start = System.currentTimeMillis();
+        List<Map<String, Object>> batch_list = records.stream().filter(r -> {
+            return (r != null && r.value() != null);
+        }).map(r -> {
+            String s = r.value().toString();
+//            int disableDecimalFeature = JSON.DEFAULT_PARSER_FEATURE & ~Feature.UseBigDecimal.getMask();
+//            SerializeConfig config = SerializeConfig.getGlobalInstance();
+//            config.put(Double.class, new DoubleSerializer("#.#####"));
+//            JSONObject JB = JSON.parseObject(s, JSONObject.class, disableDecimalFeature);
+            int i = 1/0;
+            Map<String, Object> map = new HashMap<>();
+            return map;
+        }).collect(Collectors.toList());
+        System.out.println("list query: " + batch_list.size());
+
+//        Session session = driver.session();
+//        final String cql = "WITH  {batch_list} AS batch_list \n" +
+//                "UNWIND batch_list AS row \n" +
+//                "MERGE(company:COMPANY2{companyId:row.companyId}) \n" +
+//                "SET company.name=row.companyName, company.companyId=row.companyId \n" +
+//                //"FOREACH (_ IN case when row.personId is not null then [1] else [] end|\n" +
+//                "MERGE(person:PERSON2{personId:row.personId}) \n" +
+//                "SET person.name=row.personName, person.personId=row.personId \n" +
+//                //")" +
+//                "WITH person,company,row \n" +
+//                "MERGE(person)-[r:高管]->(company) \n" +
+//                "SET r.percent=row.percent, r.status=row.status, r.relType=row.relType \n"
+////                "WITH person,company,row\n" +
+////                "CALL apoc.merge.relationship(person, row.relType, {},{percent:row.percent,status:row.status}, company) YIELD rel \n" +
+////                "WITH rel,row \n" +
+////                "SET rel.status= row.status, rel.percent= row.percent \n"
+//                ;
+//        Map parameters = new HashMap() {{
+//            put("batch_list", batch_list);
+//        }};
+//
+//        String data = session.writeTransaction(new TransactionWork<String>() {
+//            @Override
+//            public String execute(Transaction tx) {
+//                Result result = tx.run(cql, parameters);
+//                return "11";
+//            }
+//        });
+        //session.close();
+        System.out.println(" 导入 " + batch_list.size() + " company 边");
+
+//        System.out.println("cost: " + (System.currentTimeMillis() - start));
+
+        //提交偏移量
+        ack.acknowledge();
+    }
+
+    /**
+     * 因为手动确认消费,若消费失败,记录重刷
+     */
+    @Bean("consumerAwareListenerErrorHandler1")
+    public ConsumerAwareListenerErrorHandler dealError() {
+
+//        new ConsumerAwareListenerErrorHandler() {
+//            @Override
+//            public Object handleError(Message<?> message, ListenerExecutionFailedException e, Consumer<?, ?> consumer) {
+//                List<ConsumerRecord> c = (List<ConsumerRecord>) message.getPayload();
+//                return null;
+//            }
+//        };
+       // return null;
+        //return new ConsumerAwareListenerErrorHandler() {
+        return (message, e, consumer) -> {
+            System.out.println("consumer error:" + e);
+            //System.out.println("consumer 1:" + consumer.toString());
+            System.out.println("consumer 2:" + message.toString());
+            List<ConsumerRecord> records = (List<ConsumerRecord>) message.getPayload();
+
+            List<String> batch_list = records.stream().filter(r -> {
+                return (r != null && r.value() != null);
+            }).map(r -> {
+                String s = r.value().toString();
+
+                return s;
+            }).collect(Collectors.toList());
+            System.out.println(batch_list);
+//            Consumer<?, ?> consumer1 = consumer;
+            // TODO 将失败的记录保存到数据库,再用定时任务查询记录,并重刷数据
+            return null;
+        };
+    }
+}

+ 49 - 0
src/main/java/com/winhc/kafka/KafkaProduce.java

@@ -0,0 +1,49 @@
+package com.winhc.kafka;
+
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.PartitionOffset;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.core.KafkaTemplate;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.kafka.listener.ListenerExecutionFailedException;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.kafka.support.SendResult;
+import org.springframework.messaging.Message;
+import org.springframework.stereotype.Component;
+import org.springframework.stereotype.Service;
+import org.springframework.util.concurrent.ListenableFuture;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * Thin producer wrapper around {@link KafkaTemplate}: sends a message and logs
+ * the asynchronous outcome.
+ *
+ * @author π
+ * @Description:
+ * @date 2020/12/24 10:03
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class KafkaProduce {
+
+    private final KafkaTemplate<String, String> kafkaTemplate;
+
+    /**
+     * Sends {@code message} to {@code topic} asynchronously. The result is
+     * only logged; failures are not propagated to the caller.
+     */
+    public void produce(String topic, String message) {
+        ListenableFuture<SendResult<String, String>> send = kafkaTemplate.send(topic, message);
+        // Use the class logger (the class already declares @Slf4j) instead of
+        // System.out, and keep the throwable so the stack trace is not lost.
+        send.addCallback(
+                result -> log.info("send-消息发送成功:{}", message),
+                throwable -> log.error("消息发送失败:{}", message, throwable));
+    }
+}

+ 103 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4j.java

@@ -0,0 +1,103 @@
+package com.winhc.kafka.consumer;
+
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.annotation.TopicPartition;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.kafka.support.Acknowledgment;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Per-topic Kafka listeners that write company nodes and relations to Neo4j,
+ * dispatching each topic to its RelationService implementation.
+ *
+ * NOTE(review): @Service is commented out, so this class is not registered as
+ * a Spring bean — these listeners and the error-handler @Bean below are
+ * currently inactive; KafkaConsumerNeo4jV2 appears to be the active consumer.
+ *
+ * @author π
+ * @Description:
+ * @date 2021/1/8 16:04
+ */
+@Slf4j
+//@Service
+@AllArgsConstructor
+public class KafkaConsumerNeo4j {
+
+    // Bean name -> RelationService implementation; Spring injects every
+    // RelationService bean into this map keyed by its bean name.
+    private final Map<String, RelationService> map;
+
+    // Company nodes.
+    @KafkaListener(id = "${spring.kafka.topic_company_node}"
+//            topicPartitions = {@TopicPartition(topic = "${spring.kafka.topic_company_node}")
+//                    //,partitions = {"0"})
+//                    //partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
+//            },
+            , topics = "${spring.kafka.topic_company_node}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        // save
+        map.get("companyNodeServiceImpl").save(CompanyUtils.map(records));
+        // commit offsets (manual acknowledgment currently disabled)
+        //ack.acknowledge();
+    }
+
+    // Holder relation: person -> company.
+    @KafkaListener(id = "${spring.kafka.topic_holder_v1}"
+            , topics = "${spring.kafka.topic_holder_v1}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerHolderV1(List<ConsumerRecord<?, ?>> records//, Acknowledgment ack
+    ) {
+        // save
+        map.get("holderRelationV1ServiceImpl").save(CompanyUtils.map(records));
+    }
+
+    // Holder relation: company -> company.
+    @KafkaListener(id = "${spring.kafka.topic_holder_v2}"
+            , topics = "${spring.kafka.topic_holder_v2}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerHolderV2(List<ConsumerRecord<?, ?>> records) {
+        // save
+        map.get("holderRelationV2ServiceImpl").save(CompanyUtils.map(records));
+    }
+
+    // Staff relation: person -> company.
+    @KafkaListener(id = "${spring.kafka.topic_staff_relation}"
+            , topics = "${spring.kafka.topic_staff_relation}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerStaffRelationV1(List<ConsumerRecord<?, ?>> records) {
+        // save
+        map.get("staffRelationServiceImpl").save(CompanyUtils.map(records));
+    }
+
+    // Legal-representative relation: person -> company.
+    @KafkaListener(id = "${spring.kafka.topic_legal_entity_relation_v1}"
+            , topics = "${spring.kafka.topic_legal_entity_relation_v1}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerLegalEntityRelationV1(List<ConsumerRecord<?, ?>> records) {
+        // save
+        map.get("legalEntityRelationV1ServiceImpl").save(CompanyUtils.map(records));
+    }
+
+    // Legal-representative relation: company -> company.
+    @KafkaListener(id = "${spring.kafka.topic_legal_entity_relation_v2}"
+            , topics = "${spring.kafka.topic_legal_entity_relation_v2}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandler")
+    public void consumerLegalEntityRelationV2(List<ConsumerRecord<?, ?>> records) {
+        // save
+        map.get("legalEntityRelationV2ServiceImpl").save(CompanyUtils.map(records));
+    }
+
+
+    /**
+     * Error handler referenced by the listeners above: logs the failure and
+     * the first failed record. Persisting the failed records for a scheduled
+     * replay is still TODO.
+     */
+    @Bean("consumerAwareListenerErrorHandler")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            List<ConsumerRecord> records = (List<ConsumerRecord>) message.getPayload();
+            log.error("consumer error: {}",e.getMessage());
+            //System.out.println("consumer error:" + e.getMessage());
+            //System.out.println("consumer 1:" + consumer.toString());
+            //System.out.println("consumer 2:" + records);
+            log.error("consumer message: {}",records.get(0).toString());
+            // TODO save failed records to a store, then query and replay them
+            // with a scheduled job
+            return null;
+        };
+    }
+
+}

+ 60 - 0
src/main/java/com/winhc/kafka/consumer/KafkaConsumerNeo4jV2.java

@@ -0,0 +1,60 @@
+package com.winhc.kafka.consumer;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import com.winhc.utils.CompanyUtils;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.springframework.context.annotation.Bean;
+import org.springframework.kafka.annotation.KafkaListener;
+import org.springframework.kafka.listener.ConsumerAwareListenerErrorHandler;
+import org.springframework.stereotype.Service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2021/1/8 16:04
+ */
+@Slf4j
+@Service
+@AllArgsConstructor
+public class KafkaConsumerNeo4jV2 {
+
+    private final Map<String, RelationService> map;
+
+    @KafkaListener(id = "${spring.kafka.topic_node_relation_union}"
+            , topics = "${spring.kafka.topic_node_relation_union}"
+            , groupId = "${spring.kafka.consumer.group-id}", containerFactory = "containerFactory", errorHandler = "consumerAwareListenerErrorHandlerV2")
+    public void consumerCompanyNode(List<ConsumerRecord<?, ?>> records) {
+        List<Map<String, Object>> listMap = CompanyUtils.map(records);
+        this.map.get(CompanyEnum.TopicType.COMPANY_NODE.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.COMPANY_NODE.CODE));
+        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V1.CODE));
+        this.map.get(CompanyEnum.TopicType.HOLDER_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.HOLDER_RELATION_V2.CODE));
+        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V1.CODE));
+        this.map.get(CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.LEGAL_ENTITY_RELATION_V2.CODE));
+        this.map.get(CompanyEnum.TopicType.STAFF_RELATION.VALUE).save(CompanyUtils.filterList(listMap, CompanyEnum.TopicType.STAFF_RELATION.CODE));
+    }
+
+
+    /**
+     * 因为手动确认消费,若消费失败,记录重刷
+     */
+    @Bean("consumerAwareListenerErrorHandlerV2")
+    public ConsumerAwareListenerErrorHandler dealError() {
+        return (message, e, consumer) -> {
+            List<ConsumerRecord> records = (List<ConsumerRecord>) message.getPayload();
+            log.error("consumer error: {}", e.getMessage());
+            //System.out.println("consumer error:" + e.getMessage());
+            //System.out.println("consumer 1:" + consumer.toString());
+            //System.out.println("consumer 2:" + records);
+            log.error("consumer message: {}", records.get(0).toString());
+            // TODO 将失败的记录保存到数据库,再用定时任务查询记录,并重刷数据
+            return null;
+        };
+    }
+
+}

+ 30 - 0
src/main/java/com/winhc/repository/CompanyRelationRepository.java

@@ -0,0 +1,30 @@
+package com.winhc.repository;
+
+import com.winhc.entity.Company;
+import org.springframework.data.neo4j.annotation.Query;
+import org.springframework.data.neo4j.repository.Neo4jRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.stereotype.Repository;
+
+import java.util.List;
+
+
+@Repository
+public interface CompanyRelationRepository extends Neo4jRepository<Company,String>{
+
+	/**
+	 * Returns paths (nodes and relations) within two SUE hops of the named
+	 * party, at most {@code limitNumber} of them.
+	 * NOTE(review): the {name}/{limitNumber} placeholder style is the legacy
+	 * SDN/OGM syntax; newer Spring Data Neo4j expects $name — confirm against
+	 * the project's SDN version.
+	 * @param name party name to anchor the search on
+	 * @param limitNumber maximum number of paths to return
+	 * @return matched paths mapped onto Company
+	 */
+	@Query("MATCH p=(n:Party)<-[r:SUE*0..2]->(m:Party) where n.name={name} RETURN p order by p desc LIMIT {limitNumber}")
+    List<Company> findPartyRelationByName(@Param("name") String name, @Param("limitNumber") int limitNumber);
+
+
+	/**
+	 * Merges a relation between two COMPANY nodes carrying a status property.
+	 * NOTE(review): the query hard-codes the relationship type "label"; the
+	 * {label} parameter is never referenced, because Cypher cannot
+	 * parameterize a relationship type — verify this is intentional.
+	 */
+	@Query("MATCH (a:COMPANY),(b:COMPANY) where a.companyId={aCompanyId} and b.companyId={bCompanyId} MERGE (a)-[r:label {status:{status}}]->(b) return r")
+	List<Company> createHolderCompany(@Param("aCompanyId") String aCompanyId,@Param("bCompanyId") String bCompanyId,@Param("status") String status,@Param("label") String label);
+
+//	@Query("MATCH (a:Party),(b:Party)  where a.partyId={aPartyId} and b.partyId={bPartyId} MERGE (a)-[r:SUE{lawsuitId:{lawsuitId},province:{province},city:{city},judge_year:{judge_year},judge_result:{judge_result}}]->(b) return r")
+//	List<Company> createSues(@Param("aPartyId") Long aPartyId,@Param("bPartyId") Long bPartyId,@Param("lawsuitId") String lawsuitId,@Param("province") String province,@Param("city") String city,@Param("judge_year") Integer judge_year,@Param("judge_result") String judge_result);
+
+}

+ 17 - 0
src/main/java/com/winhc/repository/CompanyRepository.java

@@ -0,0 +1,17 @@
+package com.winhc.repository;
+
+import com.winhc.entity.Company;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.stereotype.Repository;
+import org.springframework.data.neo4j.repository.Neo4jRepository;
+
+
+
+/**
+ * Spring Data repository for Company nodes; inherits standard CRUD operations
+ * from Neo4jRepository. No custom queries are currently declared.
+ */
+@Repository
+public interface CompanyRepository extends Neo4jRepository<Company, String> {
+
+    //Company findByName(@Param("name") String name);
+    //Company save(@Param("person") Company person);
+}
+ 

+ 15 - 0
src/main/java/com/winhc/repository/PersonRepository.java

@@ -0,0 +1,15 @@
+package com.winhc.repository;
+
+import com.winhc.entity.Person;
+import org.springframework.data.neo4j.repository.Neo4jRepository;
+import org.springframework.data.repository.CrudRepository;
+import org.springframework.data.repository.query.Param;
+import org.springframework.stereotype.Repository;
+
+
+/**
+ * Spring Data repository for Person nodes.
+ */
+@Repository
+public interface PersonRepository extends Neo4jRepository<Person,String> {
+
+	/** Finds a person node by exact name (derived query). */
+	Person findByName(@Param("name") String name);
+}
+ 

+ 103 - 0
src/main/java/com/winhc/service/CompanyRelationService.java

@@ -0,0 +1,103 @@
+package com.winhc.service;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+import com.winhc.dto.CompanyQueryCondition;
+import com.winhc.entity.Company;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.*;
+import org.neo4j.ogm.session.Session;
+import org.neo4j.ogm.session.SessionFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.data.neo4j.transaction.SessionFactoryUtils;
+import org.springframework.stereotype.Service;
+
+import java.lang.reflect.Field;
+import java.util.List;
+
+@Slf4j
+@Service
+public class CompanyRelationService {
+    @Autowired
+    private SessionFactory sessionFactory;
+    @Autowired
+    Driver driver;
+
+    public List<Company> createCompanyRelation(CompanyQueryCondition c) {
+        Session session = SessionFactoryUtils.getSession(sessionFactory);
+        StringBuffer cql = new StringBuffer();
+        cql.append("MATCH (a:COMPANY),(b:COMPANY) where a.companyId=" + c.getStartId() + " and b.companyId=" + c.getEndId() + " MERGE (a)-[r:" + c.getLabel() + " {status:" + c.getStatus() + "]->(b) return r");
+
+        //拼接基本语句
+        //cql.append("MATCH p=(n:Party)<-[r:SUE*0..2{caseStage:'一审',judgeResult:'胜'}]->(m:Party) where n.name='珠海格力电器股份有限公司' RETURN p  LIMIT 100");
+
+        //数据查询结果
+        Iterable<Company> experimentNodes = session.query(Company.class, cql.toString(), Maps.newHashMap());
+
+        //返回
+        return Lists.newArrayList(experimentNodes);
+    }
+
+    public List<Company> createCompanyRelation2(CompanyQueryCondition c) {
+        org.neo4j.driver.Session session = driver.session();
+        StringBuffer cql = new StringBuffer();
+        String a1 = "";
+        String a2 = "";
+        if (c.getFlag() != null && c.getFlag().equals("1")) {//company
+            a1 = "COMPANY";
+            a2 = "companyId";
+        } else {
+            a1 = "PERSON";
+            a2 = "personId";
+        }
+        cql.append("MATCH (a:" + a1 + "),(b:COMPANY) where a." + a2 + "='" + c.getStartId() + "' and b.companyId='" + c.getEndId() + "' MERGE (a)-[r:" + c.getLabel() + " {status:'" + c.getStatus() + "'}]->(b) return r");
+
+        System.out.println(cql.toString());
+        List<Record> greeting = session.writeTransaction(new TransactionWork<List<Record>>() {
+            @Override
+            public List<Record> execute(Transaction tx) {
+                Result result = tx.run(String.valueOf(cql));
+                //return result.single().get(0).asString();
+                return result.list();
+            }
+        });
+        System.out.println(greeting);
+        //返回
+        return null;
+    }
+
+//    public void createCompanyRelation3(CompanyQueryCondition c) {
+//
+//    }
+
+    public static void main(String[] args) {
+//        SueObj sueObj  = new SueObj();
+//        sueObj.setJudgeYear("2018");
+//        sueObj.setJudgeResult("胜");
+//        SerializerFeature.config(0,SerializerFeature.QuoteFieldNames,false);
+//       String kk = JSON.toJSONString(sueObj,SerializerFeature.QuoteFieldNames);
+//       System.out.println(kk);
+    }
+
+    //判断该对象是否: 返回ture表示所有属性为null  返回false表示不是所有属性都是null
+    public static boolean isAllFieldNull(Object obj) {
+        try {
+            Class stuCla = (Class) obj.getClass();// 得到类对象
+            Field[] fs = stuCla.getDeclaredFields();//得到属性集合
+            boolean flag = true;
+            for (Field f : fs) {//遍历属性
+                f.setAccessible(true); // 设置属性是可以访问的(私有的也可以)
+                Object val = f.get(obj);// 得到此属性的值
+                if (val != null) {//只要有1个属性不为空,那么就不是所有的属性值都为空
+                    flag = false;
+                    break;
+                }
+            }
+            return flag;
+        } catch (IllegalAccessException e) {
+            log.error("属性判断出错", e);
+            return false;
+        }
+
+    }
+}

+ 13 - 0
src/main/java/com/winhc/service/RelationService.java

@@ -0,0 +1,13 @@
+package com.winhc.service;
+
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Persists one batch of parsed Kafka messages into Neo4j.
+ *
+ * @author π
+ * @Description:
+ * @date 2021/1/8 15:06
+ */
+public interface RelationService {
+    /**
+     * @param messages parsed message rows, one map per Kafka record
+     * @return implementation-defined status string, or null for an empty batch
+     */
+    String save(List<Map<String, Object>> messages);
+}

+ 49 - 0
src/main/java/com/winhc/service/impl/CompanyNodeServiceImpl.java

@@ -0,0 +1,49 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Writes COMPANY nodes to Neo4j: one MERGE by company_id per row, then a name
+ * refresh. (The original javadoc wrongly described this as a holder relation.)
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("companyNodeServiceImpl")
+@AllArgsConstructor
+public class CompanyNodeServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges one COMPANY node per row.
+     *
+     * @param batch_list rows carrying "id" and "name"
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.id}) \n" +
+                "SET e.name=row.name, e.company_id=row.id \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", CompanyNodeServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 52 - 0
src/main/java/com/winhc/service/impl/HolderRelationV1ServiceImpl.java

@@ -0,0 +1,52 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.*;
+import org.springframework.stereotype.Service;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holder (investment) relation: person -> company.
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("holderRelationV1ServiceImpl")
+@AllArgsConstructor
+public class HolderRelationV1ServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges PERSON and COMPANY nodes and the investment edge between them.
+     *
+     * @param batch_list rows with start_id/start_name/end_id/end_name/percent/deleted
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.person_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.name=row.end_name, e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.投资.code + "]->(e) \n" +
+                "SET r.percent=row.percent, r.deleted=row.deleted \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", HolderRelationV1ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/HolderRelationV2ServiceImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Holder (investment) relation: company -> company.
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("holderRelationV2ServiceImpl")
+@AllArgsConstructor
+public class HolderRelationV2ServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges the two COMPANY nodes and the investment edge between them.
+     *
+     * @param batch_list rows with start_id/start_name/end_id/end_name/percent/deleted
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.company_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.name=row.end_name, e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.投资.code + "]->(e) \n" +
+                "SET r.percent=row.percent, r.deleted=row.deleted \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", HolderRelationV2ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/LegalEntityRelationV1ServiceImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Legal-representative relation: person -> company.
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("legalEntityRelationV1ServiceImpl")
+@AllArgsConstructor
+public class LegalEntityRelationV1ServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges the PERSON and COMPANY nodes and the legal-representative edge.
+     *
+     * @param batch_list rows with start_id/start_name/end_id/end_name/deleted
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.person_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.name=row.end_name, e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.法人.code + "]->(e) \n" +
+                "SET r.deleted=row.deleted \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", LegalEntityRelationV1ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/LegalEntityRelationV2ServiceImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Legal-representative relation: company -> company.
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("legalEntityRelationV2ServiceImpl")
+@AllArgsConstructor
+public class LegalEntityRelationV2ServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges the two COMPANY nodes and the legal-representative edge.
+     *
+     * @param batch_list rows with start_id/start_name/end_id/end_name/deleted
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.company_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.name=row.end_name, e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.法人.code + "]->(e) \n" +
+                "SET r.deleted=row.deleted \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", LegalEntityRelationV2ServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 53 - 0
src/main/java/com/winhc/service/impl/StaffRelationServiceImpl.java

@@ -0,0 +1,53 @@
+package com.winhc.service.impl;
+
+import com.winhc.common.enums.CompanyEnum;
+import com.winhc.service.RelationService;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.neo4j.driver.Driver;
+import org.neo4j.driver.Result;
+import org.neo4j.driver.Session;
+import org.springframework.stereotype.Service;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Staff (executive) relation: person -> company.
+ *
+ * @author π
+ * @date 2021/1/11 10:03
+ */
+@Slf4j
+@Service("staffRelationServiceImpl")
+@AllArgsConstructor
+public class StaffRelationServiceImpl implements RelationService {
+
+    private final Driver driver;
+
+    /**
+     * Merges the PERSON and COMPANY nodes and the staff edge between them.
+     *
+     * @param batch_list rows with start_id/start_name/end_id/end_name/staff_type/deleted
+     * @return "success" once the transaction committed, null for an empty batch
+     */
+    @Override
+    public String save(List<Map<String, Object>> batch_list) {
+        if (batch_list.isEmpty()) return null;
+        long start = System.currentTimeMillis();
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(s:" + CompanyEnum.Lable.PERSON.code + "{person_id:row.start_id}) \n" +
+                "SET s.name=row.start_name, s.person_id=row.start_id \n" +
+                "MERGE(e:" + CompanyEnum.Lable.COMPANY.code + "{company_id:row.end_id}) \n" +
+                "SET e.name=row.end_name, e.company_id=row.end_id \n" +
+                "WITH s,e,row \n" +
+                "MERGE(s)-[r:" + CompanyEnum.Lable.高管.code + "]->(e) \n" +
+                "SET r.staff_type=row.staff_type, r.deleted=row.deleted \n";
+        Map<String, Object> parameters = new HashMap<>();
+        parameters.put("batch_list", batch_list);
+        // try-with-resources: the session was previously never closed (leak).
+        try (Session session = driver.session()) {
+            String data = session.writeTransaction(tx -> {
+                tx.run(cql, parameters);
+                return "success";
+            });
+            log.info("class:{} | save size:{} | cost:{}", StaffRelationServiceImpl.class.getSimpleName(), batch_list.size(), (System.currentTimeMillis() - start));
+            return data;
+        }
+    }
+}

+ 26 - 0
src/main/java/com/winhc/utils/CompanyUtils.java

@@ -0,0 +1,26 @@
+package com.winhc.utils;
+
+import cn.hutool.json.JSONUtil;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Helpers for converting raw Kafka records into parameter maps for Neo4j.
+ *
+ * @author π
+ * @Description:
+ * @date 2021/1/11 21:31
+ */
+public class CompanyUtils {
+
+    /**
+     * Parses each non-null record value as a JSON object map; null records and
+     * records with a null value are skipped.
+     */
+    public static List<Map<String, Object>> map(List<ConsumerRecord<?, ?>> records) {
+        return records.stream().filter(r -> (r != null && r.value() != null)).map(r -> {
+            Map<String, Object> m = JSONUtil.parseObj(r.value().toString());
+            return m;
+        }).collect(Collectors.toList());
+    }
+
+    /**
+     * Keeps only the rows whose "topic_type" equals {@code type}.
+     * Bug fix: comparing with type.equals(...) so that rows lacking a
+     * "topic_type" key no longer throw a NullPointerException.
+     */
+    public static List<Map<String, Object>> filterList(List<Map<String, Object>> list, String type) {
+        return list.stream().filter(r -> type.equals(r.get("topic_type"))).collect(Collectors.toList());
+    }
+}

+ 2 - 0
src/main/resources/META-INF/spring-devtools.properties

@@ -0,0 +1,2 @@
+restart.include.mapper=/mapper-[\\w-\\.]+jar
+restart.include.pagehelper=/pagehelper-[\\w-\\.]+jar

+ 68 - 0
src/main/resources/application-dev.properties

@@ -0,0 +1,68 @@
+#eureka.client.serviceUrl.defaultZone= http://106.14.81.247:8900/eureka/
+
+#Neo4j配置
+spring.data.neo4j.username=neo4j
+spring.data.neo4j.password=neo4j168
+#spring.data.neo4j.uri=http://106.14.211.187:7474
+spring.data.neo4j.uri=bolt://106.14.211.187:7687
+#prod
+#spring.data.neo4j.uri=bolt://10.29.30.244:7687
+
+#org.neo4j.driver.uri=bolt://106.14.211.187:7687
+#org.neo4j.driver.authentication.username=neo4j
+#org.neo4j.driver.authentication.password=neo4j168
+#spring.data.neo4j.embedded.enabled=true
+#spring.data.neo4j.driver=org.neo4j.ogm.drivers.http.driver.HttpDriver
+#数据库uri地址
+#spring.data.neo4j.uri=http://10.29.26.76:7474
+#spring.data.neo4j.uri=http://47.101.212.122:7474
+
+
+
+#spring.datasource.url = jdbc:mysql://47.100.20.161:3306/prism1?useUnicode=true&characterEncoding=utf-8
+#spring.datasource.username = firefly
+#spring.datasource.password = firefly
+
+scheduling.enabled = false
+
+
+
+#============== kafka ===================
+# 指定kafka 代理地址,可以多个
+spring.kafka.bootstrap-servers=106.14.211.187:9092
+#topic
+spring.kafka.topic_company_node=inc_company_node
+spring.kafka.topic_holder_v1=inc_holder_relation_v1
+spring.kafka.topic_holder_v2=inc_holder_relation_v2
+spring.kafka.topic_staff_relation=inc_staff_relation
+spring.kafka.topic_legal_entity_relation_v1=inc_legal_entity_relation_v1
+spring.kafka.topic_legal_entity_relation_v2=inc_legal_entity_relation_v2
+
+spring.kafka.topic_node_relation_union=inc_node_relation_union
+
+#spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
+#spring.kafka.topic=xf_test
+#=============== provider  =======================
+spring.kafka.producer.retries=3
+# 每次批量发送消息的数量
+spring.kafka.producer.batch-size=16384
+spring.kafka.producer.buffer-memory=33554432
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+
+#=============== consumer  =======================
+# 指定默认消费者group id
+spring.kafka.consumer.group-id=user-log-group1
+
+spring.kafka.consumer.auto-offset-reset=earliest
+# offsets are committed automatically (set to false and enable the manual ack-mode below to commit manually)
+spring.kafka.consumer.enable-auto-commit=true
+spring.kafka.consumer.auto-commit-interval=100
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+# 手动提交
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

+ 60 - 0
src/main/resources/application-prd.properties

@@ -0,0 +1,60 @@
+#eureka.client.serviceUrl.defaultZone= http://192.168.1.1:8900/eureka/
+
+#Neo4j配置
+spring.data.neo4j.username=neo4j
+spring.data.neo4j.password=neo4j168
+#spring.data.neo4j.uri=http://192.168.2.55:7474
+spring.data.neo4j.uri=bolt://192.168.2.55:7687
+
+
+#spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+
+#内网地址
+#spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3d.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+#spring.datasource.username = wenshu
+#spring.datasource.password = wenshu_168
+
+scheduling.enabled = true
+
+
+#============== kafka ===================
+# 指定kafka 代理地址,可以多个
+#spring.kafka.bootstrap-servers=106.14.211.187:9092
+#spring.kafka.topic=test
+
+spring.kafka.bootstrap-servers=192.168.4.237:9092,192.168.4.235:9092,192.168.4.236:9092
+
+#topic
+spring.kafka.topic_company_node=inc_company_node
+spring.kafka.topic_holder_v1=inc_holder_relation_v1
+spring.kafka.topic_holder_v2=inc_holder_relation_v2
+spring.kafka.topic_staff_relation=inc_staff_relation
+spring.kafka.topic_legal_entity_relation_v1=inc_legal_entity_relation_v1
+spring.kafka.topic_legal_entity_relation_v2=inc_legal_entity_relation_v2
+
+spring.kafka.topic_node_relation_union=inc_node_relation_union
+
+#=============== provider  =======================
+spring.kafka.producer.retries=3
+# 每次批量发送消息的数量
+spring.kafka.producer.batch-size=16384
+spring.kafka.producer.buffer-memory=33554432
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
+spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
+
+#=============== consumer  =======================
+# 指定默认消费者group id
+spring.kafka.consumer.group-id=neo4j_node_relation
+
+spring.kafka.consumer.auto-offset-reset=earliest
+# auto-commit enabled: offsets are committed automatically every auto-commit-interval ms
+# (for manual commits set this to false and enable listener.ack-mode=MANUAL_IMMEDIATE below)
+spring.kafka.consumer.enable-auto-commit=true
+spring.kafka.consumer.auto-commit-interval=100
+
+# 指定消息key和消息体的编解码方式
+spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
+# 手动提交
+#spring.kafka.listener.ack-mode=MANUAL_IMMEDIATE

+ 54 - 0
src/main/resources/application.properties

@@ -0,0 +1,54 @@
+spring.profiles.active=prd
+
+###server
+server.port=9098
+logging.level.root=DEBUG
+
+spring.application.name=kafka-neo4j
+#eureka.instance.instance-id=${spring.cloud.client.ipAddress}:${server.port}
+
+#\u8BBE\u7F6Espring-boot \u7F16\u7801\u683C\u5F0F
+banner.charset=UTF-8
+server.tomcat.uri-encoding=UTF-8
+spring.http.encoding.charset=UTF-8
+spring.http.encoding.enabled=true
+spring.http.encoding.force=true
+
+logging.file= ./logs/my.log
+
+
+spring.jackson.date-format=yyyy-MM-dd HH:mm:ss
+spring.jackson.time-zone=GMT+8
+
+eureka.instance.preferIpAddress=true
+spring.messages.encoding=UTF-8
+
+spring.datasource.url = jdbc:mysql://rm-uf61r3m23ba1p5z3dfo.mysql.rds.aliyuncs.com:3306/prism1?useUnicode=true&characterEncoding=utf-8
+spring.datasource.username = wenshu
+spring.datasource.password = wenshu_168
+
+
+spring.datasource.type=com.alibaba.druid.pool.DruidDataSource
+spring.datasource.driverClassName = com.mysql.jdbc.Driver
+# 下面为连接池的补充设置,应用到上面所有数据源中
+# 初始化大小,最小,最大
+spring.datasource.initialSize=5
+spring.datasource.minIdle=5
+spring.datasource.maxActive=20
+# 配置获取连接等待超时的时间
+spring.datasource.maxWait=60000
+# 配置间隔多久才进行一次检测,检测需要关闭的空闲连接,单位是毫秒 
+spring.datasource.timeBetweenEvictionRunsMillis=60000
+# 配置一个连接在池中最小生存的时间,单位是毫秒 
+spring.datasource.minEvictableIdleTimeMillis=300000
+spring.datasource.validationQuery=SELECT 1 FROM DUAL
+spring.datasource.testWhileIdle=true
+spring.datasource.testOnBorrow=false
+spring.datasource.testOnReturn=false
+# 打开PSCache,并且指定每个连接上PSCache的大小 
+spring.datasource.poolPreparedStatements=true
+spring.datasource.maxPoolPreparedStatementPerConnectionSize=20
+# 配置监控统计拦截的filters,去掉后监控界面sql无法统计,'wall'用于防火墙 
+spring.datasource.filters=stat,wall,log4j
+# 通过connectProperties属性来打开mergeSql功能;慢SQL记录
+spring.datasource.connectionProperties=druid.stat.mergeSql=true;druid.stat.slowSqlMillis=5000

+ 41 - 0
src/main/resources/logback-spring.xml

@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<configuration>
+  <!-- 控制台打印日志的相关配置 --> 
+  <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender"> 
+    <!-- 日志格式 -->
+    <encoder>
+        <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] - %m%n</pattern>
+    </encoder>
+    <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+         <level>info</level>
+     </filter>
+
+  </appender>
+
+  <!-- 文件保存日志的相关配置 --> 
+  <appender name="ERROR-OUT" class="ch.qos.logback.core.rolling.RollingFileAppender">
+      <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
+          <level>info</level>
+      </filter>
+     <!-- 保存日志文件的路径 -->
+    <file>logs/info.log</file>
+    <!-- 日志格式 -->
+    <encoder>
+        <pattern>%d{yyyy-MM-dd HH:mm:ss} [%level] [%class:%line] - %m%n</pattern>
+    </encoder>
+
+    <!-- 循环政策:基于时间创建日志文件 -->
+    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+      <!-- 日志文件名格式 -->
+      <fileNamePattern>logs/info.%d{yyyy-MM-dd}.log</fileNamePattern>
+      <!-- 最大保存时间:30天-->
+      <maxHistory>30</maxHistory>
+    </rollingPolicy>
+  </appender>
+
+  <!-- 基于debug处理日志：具体控制台或者文件对日志级别的处理还要看所在appender配置的filter，如果没有配置filter，则使用root配置 -->
+  <root level="debug">
+    <appender-ref ref="STDOUT" />
+    <appender-ref ref="ERROR-OUT" />
+  </root>
+</configuration>

+ 408 - 0
src/test/java/com/winhc/test/TestCreateNode.java

@@ -0,0 +1,408 @@
+package com.winhc.test;
+
+import com.alibaba.fastjson.JSON;
+import com.winhc.dto.CompanyQueryCondition;
+import com.winhc.entity.Company;
+import com.winhc.entity.Person;
+import com.winhc.kafka.KafkaProduce;
+import com.winhc.repository.CompanyRepository;
+import com.winhc.repository.PersonRepository;
+import com.winhc.service.CompanyRelationService;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.neo4j.driver.*;
+import org.neo4j.ogm.model.Edge;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.boot.test.context.SpringBootTest;
+import org.springframework.test.context.junit4.SpringRunner;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.neo4j.driver.Values.parameters;
+
+/**
+ * Exploratory/integration tests that create Neo4j nodes and relations and
+ * push hand-built JSON test messages to the Kafka topics consumed by this
+ * service.
+ *
+ * NOTE(review): these tests talk to the live Neo4j/Kafka endpoints of the
+ * active Spring profile; they are manual experiments, not CI-safe unit tests.
+ *
+ * @author π
+ * @Description:
+ * @date 2020/12/25 16:24
+ */
+@RunWith(SpringRunner.class)
+@SpringBootTest
+public class TestCreateNode {
+    Log log = LogFactory.getLog(TestCreateNode.class);
+    @Autowired
+    CompanyRepository companyRepository;
+    @Autowired
+    PersonRepository personRepository;
+    // NOTE(review): field name is identical to its type name; should be lowerCamelCase.
+    @Autowired
+    CompanyRelationService CompanyRelationService;
+    @Autowired
+    Driver driver;
+    @Autowired
+    KafkaProduce kafkaProduce;
+
+
+    // Saves three COMPANY nodes (ids 1-3) and three PERSON nodes (ids 4-6,
+    // two of them sharing the name 张三) through the Spring Data repositories,
+    // then prints the elapsed milliseconds.
+    @Test
+    public void pushData() {
+        log.info("start push save !");
+        long start = System.currentTimeMillis();
+
+        Company c1 = new Company();
+        c1.setCompanyId("1");
+        c1.setName("小米");
+        companyRepository.save(c1);
+        Company c2 = new Company();
+        c2.setCompanyId("2");
+        c2.setName("华为");
+        companyRepository.save(c2);
+        Company c3 = new Company();
+        c3.setCompanyId("3");
+        c3.setName("赢火虫");
+        companyRepository.save(c3);
+
+        Person p1 = new Person();
+        p1.setName("张三");
+        p1.setPersonId("4");
+        personRepository.save(p1);
+        Person p2 = new Person();
+        p2.setName("李四");
+        p2.setPersonId("5");
+        personRepository.save(p2);
+        Person p3 = new Person();
+        p3.setName("张三");
+        p3.setPersonId("6");
+        personRepository.save(p3);
+        System.out.println(System.currentTimeMillis() - start);
+    }
+
+    // Wires the nodes created by pushData() together with TOUZI relations via
+    // CompanyRelationService.createCompanyRelation2 (c3/c4/c5 have no flag set).
+    @Test
+    public void pushDataRelation() {
+        CompanyQueryCondition c = new CompanyQueryCondition();
+        c.setStartId("1");
+        c.setEndId("2");
+        c.setStatus("0");
+        c.setLabel("TOUZI");
+        c.setFlag("1");
+        CompanyRelationService.createCompanyRelation2(c);
+        CompanyQueryCondition c2 = new CompanyQueryCondition();
+        c2.setStartId("2");
+        c2.setEndId("3");
+        c2.setStatus("0");
+        c2.setLabel("TOUZI");
+        c2.setFlag("1");
+        CompanyRelationService.createCompanyRelation2(c2);
+        CompanyQueryCondition c3 = new CompanyQueryCondition();
+        c3.setStartId("4");
+        c3.setEndId("3");
+        c3.setStatus("0");
+        c3.setLabel("TOUZI");
+        CompanyRelationService.createCompanyRelation2(c3);
+        CompanyQueryCondition c4 = new CompanyQueryCondition();
+        c4.setStartId("6");
+        c4.setEndId("1");
+        c4.setStatus("0");
+        c4.setLabel("TOUZI");
+        CompanyRelationService.createCompanyRelation2(c4);
+        CompanyQueryCondition c5 = new CompanyQueryCondition();
+        c5.setStartId("5");
+        c5.setEndId("3");
+        c5.setStatus("0");
+        c5.setLabel("TOUZI");
+        CompanyRelationService.createCompanyRelation2(c5);
+    }
+
+    // Runs an apoc.periodic.iterate job that de-duplicates PERSON nodes: for
+    // one pair (p,q) with the same name connected within 1..5 TOUZI hops,
+    // q's relations are deleted and re-attached to p, p is labeled 合并 and
+    // q is labeled 删除. The "message" parameter is passed but unused by the CQL.
+    @Test
+    public void pushDataRelation2() {
+        try (Session session = driver.session()) {
+            List<Record> result = session.writeTransaction(new TransactionWork<List<Record>>() {
+                @Override
+                public List<Record> execute(Transaction tx) {
+                    Result result = tx.run("CALL apoc.periodic.iterate(\n" +
+                                    "'MATCH (p:PERSON)-[:TOUZI*1..5]-(q:PERSON)\n" +
+                                    "WHERE p.name=q.name AND ID(p)<>ID(q)\n" +
+                                    "WITH p,q\n" +
+                                    "LIMIT 1\n" +
+                                    "MATCH (q)-[r]-(x)\n" +
+                                    "WHERE x<>p\n" +
+                                    "RETURN p,q,r,x',\n" +
+                                    "'DELETE r\n" +
+                                    "MERGE (p)-[:TOUZI]-(x)\n" +
+                                    "SET p:合并\n" +
+                                    "SET q:删除',\n" +
+                                    "{batchSize:1000,parallel:false,retries:3,iterateList:true}\n" +
+                                    ") YIELD batches, total",
+                            parameters("message", "xxx"));
+                    return result.list();
+                }
+            });
+            System.out.println(result);
+        }
+    }
+
+    // Batch-merges 1001 COMPANY1/PERSON1 node pairs in one write transaction:
+    // a fixed 高管 relation per pair plus a dynamically-typed relation created
+    // through apoc.merge.relationship(row.relType).
+    // NOTE(review): the Session is never closed here (unlike pushDataRelation2)
+    // — should use try-with-resources.
+    @Test
+    public void pushDataRelation3() {
+        Session session = driver.session();
+        List<Map<String, Object>> batch_list = new ArrayList<>();
+
+        for (int i = 0; i <= 1000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("companyId", "companyId" + i);
+            m1.put("personId", "personId" + i);
+            m1.put("companyName", "companyName" + i);
+            m1.put("personName", "personName" + i);
+            m1.put("relType", "relType_old" + i);
+            m1.put("status", "1");
+            m1.put("percent", i * 0.5);
+            m1.put("rid", i * 3);
+            batch_list.add(m1);
+        }
+        Map parameters = new HashMap() {{
+            put("batch_list", batch_list);
+        }};
+
+        System.out.println("list: " + batch_list.toString());
+        long start = System.currentTimeMillis();
+//            final String cql = "WITH  {batch_list} AS batch_list \n" +
+//                    "UNWIND batch_list AS row \n" +
+//                    "MERGE(company:COMPANY1{companyId:row.companyId}) \n" +
+//                    "ON CREATE SET company.name=row.companyName, company.companyId=row.companyId \n" +
+//                    "MERGE(person:PERSON1{personId:row.personId}) \n" +
+//                    "ON CREATE SET person.name=row.personName, person.personId=row.personId \n" +
+//                    "WITH person,company,row\n" +
+//                    "MERGE(person)-[:投资1]->(company) ";
+
+        final String cql = "WITH  {batch_list} AS batch_list \n" +
+                "UNWIND batch_list AS row \n" +
+                "MERGE(company:COMPANY1{companyId:row.companyId}) \n" +
+                "SET company.name=row.companyName, company.companyId=row.companyId \n" +
+                //"FOREACH (_ IN case when row.personId is not null then [1] else [] end|\n" +
+                "MERGE(person:PERSON1{personId:row.personId}) \n" +
+                "SET person.name=row.personName, person.personId=row.personId \n" +
+                //")" +
+                "MERGE(person)-[r:高管]->(company)" +
+                "SET r.percent=row.percent, r.status=row.status \n" +
+                "WITH person,company,row\n" +
+                "CALL apoc.merge.relationship(person, row.relType, {},{percent:row.percent,status:row.status}, company) YIELD rel \n" +
+                "WITH rel,row \n" +
+                "SET rel.status= row.status, rel.percent= row.percent \n";
+
+
+        List<Record> result = session.writeTransaction(new TransactionWork<List<Record>>() {
+            @Override
+            public List<Record> execute(Transaction tx) {
+                Result result = tx.run(cql, parameters);
+                return result.list();
+            }
+        });
+        System.out.println(result);
+        System.out.println("cost" + (System.currentTimeMillis() - start));
+    }
+
+    // Builds a batched UNWIND + apoc.cypher.doIt statement for persisting edges;
+    // the parameter-building and execution code is commented out, so the method
+    // is currently a no-op apart from constructing the statement string.
+    public void saveEdges(List<Edge> edges) {
+        StringBuilder sb = new StringBuilder();
+
+        sb.append("UNWIND {batch} as row ") //
+                .append(" WITH split(row.properties.from, '/') AS fromInfo, " //
+                        + "split(row.properties.to, '/') AS toInfo, row ") //
+                .append(" CALL apoc.cypher.doIt(" //
+                        + "'MATCH (from:`' + fromInfo[0] + '` {id: {fromId}})" //
+                        + " MATCH (to:`' + toInfo[0] + '` {id: {toId}}) " //
+                        + " MERGE (from)-[r:`' + row.properties.label + '` {id: {id}}]->(to) " //
+                        + " SET n += {properties}', " //
+                        + "{ fromId: row.properties.from, toId: row.properties.to, " //
+                        + " properties: row.properties, id: row.properties.id }" //
+                        + ") YIELD value") //
+                .append(" RETURN 1 ");
+
+        String statement = sb.toString();
+
+//        Map<String, Object> params = new HashMap<>();
+//        List<Map<String, Object>> batches = new ArrayList<>();
+//        for (Edge e : edges) {
+//            Map<String, Object> map = new HashMap<>();
+//            map.put("id", e.getId());
+//            map.put("from", e.getFrom());
+//            map.put("to", e.getTo());
+//            map.put("properties", e.getProperties());
+//            batches.add(map);
+//        }
+//        params.put("batch", batches);
+//
+//        cypher.query(statement, params, null);
+    }
+
+
+//"CALL apoc.create.relationship(person, row.relType,{status:row.status,percent:row.percent}, company) YIELD rel RETURN count(*) \n" ;
+    //Procedure apoc.create.relationship has signature: apoc.create.relationship(from :: NODE?, relType :: STRING?, props :: MAP?, to :: NODE?) :: rel :: RELATIONSHIP?
+    //Procedure apoc.merge.relationship has signature: apoc.merge.relationship(startNode :: NODE?, relationshipType :: STRING?, identProps :: MAP?, props :: MAP?, endNode :: NODE?) :: rel :: RELATIONSHIP?
+
+    // Pushes ~100k holder-style JSON messages (ids 200000..300000) to Kafka.
+    // NOTE(review): topic "compamy_relation3" looks misspelled ("company") —
+    // confirm against the consumer configuration before renaming.
+    @Test
+    public void sendKafka() {
+        String topic = "compamy_relation3";
+        Long start = System.currentTimeMillis();
+
+//        params.put("companyId", "222");
+//        params.put("name", "bbb");
+//        String msg = JSON.toJSONString(params);
+//        System.out.println(msg);
+//        kafkaProduce.produce(topic, msg);
+
+//        for (int i = 0; i < 100000; i++) {
+//            params.put("companyId", "id" + i);
+//            params.put("name", "name" + i);
+//            String msg = JSON.toJSONString(params);
+//            kafkaProduce.produce(topic, msg);
+//        }
+//        System.out.println("cost: " + (System.currentTimeMillis() - start));
+
+        for (int i = 200000; i <= 300000; i++) {
+            Map<String, Object> m1 = new HashMap<>();
+            m1.put("companyId", "companyId" + i);
+            m1.put("personId", "personId" + i);
+            m1.put("companyName", "companyName_7_" + i);
+            m1.put("personName", "personName_7_" + i);
+            m1.put("relType", "relType_7_" + i+2);
+            m1.put("status", "0");
+            m1.put("percent", i * 0.5+"");
+            m1.put("rid", i * 3+"");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 11 small test messages to topic "test".
+    // NOTE: expressions like "startName" + i+1 concatenate the literal "1"
+    // (e.g. "startName2601") — they do NOT add 1 to i. Same pattern below.
+    @Test
+    public void sendKafkaTest() {
+        String topic = "test";
+        for (int i = 260; i <= 270; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("startName", "startName" + i+1);
+            m1.put("endName", "endName" + i+1);
+            m1.put("deleted", 3);
+            m1.put("percent", i * 0.1);
+            m1.put("rid", i * 3);
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 1000 company-node messages (topic_type=1) to the union topic.
+    @Test
+    public void sendKafkaCompanyNode() {
+        //String topic = "inc_company_node";
+        String topic = "inc_node_relation_union";
+        for (int i = 1; i <= 1000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("id", "companyId" + i);
+            m1.put("name", "name" + i+1);
+            m1.put("deleted", 3);
+            m1.put("topic_type", "1");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 1000 holder-relation v1 messages (topic_type=2) to the union topic.
+    @Test
+    public void sendKafkaHolderV1() {
+        //String topic = "inc_holder_relation_v1";
+        String topic = "inc_node_relation_union";
+        for (int i = 1; i <= 1000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("start_name", "startName" + i+1);
+            m1.put("end_name", "endName" + i+1);
+            m1.put("deleted", 3);
+            m1.put("percent", i * 0.1);
+            m1.put("topic_type", "2");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 1001 holder-relation v2 messages (topic_type=3) to the union topic.
+    @Test
+    public void sendKafkaHolderV2() {
+        //String topic = "inc_holder_relation_v2";
+        String topic = "inc_node_relation_union";
+        for (int i = 1000; i <= 2000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("start_name", "startName" + i+1);
+            m1.put("end_name", "endName" + i+1);
+            m1.put("deleted", 3);
+            m1.put("percent", i * 0.1);
+            m1.put("topic_type", "3");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+
+
+    // Sends 1001 legal-entity v1 messages (topic_type=4, deleted=0) to the union topic.
+    @Test
+    public void sendKafkaLegalEntityV1() {
+        //String topic = "inc_legal_entity_relation_v1";
+        String topic = "inc_node_relation_union";
+        for (int i = 1000; i <= 2000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("start_name", "startName" + i+1);
+            m1.put("end_name", "endName" + i+1);
+            m1.put("deleted", 0);
+            m1.put("topic_type", "4");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 1001 legal-entity v2 messages (topic_type=5, deleted=0) to the union topic.
+    @Test
+    public void sendKafkaLegalEntityV2() {
+        //String topic = "inc_legal_entity_relation_v2";
+        String topic = "inc_node_relation_union";
+        for (int i = 1000; i <= 2000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("start_name", "startName" + i+1);
+            m1.put("end_name", "endName" + i+1);
+            m1.put("deleted", 0);
+            m1.put("topic_type", "5");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+    // Sends 1001 staff-relation messages (topic_type=6) carrying a staff_type field.
+    @Test
+    public void sendKafkStaff() {
+        //String topic = "inc_staff_relation";
+        String topic = "inc_node_relation_union";
+        for (int i = 1000; i <= 2000; i++) {
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("start_id", "start_id" + i);
+            m1.put("end_id", "end_id" + i);
+            m1.put("start_name", "startName" + i+1);
+            m1.put("end_name", "endName" + i+1);
+            m1.put("deleted", 0);
+            m1.put("staff_type", "董事"+i);
+            m1.put("topic_type", "6");
+            String msg = JSON.toJSONString(m1);
+            System.out.println(msg);
+            kafkaProduce.produce(topic, msg);
+        }
+    }
+
+
+}

+ 58 - 0
src/test/java/com/winhc/test/TestJson.java

@@ -0,0 +1,58 @@
+package com.winhc.test;
+
+import cn.hutool.json.JSONUtil;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.parser.Feature;
+import com.alibaba.fastjson.serializer.DoubleSerializer;
+import com.alibaba.fastjson.serializer.SerializeConfig;
+import com.google.gson.Gson;
+import org.apache.ibatis.javassist.expr.NewArray;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Scratch test comparing how fastjson, Gson and hutool parse the same JSON
+ * string, including fastjson's BigDecimal-vs-Double number handling.
+ *
+ * @author π
+ * @Description:
+ * @date 2021/1/8 16:21
+ */
+public class TestJson {
+
+    @Test
+    public void test1() {
+        // batch_list is populated but never serialized below — only m1 is used.
+        List<Map<String, Object>> batch_list = new ArrayList<>();
+
+//        for (int i = 0; i <= 10; i++) {
+            Integer i = 3;
+            HashMap<String, Object> m1 = new HashMap<>();
+            m1.put("startId", "startId" + i);
+            m1.put("endId", "endId" + i);
+            m1.put("startName", "startName" + i);
+            m1.put("endName", "endName" + i);
+            m1.put("deleted", 1);
+            m1.put("percent", i * 0.5);
+            m1.put("rid", i * 3);
+            batch_list.add(m1);
+//        }
+        String s = JSON.toJSONString(m1);
+        // Clear fastjson's UseBigDecimal bit so floating numbers parse as Double.
+        // NOTE(review): this mutates the global DEFAULT_PARSER_FEATURE, and the
+        // next line applies the same mask again while capturing the result.
+        JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
+        int disableDecimalFeature = JSON.DEFAULT_PARSER_FEATURE &= ~Feature.UseBigDecimal.getMask();
+        // Register a global Double serializer with a "#.#####" pattern.
+        SerializeConfig config = SerializeConfig.getGlobalInstance();
+        config.put(Double.class, new DoubleSerializer("#.#####"));
+        JSONObject JB = JSON.parseObject(s, JSONObject.class, disableDecimalFeature);
+//        JSONObject JB = JSON.parseObject(s, JSONObject.class);
+        Map<String, Object> map = JB;
+        System.out.println(JB);
+
+        Gson gson = new Gson();
+        Map map1 = gson.fromJson(s, Map.class);
+        // NOTE(review): prints JB again — presumably map1 was intended here.
+        System.out.println(JB);
+        cn.hutool.json.JSONObject jsonObject = JSONUtil.parseObj(s);
+        System.out.println(jsonObject);
+
+    }
+}