编辑推荐: |
本文来自于oschina,本文介绍了经典的ServerSocket监听循环、Reactor
模式以及Worker线程等相关内容。 |
|
Java可扩展IO
Doug Lea
大纲
可扩展的网络服务
事件驱动
Reactor 模式
基础版
多线程版
其他变体
java.nio包中非阻塞IO API一览
网络应用服务器
Web服务器,分布式对象系统等等
它们的共同特点
Read请求
解码请求报文
业务处理
编码响应报文
发送响应
实际应用中每一个步骤的性质和开销各不相同
XML解析
文件传输
动态生成网页
计算型服务
经典的服务设计
每个线程运行一个handler
经典的ServerSocket监听循环
/**
 * Classic thread-per-connection server: a blocking accept loop that hands
 * every accepted socket to a {@link Handler} running on its own thread.
 */
class Server
    implements Runnable {
  /**
   * Accepts connections on PORT until the current thread is interrupted,
   * starting one handler thread per connection. The listening socket is
   * closed when the loop ends, normally or through an IOException.
   */
  public void run() {
    try {
      ServerSocket ss = new ServerSocket(PORT);
      try {
        while (!Thread.interrupted())
          new Thread(new Handler(ss.accept())).start();
        // alternatively: run handlers on a single thread or a thread pool
      } finally {
        ss.close();
      }
    } catch (IOException ex) { /* ... */ }
  }

  /** Serves one connection: read a request, process it, write the reply. */
  static class Handler implements Runnable {
    final Socket socket;
    Handler(Socket s) { socket = s; }

    /**
     * Performs one read, one process step and one write, then closes the
     * socket. Nothing else holds the socket once this thread exits, so it
     * must be released here even when the I/O fails.
     */
    public void run() {
      try {
        try {
          byte[] input = new byte[MAX_INPUT];
          socket.getInputStream().read(input);
          byte[] output = process(input);
          socket.getOutputStream().write(output);
        } finally {
          socket.close();
        }
      } catch (IOException ex) { /* ... */ }
    }
    private byte[] process(byte[] cmd) { /* ... */
    }
  }
} |
Note: 异常处理省略
高可扩展性目标
压力持续增大时服务优雅的降级(客户端增多)
性能随着资源(CPU,内存,磁盘,带宽)的提升持续增加
高可用和性能的目标
低延迟
应对请求尖峰
服务质量可控
分而治之是解决扩展性问题的常用方法
分而治之
把处理流程切分成更小的task,每个task都是非阻塞的
只有当任务准备好才去执行,IO事件模型通常是触发器机制
java.nio包包含基本的非阻塞机制
非阻塞读和写
分发任务 把task关联到IO事件模型
更多想法
ʼþÇý¶¯Éè¼Æ
ʼþÇý¶¯Éè¼Æ
ͨ³£¸ü¸ßЧ
¸üÉÙµÄ×ÊÔ´ ²»ÐèҪΪÿһ¸öclientÆô¶¯Ò»¸öÏß³Ì
¼õÉÙ¿ªÏú ¼õÉÙÉÏÏÂÎÄÇл»ÒÔ¼°¸üÉÙµÄËø¾ºÕù
ʼþ·Ö·¢½ÏÂý ±ØÐë°ÑactionºÍʱ¼ä°ó¶¨ÔÚÒ»Æð
±à³ÌÄѶȽϸß
±ØÐë°ÑÁ÷³Ì·Ö¸îΪһϵÁзÇ×èÈûµÄÈÎÎñµ¥Ôª
ÓëGUIµÄʼþÄ£ÐÍÀàËÆ
²»ÄÜÏû³ýËùÓеÄ×èÈû£¬È磺GC¡¢ÄÚ´æÒ³´íÎóµÈµÈ
±ØÐë×¢Òâ·þÎñµÄ״̬µÄ±ä»¯
Java AWTͼÐαà³ÌµÄʼþÇý¶¯

