Browse Source

feat: add

许家凯 2 years ago
parent
commit
6bc8605b24

+ 30 - 0
src/main/java/com/winhc/bigdata/udf/ToJson.java

@@ -0,0 +1,30 @@
+package com.winhc.bigdata.udf;
+
+import com.alibaba.fastjson.JSONObject;
+import com.alibaba.fastjson.serializer.SerializerFeature;
+import com.aliyun.odps.udf.UDF;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * @author: XuJiakai
+ * 2021/7/6 15:07
+ */
+public class ToJson extends UDF {
+    public String evaluate(Integer size, String... other) {
+        if (other.length != size * 2) {
+            throw new RuntimeException("输入和size无法对应");
+        }
+        Map<String, String> map = new HashMap<>(size);
+        for (int i = 0; i < other.length / 2; i++) {
+            map.put(other[i], other[i + size]);
+        }
+        return JSONObject.toJSONString(map, SerializerFeature.WriteMapNullValue);
+    }
+
+    public static void main(String[] args) {
+        ToJson toJson = new ToJson();
+        System.out.println(toJson.evaluate(2,"a","b","c","d"));
+    }
+}

+ 32 - 0
src/main/java/com/winhc/bigdata/udf/biz/BinlogParse.java

@@ -0,0 +1,32 @@
+package com.winhc.bigdata.udf.biz;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.utils.StringUtils;
+
+/**
+ * @author: XuJiakai
+ * 2021/10/18 16:20
+ */
+public class BinlogParse extends UDF {
+    public String evaluate(String data, String type, String field, String new_or_old) {
+        if (StringUtils.isEmpty(data)) {
+            return null;
+        }
+        JSONArray array = JSON.parseArray(data);
+        if (array.isEmpty()) {
+            return null;
+        }
+        JSONObject jsonObject = array.getJSONObject(0);
+
+        String key = field.toUpperCase();
+        if (jsonObject.containsKey(key)) {
+            String string = jsonObject.getString(key);
+            return string;
+        } else {
+            return null;
+        }
+    }
+}

+ 30 - 0
src/main/java/com/winhc/bigdata/udf/biz/BinlogParsePk.java

@@ -0,0 +1,30 @@
+package com.winhc.bigdata.udf.biz;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.utils.StringUtils;
+
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2021/10/18 17:00
+ */
+public class BinlogParsePk extends UDF {
+    public String evaluate(String data, String pk) {
+        if (StringUtils.isEmpty(data)) {
+            return null;
+        }
+        JSONArray array = JSON.parseArray(data);
+        JSONObject jsonObject = array.getJSONObject(0);
+        List<String> list = ((List<String>) JSON.parseObject(pk, List.class));
+        StringBuilder sb = new StringBuilder();
+
+        for (String s : list) {
+            sb.append(jsonObject.getString(s));
+        }
+        return sb.toString();
+    }
+}

+ 37 - 0
src/main/java/com/winhc/bigdata/udf/biz/BinlogParsePro.java

@@ -0,0 +1,37 @@
+package com.winhc.bigdata.udf.biz;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.utils.StringUtils;
+
+/**
+ * @author: XuJiakai
+ * 2021/10/26 09:50
+ */
+@Deprecated
+public class BinlogParsePro extends UDF {
+
+    private String evl(String data, String field) {
+        if (StringUtils.isEmpty(data)) {
+            return null;
+        }
+        JSONArray array = JSON.parseArray(data);
+        JSONObject jsonObject = array.getJSONObject(0);
+        String key = field.toUpperCase();
+        String string = jsonObject.getString(key);
+        return string;
+    }
+
+
+    public String evaluate(String data, String old_data, String type, String field, String new_or_old) {
+        if ("old".equals(new_or_old)) {
+            return evl(old_data, field);
+        } else if ("new".equals(new_or_old)) {
+            return evl(data, field);
+        } else {
+            throw new RuntimeException("new or old !");
+        }
+    }
+}

+ 29 - 0
src/main/java/com/winhc/bigdata/udf/biz/BinlogParseUpdateFields.java

@@ -0,0 +1,29 @@
+package com.winhc.bigdata.udf.biz;
+
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.JSONArray;
+import com.alibaba.fastjson.JSONObject;
+import com.aliyun.odps.udf.UDF;
+import com.aliyun.odps.utils.StringUtils;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/10/26 09:59
+ */
+public class BinlogParseUpdateFields extends UDF {
+    public List<String> evaluate(String data) {
+        if (StringUtils.isEmpty(data)) {
+            return new ArrayList<>();
+        }
+        JSONArray array = JSON.parseArray(data);
+        if (array.isEmpty()) {
+            return new ArrayList<>();
+        }
+        JSONObject jsonObject = array.getJSONObject(0);
+        return jsonObject.keySet().stream().map(String::toLowerCase).collect(Collectors.toList());
+    }
+}

+ 16 - 0
src/main/java/com/winhc/bigdata/udf/etl/CompanyPhoneOrEmailSpilt.java

@@ -0,0 +1,16 @@
+package com.winhc.bigdata.udf.etl;
+
+import com.aliyun.odps.udf.UDF;
+import com.winhc.bigdata.utils.CompanyUtils;
+
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2021/12/6 11:34
+ */
+public class CompanyPhoneOrEmailSpilt extends UDF {
+    public List<String> evaluate(String content) {
+        return CompanyUtils.spiltNames(content);
+    }
+}

+ 25 - 0
src/main/java/com/winhc/bigdata/udf/etl/CompanyPhonesFilterSplit.java

@@ -0,0 +1,25 @@
+package com.winhc.bigdata.udf.etl;
+
+import com.aliyun.odps.udf.UDF;
+import com.winhc.bigdata.utils.CompanyUtils;
+
+import java.util.List;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/12/6 11:35
+ * split_company_mobile_phone
+ */
+public class CompanyPhonesFilterSplit extends UDF {
+    private static final Pattern pattern = Pattern.compile("1[0-9]{10}");
+    public List<String> evaluate(String content) {
+        List<String> strings = CompanyUtils.spiltNames(content);
+        return strings.stream().filter(c -> {
+            Matcher matcher = pattern.matcher(c);
+            return matcher.matches();
+        }).collect(Collectors.toList());
+    }
+}

+ 33 - 0
src/main/java/com/winhc/bigdata/utils/CompanyUtils.java

@@ -0,0 +1,33 @@
+package com.winhc.bigdata.utils;
+
+import com.aliyun.odps.utils.StringUtils;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2021/12/6 11:27
+ */
+public class CompanyUtils {
+    public static List<String> spiltNames(String val) {
+        if (StringUtils.isEmpty(val)) {
+            return Collections.emptyList();
+        }
+        return Arrays.stream(val.split("\t;\t")).map(String::trim).filter(StringUtils::isNotBlank).map(CompanyUtils::valTrim).collect(Collectors.toList());
+    }
+
+
+    private static String valTrim(String val) {
+        if (StringUtils.isBlank(val)) {
+            return null;
+        }
+        if (val.endsWith("\t;")) {
+            return val.replaceAll("\t;", "");
+        } else {
+            return val;
+        }
+    }
+}