定时任务

websocket
yun-zuoyi
yunhao.wang 7 years ago
parent dbcaf5221e
commit 3a46256ff0

@ -221,7 +221,7 @@ public class SysConfigController extends CoreBaseController {
mailUtil.send();
return ResultBean.success("测试邮件已发送").setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode());
}catch(ImppBusiException busExcep){
return ResultBean.fail(busExcep.getErrorShow());
return ResultBean.fail(busExcep);
}catch(Exception e){
return ImppExceptionBuilder.newInstance().buildExceptionResult(e);
}

@ -214,7 +214,7 @@ public class SysMessageController extends CoreBaseController {
@ApiOperation(value = "查看用户未读站内信")
public ResultBean findUnreadUserMessage(){
try {
List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUser().getId(),
List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(getSessionUser().getUserInfo().getId(),
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
return ResultBean.success("查询成功").setResultList(userMessageList).setCode(ResourceEnumUtil.MESSAGE.SUCCESS.getCode());
}catch(ImppBusiException busExcep){

@ -8,6 +8,7 @@ import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import cn.estsh.i3plus.pojo.platform.bean.SysMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysRefUserMessage;
import cn.estsh.i3plus.pojo.platform.bean.SysUser;
import com.alibaba.fastjson.JSON;
import com.rabbitmq.client.Channel;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
@ -18,6 +19,7 @@ import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.io.IOException;
import java.util.List;
/**
* @Description :
@ -42,7 +44,7 @@ public class LetterQueueReceiver {
* @param message
* rabbitTemplate.convertAndSend(I3CoreQueueConfig.IMPP_MESSAGE_QUEUE, new SysMessage(....));
*/
// @RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE)
@RabbitListener(queues = I3CoreQueueConfig.IMPP_MESSAGE_LETTER_QUEUE)
public void processImppMessage(SysMessage msg, Channel channel, Message message) {
try {
LOGGER.info("【MQ-IMPP_MESSAGE_LETTER_QUEUE】数据接收成功{}",msg);
@ -55,6 +57,7 @@ public class LetterQueueReceiver {
SysRefUserMessage refUserMessage;
SysUser sysUser;
List userMessage;
for (int i = 0; i < messageReceiver.length; i++) {
sysUser = sysUserService.getSysUserById(Long.parseLong(messageReceiver[i]));
@ -65,14 +68,18 @@ public class LetterQueueReceiver {
refUserMessage.setMessageTitleRdd(msg.getMessageTitle());
refUserMessage.setMessageTypeRdd(msg.getMessageType());
refUserMessage.setMessageSenderNameRdd(msg.getMessageSenderNameRdd());
refUserMessage.setReceiverId(sysUser.getId());
refUserMessage.setReceiverId(sysUser.getUserInfoId());
refUserMessage.setReceiverNameRdd(sysUser.getUserName());
refUserMessage.setMessageStatus(ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
refUserMessage.setReceiverTime(TimeTool.getNowTime(true));
sysMessageService.insertSysRefUserMessage(refUserMessage);
MessageWebSocket.getWebSocketSet().get(sysUser.getId()).sendMessage();
userMessage = sysMessageService.findSysRefUserMessageByUserIdAndStatus(sysUser.getUserInfoId(),
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
MessageWebSocket.sendMessage(sysUser.getUserInfoId(),
JSON.toJSONString(userMessage)
);
}
msg.setMessageSenderNameRdd(StringUtils.join(receiverName, ","));

@ -26,7 +26,7 @@ public class DemoWebSocket {
private static int onlineCount = 0;
//concurrent线程安全集合存放客户端websocket对象
private static CopyOnWriteArraySet<DemoWebSocket> webSocketSet = new CopyOnWriteArraySet<DemoWebSocket>();
private static CopyOnWriteArraySet<DemoWebSocket> webSocketMap = new CopyOnWriteArraySet<DemoWebSocket>();
//websocket会话
private Session session;
@ -34,7 +34,7 @@ public class DemoWebSocket {
@OnOpen
public void onOpen(@PathParam("userName")String userName, Session session){
this.session = session;
webSocketSet.add(this); //加入set中
webSocketMap.add(this); //加入set中
addOnlineCount(); //在线数加1
LOGGER.info("{}加入!当前在线人数为{}",userName,getOnlineCount());
try {
@ -49,7 +49,7 @@ public class DemoWebSocket {
*/
@OnClose
public void onClose() {
webSocketSet.remove(this); //从set中删除
webSocketMap.remove(this); //从set中删除
subOnlineCount(); //在线数减1
LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
@ -63,7 +63,7 @@ public class DemoWebSocket {
LOGGER.info("来自客户端的消息:" + message);
//群发消息
for (DemoWebSocket item : webSocketSet) {
for (DemoWebSocket item : webSocketMap) {
try {
item.sendMessage(message);
} catch (IOException e) {
@ -91,7 +91,7 @@ public class DemoWebSocket {
*
* */
public static void sendInfo(String message) throws IOException {
for (DemoWebSocket item : webSocketSet) {
for (DemoWebSocket item : webSocketMap) {
try {
item.sendMessage(message);
} catch (IOException e) {

@ -1,18 +1,14 @@
package cn.estsh.i3plus.core.apiservice.websocket;
import cn.estsh.i3plus.core.api.iservice.busi.ISysMessageService;
import cn.estsh.i3plus.pojo.base.enumutil.ImppEnumUtil;
import com.alibaba.fastjson.JSON;
import cn.estsh.impp.framework.base.controller.CoreBaseController;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import javax.websocket.*;
import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@ -23,38 +19,25 @@ import java.util.concurrent.ConcurrentMap;
* @CreateDate : 2018-11-24 16:57
* @Modify:
**/
@ServerEndpoint(value="/message-websocket/{userId}")
@ServerEndpoint(value= CoreBaseController.BASE_URL + "/message-websocket/{userId}")
@Component
public class MessageWebSocket {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class);
private Long userId = 1L;
//在线连接数
private static int onlineCount = 0;
private long userId = 1L;
//websocket会话
private Session session; // 当前对象会话
private static int sendCount = 1;
//concurrent线程安全集合存放客户端websocket对象
private static ConcurrentMap<Long,MessageWebSocket> webSocketSet = new ConcurrentHashMap<Long,MessageWebSocket>();
//websocket会话
private Session session;
public static ConcurrentMap<Long, MessageWebSocket> getWebSocketSet() {
return webSocketSet;
}
@Autowired
private ISysMessageService sysMessageService;
@OnOpen
public void onOpen(@PathParam("userId")Long userId, Session session){
this.session = session;
public void onOpen(@PathParam("userId")long userId, Session session){
this.userId = userId;
this.session = session;
webSocketSet.put(userId,this); //加入set中
addOnlineCount(); //在线数加1
webSocketSet.put(userId,this); //在线人数添加
LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount());
}
@ -63,8 +46,7 @@ public class MessageWebSocket {
*/
@OnClose
public void onClose() {
webSocketSet.remove(this.userId); //从set中删除
subOnlineCount(); //在线数减1
subOnlineUser(this.userId);
LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount());
}
@ -73,8 +55,13 @@ public class MessageWebSocket {
*
* @param message */
@OnMessage
public void onMessage(String message, Session session) {
LOGGER.info("来自客户端的消息:" + message);
public void onMessage(@PathParam("userId")Long userId,String message) {
// 心跳
if("heartBit".equals(message)){
this.sendMessage(userId,"heartBit");
}else{
LOGGER.info("来自客户端的消息:" , message);
}
}
/**
@ -87,21 +74,28 @@ public class MessageWebSocket {
error.printStackTrace();
}
public void sendMessage() throws IOException {
List userMessageList = sysMessageService.findSysRefUserMessageByUserIdAndStatus(this.userId,
ImppEnumUtil.MESSAGE_STATUS.UNREAD.getValue());
this.session.getBasicRemote().sendText(JSON.toJSONString(userMessageList));
/**
*
* @param message
* @throws IOException
*/
public static void sendMessage(Long userId, String message){
try {
MessageWebSocket websocket = webSocketSet.get(userId);
if (websocket != null){
websocket.session.getBasicRemote().sendText(message + "=" + sendCount);
sendCount++;
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static synchronized int getOnlineCount() {
return onlineCount;
}
public static synchronized void addOnlineCount() {
MessageWebSocket.onlineCount++;
return webSocketSet.size();
}
public static synchronized void subOnlineCount() {
MessageWebSocket.onlineCount--;
public synchronized void subOnlineUser(long userId) {
webSocketSet.remove(userId);
}
}

Loading…
Cancel
Save