RepalProducer.java 2.2 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768
  1. package com.winhc.repal.mq.producer;
  2. import com.aliyun.openservices.ons.api.*;
  3. import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
  4. import com.winhc.repal.mq.MQConfig;
  5. import com.winhc.repal.mq.MQTag;
  6. import com.winhc.repal.mq.bean.SendMsgBean;
  7. import lombok.extern.slf4j.Slf4j;
  8. import org.springframework.stereotype.Component;
  9. import javax.annotation.PostConstruct;
  10. import javax.annotation.Resource;
  11. import java.nio.charset.StandardCharsets;
  12. import java.util.Properties;
  13. /**
  14. * @author Aaron
  15. * @date 2020/12/14 13:58
  16. * @description
  17. */
  18. @Slf4j
  19. @Component
  20. public class RepalProducer {
  21. @Resource
  22. protected MQConfig mqConfig;
  23. protected Producer producer;
  24. @PostConstruct
  25. public void init() {
  26. log.info("初始化消息发送队列begin");
  27. new Thread(this::initProducer).start();
  28. log.info("初始化消息发送队列end");
  29. }
  30. private Producer initProducer() {
  31. try {
  32. System.setProperty("rocketmq.client.log.loadconfig", "false");
  33. Properties producerProperties = new Properties();
  34. producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey());
  35. producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey());
  36. producerProperties.setProperty(PropertyKeyConst.ONSAddr, mqConfig.getUrl());
  37. producer = ONSFactory.createProducer(producerProperties);
  38. producer.start();
  39. log.info("Producer start success.");
  40. return producer;
  41. } catch (Exception e) {
  42. log.error("初始化失败", e);
  43. return null;
  44. }
  45. }
  46. private String send(String req, String topic, String tag) {
  47. Message message = new Message(topic, tag, req.getBytes(StandardCharsets.UTF_8));
  48. SendResult sendResult = producer.send(message);
  49. if (sendResult != null) {
  50. log.info(" Send mq message success! Topic is:" + topic + " msgId is: "
  51. + sendResult.getMessageId());
  52. } else {
  53. return "FAIL";
  54. }
  55. return "SUCCESS";
  56. }
  57. public String sendMsg(SendMsgBean req) {
  58. return send(JSON.toJSONString(req), mqConfig.getMessageTopic(), MQTag.BizTag.tag_msg.name());
  59. }
  60. }