yun-zuoyi
wei.peng 6 years ago
commit c60e404776

@ -1,6 +1,7 @@
package cn.estsh.i3plus.core.apiservice.websocket; package cn.estsh.i3plus.core.apiservice.websocket;
import cn.estsh.i3plus.platform.common.util.PlatformConstWords; import cn.estsh.i3plus.platform.common.util.PlatformConstWords;
import io.netty.util.internal.ConcurrentSet;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component; import org.springframework.stereotype.Component;
@ -8,7 +9,12 @@ import org.springframework.stereotype.Component;
import javax.websocket.*; import javax.websocket.*;
import javax.websocket.server.PathParam; import javax.websocket.server.PathParam;
import javax.websocket.server.ServerEndpoint; import javax.websocket.server.ServerEndpoint;
import java.io.EOFException;
import java.io.IOException; import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
@ -19,25 +25,38 @@ import java.util.concurrent.ConcurrentMap;
* @CreateDate : 2018-11-24 16:57 * @CreateDate : 2018-11-24 16:57
* @Modify: * @Modify:
**/ **/
@ServerEndpoint(value= PlatformConstWords.WEBSOCKET_URL + "/message-websocket/{userId}") @ServerEndpoint(value = PlatformConstWords.WEBSOCKET_URL + "/message-websocket/{userId}/{userLoginSid}")
@Component @Component
public class MessageWebSocket { public class MessageWebSocket {
private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class); private static final Logger LOGGER = LoggerFactory.getLogger(MessageWebSocket.class);
private long userId = 1L; private long userId;
private String userLoginSid;
//websocket会话 //websocket会话
private Session session; // 当前对象会话 private Session session; // 当前对象会话
private static int sendCount = 1; private static int sendCount = 1;
//concurrent线程安全集合存放客户端websocket对象
private static ConcurrentMap<Long,MessageWebSocket> webSocketSet = new ConcurrentHashMap<Long,MessageWebSocket>(); // 用户会话消息
private static ConcurrentMap<Long, ConcurrentSet<String>> userSessionMap = new ConcurrentHashMap<>();
// concurrent线程安全集合存放客户端websocket对象
private static ConcurrentMap<String,MessageWebSocket> webSocketMap = new ConcurrentHashMap<>();
@OnOpen @OnOpen
public void onOpen(@PathParam("userId")long userId, Session session){ public void onOpen(@PathParam("userId")long userId,@PathParam("userLoginSid")String userLoginSid, Session session){
this.userId = userId; this.userId = userId;
this.userLoginSid = userLoginSid;
this.session = session; this.session = session;
webSocketSet.put(userId,this); //在线人数添加 ConcurrentSet<String> sidSet = userSessionMap.get(userId);
if(sidSet == null){
sidSet = new ConcurrentSet<>();
}
sidSet.add(userLoginSid);
userSessionMap.put(userId, sidSet);
webSocketMap.put(userLoginSid, this); //在线人数添加
LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount()); LOGGER.info("{}加入!当前在线人数为{}",userId,getOnlineCount());
} }
@ -46,7 +65,7 @@ public class MessageWebSocket {
*/ */
@OnClose @OnClose
public void onClose() { public void onClose() {
subOnlineUser(this.userId); subOnlineUser(this.userId, this.userLoginSid);
LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount()); LOGGER.info("有一连接关闭!当前在线人数为" + getOnlineCount());
} }
@ -55,10 +74,10 @@ public class MessageWebSocket {
* *
* @param message */ * @param message */
@OnMessage @OnMessage
public void onMessage(@PathParam("userId")Long userId,String message) { public void onMessage(@PathParam("userLoginSid")String userLoginSid,String message) {
// 心跳 // 心跳
if("heartBit".equals(message)){ if("heartBit".equals(message)){
this.sendMessage(userId,"heartBit"); this.sendMessage(userLoginSid,"heartBit");
}else{ }else{
LOGGER.info("来自客户端的消息:" , message); LOGGER.info("来自客户端的消息:" , message);
} }
@ -70,25 +89,26 @@ public class MessageWebSocket {
*/ */
@OnError @OnError
public void onError(Session session, Throwable error) { public void onError(Session session, Throwable error) {
LOGGER.info("发生错误"); if(error.getClass().equals(EOFException.class)){
error.printStackTrace(); LOGGER.error("WebSocket连接已断开");
}else {
LOGGER.error("发生错误",error.toString());
error.printStackTrace();
}
} }
/** /**
* * id
* @param message * @param userId id
* @throws IOException * @param message
*/ */
public static void sendMessage(Long userId, String message){ public static void sendMessage(Long userId, String message){
try { try {
MessageWebSocket websocket = webSocketSet.get(userId); ConcurrentSet<String> websocket = userSessionMap.get(userId);
synchronized (websocket){ if (websocket != null && websocket.size() != 0) {
if(websocket != null) { for (String sid : websocket) {
if (message.equals("heartBit")) { if(webSocketMap.get(sid).session.isOpen()){
websocket.session.getBasicRemote().sendText(message + "=" + sendCount); webSocketMap.get(sid).session.getBasicRemote().sendText(message);
sendCount++;
} else {
websocket.session.getBasicRemote().sendText(message);
} }
} }
} }
@ -97,11 +117,39 @@ public class MessageWebSocket {
} }
} }
/**
* id
* @param userLoginSid id
* @param message
*/
public static void sendMessage(String userLoginSid, String message){
try {
MessageWebSocket websocket = webSocketMap.get(userLoginSid);
if (websocket != null && websocket.session.isOpen()) {
if (message.equals("heartBit")) {
websocket.session.getBasicRemote().sendText(message + "=" + sendCount);
sendCount++;
} else {
websocket.session.getBasicRemote().sendText(message);
}
}
} catch (IOException e) {
e.printStackTrace();
}
}
public static synchronized int getOnlineCount() { public static synchronized int getOnlineCount() {
return webSocketSet.size(); return userSessionMap.size();
} }
public synchronized void subOnlineUser(long userId) { public synchronized void subOnlineUser(long userId, String userLoginSid) {
webSocketSet.remove(userId); webSocketMap.remove(userId);
ConcurrentSet sidSet = userSessionMap.get(userId);
if(sidSet != null){
sidSet.remove(userLoginSid);
if(sidSet.isEmpty()){
userSessionMap.remove(userId);
}
}
} }
} }

Loading…
Cancel
Save