许家凯 2 سال پیش
کامیت
2445d20afe

+ 34 - 0
.gitignore

@@ -0,0 +1,34 @@
+### Java template
+# Compiled class file
+*.class
+
+# Log file
+*.log
+
+# BlueJ files
+*.ctxt
+
+# Mobile Tools for Java (J2ME)
+.mtj.tmp/
+
+# Package Files #
+*.jar
+*.war
+*.nar
+*.ear
+*.zip
+*.tar.gz
+*.rar
+
+# virtual machine crash logs, see http://www.java.com/en/download/help/error_hotspot.xml
+hs_err_pid*
+
+### Example user template template
+### Example user template
+
+# IntelliJ project files
+.idea
+*.iml
+out
+gen
+target

+ 41 - 0
pom.xml

@@ -0,0 +1,41 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <groupId>com.winhc</groupId>
+    <artifactId>max-compute-graph</artifactId>
+    <version>1.2</version>
+
+    <properties>
+        <maven.compiler.source>8</maven.compiler.source>
+        <maven.compiler.target>8</maven.compiler.target>
+    </properties>
+    <dependencies>
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-sdk-graph</artifactId>
+            <version>0.38.4-public</version>
+        </dependency>
+        <dependency>
+            <groupId>log4j</groupId>
+            <artifactId>log4j</artifactId>
+            <version>1.2.17</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.aliyun.odps</groupId>
+            <artifactId>odps-graph-local</artifactId>
+            <version>0.38.4-public</version>
+            <scope>compile</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>cn.hutool</groupId>
+            <artifactId>hutool-all</artifactId>
+            <version>5.7.10</version>
+        </dependency>
+    </dependencies>
+
+</project>

+ 8 - 0
src/main/java/com/winhc/max/compute/graph/Main.java

@@ -0,0 +1,8 @@
+package com.winhc.max.compute.graph;
+
+/**
+ * @author: XuJiakai
+ * 2022/5/27 14:59
+ */
+public class Main {
+}

+ 60 - 0
src/main/java/com/winhc/max/compute/graph/job/CompanyRankGraphJob.java

@@ -0,0 +1,60 @@
+package com.winhc.max.compute.graph.job;
+
+
+import com.aliyun.odps.data.TableInfo;
+import com.aliyun.odps.graph.GraphJob;
+import com.aliyun.odps.graph.RemoveDuplicatesLoadingResolver;
+import com.winhc.max.compute.graph.job.pagerank.CompanyComputingVertexResolver;
+import com.winhc.max.compute.graph.job.pagerank.CompanyRankVertex;
+import com.winhc.max.compute.graph.job.pagerank.CompanyRankVertexReader;
+import com.winhc.max.compute.graph.job.pagerank.PageRankAggregator;
+import com.winhc.max.compute.graph.util.ParameterTool;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/5/27 14:59
+ */
+public class CompanyRankGraphJob {
+    private final static Logger log = Logger.getLogger(CompanyRankGraphJob.class);
+
+    public static void main(String[] args) throws IOException {
+        ParameterTool parameterTool = ParameterTool.fromArgs(args);
+        String outPart = parameterTool.getOrDefault("outPart", "" + System.currentTimeMillis());
+        String maxIter = parameterTool.getOrDefault("maxIter", "30");
+        String variance = parameterTool.getOrDefault("variance", "0.00000001");
+
+        System.out.println("input args: " + String.join(" ", args));
+
+        GraphJob job = new GraphJob();
+
+        job.set("company.rank.variance", variance);
+
+        job.setGraphLoaderClass(CompanyRankVertexReader.class);
+        job.setVertexClass(CompanyRankVertex.class);
+
+
+        job.setLoadingVertexResolver(RemoveDuplicatesLoadingResolver.class);
+        job.setComputingVertexResolver(CompanyComputingVertexResolver.class);
+        job.setAggregatorClass(PageRankAggregator.class);
+
+
+        job.addInput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName("calc_company_rank_input")
+                .build());
+        job.addOutput(TableInfo.builder()
+                .projectName("winhc_ng")
+                .tableName("calc_company_rank_out")
+                .partSpec("ds='" + outPart + "'")
+                .build());
+        job.setMaxIteration(-1);
+
+        long startTime = System.currentTimeMillis();
+        job.run();
+        System.out.println("Job Finished in "
+                + (System.currentTimeMillis() - startTime) / 1000.0 + " seconds");
+    }
+}

