博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
高性能IO -Reactor模式的实现
阅读量:6069 次
发布时间:2019-06-20

本文共 8633 字,大约阅读时间需要 28 分钟。

hot3.png

在了解Reactor模式之前, 首先了解什么是NIO.

java.nio全称java non-blocking IO 即非阻塞IO.这个地方要明白,非阻塞不等于异步。

非阻塞:当一个线上读取数据,没有数据时该线程可以干其他的事情。就是调用了之后立马返回。

异步IO: 在一个IO操作中,用户态的线程完全不用考虑数据的读取过程,都交给操作系统完成,完成之后通知用户线程即可。这才是真正的异步操作。

同步IO  每个请求必须逐个的被处理,一个流程的处理会导致整个流程的暂时等待。

阻塞:  某个请求发出后,该请求操作需要的条件不满足,请求会一直阻塞,不会返回,直到条件满足。

 

 其中java NIO 中的 Select 在Linux中基于epoll实现。基于IO多路复用。就是一个线程来管理多个IO.

epoll全称eventpoll 是linux内核针对IO多路复用的实现。在linux中,和epoll类似的由select和poll。

其中epoll监听的fd集合是一直在内核存在的,有三个系统调用:epoll_create epoll_wait epoll_ctl 通过epoll_wait可以多次监听同一个fd结合,只返回可读写的那部分。

select只有一个系统调用,就是每次都需要将要监听的所有集合都传给操作系统,当有事件发生时。操作系统在返回给你整个集合。

 

NIO核心包含三个部分: Channels Buffers Selectors.

Channel: 在NIO中,所有的IO过程都是从建立一个Channel开始的,数据可以从channel中读取到Buffer中 也可以从Buffer中写入到channel中。channel就好像BIO中的流。但是channel时双向的,我感觉这样更贴近于现实,毕竟TCP连接是全双工的。

  • FileChannel
  • DatagramChannel
  • SocketChannel
  • ServerSocketChannel

channel分为这四种,分别对应着文件,UDP TCP网络IO.

Buffer buffer即为缓冲区,也就是数据块。

  • ByteBuffer
  • CharBuffer
  • DoubleBuffer
  • FloatBuffer
  • IntBuffer
  • LongBuffer
  • ShortBuffer

java基础数据类型中除了boolean都有对应的buffer实现。

Selector (选择器): 他是NIO中的关键所在,我们在程序中可以通过它来实现一个线程同时处理多个Channel 也就是多个连接。

30bac323c233efccbf385beaf2a0c5d80c0.jpg

如上图,一个Selectot监听五个通道,在使用时首先需要将通道以及对应感兴趣的事件(Accept   read  writer等 )注册到Selector上 。当发生对应的事件时,操作系统回通知我们的程序。在Selector中可以读取到对应的Channel 根据事件类型做出相应的操作。

零拷贝

java NIO中提供的FileChannel拥有transferTo和transferFrom两个方法,可以直接把FileChannel中的数据拷贝到另一个Channel,或者把另一个Channel中的数据拷贝到FileChannel .在操作系统的支持下,通过这个方法传输数据不需要将原数据从内核态拷贝到用户态,再从用户态拷贝到内核态。

 

Reactor实现一个简单的Echo服务器  基于单个线程同时处理多个连接。这样一个Selector同时完成Accept  Read Write事件的监听,同时业逻辑也和Selector在同一个线程中执行。这里可以优化一下将业务逻辑在新的线程中执行。

public class EchoService {    private final String ip;    private final int port;    public EchoService(String ip, int port) {        this.ip = ip;        this.port = port;    }    public void start(){        try {            Selector selector=Selector.open();            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();            serverSocketChannel.bind(new InetSocketAddress(ip,port))                    .configureBlocking(false)                    .register(selector, SelectionKey.OP_ACCEPT);            while (true){                selector.select();                Set
keys=selector.selectedKeys(); Iterator
iteratorKey=keys.iterator(); while (iteratorKey.hasNext()){ SelectionKey key=iteratorKey.next(); if (key.isAcceptable()){ ServerSocketChannel serverChannel= (ServerSocketChannel) key.channel(); SocketChannel socketChannel=serverChannel.accept(); socketChannel.configureBlocking(false) .register(selector,SelectionKey.OP_READ|SelectionKey.OP_WRITE,ByteBuffer.allocate(1024)); } if (key.isReadable()){ SocketChannel sc= (SocketChannel) key.channel(); ByteBuffer buffer= (ByteBuffer) key.attachment(); buffer.clear(); int readCount= sc.read(buffer); if (readCount<0){ iteratorKey.remove(); continue; } buffer.flip(); sc.write(buffer); System.out.print(new String(buffer.array(),0,readCount)); } iteratorKey.remove(); } } } catch (Exception e) { e.printStackTrace(); }finally { System.out.println("exit"); } }}

现在计算机的核数越来越多,仅仅用一个核心来处理IO连接有点让费系统资源,因此我们可以多见几个Reactor  .其中住Reactor负责TCP的连接(Accept),连接之后分配到子Reactor来处理IO的读写事件。

