java websocket客户端 服务端demo

it2025-07-22  11

代码来自《Java WebSocket 编程:开发、部署和保护动态 Web 应用》一书,我在此基础上加上了 ping/pong 保活,并把作者的字符串分段发送改成了字节分块发送。服务端代码 MessagezModesServer

必须加 @Component,具体原因参考 ServerEndpointExporter:它会扫描 Spring 容器中带 @ServerEndpoint 注解的 Bean 并注册到 WebSocket 运行时,不是 Bean 的端点类不会被它发现。

@ServerEndpoint(value = "/modes") @Component public class MessagezModesServer { public static final int MESSAGE_MAX = 15 * 1024 * 1024; @OnOpen public void open(Session session){ session.setMaxBinaryMessageBufferSize(MESSAGE_MAX); session.setMaxIdleTimeout(15000); reportMessage(session,"Connected !"+"\r\n"); } /* @OnMessage public void testFixMessage(byte[] bytes,Session session){ String report = "Processed partial text message of size" + bytes.length +"b..."+"\r\n"; reportMessage(session,report); }*/ @OnMessage( maxMessageSize = MESSAGE_MAX ) /** * 分块发送自定义的二进制数据,接收完统一处理 */ public void binaryMessage(byte[] bytes, boolean isLast, Session session) { String report = "Processed partial text message of size " + bytes.length + "b ..."; System.out.println("isLast" + isLast); Map<String, Object> userProperties = session.getUserProperties(); Object userData = userProperties.get("userData"); if (userData == null){ ByteBuffer byteBuffer = ByteBuffer.wrap(bytes); userProperties.put("userData", byteBuffer); }else { ByteBuffer byteBuffer = (ByteBuffer)userData; ByteBuffer allocate = ByteBuffer.allocate(byteBuffer.array().length + bytes.length); allocate.put(byteBuffer.array()); allocate.put(bytes); userProperties.put("userData", allocate); } if (isLast) { report = report + "message complete." 
+ "\r\n"; ByteBuffer byteBuffer = (ByteBuffer)userProperties.get("userData"); String s = new String(byteBuffer.array()); System.out.println(s); } reportMessage(session, report); } /** * 一般直接把ping消息 直接返回 * @param session * @param pm */ @OnMessage public void handlePong(Session session,PongMessage pm){ try { session.getBasicRemote().sendPong(pm.getApplicationData()); } catch (IOException e) { e.printStackTrace(); } } public void reportMessage(Session session,String message){ String format = DateFormat.getDateInstance().format(new Date()); try { session.getBasicRemote().sendText(format + " " + message); } catch (IOException e) { e.printStackTrace(); } } @OnClose public void close(Session session, CloseReason cr){ System.out.println(cr.getReasonPhrase()); } @OnError public void error(Session session,Throwable throwable){ System.out.println(throwable.getMessage()); } }

客户端代码 MessageModesClient

