您可以捐助,支持我们的公益事业。

1元 10元 50元





认证码:  验证码,看不清楚?请点击刷新验证码 必填



  求知 文章 文库 Lib 视频 iPerson 课程 认证 咨询 工具 讲座 Model Center   Code  
会员   
   
 
     
   
 订阅
  捐助
Java高并发异步Socket编程
 
  5752  次浏览      17
 2018-12-25
 
编辑推荐:
本文来自于oschina,本文介绍了经典的ServerSocket监听循环、Reactor 模式以及Worker线程等相关内容。

Java可扩展IO

Doug Lee

大纲

可扩展的网络服务

事件驱动

Reactor 模式

基础版

多线程版

其他变体

java.io包中分阻塞IO API一览

网络应用服务器

Web服务器,分布式对象系统等等

它们的共同特点

Read请求

解码请求报文

业务处理

编码响应报文

发送响应

实际应用中每一个步骤的不一样

XML解析

文件传输

动态生成网页

计算型服务

经典的服务设计

每个线程运行一个handler

经典的ServerSocket监听循环

class Server implements Runnable {
public void run() {
try {
ServerSocket ss = new ServerSocket(PORT);
while (!Thread.interrupted())
new Thread(new Handler(ss.accept())).start();
// 这里使用单线程或者线程池
} catch (IOException ex) { /* ... */ }
}

static class Handler implements Runnable {
final Socket socket;
Handler(Socket s) { socket = s; }
public void run() {
try {
byte[] input = new byte[MAX_INPUT];
socket.getInputStream().read(input);
byte[] output = process(input);
socket.getOutputStream().write(output);
} catch (IOException ex) { /* ... */ }
}
private byte[] process(byte[] cmd) { /* ... */ }
}
}

Note: 异常处理省略

高可扩展性目标

压力持续增大时服务优雅的降级(客户端增多)

性能随着资源(CPU,内存,磁盘,带宽)的提升持续增加

高可用和性能的目标

低延迟

应对请求尖峰

服务质量可控

分而治之是解决扩展性问题的常用方法

分而治之

把处理流程切分成更小的task,每个task都是非阻塞的

只有当任务准备好才去执行,IO事件模型通常是触发器机制

java.nio包包含基本的非阻塞机制

非阻塞读和写

分发任务 把task关联到IO事件模型

更多想法

事件驱动设计

事件驱动设计

通常更高效

更少的资源 不需要为每一个client启动一个线程

减少开销 减少上下文切换以及更少的锁竞争

事件分发较慢 必须把action和时间绑定在一起

编程难度较高

必须把流程分割为一系列非阻塞的任务单元

与GUI的事件模型类似

不能消除所有的阻塞,如:GC、内存页错误等等

必须注意服务的状态的变化

Java AWT图形编程的事件驱动

事件驱动的IO模型与此类似,但设计上有所不同

Reactor模型

Reacttor负责分发事件到对应的handler,类似AWT线程

Handlers是非阻塞的任务,类似AWT中的ActionListeners

Manage负责把handler绑定到事件上

参见施密特等人所著的《Pattern-Oriented Software Architecture》卷2 以及Richard Stevens关于网络的数以及MattWelsh的SEDA框架等等

Reactor基础版

单线程版

java.nio提供的支持

Channels

通道是文件、socket之间的连接, 支持非阻塞读取

Buffers

数组对象,可以被Channel直接读写

Selectors

负责筛选那些Channel有IO事件

SelectionKeys

保存IO事件状态和绑定对象

Reactor 1:创建

