高级Java工程师必备 —– 深入分析 Java IO (二)NIO – Python量化投资

高级Java工程师必备 —– 深入分析 Java IO (二)NIO

接着上一篇文章 高级Java工程师必备 —– 深入分析 Java IO (一)BIO,我们来讲讲NIO

多路复用IO模型

场景描述

一个餐厅同时有100位客人到店,当然到店后第一件要做的事情就是点菜。但是问题来了,餐厅老板为了节约人力成本目前只有一位大堂服务员拿着唯一的一本菜单等待客人进行服务。

方法A: 无论有多少客人等待点餐,服务员都把仅有的一份菜单递给其中一位客人,然后站在客人身旁等待这个客人完成点菜过程。在记录客人点菜内容后,把点菜记录交给后堂厨师。然后是第二位客人。。。。然后是第三位客人。很明显,只有脑袋被门夹过的老板,才会这样设置服务流程。因为随后的80位客人,再等待超时后就会离店(还会给差评)。

方法B: 老板马上新雇佣99名服务员,同时印制99本新的菜单。每一名服务员手持一本菜单负责一位客人(关键不只在于服务员,还在于菜单。因为没有菜单客人也无法点菜)。在客人点完菜后,记录点菜内容交给后堂厨师(当然为了更高效,后堂厨师最好也有100名)。这样每一位客人享受的就是VIP服务咯,当然客人不会走,但是人力成本可是一个大头哦(亏死你)。

方法C: 就是改进点菜的方式,当客人到店后,自己申请一本菜单。想好自己要点的才后,就呼叫服务员。服务员站在自己身边后记录客人的菜单内容。将菜单递给厨师的过程也要进行改进,并不是每一份菜单记录好以后,都要交给后堂厨师。服务员可以记录号多份菜单后,同时交给厨师就行了。那么这种方式,对于老板来说人力成本是最低的;对于客人来说,虽然不再享受VIP服务并且要进行一定的等待,但是这些都是可接受的;对于服务员来说,基本上她的时间都没有浪费,基本上被老板压杆了最后一滴油水。

到店情况:并发量。到店情况不理想时,一个服务员一本菜单,当然是足够了。所以不同的老板在不同的场合下,将会灵活选择服务员和菜单的配置。
客人:客户端请求
点餐内容:客户端发送的实际数据
服务员:操作系统内核用于IO操作的线程(内核线程)
厨师:应用程序线程(当然厨房就是应用程序进程咯)
餐单传递方式:包括了阻塞式和非阻塞式两种。

  • 方法A:阻塞式/非阻塞式 同步IO
  • 方法B:使用线程进行处理的 阻塞式/非阻塞式 同步IO
  • 方法C:阻塞式/非阻塞式 多路复用IO



















多路复用IO技术最适用的是“高并发”场景,所谓高并发是指1毫秒内至少同时有上千个连接请求准备好。其他情况下多路复用IO技术发挥不出来它的优势。另一方面,使用JAVA NIO进行功能实现,相对于传统的Socket套接字实现要复杂一些,所以实际应用中,需要根据自己的业务需求进行技术选择。

NIO

概念

JDK 1.4中的java.nio.*包中引入新的Java I/O库,其目的是提高速度。实际上,“旧”的I/O包已经使用NIO重新实现过,即使我们不显式的使用NIO编程,也能从中受益。速度的提高在文件I/O和网络I/O中都可能会发生,但本文只讨论后者。

NIO我们一般认为是New I/O(也是官方的叫法),因为它是相对于老的I/O类库新增的(其实在JDK 1.4中就已经被引入了,但这个名词还会继续用很久,即使它们在现在看来已经是“旧”的了,所以也提示我们在命名时,需要好好考虑),做了很大的改变。但民间跟多人称之为Non-block I/O,即非阻塞I/O,因为这样叫,更能体现它的特点。而下文中的NIO,不是指整个新的I/O库,而是非阻塞I/O。

