Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Java¸ß²¢·¢Òì²½Socket±à³Ì
 
  6224  次浏览      27
 2018-12-25
 
±à¼­ÍƼö:
±¾ÎÄÀ´×ÔÓÚoschina£¬±¾ÎĽéÉÜÁ˾­µäµÄServerSocket¼àÌýÑ­»·¡¢Reactor ģʽÒÔ¼°WorkerÏ̵߳ÈÏà¹ØÄÚÈÝ¡£

Java¿ÉÀ©Õ¹IO

Doug Lee

´ó¸Ù

¿ÉÀ©Õ¹µÄÍøÂç·þÎñ

ʼþÇý¶¯

Reactor ģʽ

»ù´¡°æ

¶àḬ̈߳æ

ÆäËû±äÌå

java.io°üÖзÖ×èÈûIO APIÒ»ÀÀ

ÍøÂçÓ¦Ó÷þÎñÆ÷

Web·þÎñÆ÷£¬·Ö²¼Ê½¶ÔÏóϵͳµÈµÈ

ËüÃǵĹ²Í¬Ìصã

ReadÇëÇó

½âÂëÇëÇó±¨ÎÄ

ÒµÎñ´¦Àí

±àÂëÏìÓ¦±¨ÎÄ

·¢ËÍÏìÓ¦

ʵ¼ÊÓ¦ÓÃÖÐÿһ¸ö²½ÖèµÄ²»Ò»Ñù

XML½âÎö

Îļþ´«Êä

¶¯Ì¬Éú³ÉÍøÒ³

¼ÆËãÐÍ·þÎñ

¾­µäµÄ·þÎñÉè¼Æ

ÿ¸öÏß³ÌÔËÐÐÒ»¸öhandler

¾­µäµÄServerSocket¼àÌýÑ­»·

class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// ÕâÀïʹÓõ¥Ï̻߳òÕßÏ̳߳Ø
} catch (IOException ex) { /* ... */ }
}

static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}

Note: Òì³£´¦ÀíÊ¡ÂÔ

¸ß¿ÉÀ©Õ¹ÐÔÄ¿±ê

ѹÁ¦³ÖÐøÔö´óʱ·þÎñÓÅÑŵĽµ¼¶£¨¿Í»§¶ËÔö¶à£©

ÐÔÄÜËæ×Å×ÊÔ´£¨CPU£¬Äڴ棬´ÅÅÌ£¬´ø¿í£©µÄÌáÉý³ÖÐøÔö¼Ó

¸ß¿ÉÓúÍÐÔÄܵÄÄ¿±ê

µÍÑÓ³Ù

Ó¦¶ÔÇëÇó¼â·å

·þÎñÖÊÁ¿¿É¿Ø

·Ö¶øÖÎÖ®Êǽâ¾öÀ©Õ¹ÐÔÎÊÌâµÄ³£Ó÷½·¨

·Ö¶øÖÎÖ®

°Ñ´¦ÀíÁ÷³ÌÇзֳɸüСµÄtask£¬Ã¿¸ötask¶¼ÊÇ·Ç×èÈûµÄ

Ö»Óе±ÈÎÎñ×¼±¸ºÃ²ÅÈ¥Ö´ÐУ¬IOʼþÄ£ÐÍͨ³£ÊÇ´¥·¢Æ÷»úÖÆ

java.nio°ü°üº¬»ù±¾µÄ·Ç×èÈû»úÖÆ

·Ç×èÈû¶ÁºÍд

·Ö·¢ÈÎÎñ °Ñtask¹ØÁªµ½IOʼþÄ£ÐÍ

¸ü¶àÏë·¨

ʼþÇý¶¯Éè¼Æ

ʼþÇý¶¯Éè¼Æ

ͨ³£¸ü¸ßЧ

¸üÉÙµÄ×ÊÔ´ ²»ÐèҪΪÿһ¸öclientÆô¶¯Ò»¸öÏß³Ì

¼õÉÙ¿ªÏú ¼õÉÙÉÏÏÂÎÄÇл»ÒÔ¼°¸üÉÙµÄËø¾ºÕù

