Java IO

Java nio and nio2

Posted by Sun Jianjiao on September 1, 2015

1 Java IO 方式

Java 1.4 提供了新的抽象,也就是Channel和Selector类,提供了使用IO服务的通用API。这些新的类提供的强大的新的框架,可以利用了当今主流操作系统的提供的高效IO特性。

JavaIO 方式有很多种,基于不同的IO抽象模型和交互方式,可以简单区分。

1.1 传统的java.io包

基于流模型实现,提供了我们最熟知的一些IO功能,比如File抽象,输入输出流等。交互方式是同步、阻塞的方式。java.io包的好处是代码比较简单、直观,缺点是IO效率和扩展性存在局限性,容易成为应用性能的瓶颈。

  • IO不仅是对文件的操作,网络编程中,比如Socket通信都是典型的IO操作目标。
  • 输入流,输出流(InputStream/OutputStream)是用于读取或写入字节的,例如操作图片文件。
  • Reader/Writer则是用于操作字符的,增加了字符编解码等功能,适用于类似从文件中读取或写入文件信息。理论上计算机操作的都是字节,不管网络通信还是文件读取。Reader/Writer相当于构建了应用逻辑和原始数据之间的桥梁。
  • BufferedOutputStream等待缓冲区的实现,可以避免频繁的磁盘读写,进而提高IO处理效率。这种设计利用了缓冲区,将批量数据进行一次操作,但是使用中千万别忘了flush。
  • 很多IO工具类都实现了Closeable接口,用于进行资源释放,比如打开FileInputStream会获取相应的文件描述符(FileDescriptor), 需要利用try-with-resources或者try-finally等机制保证资源释放。

1.2 Java NIO

Java 1.4中隐入了NIO框架(java.nio包),提供了Channel, Selector,Buffer等新的抽象,可以构建多路复用、同步非阻塞的IO程序,同时提供了接近操作系统底层的高性能数据操作方式。

  • Buffer,高效的数据容器,除了布尔类型,所有的原始数据类型都有相应的Buffer实现。
  • Channel,类型在Linux操作系统上看到的文件描述符,是NIO中被用来支持批量IO操作的一种抽象。 File 或者Socket通常认为是比较高层次的抽象,Channel是更加接近操作系统层的一种抽象,这也使得NIO得以充分利用现在操作系统的底层机制
  • Selector, NIO多路复用的基础,它提供了一种高效的机制,可以检测注册在Selector上的多个channel中,是否有Channel处于就绪状态,进而实现了单线程对多线程Channel的高效管理。Selector依赖底层操作系统,Linux上依赖epoll。
  • ChartSet,提供Unicode字符串定义,NIO也提供了相应的编解码器。

1.3 Java NIO2(AIO)

  • Java 7中,NIO有了进一步的改进,也就是NIO2,隐入了异步非阻塞IO方式,也有人叫它AIO(Asynchronous IO)。异步IO操作基于事件和回调机制,可以简单理解为,应用操作直接返回,而不会阻塞在那里,当后台处理完成,操作系统会通知相应线程进行后续工作。

2 BIO

BIO即Blocking IO, 采用阻塞的方式实现。也就是一个Socket套接字需要使用一个线程来进行处理。发生建立连接,读数据,写数据的操作时,都可能会阻塞。这个模式的好处是简单,但是带来的主要问题是一个线程只能处理一个socket, 如果是Server端,支持并发的连接时,就需要更多的线程来完成这个工作。一般情况下,Server端使用线程池减轻线程创建和销毁的开销。

BIO

如果连接数急剧上升,线程上下文的切换开销会编的很明显,只是同步阻塞方式的地扩展性劣势。

Client端的代码

package BlockIO.BlockIOClient;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

class BlockIOClient {
    private static Integer PORT = 18000;
    private static String IP_ADDRESS = "127.0.0.1";

