Netty项目起源于2004年,Java社区第一个基于事件驱动的网络应用开发框架。Hadoop、Cassandra、Spark、Dubbo、gRPC、RocketMQ、Zookeeper、Spring5等都在使用Netty,是开发高性能Java服务器的必学框架。
应用现状:截至2019年9月,30000+项目在使用,实际使用中远大于30000 使用Netty的典型项目:Cassandra数据库、Spark、Hadoop、RocketMQ、Elasticsearch、gRPC、Spring5、Dubbo等
更多流行协议的支持; 紧跟新JDK的步伐; 更多易用性、人性化的支持:IP地址黑白名单、流量整形; 应用越来越多。
泛型+反射+工厂模式实现I/O模式的切换
假设客户端分别发送了两个数据包D1和D2给服务端,由于服务端一次读取到的字节数是不确定的,故可能存在以下4种情况。
服务端分两次读取到了两个独立的数据包,分别是D1和D2,没有粘包和拆包;服务端一次接收到了两个数据包,D1和D2粘合在一起,被称为TCP粘包;服务端分两次读取到了两个数据包,第一次读取到了完整的D1包和D2包的部分内容,第二次读取到了D2包的剩余内容,这被称为TCP拆包;服务端分两次读取到了两个数据包,第一次读取到了D1包的部分内容D1_1,第二次读取到了D1包的剩余内容D1_2和D2包的整包;如果此时服务端TCP接收滑窗非常小,而数据包D1和D2比较大,很有可能会发生第五种可能,即服务端分多次才能将D1和D2包接收完全,期间发生多次拆包。TCP是流式协议,消息无边界
当启用(默认关闭)keepalive时,TCP在连接没有数据。通过7200秒后发送keepalive消息,当探测没有确认时,按75秒的重试频率重发,一直发9个探测包都没有确认,就认定连接失败。所以总耗时一般为:2小时11分钟(7200 + 75秒 * 9次)
假设开了一个饭店,,别人电话来订餐,电话通了后,订餐的说了一堆订餐要求,说着说着,对方就不讲话了。你会稍等片刻,看短时间内对方还会不会说话(idle检测),如果对方不说,认定对方存在问题(idle),于是开始发问“你还在吗”(keepalive),或者直接挂机(关闭连接)。
idle检测,只是负责诊断,诊断后,做出不同的行为,决定idle检测的最终用途。
发送keepalive:一般用来配合keepalive,减少keepalive消息。直接关闭连接:快速释放损坏的、恶意的、很久不用的连接,让系统时刻保持最好的状态;简单粗暴,客户端可能需要重连开启keepalive:
Server端开其TCP keepalive bootstrap.childOption(ChannelOption.SO_KEEPALIVE, true) bootstrap.childOption(NioChannelOption.of(StandardSocketOptions.SO_KEEPALIVE), true) 开启不同的idle check ch.pipeline().addLast("idleCheckHandler", new IdleStateHandler(0, 20, 0, TimeUnit.SECONDS));synchronized method -> synchronized block
@Override void init(Channel channel) throw Exception { final Map<ChannelOption<?>, Object> options = options0(); synchronized (options) { setChannelOptions(channel, options, logger); } final Map<AttributeKey<?>, Object> attrs = attrs0(); synchronized (attrs) { for (Entry<AttributeKey<?>, Object> e : attrs.entrySet()) { AttributeKey<Object> key = (AttributeKey<Object>) e.getKey(); channel.attr(key).set(e.getValue()); } } }AtomicLong -> Volatile long + AtomicLongFieldUpdater
private static final AtomicLongFieldUpdater<ChannelOutboundBuffer> TOTAL_PENDING_SIZE_UPDATER = AtomicLongFieldUpdater.newUpdater(ChannelOutboundBuffer.class, "totalPendingSize"); private volatile long totalPendingSize; private void incrementPendingOutboundBytes(long size, boolean invokerLater) { if (size == 0) { return; } long newWriteBufferSize = TOTAL_PENGING_SIZE_UPDATER.addAndGet(this, size); if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) { setUnwritable(invokeLater); } }Atomic long VS long: 前者是一个对象,包含对象头(object header)以用来保存hashcode、lock等信息,32位系统占用8字节;64位系统占16字节,所以在64位系统情况下:
volatile long = 8 bytes AtomicLong = 8 bytes(volatile long)+ 16bytes(对象头)+ 8 bytes(引用)= 32 bytes至少节约24字节 结论:Atomic* objects -> Volatile primary type + static Atomic* FieldUpdater
记录内存分配字节数等功能用到的LongCounter 高并发时:java.util.concurrent.atomic.AtomicLong -> java.util.concurrent.atomic.LongAdder 结论:及时衡量、使用JDK最新的功能
Object.wait/notify -> CountDownLatch
有一点我们需要知道的是,ByteBuf的jar包,是可以单独使用的。比如某个项目中有一个场景,需要处理某个自定义的协议,那么我们在解析协议时,就可以将接收到的将字节内容写入一个ByteBuf,然后从ByteBuf中慢慢的将内容读取出来。下面让我们用一个例子简单的了解下ByteBuf的使用。
要想使用ByteBuf,首先肯定是要创建一个ByteBuf,更确切的说法就是要申请一块内存,后续可以在这块内存中执行写入数据读取数据等等一系列的操作。
那么如何创建一个ByteBuf呢?Netty中设计了一个专门负责分配ByteBuf的接口:ByteBufAllocator。该接口有一个抽象子类和两个实现类,分别对应了用来分配池化的ByteBuf和非池化的ByteBuf。
有了Allocator之后,Netty又为我们提供了两个工具类:Pooled、Unpooled,分类用来分配池化的和未池化的ByteBuf,进一步简化了创建ByteBuf的步骤,只需要调用这两个工具类的静态方法即可。
我们以Unpooled类为例,查看Unpooled的源码可以发现,他为我们提供了许多创建ByteBuf的方法,但最终都是以下这几种,只是参数不一样而已:
// 在堆上分配一个ByteBuf,并指定初始容量和最大容量 public static ByteBuf buffer(int initialCapacity, int maxCapacity) { return ALLOC.heapBuffer(initialCapacity, maxCapacity); } // 在堆外分配一个ByteBuf,并指定初始容量和最大容量 public static ByteBuf directBuffer(int initialCapacity, int maxCapacity) { return ALLOC.directBuffer(initialCapacity, maxCapacity); } // 使用包装的方式,将一个byte[]包装成一个ByteBuf后返回 public static ByteBuf wrappedBuffer(byte[] array) { if (array.length == 0) { return EMPTY_BUFFER; } return new UnpooledHeapByteBuf(ALLOC, array, array.length); } // 返回一个组合ByteBuf,并指定组合的个数 public static CompositeByteBuf compositeBuffer(int maxNumComponents){ return new CompositeByteBuf(ALLOC, false, maxNumComponents); }其中包装方法除了上述这个方法之外,还有一些其他常用的包装方法,比如参数是一个ByteBuf的包装方法,比如参数是一个原生的ByteBuffer的包装方法,比如指定一个内存地址和大小的包装方法等等。 另外还有一些copy*开头的方法,实际是调用了buffer(int initialCapacity, int maxCapacity)或directBuffer(int initialCapacity, int maxCapacity)方法,然后将具体的内容write进生成的ByteBuf中返回。 以上所有的这些方法都实际通过一个叫ALLOC的静态变量进行了调用,来实现具体的ByteBuf的创建,而这个ALLOC实际是一个ByteBufAllocator:
private static final ByteBufAllocator ALLOC = UnpooledByteBufAllocator.DEFAULT;ByteBufAllocator是一个专门负责ByteBuf分配的接口,对应的Unpooled实现类就是UnpooledByteBufAllocator。在UnpooledByteBufAllocator类中可以看到UnpooledByteBufAllocator.DEFAULT变量是一个final类型的静态变量
/** * Default instance which uses leak-detection for direct buffers. * 默认的UnpooledByteBufAllocator实例,并且会对堆外内存进行泄漏检测 */ public static final UnpooledByteBufAllocator DEFAULT = new UnpooledByteBufAllocator(PlatformDependent.directBufferPreferred());ByteBuf和ByteBufAllocator之间是一种相辅相成的关系,ByteBufAllocator用来创建一个ByteBuf,而ByteBuf亦可以返回创建他的Allocator。ByteBuf和ByteBufAllocator之间是一种 抽象工厂模式
ChannelHandler是netty中的核心处理部分,我们使用netty的绝大部分代码都写在这部分,所以了解它的一些机制和特性是很有必要的。
Channel接口抽象了底层socket的一些状态属性以及调用方法 针对不同类型的socket提供不同的子类实现。
ChannelHandler用于处理Channel对应的事件ChannelHandler接口里面只定义了三个生命周期方法,我们主要实现它的子接口ChannelInboundHandler和ChannelOutboundHandler,为了便利,框架提供ChannelInboundHandlerAdapter,ChannelOutboundHandlerAdapter和ChannelDuplexHandler这三个适配类,在使用的时候只需要实现你关注的方法即可
ChannelHandler里面定义三个生命周期方法,分别会在当前ChannelHander加入ChannelHandlerContext中,从ChannelHandlerContext中移除,以及ChannelHandler回调方法出现异常时被回调
这些回调方法被触发的时机
回调方法 触发时机clientserverchannelRegistered当前channel注册到EventLooptruetruechannelUnregistered当前channel当前channel从EventLoop取消注册 到EventLooptruetruechannelActive当前channel激活的时候truetruechannelInactive当前channel不活跃的时候,也就是当前channel到了它生命周期末 truetruechannelRead当前channel从远端读取到数据truetruechannelReadCompletechannel read消费完读取的数据的时候被触发truetrueuserEventTriggered用户事件触发的时候truetruechannelWritabilityChangedchannel的写状态变化的时候触发truetrue 可以注意到每个方法都带了ChannelHandlerContext作为参数,具体作用是,在每个回调事件里面,处理完成之后,使用ChannelHandlerContext的fireChannelXXX方法来传递给下个ChannelHandler,netty的codec模块和业务处理代码分离就用到了这个链路处理注意到一些回调方法有ChannelPromise这个参数,我们可以调用它的addListener注册监听,当回调方法所对应的操作完成后,会触发这个监听 下面这个代码,会在写操作完成后触发,完成操作包括成功和失败
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ctx.write(msg,promise); System.out.println("out write"); promise.addListener(new GenericFutureListener<Future<? super Void>>() { @Override public void operationComplete(Future<? super Void> future) throws Exception { if(future.isSuccess()){ System.out.println("OK"); } } }); }in和out的区别主要在于ChannelInboundHandler的channelRead和channelReadComplete回调和ChannelOutboundHandler的write和flush回调上,ChannelOutboundHandler的channelRead回调负责执行入栈数据的decode逻辑,ChannelOutboundHandler的write负责执行出站数据的encode工作。其他回调方法和具体触发逻辑有关,和in与out无关。
每个ChannelHandler通过add方法加入到ChannelPipeline中去的时候,会创建一个对应的ChannelHandlerContext,并且绑定,ChannelPipeline实际维护的是ChannelHandlerContext 的关系 在DefaultChannelPipeline源码中可以看到会保存第一个ChannelHandlerContext以及最后一个ChannelHandlerContext的引用
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail;而在AbstractChannelHandlerContext源码中可以看到
volatile AbstractChannelHandlerContext next; volatile AbstractChannelHandlerContext prev;每个ChannelHandlerContext之间形成双向链表
ChannelPipeline 在Channel创建的时候,会同时创建ChannelPipeline
protected AbstractChannel(Channel parent) { this.parent = parent; id = newId(); unsafe = newUnsafe(); pipeline = newChannelPipeline(); }在ChannelPipeline中也会持有Channel的引用
protected DefaultChannelPipeline newChannelPipeline() { return new DefaultChannelPipeline(this); }ChannelPipeline会维护一个ChannelHandlerContext的双向链表
final AbstractChannelHandlerContext head; final AbstractChannelHandlerContext tail;链表的头尾有默认实现
protected DefaultChannelPipeline(Channel channel) { this.channel = ObjectUtil.checkNotNull(channel, "channel"); succeededFuture = new SucceededChannelFuture(channel, null); voidPromise = new VoidChannelPromise(channel, true); tail = new TailContext(this); head = new HeadContext(this); head.next = tail; tail.prev = head; }我们添加的自定义ChannelHandler会插入到head和tail之间,如果是ChannelInboundHandler的回调,根据插入的顺序从左向右进行链式调用,ChannelOutboundHandler则相反
具体关系如下,但是下图没有把默认的head和tail画出来,这两个ChannelHandler做的工作相当重要 上面的整条链式的调用是通过Channel接口的方法直接触发的,如果使用ChannelContextHandler的接口方法间接触发,链路会从ChannelContextHandler对应的ChannelHandler开始,而不是从头或尾开始
HeadContext实现了ChannelOutboundHandler,ChannelInboundHandler这两个接口
class HeadContext extends AbstractChannelHandlerContext implements ChannelOutboundHandler, ChannelInboundHandler因为在头部,所以说HeadContext中关于in和out的回调方法都会触发 关于ChannelInboundHandler,HeadContext的作用是进行一些前置操作,以及把事件传递到下一个ChannelHandlerContext的ChannelInboundHandler中去 看下其中channelRegistered的实现
public void channelRegistered(ChannelHandlerContext ctx) throws Exception { invokeHandlerAddedIfNeeded(); ctx.fireChannelRegistered(); }从语义上可以看出来在把这个事件传递给下一个ChannelHandler之前会回调ChannelHandler的handlerAdded方法而有关ChannelOutboundHandler接口的实现,会在链路的最后执行,看下write方法的实现
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { unsafe.write(msg, promise); }这边的unsafe接口封装了底层Channel的调用,之所以取名为unsafe,是不需要用户手动去调用这些方法。这个和阻塞原语的unsafe不是同一个。也就是说,当我们通过Channel接口执行write之后,会执行ChannelOutboundHandler链式调用,在链尾的HeadContext ,在通过unsafe回到对应Channel做相关调用 从netty Channel接口的实现就能论证这个
public ChannelFuture write(Object msg) { return pipeline.write(msg); }TailContext实现了ChannelInboundHandler接口,会在ChannelInboundHandler调用链最后执行,只要是对调用链完成处理的情况进行处理,看下channelRead实现
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { onUnhandledInboundMessage(msg); }如果我们自定义的最后一个ChannelInboundHandler,也把处理操作交给下一个ChannelHandler,那么就会到TailContext,在TailContext会提供一些默认处理
protected void onUnhandledInboundMessage(Object msg) { try { logger.debug( "Discarded inbound message {} that reached at the tail of the pipeline. " + "Please check your pipeline configuration.", msg); } finally { ReferenceCountUtil.release(msg); } }比如channelRead中的onUnhandledInboundMessage方法,会把msg资源回收,防止内存泄露
强调一点的是,如果要执行整个链路,必须通过调用Channel方法触发,ChannelHandlerContext引用了ChannelPipeline,所以也能间接操作channel的方法,但是会从当前ChannelHandlerContext绑定的ChannelHandler作为起点开始,而不是ChannelHandlerContext的头和尾 这个特性在不需要调用整个链路的情况下可以使用,可以增加一些效率