+ 39 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/CompanyComputingVertexResolver.java

@@ -0,0 +1,39 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.graph.DefaultComputingVertexResolver;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.VertexChanges;
+import com.aliyun.odps.io.Writable;
+import com.aliyun.odps.io.WritableComparable;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/14 09:12
+ */
+public class CompanyComputingVertexResolver<I extends WritableComparable, V extends Writable, E extends Writable, M extends Writable> extends DefaultComputingVertexResolver {
+
+    @Override
+    public Vertex resolveNotExistVertexMutations(WritableComparable vertexId, VertexChanges vertexChanges, boolean hasMessages) throws IOException {
+
+        /**
+         * 1. If creation of vertex desired, pick first vertex.
+         */
+        Vertex<I, V, E, M> vertex = addVertexIfDesired(vertexId, vertexChanges);
+
+        /** 2. If edge addition, add the edges */
+        addEdges(vertexId, vertex, vertexChanges);
+
+        /** 3. If the vertex exists, first prune the edges. */
+        removeEdges(vertexId, vertex, vertexChanges);
+
+        /** 4. If vertex removal desired, remove the vertex. */
+        vertex = removeVertexIfDesired(vertexId, vertex, vertexChanges);
+
+        /** 5. If send messages to not exist vertex, throw exception.*/
+        if (vertex == null && hasMessages) {
+        }
+        return vertex;
+    }
+}

+ 84 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/CompanyRank.java

@@ -0,0 +1,84 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.Text;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/8 09:55
+ */
+public class CompanyRank implements Writable {
+
+    private Text companyName;
+    private DoubleWritable initValue;
+    private DoubleWritable pr;
+
+    public static CompanyRank of(double initValue) {
+        return of(new DoubleWritable(initValue));
+    }
+
+    public Text getCompanyName() {
+        return companyName;
+    }
+
+    public void setCompanyName(Text companyName) {
+        this.companyName = companyName;
+    }
+
+    public static CompanyRank of(DoubleWritable initValue) {
+        CompanyRank companyRank = new CompanyRank();
+        companyRank.initValue = initValue;
+        companyRank.pr = new DoubleWritable(-1);
+        return companyRank;
+    }
+
+    public DoubleWritable getFinalValue() {
+        if (pr.get() < 0) {
+            return initValue;
+        } else {
+            return new DoubleWritable(initValue.get() + pr.get());
+        }
+    }
+
+    public Double getSendMsg() {
+        return pr.get() < 0 ? initValue.get() : pr.get() + initValue.get();
+    }
+
+    public DoubleWritable getPr() {
+        return pr.get() < 0 ? initValue : pr;
+    }
+
+    public DoubleWritable getInitValue() {
+        return initValue;
+    }
+
+    public void setInitValue(DoubleWritable initValue) {
+        this.initValue = initValue;
+    }
+
+    public void setPr(DoubleWritable pr) {
+        this.pr = pr;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        companyName.write(out);
+        initValue.write(out);
+        pr.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        companyName = new Text();
+        initValue = new DoubleWritable();
+        pr = new DoubleWritable();
+        companyName.readFields(in);
+        initValue.readFields(in);
+        pr.readFields(in);
+    }
+}

+ 57 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/CompanyRankVertex.java

