在了解Reactor模式之前, 首先了解什么是NIO.
java.nio全称java non-blocking IO 即非阻塞IO.这个地方要明白,非阻塞不等于异步。
非阻塞:当一个线上读取数据,没有数据时该线程可以干其他的事情。就是调用了之后立马返回。
异步IO: 在一个IO操作中,用户态的线程完全不用考虑数据的读取过程,都交给操作系统完成,完成之后通知用户线程即可。这才是真正的异步操作。
同步IO 每个请求必须逐个的被处理,一个流程的处理会导致整个流程的暂时等待。
阻塞: 某个请求发出后,该请求操作需要的条件不满足,请求会一直阻塞,不会返回,直到条件满足。
其中java NIO 中的 Select 在Linux中基于epoll实现。基于IO多路复用。就是一个线程来管理多个IO.
epoll全称eventpoll 是linux内核针对IO多路复用的实现。在linux中,和epoll类似的由select和poll。
其中epoll监听的fd集合是一直在内核存在的,有三个系统调用:epoll_create epoll_wait epoll_ctl 通过epoll_wait可以多次监听同一个fd结合,只返回可读写的那部分。
select只有一个系统调用,就是每次都需要将要监听的所有集合都传给操作系统,当有事件发生时。操作系统在返回给你整个集合。
NIO核心包含三个部分: Channels Buffers Selectors.
Channel: 在NIO中,所有的IO过程都是从建立一个Channel开始的,数据可以从channel中读取到Buffer中 也可以从Buffer中写入到channel中。channel就好像BIO中的流。但是channel时双向的,我感觉这样更贴近于现实,毕竟TCP连接是全双工的。
- FileChannel
- DatagramChannel
- SocketChannel
- ServerSocketChannel
channel分为这四种,分别对应着文件,UDP TCP网络IO.
Buffer buffer即为缓冲区,也就是数据块。
- ByteBuffer
- CharBuffer
- DoubleBuffer
- FloatBuffer
- IntBuffer
- LongBuffer
- ShortBuffer
java基础数据类型中除了boolean都有对应的buffer实现。
Selector (选择器): 他是NIO中的关键所在,我们在程序中可以通过它来实现一个线程同时处理多个Channel 也就是多个连接。
如上图,一个Selectot监听五个通道,在使用时首先需要将通道以及对应感兴趣的事件(Accept read writer等 )注册到Selector上 。当发生对应的事件时,操作系统回通知我们的程序。在Selector中可以读取到对应的Channel 根据事件类型做出相应的操作。
零拷贝
java NIO中提供的FileChannel拥有transferTo和transferFrom两个方法,可以直接把FileChannel中的数据拷贝到另一个Channel,或者把另一个Channel中的数据拷贝到FileChannel .在操作系统的支持下,通过这个方法传输数据不需要将原数据从内核态拷贝到用户态,再从用户态拷贝到内核态。
Reactor实现一个简单的Echo服务器 基于单个线程同时处理多个连接。这样一个Selector同时完成Accept Read Write事件的监听,同时业逻辑也和Selector在同一个线程中执行。这里可以优化一下将业务逻辑在新的线程中执行。
public class EchoService { private final String ip; private final int port; public EchoService(String ip, int port) { this.ip = ip; this.port = port; } public void start(){ try { Selector selector=Selector.open(); ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(ip,port)) .configureBlocking(false) .register(selector, SelectionKey.OP_ACCEPT); while (true){ selector.select(); Setkeys=selector.selectedKeys(); Iterator iteratorKey=keys.iterator(); while (iteratorKey.hasNext()){ SelectionKey key=iteratorKey.next(); if (key.isAcceptable()){ ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel(); SocketChannel socketChannel=serverChannel.accept(); socketChannel.configureBlocking(false) .register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024)); } if (key.isReadable()){ SocketChannel sc= (SocketChannel) key.channel(); ByteBuffer buffer= (ByteBuffer) key.attachment(); buffer.clear(); int readCount= sc.read(buffer); if (readCount<0){ iteratorKey.remove(); continue; } buffer.flip(); sc.write(buffer); System.out.print(new String(buffer.array(),0,readCount)); } iteratorKey.remove(); } } } catch (Exception e) { e.printStackTrace(); }finally { System.out.println("exit"); } }}
现在计算机的核数越来越多,仅仅用一个核心来处理IO连接有点让费系统资源,因此我们可以多见几个Reactor .其中住Reactor负责TCP的连接(Accept),连接之后分配到子Reactor来处理IO的读写事件。
并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作自始至终旨在一个线程处理。这样保证了同一个请求的所有状态和上下文在同一个线程中,方便监控请求相应状态。
具体代码实现 EchoService为例:
public class EchoService { private static final Logger logger= LoggerFactory.getLogger(EchoService.class); private final String ip; private final int port; public EchoService(String ip, int port) { this.ip = ip; this.port = port; } public void start(){ logger.info("echo service start......"); try { Selector selector=Selector.open(); ServerSocketChannel serverSocketChannel=ServerSocketChannel.open(); serverSocketChannel.bind(new InetSocketAddress(ip,port)) .configureBlocking(false) .register(selector,SelectionKey.OP_ACCEPT); int coreNum = Runtime.getRuntime().availableProcessors(); Processor[] processors = new Processor[coreNum]; for (int i = 0; i < processors.length; i++) { logger.info("creat processor :{}",i+1); processors[i] = new Processor(); } int index=0; while (Status.running){ selector.select(); Setkeys=selector.selectedKeys(); Iterator iterator=keys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey=iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()){ ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel=currServerSocketChannel.accept(); socketChannel.configureBlocking(false); logger.info("Accept request from {}",socketChannel.getRemoteAddress()); Processor processor=processors[(++index)%coreNum]; processor.addChannel(socketChannel); } } } } catch (IOException e) { logger.error("io exception {}",e.getMessage()); } }}
public class Processor { private static final Logger logger= LoggerFactory.getLogger(Processor.class); private static final ExecutorService service= Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors()); private final Selector selector; private volatile boolean running=true; public Processor() throws IOException { this.selector= SelectorProvider.provider().openSelector(); } public void addChannel(SocketChannel socketChannel){ try { socketChannel.register(this.selector, SelectionKey.OP_READ); if (running){ running=false; start(); } wakeup(); } catch (ClosedChannelException e) { logger.error("register channel error :{}",e.getMessage()); } } private void wakeup(){ this.selector.wakeup(); } private void start(){ service.submit(new ProcessorTask(selector)); }}
public class ProcessorTask implements Runnable { private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class); private Selector selector; ProcessorTask(Selector selector) { this.selector = selector; } @Override public void run() { logger.info("{}\tsub reactor start listener",Thread.currentThread().getName()); while (Status.running){ try { selector.select(); Setkeys=selector.selectedKeys(); Iterator iterator=keys.iterator(); while (iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if (key.isReadable()){ ByteBuffer buffer= ByteBuffer.allocate(1024); SocketChannel socketChannel= (SocketChannel) key.channel(); int count=socketChannel.read(buffer); if (count<0){ socketChannel.close(); key.cancel(); logger.info("{}\t Read ended",socketChannel); }else if (count==0){ logger.info("{}\t Message size is 0",socketChannel); }else { buffer.flip(); socketChannel.write(buffer); logger.info("{}\t Read message{}",socketChannel,new String(buffer.array())); } } } } catch (IOException e) { logger.error("select error :{}",e.getMessage()); } } }}
在EchoService中 ,主Reactor接受到新的连接后,将channel注册到subReactor的Selector中。每个子Reactor都有一个自己的Selector对象,并有独立的一个线程处理。