开发工具为 Android Studio
在 build.gradle 文件的 dependencies 代码块中添加 Netty 依赖
dependencies { api 'io.netty:netty-all:5.0.0.Alpha2' }
实现Netty网络访问模块与外部交互
①定义一个交互类型类MessageType
/**
 * Integer codes identifying the kind of event reported by the Netty
 * client/server module.
 *
 * <p>Codes are grouped by range: {@code 1} for payload data, {@code 1xx} for
 * server-side events and {@code 2xx} for client-side events.</p>
 */
public class MessageType {

    // ---------------------------------------------------------------- data

    /** Data was received from the peer. */
    public static final int RECEIVE_DATA = 1;

    // -------------------------------------------------------------- server

    /** The server hit an exception. */
    public static final int SERVER_EXCEPTION = 100;

    /** The server started successfully. */
    public static final int SERVER_START_SUCCESS = 101;

    /** The server failed to start. */
    public static final int SERVER_START_FAILED = 102;

    /** A client connected to the server. */
    public static final int SERVER_CONNECT_SUCCESS = 103;

    /** A connection on the server side was closed. */
    public static final int SERVER_DISCONNECT_SUCCESS = 105;

    /** The server was shut down. */
    public static final int SERVER_CLOSE_SUCCESS = 106;

    // -------------------------------------------------------------- client

    /** The client hit an exception. */
    public static final int CLIENT_EXCEPTION = 200;

    /** The client connected to the server. */
    public static final int CLIENT_CONNECT_SUCCESS = 203;

    /** The client failed to connect to the server. */
    public static final int CLIENT_CONNECT_FAILED = 204;

    /** The client's connection was closed. */
    public static final int CLIENT_DISCONNECT_SUCCESS = 205;

    /** The client was shut down. */
    public static final int CLIENT_CLOSE_SUCCESS = 206;
}
②定义一个消息处理类 MessageHandler
/**
 * Singleton relay that forwards network events to an Android {@link Handler}.
 *
 * <p>A target handler is installed with {@link #setHandler(Handler)}; every
 * later {@link #sendMessage(int, Object)} call posts a {@link Message} to it.
 * While no handler is installed, events are silently dropped.</p>
 */
public class MessageHandler {

    private static final MessageHandler instance = new MessageHandler();

    /**
     * Current delivery target, or {@code null} when none is installed.
     * Volatile because it may be installed on one thread and read on another.
     */
    private volatile Handler handler;

    /**
     * @return the single shared instance
     */
    public static MessageHandler getInstance() {
        return instance;
    }

    /**
     * Installs (or, with {@code null}, removes) the handler that receives events.
     *
     * @param handler the delivery target; {@code null} disables delivery
     */
    public void setHandler(Handler handler) {
        this.handler = handler;
    }

    /**
     * Posts an event to the installed handler, if any.
     *
     * @param code value stored in {@code Message.what}
     * @param data value stored in {@code Message.obj}; may be {@code null}
     */
    public void sendMessage(int code, Object data) {
        // Read the field once so a concurrent setHandler(null) cannot slip in
        // between the null check and the send.
        Handler target = handler;
        if (target == null) {
            return;
        }
        // obtainMessage() reuses a pooled Message instead of allocating one per event.
        Message msg = target.obtainMessage(code, data);
        target.sendMessage(msg);
    }
}
①定义一个心跳数据传入接口HeartBeatListener
/**
 * Supplies the payload that is sent as a heartbeat.
 */
public interface HeartBeatListener {

    /**
     * Produces the bytes of the next heartbeat packet.
     *
     * @return the heartbeat payload
     */
    byte[] getHeartBeat();
}
②实现数据收发处理和心跳处理DataHandlerAdapter
/**
 * Channel handler that receives data, sends data and emits heartbeats.
 *
 * <p>Events are reported through {@link MessageHandler}. The handler keeps a
 * reference to the most recently used {@link ChannelHandlerContext};
 * {@link #sendData(byte[])} writes to that context's channel. When several
 * connections share this handler, only the most recently used one is addressed.</p>
 *
 * @author liangc
 */
