NIO、BIO、AIO网络通信

一、概念

1、同步和异步

同步:

用户线程发起I/O请求后需要等待或者轮询内核I/O操作完成后才能继续执行

异步:

用户线程发起I/O请求后仍需要继续执行,当内核I/O操作完成后会通知用户线程,或者调用用户线程注册的回调函数

2、阻塞和非阻塞

讨论的是参与通信双方的工作机制,是否需要互相等待对方的执行

阻塞:

在通信过程中,
一方在处理通信,
另一方要等待对方执行并返回信息不能去做其他无关的事

非阻塞:

在通信过程中,
一方在处理通信,
另一方可以不用等待执行并返回信息而可以去做其他无关的事 直到对方处理通信完成 再在适合的时候继续处理通信过程

二、BIO (同步阻塞)

代码示例

服务端业务代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
* 客户端消息处理线程ServerHandler
*
* @author 蒋时华
* @date 2017/6/24
*/
public class ServerHandler implements Runnable{
private Socket socket;
public ServerHandler(Socket socket) {
this.socket = socket;
}
@Override
public void run() {
BufferedReader in = null;
PrintWriter out = null;
try{
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
String expression;
String result;
while(true){
//通过BufferedReader读取一行
//如果已经读到输入流尾部,返回null,退出循环
//如果得到非空值,就尝试计算结果并返回
if((expression = in.readLine())==null) {
break;
}
System.out.println("server阻塞测试");
System.out.println("服务器收到消息:" + expression);
try{
result = Calculator.cal(expression).toString();
}catch(Exception e){
result = "计算错误:" + e.getMessage();
}
out.println(result);
}
}catch(Exception e){
e.printStackTrace();
}finally{
//一些必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ServerNormal {

//默认的端口号
private static int DEFAULT_PORT = 12344;
//单例的ServerSocket
private static ServerSocket server;

//线程池 懒汉式的单例
private static ExecutorService executorService = Executors.newFixedThreadPool(1);

//根据传入参数设置监听端口,如果没有参数调用以下方法并使用默认值
public static void start() throws IOException {
//使用默认值
start(DEFAULT_PORT);
}
//这个方法不会被大量并发访问,不太需要考虑效率,直接进行方法同步就行了
public synchronized static void start(int port) throws IOException{
if(server != null) return;
try{
//通过构造函数创建ServerSocket
//如果端口合法且空闲,服务端就监听成功
server = new ServerSocket(port);
System.out.println("服务器已启动,端口号:" + port);
//通过无线循环监听客户端连接
while(true){
//如果没有客户端接入,将阻塞在accept操作上。
Socket socket = server.accept();
//当有新的客户端接入时,会执行下面的代码
//然后创建一个新的线程处理这条Socket链路
// TODO: 2019/8/28 问题所在
// new Thread(new ServerHandler(socket)).start();
executorService.execute(new ServerHandler(socket));
}
}finally{
//一些必要的清理工作
if(server != null){
System.out.println("服务器已关闭。");
server.close();
server = null;
}
}
}


}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

/**
* 同步阻塞式I/O创建的Client源码
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Client {

//默认的端口号
private static int DEFAULT_SERVER_PORT = 12344;
private static String DEFAULT_SERVER_IP = "127.0.0.1";
public static void send(String expression){
send(DEFAULT_SERVER_PORT,expression);
}
public static void send(int port,String expression){
System.out.println("算术表达式为:" + expression);
Socket socket = null;
BufferedReader in = null;
PrintWriter out = null;
try{
socket = new Socket(DEFAULT_SERVER_IP,port);
in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
out = new PrintWriter(socket.getOutputStream(),true);
out.println(expression);
System.out.println("___结果为:" + in.readLine());
System.out.println("client阻塞测试");
}catch(Exception e){
e.printStackTrace();
}finally{
//一下必要的清理工作
if(in != null){
try {
in.close();
} catch (IOException e) {
e.printStackTrace();
}
in = null;
}
if(out != null){
out.close();
out = null;
}
if(socket != null){
try {
socket.close();
} catch (IOException e) {
e.printStackTrace();
}
socket = null;
}
}
}
}

测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import java.io.IOException;
import java.util.Random;
/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class BIOTest {

//测试主方法
public static void main(String[] args) throws InterruptedException {
//运行服务器
new Thread(() -> {
try {
ServerNormal.start();
} catch (IOException e) {
e.printStackTrace();
}
}).start();

//避免客户端先于服务器启动前执行代码
Thread.sleep(100);
//运行客户端
char operators[] = {'+','-','*','/'};
Random random = new Random(System.currentTimeMillis());
new Thread(() -> {
while(true){
//随机产生算术表达式
String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
Client.send(expression);
try {
Thread.sleep(random.nextInt(1000));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}).start();
}
}

问题

同步阻塞式I/O创建的Server

结构图

1、BIO主要的问题在于每当有一个新的客户端请求接入时,服务端必须创建一个新的线程来处理这条链路,在需要满足高性能、高并发的场景是没法应用的(大量创建新的线程会严重影响服务器性能,甚至罢工)

1、限制了线程数量,如果发生大量并发请求,超过最大数量的线程就只能等待,直到线程池中的有空闲的线程可以被复用。而对Socket的输入流进行读取时,会一直阻塞

所以在读取数据较慢时(比如数据量大、网络传输慢等),大量并发的情况下,其他接入的消息,只能一直等待,这就是最大的弊端。而NIO,就能解决这个难题

三、NIO

代码示例

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Server {
private static int DEFAULT_PORT = 12345;
private static ServerHandle serverHandle;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null) {
serverHandle.stop();
}
serverHandle = new ServerHandle(port);
new Thread(serverHandle,"Server").start();
}
}

class ServerHandle implements Runnable{
private Selector selector;
private ServerSocketChannel serverChannel;
private volatile boolean started;
/**
* 构造方法
* @param port 指定要监听的端口号
*/
public ServerHandle(int port) {
try{
//创建选择器
selector = Selector.open();
//打开监听通道
serverChannel = ServerSocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
serverChannel.configureBlocking(false);//开启非阻塞模式
//绑定端口 backlog设为1024
serverChannel.socket().bind(new InetSocketAddress(port),1024);
//监听客户端连接请求
// 如果你对不止一种事件感兴趣,使用或运算符即可,如下:
// int interestSet = SelectionKey.OP_READ | SelectionKey.OP_WRITE;
serverChannel.register(selector, SelectionKey.OP_ACCEPT);
//标记服务器已开启
started = true;
System.out.println("服务器已启动,端口号:" + port);
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
//循环遍历selector
while(started){
try{
// 阻塞,阻塞到至少有一个通道在你注册的事件上就绪了。
// 这个过程可能会造成调用线程进入阻塞状态, 通过调用Selector对象的wakeup()方法让处在阻塞状态的select()方法立刻返回
// wakeup()该方法使得选择器上的第一个还没有返回的选择操作立即返回。如果当前没有进行中的选择操作,那么下一次对select()方法的一次调用将立即返回。
// selector.select();
// 和select()一样,但最长阻塞时间为timeout毫秒。
// selector.select(1000);
// 非阻塞,只要有通道就绪就立刻返回。
selector.selectNow();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Throwable t){
t.printStackTrace();
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null) {
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
//处理新接入的请求消息
if(key.isAcceptable()){
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
//通过ServerSocketChannel的accept创建SocketChannel实例
//完成该操作意味着完成TCP三次握手,TCP物理链路正式建立
SocketChannel sc = ssc.accept();
//设置为非阻塞的
// 神奇的事情,blocking默认是true
// TODO: 2019/8/28
sc.configureBlocking(false);
//注册为读
sc.register(selector, SelectionKey.OP_READ);
}
//读消息
if(key.isReadable()){
SocketChannel sc = (SocketChannel) key.channel();
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String expression = new String(bytes,"UTF-8");
System.out.println("服务器收到消息:" + expression);
//处理数据
String result = null;
try{
result = Calculator.cal(expression).toString();
}catch(Exception e){
result = "计算错误:" + e.getMessage();
}
//发送应答消息
doWrite(sc,result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送应答消息
private void doWrite(SocketChannel channel,String response) throws IOException {
//将消息编码为字节数组
byte[] bytes = response.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.util.Iterator;
import java.util.Set;

public class Client {
private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static ClientHandle clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
clientHandle.stop();
clientHandle = new ClientHandle(ip,port);
new Thread(clientHandle,"Server").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
}

class ClientHandle implements Runnable{
private String host;
private int port;
private Selector selector;

private SocketChannel socketChannel;

private volatile boolean started;

public ClientHandle(String ip,int port) {
this.host = ip;
this.port = port;
try{
//创建选择器
selector = Selector.open();
//打开监听通道
socketChannel = SocketChannel.open();
//如果为 true,则此通道将被置于阻塞模式;如果为 false,则此通道将被置于非阻塞模式
socketChannel.configureBlocking(false);//开启非阻塞模式
started = true;
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
}
public void stop(){
started = false;
}
@Override
public void run() {
try{
doConnect();
}catch(IOException e){
e.printStackTrace();
System.exit(1);
}
//循环遍历selector
while(started){
try{
//无论是否有读写事件发生,selector每隔1s被唤醒一次
selector.select(1000);
//阻塞,只有当至少一个注册的事件发生的时候才会继续.
// selector.select();
Set<SelectionKey> keys = selector.selectedKeys();
Iterator<SelectionKey> it = keys.iterator();
SelectionKey key = null;
while(it.hasNext()){
key = it.next();
it.remove();
try{
handleInput(key);
}catch(Exception e){
if(key != null){
key.cancel();
if(key.channel() != null){
key.channel().close();
}
}
}
}
}catch(Exception e){
e.printStackTrace();
System.exit(1);
}
}
//selector关闭后会自动释放里面管理的资源
if(selector != null)
try{
selector.close();
}catch (Exception e) {
e.printStackTrace();
}
}
private void handleInput(SelectionKey key) throws IOException{
if(key.isValid()){
SocketChannel sc = (SocketChannel) key.channel();
if(key.isConnectable()){
if(sc.finishConnect());
else System.exit(1);
}
//读消息
if(key.isReadable()){
//创建ByteBuffer,并开辟一个1M的缓冲区
ByteBuffer buffer = ByteBuffer.allocate(1024);
//读取请求码流,返回读取到的字节数
int readBytes = sc.read(buffer);
//读取到字节,对字节进行编解码
if(readBytes>0){
//将缓冲区当前的limit设置为position=0,用于后续对缓冲区的读取操作
buffer.flip();
//根据缓冲区可读字节数创建字节数组
byte[] bytes = new byte[buffer.remaining()];
//将缓冲区可读字节数组复制到新建的数组中
buffer.get(bytes);
String result = new String(bytes,"UTF-8");
System.out.println("客户端收到消息:" + result);
}
//没有读取到字节 忽略
// else if(readBytes==0);
//链路已经关闭,释放资源
else if(readBytes<0){
key.cancel();
sc.close();
}
}
}
}
//异步发送消息
private void doWrite(SocketChannel channel,String request) throws IOException {
//将消息编码为字节数组
byte[] bytes = request.getBytes();
//根据数组容量创建ByteBuffer
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
//将字节数组复制到缓冲区
writeBuffer.put(bytes);
//flip操作
writeBuffer.flip();
//发送缓冲区的字节数组
channel.write(writeBuffer);
//****此处不含处理“写半包”的代码
// TODO: 2019/8/28 因为应答消息的发送,
// SocketChannel也是异步非阻塞的,
// 所以不能保证一次能吧需要发送的数据发送完,
// 此时就会出现写半包的问题。我们需要注册写操作,
// 不断轮询Selector将没有发送完的消息发送完毕,
// 然后通过Buffer的hasRemain()方法判断消息是否发送完成。
}
private void doConnect() throws IOException{
if(socketChannel.connect(new InetSocketAddress(host,port)));
else socketChannel.register(selector, SelectionKey.OP_CONNECT);
}
public void sendMsg(String msg) throws Exception{
socketChannel.register(selector, SelectionKey.OP_WRITE);
doWrite(socketChannel, msg);

}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;

/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class NIOTest {

public static void main(String[] args) throws Exception{
//运行服务器
Server.start();
//避免客户端先于服务器启动前执行代码
Thread.sleep(1000);
//运行客户端
Client.start();
Thread.sleep(3000);

Random random = new Random(System.currentTimeMillis());
char operators[] = {'+','-','*','/'};
// String expression = random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1);
// while(Client.sendMsg(new Scanner(System.in).nextLine()));
while(
Client.sendMsg(random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1))){
Thread.sleep(random.nextInt(1000));
};
}

}

nio 结构

服务端
  • 打开ServerSocketChannel,监听客户端连接
  • 绑定监听端口,设置连接为非阻塞模式
  • 创建Reactor线程,创建多路复用器并启动线程
  • 将ServerSocketChannel注册到Reactor线程中的Selector上,监听ACCEPT事件
  • Selector轮询准备就绪的key
  • Selector监听到新的客户端接入,处理新的接入请求,完成TCP三次握手,建立链路
客户端
  • 设置客户端链路为非阻塞模式
  • 将新接入的客户端连接注册到Reactor线程的Selector上,监听读操作,读取客户端发送的网络消息
  • 异步读取客户端消息到缓冲区
  • 对Buffer编解码,处理半包消息,将解码成功的消息封装成Task
  • 将应答消息编码为Buffer,调用SocketChannel的write将消息异步发送给客户端
Selector(多路复用器|选择某个通道器)

选择器类管理着一个被注册的通道集合的信息和它们的就绪状态。
通道是和选择器一起被注册的,并且使用选择器来更新通道的就绪状态,
当这么做的时候,可以选择将被激发的线程挂起直到有就绪的通道。
使用Selector的好处在于: 使用更少的线程来就可以来处理通道了, 相比使用多个线程,避免了线程上下文切换带来的开销。

SelectionKey

表示了一个特定的通道对象和一个特定的选择器对象之间的注册关系。
key.attachment(); //返回SelectionKey的attachment,attachment可以在注册channel的时候指定。
key.channel(); // 返回该SelectionKey对应的channel。
key.selector(); // 返回该SelectionKey对应的Selector。
key.interestOps(); //返回代表需要Selector监控的IO操作的bit mask
key.readyOps(); // 返回一个bit mask,代表在相应channel上可以进行的IO操作。

事件名对应值
服务端接收客户端连接事件SelectionKey.OP_ACCEPT(16)
客户端连接服务端事件SelectionKey.OP_CONNECT(8)
读事件SelectionKey.OP_READ(1)
写事件SelectionKey.OP_WRITE(4)

buffer(解决bio中数据不可重复读的问题)

存储基本类型数组数据:ByteBuffer、CharBuffer、FloatBuffer、ShortBuffer、StringCharBuffer等等
这些方法中大部分是对mark、position、limit、capacity的操作。
对于数组来说,需要以下一些重要元素,比如数组大小(capacity)
此时如果是对数组的读取操作时,需要表明当前读到了哪个位置(position),总共可以读到哪个位置(limit),也就是当前数组中有几个元素。
此时如果是写操作,那么需要知道现在写到了哪个位置(position),最大可以写到哪个位置(limit)
最后为了实现可重复读,产生一个备忘位置,即标记(mark)。
源码(只截取部分):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
public abstract class Buffer {

// ...
// Invariants: mark <= position <= limit <= capacity
private int mark = -1;
private int position = 0;
private int limit;
private int capacity;
// ...

// Creates a new buffer with the given mark, position, limit, and capacity,
// after checking invariants.
//
Buffer(int mark, int pos, int lim, int cap) { // package-private
if (cap < 0)
throw new IllegalArgumentException("Negative capacity: " + cap);
this.capacity = cap;
limit(lim);
position(pos);
if (mark >= 0) {
if (mark > pos)
throw new IllegalArgumentException("mark > position: ("
+ mark + " > " + pos + ")");
this.mark = mark;
}
}

/**
* Returns this buffer's capacity.
*
* @return The capacity of this buffer
*/
public final int capacity() {
return capacity;
}

/**
* Returns this buffer's position.
*
* @return The position of this buffer
*/
public final int position() {
return position;
}

/**
* Sets this buffer's position. If the mark is defined and larger than the
* new position then it is discarded.
*
* @param newPosition
* The new position value; must be non-negative
* and no larger than the current limit
*
* @return This buffer
*
* @throws IllegalArgumentException
* If the preconditions on <tt>newPosition</tt> do not hold
*/
public final Buffer position(int newPosition) {
if ((newPosition > limit) || (newPosition < 0))
throw new IllegalArgumentException();
position = newPosition;
if (mark > position) mark = -1;
return this;
}

/**
* Returns this buffer's limit.
*
* @return The limit of this buffer
*/
public final int limit() {
return limit;
}

/**
* Sets this buffer's limit. If the position is larger than the new limit
* then it is set to the new limit. If the mark is defined and larger than
* the new limit then it is discarded.
*
* @param newLimit
* The new limit value; must be non-negative
* and no larger than this buffer's capacity
*
* @return This buffer
*
* @throws IllegalArgumentException
* If the preconditions on <tt>newLimit</tt> do not hold
*/
public final Buffer limit(int newLimit) {
if ((newLimit > capacity) || (newLimit < 0))
throw new IllegalArgumentException();
limit = newLimit;
if (position > limit) position = limit;
if (mark > limit) mark = -1;
return this;
}

/**
* Sets this buffer's mark at its position.
*
* @return This buffer
*/
public final Buffer mark() {
mark = position;
return this;
}

/**
* Resets this buffer's position to the previously-marked position.
*
* <p> Invoking this method neither changes nor discards the mark's
* value. </p>
*
* @return This buffer
*
* @throws InvalidMarkException
* If the mark has not been set
*/
public final Buffer reset() {
int m = mark;
if (m < 0)
throw new InvalidMarkException();
position = m;
return this;
}

/**
* Clears this buffer. The position is set to zero, the limit is set to
* the capacity, and the mark is discarded.
*
* <p> Invoke this method before using a sequence of channel-read or
* <i>put</i> operations to fill this buffer. For example:
*
* <blockquote><pre>
* buf.clear(); // Prepare buffer for reading
* in.read(buf); // Read data</pre></blockquote>
*
* <p> This method does not actually erase the data in the buffer, but it
* is named as if it did because it will most often be used in situations
* in which that might as well be the case. </p>
*
* @return This buffer
*/
public final Buffer clear() {
position = 0;
limit = capacity;
mark = -1;
return this;
}

/**
* Flips this buffer. The limit is set to the current position and then
* the position is set to zero. If the mark is defined then it is
* discarded.
*
* <p> After a sequence of channel-read or <i>put</i> operations, invoke
* this method to prepare for a sequence of channel-write or relative
* <i>get</i> operations. For example:
*
* <blockquote><pre>
* buf.put(magic); // Prepend header
* in.read(buf); // Read data into rest of buffer
* buf.flip(); // Flip buffer
* out.write(buf); // Write header + data to channel</pre></blockquote>
*
* <p> This method is often used in conjunction with the {@link
* java.nio.ByteBuffer#compact compact} method when transferring data from
* one place to another. </p>
*
* @return This buffer
*/
public final Buffer flip() {
limit = position;
position = 0;
mark = -1;
return this;
}

/**
* Rewinds this buffer. The position is set to zero and the mark is
* discarded.
*
* <p> Invoke this method before a sequence of channel-write or <i>get</i>
* operations, assuming that the limit has already been set
* appropriately. For example:
*
* <blockquote><pre>
* out.write(buf); // Write remaining data
* buf.rewind(); // Rewind buffer
* buf.get(array); // Copy data into array</pre></blockquote>
*
* @return This buffer
*/
public final Buffer rewind() {
position = 0;
mark = -1;
return this;
}

// ...
}

直接缓冲区与非直接缓冲区(ByteBuffer):

  • 非直接缓冲区:
    优点:在虚拟机内存中创建,易回收
    缺点:但占用虚拟机内存开销,处理中有复制过程。
  • 直接缓冲区:
    优点:在虚拟机内存外,开辟的内存,IO操作直接进行,没有再次复制
    缺点:创建和销毁开销大,没有管理权(基于系统的物理内存没有分代回收机制)


    用通俗的话讲就是,比如你是个小组长(jvm堆内存),你管理者你底下的人,
    但是你的领导(内核[物理空间])要知道你的情况,你需要把你的组内的情况汇报给他(复制),而他自己本身只知道你的情况,你下面人的情况他是不了解的也是不关心的,相当于他把这块区域分配给你,至于你要干什么,他是不管的

JVM创建一个缓冲区的时候,实际上做了如下几件事:

  • JVM确保Heap区域内的空间足够,如果不够则使用触发GC在内的方法获得空间;
  • 获得空间之后会找一组堆内的连续地址分配数组, 这里需要注意的是,在物理内存上,这些字节是不一定连续的;
  • 对于不涉及到IO的操作,这样的处理没有任何问题,但是当进行IO操作的时候就会出现一点性能问题.
    所有的IO操作都需要操作系统进入内核态才行,而JVM进程属于用户态进程, 当JVM需要把一个缓冲区写到某个Channel或Socket的时候,需要切换到内核态.
    而内核态由于并不知道JVM里面这个缓冲区存储在物理内存的什么地址,并且这些物理地址并不一定是连续的(或者说不一定是IO操作需要的块结构),
    所以在切换之前JVM需要把缓冲区复制到物理内存一块连续的内存上, 然后由内核去读取这块物理内存,整合成连续的、分块的内存.

三、AIO

代码示例:

服务端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
import javax.script.ScriptEngine;
import javax.script.ScriptEngineManager;
import javax.script.ScriptException;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousServerSocketChannel;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.concurrent.CountDownLatch;

/**
* Server class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Server {

private static int DEFAULT_PORT = 12345;
private static AsyncServerHandler serverHandle;
public volatile static long clientCount = 0;
public static void start(){
start(DEFAULT_PORT);
}
public static synchronized void start(int port){
if(serverHandle!=null)
return;
serverHandle = new AsyncServerHandler(port);
new Thread(serverHandle,"Server").start();
}
public static void main(String[] args){
Server.start();
}


}

class AsyncServerHandler implements Runnable {
public CountDownLatch latch;
public AsynchronousServerSocketChannel channel;
public AsyncServerHandler(int port) {
try {
//创建服务端通道
channel = AsynchronousServerSocketChannel.open();
//绑定端口
channel.bind(new InetSocketAddress(port));
System.out.println("服务器已启动,端口号:" + port);
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//CountDownLatch初始化
//它的作用:在完成一组正在执行的操作之前,允许当前的现场一直阻塞
//此处,让现场在此阻塞,防止服务端执行完成后退出
//也可以使用while(true)+sleep
//生成环境就不需要担心这个问题,以为服务端是不会退出的
latch = new CountDownLatch(1);
//用于接收客户端的连接
channel.accept(this,new AcceptHandler());
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

class AcceptHandler implements CompletionHandler<AsynchronousSocketChannel, AsyncServerHandler> {
@Override
public void completed(AsynchronousSocketChannel channel,AsyncServerHandler serverHandler) {
//继续接受其他客户端的请求
Server.clientCount++;
System.out.println("连接的客户端数:" + Server.clientCount);
serverHandler.channel.accept(serverHandler, this);
//创建新的Buffer
ByteBuffer buffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(buffer, buffer, new ReadHandler(channel));
}
@Override
public void failed(Throwable exc, AsyncServerHandler serverHandler) {
exc.printStackTrace();
serverHandler.latch.countDown();
}
}

class ReadHandler implements CompletionHandler<Integer, ByteBuffer> {
//用于读取半包消息和发送应答
private AsynchronousSocketChannel channel;
public ReadHandler(AsynchronousSocketChannel channel) {
this.channel = channel;
}
//读取到消息后的处理
@Override
public void completed(Integer result, ByteBuffer attachment) {
//flip操作
attachment.flip();
//根据
byte[] message = new byte[attachment.remaining()];
attachment.get(message);
try {
String expression = new String(message, "UTF-8");
System.out.println("服务器收到消息: " + expression);
String calrResult = null;
try{
calrResult = Calculator.cal(expression).toString();
}catch(Exception e){
calrResult = "计算错误:" + e.getMessage();
}
//向客户端发送消息
doWrite(calrResult);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
//发送消息
private void doWrite(String result) {
byte[] bytes = result.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
writeBuffer.put(bytes);
writeBuffer.flip();
//异步写数据 参数与前面的read一样
channel.write(writeBuffer, writeBuffer,new CompletionHandler<Integer, ByteBuffer>() {
@Override
public void completed(Integer result, ByteBuffer buffer) {
//如果没有发送完,就继续发送直到完成
if (buffer.hasRemaining())
channel.write(buffer, buffer, this);
else{
//创建新的Buffer
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
//异步读 第三个参数为接收消息回调的业务Handler
channel.read(readBuffer, readBuffer, new ReadHandler(channel));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
channel.close();
} catch (IOException e) {
}
}
});
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
try {
this.channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}

final class Calculator {
private final static ScriptEngine jse = new ScriptEngineManager().getEngineByName("JavaScript");
public static Object cal(String expression) throws ScriptException {
return jse.eval(expression);
}
}

客户端代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
import java.io.IOException;
import java.io.UnsupportedEncodingException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousSocketChannel;
import java.nio.channels.CompletionHandler;
import java.util.Scanner;
import java.util.concurrent.CountDownLatch;

/**
* Client class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class Client {

private static String DEFAULT_HOST = "127.0.0.1";
private static int DEFAULT_PORT = 12345;
private static AsyncClientHandler clientHandle;
public static void start(){
start(DEFAULT_HOST,DEFAULT_PORT);
}
public static synchronized void start(String ip,int port){
if(clientHandle!=null)
return;
clientHandle = new AsyncClientHandler(ip,port);
new Thread(clientHandle,"Client").start();
}
//向服务器发送消息
public static boolean sendMsg(String msg) throws Exception{
if(msg.equals("q")) return false;
clientHandle.sendMsg(msg);
return true;
}
@SuppressWarnings("resource")
public static void main(String[] args) throws Exception{
Client.start();
System.out.println("请输入请求消息:");
Scanner scanner = new Scanner(System.in);
while(Client.sendMsg(scanner.nextLine()));
}

}

class AsyncClientHandler implements CompletionHandler<Void, AsyncClientHandler>, Runnable {
private AsynchronousSocketChannel clientChannel;
private String host;
private int port;
private CountDownLatch latch;
public AsyncClientHandler(String host, int port) {
this.host = host;
this.port = port;
try {
//创建异步的客户端通道
clientChannel = AsynchronousSocketChannel.open();
} catch (IOException e) {
e.printStackTrace();
}
}
@Override
public void run() {
//创建CountDownLatch等待
latch = new CountDownLatch(1);
//发起异步连接操作,回调参数就是这个类本身,如果连接成功会回调completed方法
clientChannel.connect(new InetSocketAddress(host, port), this, this);
try {
latch.await();
} catch (InterruptedException e1) {
e1.printStackTrace();
}
try {
clientChannel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
//连接服务器成功
//意味着TCP三次握手完成
@Override
public void completed(Void result, AsyncClientHandler attachment) {
System.out.println("客户端成功连接到服务器...");
}
//连接服务器失败
@Override
public void failed(Throwable exc, AsyncClientHandler attachment) {
System.err.println("连接服务器失败...");
exc.printStackTrace();
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
e.printStackTrace();
}
}
//向服务器发送消息
public void sendMsg(String msg){
System.out.println("算术表达式为:" + msg);
byte[] req = msg.getBytes();
ByteBuffer writeBuffer = ByteBuffer.allocate(req.length);
writeBuffer.put(req);
writeBuffer.flip();
//异步写
clientChannel.write(writeBuffer, writeBuffer,new WriteHandler(clientChannel, latch));
}
}

class WriteHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public WriteHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result, ByteBuffer buffer) {
//完成全部数据的写入
if (buffer.hasRemaining()) {
clientChannel.write(buffer, buffer, this);
}
else {
//读取数据
ByteBuffer readBuffer = ByteBuffer.allocate(1024);
clientChannel.read(readBuffer,readBuffer,new ClientReadHandler(clientChannel, latch));
}
}
@Override
public void failed(Throwable exc, ByteBuffer attachment) {
System.err.println("数据发送失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}

class ClientReadHandler implements CompletionHandler<Integer, ByteBuffer> {
private AsynchronousSocketChannel clientChannel;
private CountDownLatch latch;
public ClientReadHandler(AsynchronousSocketChannel clientChannel,CountDownLatch latch) {
this.clientChannel = clientChannel;
this.latch = latch;
}
@Override
public void completed(Integer result,ByteBuffer buffer) {
buffer.flip();
byte[] bytes = new byte[buffer.remaining()];
buffer.get(bytes);
String body;
try {
body = new String(bytes,"UTF-8");
System.out.println("客户端收到结果:"+ body);
} catch (UnsupportedEncodingException e) {
e.printStackTrace();
}
}
@Override
public void failed(Throwable exc,ByteBuffer attachment) {
System.err.println("数据读取失败...");
try {
clientChannel.close();
latch.countDown();
} catch (IOException e) {
}
}
}

测试代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import java.util.Random;
import java.util.Scanner;

/**
* Test class
*
* @author 蒋时华
* @date 2017/6/24
*/
public class AIOTest {

public static void main(String[] args) throws Exception{
//运行服务器
Server.start();
//避免客户端先于服务器启动前执行代码
Thread.sleep(1000);

//运行客户端
Client.start();
Thread.sleep(3000);

Random random = new Random(System.currentTimeMillis());
char operators[] = {'+','-','*','/'};

while(Client.sendMsg(random.nextInt(10)+""+operators[random.nextInt(4)]+(random.nextInt(10)+1))){
Thread.sleep(random.nextInt(1000));
}
}

}

四、总结

同步和异步

我认为的网络层面io的同步和异步描述的是一种消息通知的机制,主动等待消息返回还是被动接受消息
同步io:指的是调用方通过主动等待获取调用返回的结果来获取消息通知。
异步io:指的是被调用方通过某种方式(如,回调函数)来通知调用方获取消息。

阻塞和非阻塞

NIO、AIO为什么被称为非阻塞?

  • BIO在发起读请求以后,会一直等待,一直到拿到结果
  • NIO在发起读请求以后,不会立即拿到结果
  • AIO通过回调方法,被动的接受

BIO(blocking IO):

同步阻塞式IO 面向流 操作字节或字符 单向传输数据

NIO(non blocking IO):

同步非阻塞式IO 面向通道 操作缓冲区 双向传输数据

AIO(async IO):

同步非阻塞式IO 大量使用回调函数 异步处理通信过程 异步的双向传输数据

Jeff-Eric wechat