分布式理论、架构设计学习:BIO、NIO、AIO

it2024-03-26  52

1. 同步和异步

同步(synchronize)、异步(asychronize)是指应用程序和内核的交互而言的

同步 指用户进程触发IO操作等待或者轮训的方式查看IO操作是否就绪。

同步举例:银行取钱,我自己去取钱,取钱的过程中等待

异步 当一个异步进程调用发出之后,调用者不会立刻得到结果。而是在调用发出之后,被调用者通过状态、通知来通知调用者,或者通过回调函数来处理这个调用。

说明: 使用异步IO时,Java将IO读写委托给OS处理,需要将数据缓冲区地址和大小传给OS,OS需要支持异步IO操作

异步举例:我请朋友帮我取钱,他取到钱后返回给我(委托给操作系统OS,OS需要支持IO异步API)

2. 阻塞和非阻塞

阻塞和非阻塞是针对于进程访问数据的时候,根据IO操作的就绪状态来采取不同的方式

简单点说就是一种读写操作方法的实现方式。 阻塞方式下读取和写入将一直等待, 而非阻塞方式下,读取和写入方法会理解返回一个状态值

阻塞 ATM机排队取款,你只能等待排队取款(使用阻塞IO的时候,Java调用会一直阻塞到读写完成才返回。)

非阻塞 柜台取款,取个号,然后坐在椅子上做其他事,等广播通知,没到你的号你就不能去,但你可以不断的问大堂经理排到了没有。(使用非阻塞IO时,如果不能读写Java调用会马上返回,当IO事件分发器会通知可读写时再继续进行读写,不断循环直到读写完成)

老张煮开水。 老张,水壶两把(普通水壶,简称水壶;会响的水壶,简称响水壶)。

老张把水壶放到火上,站立着等水开。(同步阻塞)老张把水壶放到火上,去客厅看电视,时不时去厨房看看水开没有。(同步非阻塞)老张把响水壶放到火上,立等水开。(异步阻塞)老张把响水壶放到火上,去客厅看电视,水壶响之前不再去看它了,响了再去拿壶。(异步非阻塞)

3. BIO

同步阻塞IO。B代表blocking 服务器实现模式为一个连接一个线程,即客户端有连接请求时,服务器端就需要启动一个线程进行处理,如果这个连接不做任何事情会造成不必要的线程开销,当然可以通过线程池机制改善。

适用场景:Java1.4之前唯一的选择,简单易用但资源开销太高

