wei.peng 7 years ago
commit 33bef44265

@ -221,7 +221,7 @@ public class SysConfigController extends CoreBaseController {
mailUtil.send(); mailUtil.send();
return ResultBean.success("测试邮件已发送").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()); return ResultBean.success("测试邮件已发送").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode());
}catch(ImppBusiException busExcep){ }catch(ImppBusiException busExcep){
return ResultBean.fail(busExcep.getErrorShow()); return ResultBean.fail(busExcep);
}catch(Exception e){ }catch(Exception e){
return ImppExceptionBuilder.newInstance().buildExceptionResult(e); return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
} }

@ -214,7 +214,7 @@ public class SysMessageController extends CoreBaseController {
@ApiOperation(value = "查看用户未读站内信") @ApiOperation(value = "查看用户未读站内信")
public ResultBean findUnreadUserMessage(){ public ResultBean findUnreadUserMessage(){
try { try {
List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUser().getId(), List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUserInfo().getId(),
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
return ResultBean.success("查询成功").setResultList(userMessageList).setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode()); return ResultBean.success("查询成功").setResultList(userMessageList).setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode());
}catch(ImppBusiException busExcep){ }catch(ImppBusiException busExcep){

@ -8,6 +8,7 @@ import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage; import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage; import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser; import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel; import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger; import org.slf4j.Logger;
@ -18,6 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import java.io.IOException; import java.io.IOException;
import java.util.List;
/** /**
* @Description : * @Description :
@ -42,7 +44,7 @@ public class LetterQueueReceiver {
* @param message * @param message
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....)); * rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
*/ */
// @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE) @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE)
public void processImppMessage(SysMessage msg, Channel channel, Message message) { public void processImppMessage(SysMessage msg, Channel channel, Message message) {
try { try {
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}",msg); LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}",msg);
@ -55,6 +57,7 @@ public class LetterQueueReceiver {
SysRefUserMessage refUserMessage; SysRefUserMessage refUserMessage;
SysUser sysUser; SysUser sysUser;
List userMessage;
for (int i = 0; i < messageReceiver.length; i++) { for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i])); sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
@ -65,14 +68,18 @@ public class LetterQueueReceiver {
refUserMessage.setMessageTitleRdd(msg.getMessageTitle()); refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType()); refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd()); refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId()); refUserMessage.setReceiverId(sysUser.getUserInfoId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName()); refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true)); refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
sysMessageService.insertSysRefUserMessage(refUserMessage); sysMessageService.insertSysRefUserMessage(refUserMessage);
MessageWebSocket.getWebSocketSet().get(sysUser.getId()).sendMessage(); userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getUserInfoId(),
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
JSON.toJSONString(userMessage)
);
} }
msg.setMessageSenderNameRdd(StringUtils.join(receiverName, ",")); msg.setMessageSenderNameRdd(StringUtils.join(receiverName, ","));

