消息队列

yun-zuoyi
yunhao.wang 6 years ago
parent b722602ed5
commit 33a67c4778

@ -30,7 +30,6 @@ public class I3CoreQueueConfig {
/*********** 队列demo ***********/
public static final String DEMO_STR_QUEUE = "demo_str_queue";
@Bean
public Queue getStrQueue() {
//LOGGER.info("【DEMO_STR_QUEUE队列】");
@ -38,7 +37,6 @@ public class I3CoreQueueConfig {
}
public static final String DEMO_OBJ_QUEUE = "demo_obj_queue";
@Bean
public Queue getObjQueue() throws Exception {
//LOGGER.info("【DEMO_OBJ_QUEUE队列】");
@ -46,7 +44,6 @@ public class I3CoreQueueConfig {
}
public static final String DEMO_HANDLE_QUEUE = "demo_handle_queue";
@Bean
public Queue getHandleQueue() throws Exception {
//LOGGER.info("【DEMO_HANDLE_QUEUE队列】");
@ -54,7 +51,6 @@ public class I3CoreQueueConfig {
}
public static final String DEMO_ACK_QUEUE = "ack_queue";
@Bean
public Queue getAckQueue() throws Exception {
//LOGGER.info("【DEMO_ACK_QUEUE队列】");
@ -62,7 +58,6 @@ public class I3CoreQueueConfig {
}
public static final String DEMO_RETURN_QUEUE = "return_queue";
@Bean
public Queue getReturnQueue() throws Exception {
//LOGGER.info("【DEMO_RETURN_QUEUE队列】");

@ -1,99 +0,0 @@
package cn.estsh.i3plus.core.apiservice.mq;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
import cn.estsh.i3plus.core.api.iservice.busi.ISysUserService;
import cn.estsh.i3plus.core.apiservice.util.MailUtil;
import cn.estsh.i3plus.platform.common.tool.TimeTool;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
/**
* @Description :
* @Reference :
* @Author : yunhao
* @CreateDate : 2018-11-15 22:15
* @Modify:
**/
@Component
public class MailQueueReceiver {
private static final Logger LOGGER = LoggerFactory.getLogger(MailQueueReceiver.class);
@Autowired
ISysMessageService sysMessageService;
@Autowired
ISysUserService sysUserService;
@Autowired
MailUtil mailUtil;
/**
*
* @param msg
* @param channel
* @param message
*/
@RabbitListener(queues = PlatformConstWords.IMPP_MESSAGE_MAIL_QUEUE)
public void processImppMail(SysMessage msg, Channel channel, Message message) {
try {
LOGGER.info("【MQ-IMPP_MESSAGE_MAIL_QUEUE】数据接收成功{}",msg);msg = sysMessageService.insertSysMessage(msg);
mailUtil.init();
// 收件人信息
String[] messageReceiver = msg.getMessageReceiversId().split(",");
String[] receiverName = new String[messageReceiver.length];
SysRefUserMessage refUserMessage;
SysUser sysUser;
for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
receiverName[i] = sysUser.getUserName();
refUserMessage = new SysRefUserMessage();
refUserMessage.setMessageId(msg.getId());
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
sysMessageService.insertSysRefUserMessage(refUserMessage);
// 发送邮件
mailUtil.setSubject(msg.getMessageTitle());
mailUtil.setContentType(ImppEnumUtil.MESSAGE_TYPE_CONTENT.valueOfDescription(msg.getMessageContentType()));
mailUtil.setBody(msg.getMessageContent());
mailUtil.setTo(sysUser.getUserEmail());
mailUtil.send();
}
msg.setMessageSenderNameRdd(StringUtils.join(receiverName, ","));
sysMessageService.updateSysMessage(msg);
//信息已处理
channel.basicAck(message.getMessageProperties().getDeliveryTag(),false);
} catch (IOException e) {
LOGGER.error("【MQ-IMPP_MESSAGE_MAIL_QUEUE】处理出错{}",e.getMessage(),e);
//丢弃这条消息
try {
// 未成功处理,重新发送
channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true);
} catch (IOException e1) {
e1.printStackTrace();
}
}
}
}
Loading…
Cancel
Save