@@ -0,0 +1,57 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.graph.ComputeContext;
+import com.aliyun.odps.graph.Vertex;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.Text;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/8 10:00
+ */
+public class CompanyRankVertex extends
+        Vertex<Text, CompanyRank, DoubleWritable, DoubleWritable> {
+    @Override
+    public void compute(ComputeContext<Text, CompanyRank, DoubleWritable, DoubleWritable> context, Iterable<DoubleWritable> messages) throws IOException {
+        //            log.info("start -> step:" + context.getSuperstep() + ",vertex Id:" + getId() + ",vertex pr:" + getValue().get() + ";");
+        Double oldPr = getValue().getPr().get();
+        if (context.getSuperstep() == 0) {
+//                log.info("start -> step:" + context.getSuperstep() + ",vertex Id:" + getId() + ",vertex pr:" + getValue().get() + ";");
+//                setValue(new DoubleWritable(1.0));
+        } else if (context.getSuperstep() >= 1) {
+            double sum = 0;
+            int count = 0;
+            for (DoubleWritable msg : messages) {
+                sum += msg.get();
+                count++;
+            }
+            if (count > 0) {
+                CompanyRank value = getValue();
+                value.setPr(new DoubleWritable(0.15f + 0.85f * sum));
+                setValue(value);
+            }
+        }
+
+        if (hasEdges()) {
+            Double sendMsg = getValue().getSendMsg();
+            Double sendValue = sendMsg / getEdges().size();
+
+            context.sendMessageToNeighbors(this, new DoubleWritable(sendValue));
+
+            Double newPr = getValue().getPr().get();
+            context.aggregate(new DoubleWritable(Math.pow(newPr - oldPr, 2)));
+//                log.info("end -> step:" + context.getSuperstep() + ",vertex Id:" + getId() + ",vertex pr:" + getValue().get() + ",send message:" + sendValue);
+        } else {
+//                log.info("end -> step:" + context.getSuperstep() + ",vertex Id:" + getId() + ",vertex pr:" + getValue().get() + ",no send message");
+        }
+
+    }
+
+    @Override
+    public void cleanup(WorkerContext<Text, CompanyRank, DoubleWritable, DoubleWritable> context) throws IOException {
+        context.write(getId(),getValue().getCompanyName(), getValue().getFinalValue(), getValue().getInitValue(), getValue().getPr());
+    }
+}

+ 39 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/CompanyRankVertexReader.java

@@ -0,0 +1,39 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.graph.GraphLoader;
+import com.aliyun.odps.graph.MutationContext;
+import com.aliyun.odps.io.*;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/8 09:58
+ */
+public class CompanyRankVertexReader extends
+        GraphLoader<Text, CompanyRank, DoubleWritable, DoubleWritable> {
+    @Override
+    public void load(LongWritable recordNum, WritableRecord record, MutationContext<Text, CompanyRank, DoubleWritable, DoubleWritable> context) throws IOException {
+        CompanyRankVertex vertex = new CompanyRankVertex();
+
+        Text companyId = (Text) record.get("investment_company_id");
+        Text company_name = (Text) record.get("investment_company_name");
+//        DoubleWritable initScore = ((DoubleWritable) record.get("investment_company_init_score"));
+        DoubleWritable initScore = new DoubleWritable(1);
+        vertex.setId(companyId);
+        CompanyRank of = CompanyRank.of(initScore);
+        of.setCompanyName(company_name);
+        vertex.setValue(of);
+        Writable holder_info = record.get("holder_info");
+
+        if (!(holder_info.equals(NullWritable.get()))) {
+            String[] split1 = holder_info.toString().split(",");
+            for (String s : split1) {
+                String[] split = s.split("=");
+                vertex.addEdge(new Text(split[0]), new DoubleWritable(1 / split1.length));
+            }
+        }
+//            log.info(graphId.toString() + "," + holder_info.toString() + " vertex edgs size: " + (vertex.hasEdges() ? vertex.getEdges().size() : 0));
+        context.addVertexRequest(vertex);
+    }
+}

+ 58 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/PageRankAggValue.java