@ClientEndpoint public class MessageModesClient { private Session session; private MessageModesClientListener listener; private static class PartialMessageSendListener{ public void reportProcess(int counter){ System.out.println("当前是第 :"+counter +"块"); } } private static class MessageModesClientListener { public void setConnected(boolean isCocnected,CloseReason cr){ if (isCocnected){ System.out.println("连接已经打开"); }else { System.out.println("连接已经关闭 :" + cr.getReasonPhrase()); } } public void reportConnectionHealth(long millis){ System.out.println("ping pong 耗时毫秒数: "+millis); } public void reportMessage(String message){ System.out.println("收到相应 :"+message); } } private class BinaryDataIterator{ private byte[] data; private int chunk;//总数据块 private int pos = 0;//第几块 public BinaryDataIterator(byte[] data, int chunk) { this.data = data; this.chunk = chunk; } public boolean isLast(){ return pos == chunk; } public boolean hasNext(){ if (pos <= chunk-1 ){ return true; } return false; } public byte[] next(){ byte[] bytes = null; if (pos == chunk-1){ bytes = Arrays.copyOfRange(data, pos * PIECES_COUNT, data.length); pos++; }else { bytes = Arrays.copyOfRange(data, pos * PIECES_COUNT, (pos + 1) * PIECES_COUNT); pos++; } return bytes; } } static int PIECES_COUNT = 53; private int sendTimeout = 10; public MessageModesClient(MessageModesClientListener listener){ this.listener = listener; } public void setSendTimeout(int mills){ this.sendTimeout = mills; } @OnOpen public void open(Session session){ this.session = session; listener.setConnected(true,null); //TODO } @OnMessage public void handleMessage(String message){ listener.reportMessage(message); } @OnMessage public void handlePong(PongMessage pm){ String send = new String(pm.getApplicationData().array()); long senTime = Long.parseLong(send); long roundtripMills = System.currentTimeMillis() - senTime; listener.reportConnectionHealth(roundtripMills); } @OnClose public void close(Session session,CloseReason cr){ 
listener.setConnected(false,cr); } public void disconnect() throws IOException{ this.session.close(new CloseReason(CloseReason.CloseCodes.NORMAL_CLOSURE,"User close application")); } public void sendPing() throws IOException{ long now = System.currentTimeMillis(); byte[] data = ("" + now).getBytes(); session.getBasicRemote().sendPing(ByteBuffer.wrap(data)); } public Future<Void> sendAsyncFuture(byte[] data) throws IOException{ RemoteEndpoint.Async asyncRemote = session.getAsyncRemote(); asyncRemote.setSendTimeout(this.sendTimeout); ByteBuffer bb = ByteBuffer.wrap(data); return asyncRemote.sendBinary(bb); } public Future<Void> sendAsyncFuture(String data) throws IOException{ RemoteEndpoint.Async asyncRemote = session.getAsyncRemote(); asyncRemote.setSendTimeout(this.sendTimeout); return asyncRemote.sendBinary(ByteBuffer.wrap(data.getBytes())); } public void sendAsyncWithHandler(byte[] data,SendHandler sendHandler) throws IOException{ RemoteEndpoint.Async asyncRemote = session.getAsyncRemote(); asyncRemote.setSendTimeout(this.sendTimeout); asyncRemote.sendBinary(ByteBuffer.wrap(data),sendHandler); } public void sendAsyncStringWithHandler(String data,SendHandler sendHandler) throws IOException{ RemoteEndpoint.Async asyncRemote = session.getAsyncRemote(); asyncRemote.setSendTimeout(this.sendTimeout); asyncRemote.sendBinary(ByteBuffer.wrap(data.getBytes()),sendHandler); } public void sendSynchron(byte[] data) throws IOException{ RemoteEndpoint.Basic basicRemote = session.getBasicRemote(); basicRemote.sendBinary(ByteBuffer.wrap(data),true); } public void sendInPieces(byte[] message,PartialMessageSendListener partialMessageSendListener){ RemoteEndpoint.Basic basicRemote = session.getBasicRemote(); byte[] messageBytes = message; int chunkSize = (int)Math.ceil((double) messageBytes.length / PIECES_COUNT); BinaryDataIterator binaryDataIterator = new BinaryDataIterator(messageBytes,chunkSize); boolean isLast = false; int count = 1; while ((isLast = 
binaryDataIterator.hasNext())){ byte[] next = binaryDataIterator.next(); System.out.println("isLast "+binaryDataIterator.isLast() + next[0]); try { session.getBasicRemote().sendBinary(ByteBuffer.wrap(next),binaryDataIterator.isLast()); } catch (IOException e) { e.printStackTrace(); } partialMessageSendListener.reportProcess(count); count++; } } public static void main(String[] args) throws URISyntaxException, IOException, DeploymentException { WebSocketContainer webSocketContainer = ContainerProvider.getWebSocketContainer(); MessageModesClient messageModesClient = new MessageModesClient(new MessageModesClientListener()); webSocketContainer.connectToServer(messageModesClient, new URI("ws://127.0.0.1:22598/modes")); String data = "曹操以汉献帝刘协名义征讨四方 aaa !@#¥%*()"; PartialMessageSendListener partialMessageSendListener = new PartialMessageSendListener(); //messageModesClient.sendInPieces(data.getBytes(),partialMessageSendListener); //messageModesClient.sendSynchron(data.getBytes()); /*messageModesClient.sendAsyncStringWithHandler(data, new SendHandler() { @Override public void onResult(SendResult result) { if (result.isOK()){ System.out.println("发送成功"); }else { System.out.println(result.getException()); } } });*/ Future<Void> voidFuture = messageModesClient.sendAsyncFuture(data); if (voidFuture.isDone()){ System.out.println("发送成功"); } Thread thread = new Thread(){ @Override public void run() { while (true){ try { Thread.sleep(5000); } catch (InterruptedException e) { e.printStackTrace(); } try { messageModesClient.sendPing(); } catch (IOException e) { e.printStackTrace(); } } } }; thread.setDaemon(false); thread.start(); } }
最新回复(0)