// The same instance is added to every channel pipeline, so it must be marked sharable.
@ChannelHandler.Sharable
public class DataHandlerAdapter extends ChannelHandlerAdapter {
    private static final String TAG = "DataHandlerAdapter";

    /** Which side of the connection this handler is serving. */
    public enum ConnectType {
        SERVER,
        CLIENT,
    }

    private final ConnectType type;

    /**
     * Context used by {@link #sendData(byte[])}; {@code null} while no connection
     * is available. Volatile because it is updated by channel callbacks and read
     * by whichever thread calls {@code sendData}.
     */
    private volatile ChannelHandlerContext channelHandlerContext;

    /** Heartbeat payload source; {@code null} disables heartbeats. */
    private volatile HeartBeatListener listener;

    DataHandlerAdapter(ConnectType type) {
        this.type = type;
    }

    /**
     * Records the new connection and, on the server side, reports the peer address.
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        this.channelHandlerContext = ctx;
        InetSocketAddress socketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
        String connectAddress = socketAddress.getAddress().getHostAddress();
        if (type == ConnectType.SERVER) {
            MessageHandler.getInstance().sendMessage(MessageType.SERVER_CONNECT_SUCCESS, connectAddress);
        }
        Log.w(TAG, "连接成功:" + connectAddress);
    }

    /**
     * Reports the disconnect and forgets the context if it was the send target.
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) {
        // Do not keep (or switch to) a context whose channel is already closed;
        // another connection that is still alive stays the send target.
        if (this.channelHandlerContext == ctx) {
            this.channelHandlerContext = null;
        }
        Log.w(TAG, "连接断开");
        if (type == ConnectType.CLIENT) {
            MessageHandler.getInstance().sendMessage(MessageType.CLIENT_DISCONNECT_SUCCESS, "连接断开");
        } else {
            MessageHandler.getInstance().sendMessage(MessageType.SERVER_DISCONNECT_SUCCESS, "连接断开");
        }
    }

    /**
     * Copies the received bytes out of the buffer and reports them as
     * {@link MessageType#RECEIVE_DATA}.
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        this.channelHandlerContext = ctx;
        Log.w(TAG, "收到数据");
        ByteBuf byteBuf = (ByteBuf) msg;
        try {
            byte[] recvData = new byte[byteBuf.readableBytes()];
            byteBuf.readBytes(recvData);
            MessageHandler.getInstance().sendMessage(MessageType.RECEIVE_DATA, recvData);
        } finally {
            // The message is consumed here and not passed on, so this handler
            // must release it; clear() only resets the indices and leaks the buffer.
            byteBuf.release();
        }
    }

    /**
     * Passes an outbound message on towards the socket.
     */
    @Override
    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) {
        this.channelHandlerContext = ctx;
        // Forward the caller's promise so the future returned by write() completes.
        ctx.write(msg, promise);
    }

    /**
     * Sends a heartbeat when the channel has been idle in both directions.
     * Other idle states are ignored; non-idle events go to the superclass.
     */
    @Override
    public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception {
        this.channelHandlerContext = ctx;
        if (evt instanceof IdleStateEvent) {
            IdleState state = ((IdleStateEvent) evt).state();
            if (state == IdleState.ALL_IDLE) {
                HeartBeatListener current = listener;
                if (current != null) {
                    sendData(current.getHeartBeat());
                }
            }
        } else {
            super.userEventTriggered(ctx, evt);
        }
    }

    /**
     * Sets the source of heartbeat payloads.
     *
     * @param listener heartbeat supplier; {@code null} disables heartbeats
     */
    void addHeartBeatListener(HeartBeatListener listener) {
        this.listener = listener;
    }

    /**
     * Queues {@code data} for sending on the current connection.
     *
     * <p>The write is asynchronous and this method does not wait for it.</p>
     *
     * @param data bytes to send
     * @return {@code false} if {@code data} is {@code null}, there is no active
     *         connection, or the write is already known to have failed;
     *         {@code true} if the write was queued or has already succeeded
     */
    boolean sendData(byte[] data) {
        ChannelHandlerContext ctx = channelHandlerContext;
        if (data == null || ctx == null || !ctx.channel().isActive()) {
            return false;
        }
        ByteBuf byteBuf = ctx.alloc().buffer(data.length);
        byteBuf.writeBytes(data);
        ChannelFuture future = ctx.channel().writeAndFlush(byteBuf);
        // A write that has not completed yet is not a failure.
        return !future.isDone() || future.isSuccess();
    }
}
线程处理类 ChannelInitClient 注意:客户端为 Channel,服务端为 SocketChannel
/** * 客户端数据收发线程 * @author liangc * */ public class ChannelInitClient extends ChannelInitializer<Channel> { private DataHandlerAdapter adapter; ChannelInitClient(DataHandlerAdapter adapter) { this.adapter = adapter; } @Override protected void initChannel(Channel ch) { try { ChannelPipeline channelPipeline = ch.pipeline(); //添加心跳机制,例:每3000ms发送一次心跳 channelPipeline.addLast(new IdleStateHandler(3000, 3000, 3000, TimeUnit.MILLISECONDS)); //添加数据处理(接收、发送、心跳) channelPipeline.addLast(adapter); } catch (Exception e) { e.printStackTrace(); } } }客户端实现类 NettyClient
/** * Netty客户端 * @author liangc * */ public class NettyClient { private static final String TAG = "NettyClient"; /** * 网络连接 * */ private Channel channel; /** * 连接地址 * */ private String address; /** * 监听端口 * */ private int port; private DataHandlerAdapter dataHandlerAdapter; public NettyClient(String address, int port) { this.address = address; this.port = port; dataHandlerAdapter = new DataHandlerAdapter(DataHandlerAdapter.ConnectType.CLIENT); } /** * 启动客户端 * */ public void start() { Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { @Override public void run() { Log.w(TAG, "启动客户端"); EventLoopGroup group = new NioEventLoopGroup(); try { ChannelInitClient channelInit = new ChannelInitClient(dataHandlerAdapter); Bootstrap bootstrap = new Bootstrap(); bootstrap.group(group) .channel(NioSocketChannel.class) .remoteAddress(new InetSocketAddress(address, port)) .handler(channelInit) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = bootstrap.connect().sync(); channel = channelFuture.channel(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? 
super Void> future) { if (future.isSuccess()) { //绑定成功 Log.w(TAG, "客户端连接成功"); MessageHandler.getInstance().sendMessage(MessageType.CLIENT_CONNECT_SUCCESS, "客户端连接成功"); } else { //绑定失败 Log.w(TAG, "客户端连接失败"); MessageHandler.getInstance().sendMessage(MessageType.CLIENT_CONNECT_FAILED, "客户端连接失败"); } } }); channel.closeFuture().sync(); Log.w(TAG, "客户端关闭成功"); MessageHandler.getInstance().sendMessage(MessageType.CLIENT_CLOSE_SUCCESS, "客户端关闭成功"); } catch (Exception e) { e.printStackTrace(); MessageHandler.getInstance().sendMessage(MessageType.CLIENT_EXCEPTION, "客户端异常:" + e.getMessage()); } finally { try { group.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); MessageHandler.getInstance().sendMessage(MessageType.CLIENT_EXCEPTION, "客户端异常2:" + e.getMessage()); } } } }); } public void addHeartBeat(HeartBeatListener listener) { if (dataHandlerAdapter != null) { dataHandlerAdapter.addHeartBeatListener(listener); } } public void setHandler(Handler handler) { MessageHandler.getInstance().setHandler(handler); } public boolean sentData(byte[] data) { dataHandlerAdapter.sendData(data); } public void stop() { Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { @Override public void run() { if (channel != null) { channel.close(); channel = null; } } }); } }线程处理类 ChannelInitClient 注意:客户端为 Channel,服务端为 SocketChannel
/** * 服务端数据收发线程 * @author liangc * */ public class ChannelInitServer extends ChannelInitializer<SocketChannel> { private DataHandlerAdapter adapter; ChannelInitServer(DataHandlerAdapter adapter) { this.adapter = adapter; } @Override protected void initChannel(SocketChannel ch) { try { ChannelPipeline channelPipeline = ch.pipeline(); //添加心跳机制,例:每3000ms发送一次心跳 channelPipeline.addLast(new IdleStateHandler(3000, 3000, 3000, TimeUnit.MILLISECONDS)); //添加数据处理(接收、发送、心跳) channelPipeline.addLast(adapter); } catch (Exception e) { e.printStackTrace(); } } }服务端实现类 NettyServer
/** * Netty服务端 * @author liangc * */ public class NettyServer { private static final String TAG = "NettyServer"; /** * 网络连接 * */ private Channel channel; /** * 监听端口 * */ private int port; private DataHandlerAdapter dataHandlerAdapter; public NettyServer(int port) { this.port = port; dataHandlerAdapter = new DataHandlerAdapter(DataHandlerAdapter.ConnectType.SERVER); } /** * 启动客户端 * */ public void start() { Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { @Override public void run() { Log.w(TAG, "服务启动"); EventLoopGroup bossGroup = new NioEventLoopGroup(); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ChannelInitServer channelInit = new ChannelInitServer(dataHandlerAdapter); ServerBootstrap serverBootstrap = new ServerBootstrap(); serverBootstrap.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childHandler(channelInit) .option(ChannelOption.SO_BACKLOG, 128) .option(ChannelOption.TCP_NODELAY, true) .option(ChannelOption.SO_KEEPALIVE, true); ChannelFuture channelFuture = serverBootstrap.bind(port).sync(); channel = channelFuture.channel(); channelFuture.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? 
super Void> future) { if (future.isSuccess()) { //服务启动成功 Log.w(TAG, "服务启动成功"); MessageHandler.getInstance().sendMessage(MessageType.SERVER_START_SUCCESS, "服务启动成功"); } else { //服务启动失败 Log.w(TAG, "服务启动失败"); MessageHandler.getInstance().sendMessage(MessageType.SERVER_START_FAILED, "服务启动失败"); } } }); channel.closeFuture().sync(); Log.w(TAG, "服务关闭成功"); MessageHandler.getInstance().sendMessage(MessageType.SERVER_CLOSE_SUCCESS, "服务关闭成功"); } catch (Exception e) { e.printStackTrace(); MessageHandler.getInstance().sendMessage(MessageType.SERVER_EXCEPTION, "服务异常:" + e.getMessage()); } finally { try { workerGroup.shutdownGracefully().sync(); bossGroup.shutdownGracefully().sync(); } catch (InterruptedException e) { e.printStackTrace(); MessageHandler.getInstance().sendMessage(MessageType.SERVER_EXCEPTION, "服务异常2:" + e.getMessage()); } } } }); } public void addHeartBeat(HeartBeatListener listener) { if (dataHandlerAdapter != null) { dataHandlerAdapter.addHeartBeatListener(listener); } } public void setHandler(Handler handler) { MessageHandler.getInstance().setHandler(handler); } public boolean sentData(byte[] data) { return dataHandlerAdapter.sendData(data); } public void stop() { Executors.newSingleThreadScheduledExecutor().submit(new Runnable() { @Override public void run() { if (channel != null) { channel.close(); channel = null; } } }); } }在AndroidManifest.xml文件中添加网络访问权限
<uses-permission android:name="android.permission.INTERNET" />
在Activity文件中调用
/**
 * Demo screen for the Netty client and server.
 *
 * <p>Each button toggles between starting and stopping its side; every event
 * delivered to {@link #handler} is appended to the result text view.</p>
 *
 * @author liangc
 */
public class MainActivity extends AppCompatActivity {
    private boolean isTestServer = false;
    private boolean isTestClient = false;
    private NettyClient client;
    private NettyServer server;
    private TextView tvResult;
    private String result = "";

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        tvResult = findViewById(R.id.tvTestResult);
        findViewById(R.id.btnTestServer).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                if (!isTestServer) {
                    result = "";
                    testNettyServer();
                } else {
                    stopNettyServer();
                }
                isTestServer = !isTestServer;
            }
        });
        findViewById(R.id.btnTestClient).setOnClickListener(new View.OnClickListener() {
            @Override
            public void onClick(View view) {
                if (!isTestClient) {
                    result = "";
                    testNettyClient();
                } else {
                    stopNettyClient();
                }
                isTestClient = !isTestClient;
            }
        });
    }

    /**
     * Stops both endpoints and detaches this screen from the shared
     * {@link MessageHandler}, so open sockets and the static singleton do not
     * keep the Activity alive after it is destroyed.
     */
    @Override
    protected void onDestroy() {
        stopNettyClient();
        stopNettyServer();
        MessageHandler.getInstance().setHandler(null);
        handler.removeCallbacksAndMessages(null);
        super.onDestroy();
    }

    /** Creates a client for the demo address and starts it. */
    private void testNettyClient() {
        client = new NettyClient("10.1.4.104", 6800);
        client.addHeartBeat(new HeartBeatListener() {
            @Override
            public byte[] getHeartBeat() {
                // Each heartbeat tick also sends a test payload.
                client.sentData(encodeGbk("测试数据"));
                return encodeGbk("心跳");
            }
        });
        client.setHandler(handler);
        client.start();
    }

    private void stopNettyClient() {
        if (client != null) {
            client.stop();
        }
    }

    /** Creates a server on the demo port and starts it. */
    private void testNettyServer() {
        server = new NettyServer(9527);
        server.addHeartBeat(new HeartBeatListener() {
            @Override
            public byte[] getHeartBeat() {
                // Each heartbeat tick also sends a test payload.
                server.sentData(encodeGbk("测试数据"));
                return encodeGbk("心跳");
            }
        });
        server.setHandler(handler);
        server.start();
    }

    private void stopNettyServer() {
        if (server != null) {
            server.stop();
        }
    }

    /**
     * Encodes {@code text} as GBK.
     *
     * @return the encoded bytes, or an empty array (never {@code null}) if the
     *         runtime does not support GBK
     */
    private static byte[] encodeGbk(String text) {
        try {
            return text.getBytes("GBK");
        } catch (UnsupportedEncodingException e) {
            e.printStackTrace();
            return new byte[0];
        }
    }

    /**
     * Renders an event payload for display.
     *
     * <p>Byte arrays are decoded as GBK, the encoding this demo uses when
     * sending; this assumes the peer uses the same encoding. Anything else is
     * rendered with {@code String.valueOf}.</p>
     */
    private static String describe(Object payload) {
        if (payload instanceof byte[]) {
            try {
                return new String((byte[]) payload, "GBK");
            } catch (UnsupportedEncodingException e) {
                e.printStackTrace();
            }
        }
        return String.valueOf(payload);
    }

    /** Appends every received event to the result view. */
    @SuppressLint("HandlerLeak")
    private final Handler handler = new Handler() {
        @Override
        public void handleMessage(@NonNull Message msg) {
            result += "\r\n";
            result += describe(msg.obj);
            tvResult.setText(result);
        }
    };
}
对应的布局文件
<?xml version="1.0" encoding="utf-8"?>
<RelativeLayout
    xmlns:android="http://schemas.android.com/apk/res/android"
    xmlns:tools="http://schemas.android.com/tools"
    android:layout_width="match_parent"
    android:layout_height="match_parent"
    tools:context="com.liangc.test.MainActivity">

    <!-- Result log; fills the space above the two buttons. -->
    <TextView
        android:id="@+id/tvTestResult"
        android:layout_width="match_parent"
        android:layout_height="match_parent"
        android:layout_above="@+id/btnTestServer"
        android:padding="20dp" />

    <!-- Server toggle, placed directly above the client button. -->
    <Button
        android:id="@+id/btnTestServer"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:layout_above="@+id/btnTestClient"
        android:layout_margin="10dp"
        android:text="测试服务端" />

    <!-- Client toggle, pinned to the bottom of the screen. -->
    <Button
        android:id="@+id/btnTestClient"
        android:layout_width="match_parent"
        android:layout_height="wrap_content"
        android:layout_alignParentBottom="true"
        android:layout_margin="10dp"
        android:text="测试客户端" />

</RelativeLayout>
觉得有用又有积分的请下载支持下,只需要1个积分,完整工程代码:https://download.csdn.net/download/sinat_17164495/12998646