class Reactor implements Runnable {
final Selector selector;
final ServerSocketChannel serverSocket;
Reactor(int port) throws IOException {
selector = Selector.open();
serverSocket = ServerSocketChannel.open();
serverSocket.socket().bind( new InetSocketAddress(port) );
serverSocket.configureBlocking(false);
SelectionKey sk = serverSocket.register( selector, SelectionKey.OP_ACCEPT );
sk.attach(new Acceptor());
}
/*
也可以用SPI provider,更明确一些:
SelectorProvider p = SelectorProvider.provider();
selector = p.openSelector();
serverSocket = p.openServerSocketChannel();
*/

Reactor 2:分发循环

public void run() { // 通常启动一个新线程
try {
while (!Thread.interrupted()) {
selector.select();
Set selected = selector.selectedKeys();
Iterator it = selected.iterator();
while (it.hasNext())
dispatch((SelectionKey)(it.next());
selected.clear();
}
} catch (IOException ex) { /* ... */ }
}

void dispatch(SelectionKey k) {
Runnable r = (Runnable)(k.attachment());
if (r != null)
r.run();
}

Reactor 3:Acceptor

class Acceptor implements Runnable { // 内置类
public void run() {
try {
SocketChannel c = serverSocket.accept();
if (c != null)
new Handler(selector, c);
} catch (IOException ex) { /* ... */ }
}
}
}

Reactor 4:建立Handler

final class Handler implements Runnable {
final SocketChannel socket;
final SelectionKey sk;
ByteBuffer input = ByteBuffer.allocate(MAXIN);
ByteBuffer output = ByteBuffer.allocate(MAXOUT);
static final int READING = 0, SENDING = 1;
int state = READING;
Handler(Selector sel, SocketChannel c) throws IOException {
socket = c;
c.configureBlocking(false);
// 尝试监听读事件
sk = socket.register(sel, 0);
sk.attach(this);
sk.interestOps(SelectionKey.OP_READ);
sel.wakeup();
}
boolean inputIsComplete() { /* ... */ }
boolean outputIsComplete() { /* ... */ }
void process() { /* ... */ }

Reactor 5:处理请求

public void run() {
try {
if (state == READING) read();
else if (state == SENDING) send();
} catch (IOException ex) { /* ... */ }
}

void read() throws IOException {
socket.read(input);
if (inputIsComplete()) {
process();
state = SENDING;
// 读完之后,通常监听写事件
sk.interestOps(SelectionKey.OP_WRITE);
}
}

void send() throws IOException {
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}

Handlers状态流转

GoF的状态模式的简单应用

class Handler { // ...
public void run() { // 初始状态是read
socket.read(input);
if (inputIsComplete()) {
process();
sk.attach(new Sender());
sk.interest(SelectionKey.OP_WRITE);
sk.selector().wakeup();
}
}

class Sender implements Runnable {
public void run(){ // ...
socket.write(output);
if (outputIsComplete())
sk.cancel();
}
}
}

多线程版

启动适当的线程数,尤其是多核情况下

工作线程

Reactors必须快速触发handlers

Handler任务重,会拖慢Reactor

把计算型的工作交给其他的线程

多个Readctor线程

Reactor线程专注于IO操作

把压力分摊到其他的reactor

负载均衡要考虑适配CPU和IO速率

Worker线程

只处理计算型任务,加速Reactor线程

类似于POSA2书上的Proactor模式

比改成事件驱动模式简单

只处理计算型任务

重复IO比较难处理

最好在第一次读的时候就全部读出来到缓存中

使用线程池,方便控制

通常只需少量的线程,比客户端数量少的多

Worker线程池

带线程池的Handler

class Handler implements Runnable {
// 使用 util.concurrent中的线程池
static PooledExecutor pool = new PooledExecutor(...);
static final int PROCESSING = 3;
// ...
synchronized void read() { // ...
socket.read(input);
if (inputIsComplete()) {
state = PROCESSING;
pool.execute(new Processer());
}
}
synchronized void processAndHandOff() {
process();
state = SENDING; // or rebind attachment
sk.interest(SelectionKey.OP_WRITE);
}

class Processer implements Runnable {
public void run() { processAndHandOff(); }
}
}

调用任务的方式

链式传递

每个任务负责调用下一个任务,通常是效率最高的,但容易出问题

由到每个handler的分发器回调

通过设置状态,绑定对象等方式实现,GoF Mediator模式的变体

队列

就像上面的例子,通过队列传递buffer中的数据

Future

调用方通过join或者wait/notify方法获取每个task的执行结果

使用PooledExecutor

调度worker线程

主要方法:execute(Runnable r)

有如下控制参数

队列类型

最大线程数

最小线程数

"Warm" versus on-demand threads

自动回收空闲线程

需要的时候再创建新的线程

多种策略应对任务饱和

阻塞,丢弃,producer-runs,等等

多个Reactor线程

使用Reactor池

适配CPU和IO速率

静态创建或动态创建

每一个reactor都有自己的Selector,线程,分发循环

主acceptor分发给其他的reactors

Selector[] selectors;
int next = 0;
class Acceptor { // ...
public synchronized void run() { ...
Socket connection = serverSocket.accept();
if (connection != null)
new Handler(selectors[next], connection);
if (++next == selectors.length)
next = 0;
}
}

多Reactor示例

其它的java.nio特性

每个Reactor包含多个Selector

把不同的handler绑定到不同的IO事件时需要特别小心同步问题

文件传输

自动文件传输:file-to-net或者net-to-file复制

内存映射文件

通过buffers访问文件

直接访问buffer

有时可以达到零拷贝的目的

But have setup and finalization overhead

非常适合长连接应用

扩展网络连接

同时收到多个请求

客户端建立连接

客户端发送一连串的消息/请求

客户端断开连接

举个例子

数据库事务监视器

多人在线游戏、聊天室等等

扩展上文的基础网络模型

保持许多相对长时间的存活的客户端

跟踪客户端,保持会话状态(包括丢弃)

分布式服务,横跨多个主机

API一览

Buffer

ByteBuffer

(CharBuffer、LongBuffer等等)

Channel

SelectableChannel

SocketChannel

ServerSocketChannel

FileChannel

Selector

SelectionKey

Buffer

abstract class Buffer {
int capacity();
int position();
Buffer position(int newPosition);
int limit();
Buffer limit(int newLimit);
Buffer mark();
Buffer reset();
Buffer clear();
Buffer flip();
Buffer rewind();
int remaining();
boolean hasRemaining();
boolean isReadOnly();
}

ByteBuffer

abstract class ByteBuffer extends Buffer {
static ByteBuffer allocateDirect(int capacity);
static ByteBuffer allocate(int capacity);
static ByteBuffer wrap(byte[] src, int offset, int len);
static ByteBuffer wrap(byte[] src);
boolean isDirect();
ByteOrder order();
ByteBuffer order(ByteOrder bo);
ByteBuffer slice();
ByteBuffer duplicate();
ByteBuffer compact();
ByteBuffer asReadOnlyBuffer();

byte get();
byte get(int index);
ByteBuffer get(byte[] dst, int offset, int length);
ByteBuffer get(byte[] dst);
ByteBuffer put(byte b);
ByteBuffer put(int index, byte b);
ByteBuffer put(byte[] src, int offset, int length);
ByteBuffer put(ByteBuffer src);
ByteBuffer put(byte[] src);
char getChar();
char getChar(int index);
ByteBuffer putChar(char value);
ByteBuffer putChar(int index, char value);
CharBuffer asCharBuffer();
short getShort();
short getShort(int index);
ByteBuffer putShort(short value);
ByteBuffer putShort(int index, short value);
ShortBuffer asShortBuffer();
int getInt();
int getInt(int index);
ByteBuffer putInt(int value);
ByteBuffer putInt(int index, int value);
IntBuffer asIntBuffer();
long getLong();
long getLong(int index);
ByteBuffer putLong(long value);
ByteBuffer putLong(int index, long value);
LongBuffer asLongBuffer();
float getFloat();
float getFloat(int index);
ByteBuffer putFloat(float value);
ByteBuffer putFloat(int index, float value);
FloatBuffer asFloatBuffer();
double getDouble();
double getDouble(int index);
ByteBuffer putDouble(double value);
ByteBuffer putDouble(int index, double value);
DoubleBuffer asDoubleBuffer();
}

Channel

interface Channel {
boolean isOpen();
void close() throws IOException;
}
interface ReadableByteChannel extends Channel {
int read(ByteBuffer dst) throws IOException;
}
interface WritableByteChannel extends Channel {
int write(ByteBuffer src) throws IOException;
}
interface ScatteringByteChannel extends ReadableByteChannel {
int read(ByteBuffer[] dsts, int offset, int length) throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
}
interface GatheringByteChannel extends WritableByteChannel {
int write(ByteBuffer[] srcs, int offset, int length) throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}

SelectableChannel

abstract class SelectableChannel implements Channel {
int validOps();
boolean isRegistered();
SelectionKey keyFor(Selector sel);
SelectionKey register(Selector sel, int ops) throws ClosedChannelException;
void configureBlocking(boolean block) throws IOException;
boolean isBlocking();
Object blockingLock();
}

SocketChannel

abstract class SocketChannel implements ByteChannel ... {
static SocketChannel open() throws IOException;
Socket socket();
int validOps();
boolean isConnected();
boolean isConnectionPending();
boolean isInputOpen();
boolean isOutputOpen();
boolean connect(SocketAddress remote) throws IOException;
boolean finishConnect() throws IOException;
void shutdownInput() throws IOException;
void shutdownOutput() throws IOException;
int read(ByteBuffer dst) throws IOException;
int read(ByteBuffer[] dsts, int offset, int length)
throws IOException;
int read(ByteBuffer[] dsts) throws IOException;
int write(ByteBuffer src) throws IOException;
int write(ByteBuffer[] srcs, int offset, int length)
throws IOException;
int write(ByteBuffer[] srcs) throws IOException;
}

ServerSocketChannel

abstract class ServerSocketChannel extends ... {
static ServerSocketChannel open() throws IOException;
int validOps();
ServerSocket socket();
SocketChannel accept() throws IOException;
}

FileChannel

abstract class FileChannel implements ... {
int read(ByteBuffer dst);
int read(ByteBuffer dst, long position);
int read(ByteBuffer[] dsts, int offset, int length);
int read(ByteBuffer[] dsts);
int write(ByteBuffer src);
int write(ByteBuffer src, long position);
int write(ByteBuffer[] srcs, int offset, int length);
int write(ByteBuffer[] srcs);
long position();
void position(long newPosition);
long size();
void truncate(long size);
void force(boolean flushMetaDataToo);
int transferTo(long position, int count,
WritableByteChannel dst);
int transferFrom(ReadableByteChannel src,
long position, int count);
FileLock lock(long position, long size, boolean shared);
FileLock lock();
FileLock tryLock(long pos, long size, boolean shared);
FileLock tryLock();
static final int MAP_RO, MAP_RW, MAP_COW;
MappedByteBuffer map(int mode, long position, int size);
}
NOTE: 所有的方法都抛IOException

Selector

abstract class Selector {
static Selector open() throws IOException;
Set keys();
Set selectedKeys();
int selectNow() throws IOException;
int select(long timeout) throws IOException;
int select() throws IOException;
void wakeup();
void close() throws IOException;
}

SelectionKey

abstract class SelectionKey {
static final int OP_READ, OP_WRITE, OP_CONNECT, OP_ACCEPT;
SelectableChannel channel();
Selector selector();
boolean isValid();
void cancel();
int interestOps();
void interestOps(int ops);
int readyOps();
boolean isReadable();
boolean isWritable();
boolean isConnectable();
boolean isAcceptable();
Object attach(Object ob);
Object attachment();
}
 
   
5752 次浏览       17
相关文章

Java微服务新生代之Nacos
深入理解Java中的容器
Java容器详解
Java代码质量检查工具及使用案例
相关文档

Java性能优化
Spring框架
SSM框架简单简绍
从零开始学java编程经典
相关课程

高性能Java编程与系统性能优化
JavaEE架构、 设计模式及性能调优
Java编程基础到应用开发
JAVA虚拟机原理剖析