    public void clientRequest() {
        Socket socket = null;
        BufferedReader reader = null;
        PrintWriter writer = null;
        try {
            socket = new Socket(IP_ADDRESS, PORT); // 双方通过输入和输出流进行同步阻塞式通信
            reader = new BufferedReader(new InputStreamReader(socket.getInputStream())); // 获取返回内容
            writer = new PrintWriter(socket.getOutputStream(), true);

            for (int i = 0; i < 10; i++) {
                writer.println(i);
                System.out.println(" 客户端打印返回数据 : " + reader.readLine());
            }

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                if (null != reader) {
                    reader.close();
                }

                if (null != writer) {
                    writer.close();
                }

                if (null != socket) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

public class Main {
    public static void main(String[] args) {
        BlockIOClient blockIOClient = new BlockIOClient();
        blockIOClient.clientRequest();
    }
}

Server端的代码:

package BlockIO;

import java.io.*;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class BlockIO {
    private final Integer PORT = 18000;

    private void run(Socket socket) {
        InputStreamReader inputStreamReader = null;
        PrintWriter outputWriter = null;
        try {
            inputStreamReader = new InputStreamReader(socket.getInputStream());
            BufferedReader bufferedReader = new BufferedReader(inputStreamReader);
            outputWriter = new PrintWriter(socket.getOutputStream(), true);
            String line;
            while ((line = bufferedReader.readLine()) != null) {
                System.out.println("Receive: " + line);
                outputWriter.println(Byte.valueOf(line));
            }
        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            try {
                if (null != inputStreamReader) {
                    inputStreamReader.close();
                }

                if (null != outputWriter) {
                    outputWriter.close();
                }

                if (null != socket) {
                    socket.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }


    public void server() {
        ThreadPoolExecutor executor = new ThreadPoolExecutor(5, 10, 200, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(5));

        ServerSocket echoServer = null;
        Socket clientSocket;
        try {
            echoServer = new ServerSocket(PORT);
        } catch (IOException e) {
            System.out.println(e);
        }
        while (true) {
            try {
                clientSocket = echoServer.accept();
                System.out.println(clientSocket.getRemoteSocketAddress() + " connect!");

                Socket finalClientSocket = clientSocket;
                executor.execute(() -> {
                    run(finalClientSocket);
                });
            } catch (IOException e) {
                System.out.println(e);
            }
        }
    }
}

public class Main {

    public static void main(String[] args) {
        new BlockIO().server();
    }
}

3 NIO

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
package NIO;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;
import java.util.Iterator;
import java.util.Set;

public class NIOServer extends Thread {
    final static int PORT = 18888;

    public void run() {
        try (Selector selector = Selector.open();
             ServerSocketChannel serverSocket = ServerSocketChannel.open()) {// 创建 Selector 和 Channel
            serverSocket.bind(new InetSocketAddress(InetAddress.getLocalHost(), PORT));
            serverSocket.configureBlocking(false);
            // 注册到 Selector,并说明关注点
            serverSocket.register(selector, SelectionKey.OP_ACCEPT);
            while (true) {
                selector.select();// 阻塞等待就绪的 Channel,这是关键点之一
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> iter = selectedKeys.iterator();
                while (iter.hasNext()) {
                    SelectionKey key = iter.next();
                    // 生产系统中一般会额外进行就绪状态检查
                    sayHelloWorld((ServerSocketChannel) key.channel());
                    iter.remove();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
    private void sayHelloWorld(ServerSocketChannel server) throws IOException {
        try (SocketChannel client = server.accept()) {
            client.write(Charset.defaultCharset().encode("Hello world!"));
        }
    }

    public static void main(String[] args) throws IOException {
        NIOServer server = new NIOServer();
        server.start();
        try (Socket client = new Socket(InetAddress.getLocalHost(), PORT)) {
            BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(client.getInputStream()));
            bufferedReader.lines().forEach(s -> System.out.println(s));
        }
    }
}

NIO利用了单线程轮询事件的机制,通过高效的定位就绪的Channel, 来决定做什么,仅仅select阶段是阻塞的,可以有效避免大量客户端链接式,频繁线程切换带来的问题。