package com.winhc.repal.mq.producer;

import com.aliyun.openservices.ons.api.*;
import com.aliyun.openservices.shade.com.alibaba.fastjson.JSON;
import com.winhc.repal.mq.MQConfig;
import com.winhc.repal.mq.MQTag;
import com.winhc.repal.mq.bean.SendMsgBean;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import java.nio.charset.StandardCharsets;
import java.util.Properties;

/**
 * Spring component that owns an ONS {@link Producer} and publishes messages through it.
 *
 * <p>The producer is created and started on a background thread kicked off from
 * {@link #init()}, so it may not be available yet when the first message is sent.
 * Until the producer has been started successfully, send attempts return {@code "FAIL"}.
 *
 * @author Aaron
 * @date 2020/12/14 13:58
 */
@Slf4j
@Component
public class RepalProducer {

    @Resource
    protected MQConfig mqConfig;

    /**
     * The started producer, or {@code null} while initialization is still running or
     * after it has failed. Declared {@code volatile} because it is written by the
     * initialization thread and read by whichever threads send messages.
     */
    protected volatile Producer producer;

    /**
     * Starts producer initialization on a separate thread so that bean construction
     * does not wait for the MQ client to start.
     *
     * <p>Note that the "end" log line is written as soon as the thread has been
     * launched; it does not mean the producer is ready.
     */
    @PostConstruct
    public void init() {
        log.info("初始化消息发送队列begin");
        new Thread(this::initProducer).start();
        log.info("初始化消息发送队列end");
    }

    /**
     * Builds the producer from {@link #mqConfig}, starts it, and only then publishes it
     * to the {@link #producer} field.
     *
     * @return the started producer, or {@code null} if creation or start-up failed
     */
    private Producer initProducer() {
        try {
            System.setProperty("rocketmq.client.log.loadconfig", "false");

            Properties producerProperties = new Properties();
            producerProperties.setProperty(PropertyKeyConst.AccessKey, mqConfig.getAccessKey());
            producerProperties.setProperty(PropertyKeyConst.SecretKey, mqConfig.getSecretKey());
            producerProperties.setProperty(PropertyKeyConst.ONSAddr, mqConfig.getUrl());

            // Keep the instance local until start() has returned: sender threads must never
            // observe a producer that is not started, and a failed start must leave the
            // field null instead of pointing at an unusable instance.
            Producer started = ONSFactory.createProducer(producerProperties);
            started.start();
            producer = started;

            log.info("Producer start success.");
            return started;
        } catch (Exception e) {
            log.error("初始化失败", e);
            return null;
        }
    }

    /**
     * Sends a UTF-8 encoded payload to the given topic and tag.
     *
     * @param req   message payload
     * @param topic destination topic
     * @param tag   message tag
     * @return {@code "SUCCESS"} when the producer returned a send result; {@code "FAIL"} when
     *         the producer is not available yet or returned no result
     */
    private String send(String req, String topic, String tag) {
        // Read the volatile field once so the null check and the call use the same instance.
        Producer current = producer;
        if (current == null) {
            log.error("Producer is not initialized, message not sent. Topic is:{} tag is:{}", topic, tag);
            return "FAIL";
        }

        Message message = new Message(topic, tag, req.getBytes(StandardCharsets.UTF_8));
        SendResult sendResult = current.send(message);
        if (sendResult == null) {
            return "FAIL";
        }

        log.info(" Send mq message success! Topic is:{} msgId is: {}", topic, sendResult.getMessageId());
        return "SUCCESS";
    }

    /**
     * Serializes the request to JSON and sends it to the configured message topic
     * with the {@code tag_msg} tag.
     *
     * @param req message to publish
     * @return {@code "SUCCESS"} or {@code "FAIL"}, as reported by the underlying send
     */
    public String sendMsg(SendMsgBean req) {
        return send(JSON.toJSONString(req), mqConfig.getMessageTopic(), MQTag.BizTag.tag_msg.name());
    }
}