xufei 2 years ago
parent
commit
48ab581f28

+ 251 - 0
.gitignore

@@ -0,0 +1,251 @@
+### Scala template
+*.class
+*.log
+src/test/resources/data
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+.idea
+*.iml
+out
+gen
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+### Python template
+# Byte-compiled / optimized / DLL files
+__pycache__/
+*.py[cod]
+*$py.class
+
+# C extensions
+*.so
+
+# Distribution / packaging
+.Python
+build/
+develop-eggs/
+dist/
+downloads/
+eggs/
+.eggs/
+lib/
+lib64/
+parts/
+sdist/
+var/
+wheels/
+share/python-wheels/
+*.egg-info/
+.installed.cfg
+*.egg
+MANIFEST
+
+# PyInstaller
+#  Usually these files are written by a python script from a template
+#  before PyInstaller builds the exe, so as to inject date/other infos into it.
+*.manifest
+*.spec
+
+# Installer logs
+pip-log.txt
+pip-delete-this-directory.txt
+
+# Unit test / coverage reports
+htmlcov/
+.tox/
+.nox/
+.coverage
+.coverage.*
+.cache
+nosetests.xml
+coverage.xml
+*.cover
+*.py,cover
+.hypothesis/
+.pytest_cache/
+cover/
+
+# Translations
+*.mo
+*.pot
+
+# Django stuff:
+*.log
+local_settings.py
+db.sqlite3
+db.sqlite3-journal
+
+# Flask stuff:
+instance/
+.webassets-cache
+
+# Scrapy stuff:
+.scrapy
+
+# Sphinx documentation
+docs/_build/
+
+# PyBuilder
+.pybuilder/
+target/
+
+# Jupyter Notebook
+.ipynb_checkpoints
+
+# IPython
+profile_default/
+ipython_config.py
+
+# pyenv
+#   For a library or package, you might want to ignore these files since the code is
+#   intended to run in multiple environments; otherwise, check them in:
+# .python-version
+
+# pipenv
+#   According to pypa/pipenv#598, it is recommended to include Pipfile.lock in version control.
+#   However, in case of collaboration, if having platform-specific dependencies or dependencies
+#   having no cross-platform support, pipenv may install dependencies that don't work, or not
+#   install all needed dependencies.
+#Pipfile.lock
+
+# PEP 582; used by e.g. github.com/David-OConnor/pyflow
+__pypackages__/
+
+# Celery stuff
+celerybeat-schedule
+celerybeat.pid
+
+# SageMath parsed files
+*.sage.py
+
+# Environments
+.env
+.venv
+env/
+venv/
+ENV/
+env.bak/
+venv.bak/
+
+# Spyder project settings
+.spyderproject
+.spyproject
+
+# Rope project settings
+.ropeproject
+
+# mkdocs documentation
+/site
+
+# mypy
+.mypy_cache/
+.dmypy.json
+dmypy.json
+
+# Pyre type checker
+.pyre/
+
+# pytype static type analyzer
+.pytype/
+
+# Cython debug symbols
+cython_debug/
+
+### JetBrains template
+# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider
+# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839
+
+# User-specific stuff
+.idea/**/workspace.xml
+.idea/**/tasks.xml
+.idea/**/usage.statistics.xml
+.idea/**/dictionaries
+.idea/**/shelf
+
+# Generated files
+.idea/**/contentModel.xml
+
+# Sensitive or high-churn files
+.idea/**/dataSources/
+.idea/**/dataSources.ids
+.idea/**/dataSources.local.xml
+.idea/**/sqlDataSources.xml
+.idea/**/dynamic.xml
+.idea/**/uiDesigner.xml
+.idea/**/dbnavigator.xml
+
+# Gradle
+.idea/**/gradle.xml
+.idea/**/libraries
+
+# Gradle and Maven with auto-import
+# When using Gradle or Maven with auto-import, you should exclude module files,
+# since they will be recreated, and may cause churn.  Uncomment if using
+# auto-import.
+# .idea/artifacts
+# .idea/compiler.xml
+# .idea/jarRepositories.xml
+# .idea/modules.xml
+# .idea/*.iml
+# .idea/modules
+# *.iml
+# *.ipr
+
+# CMake
+cmake-build-*/
+
+# Mongo Explorer plugin
+.idea/**/mongoSettings.xml
+
+# File-based project format
+*.iws
+
+# IntelliJ
+out/
+
+# mpeltonen/sbt-idea plugin
+.idea_modules/
+
+# JIRA plugin
+atlassian-ide-plugin.xml
+
+# Cursive Clojure plugin
+.idea/replstate.xml
+
+# Crashlytics plugin (for Android Studio and IntelliJ)
+com_crashlytics_export_strings.xml
+crashlytics.properties
+crashlytics-build.properties
+fabric.properties
+
+# Editor-based Rest Client
+.idea/httpRequests
+
+# Android studio 3.1+ serialized cache file
+.idea/caches/build_file_checksums.ser
+dependency-reduced-pom.xml
+/src/main/scala/com/winhc/bigdata/flink/xjk/

+ 10 - 0
data/env-dev-sj.yml

@@ -0,0 +1,10 @@
+winhc:
+  mysql:
+    driverClassName: com.mysql.jdbc.Driver
+    jdbcUrl: jdbc:mysql://39.152.30.112:5306/winhc?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true
+    username: winhc
+    password: winhc123
+
+  task:
+    numThread: 8
+    batchSize: 1000

+ 10 - 0
data/env-dev.yml

@@ -0,0 +1,10 @@
+winhc:
+  mysql:
+    driverClassName: com.mysql.jdbc.Driver
+    jdbcUrl: jdbc:mysql://rm-uf666q55e678h9514so.mysql.rds.aliyuncs.com/export_data_back?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true
+    username: wenshu
+    password: wenshu_168
+
+  task:
+    numThread: 6
+    batchSize: 500

+ 10 - 0
data/env-prod.yml

@@ -0,0 +1,10 @@
+winhc:
+  mysql:
+    driverClassName: com.mysql.jdbc.Driver
+    jdbcUrl: jdbc:mysql://rm-uf666q55e678h9514123990.mysql.rds.aliyuncs.com/export_data_back?characterEncoding=utf8&useSSL=false&rewriteBatchedStatements=true&useServerPrepStmts=true&cachePrepStmts=true
+    username: wenshu
+    password: wenshu_168
+
+  task:
+    numThread: 8
+    batchSize: 1000

+ 4 - 0
data/env.yml

@@ -0,0 +1,4 @@
+winhc:
+  oss:
+    accessid: LTAI5tG3JANdcd6SQPDVSieV
+    accesskey: MFtVO7LvJNTDt6asJLtZhW2S9M5vpQ

+ 116 - 0
pom.xml