面向流与面向缓冲

Java IO和NIO之间第一个最大的区别是,IO是面向流的,NIO是面向缓冲区的。 Java IO面向流意味着每次从流中读一个或多个字节,直至读取所有字节,它们没有被缓存在任何地方。此外,它不能前后移动流中的数据。如果需要前后移动从流中读取的数据,需要先将它缓存到一个缓冲区。

面向块的 NIO一次处理一个数据块,按块处理数据比按流处理数据要快得多。数据读取到一个它稍后处理的缓冲区,需要时可在缓冲区中前后移动。这就增加了处理过程中的灵活性。但是,还需要检查是否该缓冲区中包含所有您需要处理的数据。而且,需确保当更多的数据读入缓冲区时,不要覆盖缓冲区里尚未处理的数据。

阻塞与非阻塞IO

Java IO的各种流是阻塞的。这意味着,当一个线程调用read() 或 write()时,该线程被阻塞,直到有一些数据被读取,或数据完全写入。该线程在此期间不能再干任何事情了。Java NIO的非阻塞模式,使一个线程从某通道发送请求读取数据,但是它仅能得到目前可用的数据,如果目前没有数据可用时,就什么都不会获取,而不是保持线程阻塞,所以直至数据变的可以读取之前,该线程可以继续做其他的事情。 非阻塞写也是如此。一个线程请求写入一些数据到某通道,但不需要等待它完全写入,这个线程同时可以去做别的事情。 线程通常将非阻塞IO的空闲时间用于在其它通道上执行IO操作,所以一个单独的线程现在可以管理多个输入和输出通道(channel)

通道

通道 Channel 是对原 I/O 包中的流的模拟,可以通过它读取和写入数据。

通道与流的不同之处在于,流只能在一个方向上移动(一个流必须是 InputStream 或者 OutputStream 的子类),而通道是双向的,可以用于读、写或者同时用于读写。

通道包括以下类型:

  • FileChannel:从文件中读写数据;
  • DatagramChannel:通过 UDP 读写网络中数据;
  • SocketChannel:通过 TCP 读写网络中数据;
  • ServerSocketChannel:可以监听新进来的 TCP 连接,对每一个新进来的连接都会创建一个 SocketChannel。


















缓冲区

发送给一个通道的所有数据都必须首先放到缓冲区中,同样地,从通道中读取的任何数据都要先读到缓冲区中。也就是说,不会直接对通道进行读写数据,而是要先经过缓冲区。

缓冲区实质上是一个数组,但它不仅仅是一个数组。缓冲区提供了对数据的结构化访问,而且还可以跟踪系统的读/写进程。

Buffer有两种工作模式:写模式和读模式。在读模式下,应用程序只能从Buffer中读取数据,不能进行写操作。但是在写模式下,应用程序是可以进行读操作的,这就表示可能会出现脏读的情况。所以一旦您决定要从Buffer中读取数据,一定要将Buffer的状态改为读模式。

注意:ServerSocketChannel通道它只支持对OP_ACCEPT事件的监听,所以它是不能直接进行网络数据内容的读写的。所以ServerSocketChannel是没有集成Buffer的。

缓冲区包括以下类型:

  • ByteBuffer
  • CharBuffer
  • ShortBuffer
  • IntBuffer
  • LongBuffer
  • FloatBuffer
  • DoubleBuffer


















 

 

可以用三个值指定缓冲区在任意时刻的状态:

  • position
  • limit
  • capacity





Position

您可以回想一下,缓冲区实际上就是美化了的数组。在从通道读取时,您将所读取的数据放到底层的数组中。 position 变量跟踪已经写了多少数据。更准确地说,它指定了下一个字节将放到数组的哪一个元素中。因此,如果您从通道中读三个字节到缓冲区中,那么缓冲区的 position 将会设置为3,指向数组中第四个元素。