ʼþÇý¶¯µÄIOÄ£ÐÍÓë´ËÀàËÆ£¬µ«Éè¼ÆÉÏÓÐËù²»Í¬
ReactorÄ£ÐÍ
Reacttor¸ºÔð·Ö·¢Ê¼þµ½¶ÔÓ¦µÄhandler£¬ÀàËÆAWTÏß³Ì
HandlersÊÇ·Ç×èÈûµÄÈÎÎñ£¬ÀàËÆAWTÖеÄActionListeners
Manage¸ºÔð°Ñhandler°ó¶¨µ½Ê¼þÉÏ
²Î¼ûÊ©ÃÜÌØµÈÈËËùÖøµÄ¡¶Pattern-Oriented Software Architecture¡·¾í2
ÒÔ¼°Richard Stevens¹ØÓÚÍøÂçµÄÊýÒÔ¼°MattWelshµÄSEDA¿ò¼ÜµÈµÈ
Reactor基础版
单线程版
java.nio提供的支持
Channels
通道是到文件、socket等的连接,支持非阻塞读取
Buffers
数组对象,可以被Channel直接读写
Selectors
负责筛选哪些Channel有IO事件
SelectionKeys
保存IO事件状态和绑定对象
Reactor 1:创建
class Reactor
implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
/**
 * Opens the selector and the listening channel, binds the channel to the
 * given port in non-blocking mode, and registers it for accept events with
 * an Acceptor attached to the resulting key.
 *
 * @param port local port to listen on
 * @throws IOException if the selector or channel cannot be opened, bound,
 *         configured or registered
 */
Reactor(int port) throws IOException {
  selector = Selector.open();
  serverSocket = ServerSocketChannel.open();

  InetSocketAddress listenAddress = new InetSocketAddress(port);
  serverSocket.socket().bind(listenAddress);
  serverSocket.configureBlocking(false);

  SelectionKey acceptKey =
      serverSocket.register(selector, SelectionKey.OP_ACCEPT);
  acceptKey.attach(new Acceptor());
}
/*
Alternatively, use the SPI provider explicitly:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/ |
Reactor 2:分发循环
public void
run() { // ͨ³£Æô¶¯Ò»¸öÐÂÏß³Ì
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}
/**
 * Runs the Runnable attached to the given key on the calling thread; a key
 * without an attachment is ignored.
 */
void dispatch(SelectionKey k) {
  Object attached = k.attachment();
  if (attached == null)
    return;
  ((Runnable) attached).run();
} |
Reactor 3:Acceptor
/**
 * Runnable attached to the listening channel's key: accepts one pending
 * connection and creates a Handler for it on the reactor's selector.
 */
class Acceptor
    implements Runnable { // inner class
  public void run() {
    try {
      SocketChannel accepted = serverSocket.accept();
      if (accepted == null)
        return; // accept() produced no connection
      new Handler(selector, accepted);
    } catch (IOException ex) { /* ... */ }
  }
}
} |

Reactor 4:建立Handler
final class
Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
// Registers the accepted channel c with selector sel and attaches this
// handler to the resulting key. Declared to throw IOException.
Handler(Selector sel, SocketChannel c) throws
IOException {
socket = c;
c.configureBlocking(false);
// try to listen for read events
// Registered with an empty interest set (0) first; OP_READ is enabled only
// after this handler has been attached to the key below.
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
// NOTE(review): presumably so that a select() already blocked on sel picks
// up the new registration — confirm.
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ } |
Reactor 5:处理请求
/**
 * Performs the I/O step that matches the handler's current state: read()
 * while READING, send() while SENDING, nothing for any other state.
 */
public void run() {
  try {
    switch (state) {
      case READING:
        read();
        break;
      case SENDING:
        send();
        break;
      default:
        break; // any other state: nothing to do here
    }
  } catch (IOException ex) { /* ... */ }
}
/**
 * Reads available bytes into the input buffer. Once the request is
 * complete it is processed and the key is switched to write interest.
 * If the peer has closed the connection before a complete request
 * arrived, the key is cancelled and the channel closed; otherwise the key
 * would stay registered and keep signalling a connection that can never
 * make progress.
 *
 * @throws IOException if reading from or closing the channel fails
 */
void read() throws IOException {
  int n = socket.read(input);
  if (inputIsComplete()) {
    process();
    state = SENDING;
    // after reading completes, normally listen for write events
    sk.interestOps(SelectionKey.OP_WRITE);
  } else if (n < 0) {
    // end of stream with an incomplete request: release the connection
    sk.cancel();
    socket.close();
  }
}
/**
 * Writes the output buffer to the channel and cancels the key once the
 * output is complete.
 *
 * NOTE(review): the channel itself is not closed in the visible code —
 * confirm it is closed elsewhere.
 *
 * @throws IOException if writing to the channel fails
 */
void send() throws IOException {
  socket.write(output);
  if (!outputIsComplete())
    return; // key stays registered; the rest is written on a later call
  sk.cancel();
}
} |
Handlers状态流转
GoF的状态模式的简单应用
/**
 * Handler variant that swaps the key's attachment instead of keeping a
 * state field: the handler itself is the reader, and a Sender takes over
 * once the request has been processed.
 */
