Procházet zdrojové kódy

feat: add & fix

- add company name alias
- fix bulk-get
许家凯 před 2 roky
rodič
revize
08728eeb49

+ 27 - 0
pom.xml

@@ -79,6 +79,26 @@
             <artifactId>springfox-boot-starter</artifactId>
             <version>3.0.0</version>
         </dependency>
+
+        <dependency>
+            <groupId>com.github.answerail</groupId>
+            <artifactId>dinger-spring-boot-starter</artifactId>
+            <version>1.2.1</version>
+        </dependency>
+
+
+        <dependency>
+            <groupId>com.aliyun.oss</groupId>
+            <artifactId>aliyun-sdk-oss</artifactId>
+            <version>3.13.1</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.winhc</groupId>
+            <artifactId>winhc-bigdata-tool</artifactId>
+            <version>1.0</version>
+        </dependency>
+
     </dependencies>
 
     <build>
@@ -112,6 +132,13 @@
                             <password>WUDk&amp;tC)SiTGouRP7noWucx{7C4|9dCf</password>
                         </auth>
                     </to>
+                    <container>
+                        <jvmFlags>
+                            <jvmFlag>-Xms2g</jvmFlag>
+                            <jvmFlag>-Xmx2g</jvmFlag>
+                        </jvmFlags>
+                        <mainClass>com.winhc.fast.query.FastQuery4SpiderApplication</mainClass>
+                    </container>
                 </configuration>
                 <executions>
                     <execution>

+ 69 - 8
src/main/java/com/winhc/fast/query/configuration/GlobalExceptionHandler.java

@@ -1,14 +1,23 @@
 package com.winhc.fast.query.configuration;
 
-import com.alibaba.lindorm.client.exception.NotServingRegionException;
+import com.github.jaemon.dinger.DingerSender;
+import com.github.jaemon.dinger.core.entity.DingerRequest;
+import com.github.jaemon.dinger.core.entity.enums.MessageSubType;
 import com.winhc.fast.query.vo.ResponseVo;
 import lombok.AllArgsConstructor;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
 import org.springframework.http.HttpStatus;
 import org.springframework.web.bind.annotation.ExceptionHandler;
 import org.springframework.web.bind.annotation.ResponseStatus;
 import org.springframework.web.bind.annotation.RestControllerAdvice;
 
+import java.io.PrintWriter;
+import java.io.StringWriter;
+
+import static com.winhc.tool.utils.BaseUtils.isWindows;
+
 /**
  * @author: XuJiakai
  * 2022/5/25 15:38
@@ -18,22 +27,74 @@ import org.springframework.web.bind.annotation.RestControllerAdvice;
 @AllArgsConstructor
 public class GlobalExceptionHandler {
     private HbaseHealthIndicator hbaseHealthIndicator;
+    private DingerSender dingerSender;
+
+    public static String getErrorMsg(Throwable thr) {
+        StringWriter stringWriter = new StringWriter();
+        PrintWriter printWriter = new PrintWriter(stringWriter);
+        try {
+            thr.printStackTrace(printWriter);
+            return stringWriter.toString();
+        } finally {
+            if (printWriter != null) {
+                printWriter.close();
+            }
+        }
+    }
+
 
     @ResponseStatus(HttpStatus.OK)
-    @ExceptionHandler(value = Exception.class)
-    public ResponseVo handleBadRequest(Exception e) {
-        log.error(e.getMessage(), e);
+    @ExceptionHandler(value = RetriesExhaustedException.class)
+    public ResponseVo handleHbaseException(RetriesExhaustedException e) {
+        String message = e.getMessage();
+        if (StringUtils.isNotBlank(message) && message.contains("TableNotFoundException")) {
+            log.error(e.getMessage());
+        } else {
+//            checkHbaseError(e);
+            error(true, e);
+            log.error("exception name:" + e.getClass().getName());
+            log.error(e.getMessage(), e);
+        }
         return ResponseVo.failure(e.getMessage());
     }
 
     @ResponseStatus(HttpStatus.OK)
-    @ExceptionHandler(value = NotServingRegionException.class)
-    public ResponseVo handleBadRequest(NotServingRegionException e) {
-        log.error("发现hbase异常!");
+    @ExceptionHandler(value = Exception.class)
+    public ResponseVo handleBadRequest(Exception e) {
+//        checkHbaseError(e);
+        error(true, e);
+        log.error("exception name:" + e.getClass().getName());
         log.error(e.getMessage(), e);
-        hbaseHealthIndicator.putErrorInfo(e);
         return ResponseVo.failure(e.getMessage());
     }
 
 
+    private void error(boolean isRestart, Exception e) {
+        Boolean windows = isWindows();
+        if (windows) {
+            return;
+        }
+        String name = e.getClass().getName();
+        if (!name.startsWith("org.apache.hadoop.hbase")) {
+            log.warn("error: {}", name);
+            return;
+        }
+
+        String errorMsg = getErrorMsg(e);
+        dingerSender.send(
+                MessageSubType.MARKDOWN,
+                DingerRequest.request("exception name: " + e.getClass().getName() + "\n" +
+                        "\n" +
+                        "exception msg:" + e.getMessage() + "\n" +
+                        "\n" +
+                        "```\n" +
+                        errorMsg +
+                        "\n" +
+                        "```\n", isRestart ? "触发重启" : "程序不健康")
+        );
+        if (isRestart) {
+            hbaseHealthIndicator.putErrorInfo(e);
+        }
+    }
+
 }

+ 28 - 0
src/main/java/com/winhc/fast/query/configuration/ProjectConfiguration.java

@@ -0,0 +1,28 @@
+package com.winhc.fast.query.configuration;
+
+import com.github.jaemon.dinger.support.CustomMessage;
+import com.winhc.tool.nlp.CompanyNameAnalyzer;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import java.io.IOException;
+
+import static com.github.jaemon.dinger.constant.DingerConstant.MARKDOWN_MESSAGE;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/9 15:11
+ */
+@Configuration
+public class ProjectConfiguration {
+
+    /**
+     * Registers a {@link WinhcDingTalkMessage} under the bean name {@code MARKDOWN_MESSAGE}.
+     * NOTE(review): presumably this replaces Dinger's default markdown formatter — confirm.
+     *
+     * @return the markdown message formatter used by this project
+     */
+    @Bean(MARKDOWN_MESSAGE)
+    public CustomMessage markDownMessage() {
+        return new WinhcDingTalkMessage();
+    }
+
+    /**
+     * Exposes a {@link CompanyNameAnalyzer} created with its no-argument constructor.
+     *
+     * @return a new analyzer instance
+     * @throws IOException declared on this factory method's signature
+     */
+    @Bean
+    public CompanyNameAnalyzer companyNameAnalyzer() throws IOException {
+        return new CompanyNameAnalyzer();
+    }
+}

+ 32 - 0
src/main/java/com/winhc/fast/query/configuration/WinhcDingTalkMessage.java

@@ -0,0 +1,32 @@
+package com.winhc.fast.query.configuration;
+
+import com.github.jaemon.dinger.core.entity.DingerRequest;
+import com.github.jaemon.dinger.support.CustomMessage;
+
+import java.text.MessageFormat;
+import java.util.List;
+
+/**
+ * @author: XuJiakai
+ * 2022/6/9 15:06
+ */
+public class WinhcDingTalkMessage implements CustomMessage {
+
+    /** Message template: {0} = title plus @-mentions, {1} = project id, {2} = content. */
+    private static final String TEMPLATE = "#### 【{1}】 【{0}】 \n\n{2}";
+
+    /**
+     * Builds the markdown text for a DingTalk message.
+     *
+     * @param projectId the project identifier placed first in the heading
+     * @param request   the request supplying the title, content and phone numbers
+     * @return a level-4 markdown heading containing the project id and the title (followed by
+     *         one {@code @phone} mention per phone number), a blank line, then the content
+     */
+    @Override
+    public String message(String projectId, DingerRequest request) {
+        String title = request.getTitle();
+        // new StringBuilder((String) null) throws NullPointerException, so a request
+        // without a title falls back to an empty heading instead of failing the send.
+        StringBuilder heading = new StringBuilder(title == null ? "" : title);
+
+        // Markdown messages need the @phone mentions inside the text itself.
+        List<String> phones = request.getPhones();
+        if (phones != null) {
+            for (String phone : phones) {
+                heading.append("@").append(phone);
+            }
+        }
+        return MessageFormat.format(TEMPLATE, heading, projectId, request.getContent());
+    }
+
+}

+ 47 - 0
src/main/java/com/winhc/fast/query/controller/CompanyNameAliasController.java

@@ -0,0 +1,47 @@
+package com.winhc.fast.query.controller;
+
+import com.winhc.fast.query.vo.ResponseVo;
+import com.winhc.tool.nlp.CompanyNameAnalyzer;
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.AllArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.web.bind.annotation.*;
+import reactor.core.publisher.Mono;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/**
+ * @author: XuJiakai
+ * 2022/7/7 16:05
+ */
+@Slf4j
+@AllArgsConstructor
+@RestController
+@Api(tags = "公司字号提取", value = "name_alias")
+@RequestMapping("company-name-alias")
+public class CompanyNameAliasController {
+
+    private CompanyNameAnalyzer companyNameAnalyzer;
+
+    /**
+     * POST variant: reads the company names from the request body.
+     *
+     * @param keyword the company names to process
+     * @return a success response wrapping one extracted value per input name, in input order
+     */
+    @PostMapping()
+    @ApiOperation(value = "提取")
+    public Mono<ResponseVo> alias4Body(@RequestBody String[] keyword) {
+        return Mono.just(ResponseVo.success(extractAliases(keyword)));
+    }
+
+    /**
+     * GET variant: reads the company names from the {@code keyword} request parameter.
+     *
+     * @param keyword the company names to process
+     * @return a success response wrapping one extracted value per input name, in input order
+     */
+    @GetMapping()
+    @ApiOperation(value = "提取")
+    public Mono<ResponseVo> alias(@RequestParam String[] keyword) {
+        return Mono.just(ResponseVo.success(extractAliases(keyword)));
+    }
+
+    /** Applies {@code companyNameAnalyzer.nameAliasExtract} to every name, keeping the order. */
+    private List<String> extractAliases(String[] names) {
+        return Arrays.stream(names)
+                .map(name -> companyNameAnalyzer.nameAliasExtract(name))
+                .collect(Collectors.toList());
+    }
+}

+ 10 - 5
src/main/java/com/winhc/fast/query/controller/HbaseQueryController.java

@@ -39,7 +39,8 @@ public class HbaseQueryController {
         return hbaseRepository.scan(tableName, rowPrefix, size)
                 .map(results -> ResponseVo.success(start, results.stream().map(HbaseResultHandler::transform).collect(Collectors.toList())))
                 .defaultIfEmpty(ResponseVo.success(start, null))
-                .onErrorResume(e -> Mono.just(ResponseVo.failure(start, e.getMessage())));
+//                .onErrorResume(e -> Mono.just(ResponseVo.failure(start, e.getMessage())))
+                ;
     }
 
     @ApiOperation(value = "点查")
@@ -49,7 +50,11 @@ public class HbaseQueryController {
         return hbaseRepository.get(tableName, rowkey)
                 .map(r -> ResponseVo.success(start, HbaseResultHandler.transform(r)))
                 .defaultIfEmpty(ResponseVo.success(start, null))
-                .onErrorResume(e -> Mono.just(ResponseVo.failure(start, e.getMessage())));
+               /* .onErrorResume(e -> {
+                    log.error(e.getMessage(),e);
+
+                    return Mono.just(ResponseVo.failure(start, e.getMessage()));
+                })*/;
 
     }
 
@@ -65,7 +70,7 @@ public class HbaseQueryController {
                 .map(String::toUpperCase)
                 .collect(Collectors.toList());
 
-        List<Get> gets = Arrays.stream(hbaseQueryParamVo.getQueryKey())
+        List<Get> gets = Arrays.stream(hbaseQueryParamVo.getRowkey())
                 .map(e -> new Get(e.getBytes(StandardCharsets.UTF_8)))
                 .peek(get -> {
                     for (String key : keys) {
@@ -73,9 +78,9 @@ public class HbaseQueryController {
                     }
                 }).collect(Collectors.toList());
         return hbaseRepository.bulkGet(tableName, gets)
-                .map(results -> ResponseVo.success(start, results.stream().map(HbaseResultHandler::transform).collect(Collectors.toList())))
+                .map(results -> ResponseVo.success(start, results.stream().map(HbaseResultHandler::transform).collect(Collectors.toMap(e->e.get("rowkey"),e->{e.remove("rowkey");return e;}))))
                 .defaultIfEmpty(ResponseVo.success(start, null))
-                .onErrorResume(e -> Mono.just(ResponseVo.failure(start, e.getMessage())));
+                ;
     }
 
 }

+ 0 - 28
src/main/java/com/winhc/fast/query/controller/TestController.java

@@ -1,28 +0,0 @@
-package com.winhc.fast.query.controller;
-
-import com.winhc.fast.query.configuration.HbaseHealthIndicator;
-import com.winhc.fast.query.vo.ResponseVo;
-import lombok.AllArgsConstructor;
-import lombok.extern.slf4j.Slf4j;
-import org.springframework.web.bind.annotation.GetMapping;
-import org.springframework.web.bind.annotation.RequestMapping;
-import org.springframework.web.bind.annotation.RestController;
-import reactor.core.publisher.Mono;
-
-/**
- * @author: XuJiakai
- * 2022/6/7 10:20
- */
-@Slf4j
-@AllArgsConstructor
-@RestController
-@RequestMapping("test")
-public class TestController {
-    HbaseHealthIndicator hbaseHealthIndicator;
-
-    @GetMapping("set-status")
-    public Mono<ResponseVo> setStatus() {
-        hbaseHealthIndicator.putErrorInfo(new RuntimeException("aaa"));
-        return Mono.just(ResponseVo.success("ok"));
-    }
-}

+ 7 - 0
src/main/resources/application-dev.yml

@@ -1,3 +1,8 @@
+logging:
+  level:
+    root: WARN
+    com.winhc: DEBUG
+
 spring:
   datasource:
     phoenix:
@@ -15,6 +20,8 @@ es:
 
 
 winhc:
+  oss:
+    end-point: oss-cn-shanghai.aliyuncs.com
   dynamic:
     elasticsearch:
       primary: new

+ 8 - 1
src/main/resources/application-prod.yml

@@ -1,5 +1,10 @@
-spring:
+logging:
+  level:
+    root: WARN
+    com.winhc: info
+
 
+spring:
   datasource:
     phoenix:
       server:
@@ -36,6 +41,8 @@ spring:
       concurrency: 1
 
 winhc:
+  oss:
+    end-point: oss-cn-shanghai.aliyuncs.com
   dynamic:
     elasticsearch:
       primary: new

+ 20 - 4
src/main/resources/application.yml

@@ -1,12 +1,20 @@
+server:
+  port: 8388
 spring:
+  application:
+    name: fast-query-4-spider
   profiles:
     active: ${SPRING_PROFILES_ACTIVE:prod}
   jackson:
     date-format: yyyy-MM-dd HH:mm:ss
     time-zone: GMT+8
-server:
-  port: 8388
-
+  dinger:
+    project-id: ${spring.application.name}
+    dingers:
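+      # DingTalk robot settings; replace them with your own robot's configuration.
+      # NOTE(review): tokenId and secret are committed in plain text here — consider injecting them from the environment instead.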
+      dingtalk:
+        tokenId: 2773b742b74d84599c4f05f7b42cacd6714b10c33cd4c74402649019fa7e56c8
+        secret: SECe7b26876f443e77f872b8b10880e39b3c5dfaf44855f1aa3235372bb73698ab6
 
 odps:
   access-key-id: LTAI4FynxS5nNuKyZ3LHhMX5
@@ -19,4 +27,12 @@ management:
       base-path: /manage
   endpoint:
     health:
-      show-details: always
+      show-details: always
+
+winhc:
+  oss:
+    bucket-name: bigdata-rt
+    access-key-id: LTAI4G4yiyJV4ggnLyGMduqV
+    secret-access-key: nokDg5HlVIBh80nL2dOXsKa2La4XL5
+
+