同样,在写入通道时,您是从缓冲区中获取数据。 position 值跟踪从缓冲区中获取了多少数据。更准确地说,它指定下一个字节来自数组的哪一个元素。因此如果从缓冲区写了5个字节到通道中,那么缓冲区的 position 将被设置为5,指向数组的第六个元素。

Limit

limit 变量表明还有多少数据需要取出(在从缓冲区写入通道时),或者还有多少空间可以放入数据(在从通道读入缓冲区时)。

position 总是小于或者等于 limit

Capacity

缓冲区的 capacity 表明可以储存在缓冲区中的最大数据容量。实际上,它指定了底层数组的大小 ― 或者至少是指定了准许我们使用的底层数组的容量。

limit 决不能大于 capacity

 

在实际操作数据时它们有如下关系图:

① 新建一个大小为 8 个字节的缓冲区,此时 position 为 0,而 limit = capacity = 8。capacity 变量不会改变,下面的讨论会忽略它。

② 从输入通道中读取 5 个字节数据写入缓冲区中,此时 position 为 5,limit 保持不变。

③ 在将缓冲区的数据写到输出通道之前,需要先调用 flip() 方法,这个方法将 limit 设置为当前 position,并将 position 设置为 0。

④ 从缓冲区中取 4 个字节到输出缓冲中,此时 position 设为 4。

⑤ 最后需要调用 clear() 方法来清空缓冲区,此时 position 和 limit 都被设置为最初位置。

 

文件复制 NIO 实例

以下展示了使用 NIO 快速复制文件的实例:


public static void fastCopy(String src, String dist) throws IOException {

    /* 获得源文件的输入字节流 */
    FileInputStream fin = new FileInputStream(src);

    /* 获取输入字节流的文件通道 */
    FileChannel fcin = fin.getChannel();

    /* 获取目标文件的输出字节流 */
    FileOutputStream fout = new FileOutputStream(dist);

    /* 获取输出字节流的文件通道 */
    FileChannel fcout = fout.getChannel();

    /* 为缓冲区分配 1024 个字节 */
    ByteBuffer buffer = ByteBuffer.allocateDirect(1024);

    while (true) {

        /* 从输入通道中读取数据到缓冲区中 */
        int r = fcin.read(buffer);

        /* read() 返回 -1 表示 EOF */
        if (r == -1) {
            break;
        }

        /* 切换读写 */
        buffer.flip();

        /* 把缓冲区的内容写入输出文件中 */
        fcout.write(buffer);

        /* 清空缓冲区 */
        buffer.clear();
    }
}


选择器

NIO 常常被叫做非阻塞 IO,主要是因为 NIO 在网络通信中的非阻塞特性被广泛使用。

NIO 实现了 IO 多路复用中的 Reactor 模型,一个线程 Thread 使用一个选择器 Selector 通过轮询的方式去监听多个通道 Channel 上的事件,从而让一个线程就可以处理多个事件。

通过配置监听的通道 Channel 为非阻塞,那么当 Channel 上的 IO 事件还未到达时,就不会进入阻塞状态一直等待,而是继续轮询其它 Channel,找到 IO 事件已经到达的 Channel 执行。

例如,当多个客户端通过通道向服务端传输数据时,是通过 ByteBuffer 来传输,一个文件通过多次,从输入通道中读取 N 个字节数据写入ByteBuffer,然后再将将缓冲区的数据写到输出通道,这个过程可以看成是不连续的,因为只有当缓冲区写满后,通过 buffer.flip() 切换成读模式后,才开始向输出通道写入,所以当ByteBuffer还在写入状态时,服务器是不会等待这个通道的ByteBuffer写满,而是去处理其他客户端Channel 为可读的状态,当然这个处理业务的工作可以开启多线程来处理。

因为创建和切换线程的开销很大,因此使用一个线程来处理多个事件而不是一个线程处理一个事件,对于 IO 密集型的应用具有很好地性能。