@ -26,7 +26,7 @@ public class DemoWebSocket {
private static int onlineCount = 0; private static int onlineCount = 0;
//concurrent线程安全集合存放客户端websocket对象 //concurrent线程安全集合存放客户端websocket对象
private static CopyOnWriteArraySet<DemoWebSocket> webSocketSet = new CopyOnWriteArraySet<DemoWebSocket>(); private static CopyOnWriteArraySet<DemoWebSocket> webSocketMap = new CopyOnWriteArraySet<DemoWebSocket>();
//websocket会话 //websocket会话
private Session session; private Session session;
@ -34,7 +34,7 @@ public class DemoWebSocket {
@OnOpen @OnOpen
public void onOpen(@PathParam("userName")String userName, Session session){ public void onOpen(@PathParam("userName")String userName, Session session){
this.session = session; this.session = session;
webSocketSet.add(this); //加入set中 webSocketMap.add(this); //加入set中
addOnlineCount(); //在线数加1 addOnlineCount(); //在线数加1
LOGGER.info("{}加入!当前在线人数为{}",userName,getOnlineCount()); LOGGER.info("{}加入!当前在线人数为{}",userName,getOnlineCount());
try { try {
@ -49,7 +49,7 @@ public class DemoWebSocket {
*/ */
@OnClose @OnClose
public void onClose() { public void onClose() {
webSocketSet.remove(this); //从set中删除 webSocketMap.remove(this); //从set中删除
subOnlineCount(); //在线数减1 subOnlineCount(); //在线数减1
LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount()); LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount());
} }
@ -63,7 +63,7 @@ public class DemoWebSocket {
LOGGER.info("来自客户端的消息:" + message); LOGGER.info("来自客户端的消息:" + message);
//群发消息 //群发消息
for (DemoWebSocket item : webSocketSet) { for (DemoWebSocket item : webSocketMap) {
try { try {
item.sendMessage(message); item.sendMessage(message);
} catch (IOException e) { } catch (IOException e) {
@ -91,7 +91,7 @@ public class DemoWebSocket {
* *
* */ * */
public static void sendInfo(String message) throws IOException { public static void sendInfo(String message) throws IOException {
for (DemoWebSocket item : webSocketSet) { for (DemoWebSocket item : webSocketMap) {
try { try {
item.sendMessage(message); item.sendMessage(message);
} catch (IOException e) { } catch (IOException e) {

@ -1,18 +1,14 @@
package cn.estsh.i3plus.core.apiservice.websocket; package cn.estsh.i3plus.core.apiservice.websocket;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService; import cn.estsh.impp.framework.base.controller.CoreBaseController;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.IOException; import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -23,38 +19,25 @@ import java.util.concurrent.ConcurrentMap;
* @CreateDate : 2018-11-24 16:57 * @CreateDate : 2018-11-24 16:57
* @Modify: * @Modify:
**/ **/
@ServerEndpoint(value="/message-websocket/{userId}") @ServerEndpoint(value= CoreBaseController.BASE_URL + "/message-websocket/{userId}")
@Component @Component
public class MessageWebSocket { public class MessageWebSocket {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class); private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class);
private Long userId = 1L; private long userId = 1L;
//websocket会话
//在线连接数 private Session session; // 当前对象会话
private static int onlineCount = 0; private static int sendCount = 1;
//concurrent线程安全集合存放客户端websocket对象 //concurrent线程安全集合存放客户端websocket对象
private static ConcurrentMap<Long,MessageWebSocket> webSocketSet = new ConcurrentHashMap<Long,MessageWebSocket>(); private static ConcurrentMap<Long,MessageWebSocket> webSocketSet = new ConcurrentHashMap<Long,MessageWebSocket>();
//websocket会话
private Session session;
public static ConcurrentMap<Long, MessageWebSocket> getWebSocketSet() {
return webSocketSet;
}
@Autowired
private ISysMessageService sysMessageService;
@OnOpen @OnOpen
public void onOpen(@PathParam("userId")Long userId, Session session){ public void onOpen(@PathParam("userId")long userId, Session session){
this.session = session;
this.userId = userId; this.userId = userId;
this.session = session;
webSocketSet.put(userId,this); //加入set中 webSocketSet.put(userId,this); //在线人数添加
addOnlineCount(); //在线数加1
LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount()); LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount());
} }
@ -63,8 +46,7 @@ public class MessageWebSocket {
*/ */
@OnClose @OnClose
public void onClose() { public void onClose() {
webSocketSet.remove(this.userId); //从set中删除 subOnlineUser(this.userId);
subOnlineCount(); //在线数减1
LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount()); LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount());
} }
@ -73,8 +55,13 @@ public class MessageWebSocket {
* *
* @param message */ * @param message */
@OnMessage @OnMessage
public void onMessage(String message, Session session) { public void onMessage(@PathParam("userId")Long userId,String message) {
LOGGER.info("来自客户端的消息:" + message); // 心跳
if("heartBit".equals(message)){
this.sendMessage(userId,"heartBit");
}else{
LOGGER.info("来自客户端的消息:" , message);
}
} }
/** /**
@ -87,21 +74,28 @@ public class MessageWebSocket {
error.printStackTrace(); error.printStackTrace();
} }
public void sendMessage() throws IOException { /**
List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(this.userId, *
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue()); * @param message
this.session.getBasicRemote().sendText(JSON.toJSONString(userMessageList)); * @throws IOException
*/
public static void sendMessage(Long userId, String message){
try {
MessageWebSocket websocket = webSocketSet.get(userId);
if (websocket != null){
websocket.session.getBasicRemote().sendText(message + "=" + sendCount);
sendCount++;
}
} catch (IOException e) {
e.printStackTrace();
}
} }
public static synchronized int getOnlineCount() { public static synchronized int getOnlineCount() {
return onlineCount; return webSocketSet.size();
}
public static synchronized void addOnlineCount() {
MessageWebSocket.onlineCount++;
} }
public static synchronized void subOnlineCount() { public synchronized void subOnlineUser(long userId) {
MessageWebSocket.onlineCount--; webSocketSet.remove(userId);
} }
} }

Loading…
Cancel
Save