博主是基于消息公共组件做的,其中包含了站内消息,以及邮件、短信等,使用ActiveMQ做了消息解耦,webwocket实现的站内消息模块。
Websocket是h5提供的一种协议,全双工模式的,采用TCP协议进行通信,只要一次握手,就能持续交互。直接上代码
第一步先构建websocket服务器及其相应的配置
package com.chinacreator.c2.websocket; import com.alibaba.fastjson.JSON; import com.alibaba.fastjson.JSONObject; import com.chinacreator.c2.common.CustomSEnum; import com.chinacreator.c2.common.utils.RedisTools; import org.apache.commons.lang.StringUtils; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.data.redis.core.StringRedisTemplate; import org.springframework.stereotype.Component; import javax.websocket.*; import javax.websocket.server.PathParam; import javax.websocket.server.ServerEndpoint; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.stream.Collectors; @Component @ServerEndpoint("/websocketServer/{type}/{username}/{randomId}") public class WebSocketServer { /** * 以用户的姓名为key,WebSocket为对象保存起来 */ private static Map<String, WebSocketServer> clients = new ConcurrentHashMap<>(); /** * 会话 */ private Session session; /** * 业务类型 */ private String type; /** * 用户名 */ private String username; /** * 随机数 */ private String randomId; @Autowired private StringRedisTemplate stringRedisTemplate; @OnOpen public void onOpen(@PathParam("type") String type,@PathParam("username") String username,@PathParam("randomId") String randomId, Session session) { System.out.println("打开连接"); String key = type + "-" + username + "-" + randomId; this.session = session; this.type = type; this.username = username; this.randomId = randomId; clients.put(key,this); } @OnError public void onError(Session session, Throwable error) { System.out.print("用户错误:"+this.username+",错误信息:"+error.getMessage()); } @OnClose public void onClose() { String key = type + "-" + username + "-" + randomId; clients.remove(key); } @OnMessage public void onMessage(String message, Session session) { System.out.println("用户消息:"+username+",报文:"+message); JSONObject jsonObject = JSON.parseObject(message); String t = (String) jsonObject.get("type"); String receiver = (String) jsonObject.get("receiver"); String msg = (String) jsonObject.get("msg"); String filters = t + "-" + receiver; Map<String, WebSocketServer> client = parseMapForFilter(clients,filters); for(WebSocketServer webSocketServer : client.values()){ webSocketServer.session.getAsyncRemote().sendText(msg); } } /** * 从map中查询想要的map项,根据key */ public static Map<String, WebSocketServer> parseMapForFilter(Map<String, WebSocketServer> map,String filters) { if (map == null) { return null; } else { map = map.entrySet().stream() .filter((e) -> checkKey(e.getKey(),filters)) .collect(Collectors.toMap( (e) -> (String) e.getKey(), (e) -> e.getValue() )); } return map; } /** * 通过indexof匹配想要查询的字符 */ private static boolean checkKey(String key,String filters) { if (key.indexOf(filters) > -1) { return true; } else { return false; } } /** * 发送自定义消息 * @param message * @param username * @throws IOException */ public static void sendInfo(String message,@PathParam("username") String username) throws IOException { System.out.println("发送消息到:"+username+",报文:"+message); JSONObject jsonObject = JSON.parseObject(message); String t = (String) jsonObject.get("type"); String receiver = (String) jsonObject.get("receiver"); String msg = (String) jsonObject.get("msg"); String filters = t + "-" + receiver; Map<String, WebSocketServer> client = parseMapForFilter(clients,filters); for(WebSocketServer webSocketServer : client.values()){ webSocketServer.session.getAsyncRemote().sendText(msg); } } } package com.chinacreator.c2.websocket; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter; /*** * 用于注册ServerEndpoint */ @Configuration public class WebSocketConfig { @Bean public ServerEndpointExporter serverEndpointExporter() { return new ServerEndpointExporter(); } }
接着是做一个客户端
var socketUrl="http://localhost:8081/websocketServer/1/123456/www"; socketUrl=socketUrl.replace("https","ws").replace("http","ws"); console.log(socketUrl); if(socket!=null){ socket.close(); socket=null; } socket = new WebSocket(socketUrl); //打开事件 socket.onopen = function() { console.log("websocket已打开"); //socket.send("这是来自客户端的消息" + location.href + new Date()); }; //获得消息事件 socket.onmessage = function(msg) { console.log(msg.data); //发现消息进入 开始处理前端触发逻辑 }; //关闭事件 socket.onclose = function() { console.log("websocket已关闭"); }; //发生了错误事件 socket.onerror = function() { console.log("websocket发生了错误"); }
服务端发送消息至客户端
JSONObject paramObj1 = new JSONObject(); paramObj1.put("type","1"); paramObj1.put("receiver","123456"); paramObj1.put("msg","你好呢,我是服务端"); WebSocketServer.sendInfo(JSONObject.toJSONString(paramObj1),"123456");成功接收到消息