@@ -0,0 +1,58 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.io.DoubleWritable;
+import com.aliyun.odps.io.LongWritable;
+import com.aliyun.odps.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/7 15:41
+ */
+public class PageRankAggValue implements Writable {
+
+    public PageRankAggValue() {
+        n = new LongWritable(0);
+        sumValue = new DoubleWritable(0);
+    }
+
+    private LongWritable n;
+    private DoubleWritable sumValue;
+
+    /**
+     * @param value (newPr -oldPr)^2
+     */
+    public void addNewValue(double value) {
+        update(n.get() + 1, sumValue.get() + value);
+    }
+
+    public void update(long n, double sumValue) {
+        this.sumValue = new DoubleWritable(sumValue);
+        this.n = new LongWritable(n);
+    }
+
+    public LongWritable getN() {
+        return n;
+    }
+
+    public DoubleWritable getSumValue() {
+        return sumValue;
+    }
+
+    @Override
+    public void write(DataOutput out) throws IOException {
+        n.write(out);
+        sumValue.write(out);
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+        n = new LongWritable();
+        sumValue = new DoubleWritable();
+        n.readFields(in);
+        sumValue.readFields(in);
+    }
+}

+ 43 - 0
src/main/java/com/winhc/max/compute/graph/job/pagerank/PageRankAggregator.java

@@ -0,0 +1,43 @@
+package com.winhc.max.compute.graph.job.pagerank;
+
+import com.aliyun.odps.graph.Aggregator;
+import com.aliyun.odps.graph.WorkerContext;
+import com.aliyun.odps.io.DoubleWritable;
+
+import java.io.IOException;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/7 15:50
+ */
+public class PageRankAggregator extends Aggregator<PageRankAggValue> {
+    @Override
+    public PageRankAggValue createInitialValue(WorkerContext context) throws IOException {
+        return new PageRankAggValue();
+    }
+
+    @Override
+    public void aggregate(PageRankAggValue pageRankAggValue, Object item) throws IOException {
+        DoubleWritable inputValue = ((DoubleWritable) item);
+        pageRankAggValue.addNewValue(inputValue.get());
+    }
+
+    @Override
+    public void merge(PageRankAggValue pageRankAggValue, PageRankAggValue partial) throws IOException {
+        long n = pageRankAggValue.getN().get() + partial.getN().get();
+        double sumValue = pageRankAggValue.getSumValue().get() + partial.getSumValue().get();
+        pageRankAggValue.update(n, sumValue);
+    }
+
+    @Override
+    public boolean terminate(WorkerContext context, PageRankAggValue pageRankAggValue) throws IOException {
+        Double variance = Double.parseDouble(context.getConfiguration().get("company.rank.variance", "0.01"));
+        double v = pageRankAggValue.getSumValue().get() / pageRankAggValue.getN().get();
+        if (context.getSuperstep() == 0) {
+            System.out.println("skip step " + context.getSuperstep() + ", variance value :" + v);
+            return false;
+        }
+        System.out.println("step " + context.getSuperstep() + ", variance value: " + v);
+        return v < variance;
+    }
+}

+ 459 - 0
src/main/java/com/winhc/max/compute/graph/util/ParameterTool.java

