2.非阻塞就是请求者发起一个请求,不用一直等着结果返回,可以先去干其他事情,当条件就绪的时候,就自动回来。
public class BioServerTest { public static void main(String[] args) throws IOException { // 堆代码 duidaima.com //初始化服务端socket并且绑定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循环监听客户端请求 while (true){ try { //监听客户端请求 Socket socket = serverSocket.accept(); //将字节流转化成字符流,读取客户端输入的内容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //读取一行数据 String str = bufferedReader.readLine(); //打印客户端发送的信息 System.out.println("服务端收到客户端发送的信息:" + str); //向客户端返回信息,将字符转化成字节流,并输出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服务端,已收到消息"); // 关闭流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } } }客户端操作,样例程序如下:
public class BioClientTest { public static void main(String[] args) { //堆代码 duidaima.com //创建10个线程,模拟10个客户端,同时向服务端发送请求 for (int i = 0; i < 10; i++) { final int j = i;//定义变量 new Thread(new Runnable() { @Override public void run() { try { //通过IP和端口与服务端建立连接 Socket socket =new Socket("127.0.0.1",8080); //将字符流转化成字节流,并输出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); String str="Hello,我是" + j + "个,客户端!"; printWriter.println(str); //从输入流中读取服务端返回的信息,将字节流转化成字符流 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //读取内容 String result = bufferedReader.readLine(); //打印服务端返回的信息 System.out.println("客户端发送请求内容:" + str + " -> 收到服务端返回的内容:" + result); // 关闭流 bufferedReader.close(); printWriter.close(); // 关闭socket socket.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } }最后,依次启动服务端、客户端,看看控制台输出情况如何。
服务端收到客户端发送的信息:Hello,我是8个,客户端! 服务端收到客户端发送的信息:Hello,我是9个,客户端! 服务端收到客户端发送的信息:Hello,我是7个,客户端! 服务端收到客户端发送的信息:Hello,我是5个,客户端! 服务端收到客户端发送的信息:Hello,我是4个,客户端! 服务端收到客户端发送的信息:Hello,我是3个,客户端! 服务端收到客户端发送的信息:Hello,我是6个,客户端! 服务端收到客户端发送的信息:Hello,我是2个,客户端! 服务端收到客户端发送的信息:Hello,我是1个,客户端! 服务端收到客户端发送的信息:Hello,我是0个,客户端!客户端控制台结果如下:
客户端发送请求内容:Hello,我是8个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是9个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是7个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是5个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是4个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是3个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是6个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是2个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是1个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息 客户端发送请求内容:Hello,我是0个,客户端! -> 收到服务端返回的内容:hello,我是服务端,已收到消息随着客户端的请求次数越来越多,可能需要排队的时间会越来越长,因此是否可以在服务端,采用多线程编程进行处理呢?答案是,可以的!
public class BioServerTest { public static void main(String[] args) throws IOException { //初始化服务端socket并且绑定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循环监听客户端请求 while (true){ //监听客户端请求 Socket socket = serverSocket.accept(); new Thread(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().toString(); //将字节流转化成字符流,读取客户端输入的内容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //读取一行数据 String str = bufferedReader.readLine(); //打印客户端发送的信息 System.out.println("线程名称" + threadName + ",服务端收到客户端发送的信息:" + str); //向客户端返回信息,将字符转化成字节流,并输出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服务端,已收到消息"); // 关闭流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }).start(); } } }依次启动服务端、客户端,服务端控制台输出结果如下:
线程名称Thread[Thread-8,5,main],服务端收到客户端发送的信息:Hello,我是4个,客户端! 线程名称Thread[Thread-4,5,main],服务端收到客户端发送的信息:Hello,我是8个,客户端! 线程名称Thread[Thread-0,5,main],服务端收到客户端发送的信息:Hello,我是1个,客户端! 线程名称Thread[Thread-7,5,main],服务端收到客户端发送的信息:Hello,我是5个,客户端! 线程名称Thread[Thread-5,5,main],服务端收到客户端发送的信息:Hello,我是2个,客户端! 线程名称Thread[Thread-9,5,main],服务端收到客户端发送的信息:Hello,我是3个,客户端! 线程名称Thread[Thread-1,5,main],服务端收到客户端发送的信息:Hello,我是0个,客户端! 线程名称Thread[Thread-3,5,main],服务端收到客户端发送的信息:Hello,我是7个,客户端! 线程名称Thread[Thread-2,5,main],服务端收到客户端发送的信息:Hello,我是9个,客户端! 线程名称Thread[Thread-6,5,main],服务端收到客户端发送的信息:Hello,我是6个,客户端!当服务端接收到客户端的请求时,会给每个客户端创建一个新的线程进行链路处理,处理完成之后,通过输出流返回应答给客户端,最后线程会销毁。但是这样的编程模型也有很大的弊端,如果出现 100、1000、甚至 10000 个客户端同时请求服务端,采用这种编程模型,服务端也会创建与之相同的线程数量,线程数急剧膨胀可能会导致线程堆栈溢出、创建新线程失败等问题,最终可能导致服务端宕机或者僵死,不能对外提供服务。
public class BioServerTest { public static void main(String[] args) throws IOException { //在线程池中创建5个固定大小线程,来处理客户端的请求 ExecutorService executorService = Executors.newFixedThreadPool(5); //初始化服务端socket并且绑定 8080 端口 ServerSocket serverSocket = new ServerSocket(8080); //循环监听客户端请求 while (true){ //监听客户端请求 Socket socket = serverSocket.accept(); //使用线程池执行任务 executorService.execute(new Runnable() { @Override public void run() { try { String threadName = Thread.currentThread().toString(); //将字节流转化成字符流,读取客户端输入的内容 BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(socket.getInputStream())); //读取一行数据 String str = bufferedReader.readLine(); //打印客户端发送的信息 System.out.println("线程名称" + threadName + ",服务端收到客户端发送的信息:" + str); //向客户端返回信息,将字符转化成字节流,并输出 PrintWriter printWriter = new PrintWriter(new OutputStreamWriter(socket.getOutputStream()),true); printWriter.println("hello,我是服务端,已收到消息"); // 关闭流 bufferedReader.close(); printWriter.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } }依次启动服务端、客户端,服务端控制台输出结果如下:
线程名称Thread[pool-1-thread-4,5,main],服务端收到客户端发送的信息:Hello,我是6个,客户端! 线程名称Thread[pool-1-thread-2,5,main],服务端收到客户端发送的信息:Hello,我是8个,客户端! 线程名称Thread[pool-1-thread-3,5,main],服务端收到客户端发送的信息:Hello,我是9个,客户端! 线程名称Thread[pool-1-thread-5,5,main],服务端收到客户端发送的信息:Hello,我是5个,客户端! 线程名称Thread[pool-1-thread-1,5,main],服务端收到客户端发送的信息:Hello,我是7个,客户端! 线程名称Thread[pool-1-thread-5,5,main],服务端收到客户端发送的信息:Hello,我是2个,客户端! 线程名称Thread[pool-1-thread-5,5,main],服务端收到客户端发送的信息:Hello,我是0个,客户端! 线程名称Thread[pool-1-thread-1,5,main],服务端收到客户端发送的信息:Hello,我是1个,客户端! 线程名称Thread[pool-1-thread-5,5,main],服务端收到客户端发送的信息:Hello,我是3个,客户端! 线程名称Thread[pool-1-thread-1,5,main],服务端收到客户端发送的信息:Hello,我是4个,客户端!本例中测试的客户端数量是 10,服务端使用 java 线程池来处理任务,线程数量为 5 个,服务端不用为每个客户端都创建一个线程,由于线程池可以设置消息队列的大小和最大线程数,因此它的资源占用是可控的,无论多少个客户端并发访问,都不会导致资源的耗尽和宕机。在活动连接数不是特别高的情况下,这种模型还是不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。
/** * NIO 服务端 */ public class NioServerTest { public static void main(String[] args) throws IOException { // 打开服务器套接字通道 ServerSocketChannel ssc = ServerSocketChannel.open(); // 服务器配置为非阻塞 ssc.configureBlocking(false); // 进行服务的绑定,监听8080端口 ssc.socket().bind(new InetSocketAddress(8080)); // 构建一个Selector选择器,并且将channel注册上去 Selector selector = Selector.open(); // 将serverSocketChannel注册到selector,并对accept事件感兴趣(serverSocketChannel只能支持accept操作) ssc.register(selector, SelectionKey.OP_ACCEPT); while (true){ // 查询指定事件已经就绪的通道数量,select方法有阻塞效果,直到有事件通知才会有返回,如果为0就跳过 int readyChannels = selector.select(); if(readyChannels == 0) { continue; }; //通过选择器取得所有key集合 Set<SelectionKey> selectedKeys = selector.selectedKeys(); Iterator<SelectionKey> iterator = selectedKeys.iterator(); while (iterator.hasNext()){ SelectionKey key = iterator.next(); //判断状态是否有效 if (!key.isValid()) { continue; } if (key.isAcceptable()) { // 处理通道中的连接事件 ServerSocketChannel server = (ServerSocketChannel) key.channel(); SocketChannel sc = server.accept(); sc.configureBlocking(false); System.out.println("接收到新的客户端连接,地址:" + sc.getRemoteAddress()); // 将通道注册到选择器并处理通道中可读事件 sc.register(selector, SelectionKey.OP_READ); } else if (key.isReadable()) { // 处理通道中的可读事件 SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); while (channel.isOpen() && channel.read(byteBuffer) != -1) { // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了) if (byteBuffer.position() > 0) { break; }; } byteBuffer.flip(); //获取缓冲中的数据 String result = new String(byteBuffer.array(), 0, byteBuffer.limit()); System.out.println("收到客户端发送的信息,内容:" + result); // 将通道注册到选择器并处理通道中可写事件 channel.register(selector, SelectionKey.OP_WRITE); } else if (key.isWritable()) { // 处理通道中的可写事件 SocketChannel channel = (SocketChannel) key.channel(); ByteBuffer byteBuffer = ByteBuffer.allocate(1024); byteBuffer.put("server send".getBytes()); byteBuffer.flip(); channel.write(byteBuffer); // 将通道注册到选择器并处理通道中可读事件 channel.register(selector, SelectionKey.OP_READ); //写完之后关闭通道 channel.close(); } //当前事件已经处理完毕,可以丢弃 iterator.remove(); } } } }客户端操作,样例程序如下:
/** * NIO 客户端 */ public class NioClientTest { public static void main(String[] args) throws IOException { // 打开socket通道 SocketChannel sc = SocketChannel.open(); //设置为非阻塞 sc.configureBlocking(false); //连接服务器地址和端口 sc.connect(new InetSocketAddress("127.0.0.1", 8080)); while (!sc.finishConnect()) { // 没连接上,则一直等待 System.out.println("客户端正在连接中,请耐心等待"); } // 发送内容 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put("Hello,我是客户端".getBytes()); writeBuffer.flip(); sc.write(writeBuffer); // 读取响应 ByteBuffer readBuffer = ByteBuffer.allocate(1024); while (sc.isOpen() && sc.read(readBuffer) != -1) { // 长连接情况下,需要手动判断数据有没有读取结束 (此处做一个简单的判断: 超过0字节就认为请求结束了) if (readBuffer.position() > 0) { break; }; } readBuffer.flip(); String result = new String(readBuffer.array(), 0, readBuffer.limit()); System.out.println("客户端收到服务端:" + sc.socket().getRemoteSocketAddress() + ",返回的信息:" + result); // 关闭通道 sc.close(); } }最后,依次启动服务端、客户端,看看控制台输出情况如何。
接收到新的客户端连接,地址:/127.0.0.1:57644 收到客户端发送的信息,内容:Hello,我是客户端客户端控制台结果如下:
客户端收到服务端:/127.0.0.1:8080,返回的信息:server send从编程上可以看到,NIO 的操作比传统的 IO 操作要复杂的多!Selector 被称为选择器 ,当然你也可以翻译为多路复用器 。它是Java NIO 核心组件中的一个,用于检查一个或多个 Channel(通道)的状态是否处于连接就绪、接受就绪、可读就绪、可写就绪。
从上面的代码中大家都可以看出来,除了编程复杂之外,还有几个让人诟病的问题:
1.JDK 的 NIO 底层由 epoll 实现,该实现饱受诟病的空轮询 bug 会导致 cpu 飙升 100%!
2.项目庞大之后,自行实现的 NIO 很容易出现各类 bug,维护成本较高!/** * aio 服务端 */ public class AioServer { public AsynchronousServerSocketChannel serverChannel; /** * 监听客户端请求 * @throws Exception */ public void listen() throws Exception { //打开一个服务端通道 serverChannel = AsynchronousServerSocketChannel.open(); serverChannel.bind(new InetSocketAddress(8080));//监听8080端口 //服务监听 serverChannel.accept(this, new CompletionHandler<AsynchronousSocketChannel,AioServer>(){ @Override public void completed(AsynchronousSocketChannel client, AioServer attachment) { try { if (client.isOpen()) { System.out.println("接收到新的客户端连接,地址:" + client.getRemoteAddress()); final ByteBuffer buffer = ByteBuffer.allocate(1024); //读取客户端发送的信息 client.read(buffer, client, new CompletionHandler<Integer, AsynchronousSocketChannel>(){ @Override public void completed(Integer result, AsynchronousSocketChannel attachment) { try { //读取请求,处理客户端发送的数据 buffer.flip(); String content = new String(buffer.array(), 0, buffer.limit()); System.out.println("服务端收到客户端发送的信息:" + content); //向客户端发送数据 ByteBuffer writeBuffer = ByteBuffer.allocate(1024); writeBuffer.put("server send".getBytes()); writeBuffer.flip(); attachment.write(writeBuffer).get(); } catch (Exception e) { e.printStackTrace(); } } @Override public void failed(Throwable exc, AsynchronousSocketChannel attachment) { try { exc.printStackTrace(); attachment.close(); } catch (IOException e) { e.printStackTrace(); } } }); } } catch (Exception e) { e.printStackTrace(); } finally { //当有新客户端接入的时候,直接调用accept方法,递归执行下去,保证多个客户端都可以阻塞 attachment.serverChannel.accept(attachment, this); } } @Override public void failed(Throwable exc, AioServer attachment) { exc.printStackTrace(); } }); } public static void main(String[] args) throws Exception { //启动服务器,并监听客户端 new AioServer().listen(); //因为是异步IO执行,让主线程睡眠但不关闭 Thread.sleep(Integer.MAX_VALUE); } }客户端操作,样例程序如下:
/** * aio 客户端 */ public class AioClient { public static void main(String[] args) throws IOException, InterruptedException { //打开一个客户端通道 AsynchronousSocketChannel channel = AsynchronousSocketChannel.open(); //与服务器建立连接 channel.connect(new InetSocketAddress("127.0.0.1", 8080)); //睡眠1s,等待与服务器建立连接 Thread.sleep(1000); try { //向服务器发送数据 channel.write(ByteBuffer.wrap("Hello,我是客户端".getBytes())).get(); } catch (Exception e) { e.printStackTrace(); } try { //从服务器读取数据 ByteBuffer byteBuffer = ByteBuffer.allocate(1024); channel.read(byteBuffer).get();//将通道中的数据写入缓冲buffer byteBuffer.flip(); String result = new String(byteBuffer.array(), 0, byteBuffer.limit()); System.out.println("客户端收到服务器返回的内容:" + result);//输出返回结果 } catch (Exception e) { e.printStackTrace(); } } }同样的,依次启动服务端程序,再启动客户端程序,看看运行结果!
接收到新的客户端连接,地址:/127.0.0.1:56606 服务端收到客户端发送的信息:Hello,我是客户端客户端控制台结果如下:
客户端收到服务器返回的内容:server send这种组合方式用起来十分复杂,只有在一些非常复杂的分布式情况下使用,像集群之间的消息同步机制一般用这种 I/O 组合方式。如 Cassandra 的 Gossip 通信机制就是采用异步非阻塞的方式,可以实现非常高的网络传输性能。Netty 之前也尝试使用过 AIO,不过又放弃了!