@@ -0,0 +1,116 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.winhc</groupId>
+    <artifactId>data_pull_client</artifactId>
+    <version>1.0.0</version>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-log4j12</artifactId>
+            <version>1.7.7</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+        <!--mysql jdbc连接-->
+        <dependency>
+            <groupId>mysql</groupId>
+            <artifactId>mysql-connector-java</artifactId>
+            <version>5.1.44</version>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba</groupId>
+            <artifactId>fastjson</artifactId>
+            <version>1.2.83</version>
+        </dependency>
+        <dependency>
+            <groupId>com.zaxxer</groupId>
+            <artifactId>HikariCP</artifactId>
+            <version>4.0.3</version>
+        </dependency>
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.20</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>3.13.1</version>
+        </dependency>
+        <dependency>
+            <groupId>org.yaml</groupId>
+            <artifactId>snakeyaml</artifactId>
+            <version>1.17</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>3.9</version>
+        </dependency>
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.3.5</version>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>2.4.1</version>
+                <executions>
+                    <execution>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>shade</goal>
+                        </goals>
+                        <configuration>
+                            <transformers>
+                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
+                                    <mainClass>com.winhc.data.DataPull</mainClass>
+                                </transformer>
+                            </transformers>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+            <plugin>
+                <groupId>com.google.cloud.tools</groupId>
+                <artifactId>jib-maven-plugin</artifactId>
+                <version>3.3.1</version>
+                <configuration>
+                    <from>
+                        <image>
+                            eclipse-temurin:8u372-b07-jre-alpine
+                        </image>
+
+                    </from>
+                    <to>
+                        <image>registry.cn-shanghai.aliyuncs.com/winhc-spider/data-pull-client</image>
+                        <auth>
+                            <username>zhangji@whcdata</username>
+                            <password>WUDk&amp;tC)SiTGouRP7noWucx{7C4|9dCf</password>
+                        </auth>
+                    </to>
+                    <container>
+                        <mainClass>com.winhc.data.DataPull</mainClass>
+                    </container>
+                </configuration>
+            </plugin>
+
+        </plugins>
+    </build>
+
+</project>

+ 192 - 0
src/main/java/com/winhc/data/DataPull.java

@@ -0,0 +1,192 @@
+package com.winhc.data;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.*;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+
+import cn.hutool.core.lang.Assert;
+import com.winhc.data.task.BaseTask;
+import com.winhc.data.task.SinkTask;
+import com.winhc.data.task.DownLoadTask;
+import com.winhc.data.utils.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+
+import static com.winhc.data.utils.Common.*;
+import static com.winhc.data.utils.DateUtils.getCurrentHours;
+import static com.winhc.data.utils.EnvConst.initAllConf;
+import static com.winhc.data.utils.EnvProperties.getYmlByFileNameOut;
+import static com.winhc.data.utils.OssUtils.*;
+import static java.lang.System.getenv;
+
+/**
+ * Command-line entry point of the data pull client.
+ * <p>
+ * Loads a yml configuration file, then keeps polling: every round asks {@code findTasks} for task ids
+ * ({@code ds}); for each id whose data object exists, the object is streamed by a {@link DownLoadTask}
+ * into a bounded queue that is drained by a pool of {@link SinkTask} workers.
+ *
+ * @author π
+ * @date 2023/5/29 11:26
+ */
+public class DataPull {
+    private static final Logger logger = LoggerFactory.getLogger(DataPull.class);
+    /** Sentinel put on the queue once per sink worker; {@link SinkTask} stops when it takes it. */
+    private static final String QUIT = "QUIT";
+    /** Pause between two polling rounds, in milliseconds. */
+    private static final long POLL_INTERVAL_MS = 1000 * 10;
+    /** Read by the heartbeat thread; cleared by {@link #success} once the sentinels are queued. */
+    private static volatile boolean isRunning = false;
+
+    /**
+     * Resolves the configuration file, loads it and starts the endless polling loop.
+     *
+     * @param args command-line arguments; {@code --conf <path>} is required unless the key
+     *             {@code docker.start} is present in the arguments, system properties or environment
+     * @throws Exception if the configuration is missing or invalid, or a polling round fails
+     */
+    public static void main(String[] args) throws Exception {
+
+        Map<String, String> params = ParameterTool
+                .fromArgs(args)
+                .mergeWith(ParameterTool.fromSystemProperties())
+                .mergeWith(ParameterTool.fromMap(getenv()))
+                .toMap();
+        String confPath;
+        if (params.containsKey("docker.start")) {
+            // Container start: the configuration is expected at a fixed path.
+            confPath = "/config.yml";
+        } else {
+            Assert.isTrue(params.containsKey("conf"), "请输入配置文件路径\t:\t--conf conf_path");
+            confPath = params.get("conf");
+        }
+
+        Assert.isTrue(FileUtils.fileExists(confPath), "配置文件不存在\t:\t检查conf路径是否正确");
+        Map<String, String> confMap = new HashMap<>();
+        Assert.isTrue(getYmlByFileNameOut(confPath, confMap), "conf解析异常\t:\t配置文件必须yml格式");
+        // Merge the command-line arguments with the parsed configuration file.
+        initAllConf(args, confMap);
+
+        int queueSize = 100000;
+        int numThread = Integer.parseInt(EnvConst.getValue("winhc.task.numThread", "8"));
+        int batchSize = Integer.parseInt(EnvConst.getValue("winhc.task.batchSize", "1000"));
+
+        // Keep the process alive and poll forever; startOne(...) is the single-pass alternative.
+        startAllTime(queueSize, numThread, batchSize);
+
+    }
+
+    /**
+     * Polls until one round reports that nothing is left to wait for, or until the local hour of day
+     * reaches 18, whichever comes first.
+     */
+    private static void startOne(int queueSize, int numThread, int batchSize) throws Exception {
+        while (true) {
+            Boolean finish = exec(queueSize, numThread, batchSize);
+            if (finish) {
+                break;
+            }
+            // Give up waiting once the current hour is 18 or later.
+            int currentHours = getCurrentHours();
+            if (currentHours >= 18) {
+                logger.info("waiting task time out , end hour {} !!!", currentHours);
+                break;
+            }
+        }
+    }
+
+    /** Polls forever; only an exception thrown by a round ends the loop. */
+    private static void startAllTime(int queueSize, int numThread, int batchSize) throws Exception {
+        while (true) {
+            exec(queueSize, numThread, batchSize);
+        }
+    }
+
+    /**
+     * Runs one polling round and then sleeps for {@link #POLL_INTERVAL_MS}.
+     *
+     * @return {@code true} when there was nothing to do or every listed task was handled,
+     *         {@code false} when at least one task is still waiting for its data object
+     */
+    private static Boolean exec(int queueSize, int numThread, int batchSize) throws Exception {
+        // uploadTasks is handed to findTasks as an out-parameter (presumably filled there; verify in OssUtils).
+        List<String> uploadTasks = new ArrayList<>();
+        CopyOnWriteArrayList<String> tasks = new CopyOnWriteArrayList<>(findTasks(uploadTasks));
+        CopyOnWriteArrayList<String> success_task = new CopyOnWriteArrayList<>();
+
+        if (tasks.isEmpty()) {
+            logger.info("no task to run !!!");
+            Thread.sleep(POLL_INTERVAL_MS);
+            return true;
+        }
+        // Copy-on-write iteration works on a snapshot, so removing from the list inside the loop is safe.
+        for (String ds : tasks) {
+            // Only process a task whose data object is already present.
+            if (existsFile(FILE_PRE + ds + FILE_UPLOAD_SUFFIX)) {
+                startTask(numThread, batchSize, queueSize, ds);
+                tasks.remove(ds);
+                success_task.add(ds);
+            }
+            // Drop tasks that are listed in uploadTasks (original note: "remove empty data").
+            if (uploadTasks.contains(ds)) {
+                tasks.remove(ds);
+            }
+        }
+        boolean flag = false;
+        if (!tasks.isEmpty()) {
+            logger.info("wait task \t{}\t!!!", String.join("\t|\t", tasks));
+        } else {
+            logger.info("task success \t{}\t!!!", String.join("\t|\t", success_task));
+            flag = true;
+        }
+        Thread.sleep(POLL_INTERVAL_MS);
+        return flag;
+    }
+
+    /**
+     * Processes one task: starts {@code numThread} sink workers plus a heartbeat thread, streams the
+     * task's data object into the shared queue, waits for the workers to drain it, then records
+     * status 2 in the database and saves the success marker.
+     * <p>
+     * Workers, their connections and the executor are released on every path, including failures.
+     *
+     * @param numThread number of sink workers
+     * @param batchNum  batch size handed to every sink worker
+     * @param queueSize capacity of the queue between the downloader and the workers
+     * @param ds        id of the task being processed
+     * @throws IllegalStateException if a sink worker cannot be created
+     * @throws RuntimeException      if the download fails (the original exception is the cause)
+     */
+    private static void startTask(int numThread, int batchNum, int queueSize, String ds) throws Exception {
+        BlockingQueue<String> queue = new ArrayBlockingQueue<>(queueSize);
+        ExecutorService executor = Executors.newFixedThreadPool(numThread + 1);
+        DataSource dataSource = HikariUtil.getHikariCPDataSource();
+        CountDownLatch latch = new CountDownLatch(numThread);
+        List<SinkTask> tasks = new ArrayList<>();
+        try {
+            for (int k = 0; k < numThread; k++) {
+                tasks.add(new SinkTask(queue, dataSource.getConnection(), batchNum, latch, ds));
+            }
+        } catch (Exception e) {
+            // Fail fast: with fewer workers than the latch count, latch.await() below would never return.
+            logger.error("add work thread error", e);
+            closeSinks(tasks);
+            executor.shutdownNow();
+            throw new IllegalStateException("add work thread error", e);
+        }
+        for (SinkTask workThread : tasks) {
+            executor.execute(workThread);
+        }
+
+        isRunning = true;
+        executor.execute(() -> {
+            while (isRunning) {
+                logger.info("{} heartbeat {} thread process {}", ds, numThread, processed(tasks));
+                try {
+                    TimeUnit.MILLISECONDS.sleep(2000);
+                } catch (InterruptedException e) {
+                    // shutdownNow() interrupts this thread on purpose; just stop reporting.
+                    Thread.currentThread().interrupt();
+                    return;
+                }
+            }
+        });
+        executor.shutdown();
+
+        long start = System.currentTimeMillis();
+        try {
+            DownLoadTask downLoadTask = new DownLoadTask(queue, dataSource.getConnection(), ds);
+            downLoadTask.downLoad(FILE_PRE + ds + FILE_UPLOAD_SUFFIX);
+        } catch (Exception e) {
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            try {
+                // Tell every worker to stop, then wait until the queue has been drained.
+                success(numThread, queue);
+                latch.await();
+            } finally {
+                closeSinks(tasks);
+                executor.shutdownNow();
+            }
+        }
+        logger.info("{} all {} thread process finish | total count {} | total cost {} ms", ds, numThread, processed(tasks), (System.currentTimeMillis() - start));
+
+        // Record the "pulled" status (2). replace() is literal, so characters such as '$' in ds are safe.
+        String sql2 = Common.UPDATE_SQL.replace("@ds", ds).replace("@status", "2");
+        BaseTask baseTask = new BaseTask(dataSource.getConnection());
+        try {
+            baseTask.upsertStatus(sql2);
+        } finally {
+            baseTask.close();
+        }
+        // Save the success marker for this task.
+        saveStatus(ds, STATUS_SUCCESS_SUFFIX);
+    }
+
+    /** Sums the number of statements handled so far by all sink workers. */
+    private static long processed(List<SinkTask> sinks) {
+        return sinks.stream().mapToLong(SinkTask::getProcessNum).sum();
+    }
+
+    /** Closes every sink worker; a failure is logged so that the remaining workers are still closed. */
+    private static void closeSinks(List<SinkTask> sinks) {
+        for (SinkTask sink : sinks) {
+            try {
+                sink.close();
+            } catch (Exception e) {
+                logger.error("close sink task error", e);
+            }
+        }
+    }
+
+    /**
+     * Queues one stop sentinel per worker and stops the heartbeat thread.
+     * <p>
+     * Uses the blocking {@code put}: the queue is bounded and is usually full when the download ends,
+     * in which case {@code add} would throw {@link IllegalStateException}.
+     *
+     * @throws InterruptedException if interrupted while waiting for free space in the queue
+     */
+    private static void success(int numThread, BlockingQueue<String> queue) throws InterruptedException {
+        try {
+            for (int i = 0; i < numThread; i++) {
+                queue.put(QUIT);
+            }
+        } finally {
+            isRunning = false;
+        }
+    }
+
+}

+ 43 - 0
src/main/java/com/winhc/data/task/BaseTask.java

@@ -0,0 +1,43 @@
+package com.winhc.data.task;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+/**
+ * Owns one JDBC connection and a single statement created from it, and runs status SQL on it.
+ * <p>
+ * The connection passed to the constructor is closed by {@link #close()}, so an instance can be
+ * used in a try-with-resources statement.
+ *
+ * @author π
+ * @date 2023/5/30 17:32
+ */
+public class BaseTask implements AutoCloseable {
+
+    private static final Logger logger = LoggerFactory.getLogger(BaseTask.class);
+
+    private final Connection conn;
+    private final Statement stmt;
+
+    /**
+     * @param conn connection this task takes ownership of
+     * @throws SQLException if the statement cannot be created; the connection is closed in that case
+     */
+    public BaseTask(Connection conn) throws SQLException {
+        this.conn = conn;
+        try {
+            this.stmt = conn.createStatement();
+        } catch (SQLException e) {
+            // The caller has no usable task to close, so release the connection here.
+            try {
+                conn.close();
+            } catch (SQLException closeError) {
+                e.addSuppressed(closeError);
+            }
+            throw e;
+        }
+    }
+
+    /**
+     * Closes the statement and then the connection; the connection is closed even when closing the
+     * statement fails.
+     *
+     * @throws SQLException if either close operation fails
+     */
+    @Override
+    public void close() throws SQLException {
+        try {
+            if (stmt != null) {
+                stmt.close();
+            }
+        } finally {
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+
+    /**
+     * Executes the given SQL; a null or blank string is ignored.
+     *
+     * @param sql complete SQL statement to execute
+     * @throws SQLException if the execution fails
+     */
+    public void upsertStatus(String sql) throws SQLException {
+        if (StringUtils.isNotBlank(sql)) {
+            stmt.execute(sql);
+        }
+    }
+}

+ 86 - 0
src/main/java/com/winhc/data/task/DownLoadTask.java

@@ -0,0 +1,86 @@
+package com.winhc.data.task;
+
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.model.GetObjectRequest;
+import com.aliyun.oss.model.OSSObject;
+import com.winhc.data.DataPull;
+import com.winhc.data.utils.BaseUtils;
+import com.winhc.data.utils.Common;
+import com.winhc.data.utils.OssUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.concurrent.BlockingQueue;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2023/5/29 16:39
+ */
+public class DownLoadTask {
+
+    private static final Logger logger = LoggerFactory.getLogger(DownLoadTask.class);
+
+    private final BlockingQueue<String> queue;
+
+    private final Statement stmt;
+
+    private final Connection conn;
+
+    private final String ds;
+
+    public DownLoadTask(BlockingQueue<String> queue, Connection conn , String ds) throws SQLException {
+        this.queue = queue;
+        this.conn = conn;
+        this.stmt = conn.createStatement();
+        this.ds = ds;
+    }
+
+
+    public void downLoad(String objectName) throws SQLException {
+        OSS ossClient = null;
+        try {
+            ossClient = OssUtils.getOssClient();
+            GetObjectRequest getObjectRequest = new GetObjectRequest(OssUtils.BUCKETNAME, objectName);
+            OSSObject ossObject = ossClient.getObject(getObjectRequest);
+            InputStream content = ossObject.getObjectContent();
+            if (content != null) {
+                this.stmt.execute(Common.CREATE_SQL.replaceAll("@ds", ds).replaceAll("@status", "1"));
+                BufferedReader reader = new BufferedReader(new InputStreamReader(content));
+                while (true) {
+                    String line = reader.readLine();
+                    if (StringUtils.isBlank(line)) break;
+                    JSONObject data = JSONObject.parseObject(line);
+                    queue.put(data.getString("sql"));
+                }
+            }
+            if (content != null) {
+                content.close();
+            }
+        } catch (Exception e) {
+            logger.error("DownLoad Error Message: {},{}", e.getMessage(), e);
+            throw new RuntimeException(e.getMessage());
+        } finally {
+            // 关闭OSSClient。
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+            if (stmt != null) {
+                stmt.close();
+            }
+            if (conn != null) {
+                conn.close();
+            }
+        }
+    }
+}

+ 99 - 0
src/main/java/com/winhc/data/task/SinkTask.java

@@ -0,0 +1,99 @@
+package com.winhc.data.task;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.sql.DataSource;
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * Worker that takes SQL statements from a queue and executes them in JDBC batches.
+ * <p>
+ * The worker stops when it takes the sentinel {@code "QUIT"} (case-insensitive) from the queue,
+ * flushes whatever is still buffered, and counts the latch down exactly once.
+ *
+ * @author π
+ * @date 2023/5/29 11:41
+ */
+public class SinkTask extends Thread {
+    private static final Logger logger = LoggerFactory.getLogger(SinkTask.class);
+    /** Queue element that tells the worker to stop. */
+    private static final String QUIT_STR = "QUIT";
+    private static int counter = 0;
+    private final int id = ++counter;
+    private final BlockingQueue<String> queue;
+    private final int batchNum;
+    private final CountDownLatch latch;
+    /** Number of statements accepted so far; atomic because other threads read it for progress reports. */
+    private final AtomicLong sum = new AtomicLong(0);
+    private final Statement stmt;
+    private final List<String> batch;
+    private final String ds;
+    private final Connection connection;
+
+
+    /**
+     * @param queue      source of SQL statements
+     * @param connection connection used for the batches; closed by {@link #close()}
+     * @param batchNum   number of statements per batch; a value below 1 flushes after every statement
+     * @param latch      counted down once when the worker ends
+     * @param ds         id of the task, used in log messages
+     * @throws Exception if the statement cannot be created
+     */
+    public SinkTask(BlockingQueue<String> queue, Connection connection, int batchNum, CountDownLatch latch, String ds) throws Exception {
+        this.queue = queue;
+        this.batchNum = batchNum;
+        this.latch = latch;
+        this.connection = connection;
+        this.stmt = connection.createStatement();
+        this.batch = new ArrayList<>();
+        this.ds = ds;
+    }
+
+    @Override
+    public void run() {
+        try {
+            writeData();
+        } finally {
+            // Always release the waiting coordinator, whatever happened in writeData().
+            latch.countDown();
+        }
+    }
+
+    /**
+     * Consumes the queue until the stop sentinel arrives. Any failure is logged and ends the worker.
+     */
+    private void writeData() {
+        try {
+            for (; ; ) {
+                String sql = queue.take();
+                if (QUIT_STR.equalsIgnoreCase(sql)) {
+                    break;
+                }
+                if (StringUtils.isBlank(sql)) {
+                    continue;
+                }
+                batch.add(sql);
+                sum.incrementAndGet();
+                if (batch.size() >= batchNum) {
+                    flushBatch();
+                }
+            }
+            // Write the trailing partial batch; without this the last (sum % batchNum) statements are lost.
+            flushBatch();
+        } catch (InterruptedException e) {
+            Thread.currentThread().interrupt();
+            logger.error(e.getMessage(), e);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+        }
+    }
+
+    /**
+     * Executes all buffered statements as one JDBC batch and logs the running total.
+     * Does nothing when the buffer is empty.
+     */
+    private void flushBatch() throws SQLException {
+        if (batch.isEmpty()) {
+            return;
+        }
+        for (String r : batch) {
+            stmt.addBatch(r);
+        }
+        stmt.executeBatch();
+        batch.clear();
+        logger.info(String.format("%s %s-%s-total process %s !", ds, Thread.currentThread().getName(), Thread.currentThread().getId(), sum.get()));
+    }
+
+    /**
+     * Closes the statement and then the connection; the connection is closed even when closing the
+     * statement fails.
+     *
+     * @throws SQLException if either close operation fails
+     */
+    public void close() throws SQLException {
+        try {
+            if (stmt != null) {
+                stmt.close();
+            }
+        } finally {
+            if (connection != null) {
+                connection.close();
+            }
+        }
+    }
+
+    /** @return number of non-blank statements this worker has accepted so far */
+    public long getProcessNum() {
+        return sum.get();
+    }
+}

+ 12 - 0
src/main/java/com/winhc/data/utils/BaseUtils.java

@@ -0,0 +1,12 @@
+package com.winhc.data.utils;
+
+/**
+ * @author π
+ * @Description:
+ * @date 2023/5/29 14:00
+ */
+public class BaseUtils {
+    public static Boolean isWindows() {
+        return System.getProperty("os.name").contains("Windows");
+    }
+}

+ 22 - 0
src/main/java/com/winhc/data/utils/Common.java

@@ -0,0 +1,22 @@
+package com.winhc.data.utils;
+
+/**
+ * Shared constants: the status upsert SQL templates and the object-key prefixes and suffixes.
+ * <p>
+ * The SQL templates contain the placeholders {@code @ds} and {@code @status}, which callers
+ * substitute by plain string replacement before executing the statement.
+ *
+ * @author π
+ * @date 2023/5/30 17:54
+ */
+public class Common {
+    /** Part shared by both templates: insert one row into {@code pull_data_status}. */
+    private static final String UPSERT_HEAD = "INSERT INTO pull_data_status (id,create_time,update_time,status)\n" +
+            "VALUES (@ds , now() , now(), @status)\n";
+
+    /** Upsert that, for an existing id, overwrites {@code create_time} and {@code status}. */
+    public static final String CREATE_SQL = UPSERT_HEAD +
+            "ON DUPLICATE KEY UPDATE create_time = VALUES(create_time) , status = VALUES(status)";
+
+    /** Upsert that, for an existing id, overwrites {@code update_time} and {@code status}. */
+    public static final String UPDATE_SQL = UPSERT_HEAD +
+            "ON DUPLICATE KEY UPDATE update_time = VALUES(update_time), status = VALUES(status)";
+
+    /** Key prefix of status markers. */
+    public static final String STATUS_PRE = "data/sj/status/";
+    /** Key prefix of data files. */
+    public static final String FILE_PRE = "data/sj/file/";
+
+    /** Suffix of the "success" status marker. */
+    public static final String STATUS_SUCCESS_SUFFIX = ".success";
+    /** Suffix of the "upload" status marker. */
+    public static final String STATUS_UPLOAD_SUFFIX = ".upload";
+    /** Suffix of data files. */
+    public static final String FILE_UPLOAD_SUFFIX = ".json";
+}

+ 131 - 0
src/main/java/com/winhc/data/utils/DateUtils.java

@@ -0,0 +1,131 @@
+package com.winhc.data.utils;
+
+import java.text.SimpleDateFormat;
+import java.time.Instant;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.ZoneId;
+import java.time.format.DateTimeFormatter;
+import java.time.temporal.ChronoUnit;
+import java.util.Calendar;
+import java.util.Date;
+import java.util.GregorianCalendar;
+import java.util.Locale;
+
+/**
+ * @author ZhangJi
+ * @since 2021-10-22 15:09
+ */
+public class DateUtils {
+    public static final DateTimeFormatter YYYYMMDD = DateTimeFormatter.ofPattern("yyyyMMdd");
+    public static final DateTimeFormatter YYYYMMDDHHMMSS = DateTimeFormatter.ofPattern("yyyyMMdd HH:mm:ss");
+    public static final DateTimeFormatter YYYY_MM_DDHHMMSS = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
+    public static final DateTimeFormatter YYYY_MM_DD = DateTimeFormatter.ofPattern("yyyy-MM-dd");
+    public static final String FORMAT_YYYYMMDD = "yyyyMMdd";
+    public static final String FORMAT_YYYY_MM_DDHHMMSS = "yyyy-MM-dd HH:mm:ss";
+
+    public static final String formatDate(Date date, String pattern) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(pattern);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static String nowDate() {
+        return nowDate(null);
+    }
+
+    public static String nowDate(DateTimeFormatter pattern) {
+        if (pattern == null) {
+            pattern = DateTimeFormatter.ISO_DATE;
+        }
+        return LocalDateTime.now().format(pattern);
+    }
+
+    public static String nowDateTime() {
+        return nowDateTime(null);
+    }
+
+    public static String nowDateTime(DateTimeFormatter pattern) {
+        if (pattern == null) {
+            pattern = DateTimeFormatter.ISO_LOCAL_DATE_TIME;
+        }
+        return LocalDateTime.now().format(pattern);
+    }
+
+    public static String getYesterday() {
+        return LocalDate.now().plusDays(-1).format(DateTimeFormatter.BASIC_ISO_DATE);
+    }
+
+    public static final String FORMAT_YYYY_MM_DD_HH_MM_SS = "yyyy-MM-dd HH:mm:ss";
+
+    public static final String formatDate_YYYY_MM_DD_HH_MM_SS(Date date) {
+        String v = null;
+        try {
+            if (date == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_YYYY_MM_DD_HH_MM_SS);
+            v = dateFormat.format(date);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final Date parseDate(String d) {
+        Date v = null;
+        try {
+            if (d == null)
+                return null;
+            SimpleDateFormat dateFormat = new SimpleDateFormat(FORMAT_YYYY_MM_DDHHMMSS);
+            v = dateFormat.parse(d);
+        } catch (Exception e) {
+            // do nothing
+        }
+        return v;
+    }
+
+    public static final String formatDate_YYYY_MM_DD_HH_MM_SS() {
+        return formatDate_YYYY_MM_DD_HH_MM_SS(new Date());
+    }
+
+    private static final DateTimeFormatter df = DateTimeFormatter.ofPattern(FORMAT_YYYY_MM_DDHHMMSS)
+            .withLocale(Locale.CHINA)
+            .withZone(ZoneId.systemDefault());
+
+    public static String getAnyTime(Integer i) {
+        Instant instant = Instant.now().minus(i, ChronoUnit.DAYS);
+        return df.format(instant);
+    }
+
+    public static String longToTime(Long timestamp) {
+        if (null == timestamp) return null;
+        try {
+            SimpleDateFormat format = new SimpleDateFormat(FORMAT_YYYY_MM_DDHHMMSS); //设置格式
+            return format.format(timestamp);
+        } catch (Exception e) {
+
+        }
+        return null;
+
+    }
+
+    public static int getCurrentHours() {
+        return new GregorianCalendar().get(Calendar.HOUR_OF_DAY);
+    }
+
+
+    public static void main(String[] args) {
+        System.out.println(getYesterday());
+        System.out.println(nowDate(YYYYMMDD));
+        System.out.println(nowDate(YYYY_MM_DD));
+        System.out.println(nowDate(YYYY_MM_DDHHMMSS));
+        System.out.println(getCurrentHours());
+    }
+}

+ 43 - 0
src/main/java/com/winhc/data/utils/EnvConst.java

@@ -0,0 +1,43 @@
+package com.winhc.data.utils;
+
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Process-wide holder of the effective configuration as a flat key/value map.
+ * <p>
+ * Until {@link #initAllConf(String[], Map)} is called the map is whatever {@code EnvProperties}
+ * provides; afterwards it is the command-line arguments merged with the supplied configuration.
+ *
+ * @author: XuJiakai
+ * 2021/8/31 16:17
+ */
+public class EnvConst {
+    private static final EnvProperties env = new EnvProperties();
+
+    private static Map<String, String> params = env.getSource();
+
+    /**
+     * Replaces the effective configuration with the command-line arguments merged with {@code conf}.
+     *
+     * @param args command-line arguments, parsed by {@code ParameterTool.fromArgs}
+     * @param conf key/value pairs loaded from the configuration file
+     */
+    public static void initAllConf(final String[] args, Map<String, String> conf) throws Exception {
+        Map<String, String> fromArgs = ParameterTool.fromArgs(args).toMap();
+        // Per the original note, mergeWith gives precedence to the map merged last (verify in ParameterTool).
+        EnvConst.params = ParameterTool
+                .fromMap(fromArgs)
+                .mergeWith(ParameterTool.fromMap(conf))
+                .toMap();
+    }
+
+    /**
+     * Copies the process environment with every name lower-cased and {@code '_'} replaced by {@code '.'}.
+     */
+    private static Map<String, String> getenv() {
+        Map<String, String> normalized = new HashMap<>();
+        System.getenv().forEach((name, value) -> normalized.put(name.toLowerCase().replace('_', '.'), value));
+        return normalized;
+    }
+
+    /** @return the value stored for {@code key}, or {@code null} when there is none */
+    public static String getValue(String key) {
+        return getValue(key, null);
+    }
+
+    /** @return the value stored for {@code key}, or {@code orDefault} when there is none */
+    public static String getValue(String key, String orDefault) {
+        String value = params.get(key);
+        return value != null ? value : orDefault;
+    }
+}

+ 143 - 0
src/main/java/com/winhc/data/utils/EnvProperties.java

@@ -0,0 +1,143 @@
+package com.winhc.data.utils;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.yaml.snakeyaml.Yaml;
+
+import java.io.*;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/8/31 09:32
+ */
+public class EnvProperties {
+    private static final Logger logger = LoggerFactory.getLogger(EnvProperties.class);
+
+    private static final String fileName = "env.yml";
+
+    private Map<String, String> source;
+
+    public String getValue(String key) {
+        return source.getOrDefault(key, null);
+    }
+
+    public Map<String, String> getMapByPrefix(String prefix) {
+        return source.entrySet().stream().filter(e -> e.getKey().startsWith(prefix)).collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue, (e1, e2) -> e1));
+    }
+
+    public EnvProperties() {
+        this.source = getYmlByFileName(null, null);
+    }
+
+    public Map<String, String> getSource() {
+        return source;
+    }
+
+
+    /**
+     * 根据文件名获取yml的文件内容
+     *
+     * @return
+     */
+    private static Map<String, String> getYmlByFileName(String file, Map<String, String> source) {
+        if (source == null) {
+            source = new HashMap<>();
+        }
+        if (file == null) {
+            //file = fileName;
+            return source;
+        }
+        InputStream in = EnvProperties.class.getClassLoader().getResourceAsStream(file);
+        Yaml props = new Yaml();
+        Object obj = props.loadAs(in, Map.class);
+        Map<String, Object> param = (Map<String, Object>) obj;
+
+        for (Map.Entry<String, Object> entry : param.entrySet()) {
+            String key = entry.getKey();
+            Object val = entry.getValue();
+
+            if (val instanceof Map) {
+                forEachYaml(source, key, (Map<String, Object>) val);
+            } else {
+                source.put(key, val.toString());
+            }
+        }
+        return source;
+    }
+
+    /**
+     * 根据外部文件地址获取yml的文件内容
+     *
+     * @return
+     */
+    public static Boolean getYmlByFileNameOut(String filePath, Map<String, String> source) throws FileNotFoundException {
+        if (source == null) {
+            source = new HashMap<>();
+        }
+        try {
+            logger.info("file path: {}", filePath);
+            InputStream in = new FileInputStream(new File(filePath));
+            Yaml props = new Yaml();
+            Object obj = props.loadAs(in, Map.class);
+            Map<String, Object> param = (Map<String, Object>) obj;
+
+            for (Map.Entry<String, Object> entry : param.entrySet()) {
+                String key = entry.getKey();
+                Object val = entry.getValue();
+
+                if (val instanceof Map) {
+                    forEachYaml(source, key, (Map<String, Object>) val);
+                } else {
+                    source.put(key, val.toString());
+                }
+            }
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            return false;
+        }
+
+        return true;
+    }
+
+    /**
+     * 遍历yml文件,获取map集合
+     *
+     * @param key_str
+     * @param obj
+     * @return
+     */
+    private static Map<String, String> forEachYaml(Map<String, String> source, String key_str, Map<String, Object> obj) {
+        for (Map.Entry<String, Object> entry : obj.entrySet()) {
+            String key = entry.getKey();
+            Object val = entry.getValue();
+
+            String str_new = "";
+            if (StringUtils.isNotBlank(key_str)) {
+                str_new = key_str + "." + key;
+            } else {
+                str_new = key;
+            }
+            if (val instanceof Map) {
+                forEachYaml(source, str_new, (Map<String, Object>) val);
+            } else {
+                source.put(str_new, val.toString());
+            }
+        }
+        return source;
+    }
+
+
+    public static void main(String[] args) throws IOException {
+        EnvProperties envProperties = new EnvProperties();
+
+        String value = envProperties.getValue("winhc.mysql.driverClassName");
+        System.out.println(value);
+        String value1 = envProperties.getValue("winhc.oss.accessid");
+        System.out.println(value1);
+    }
+
+}

+ 19 - 0
src/main/java/com/winhc/data/utils/FileUtils.java

@@ -0,0 +1,19 @@
+package com.winhc.data.utils;
+
+import java.io.File;
+
+/**
+ * @author π
+ * @Description: file-system helper methods
+ * @date 2023/5/31 15:00
+ */
+public class FileUtils {
+
+    /**
+     * Tells whether a file or directory exists at the given path.
+     *
+     * @param filePath path to check; may be {@code null}
+     * @return {@code true} if something exists at {@code filePath}; {@code false} otherwise,
+     *         including when {@code filePath} is {@code null}
+     */
+    public static Boolean fileExists(String filePath) {
+        // new File(null) throws NullPointerException; treat "no path" as "does not exist".
+        if (filePath == null) {
+            return false;
+        }
+        return new File(filePath).exists();
+    }
+
+    public static void main(String[] args) {
+        // No-op entry point.
+    }
+}

+ 158 - 0
src/main/java/com/winhc/data/utils/HikariUtil.java

@@ -0,0 +1,158 @@
+package com.winhc.data.utils;
+
+import com.zaxxer.hikari.HikariConfig;
+import com.zaxxer.hikari.HikariDataSource;
+import lombok.SneakyThrows;
+import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+
+import javax.sql.DataSource;
+import java.lang.reflect.Field;
+import java.lang.reflect.Method;
+import java.sql.*;
+import java.util.*;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2023/5/16 14:12
+ * <p>
+ * xjk  xjk_ABC123
+ */
+
+@Slf4j
+public class HikariUtil {
+    private final static HikariUtil HIKARI_CP_UTI = new HikariUtil();
+
+    private static HikariDataSource dataSource = null;
+
+    private HikariUtil() {
+    }
+
+    @SneakyThrows
+    public static HikariUtil getInstance() {
+        HIKARI_CP_UTI.registerHikariCP();
+        return HIKARI_CP_UTI;
+
+    }
+
+    /**
+     * 注册HikariCP
+     */
+    private synchronized void registerHikariCP() {
+        if (null != dataSource) {
+            return;
+        }
+        HikariConfig config = new HikariConfig();
+        config.setDriverClassName(EnvConst.getValue("winhc.mysql.driverClassName"));
+        config.setJdbcUrl(EnvConst.getValue("winhc.mysql.jdbcUrl"));
+        config.setUsername(EnvConst.getValue("winhc.mysql.username"));
+        config.setPassword(EnvConst.getValue("winhc.mysql.password"));
+        int threadSize = Integer.parseInt(EnvConst.getValue("winhc.task.numThread", "20")) + 2;
+        config.setMaximumPoolSize(threadSize * 2);
+        config.setMinimumIdle(threadSize);
+        config.setMaxLifetime(1800000);
+        dataSource = new HikariDataSource(config);
+    }
+
+    /**
+     * 提供对外 获取 HikariCPDatasource 的方法
+     *
+     * @return
+     */
+    public static DataSource getHikariCPDataSource() {
+        if (null != dataSource) {
+            return dataSource;
+        }
+        HIKARI_CP_UTI.registerHikariCP();
+        return dataSource;
+    }
+
+    /**
+     * 每次重新从连接池中获取新连接,注意:如连接不够会超时
+     */
+    public Connection getConn() throws SQLException {
+        return getHikariCPDataSource().getConnection();
+    }
+
+
+    @SneakyThrows
+    public static <T> List<T> exec(Connection conn, String sql, Class<T> typeClazz) {
+
+        Statement statement = null;
+        ResultSet rs = null;
+
+        List<T> list = new ArrayList<>();
+        Field[] declaredFields = typeClazz.getDeclaredFields();
+        Method[] declaredMethods = typeClazz.getDeclaredMethods();
+        Set<String> methodSet = Arrays.stream(declaredMethods).map(Method::getName).collect(Collectors.toSet());
+        try {
+            statement = conn.createStatement();
+            rs = statement.executeQuery(sql);
+            Set<String> fieldSet = getFieldSet(rs, typeClazz);
+
+            while (rs.next()) {
+                T instance = typeClazz.newInstance();
+                for (Field field : declaredFields) {
+                    Class<?> type = field.getType();
+                    String fieldSetName = "set" + StringUtils.capitalize(field.getName());
+                    if (!methodSet.contains(fieldSetName) || !fieldSet.contains(field.getName())) {
+                        continue;
+                    }
+                    Method fieldSetMet = typeClazz.getMethod(fieldSetName, field
+                            .getType());
+                    switch (type.getSimpleName()) {
+                        case "String":
+                            fieldSetMet.invoke(instance, rs.getString(field.getName()));
+                            break;
+                        case "Integer":
+                            fieldSetMet.invoke(instance, rs.getInt(field.getName()));
+                            break;
+                        default:
+                            throw new RuntimeException();
+                    }
+                }
+                list.add(instance);
+            }
+            return list;
+        } catch (SQLException e) {
+            log.error(e.getMessage(), e);
+            log.error("hikari exec sql error: {}", e.getMessage());
+            throw new RuntimeException("hikari exec sql error: " + e.getMessage(), e);
+        } finally {
+            if (rs != null) {
+                rs.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+        }
+    }
+
+
+    private static final Map<Class<?>, Set<String>> map = new HashMap<>();
+
+    private static Set<String> getFieldSet(ResultSet resultSet, Class<?> typeClazz) {
+        if (!map.containsKey(typeClazz)) {
+            synchronized (HikariUtil.class) {
+                if (!map.containsKey(typeClazz)) {
+                    try {
+                        ResultSetMetaData metaData = resultSet.getMetaData();
+                        int columnCount = metaData.getColumnCount();
+                        HashSet<String> set = new HashSet<>();
+                        for (int i = 1; i <= columnCount; i++) {
+                            String columnName = metaData.getColumnName(i);
+                            set.add(columnName);
+                        }
+                        map.put(typeClazz, set);
+                    } catch (Exception e) {
+                        log.error(e.getMessage(), e);
+                        e.printStackTrace();
+                        throw new RuntimeException(e);
+                    }
+                }
+            }
+        }
+        return map.get(typeClazz);
+    }
+}

+ 134 - 0
src/main/java/com/winhc/data/utils/OssUtils.java

@@ -0,0 +1,134 @@
+package com.winhc.data.utils;
+
+
+import com.aliyun.oss.OSS;
+import com.aliyun.oss.OSSClientBuilder;
+import com.aliyun.oss.model.*;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import static com.winhc.data.utils.Common.*;
+import static com.winhc.data.utils.DateUtils.getYesterday;
+
+/**
+ * @author π
+ * @Description: helpers for the status marker objects kept in the Aliyun OSS bucket
+ * @date 2022/11/14 18:02
+ */
+
+public class OssUtils {
+
+    private static final Logger logger = LoggerFactory.getLogger(OssUtils.class);
+
+    private static final String ACCESS_ID_CONF = "winhc.oss.accessid";
+    private static final String ACCESS_KEY_CONF = "winhc.oss.accesskey";
+
+    // Credentials are never stored in source. They come from the job configuration; when the
+    // configuration is not loaded yet at class-initialization time these stay null and
+    // getOssClient() looks the values up again on use.
+    public static String ACCESSKEYID = EnvConst.getValue(ACCESS_ID_CONF);
+    public static String ACCESSKEYSECRET = EnvConst.getValue(ACCESS_KEY_CONF);
+
+    public static String ENDPOINT = "https://oss-cn-shanghai.aliyuncs.com";
+    public static String BUCKETNAME = "data-exchange-sj";
+
+
+    /**
+     * Builds a new OSS client; the caller is responsible for calling {@code shutdown()} on it.
+     *
+     * @throws IllegalStateException if the access key id or secret is not configured
+     */
+    public static OSS getOssClient() {
+        String accessKeyId = resolveCredential(ACCESSKEYID, ACCESS_ID_CONF);
+        String accessKeySecret = resolveCredential(ACCESSKEYSECRET, ACCESS_KEY_CONF);
+        return new OSSClientBuilder().build(ENDPOINT, accessKeyId, accessKeySecret);
+    }
+
+    /** Returns {@code preset} when set, otherwise the value configured under {@code confKey}. */
+    private static String resolveCredential(String preset, String confKey) {
+        String value = (preset == null || preset.isEmpty()) ? EnvConst.getValue(confKey) : preset;
+        if (value == null || value.isEmpty()) {
+            throw new IllegalStateException("OSS credential '" + confKey + "' is not configured");
+        }
+        return value;
+    }
+
+    /**
+     * Writes an empty marker object named {@code STATUS_PRE + ds + statusSuffix} to the bucket.
+     *
+     * @param ds           task identifier (the callers in this file pass a yyyyMMdd date)
+     * @param statusSuffix suffix identifying the status being recorded
+     * @throws RuntimeException if the upload fails
+     */
+    public static void saveStatus(String ds, String statusSuffix) {
+        OSS ossClient = null;
+        try {
+            ossClient = getOssClient();
+            String targetKey = STATUS_PRE + ds + statusSuffix;
+            PutObjectRequest putObjectRequest = new PutObjectRequest(BUCKETNAME, targetKey, new ByteArrayInputStream(new byte[0]));
+            // Use the client that the finally block shuts down; do not open a second one.
+            ossClient.putObject(putObjectRequest);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+        }
+
+    }
+
+    /**
+     * Lists the marker objects under {@code STATUS_PRE} whose key contains {@code statusSuffix}.
+     *
+     * @param statusSuffix suffix identifying the status of interest
+     * @return the matching keys with the prefix and the suffix text removed
+     * @throws RuntimeException if listing fails
+     */
+    public static List<String> listFiles(String statusSuffix) {
+        OSS ossClient = getOssClient();
+        List<String> list = new ArrayList<>();
+        try {
+            String nextContinuationToken = null;
+            ListObjectsV2Result result;
+            // Page through the listing, 200 keys at a time.
+            do {
+                ListObjectsV2Request listObjectsV2Request = new ListObjectsV2Request(BUCKETNAME).withMaxKeys(200).withPrefix(STATUS_PRE);
+                listObjectsV2Request.setContinuationToken(nextContinuationToken);
+                result = ossClient.listObjectsV2(listObjectsV2Request);
+                for (OSSObjectSummary s : result.getObjectSummaries()) {
+                    String key = s.getKey();
+                    if (key.contains(statusSuffix)) {
+                        // Literal replace: prefix and suffix are plain text, not regular expressions.
+                        list.add(key.replace(STATUS_PRE, "").replace(statusSuffix, ""));
+                    }
+                }
+                nextContinuationToken = result.getNextContinuationToken();
+            } while (result.isTruncated());
+
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+        }
+        return list;
+    }
+
+    /**
+     * @param objectName object key inside the bucket
+     * @return whether the object exists
+     * @throws RuntimeException if the check fails
+     */
+    public static Boolean existsFile(String objectName) {
+        OSS ossClient = getOssClient();
+        try {
+            return ossClient.doesObjectExist(BUCKETNAME, objectName);
+        } catch (Exception e) {
+            logger.error(e.getMessage(), e);
+            throw new RuntimeException(e.getMessage(), e);
+        } finally {
+            if (ossClient != null) {
+                ossClient.shutdown();
+            }
+        }
+    }
+
+    /**
+     * Determines which uploaded tasks still have to be processed by comparing the upload
+     * markers with the success markers. Task identifiers must be numeric.
+     *
+     * <p>Side effect: the upload markers found in the bucket are appended to {@code upload_task}.
+     *
+     * @param upload_task additional task identifiers to consider; must be modifiable
+     * @return the pending identifiers in ascending string order, without duplicates; empty when
+     *         the newest successful task is yesterday's
+     */
+    public static List<String> findTasks(List<String> upload_task) {
+        // compare upload and success
+        upload_task.addAll(listFiles(STATUS_UPLOAD_SUFFIX));
+        List<String> success_task = listFiles(STATUS_SUCCESS_SUFFIX);
+        long lastTask = success_task.stream().mapToLong(Long::parseLong).max().orElse(0);
+        long yesTask = Long.parseLong(getYesterday());
+        // Yesterday's data has already been processed -> nothing to do.
+        if (lastTask == yesTask) {
+            return new ArrayList<>();
+        }
+        // Yesterday's task is not force-added when it is missing from the uploads.
+        List<String> tasks = upload_task.stream()
+                .filter(x -> !success_task.contains(x) && Long.parseLong(x) > lastTask)
+                .distinct()
+                .collect(Collectors.toList());
+        tasks.sort(Comparator.naturalOrder());
+        return tasks;
+    }
+
+    public static void main(String[] args) {
+        saveStatus("20230528", STATUS_SUCCESS_SUFFIX);
+        System.out.println(listFiles(STATUS_SUCCESS_SUFFIX));
+        System.out.println(listFiles(STATUS_UPLOAD_SUFFIX));
+        System.out.println(existsFile("data/sj/file/20230528.json"));
+        System.out.println(findTasks(new ArrayList<>()));
+    }
+}

+ 458 - 0
src/main/java/com/winhc/data/utils/ParameterTool.java

@@ -0,0 +1,458 @@
+package com.winhc.data.utils;
+
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/7 11:28
+ */
+public class ParameterTool {
+    protected transient Map<String, String> defaultData;
+    protected transient Set<String> unrequestedParameters;
+
+    protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
+    protected static final String DEFAULT_UNDEFINED = "<undefined>";
+
+    // ------------------ Constructors ------------------------
+
+    /**
+     * Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by
+     * values. Keys have to start with '-' or '--'
+     *
+     * <p><strong>Example arguments:</strong> --key1 value1 --key2 value2 -key3 value3
+     *
+     * @param args Input array arguments
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromArgs(String[] args) {
+        final Map<String, String> map = new HashMap<>(args.length / 2);
+
+        int i = 0;
+        while (i < args.length) {
+            final String key = getKeyFromArgs(args, i);
+
+            if (key.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "The input " + Arrays.toString(args) + " contains an empty argument");
+            }
+
+            i += 1; // try to find the value
+
+            if (i >= args.length) {
+                map.put(key, NO_VALUE_KEY);
+            } else if (isNumber(args[i])) {
+                map.put(key, args[i]);
+                i += 1;
+            } else if (args[i].startsWith("--") || args[i].startsWith("-")) {
+                // the argument cannot be a negative number because we checked earlier
+                // -> the next argument is a parameter name
+                map.put(key, NO_VALUE_KEY);
+            } else {
+                map.put(key, args[i]);
+                i += 1;
+            }
+        }
+
+        return fromMap(map);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given {@link Properties} file.
+     *
+     * @param path Path to the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(String path) throws IOException {
+        File propertiesFile = new File(path);
+        return fromPropertiesFile(propertiesFile);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given {@link Properties} file.
+     *
+     * @param file File object to the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(File file) throws IOException {
+        if (!file.exists()) {
+            throw new FileNotFoundException(
+                    "Properties file " + file.getAbsolutePath() + " does not exist");
+        }
+        try (FileInputStream fis = new FileInputStream(file)) {
+            return fromPropertiesFile(fis);
+        }
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
+     *
+     * @param inputStream InputStream from the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
+        Properties props = new Properties();
+        props.load(inputStream);
+        return fromMap((Map) props);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given map.
+     *
+     * @param map A map of arguments. Both Key and Value have to be Strings
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromMap(Map<String, String> map) {
+        checkNotNull(map, "Unable to initialize from empty map");
+        return new ParameterTool(map);
+    }
+
+    /**
+     * Returns {@link ParameterTool} from the system properties. Example on how to pass system
+     * properties: -Dkey1=value1 -Dkey2=value2
+     *
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromSystemProperties() {
+        return fromMap((Map) System.getProperties());
+    }
+
+    // ------------------ ParameterUtil  ------------------------
+    protected final Map<String, String> data;
+
+    private ParameterTool(Map<String, String> data) {
+        this.data = Collections.unmodifiableMap(new HashMap<>(data));
+
+        this.defaultData = new ConcurrentHashMap<>(data.size());
+
+        this.unrequestedParameters =
+                Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
+
+        unrequestedParameters.addAll(data.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParameterTool that = (ParameterTool) o;
+        return Objects.equals(data, that.data)
+                && Objects.equals(defaultData, that.defaultData)
+                && Objects.equals(unrequestedParameters, that.unrequestedParameters);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(data, defaultData, unrequestedParameters);
+    }
+
+    // ------------------ Get data from the util ----------------
+
+    /**
+     * Returns number of parameters in {@link ParameterTool}.
+     */
+    public int getNumberOfParameters() {
+        return data.size();
+    }
+
+    /**
+     * Returns the String value for the given key. If the key does not exist it will return null.
+     */
+    public String get(String key) {
+        addToDefaults(key, null);
+        unrequestedParameters.remove(key);
+        return data.get(key);
+    }
+
+    public String getOrDefault(String key, String defaultValue) {
+        String s = get(key);
+        if (s == null) {
+            return defaultValue;
+        } else {
+            return s;
+        }
+    }
+
+
+    /**
+     * Check if value is set.
+     */
+    public boolean has(String value) {
+        addToDefaults(value, null);
+        unrequestedParameters.remove(value);
+        return data.containsKey(value);
+    }
+
+
+    /**
+     * Returns a {@link Properties} object from this {@link ParameterTool}.
+     *
+     * @return A {@link Properties}
+     */
+    public Properties getProperties() {
+        Properties props = new Properties();
+        props.putAll(this.data);
+        return props;
+    }
+
+    /**
+     * Create a properties file with all the known parameters (call after the last get*() call). Set
+     * the default value, if available.
+     *
+     * <p>Use this method to create a properties file skeleton.
+     *
+     * @param pathToFile Location of the default properties file.
+     */
+    public void createPropertiesFile(String pathToFile) throws IOException {
+        createPropertiesFile(pathToFile, true);
+    }
+
+    /**
+     * Create a properties file with all the known parameters (call after the last get*() call). Set
+     * the default value, if overwrite is true.
+     *
+     * @param pathToFile Location of the default properties file.
+     * @param overwrite  Boolean flag indicating whether or not to overwrite the file
+     * @throws IOException If overwrite is not allowed and the file exists
+     */
+    public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
+        final File file = new File(pathToFile);
+        if (file.exists()) {
+            if (overwrite) {
+                file.delete();
+            } else {
+                throw new RuntimeException(
+                        "File " + pathToFile + " exists and overwriting is not allowed");
+            }
+        }
+        final Properties defaultProps = new Properties();
+        defaultProps.putAll(this.defaultData);
+        try (final OutputStream out = new FileOutputStream(file)) {
+            defaultProps.store(
+                    out, "Default file created by Flink's ParameterUtil.createPropertiesFile()");
+        }
+    }
+
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        return new ParameterTool(this.data);
+    }
+
+    // ------------------------- Interaction with other ParameterUtils -------------------------
+
+    /**
+     * Merges two {@link ParameterTool}.
+     *
+     * @param other Other {@link ParameterTool} object
+     * @return The Merged {@link ParameterTool}
+     */
+    public ParameterTool mergeWith(ParameterTool other) {
+        final Map<String, String> resultData = new HashMap<>(data.size() + other.data.size());
+        resultData.putAll(data);
+        resultData.putAll(other.data);
+
+        final ParameterTool ret = new ParameterTool(resultData);
+
+        final HashSet<String> requestedParametersLeft = new HashSet<>(data.keySet());
+        requestedParametersLeft.removeAll(unrequestedParameters);
+
+        final HashSet<String> requestedParametersRight = new HashSet<>(other.data.keySet());
+        requestedParametersRight.removeAll(other.unrequestedParameters);
+
+        ret.unrequestedParameters.removeAll(requestedParametersLeft);
+        ret.unrequestedParameters.removeAll(requestedParametersRight);
+
+        return ret;
+    }
+
+    // ------------------------- ExecutionConfig.UserConfig interface -------------------------
+
+    public Map<String, String> toMap() {
+        return data;
+    }
+
+    // ------------------------- Serialization ---------------------------------------------
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        defaultData = new ConcurrentHashMap<>(data.size());
+        unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
+    }
+
+    public static String getKeyFromArgs(String[] args, int index) {
+        String key;
+        if (args[index].startsWith("--")) {
+            key = args[index].substring(2);
+        } else if (args[index].startsWith("-")) {
+            key = args[index].substring(1);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
+                            Arrays.toString(args), args[index]));
+        }
+
+        if (key.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "The input " + Arrays.toString(args) + " contains an empty argument");
+        }
+
+        return key;
+    }
+
+    public static boolean isNumber(final String str) {
+        if (isEmpty(str)) {
+            return false;
+        }
+        final char[] chars = str.toCharArray();
+        int sz = chars.length;
+        boolean hasExp = false;
+        boolean hasDecPoint = false;
+        boolean allowSigns = false;
+        boolean foundDigit = false;
+        // deal with any possible sign up front
+        final int start = (chars[0] == '-') ? 1 : 0;
+        if (sz > start + 1 && chars[start] == '0') { // leading 0
+            if (
+                    (chars[start + 1] == 'x') ||
+                            (chars[start + 1] == 'X')
+            ) { // leading 0x/0X
+                int i = start + 2;
+                if (i == sz) {
+                    return false; // str == "0x"
+                }
+                // checking hex (it can't be anything else)
+                for (; i < chars.length; i++) {
+                    if ((chars[i] < '0' || chars[i] > '9')
+                            && (chars[i] < 'a' || chars[i] > 'f')
+                            && (chars[i] < 'A' || chars[i] > 'F')) {
+                        return false;
+                    }
+                }
+                return true;
+            } else if (Character.isDigit(chars[start + 1])) {
+                // leading 0, but not hex, must be octal
+                int i = start + 1;
+                for (; i < chars.length; i++) {
+                    if (chars[i] < '0' || chars[i] > '7') {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        sz--; // don't want to loop to the last char, check it afterwords
+        // for type qualifiers
+        int i = start;
+        // loop to the next to last char or to the last char if we need another digit to
+        // make a valid number (e.g. chars[0..5] = "1234E")
+        while (i < sz || (i < sz + 1 && allowSigns && !foundDigit)) {
+            if (chars[i] >= '0' && chars[i] <= '9') {
+                foundDigit = true;
+                allowSigns = false;
+
+            } else if (chars[i] == '.') {
+                if (hasDecPoint || hasExp) {
+                    // two decimal points or dec in exponent
+                    return false;
+                }
+                hasDecPoint = true;
+            } else if (chars[i] == 'e' || chars[i] == 'E') {
+                // we've already taken care of hex.
+                if (hasExp) {
+                    // two E's
+                    return false;
+                }
+                if (!foundDigit) {
+                    return false;
+                }
+                hasExp = true;
+                allowSigns = true;
+            } else if (chars[i] == '+' || chars[i] == '-') {
+                if (!allowSigns) {
+                    return false;
+                }
+                allowSigns = false;
+                foundDigit = false; // we need a digit after the E
+            } else {
+                return false;
+            }
+            i++;
+        }
+        if (i < chars.length) {
+            if (chars[i] >= '0' && chars[i] <= '9') {
+                // no type qualifier, OK
+                return true;
+            }
+            if (chars[i] == 'e' || chars[i] == 'E') {
+                // can't have an E at the last byte
+                return false;
+            }
+            if (chars[i] == '.') {
+                if (hasDecPoint || hasExp) {
+                    // two decimal points or dec in exponent
+                    return false;
+                }
+                // single trailing decimal point after non-exponent is ok
+                return foundDigit;
+            }
+            if (!allowSigns
+                    && (chars[i] == 'd'
+                    || chars[i] == 'D'
+                    || chars[i] == 'f'
+                    || chars[i] == 'F')) {
+                return foundDigit;
+            }
+            if (chars[i] == 'l'
+                    || chars[i] == 'L') {
+                // not allowing L with an exponent or decimal point
+                return foundDigit && !hasExp && !hasDecPoint;
+            }
+            // last character is illegal
+            return false;
+        }
+        // allowSigns is true iff the val ends in 'E'
+        // found digit it to make sure weird stuff like '.' and '1E-' doesn't pass
+        return !allowSigns && foundDigit;
+    }
+
+    public static boolean isEmpty(final CharSequence cs) {
+        return cs == null || cs.length() == 0;
+    }
+
+
+    protected void addToDefaults(String key, String value) {
+        final String currentValue = defaultData.get(key);
+        if (currentValue == null) {
+            if (value == null) {
+                value = DEFAULT_UNDEFINED;
+            }
+            defaultData.put(key, value);
+        } else {
+            // there is already an entry for this key. Check if the value is the undefined
+            if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) {
+                // update key with better default value
+                defaultData.put(key, value);
+            }
+        }
+    }
+
+    public static <T> T checkNotNull( T reference,  String errorMessage) {
+        if (reference == null) {
+            throw new NullPointerException(String.valueOf(errorMessage));
+        }
+        return reference;
+    }
+}

+ 24 - 0
src/main/resources/log4j.properties

@@ -0,0 +1,24 @@
+### Send log events at INFO level and above to the console and file appenders, both defined below ###
+log4j.rootLogger = INFO,console,file
+
+
+### console 配置输出到控制台 ###
+log4j.appender.console = org.apache.log4j.ConsoleAppender
+log4j.appender.console.Target = System.out
+log4j.appender.console.Threshold = INFO
+log4j.appender.console.layout = org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern = [%p] [%d{yyyy-MM-dd HH:mm:ss}] [%c] %m%n
+#log4j.appender.console.layout.ConversionPattern =  %d{ABSOLUTE} %5p %c{ 1 }:%L - %m%n
+
+
+### file 配置输出到文件 ###
+### DailyRollingFileAppender: 以日期分割每小时产生一个文件,  DatePattern: 日志文件名称格式 ###
+log4j.appender.file = org.apache.log4j.DailyRollingFileAppender
+log4j.appender.file.DatePattern  = '_'yyyy-MM-dd-HH'.log'
+log4j.appender.file.File = logs/log.log
+log4j.appender.file.Threshold = INFO
+log4j.appender.file.Append = true
+log4j.appender.file.layout = org.apache.log4j.PatternLayout
+log4j.appender.file.layout.ConversionPattern = [%p] [%d{yyyy-MM-dd HH:mm:ss}] [%c] %m%n
+#log4j.appender.file.layout.ConversionPattern = %-d{yyyy-MM-dd HH:mm:ss}  [ %t:%r ] - [ %p ]  %m%n
+

+ 19 - 0
src/test/java/com/winhc/test/Test.java

@@ -0,0 +1,19 @@
+package com.winhc.test;
+
+
+import com.winhc.data.utils.ParameterTool;
+
+import java.util.Map;
+
+/**
+ * @author π
+ * @Description: prints the parameters parsed from the command line
+ * @date 2023/5/31 14:51
+ */
+public class Test {
+
+    /**
+     * Parses {@code args} with {@link ParameterTool#fromArgs(String[])} and prints the resulting map.
+     */
+    public static void main(String[] args) throws Exception {
+        System.out.println(ParameterTool.fromArgs(args).toMap());
+    }
+}