@@ -0,0 +1,459 @@
+package com.winhc.max.compute.graph.util;
+
+import javax.annotation.Nullable;
+import java.io.*;
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/7 11:28
+ */
+public class ParameterTool {
+    protected transient Map<String, String> defaultData;
+    protected transient Set<String> unrequestedParameters;
+
+    protected static final String NO_VALUE_KEY = "__NO_VALUE_KEY";
+    protected static final String DEFAULT_UNDEFINED = "<undefined>";
+
+    // ------------------ Constructors ------------------------
+
+    /**
+     * Returns {@link ParameterTool} for the given arguments. The arguments are keys followed by
+     * values. Keys have to start with '-' or '--'
+     *
+     * <p><strong>Example arguments:</strong> --key1 value1 --key2 value2 -key3 value3
+     *
+     * @param args Input array arguments
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromArgs(String[] args) {
+        final Map<String, String> map = new HashMap<>(args.length / 2);
+
+        int i = 0;
+        while (i < args.length) {
+            final String key = getKeyFromArgs(args, i);
+
+            if (key.isEmpty()) {
+                throw new IllegalArgumentException(
+                        "The input " + Arrays.toString(args) + " contains an empty argument");
+            }
+
+            i += 1; // try to find the value
+
+            if (i >= args.length) {
+                map.put(key, NO_VALUE_KEY);
+            } else if (isNumber(args[i])) {
+                map.put(key, args[i]);
+                i += 1;
+            } else if (args[i].startsWith("--") || args[i].startsWith("-")) {
+                // the argument cannot be a negative number because we checked earlier
+                // -> the next argument is a parameter name
+                map.put(key, NO_VALUE_KEY);
+            } else {
+                map.put(key, args[i]);
+                i += 1;
+            }
+        }
+
+        return fromMap(map);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given {@link Properties} file.
+     *
+     * @param path Path to the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(String path) throws IOException {
+        File propertiesFile = new File(path);
+        return fromPropertiesFile(propertiesFile);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given {@link Properties} file.
+     *
+     * @param file File object to the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(File file) throws IOException {
+        if (!file.exists()) {
+            throw new FileNotFoundException(
+                    "Properties file " + file.getAbsolutePath() + " does not exist");
+        }
+        try (FileInputStream fis = new FileInputStream(file)) {
+            return fromPropertiesFile(fis);
+        }
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given InputStream from {@link Properties} file.
+     *
+     * @param inputStream InputStream from the properties file
+     * @return A {@link ParameterTool}
+     * @throws IOException If the file does not exist
+     * @see Properties
+     */
+    public static ParameterTool fromPropertiesFile(InputStream inputStream) throws IOException {
+        Properties props = new Properties();
+        props.load(inputStream);
+        return fromMap((Map) props);
+    }
+
+    /**
+     * Returns {@link ParameterTool} for the given map.
+     *
+     * @param map A map of arguments. Both Key and Value have to be Strings
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromMap(Map<String, String> map) {
+        checkNotNull(map, "Unable to initialize from empty map");
+        return new ParameterTool(map);
+    }
+
+    /**
+     * Returns {@link ParameterTool} from the system properties. Example on how to pass system
+     * properties: -Dkey1=value1 -Dkey2=value2
+     *
+     * @return A {@link ParameterTool}
+     */
+    public static ParameterTool fromSystemProperties() {
+        return fromMap((Map) System.getProperties());
+    }
+
+    // ------------------ ParameterUtil  ------------------------
+    protected final Map<String, String> data;
+
+    private ParameterTool(Map<String, String> data) {
+        this.data = Collections.unmodifiableMap(new HashMap<>(data));
+
+        this.defaultData = new ConcurrentHashMap<>(data.size());
+
+        this.unrequestedParameters =
+                Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
+
+        unrequestedParameters.addAll(data.keySet());
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        ParameterTool that = (ParameterTool) o;
+        return Objects.equals(data, that.data)
+                && Objects.equals(defaultData, that.defaultData)
+                && Objects.equals(unrequestedParameters, that.unrequestedParameters);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(data, defaultData, unrequestedParameters);
+    }
+
+    // ------------------ Get data from the util ----------------
+
+    /**
+     * Returns number of parameters in {@link ParameterTool}.
+     */
+    public int getNumberOfParameters() {
+        return data.size();
+    }
+
+    /**
+     * Returns the String value for the given key. If the key does not exist it will return null.
+     */
+    public String get(String key) {
+        addToDefaults(key, null);
+        unrequestedParameters.remove(key);
+        return data.get(key);
+    }
+
+    public String getOrDefault(String key, String defaultValue) {
+        String s = get(key);
+        if (s == null) {
+            return defaultValue;
+        } else {
+            return s;
+        }
+    }
+
+
+    /**
+     * Check if value is set.
+     */
+    public boolean has(String value) {
+        addToDefaults(value, null);
+        unrequestedParameters.remove(value);
+        return data.containsKey(value);
+    }
+
+
+    /**
+     * Returns a {@link Properties} object from this {@link ParameterTool}.
+     *
+     * @return A {@link Properties}
+     */
+    public Properties getProperties() {
+        Properties props = new Properties();
+        props.putAll(this.data);
+        return props;
+    }
+
+    /**
+     * Create a properties file with all the known parameters (call after the last get*() call). Set
+     * the default value, if available.
+     *
+     * <p>Use this method to create a properties file skeleton.
+     *
+     * @param pathToFile Location of the default properties file.
+     */
+    public void createPropertiesFile(String pathToFile) throws IOException {
+        createPropertiesFile(pathToFile, true);
+    }
+
+    /**
+     * Create a properties file with all the known parameters (call after the last get*() call). Set
+     * the default value, if overwrite is true.
+     *
+     * @param pathToFile Location of the default properties file.
+     * @param overwrite  Boolean flag indicating whether or not to overwrite the file
+     * @throws IOException If overwrite is not allowed and the file exists
+     */
+    public void createPropertiesFile(String pathToFile, boolean overwrite) throws IOException {
+        final File file = new File(pathToFile);
+        if (file.exists()) {
+            if (overwrite) {
+                file.delete();
+            } else {
+                throw new RuntimeException(
+                        "File " + pathToFile + " exists and overwriting is not allowed");
+            }
+        }
+        final Properties defaultProps = new Properties();
+        defaultProps.putAll(this.defaultData);
+        try (final OutputStream out = new FileOutputStream(file)) {
+            defaultProps.store(
+                    out, "Default file created by Flink's ParameterUtil.createPropertiesFile()");
+        }
+    }
+
+    @Override
+    protected Object clone() throws CloneNotSupportedException {
+        return new ParameterTool(this.data);
+    }
+
+    // ------------------------- Interaction with other ParameterUtils -------------------------
+
+    /**
+     * Merges two {@link ParameterTool}.
+     *
+     * @param other Other {@link ParameterTool} object
+     * @return The Merged {@link ParameterTool}
+     */
+    public ParameterTool mergeWith(ParameterTool other) {
+        final Map<String, String> resultData = new HashMap<>(data.size() + other.data.size());
+        resultData.putAll(data);
+        resultData.putAll(other.data);
+
+        final ParameterTool ret = new ParameterTool(resultData);
+
+        final HashSet<String> requestedParametersLeft = new HashSet<>(data.keySet());
+        requestedParametersLeft.removeAll(unrequestedParameters);
+
+        final HashSet<String> requestedParametersRight = new HashSet<>(other.data.keySet());
+        requestedParametersRight.removeAll(other.unrequestedParameters);
+
+        ret.unrequestedParameters.removeAll(requestedParametersLeft);
+        ret.unrequestedParameters.removeAll(requestedParametersRight);
+
+        return ret;
+    }
+
+    // ------------------------- ExecutionConfig.UserConfig interface -------------------------
+
+    public Map<String, String> toMap() {
+        return data;
+    }
+
+    // ------------------------- Serialization ---------------------------------------------
+
+    private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException {
+        in.defaultReadObject();
+
+        defaultData = new ConcurrentHashMap<>(data.size());
+        unrequestedParameters = Collections.newSetFromMap(new ConcurrentHashMap<>(data.size()));
+    }
+
+    public static String getKeyFromArgs(String[] args, int index) {
+        String key;
+        if (args[index].startsWith("--")) {
+            key = args[index].substring(2);
+        } else if (args[index].startsWith("-")) {
+            key = args[index].substring(1);
+        } else {
+            throw new IllegalArgumentException(
+                    String.format(
+                            "Error parsing arguments '%s' on '%s'. Please prefix keys with -- or -.",
+                            Arrays.toString(args), args[index]));
+        }
+
+        if (key.isEmpty()) {
+            throw new IllegalArgumentException(
+                    "The input " + Arrays.toString(args) + " contains an empty argument");
+        }
+
+        return key;
+    }
+
+    public static boolean isNumber(final String str) {
+        if (isEmpty(str)) {
+            return false;
+        }
+        final char[] chars = str.toCharArray();
+        int sz = chars.length;
+        boolean hasExp = false;
+        boolean hasDecPoint = false;
+        boolean allowSigns = false;
+        boolean foundDigit = false;
+        // deal with any possible sign up front
+        final int start = (chars[0] == '-') ? 1 : 0;
+        if (sz > start + 1 && chars[start] == '0') { // leading 0
+            if (
+                    (chars[start + 1] == 'x') ||
+                            (chars[start + 1] == 'X')
+            ) { // leading 0x/0X
+                int i = start + 2;
+                if (i == sz) {
+                    return false; // str == "0x"
+                }
+                // checking hex (it can't be anything else)
+                for (; i < chars.length; i++) {
+                    if ((chars[i] < '0' || chars[i] > '9')
+                            && (chars[i] < 'a' || chars[i] > 'f')
+                            && (chars[i] < 'A' || chars[i] > 'F')) {
+                        return false;
+                    }
+                }
+                return true;
+            } else if (Character.isDigit(chars[start + 1])) {
+                // leading 0, but not hex, must be octal
+                int i = start + 1;
+                for (; i < chars.length; i++) {
+                    if (chars[i] < '0' || chars[i] > '7') {
+                        return false;
+                    }
+                }
+                return true;
+            }
+        }
+        sz--; // don't want to loop to the last char, check it afterwords
+        // for type qualifiers
+        int i = start;
+        // loop to the next to last char or to the last char if we need another digit to
+        // make a valid number (e.g. chars[0..5] = "1234E")
+        while (i < sz || (i < sz + 1 && allowSigns && !foundDigit)) {
+            if (chars[i] >= '0' && chars[i] <= '9') {
+                foundDigit = true;
+                allowSigns = false;
+
+            } else if (chars[i] == '.') {
+                if (hasDecPoint || hasExp) {
+                    // two decimal points or dec in exponent
+                    return false;
+                }
+                hasDecPoint = true;
+            } else if (chars[i] == 'e' || chars[i] == 'E') {
+                // we've already taken care of hex.
+                if (hasExp) {
+                    // two E's
+                    return false;
+                }
+                if (!foundDigit) {
+                    return false;
+                }
+                hasExp = true;
+                allowSigns = true;
+            } else if (chars[i] == '+' || chars[i] == '-') {
+                if (!allowSigns) {
+                    return false;
+                }
+                allowSigns = false;
+                foundDigit = false; // we need a digit after the E
+            } else {
+                return false;
+            }
+            i++;
+        }
+        if (i < chars.length) {
+            if (chars[i] >= '0' && chars[i] <= '9') {
+                // no type qualifier, OK
+                return true;
+            }
+            if (chars[i] == 'e' || chars[i] == 'E') {
+                // can't have an E at the last byte
+                return false;
+            }
+            if (chars[i] == '.') {
+                if (hasDecPoint || hasExp) {
+                    // two decimal points or dec in exponent
+                    return false;
+                }
+                // single trailing decimal point after non-exponent is ok
+                return foundDigit;
+            }
+            if (!allowSigns
+                    && (chars[i] == 'd'
+                    || chars[i] == 'D'
+                    || chars[i] == 'f'
+                    || chars[i] == 'F')) {
+                return foundDigit;
+            }
+            if (chars[i] == 'l'
+                    || chars[i] == 'L') {
+                // not allowing L with an exponent or decimal point
+                return foundDigit && !hasExp && !hasDecPoint;
+            }
+            // last character is illegal
+            return false;
+        }
+        // allowSigns is true iff the val ends in 'E'
+        // found digit it to make sure weird stuff like '.' and '1E-' doesn't pass
+        return !allowSigns && foundDigit;
+    }
+
+    public static boolean isEmpty(final CharSequence cs) {
+        return cs == null || cs.length() == 0;
+    }
+
+
+    protected void addToDefaults(String key, String value) {
+        final String currentValue = defaultData.get(key);
+        if (currentValue == null) {
+            if (value == null) {
+                value = DEFAULT_UNDEFINED;
+            }
+            defaultData.put(key, value);
+        } else {
+            // there is already an entry for this key. Check if the value is the undefined
+            if (currentValue.equals(DEFAULT_UNDEFINED) && value != null) {
+                // update key with better default value
+                defaultData.put(key, value);
+            }
+        }
+    }
+
+    public static <T> T checkNotNull(@Nullable T reference, @Nullable String errorMessage) {
+        if (reference == null) {
+            throw new NullPointerException(String.valueOf(errorMessage));
+        }
+        return reference;
+    }
+}