ʼþ·Ö·¢½ÏÂý ±ØÐë°ÑactionºÍʱ¼ä°ó¶¨ÔÚÒ»Æð

±à³ÌÄѶȽϸß

±ØÐë°ÑÁ÷³Ì·Ö¸îΪһϵÁзÇ×èÈûµÄÈÎÎñµ¥Ôª

ÓëGUIµÄʼþÄ£ÐÍÀàËÆ

²»ÄÜÏû³ýËùÓеÄ×èÈû£¬È磺GC¡¢ÄÚ´æÒ³´íÎóµÈµÈ

±ØÐë×¢Òâ·þÎñµÄ״̬µÄ±ä»¯

Java AWTͼÐαà³ÌµÄʼþÇý¶¯

ʼþÇý¶¯µÄIOÄ£ÐÍÓë´ËÀàËÆ£¬µ«Éè¼ÆÉÏÓÐËù²»Í¬

ReactorÄ£ÐÍ

Reacttor¸ºÔð·Ö·¢Ê¼þµ½¶ÔÓ¦µÄhandler£¬ÀàËÆAWTÏß³Ì

HandlersÊÇ·Ç×èÈûµÄÈÎÎñ£¬ÀàËÆAWTÖеÄActionListeners

Manage¸ºÔð°Ñhandler°ó¶¨µ½Ê¼þÉÏ

²Î¼ûÊ©ÃÜÌØµÈÈËËùÖøµÄ¡¶Pattern-Oriented Software Architecture¡·¾í2 ÒÔ¼°Richard Stevens¹ØÓÚÍøÂçµÄÊýÒÔ¼°MattWelshµÄSEDA¿ò¼ÜµÈµÈ

Reactor»ù´¡°æ

µ¥Ḭ̈߳æ

java.nioÌṩµÄÖ§³Ö

Channels

ͨµÀÊÇÎļþ¡¢socketÖ®¼äµÄÁ¬½Ó£¬ Ö§³Ö·Ç×èÈû¶ÁÈ¡

Buffers

Êý×é¶ÔÏ󣬿ÉÒÔ±»ChannelÖ±½Ó¶Áд

Selectors

¸ºÔðɸѡÄÇЩChannelÓÐIOʼþ

SelectionKeys

±£´æIOʼþ״̬ºÍ°ó¶¨¶ÔÏó

Reactor 1£º´´½¨

