diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysConfigController.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysConfigController.java index c06b38d..f0ce24b 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysConfigController.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysConfigController.java @@ -221,7 +221,7 @@ public class SysConfigController extends CoreBaseController { mailUtil.send(); return ResultBean.success("测试邮件已发送").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()); }catch(ImppBusiException busExcep){ - return ResultBean.fail(busExcep.getErrorShow()); + return ResultBean.fail(busExcep); }catch(Exception e){ return ImppExceptionBuilder.newInstance().buildExceptionResult(e); } diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysMessageController.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysMessageController.java index 9493504..3c6ee64 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysMessageController.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/controller/busi/SysMessageController.java @@ -214,7 +214,7 @@ public class SysMessageController extends CoreBaseController { @ApiOperation(value = "查看用户未读站内信") public ResultBean findUnreadUserMessage(){ try { - List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUser().getId(), + List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUserInfo().getId(), ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); return ResultBean.success("查询成功").setResultList(userMessageList).setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()); }catch(ImppBusiException busExcep){ diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/LetterQueueReceiver.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/LetterQueueReceiver.java index adbd949..da8aab6 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/LetterQueueReceiver.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/mq/LetterQueueReceiver.java @@ -8,6 +8,7 @@ import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil; import cn.estsh.i3plus.pojo.platform.bean.SysMessage; import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage; import cn.estsh.i3plus.pojo.platform.bean.SysUser; +import com.alibaba.fastjson.JSON; import com.rabbitmq.client.Channel; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; @@ -18,6 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import java.io.IOException; +import java.util.List; /** * @Description : 站内信队列处理 @@ -42,7 +44,7 @@ public class LetterQueueReceiver { * @param message * 发送:rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....)); */ -// @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE) + @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE) public void processImppMessage(SysMessage msg, Channel channel, Message message) { try { LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功:{}",msg); @@ -55,6 +57,7 @@ public class LetterQueueReceiver { SysRefUserMessage refUserMessage; SysUser sysUser; + List userMessage; for (int i = 0; i < messageReceiver.length; i++) { sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); @@ -65,14 +68,18 @@ public class LetterQueueReceiver { refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); refUserMessage.setMessageTypeRdd(msg.getMessageType()); refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); - refUserMessage.setReceiverId(sysUser.getId()); + refUserMessage.setReceiverId(sysUser.getUserInfoId()); refUserMessage.setReceiverNameRdd(sysUser.getUserName()); refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); sysMessageService.insertSysRefUserMessage(refUserMessage); - MessageWebSocket.getWebSocketSet().get(sysUser.getId()).sendMessage(); + userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getUserInfoId(), + ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); + MessageWebSocket.sendMessage(sysUser.getUserInfoId(), + JSON.toJSONString(userMessage) + ); } msg.setMessageSenderNameRdd(StringUtils.join(receiverName, ",")); diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/DemoWebSocket.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/DemoWebSocket.java index 786bd9d..1e67d14 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/DemoWebSocket.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/DemoWebSocket.java @@ -26,7 +26,7 @@ public class DemoWebSocket { private static int onlineCount = 0; //concurrent线程安全集合,存放客户端websocket对象 - private static CopyOnWriteArraySet webSocketSet = new CopyOnWriteArraySet(); + private static CopyOnWriteArraySet webSocketMap = new CopyOnWriteArraySet(); //websocket会话 private Session session; @@ -34,7 +34,7 @@ public class DemoWebSocket { @OnOpen public void onOpen(@PathParam("userName")String userName, Session session){ this.session = session; - webSocketSet.add(this); //加入set中 + webSocketMap.add(this); //加入set中 addOnlineCount(); //在线数加1 LOGGER.info("{}加入!当前在线人数为{}",userName,getOnlineCount()); try { @@ -49,7 +49,7 @@ public class DemoWebSocket { */ @OnClose public void onClose() { - webSocketSet.remove(this); //从set中删除 + webSocketMap.remove(this); //从set中删除 subOnlineCount(); //在线数减1 LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } @@ -63,7 +63,7 @@ public class DemoWebSocket { LOGGER.info("来自客户端的消息:" + message); //群发消息 - for (DemoWebSocket item : webSocketSet) { + for (DemoWebSocket item : webSocketMap) { try { item.sendMessage(message); } catch (IOException e) { @@ -91,7 +91,7 @@ public class DemoWebSocket { * 群发自定义消息 * */ public static void sendInfo(String message) throws IOException { - for (DemoWebSocket item : webSocketSet) { + for (DemoWebSocket item : webSocketMap) { try { item.sendMessage(message); } catch (IOException e) { diff --git a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/MessageWebSocket.java b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/MessageWebSocket.java index a438074..61ab42d 100644 --- a/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/MessageWebSocket.java +++ b/modules/i3plus-core-apiservice/src/main/java/cn/estsh/i3plus/core/apiservice/websocket/MessageWebSocket.java @@ -1,18 +1,14 @@ package cn.estsh.i3plus.core.apiservice.websocket; -import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService; -import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil; -import com.alibaba.fastjson.JSON; +import cn.estsh.impp.framework.base.controller.CoreBaseController; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; -import java.util.List; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -23,38 +19,25 @@ import java.util.concurrent.ConcurrentMap; * @CreateDate : 2018-11-24 16:57 * @Modify: **/ -@ServerEndpoint(value="/message-websocket/{userId}") +@ServerEndpoint(value= CoreBaseController.BASE_URL + "/message-websocket/{userId}") @Component public class MessageWebSocket { private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class); - private Long userId = 1L; - - //在线连接数 - private static int onlineCount = 0; - + private long userId = 1L; + //websocket会话 + private Session session; // 当前对象会话 + private static int sendCount = 1; //concurrent线程安全集合,存放客户端websocket对象 private static ConcurrentMap webSocketSet = new ConcurrentHashMap(); - //websocket会话 - private Session session; - - public static ConcurrentMap getWebSocketSet() { - return webSocketSet; - } - - @Autowired - private ISysMessageService sysMessageService; - - @OnOpen - public void onOpen(@PathParam("userId")Long userId, Session session){ - this.session = session; + public void onOpen(@PathParam("userId")long userId, Session session){ this.userId = userId; + this.session = session; - webSocketSet.put(userId,this); //加入set中 - addOnlineCount(); //在线数加1 + webSocketSet.put(userId,this); //在线人数添加 LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount()); } @@ -63,8 +46,7 @@ public class MessageWebSocket { */ @OnClose public void onClose() { - webSocketSet.remove(this.userId); //从set中删除 - subOnlineCount(); //在线数减1 + subOnlineUser(this.userId); LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount()); } @@ -73,8 +55,13 @@ public class MessageWebSocket { * * @param message 客户端发送过来的消息*/ @OnMessage - public void onMessage(String message, Session session) { - LOGGER.info("来自客户端的消息:" + message); + public void onMessage(@PathParam("userId")Long userId,String message) { + // 心跳 + if("heartBit".equals(message)){ + this.sendMessage(userId,"heartBit"); + }else{ + LOGGER.info("来自客户端的消息:" , message); + } } /** @@ -87,21 +74,28 @@ public class MessageWebSocket { error.printStackTrace(); } - public void sendMessage() throws IOException { - List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(this.userId, - ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); - this.session.getBasicRemote().sendText(JSON.toJSONString(userMessageList)); + /** + * 发送消息 + * @param message + * @throws IOException + */ + public static void sendMessage(Long userId, String message){ + try { + MessageWebSocket websocket = webSocketSet.get(userId); + if (websocket != null){ + websocket.session.getBasicRemote().sendText(message + "=" + sendCount); + sendCount++; + } + } catch (IOException e) { + e.printStackTrace(); + } } public static synchronized int getOnlineCount() { - return onlineCount; - } - - public static synchronized void addOnlineCount() { - MessageWebSocket.onlineCount++; + return webSocketSet.size(); } - public static synchronized void subOnlineCount() { - MessageWebSocket.onlineCount--; + public synchronized void subOnlineUser(long userId) { + webSocketSet.remove(userId); } }