`
Mojarra
  • 浏览: 128738 次
  • 性别: Icon_minigender_1
社区版块
存档分类
最新评论

Reactor Pattern (二)

阅读更多

 

 

JDK1.4 后, Sun 积极推广 New IO ,其中 non-blocking 是新的 socket 编程模式,大幅度提高了服务器端 socket 并发处理能力, Selector, SocketChannel SelectionKey 这三个类配合使用,可以构成一个比较经典的 Reactor 模式。其中 Selector 类只筛选 socket 通讯中感兴趣的事件,如接受连接,读取数据,写数据等。 SocketChannel 是真正的 socket 通讯的实现者,负责读传到服务器端 socket 数据和往客户端 socket 写数据。 SelectionKey 是对 socket 通讯事件的封装。在实行一个 non-blocking IO reactor 模式过程中,我仍然按照 reactor 模式的定义,设计多路复用分发器,分配器,任务处理程序,并模拟并发请求客户端。

 

 

多路复用分发器

DemultiPlexer 继承了 Thread 类,在 signal 方法中,调用 selector.select() 函数,该函数等待一个 SocketChannel 中的事件出现才返回。在 signal 方法中,对两种事件感兴趣,一个是 OP_ACCEPT, 一个是 OP_READ ,当收到 OP_ACCEPT 事件时,把 SelectionKey 中的 channel 中转化成 ServerSocketChannel ,并且掉中 ServerSocketChannel.accpet() 方法,返回一个 SocketChannel ,然后在设置成 non-blocking IO 模式,最后注册 OP_READ 事件。当收到一个 OP_READ 事件时,想 Dispatcher 注册这个事件,让 Dispatcher 去启动一个线程去处理该事件,处理完成后,并把该事件从 Selector 中撤销。

 

class DemultiPlexer extends Thread {
	private Selector selector;
	private Dispatcher dispatcher;

	@Override
	public void run() {
		for (;;) {
			try {
				signal();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}

	private void signal() throws IOException, InterruptedException {
		int c = selector.select();
		if (c <= 0) {
			return;
		}
		Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
		while (keys.hasNext()) {
			SelectionKey key = keys.next();
			keys.remove();
			if (key.isValid()) {
				if (key.isAcceptable()) {
					accept(key);
				} else if (key.isReadable()) {
					dispatcher.register(key);
				}
			}
		}
	}

	private void accept(SelectionKey key) throws IOException {
		ServerSocketChannel ssc = (ServerSocketChannel) key.channel();
		SocketChannel channel = ssc.accept();
		channel.configureBlocking(false);
		channel.register(selector, SelectionKey.OP_READ);
//		key.cancel();
	}

	public DemultiPlexer(Selector selector, Dispatcher dispatcher) {
		this.selector = selector;
		this.dispatcher = dispatcher;
	}
}
 

 

分配器

分配器的主要职责是负责监视 SelectionKey 队列之中已经被选取的 OP_READ 事件,如果队列中无任何事件,分配器线程处于中断状态,一旦有新的事件被注册到分配器中,分配器线程会被唤醒,去处理这个事件。为了说明 Reactor 模式,本例中,设计了只要有事件被注册,分配器就新建一个请求处理线程去读取数据这种简单的资源调度机制。在实际应用中,线程、数据库连接,内存等等都是资源,在有限的情况下,分配这些资源都会比这个模型复杂的多。

 

class Dispatcher extends Thread {
	 
	private static LinkedBlockingQueue<SelectionKey> queue = new LinkedBlockingQueue<SelectionKey>();

	public void run() {
		for (;;) {
			try {
				dispatch();
			} catch (IOException e) {
				e.printStackTrace();
			} catch (InterruptedException e) {
				
                            e.printStackTrace();                            
			}
		}
	}

	public void dispatch() throws IOException, InterruptedException {
		synchronized (queue) {
			if (queue.size() <= 0) {
				queue.wait();
			} else {
                                //拿出对列中的第一个事件,创建一个新的任务处理程序
                                SelectionKey key = queue.poll();  
				new RequestHandler(key).start();
			}
		}
	}
      /**
       * 注册新的事件,并唤醒当前线程。
       */  
      public void register(SelectionKey key) throws IOException {
		synchronized (queue) {
			queue.add(key);
			queue.notify();
		}		 
	}

	 
}

 

 

请求处理程序

请求处理程序是真正的业务处理代码。本例中主要负责读取客户端 socket 写入的数据,然后把当前的 SelectionKey 取消,让下一次 select 操作忽略掉当前的 SelectionKey. 这个类中使用了 Charset 辅助类来完成把读取的字节反编成一个可读的字符串。

 

class RequestHandler extends Thread {
	private SelectionKey selectionKey;
	private static Charset utf8 = Charset.forName("utf-8");

	@Override
	public void run() {
		SocketChannel socketChannel = (SocketChannel) selectionKey.channel();
		ByteBuffer buff = ByteBuffer.allocate(128);
		try {  
                        // make sure the socketchannel is connected. 
			if (!socketChannel.isConnected())
				socketChannel.finishConnect();
                        

                        // read the data and print it after decode.
			while (socketChannel.read(buff) > 0) {
				buff.flip();
				System.out.println(utf8.decode(buff));
				buff.clear();
			}
			
			//remove the key for next selection operation
			selectionKey.cancel();			 
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public RequestHandler(SelectionKey selectionKey) {
		this.selectionKey = selectionKey;
	}
}

 

 

请求客户端

一个请求客户端连上服务器端后,往服务器端发送 ”Hello non-blocking IO reactor Pattern” 字符串。在 non blocking IO socket 编程中,客户端的 SocketChannel 也使用 Selector ,并注册感兴趣的事件。本例中注册 OP_CONNECT 事件,然后确保 SocketChannel 是连接上服务器端的,再向服务器端发送一个字符串后,请求客户端的任务完成。 main 函数中,模拟了 50 个并发的客户端。

 

class Request extends Thread {
	private int port = 9000;

	@Override
	public void run() {
		Charset utf8 = Charset.forName("utf-8");
		try {
			SocketChannel channel = SocketChannel.open(new InetSocketAddress(port));
			channel.configureBlocking(false);			 
			Selector selector = Selector.open();
			channel.register(selector, SelectionKey.OP_CONNECT);
			if (!channel.isConnected())
				channel.finishConnect();
			channel.write(utf8.encode("Hello non-blocking IO reactor pattern.."));
			channel.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	public static void main(String args[]) {
		for (int i = 0; i < 50; i++)
			new Request().start();
	}
}
 

最后启动一个服务器端,来运行这个non-blocking IO的服务器端。在main函数中,首先open一个selector和ServerSocketChannel,配置为non-blocking IO模式,绑定服务端口9000并注册OP_ACCEPT。最后启动多路复用分发器线程和分配器线程。

 

public class ReactorPattern {
	public static void main(String args[]) throws IOException {
		Selector sel = Selector.open();
		ServerSocketChannel ssc = ServerSocketChannel.open();
		ssc.configureBlocking(false);
		ssc.socket().bind(new InetSocketAddress(9000));
		ssc.register(sel, SelectionKey.OP_ACCEPT);
		Dispatcher disp = new Dispatcher();
		disp.start();
		new DemultiPlexer(sel, disp).start();
	}
}

 小结

在non-blocking IO模式中,DemultiPlexer类是担任了筛选事件和注册事件的工作,Selector.select()采用的是blocking方式,调用select方法,都必须等到其有返回值为至,但select方法本身是线程安全的,因此DemultiPlexer类在运行环境下可以存在多个线程,配合多个Dispacher线程一起工作。如

 

 // initialize selector and register OP_ACCEPT
......

Dispatcher disp1 = new Dispatcher();
disp1.start();
new DemultiPlexer(sel, disp1).start();


Dispatcher disp2 = new Dispatcher();
disp2.start();
new DemultiPlexer(sel, disp2).start();

 

预告

Proactor模式是Reactor模式的兄弟,两者有很多相似之处,Proactor模式有何特点,相对于Reactor模式,它又有什么优点呢,如果您感兴趣,请继续关注我的博客。

 

 

[原创内容,版权所有,如有转载,请注明出处,如果您发现文中有什么错误请指出,不胜感激]

0
0
分享到:
评论
1 楼 Mojarra 2011-11-10  
ps:
Non-blocking模式中,Selector注册一个事件后,对这个事件做了适当的处理之后,必须在下一次select操作之前必须调用SelectionKey.cancel()方法。这一个特性限制了Selector在Reactor模式中的应用,试想一下,启动一个任务处理线程后,多路事件分发器在事件轮循中并不知道这个线程处理此事件是处于何种状态。这个事件到底有没有被处理完成,多路事件分发器是不清楚的,无论是SelectionKey,还是SocketChannel,都没有一个合适的方法知道它所代表的事件已经被处理。如果检测到一个key后,立即调用这个key的cancel方法,会发生一些意象不到的效果。

一个比较笨拙的办法是在Dispatcher中维护一个正在处理的事件的HashMap,在一个新的事件被注册到分配器时,首先检查这个事件的hash是否已存在于当前的hashmap中,如果存在则不启动新的任务处理线程,否则启动。事件处理完成后,从hashmap中删除此事件的hash。

还有一种方法是OP_ACCEPT事件从selector轮循中剥离出去,Selector只放在socketChannel中使用,ServerSocketChannel仍使用类似面向线程的socket编程方法,在ServerSocketChannel收到一个接入的连接时,启动一个线程,让socketChannel在线程中注册感兴趣的事件以及移除该事件。这样会导致一个任务处理线程开一个Selector实例,系统的开销增加。

这两种方法中,前者的开销小,编程复杂度低,整体上优于第二个方法。

相关推荐

    Reactor Pattern (一)

    NULL 博文链接:https://mojarra.iteye.com/blog/1225094

    reactor-siemens

    Reactor design pattern handles service requests that are delivered concurrently to an application by one or more clients

    Network Pattern

    Network Pattern 常用的网络编程模式,如Reactor、Proactor

    reactor-io-pattern

    这就是io设计模式---reactor Reactor 是事件驱动的,并使用 os api(select、poll、epoll、kqueue、iocp 等)来调度 socket 事件。 编程只需要向Reactor注册事件源socket发生的读、写或except以及具体的事件处理...

    Scalable IO in Java -- Doug Lea

    Scalable network services,Event-driven processing,Reactor pattern and so on。

    Node.js Design Patterns Second Edition[July 2016]

    It covers the Node.js ecosystem and its philosophy, a short introduction to Node.js version 6, ES2015, and the reactor pattern. Chapter 2, Node.js Essential Patterns, introduces the first steps ...

    Packt.Reactive.Programming.in.Kotlin

    Chapter 1, A Short Introduction to Reactive Programming, helps you understand the context, thinking pattern, and principles of reactive programming. Chapter 2, Functional Programming with Kotlin and ...

    Spring Boot Messaging

    In addition, you’ll discover the new Spring Integration DSL and use it with Spring Cloud Stream to build integration solutions using every enterprise integration pattern. Finally, you’ll see Spring...

    delphi 7 gif控件

    // Incorporated additional modifications by Alexey Barkovoy (clootie@reactor.ru) // found in his Delphi 6 GifImage.pas (from 22-Dec-2001). // // Alexey Barkovoy's Delphi 6 gifimage.pas can be ...

Global site tag (gtag.js) - Google Analytics