diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueConfig.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueConfig.java index 2a733fa..cc9017d 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueConfig.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueConfig.java @@ -17,43 +17,48 @@ import org.springframework.context.annotation.Configuration; public class I3CoreQueueConfig { private static final Logger LOGGER = LoggerFactory.getLogger(I3CoreQueueConfig.class); - public static final String DEMO_STR_QUEUE = "demo_str_queue"; - - public static final String DEMO_OBJ_QUEUE = "demo_obj_queue"; + public static final String IMPP_MESSAGE_QUEUE = "IMPP_MESSAGE_QUEUE"; + @Bean + public Queue getImppMessageQueue() { + LOGGER.info("【开启平台消息队列】"); + return new Queue(IMPP_MESSAGE_QUEUE); + } - public static final String DEMO_HANDLE_QUEUE = "demo_handle_queue"; - public static final String DEMO_ACK_QUEUE = "ack_queue"; - - public static final String DEMO_RETURN_QUEUE = "return_queue"; + /*********** 队列demo ***********/ + public static final String DEMO_STR_QUEUE = "demo_str_queue"; @Bean public Queue getStrQueue() { //LOGGER.info("【DEMO_STR_QUEUE队列】"); return new Queue(DEMO_STR_QUEUE); } + public static final String DEMO_OBJ_QUEUE = "demo_obj_queue"; @Bean public Queue getObjQueue() throws Exception { //LOGGER.info("【DEMO_OBJ_QUEUE队列】"); return new Queue(DEMO_OBJ_QUEUE); } + public static final String DEMO_HANDLE_QUEUE = "demo_handle_queue"; @Bean public Queue getHandleQueue() throws Exception { //LOGGER.info("【DEMO_HANDLE_QUEUE队列】"); return new Queue(DEMO_HANDLE_QUEUE); } - @Bean - public Queue getReturnQueue() throws Exception { - //LOGGER.info("【DEMO_RETURN_QUEUE队列】"); - return new Queue(DEMO_RETURN_QUEUE); - } - + public static final String DEMO_ACK_QUEUE = "ack_queue"; @Bean public Queue getAckQueue() throws Exception { LOGGER.info("【DEMO_ACK_QUEUE队列】"); return new Queue(DEMO_ACK_QUEUE); } + + public static final String DEMO_RETURN_QUEUE = "return_queue"; + @Bean + public Queue getReturnQueue() throws Exception { + //LOGGER.info("【DEMO_RETURN_QUEUE队列】"); + return new Queue(DEMO_RETURN_QUEUE); + } } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueReceiver.java index ff31534..c2a4f97 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/I3CoreQueueReceiver.java @@ -1,6 +1,10 @@ package cn.estsh.i3plus.core.apiservice.mq; +import cn.estsh.i3plus.core.apiservice.controller.DemoRestController; +import cn.estsh.i3plus.pojo.platform.bean.SysMessage; import com.rabbitmq.client.Channel; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component; @@ -16,6 +20,36 @@ import java.io.IOException; **/ @Component public class I3CoreQueueReceiver { + private static final Logger LOGGER = LoggerFactory.getLogger(I3CoreQueueReceiver.class); + + /** + * 系统邮件处理队列 + * @param msg + * @param channel + * @param message + * 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....)); + */ + @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_QUEUE) + public void processImppMessage(SysMessage msg, Channel channel, Message message) { + try { + LOGGER.info("【MQ-IMPP_MESSAGE_QUEUE】数据接收成功:{}",msg); + + //信息已处理 + channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); + } catch (IOException e) { + LOGGER.error("【MQ-IMPP_MESSAGE_QUEUE】处理出错:{}",e.getMessage(),e); + //丢弃这条消息 + try { + // 未成功处理,重新发送 + channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); + } catch (IOException e1) { + e1.printStackTrace(); + } + } + } + + + /********************* 消息队列处理demo *******************/ /*@RabbitListener(queues = I3CoreQueueConfig.DEMO_STR_QUEUE) public void getObjQueue(String data) { @@ -71,10 +105,12 @@ public class I3CoreQueueReceiver { //在队列删掉 不会再发了 否则消息服务器以为这条消息没处理掉 后续还会在发 channel.basicAck(message.getMessageProperties().getDeliveryTag(),false); } catch (IOException e) { - e.printStackTrace(); //丢弃这条消息 + e.printStackTrace(); + //丢弃这条消息 // channel.basicNack(message.getMessageProperties().getDeliveryTag(), false,false); // System.out.println("receiver fail"); try { + // channel.basicNack(message.getMessageProperties().getDeliveryTag(),false,true); } catch (IOException e1) { e1.printStackTrace(); diff --git a/modules/i3plus-core-apiservice/src/main/resources/application-dev.properties b/modules/i3plus-core-apiservice/src/main/resources/application-dev.properties index 1c52958..02cfe38 100644 --- a/modules/i3plus-core-apiservice/src/main/resources/application-dev.properties +++ b/modules/i3plus-core-apiservice/src/main/resources/application-dev.properties @@ -14,8 +14,6 @@ filter.shiro.saadmin.filteruri = /saoperate/* #是否允许前端跨域提交 impp.web.cross = true -#允许前端跨域提交ip地址,多个以逗号分隔 -impp.web.cross.hosts = http://127.0.0.1,http://localhost ################ 主数据源 ################ # mysql