自
JDK1.4
后,
Sun
积极推广
New IO
,其中
non-blocking
是新的
socket
编程模式,大幅度提高了服务器端
socket
并发处理能力,
Selector, SocketChannel
和
SelectionKey
这三个类配合使用,可以构成一个比较经典的
Reactor
模式。其中
Selector
类只筛选
socket
通讯中感兴趣的事件,如接受连接,读取数据,写数据等。
SocketChannel
是真正的
socket
通讯的实现者,负责读传到服务器端
socket
数据和往客户端
socket
写数据。
SelectionKey
是对
socket
通讯事件的封装。在实行一个
non-blocking IO
的
reactor
模式过程中,我仍然按照
reactor
模式的定义,设计多路复用分发器,分配器,任务处理程序,并模拟并发请求客户端。
多路复用分发器
DemultiPlexer
继承了
Thread
类,在
signal
方法中,调用
selector.select()
函数,该函数等待一个
SocketChannel
中的事件出现才返回。在
signal
方法中,对两种事件感兴趣,一个是
OP_ACCEPT,
一个是
OP_READ
,当收到
OP_ACCEPT
事件时,把
SelectionKey
中的
channel
中转化成
ServerSocketChannel
,并且掉中
ServerSocketChannel.accpet()
方法,返回一个
SocketChannel
,然后在设置成
non-blocking IO
模式,最后注册
OP_READ
事件。当收到一个
OP_READ
事件时,想
Dispatcher
注册这个事件,让
Dispatcher
去启动一个线程去处理该事件,处理完成后,并把该事件从
Selector
中撤销。
class DemultiPlexer extends Thread {
private Selector selector;
private Dispatcher dispatcher;
@Override
public void run() {
for (;;) {
try {
signal();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private void signal() throws IOException, InterruptedException {
int c = selector.select();
if (c <= 0) {
return;
}
Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
while (keys.hasNext()) {
SelectionKey key = keys.next();
keys.remove();
if (key.isValid()) {
if (key.isAcceptable()) {
accept(key);
} else if (key.isReadable()) {
dispatcher.register(key);
}
}
}
}
private void accept(SelectionKey key) throws IOException {
ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
SocketChannel channel = ssc.accept();
channel.configureBlocking(false);
channel.register(selector, SelectionKey.OP_READ);
// key.cancel();
}
public DemultiPlexer(Selector selector, Dispatcher dispatcher) {
this.selector = selector;
this.dispatcher = dispatcher;
}
}
分配器
分配器的主要职责是负责监视
SelectionKey
队列之中已经被选取的
OP_READ
事件,如果队列中无任何事件,分配器线程处于中断状态,一旦有新的事件被注册到分配器中,分配器线程会被唤醒,去处理这个事件。为了说明
Reactor
模式,本例中,设计了只要有事件被注册,分配器就新建一个请求处理线程去读取数据这种简单的资源调度机制。在实际应用中,线程、数据库连接,内存等等都是资源,在有限的情况下,分配这些资源都会比这个模型复杂的多。
class Dispatcher extends Thread {
private static LinkedBlockingQueue<SelectionKey> queue = new LinkedBlockingQueue<SelectionKey>();
public void run() {
for (;;) {
try {
dispatch();
} catch (IOException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
public void dispatch() throws IOException, InterruptedException {
synchronized (queue) {
if (queue.size() <= 0) {
queue.wait();
} else {
//拿出对列中的第一个事件,创建一个新的任务处理程序
SelectionKey key = queue.poll();
new RequestHandler(key).start();
}
}
}
/**
* 注册新的事件,并唤醒当前线程。
*/
public void register(SelectionKey key) throws IOException {
synchronized (queue) {
queue.add(key);
queue.notify();
}
}
}
请求处理程序
请求处理程序是真正的业务处理代码。本例中主要负责读取客户端
socket
写入的数据,然后把当前的
SelectionKey
取消,让下一次
select
操作忽略掉当前的
SelectionKey.
这个类中使用了
Charset
辅助类来完成把读取的字节反编成一个可读的字符串。
class RequestHandler extends Thread {
private SelectionKey selectionKey;
private static Charset utf8 = Charset.forName("utf-8");
@Override
public void run() {
SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
ByteBuffer buff = ByteBuffer.allocate(128);
try {
// make sure the socketchannel is connected.
if (!socketChannel.isConnected())
socketChannel.finishConnect();
// read the data and print it after decode.
while (socketChannel.read(buff) > 0) {
buff.flip();
System.out.println(utf8.decode(buff));
buff.clear();
}
//remove the key for next selection operation
selectionKey.cancel();
} catch (IOException e) {
e.printStackTrace();
}
}
public RequestHandler(SelectionKey selectionKey) {
this.selectionKey = selectionKey;
}
}
请求客户端
一个请求客户端连上服务器端后,往服务器端发送
”Hello non-blocking IO reactor Pattern”
字符串。在
non blocking
IO socket
编程中,客户端的
SocketChannel
也使用
Selector
,并注册感兴趣的事件。本例中注册
OP_CONNECT
事件,然后确保
SocketChannel
是连接上服务器端的,再向服务器端发送一个字符串后,请求客户端的任务完成。
在
main
函数中,模拟了
50
个并发的客户端。
class Request extends Thread {
private int port = 9000;
@Override
public void run() {
Charset utf8 = Charset.forName("utf-8");
try {
SocketChannel channel = SocketChannel.open(new InetSocketAddress(port));
channel.configureBlocking(false);
Selector selector = Selector.open();
channel.register(selector, SelectionKey.OP_CONNECT);
if (!channel.isConnected())
channel.finishConnect();
channel.write(utf8.encode("Hello non-blocking IO reactor pattern.."));
channel.close();
} catch (IOException e) {
e.printStackTrace();
}
}
public static void main(String args[]) {
for (int i = 0; i < 50; i++)
new Request().start();
}
}
最后启动一个服务器端,来运行这个non-blocking IO的服务器端。在main函数中,首先open一个selector和ServerSocketChannel,配置为non-blocking IO模式,绑定服务端口9000并注册OP_ACCEPT。最后启动多路复用分发器线程和分配器线程。
public class ReactorPattern {
public static void main(String args[]) throws IOException {
Selector sel = Selector.open();
ServerSocketChannel ssc = ServerSocketChannel.open();
ssc.configureBlocking(false);
ssc.socket().bind(new InetSocketAddress(9000));
ssc.register(sel, SelectionKey.OP_ACCEPT);
Dispatcher disp = new Dispatcher();
disp.start();
new DemultiPlexer(sel, disp).start();
}
}
小结
在non-blocking IO模式中,DemultiPlexer类是担任了筛选事件和注册事件的工作,Selector.select()采用的是blocking方式,调用select方法,都必须等到其有返回值为至,但select方法本身是线程安全的,因此DemultiPlexer类在运行环境下可以存在多个线程,配合多个Dispacher线程一起工作。如
// initialize selector and register OP_ACCEPT
......
Dispatcher disp1 = new Dispatcher();
disp1.start();
new DemultiPlexer(sel, disp1).start();
Dispatcher disp2 = new Dispatcher();
disp2.start();
new DemultiPlexer(sel, disp2).start();
预告
Proactor模式是Reactor模式的兄弟,两者有很多相似之处,Proactor模式有何特点,相对于Reactor模式,它又有什么优点呢,如果您感兴趣,请继续关注我的博客。
[原创内容,版权所有,如有转载,请注明出处,如果您发现文中有什么错误请指出,不胜感激]
分享到:
相关推荐
NULL 博文链接:https://mojarra.iteye.com/blog/1225094
Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients
Network Pattern 常用的网络编程模式,如Reactor、Proactor
这就是io设计模式---reactor Reactor 是事件驱动的,并使用 os api(select、poll、epoll、kqueue、iocp 等)来调度 socket 事件。 编程只需要向Reactor注册事件源socket发生的读、写或except以及具体的事件处理...
Scalable network services,Event-driven processing,Reactor pattern and so on。
It covers the Node.js ecosystem and its philosophy, a short introduction to Node.js version 6, ES2015, and the reactor pattern. Chapter 2, Node.js Essential Patterns, introduces the first steps ...
Chapter 1, A Short Introduction to Reactive Programming, helps you understand the context, thinking pattern, and principles of reactive programming. Chapter 2, Functional Programming with Kotlin and ...
In addition, you’ll discover the new Spring Integration DSL and use it with Spring Cloud Stream to build integration solutions using every enterprise integration pattern. Finally, you’ll see Spring...
// Incorporated additional modifications by Alexey Barkovoy (clootie@reactor.ru) // found in his Delphi 6 GifImage.pas (from 22-Dec-2001). // // Alexey Barkovoy's Delphi 6 gifimage.pas can be ...