class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind( new InetSocketAddress(port) );
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register( selector, SelectionKey.OP_ACCEPT );
sk.attach(new Acceptor());
}
/*
Ò²¿ÉÒÔÓÃSPI provider£¬¸üÃ÷ȷһЩ:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/

Reactor 2£º·Ö·¢Ñ­»·

public void run() { // ͨ³£Æô¶¯Ò»¸öÐÂÏß³Ì
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}

Reactor 3£ºAcceptor

class Acceptor implements Runnable { // ÄÚÖÃÀà
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
}

Reactor 4£º½¨Á¢Handler

final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// ³¢ÊÔ¼àÌý¶Áʼþ
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }

Reactor 5£º´¦ÀíÇëÇó

public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}

void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// ¶ÁÍêÖ®ºó£¬Í¨³£¼àÌýдʼþ
sk.interestOps(SelectionKey.OP_WRITE);
}
}

void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}

Handlers״̬Á÷ת

GoFµÄ״̬ģʽµÄ¼òµ¥Ó¦ÓÃ

class Handler { // ...
public void run() { // ³õʼ״̬ÊÇread
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}

class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
}

¶àḬ̈߳æ

Æô¶¯Êʵ±µÄÏß³ÌÊý£¬ÓÈÆäÊǶàºËÇé¿öÏÂ

¹¤×÷Ïß³Ì

Reactors±ØÐë¿ìËÙ´¥·¢handlers

HandlerÈÎÎñÖØ£¬»áÍÏÂýReactor

°Ñ¼ÆËãÐ͵Ť×÷½»¸øÆäËûµÄÏß³Ì

¶à¸öReadctorÏß³Ì

ReactorÏß³ÌרעÓÚIO²Ù×÷

°ÑѹÁ¦·Ö̯µ½ÆäËûµÄreactor

¸ºÔؾùºâÒª¿¼ÂÇÊÊÅäCPUºÍIOËÙÂÊ

WorkerÏß³Ì

Ö»´¦Àí¼ÆËãÐÍÈÎÎñ£¬¼ÓËÙReactorÏß³Ì

ÀàËÆÓÚPOSA2ÊéÉϵÄProactorģʽ

±È¸Ä³ÉʼþÇý¶¯Ä£Ê½¼òµ¥

Ö»´¦Àí¼ÆËãÐÍÈÎÎñ

ÖØ¸´IO±È½ÏÄÑ´¦Àí

×îºÃÔÚµÚÒ»´Î¶ÁµÄʱºò¾ÍÈ«²¿¶Á³öÀ´µ½»º´æÖÐ

ʹÓÃÏ̳߳أ¬·½±ã¿ØÖÆ

ͨ³£Ö»ÐèÉÙÁ¿µÄỊ̈߳¬±È¿Í»§¶ËÊýÁ¿ÉٵĶà

WorkerÏ̳߳Ø

´øÏ̳߳صÄHandler

class Handler implements Runnable {
// ʹÓà util.concurrentÖеÄÏ̳߳Ø
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}

class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}

µ÷ÓÃÈÎÎñµÄ·½Ê½

Á´Ê½´«µÝ

ÿ¸öÈÎÎñ¸ºÔðµ÷ÓÃÏÂÒ»¸öÈÎÎñ£¬Í¨³£ÊÇЧÂÊ×î¸ßµÄ£¬µ«ÈÝÒ׳öÎÊÌâ

Óɵ½Ã¿¸öhandlerµÄ·Ö·¢Æ÷»Øµ÷

ͨ¹ýÉèÖÃ״̬£¬°ó¶¨¶ÔÏóµÈ·½Ê½ÊµÏÖ£¬GoF MediatorģʽµÄ±äÌå

¶ÓÁÐ

¾ÍÏñÉÏÃæµÄÀý×Ó£¬Í¨¹ý¶ÓÁд«µÝbufferÖеÄÊý¾Ý

Future

µ÷Ó÷½Í¨¹ýjoin»òÕßwait/notify·½·¨»ñȡÿ¸ötaskµÄÖ´Ðнá¹û

ʹÓÃPooledExecutor

µ÷¶ÈworkerÏß³Ì

Ö÷Òª·½·¨£ºexecute(Runnable r)

ÓÐÈçÏ¿ØÖƲÎÊý

¶ÓÁÐÀàÐÍ

×î´óÏß³ÌÊý

×îСÏß³ÌÊý

"Warm" versus on-demand threads

×Ô¶¯»ØÊÕ¿ÕÏÐÏß³Ì

ÐèÒªµÄʱºòÔÙ´´½¨ÐµÄÏß³Ì

¶àÖÖ²ßÂÔÓ¦¶ÔÈÎÎñ±¥ºÍ

×èÈû£¬¶ªÆú£¬producer-runs,µÈµÈ

¶à¸öReactorÏß³Ì

ʹÓÃReactor³Ø

ÊÊÅäCPUºÍIOËÙÂÊ

¾²Ì¬´´½¨»ò¶¯Ì¬´´½¨

ÿһ¸öreactor¶¼ÓÐ×Ô¼ºµÄSelector£¬Ị̈߳¬·Ö·¢Ñ­»·

Ö÷acceptor·Ö·¢¸øÆäËûµÄreactors

Selector[] selectors;
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
}

¶àReactorʾÀý

ÆäËüµÄjava.nioÌØÐÔ

ÿ¸öReactor°üº¬¶à¸öSelector

°Ñ²»Í¬µÄhandler°ó¶¨µ½²»Í¬µÄIOʼþʱÐèÒªÌØ±ðСÐÄͬ²½ÎÊÌâ

Îļþ´«Êä

×Ô¶¯Îļþ´«Ê䣺file-to-net»òÕßnet-to-file¸´ÖÆ

ÄÚ´æÓ³ÉäÎļþ

ͨ¹ýbuffers·ÃÎÊÎļþ

Ö±½Ó·ÃÎÊbuffer

ÓÐʱ¿ÉÒÔ´ïµ½Á㿽±´µÄÄ¿µÄ

But have setup and finalization overhead

·Ç³£Êʺϳ¤Á¬½ÓÓ¦ÓÃ

À©Õ¹ÍøÂçÁ¬½Ó

ͬʱÊÕµ½¶à¸öÇëÇó

¿Í»§¶Ë½¨Á¢Á¬½Ó

¿Í»§¶Ë·¢ËÍÒ»Á¬´®µÄÏûÏ¢/ÇëÇó

¿Í»§¶Ë¶Ï¿ªÁ¬½Ó

¾Ù¸öÀý×Ó

Êý¾Ý¿âÊÂÎñ¼àÊÓÆ÷

¶àÈËÔÚÏßÓÎÏ·¡¢ÁÄÌìÊҵȵÈ

À©Õ¹ÉÏÎĵĻù´¡ÍøÂçÄ£ÐÍ

±£³ÖÐí¶àÏà¶Ô³¤Ê±¼äµÄ´æ»îµÄ¿Í»§¶Ë

¸ú×Ù¿Í»§¶Ë£¬±£³Ö»á»°×´Ì¬£¨°üÀ¨¶ªÆú£©

·Ö²¼Ê½·þÎñ£¬ºá¿ç¶à¸öÖ÷»ú

APIÒ»ÀÀ

Buffer

ByteBuffer

£¨CharBuffer¡¢LongBufferµÈµÈ£©

Channel

SelectableChannel

SocketChannel

ServerSocketChannel

FileChannel

Selector

SelectionKey

Buffer

abstract class Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
}

ByteBuffer

abstract class ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset, int len);
static ByteBuffer wrap(byte[] src);
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();

byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
}

Channel

interface Channel {
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}

SelectableChannel

abstract class SelectableChannel implements Channel {
int validOps();
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
void configureBlocking(boolean block) throws IOException;
boolean isBlocking();
Object blockingLock();
}

SocketChannel

abstract class SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
boolean isConnected();
boolean isConnectionPending();
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}

ServerSocketChannel

abstract class ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
SocketChannel accept() throws IOException;
}

FileChannel

abstract class FileChannel implements ... {
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
FileLock lock(long position, long size, boolean shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean shared);
FileLock tryLock();
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position, int size);
}
NOTE: ËùÓеķ½·¨¶¼Å×IOException

Selector

abstract class Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
}

SelectionKey

abstract class SelectionKey {
static final int OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
int interestOps();
void interestOps(int ops);
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
Object attach(Object ob);
Object attachment();
}
 
   
6224 ´Îä¯ÀÀ       27
Ïà¹ØÎÄÕÂ

Java΢·þÎñÐÂÉú´úÖ®Nacos
ÉîÈëÀí½âJavaÖеÄÈÝÆ÷
JavaÈÝÆ÷Ïê½â
Java´úÂëÖÊÁ¿¼ì²é¹¤¾ß¼°Ê¹Óð¸Àý
Ïà¹ØÎĵµ

JavaÐÔÄÜÓÅ»¯
Spring¿ò¼Ü
SSM¿ò¼Ü¼òµ¥¼òÉÜ
´ÓÁ㿪ʼѧjava±à³Ì¾­µä
Ïà¹Ø¿Î³Ì

¸ßÐÔÄÜJava±à³ÌÓëϵͳÐÔÄÜÓÅ»¯
JavaEE¼Ü¹¹¡¢ Éè¼ÆÄ£Ê½¼°ÐÔÄܵ÷ÓÅ
Java±à³Ì»ù´¡µ½Ó¦Óÿª·¢
JAVAÐéÄâ»úÔ­ÀíÆÊÎö