服务端代码 package com.study; import java.io.IOException; import java.net.InetSocketAddress; import java.net.ServerSocket; import java.net.Socket; public class IOServer { public static void main(String[] args) throws IOException { ServerSocket serverSocket = new ServerSocket(); serverSocket.bind(new InetSocketAddress("127.0.0.1",9999)); while (true){ final Socket socket = serverSocket.accept(); // 同步阻塞 new Thread(() -> { byte[] bytes = new byte[1024]; try { int len = socket.getInputStream().read(bytes); // 同步阻塞 System.out.println(new String(bytes,0,len)); socket.getOutputStream().write(bytes,0,len); socket.getOutputStream().flush(); } catch (IOException e) { e.printStackTrace(); } }).start(); } } } 客户端代码 package com.study; import java.io.IOException; import java.net.Socket; public class IOClient { public static void main(String[] args) throws IOException { Socket socket = new Socket("127.0.0.1",9999); socket.getOutputStream().write("hello".getBytes()); socket.getOutputStream().flush(); System.out.println("server send back data ====="); byte[] bytes = new byte[1024]; int len = socket.getInputStream().read(bytes); System.out.println(new String(bytes,0,len)); socket.close(); } } 执行结果

4. NIO

4.1 NIO 介绍

同步非阻塞IO (non-blocking IO / new io)是指JDK 1.4 及以上版本。

服务器实现模式为一个请求一个通道,即客户端发送的连接请求都会注册到多路复用器上,多路复用器轮询到连接

有IO请求时才启动一个线程进行处理。

通道(Channels) NIO 新引入的最重要的抽象是通道的概念。Channel 数据连接的通道。 数据可以从Channel读到Buffer中,也可以从Buffer 写到Channel中

缓冲区(Buffers) 通道channel可以向缓冲区Buffer中写数据,也可以像buffer中存数据。

选择器(Selector) 使用选择器,借助单一线程,就可对数量庞大的活动 I/O 通道实时监控和维护。

4.2 特点

当一个连接创建后,不会需要对应一个线程,这个连接会被注册到多路复用器,所以一个连接只需要一个线程即可,所有的连接需要一个线程就可以操作,该线程的多路复用器会轮询,发现连接有请求时,才开启一个线程处理。

IO 和 NIO 区别 IO模型中,一个连接来了,会创建一个线程,对应一个while死循环,死循环的目的就是不断监测这条连接上是否有数据可以读,大多数情况下,1w个连接里面同一时刻只有少量的连接有数据可读,因此,很多个while死循环都白白浪费掉了,因为读不出数据。

而在NIO模型中,他把这么多while死循环变成一个死循环,这个死循环由一个线程控制,那么他又是如何做到一个线程,一个while死循环就能监测1w个连接是否有数据可读的呢? 这就是NIO模型中selector的作用,一条连接来了之后,现在不创建一个while死循环去监听是否有数据可读了,而是直接把这条连接注册到selector上,然后,通过检查这个selector,就可以批量监测出有数据可读的连接,进而读取数据

举个非常简单的生活中的例子说明IO与NIO的区别

在一家幼儿园里,小朋友有上厕所的需求,小朋友都太小以至于你要问他要不要上厕所,他才会告诉你。幼儿园一共有100个小朋友,有两种方案可以解决小朋友上厕所的问题:

第一种方案: 每个小朋友配一个老师。每个老师隔段时间询问小朋友是否要上厕所,如果要上,就领他去厕所,100个小朋友就需要 100个老师来询问,并且每个小朋友上厕所的时候都需要一个老师领着他去上,这就是IO模型,一个连接对应一个线程。

第二种方案 所有的小朋友都配同一个老师。这个老师隔段时间询问所有的小朋友是否有人要上厕所,然后每一时刻把所有要上厕所 的小朋友批量领到厕所,这就是NIO模型,所有小朋友都注册到同一个老师,对应的就是所有的连接都注册到一个线程,然 后批量轮询。

4.3 NIO使用

介绍完 JDK NIO 的解决方案之后,接下来使用NIO的方案替换掉IO的方案

服务端 package com.study; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.*; import java.nio.charset.StandardCharsets; import java.util.Iterator; import java.util.Scanner; public class NIOServer extends Thread{ //1.声明多路复用器 private Selector selector; //2.定义读写缓冲区 private ByteBuffer readBuffer = ByteBuffer.allocate(1024); private ByteBuffer writeBuffer = ByteBuffer.allocate(1024); //3.定义构造方法初始化端口 public NIOServer(int port) { init(port); } //4.main方法启动线程 public static void main(String[] args) { new NIOServer(8889).start(); } //5.初始化 private void init(int port) { try { System.out.println("服务器正在启动......"); //1)开启多路复用器 this.selector = Selector.open(); //2) 开启服务通道 ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); //3)设置为非阻塞 serverSocketChannel.configureBlocking(false); //4)绑定端口 serverSocketChannel.bind(new InetSocketAddress(port)); /* * SelectionKey.OP_ACCEPT —— 接收连接继续事件,表示服务器监听到了客户连接,服务器可以接收这个连接了 * SelectionKey.OP_CONNECT —— 连接就绪事件,表示客户与服务器的连接已经建立成功 * SelectionKey.OP_READ —— 读就绪事件,表示通道中已经有了可读的数据,可以执行读操作了(通道目前有数据,可以进行读操作了) * SelectionKey.OP_WRITE —— 写就绪事件,表示已经可以向通道写数据了(通道目前可以用于写操作) */ //5)注册,标记服务连接状态为ACCEPT状态 serverSocketChannel.register(this.selector, SelectionKey.OP_ACCEPT); System.out.println("服务器启动完毕"); } catch (IOException e) { e.printStackTrace(); } } public void run(){ while (true){ try { //1.当有至少一个通道被选中,执行此方法 this.selector.select(); //2.获取选中的通道编号集合 Iterator<SelectionKey> keys = this.selector.selectedKeys().iterator(); //3.遍历keys while (keys.hasNext()) { SelectionKey key = keys.next(); //4.当前key需要从动刀集合中移出,如果不移出,下次循环会执行对应的逻辑,造成业务错乱 keys.remove(); //5.判断通道是否有效 if (key.isValid()) { try { //6.判断是否可以连接 if (key.isAcceptable()) { accept(key); } } catch (CancelledKeyException e) { //出现异常断开连接 key.cancel(); } try { //7.判断是否可读 if (key.isReadable()) { read(key); } } catch (CancelledKeyException e) { //出现异常断开连接 key.cancel(); } try { //8.判断是否可写 if (key.isWritable()) { write(key); } } catch (CancelledKeyException e) { //出现异常断开连接 key.cancel(); } } } } catch (IOException e) { e.printStackTrace(); } } } private void accept(SelectionKey key) { try { //1.当前通道在init方法中注册到了selector中的ServerSocketChannel ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel(); //2.阻塞方法, 客户端发起后请求返回. SocketChannel channel = serverSocketChannel.accept(); ///3.serverSocketChannel设置为非阻塞 channel.configureBlocking(false); //4.设置对应客户端的通道标记,设置次通道为可读时使用 channel.register(this.selector, SelectionKey.OP_READ); } catch (IOException e) { e.printStackTrace(); } } //使用通道读取数据 private void read(SelectionKey key) { try{ //清空缓存 this.readBuffer.clear(); //获取当前通道对象 SocketChannel channel = (SocketChannel) key.channel(); //将通道的数据(客户发送的data)读到缓存中. int readLen = channel.read(readBuffer); //如果通道中没有数据 if(readLen == -1 ){ //关闭通道 key.channel().close(); //关闭连接 key.cancel(); return; } //Buffer中有游标,游标不会重置,需要我们调用flip重置. 否则读取不一致 this.readBuffer.flip(); //创建有效字节长度数组 byte[] bytes = new byte[readBuffer.remaining()]; //读取buffer中数据保存在字节数组 readBuffer.get(bytes); System.out.println("收到了从客户端 "+ channel.getRemoteAddress() + " : "+ new String(bytes, StandardCharsets.UTF_8)); //注册通道,标记为写操作 channel.register(this.selector,SelectionKey.OP_WRITE); }catch (Exception e){ e.printStackTrace(); } } //给通道中写操作 private void write(SelectionKey key) { //清空缓存 this.readBuffer.clear(); //获取当前通道对象 SocketChannel channel = (SocketChannel) key.channel(); //录入数据 Scanner scanner = new Scanner(System.in); try { System.out.println("即将发送数据到客户端.."); String line = scanner.nextLine(); //把录入的数据写到Buffer中 writeBuffer.put(line.getBytes(StandardCharsets.UTF_8)); //重置缓存游标 writeBuffer.flip(); channel.write(writeBuffer); channel.register(this.selector,SelectionKey.OP_READ); } catch (Exception e) { e.printStackTrace(); } } } 客户端 package com.study; import java.io.IOException; import java.net.InetSocketAddress; import java.nio.ByteBuffer; import java.nio.channels.SocketChannel; import java.nio.charset.StandardCharsets; import java.util.Scanner; public class NIOClient { public static void main(String[] args) { //创建远程地址 InetSocketAddress address = new InetSocketAddress("127.0.0.1",8889); SocketChannel channel = null; //定义缓存 ByteBuffer buffer = ByteBuffer.allocate(1024); try { //开启通道 channel = SocketChannel.open(); //连接远程远程服务器 channel.connect(address); Scanner sc = new Scanner(System.in); while (true){ System.out.println("客户端即将给 服务器发送数据.."); String line = sc.nextLine(); if(line.equals("exit")){ break; } //控制台输入数据写到缓存 buffer.put(line.getBytes(StandardCharsets.UTF_8)); //重置buffer 游标 buffer.flip(); //数据发送到数据 channel.write(buffer); //清空缓存数据 buffer.clear(); //读取服务器返回的数据 int readLen = channel.read(buffer); if(readLen == -1){ break; } //重置buffer游标 buffer.flip(); byte[] bytes = new byte[buffer.remaining()]; //读取数据到字节数组 buffer.get(bytes); System.out.println("收到了服务器发送的数据 : "+ new String(bytes, StandardCharsets.UTF_8)); buffer.clear(); } } catch (IOException e) { e.printStackTrace(); } finally { if (null != channel){ try { channel.close(); } catch (IOException e) { e.printStackTrace(); } } } } } 测试

5. AIO

异步非阻塞IO。A代表asynchronize 当有流可以读时,操作系统会将可以读的流传入read方法的缓冲区,并通知应用程序,对于写操作,OS将write方法的流写入完毕是操作系统会主动通知应用程序。因此read和write都是异步 的,完成后会调用回调函数。 使用场景:连接数目多且连接比较长(重操作)的架构,比如相册服务器。重点调用了OS参与并发操作,编程比较复杂。Java7开始支持

最新回复(0)