并且每个子Reactor分别属于一个独立的线程,每个成功连接后的Channel的所有操作自始至终旨在一个线程处理。这样保证了同一个请求的所有状态和上下文在同一个线程中,方便监控请求相应状态。

5730f49eb765eb65be1b504ca144cf4d8d2.jpg

具体代码实现 EchoService为例:

public class EchoService {    private static final Logger logger= LoggerFactory.getLogger(EchoService.class);    private final String ip;    private final int port;    public EchoService(String ip, int port) {        this.ip = ip;        this.port = port;    }    public void start(){        logger.info("echo service start......");        try {            Selector selector=Selector.open();            ServerSocketChannel serverSocketChannel=ServerSocketChannel.open();            serverSocketChannel.bind(new InetSocketAddress(ip,port))                    .configureBlocking(false)                    .register(selector,SelectionKey.OP_ACCEPT);            int coreNum = Runtime.getRuntime().availableProcessors();            Processor[] processors = new Processor[coreNum];            for (int i = 0; i < processors.length; i++) {                logger.info("creat processor :{}",i+1);                processors[i] = new Processor();            }            int index=0;            while (Status.running){                selector.select();                Set
keys=selector.selectedKeys(); Iterator
iterator=keys.iterator(); while (iterator.hasNext()){ SelectionKey selectionKey=iterator.next(); iterator.remove(); if (selectionKey.isAcceptable()){ ServerSocketChannel currServerSocketChannel= (ServerSocketChannel) selectionKey.channel(); SocketChannel socketChannel=currServerSocketChannel.accept(); socketChannel.configureBlocking(false); logger.info("Accept request from {}",socketChannel.getRemoteAddress()); Processor processor=processors[(++index)%coreNum]; processor.addChannel(socketChannel); } } } } catch (IOException e) { logger.error("io exception {}",e.getMessage()); } }}
public class Processor {    private static final Logger logger= LoggerFactory.getLogger(Processor.class);    private static final ExecutorService service=            Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());    private final Selector selector;    private volatile boolean running=true;    public Processor() throws IOException {        this.selector= SelectorProvider.provider().openSelector();    }    public void addChannel(SocketChannel socketChannel){        try {            socketChannel.register(this.selector, SelectionKey.OP_READ);            if (running){                running=false;                start();            }            wakeup();        } catch (ClosedChannelException e) {            logger.error("register channel error :{}",e.getMessage());        }    }    private void wakeup(){        this.selector.wakeup();    }    private void start(){        service.submit(new ProcessorTask(selector));    }}
public class ProcessorTask implements Runnable {    private final static Logger logger= LoggerFactory.getLogger(ProcessorTask.class);    private Selector selector;    ProcessorTask(Selector selector) {        this.selector = selector;    }    @Override    public void run() {        logger.info("{}\tsub reactor start listener",Thread.currentThread().getName());        while (Status.running){            try {                selector.select();                Set
keys=selector.selectedKeys(); Iterator
iterator=keys.iterator(); while (iterator.hasNext()){ SelectionKey key=iterator.next(); iterator.remove(); if (key.isReadable()){ ByteBuffer buffer= ByteBuffer.allocate(1024); SocketChannel socketChannel= (SocketChannel) key.channel(); int count=socketChannel.read(buffer); if (count<0){ socketChannel.close(); key.cancel(); logger.info("{}\t Read ended",socketChannel); }else if (count==0){ logger.info("{}\t Message size is 0",socketChannel); }else { buffer.flip(); socketChannel.write(buffer); logger.info("{}\t Read message{}",socketChannel,new String(buffer.array())); } } } } catch (IOException e) { logger.error("select error :{}",e.getMessage()); } } }}

在EchoService中 ,主Reactor接受到新的连接后,将channel注册到subReactor的Selector中。每个子Reactor都有一个自己的Selector对象,并有独立的一个线程处理。

转载于:https://my.oschina.net/wang520/blog/3036562

你可能感兴趣的文章
总结出来的一些ASP.NET程序性能优化的注意事项[不断补充]
查看>>
对象合成复用之策略模式
查看>>
步步为营 .NET 设计模式学习笔记 八、State(状态模式)
查看>>
MEF(Managed Extensibility Framework)有选择性地使用扩展组件
查看>>
在Brackets中使用Emmet
查看>>
VC 最爱问的问题:你这个创业项目,如果腾讯跟进了,而且几乎是产品上完全复制,你会怎么办?...
查看>>
SGU 303 Great Berland Wall(计算几何判环+最小割)
查看>>
3D游戏引擎剖析
查看>>
21. D3D-Sprite
查看>>
转载 关于使用typeid时要注意的问题
查看>>
JAVA的String 类
查看>>
cocos2d 的touch事件要点
查看>>
程序入口
查看>>
Spring MVC 3.0 RestTemplate
查看>>
ADO.NET访问数据集的表、行和列
查看>>
Flink - Working with State
查看>>
svn add xxx.txt 提示A (bin) xxx.txt
查看>>
数据库设计
查看>>
C语言 · 分数统计
查看>>
推断两条单链表是否相交
查看>>