应该注意的是,只有套接字 Channel 才能配置为非阻塞,而 FileChannel 不能,为 FileChannel 配置非阻塞也没有意义。

套接字 NIO 实例


package com.chenhao.nio;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Date;
import java.util.Iterator;
import java.util.Scanner;

import org.junit.Test;

/*
 * 一、使用 NIO 完成网络通信的三个核心:
 * 
 * 1. 通道(Channel):负责连接
 *         
 *        java.nio.channels.Channel 接口:
 *             |--SelectableChannel
 *                 |--SocketChannel
 *                 |--ServerSocketChannel
 *                 |--DatagramChannel
 * 
 * 2. 缓冲区(Buffer):负责数据的存取
 * 
 * 3. 选择器(Selector):是 SelectableChannel 的多路复用器。用于监控 SelectableChannel 的 IO 状况
 * 
 */
public class TestNonBlockingNIO {
    
    //客户端
    @Test
    public void client() throws IOException{
        //1. 获取通道
        SocketChannel sChannel = SocketChannel.open(new InetSocketAddress("127.0.0.1", 9898));
        
        //2. 切换非阻塞模式
        sChannel.configureBlocking(false);
        
        //3. 分配指定大小的缓冲区
        ByteBuffer buf = ByteBuffer.allocate(1024);
        
        //4. 发送数据给服务端
        Scanner scan = new Scanner(System.in);
        
        while(scan.hasNext()){
            String str = scan.next();
            buf.put((new Date().toString() + "\n" + str).getBytes());
            buf.flip();
            sChannel.write(buf);
            buf.clear();
        }
        
        //5. 关闭通道
        sChannel.close();
    }

    //服务端
    @Test
    public void server() throws IOException{
        //1. 获取通道
        ServerSocketChannel ssChannel = ServerSocketChannel.open();
        
        //2. 切换非阻塞模式
        ssChannel.configureBlocking(false);
        
        //3. 绑定连接
        ssChannel.bind(new InetSocketAddress(9898));
        
        //4. 获取选择器
        Selector selector = Selector.open();
        
        //5. 将通道注册到选择器上, 并且指定“监听接收事件”
        ssChannel.register(selector, SelectionKey.OP_ACCEPT);
        
        //6. 轮询式的获取选择器上已经“准备就绪”的事件
        //使用 select() 来监听到达的事件,它会一直阻塞直到有至少一个事件到达。
        while(selector.select() > 0){
            
            //7. 获取当前选择器中所有注册的“选择键(已就绪的监听事件)”
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            
            while(it.hasNext()){
                //8. 获取准备“就绪”的是事件
                SelectionKey sk = it.next();
                
                //9. 判断具体是什么事件准备就绪
                if(sk.isAcceptable()){
                    //10. 若“接收就绪”,获取客户端连接
                    SocketChannel sChannel = ssChannel.accept();
                    
                    //11. 切换非阻塞模式
                    sChannel.configureBlocking(false);
                    
                    //12. 将该通道注册到选择器上
                    sChannel.register(selector, SelectionKey.OP_READ);
                }else if(sk.isReadable()){
                    //13. 获取当前选择器上“读就绪”状态的通道
                    SocketChannel sChannel = (SocketChannel) sk.channel();
                    
                    //14. 读取数据
                    ByteBuffer buf = ByteBuffer.allocate(1024);
                    
                    int len = 0;
                    while((len = sChannel.read(buf)) > 0 ){
                        buf.flip();
                        System.out.println(new String(buf.array(), 0, len));
                        buf.clear();
                    }
                }
                
                //15. 取消选择键 SelectionKey
                //每一个“事件关键字”被处理后都必须移除,否则下一次轮询时,这个事件会被重复处理
                it.remove();
            }
        }
    }
}


NIO传输文件

服务器端代码


