通过上一篇博客《Netty原理解析系列(五)—Netty线程模型》中了解了Netty的线程模型,对Netty的整体架构有了一定的认识。这篇文章将介绍Netty相关的核心组件,并通过一些简单的demo来了解如何使用Netty。
上一篇博客的简单介绍了一下这个组件,它是一个事件循环器,充当了Reactor的核心。每个EventLoop 都会包含一个Selector选择器,用于监听IO事件,和一个taskQueue用于存储用户提交的任务。此EventLoop会用一个独有的线程,默认是不启动的,当有任务触发时就会启动,并一直轮询下去。
一般我们不会单独的使用它,而是使用EventLoopGroup,而EventLoopGroup顶层接口是Executor,也就是说可以把它当作是一个线程池。在平时的使用过程中,会创建两个EventLoopGroup,一个是BossGroup,一个是WorkGroup。
在BossGroup中一般设置一个EventLoop。而WorkGroup可以根据自己的需求设定。默认的线程数量是cpu核心数的两倍。BossGroup用于监听客户端的连接,注册新的channel到WorkGroup当中。WorkGroup监听客户端的读写请求。
注意事项:
一个EventLoopGroup可以包含一个或者多个EventLoop。一个EventLoop在它的生命周期内只能与一个线程绑定。一个EventLoop可以分配一至多个Channel,而这些Channel就会和这个EventLoop所绑定的线程而绑定。这些Channel所处理的I/O将会通过这个线程来处理提供了I/O的基本操作,如bind、connect、read、write等。而上面的这些操作都是通过ChannelPipeline中的多个Handler来实现的。
当BossGroup监听到连接请求的时候,处理连接,然后创建新的channel绑定到WorkGroup当中的EventLoop。采用轮询的方式绑定到不同的EventLoop。
例如,EventLoopGroup当中有8个EventLoop,每个EventLoop与一个线程绑定在一起。当第一个客户端请求连接的时候,BossGroup中的EventLoop接收连接,创建新的channel绑定到了WorkGroup当中的第一个EventLoop,第二个客户端请求连接的时候,创建的新的channel会绑定到第二个EventLoop上。就这样每个客户端连接后,新创建的channel以轮询的方式绑定到不同的EventLoop。
Channel的种类也有很多,如下图展示了一部分的Channel实现类
最为常用的是NioSocketChannel和NioServerSocketChannel
每个Channel中都会有一条唯一的Pipeline 其用于流转的方式处理Channel中发生的事件比如注册、绑定端口、读写消息等。这些事件会在pipeline流中的各个节点轮转并依次处理,而每个节点就可以处理相对应的功能,这是一种责任链式的设计模式,其目的是为让各个节点处理理聚焦的业务。主要的业务逻辑可以自定义实现ChannelHandler加入到pipeline中。
ChannelPipeline可以看作是一个双向链表,里面维护了一系列的ChannelHandler
而ChannelHandler 和 ChannelPipeline 是通过ChannelHandlerContext 来联系到一起的。
ChannelHandler又分为ChannelInboundHandler 和ChannelOutboundHandler。
ChannelInboundHandler 用于处理入站的消息,也就是读请求的时候处理,处理完毕后交给下一个ChannelInboundHandler。ChannelOutboundHandler 用于处理出站的消息,也就是写请求的时候处理,处理完毕后交给下一个ChannelOutboundHandler。Netty 为异步非阻塞,即所有的 I/O 操作都为异步的,因此,我们不能立刻得知消息是否已经被处理了。Netty 提供了 ChannelFuture 接口,该接口继承了JUC中的Future,也就是说两者差不多,都是用来得到任务执行的结果。通过该接口的 addListener() 方法注册一个 ChannelFutureListener,当操作执行成功或者失败时,监听就会自动触发返回结果。
可以发现,其实boss的代码基本上是固定的,只是用来监听客户端的连接。所以Netty提供了更简单的使用方式—Bootstrap。
下面就编写一个简单的Http服务来演示Bootstrap的使用。
public class BootstrapTest { // 编写一个Http 服务 // http-->TCP public void open(int port) { // ServerBootstrap bootstrap = new ServerBootstrap(); EventLoopGroup boss = new NioEventLoopGroup(1);// EventLoopGroup work = new NioEventLoopGroup(8);// bootstrap.group(boss, work)//连接分给boss 读写分给work .channel(NioServerSocketChannel.class)// 指定要打开的管道 自动进行进行注册==》NioServerSocketChannel -> eventLoop -> Selector .childHandler(new ChannelInitializer<Channel>() {//指定 子管道 @Override protected void initChannel(Channel ch) { ch.pipeline().addLast("decode", new HttpRequestDecoder()); // 输入 解码 ch.pipeline().addLast("aggregator",new HttpObjectAggregator(65536));//合并器 将请求体和请求包合并起来 ch.pipeline().addLast("servlet", new MyServlet()); // 业务处理 ch.pipeline().addFirst("encode", new HttpResponseEncoder());// 输出流 编码 } }); ChannelFuture future = bootstrap.bind(port);//异步操作 future.addListener(future1 -> System.out.println("注册成功"));//监听注册成功 } private class MyServlet extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { // request (请求头) // body (请求体) if (msg instanceof FullHttpRequest) { FullHttpRequest request= (FullHttpRequest) msg; System.out.println("url"+request.uri()); System.out.println(request.content().toString(Charset.defaultCharset())); FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8"); response.content().writeBytes("hello".getBytes()); ChannelFuture future = ctx.writeAndFlush(response); future.addListener(ChannelFutureListener.CLOSE); } if (msg instanceof HttpRequest) { HttpRequest request = (HttpRequest) msg; System.out.println("当前请求:" + request.uri()); } if (msg instanceof HttpContent) { // 写入文件流 ByteBuf content = ((HttpContent) msg).content(); OutputStream out = new FileOutputStream("/Users/gongsenlin/code/IdeaWorkspace/Netty/coderead-netty4/test.txt", true); content.readBytes(out, content.readableBytes()); out.close(); } if (msg instanceof LastHttpContent) {//上传大文件 要分开发 最后一个包是lastHttpContent FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HTTP_1_0, HttpResponseStatus.OK); response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/html;charset=utf-8"); response.content().writeBytes("上传完毕".getBytes()); ChannelFuture future = ctx.writeAndFlush(response); future.addListener(ChannelFutureListener.CLOSE); } } } public static void main(String[] args) throws IOException { new BootstrapTest().open(8080); System.in.read(); } }此时不需要像上一个demo那样去写两个添加channel以及相关处理器的代码。
bootstrap.group(boss, work) .channel(NioServerSocketChannel.class) .childHandlernew ChannelInitializer<Channel>() { @Override protected void initChannel(Channel ch) { } });只需要关注work的channel中的channelHandler的执行逻辑即可,简化了很多的操作,也就是程序员只需要关注读写需要处理的逻辑即可。
这里的管道中添加了4个channelHandler ,其中HttpRequestDecoder和HttpObjectAggregator是入站信息处理器,也就是读进来的消息会走这两个处理器去进行处理。MyServlet 是自定义的入站消息处理器,实现了SimpleChannelInboundHandler接口。注意这3个处理器都是入站消息处理器,都是调用addLast按照顺序添加到队尾的,入站消息处理器的执行顺序是从队头执行到队尾,所以具体的先后顺序 要根据程序员自己想要的业务逻辑来决定。
而HttpResponseEncoder是一个出站消息处理器。
在MyServlet中有这样一段代码 关键就在这里,writeAndFlush,会将response交给在这个消息处理器之前出现的第一个出站消息处理器,也就是排队排在它前面出现的第一个出站消息处理器。所以HttpResponseEncoder必须排在MyServlet之前。否则会报错。
通过这篇博客了解了各个组件的作用以及学会了初步的使用netty。之后的篇章将会介绍各组件的实现细节。