class Handler
{ // ...
  /** Reader step; on a complete request, processes it and hands off to a Sender. */
  public void run() { // initial state is reader
    socket.read(input);
    if (inputIsComplete()) {
      process();
      sk.attach(new Sender());
      // SelectionKey exposes interestOps(int); there is no interest(int)
      sk.interestOps(SelectionKey.OP_WRITE);
      sk.selector().wakeup();
    }
  }

  /** Writer step: sends the output and cancels the key when it is complete. */
  class Sender implements Runnable {
    public void run(){ // ...
      socket.write(output);
      if (outputIsComplete())
        sk.cancel();
    }
  }
} |
多线程版
启动适当的线程数,尤其是多核情况下
工作线程
Reactors必须快速触发handlers
Handler任务重,会拖慢Reactor
把计算型的工作交给其他的线程
多个Reactor线程
Reactor线程专注于IO操作
把压力分摊到其他的reactor
负载均衡要考虑适配CPU和IO速率
Worker线程
只处理计算型任务,加速Reactor线程
类似于POSA2书上的Proactor模式
比改成事件驱动模式简单
只处理计算型任务
但处理过程与IO较难重叠进行
最好在第一次读的时候就全部读出来到缓存中
使用线程池,方便控制
通常只需少量的线程,比客户端数量少得多
Worker线程池

带线程池的Handler
/**
 * Handler variant that hands process() to a worker pool so the reactor
 * thread only performs I/O.
 */
class Handler
    implements Runnable {
  // uses the thread pool from util.concurrent
  static PooledExecutor pool = new PooledExecutor(...);
  static final int PROCESSING = 3;
  // ...
  /** Reads input; once it is complete, queues a Processer on the pool. */
  synchronized void read() { // ...
    socket.read(input);
    if (inputIsComplete()) {
      state = PROCESSING;
      pool.execute(new Processer());
    }
  }

  /**
   * Runs on a pool thread (via Processer): processes the request, then
   * switches the key to write interest.
   */
  synchronized void processAndHandOff() {
    process();
    state = SENDING; // or rebind attachment
    // SelectionKey exposes interestOps(int); there is no interest(int)
    sk.interestOps(SelectionKey.OP_WRITE);
    // The interest set is changed off the reactor thread, so wake the
    // selector to make a select() in progress re-evaluate the key.
    sk.selector().wakeup();
  }

  /** Pool task that delegates to processAndHandOff(). */
  class Processer implements Runnable {
    public void run() { processAndHandOff(); }
  }
} |
调用任务的方式
链式传递
每个任务负责调用下一个任务,通常是效率最高的,但容易出问题
由到每个handler的分发器回调
通过设置状态,绑定对象等方式实现,GoF Mediator模式的变体
队列
就像上面的例子,通过队列传递buffer中的数据
Future
调用方通过join或者wait/notify方法获取每个task的执行结果
使用PooledExecutor
调度worker线程
主要方法:execute(Runnable r)
有如下控制参数
队列类型
最大线程数
最小线程数
"Warm" versus on-demand threads
自动回收空闲线程
需要的时候再创建新的线程
多种策略应对任务饱和
阻塞,丢弃,producer-runs,等等
多个Reactor线程
使用Reactor池
适配CPU和IO速率
静态创建或动态创建
每一个reactor都有自己的Selector,线程,分发循环
主acceptor分发给其他的reactors
Selector[] selectors;
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
} |
多Reactor示例
其它的java.nio特性
每个Reactor包含多个Selector
把不同的handler绑定到不同的IO事件时需要特别小心同步问题
文件传输
自动文件传输:file-to-net或者net-to-file复制
内存映射文件
通过buffers访问文件
直接访问buffer
有时可以达到零拷贝的目的
但有初始化和终结的开销
非常适合长连接应用
扩展网络连接
同时收到多个请求
客户端建立连接
客户端发送一连串的消息/请求
客户端断开连接
举个例子
数据库事务监视器
多人在线游戏、聊天室等等
扩展上文的基础网络模型
保持许多相对长时间存活的客户端
跟踪客户端,保持会话状态(包括丢弃)
分布式服务,横跨多个主机
API一览
Buffer
ByteBuffer
(CharBuffer、LongBuffer等等)
Channel
SelectableChannel
SocketChannel
ServerSocketChannel
FileChannel
Selector
SelectionKey
Buffer
// Abridged API listing for java.nio.Buffer: signatures only (bodies and
// access modifiers are omitted, so this does not compile as written).
abstract class
Buffer {
// capacity, position and limit accessors
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
// mark/reset and clear/flip/rewind operations
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
// remaining-element queries and read-only flag
int remaining();
boolean hasRemaining();
boolean isReadOnly();
} |