public class Server {
    private ByteBuffer buffer = ByteBuffer.allocate(1024*1024);
        //使用Map保存每个连接,当OP_READ就绪时,根据key找到对应的文件对其进行写入。若将其封装成一个类,作为值保存,可以再上传过程中显示进度等等
    Map<SelectionKey, FileChannel> fileMap = new HashMap<SelectionKey, FileChannel>();
    public static void main(String[] args) throws IOException{
        Server server = new Server();
        server.startServer();
    }
    public void startServer() throws IOException{
        Selector selector = Selector.open();
        ServerSocketChannel serverChannel = ServerSocketChannel.open();
        serverChannel.configureBlocking(false);
        serverChannel.bind(new InetSocketAddress(8888));
        serverChannel.register(selector, SelectionKey.OP_ACCEPT);
        System.out.println("服务器已开启...");
        while (true) {
            int num = selector.select();
            if (num == 0) continue;
            Iterator<SelectionKey> it = selector.selectedKeys().iterator();
            while (it.hasNext()) {
                SelectionKey key = it.next();
                if (key.isAcceptable()) {
                    ServerSocketChannel serverChannel1 = (ServerSocketChannel) key.channel();
                    SocketChannel socketChannel = serverChannel1.accept();
                    if (socketChannel == null) continue;
                    socketChannel.configureBlocking(false);
                    SelectionKey key1 = socketChannel.register(selector, SelectionKey.OP_READ);
                    InetSocketAddress remoteAddress = (InetSocketAddress)socketChannel.getRemoteAddress();
                    File file = new File(remoteAddress.getHostName() + "_" + remoteAddress.getPort() + ".txt");
                    FileChannel fileChannel = new FileOutputStream(file).getChannel();
                    fileMap.put(key1, fileChannel);
                    System.out.println(socketChannel.getRemoteAddress() + "连接成功...");
                    writeToClient(socketChannel);
                }
                else if (key.isReadable()){
                    readData(key);
                }
                // NIO的特点只会累加,已选择的键的集合不会删除,ready集合会被清空
                // 只是临时删除已选择键集合,当该键代表的通道上再次有感兴趣的集合准备好之后,又会被select函数选中
                it.remove();
            }
        }
    }
    private void writeToClient(SocketChannel socketChannel) throws IOException {
        buffer.clear();
        buffer.put((socketChannel.getRemoteAddress() + "连接成功").getBytes());
        buffer.flip();
        socketChannel.write(buffer);
        buffer.clear();
    }
    private void readData(SelectionKey key) throws IOException  {
        FileChannel fileChannel = fileMap.get(key);
        buffer.clear();
        SocketChannel socketChannel = (SocketChannel) key.channel();
        int num = 0;
        try {
            while ((num = socketChannel.read(buffer)) > 0) {
                buffer.flip();
                // 写入文件
                fileChannel.write(buffer);
                buffer.clear();
                }
        } catch (IOException e) {
            key.cancel();
            e.printStackTrace();
            return;
        }
        // 调用close为-1 到达末尾
        if (num == -1) {
            fileChannel.close();
            System.out.println("上传完毕");
            buffer.put((socketChannel.getRemoteAddress() + "上传成功").getBytes());
            buffer.clear();
            socketChannel.write(buffer);
            key.cancel();
        }
    }
}


 

客户端模拟三个客户端同时向服务器发送文件


