NIO
NIO 与 BIO的区别
BIO
Java IO 核心就是流。流只能单向,要么输入,要么输出。只能选其一。
Java IO 就是典型的 BIO 模型,即面向流编程,一个流要么是输入,要么是输出。
BIO 是阻塞的,即在准备读取数据到数据返回期间需要等待内核将数据准备完毕,再通过 IO 阻塞传输到用户空间。
NIO
Java NIO 与 BIO 不同,NIO 有三个核心组件 Channel、Buffer、Selector,在 NIO 中我们是面向块(block) 或是缓冲区(buffer) 编程的。与 Stream 不同的是,Channel 是双向的,流只能单向所以区分 In 和 Out,所以 Channel 打开后可以进行读取、写入或是读写。
NIO 是非阻塞的,在数据准备阶段不需要等待,需要启动一个线程一直监听内核是否将数据准备完毕,准备完毕后,监听线程通知 IO 线程阻塞读取数据(这一块还是阻塞的)。
由于 Channel 是双向的,因此它能更好地反映出底层操作系统的真实情况;在 Linux 系统中,底层操作系统的通道就是双向的。
NIO 核心组件介绍
NIO 包含3个核心的组件:
- Channel(通道)
- Buffer(缓冲区)
- Selector(选择器)
缓冲区(Buffer)
在谈到缓冲区,我们说缓冲区对象本质上是一个数组,但它其实是一个特殊的数组,缓冲区对象内置了一些机制,能够追踪和记录缓冲区的状态变化情况,如果我们使用 get()
方法从缓冲区获取数据或者使用 put()
方法把数据写入缓冲区,都会引起缓冲区状态的变化。
缓冲区三个重要属性:
- position:指定下一个将要被写入或者读取的元素索引,它的值由 get()/put() 方法自动更新,在新创建一个 Buffer 对象时,position 被初始化为 0。
- limit:指定还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。
- capacity:指定了可以存储在缓冲区的最大数据容量,实际上,它指定了底层数据的大小,或者至少时指定了准许我们使用的底层数组的容量。
注:0<= position <= limit <= capacity
缓冲区的容量(capacity)是不变的,而位置(position)和上限(limit)以根据实际需要改变。也就是说可以通过改变当前位置和上限来操作缓冲区内任意位置的数据。
通过源码控制初始化时的上限(limit)和容量(capacity)是相同的,而位置(position)则是被初始化为了 0。
public static ByteBuffer allocate(int capacity) { if (capacity < 0) throw new IllegalArgumentException(); return new HeapByteBuffer(capacity, capacity); }
HeapByteBuffer(int cap, int lim) { super(-1, 0, lim, cap, new byte[cap], 0);
|
在 NIO 中,所有的缓冲区类型都继承与抽象类 Buffer,最常用的就是 ByteBuffer,对于 Java 中的基本类型,基本都有一个具体 Buffer 类型与之相对应。
缓存区的分配:可以通过调用静态方法 allocate()
来指定缓冲区的容量,其实调用 allocate 方法相当于创建了一个指定大小的数组,并把它包装为缓冲区对象。我们也可以自己创建一个数组通过调用静态方法 wrap()
来将其包装为缓冲区对象。
缓冲区分片:根据现有的缓冲区对象创建一个子缓冲区,即在现有缓冲区上切出一片作为一个新的缓冲区,但现有的缓冲区与创建的子缓冲区在底层数面上是数据共享的(子缓冲区相当于现有缓冲区的一个视图窗口)。可以通过调用缓冲区对象的 slice()
创建。
只读缓冲区:通过调用缓冲区对象的 asReadOnlyBuffer()
方法,将任何常规缓冲区转换为只读缓冲区,这个方法返回一个与原缓冲区完全相同的缓冲区,并与原缓冲区共享数据,只不过它是只读的。如果原缓冲区的内容发生了变化,只读缓冲区的内容也随之发生变化。注意:尝试修改只读缓冲区的内容,则会报 ReadOnlyBufferException 异常;只可以 常规–> 只读 不可以 只读 –> 可写
直接缓冲区:直接缓冲区是为了加快 I/O 速度,使用一种特殊方式为其分配内存的缓冲区。该缓冲区会在每一次调用底层操作系统的本机 I/O 操作之前(或之后),尝试避免将缓冲区内容拷贝到一个中间缓冲区拷贝数据。通过调用静态方法 allocateDirect()
方法
内存映射:比常规的基于流或者基于通道的 I/O 快得多。 内存映射文件 I/O 通过使文件的数据表现为内存数组的内容来完成。一般来说,只有文件中实际读取或写入的部分才会映射到内存中。
Buffer 数据类型
从类图中可以看到,7中数据类型对应着 7 中子类,这些名字是 Heap 开头子类,数据是存放在 JVM 堆中的。
MappedByteBuffer
与 HeapByteBuffer 数据存放在 JVM 堆中的不同,MappedByteBuffer 是将数据存放在堆以外的直接内存的,可以映射到文件。
通过 java.nio 包和 MappedByteBuffer 允许 Java 程序直接从内存中读取文件内容,通过整个或部分文件映射到内存,由操作系统来处理加载请求和写文件,应用只需要和内存打交道,这使得 IO 很快。
Mmap 内存映射和普通标准 IO 操作的本质区别在于它并不需要将文件中的数据先拷贝至 OS 的内核 IO 缓冲区,而是可以直接将用户进程私有地址空间一块区域与文件对象建立映射关系,这样程序就好像可以直接从内存中完成对文件 读/写 操作一样。
采用Mmap的方式其读/写的效率和性能都非常高,大家熟知的 RocketMQ 就使用了该技术。
选择器(Selector)
NIO 中非阻塞 I/O 采用了基于 Reactor 模式的工作方式, I/O 调用不会被阻塞,而是注册感兴趣的特定 I/O 事件,如可读数据到达、新的套接字连接等,在发生特定事件时,系统再通知我们。NIO 中实现非阻塞 I/O 的核心对象是 Selector,Selector 是注册各种 I/O 事件的地方,而且当那些事情发生时,就是 Selector 告诉我们所发生的事件。
Selector 会不断地轮询注册在上面所有 Channel,如果某个 channel 为读写等事件做好准备,那么就处于就绪状态,通过 Selector 可以不断轮询发现出就绪的 channel,进行后续的 IO 操作。
一个Selector能够同时轮询多个channel,这样,一个单独的线程就可以管理多个channel,从而管理多个网络连接,这样就不用为每一个连接都创建一个线程,同时也避免了多线程之间上下文切换导致的开销。(较与 BIO 的优点)。
通道(Channel)
通道是一个对象,通过它可以读取和写入数据,当然所有数据都通过 Buffer 对象来处理。我们永远不会将字节直接写入通道,而是将数据写入包含一个或者多个字节的缓冲区。同样也不会直接从通道中读取字节,而是通过数据从通道读入缓冲区,再从缓冲区获取这个字节。
反应堆
阻塞 I/O 的通信模型如下图所示。
每个客户端连接成功后,服务端都会启动一个线程区处理该客户端请求。
阻塞 I/O 通信模型缺点
- 当客户端多时,会创建大量的处理线程。且每个线程都要占用栈空间和一些 CPU 时间。
- 阻塞可能带来频繁的上下文切换,且大部分上下文切换可能是无意义的。
在这种情况下非阻塞 I/O 就有了它的应用前景。
Java NIO 工作原理。
- 有一个专门的线程来处理所有 I/O 事件,并负责分发。
- 事件驱动机制:事件到的时候出发,而不是同步地去监视事件。
- 线程通信:线程之间通过 wait、notify 等方式通信。保证每次上下文切换都是有意义的,减少无谓的线程切换。
Java NIO 反应堆工作原理图。
(注:每个线程的处理流程大概都是读取数据、解码、计算处理、编码和发送响应。)
NIO 理解与使用
Buffer 的常用方法
NIO提供一系列方法来操作Buffer的位置(position)和上限(limit),以及向缓冲区读写数据。
put() get() flip() rewind() remaining hasRemaining() mark() reset() duplicate()
|
创建缓冲区
ByteBuffer byteBuffer1 = ByteBuffer.allocate(10);
ByteBuffer byteBuffer2 = ByteBuffer.wrap("abcdef".getBytes());
|
–
获取/设置缓冲区参数
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
System.out.println("位置:"+byteBuffer.position()); System.out.println("上限:"+byteBuffer.limit()); System.out.println("容量:"+byteBuffer.capacity());
|
添加数据到缓冲区
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.put("abcde".getBytes()); System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
|
rewind 重置缓冲区
rewind 函数将 position 置为 0 位置,并清除标记。
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.put("abcde".getBytes());
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
System.out.println("---------------------------------------");
byteBuffer.rewind();
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
|
flip()
重置缓冲区
flip 函数将 limit 设置为 position 位置,再将 position 置为 0 位置,并清除 mar 标记。
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.put("abcde".getBytes());
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
System.out.println("---------------------------------------");
byteBuffer.rewind();
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
|
clear()
清空缓冲区
clear()
方法也将 position 置为0,同时将 limit 置为 capacity 的大小,并清除 mark 标记。
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.limit(5);
byteBuffer.put("abcde".getBytes());
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
System.out.println("---------------------------------------");
byteBuffer.clear();
System.out.println("position位置:"+byteBuffer.position()); System.out.println("limit上限:"+byteBuffer.limit()); System.out.println("capacity容量:"+byteBuffer.capacity());
|
标记和恢复
ByteBuffer byteBuffer = ByteBuffer.allocate(10);
byteBuffer.put("abcde".getBytes());
byteBuffer.mark(); System.out.println("标记位置:"+byteBuffer.position());
byteBuffer.put("fijkl".getBytes()); System.out.println("标记位置:"+byteBuffer.position());
byteBuffer.reset(); System.out.println("恢复标记位置:"+byteBuffer.position());
|
FileChannel 通道
本地文件 IO 通道,用于读取、写入、映射和操作文件的通道。
FileChannel fisChannel = new FileInputStream("day05/src/a.txt").getChannel();
FileChannel fosChannel = new FileOutputStream("day05/src/b.txt").getChannel();
ByteBuffer buffer = ByteBuffer.allocate(2); while (fisChannel.read(buffer)!=-1){ System.out.println("position:"+buffer.position()); System.out.println("limit:"+buffer.limit()); buffer.flip(); fosChannel.write(buffer); buffer.clear(); }
fisChannel.close(); fosChannel.close();
|
SocketChannel 通道
使用 SocketChannel 通道上传文件到服务器。
public class SocketChannelDemo {
public static void main(String[] args) throws IOException { SocketChannel socketChannel = SocketChannel.open(new InetSocketAddress("localhost", 8080));
ByteBuffer byteBuffer = ByteBuffer.allocate(1024); FileChannel fisChannel = new FileInputStream("day05/src/a.txt").getChannel();
while (fisChannel.read(byteBuffer)!=-1){ byteBuffer.flip(); socketChannel.write(byteBuffer); byteBuffer.clear(); }
fisChannel.close();
byteBuffer.clear(); int read = socketChannel.read(byteBuffer); System.out.println(new String(byteBuffer.array(),0,read));
socketChannel.close(); }
}
|
ServerSocketChannel 通道
使用 ServerSocketChannel 通道接收文件并保存在服务器。
public class ServerSocketChannelDemo { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
serverSocketChannel.bind(new InetSocketAddress(8080));
serverSocketChannel.configureBlocking(false); System.out.println("服务器已开启");
while (true){ SocketChannel socketChannel = serverSocketChannel.accept(); UUID uuid = UUID.randomUUID(); FileChannel fosChannel=new FileOutputStream("day05/src/"+uuid+".txt").getChannel(); ByteBuffer buffer = ByteBuffer.allocate(1024);
while (socketChannel.read(buffer)!=-1){ buffer.flip(); fosChannel.write(buffer); buffer.clear(); } fosChannel.close();
ByteBuffer resultBuffer = ByteBuffer.wrap("上传文件成功".getBytes()); socketChannel.write(resultBuffer);
socketChannel.close(); } } }
|
NIO Selector 的服务器
public class Server { public static void main(String[] args) throws IOException { ServerSocketChannel serverSocketChannel = ServerSocketChannel.open(); serverSocketChannel.socket().bind(new InetSocketAddress("localhost", 8080)); serverSocketChannel.configureBlocking(false);
Selector selector = Selector.open(); serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
while (true) { int readyNum = selector.select(); if (readyNum == 0) { continue; } Set<SelectionKey> selectionKeys = selector.selectedKeys(); Iterator<SelectionKey> it = selectionKeys.iterator(); while (it.hasNext()) { SelectionKey key = it.next(); if (key.isAcceptable()) { } else if (key.isReadable()){ } else if (key.isWritable()) { } it.remove(); } } } }
|
上面的代码可以当作是一个模板。