ByteBuffer
// Abridged API listing for java.nio.ByteBuffer: signatures only (bodies and
// access modifiers are omitted, so this does not compile as written).
abstract class
ByteBuffer extends Buffer {
// static factories
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset,
int len);
static ByteBuffer wrap(byte[] src);
// direct flag and byte order
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
// derived buffers and compaction
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();
// byte get/put, in relative (no index) and absolute (index) forms
byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
// typed accessors: the same get/put/asXBuffer group for each primitive type
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
} |
Channel
// Abridged listing of the java.nio.channels channel interfaces.
// Base interface: open-state query and close.
interface Channel
{
boolean isOpen();
void close() throws IOException;
}
// Channel that can read bytes into a single buffer.
interface ReadableByteChannel extends Channel
{
int read(ByteBuffer dst) throws IOException;
}
// Channel that can write bytes from a single buffer.
interface WritableByteChannel extends Channel
{
int write(ByteBuffer src) throws IOException;
}
// Read into an array of buffers.
// NOTE(review): the released JDK declares these array-based reads as
// returning long, not int — confirm against the JDK documentation.
interface ScatteringByteChannel extends ReadableByteChannel
{
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
// Write from an array of buffers.
// NOTE(review): the released JDK declares these array-based writes as
// returning long, not int — confirm against the JDK documentation.
interface GatheringByteChannel extends WritableByteChannel
{
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
SelectableChannel
// Abridged API listing for java.nio.channels.SelectableChannel:
// signatures only.
abstract class
SelectableChannel implements Channel {
int validOps();
// registration with a Selector
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops) throws
ClosedChannelException;
// blocking-mode control
// NOTE(review): the released JDK declares configureBlocking as returning
// SelectableChannel rather than void — confirm.
void configureBlocking(boolean block) throws IOException;
boolean isBlocking();
Object blockingLock();
} |
SocketChannel
// Abridged API listing for java.nio.channels.SocketChannel: signatures
// only; the "..." in the header stands for further supertypes.
abstract class
SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
// connection state
boolean isConnected();
boolean isConnectionPending();
// NOTE(review): isInputOpen/isOutputOpen do not appear in the released
// SocketChannel API — confirm against the JDK documentation.
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
// single-buffer and buffer-array reads and writes
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
} |
ServerSocketChannel
// Abridged API listing for java.nio.channels.ServerSocketChannel:
// signatures only; the "..." in the header stands for the supertypes.
abstract class
ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
// accept() yields a SocketChannel (not a java.net.Socket)
SocketChannel accept() throws IOException;
} |
FileChannel
// Abridged API listing for java.nio.channels.FileChannel: signatures only;
// the "..." in the header stands for the implemented interfaces.
// NOTE(review): several signatures below differ from the released JDK —
// see the notes on each group and confirm against the JDK documentation.
abstract class
FileChannel implements ... {
// reads and writes, at the channel position or at an explicit position
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
// position, size, truncate, force
// NOTE(review): in the released JDK position(long) and truncate(long)
// return FileChannel rather than void.
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
// channel-to-channel transfer
// NOTE(review): in the released JDK both take a long count and return long.
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
// file locking
FileLock lock(long position, long size, boolean
shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean
shared);
FileLock tryLock();
// memory mapping
// NOTE(review): the released JDK uses FileChannel.MapMode constants and
// map(MapMode, long, long) instead of int modes and an int size.
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position,
int size);
}
NOTE: 所有的方法都抛IOException |
Selector
// Abridged API listing for java.nio.channels.Selector: signatures only.
abstract class
Selector {
static Selector open() throws IOException;
// key sets (raw Set, as in the pre-generics API)
Set keys();
Set selectedKeys();
// selection operations
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
// NOTE(review): the released JDK declares wakeup() as returning Selector
// rather than void — confirm.
void wakeup();
void close() throws IOException;
} |
SelectionKey
// Abridged API listing for java.nio.channels.SelectionKey: signatures only.
abstract class
SelectionKey {
// operation bits used in interest and ready sets
static final int OP_READ, OP_WRITE, OP_CONNECT,
OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
// interest set: the accessor is interestOps (there is no interest(int))
// NOTE(review): the released JDK declares interestOps(int) as returning
// SelectionKey rather than void — confirm.
int interestOps();
void interestOps(int ops);
// ready set and its convenience tests
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
// attachment slot
Object attach(Object ob);
Object attachment();
} |
|