public class Client {
    public static void main(String[] args) {
        for (int i = 0; i < 3; i++) {
            // 模拟三个发端
            new Thread() {
                public void run() {
                    try {
                        SocketChannel socketChannel = SocketChannel.open();
                        socketChannel.socket().connect(new InetSocketAddress("127.0.0.1", 8888));
                        File file = new File("E:\\" + 11 + ".txt");
                        FileChannel fileChannel = new FileInputStream(file).getChannel();
                        ByteBuffer buffer = ByteBuffer.allocate(100);
                        socketChannel.read(buffer);
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
                        buffer.clear();
                        int num = 0;
                        while ((num=fileChannel.read(buffer)) > 0) {
                            buffer.flip();                        
                            socketChannel.write(buffer);
                            buffer.clear();
                        }
                        if (num == -1) {
                            fileChannel.close();
                            socketChannel.shutdownOutput();
                        }
                        // 接受服务器
                        socketChannel.read(buffer);
                        buffer.flip();
                        System.out.println(new String(buffer.array(), 0, buffer.limit(), Charset.forName("utf-8")));
                        buffer.clear();
                        socketChannel.close();
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    
                };
            }.start();
            
        }
        Thread.yield();
    }
}


可见这里我们仅仅使用了一个线程就管理了三个连接,相比以前使用阻塞的Socket要在accept函数返回后开启线程来管理这个连接,而使用NIO我们在accept返回后,仅仅将其注册到选择器上,读操作在下次检测到有可读的键的集合时就会去处理。

NIO+线程池改进


public class ThreadPoolServer extends Server{
    private ExecutorService exec = Executors.newFixedThreadPool(10);
    public static void main(String[] args) throws IOException {
        ThreadPoolServer server = new ThreadPoolServer();
        server.startServer();
    }

    @Override
    protected void readData(final SelectionKey key) throws IOException {
        // 移除掉这个key的可读事件,已经在线程池里面处理,如果不改变当前Key的状态,这里交给另外一个线程去处理,主线程下一次遍历此KEY还是可读事件,会重复开启线程处理任务
        key.interestOps(key.interestOps() & (~SelectionKey.OP_READ));
        exec.execute(new Runnable() {
            @Override
            public void run() {
                ByteBuffer buffer = ByteBuffer.allocate(1024 * 1024);
                FileChannel fileChannel = fileMap.get(key);
                buffer.clear();
                SocketChannel socketChannel = (SocketChannel) key.channel();
                int num = 0;
                try {
                    while ((num = socketChannel.read(buffer)) > 0) {
                        buffer.flip();
                        // 写入文件
                        fileChannel.write(buffer);
                        buffer.clear();
                    }
                } catch (IOException e) {
                    key.cancel();
                    e.printStackTrace();
                    return;
                }
                // 调用close为-1 到达末尾
                if (num == -1) {
                    try {
                        fileChannel.close();
                        System.out.println("上传完毕");
                        buffer.put((socketChannel.getRemoteAddress() + "上传成功").getBytes());
                        buffer.clear();
                        socketChannel.write(buffer);
                    } catch (IOException e) {
                        e.printStackTrace();
                    }
                    // 只有调用cancel才会真正从已选择的键的集合里面移除,否则下次select的时候又能得到
                    // 一端close掉了,其对端仍然是可读的,读取得到EOF,返回-1
                    key.cancel(); 
                    return;
                }
                // Channel的read方法可能返回0,返回0并不一定代表读取完了。
                // 工作线程结束对通道的读取,需要再次更新键的ready集合,将感兴趣的集合重新放在里面
                key.interestOps(key.interestOps() | SelectionKey.OP_READ);
                // 调用wakeup,使得选择器上的第一个还没有返回的选择操作立即返回即重新select
                key.selector().wakeup();
            }
        });
    }
}


推荐博客

  程序员写代码之外,如何再赚一份工资?

多路复用IO的优缺点

  • 不用再使用多线程来进行IO处理了(包括操作系统内核IO管理模块和应用程序进程而言)。当然实际业务的处理中,应用程序进程还是可以引入线程池技术的
  • 同一个端口可以处理多种协议,例如,使用ServerSocketChannel的服务器端口监听,既可以处理TCP协议又可以处理UDP协议。
  • 操作系统级别的优化:多路复用IO技术可以是操作系统级别在一个端口上能够同时接受多个客户端的IO事件。同时具有之前我们讲到的阻塞式同步IO和非阻塞式同步IO的所有特点。Selector的一部分作用更相当于“轮询代理器”。
「点点赞赏,手留余香」

    还没有人赞赏,快来当第一个赞赏的人吧!