Äú¿ÉÒÔ¾èÖú£¬Ö§³ÖÎÒÃǵĹ«ÒæÊÂÒµ¡£

1Ôª 10Ôª 50Ôª





ÈÏÖ¤Â룺  ÑéÖ¤Âë,¿´²»Çå³þ?Çëµã»÷Ë¢ÐÂÑéÖ¤Âë ±ØÌî



  ÇóÖª ÎÄÕ ÎÄ¿â Lib ÊÓÆµ iPerson ¿Î³Ì ÈÏÖ¤ ×Éѯ ¹¤¾ß ½²×ù Model Center   Code  
»áÔ±   
   
 
     
   
 ¶©ÔÄ
  ¾èÖú
Netty¡ª¡ª»ù±¾Ê¹ÓýéÉÜ
 
  2669  次浏览      28
 2019-9-17
 
±à¼­ÍƼö:
±¾ÎÄÀ´×ÔÓÚcsdn£¬±¾ÎÄÖ÷Òª½éÉÜÁËNetty¼Ü¹¹×é³É£¬NettyͨÐŵIJ½Ö裬NettyÖнâ¾öTCPÕ³°ü/²ð°üµÄ·½·¨µÈÄÚÈÝ £¬Ï£Íû¶ÔÄúÄÜÓÐËù°ïÖú¡£

1.ΪʲôѡÔñNetty

ÔÚ¶ÔͨÐÅÄ£ÐÍÓÐÁË»ù±¾µÄÈÏʶʱ£¬Ñ§Ï°µ½µÄ½ö½öÊÇÒ»¸öÄ£ÐÍ£¬Èç¹ûÏë°ÑÕâÐ©ÕæÕýµÄÓÃÓÚʵ¼Ê¹¤×÷ÖУ¬ÄÇô»¹ÐèÒª²»¶ÏµÄÍêÉÆ¡¢À©Õ¹ºÍÓÅ»¯¡£±ÈÈç¾­µäµÄTCP¶Á°üд°üÎÊÌ⣬»òÕßÊÇÊý¾Ý½ÓÊյĴóС£¬Êµ¼ÊµÄͨÐÅ´¦ÀíÓëÓ¦´ðµÄ´¦ÀíÂß¼­µÈµÈһЩϸ½ÚÎÊÌâÐèÒªÈÏÕæµÄȥ˼¿¼£¬¶øÕâЩ¶¼ÐèÒª´óÁ¿µÄʱ¼äºÍ¾­Àú£¬ÒÔ¼°·á¸»µÄ¾­Ñé¡£ËùÒÔÏëѧºÃSocketͨÐŲ»ÊǼþÈÝÒ×Ê£¬ÄÇô½ÓÏÂÀ´¾ÍÀ´Ñ§Ï°Ò»ÏÂеļ¼ÊõNetty£¬ÎªÊ²Ã´»áÑ¡ÔñNetty£¿ÒòΪËü¼òµ¥£¡Ê¹ÓÃNetty²»±Ø±àд¸´ÔÓµÄÂß¼­´úÂëȥʵÏÖͨÐÅ£¬ÔÙÒ²²»ÐèҪȥ¿¼ÂÇÐÔÄÜÎÊÌ⣬²»ÐèÒª¿¼ÂDZàÂëÎÊÌ⣬°ë°ü¶ÁдµÈÎÊÌ⡣ǿ´óµÄNettyÒѾ­°ïÎÒÃÇʵÏÖºÃÁË£¬ÎÒÃÇÖ»ÐèҪʹÓü´¿É¡£

NettyÊÇ×îÁ÷ÐеÄNIO¿ò¼Ü£¬ËüµÄ½¡×³ÐÔ¡¢¹¦ÄÜ¡¢ÐÔÄÜ¡¢¿É¶¨ÖÆÐԺͿÉÀ©Õ¹ÐÔÔÚͬÀà¿ò¼Ü¶¼ÊÇÊ×ÇüÒ»Ö¸µÄ¡£ËüÒѾ­µÃµ½³É°ÙÉÏǧµÄÉÌÒµ/ÉÌÓÃÏîÄ¿ÑéÖ¤£¬ÈçHadoopµÄRPC¿ò¼ÜAvro¡¢RocketMQÒÔ¼°Ö÷Á÷µÄ·Ö²¼Ê½Í¨ÐÅ¿ò¼ÜDubboxµÈµÈ¡£

2.Netty¼ò½é

NettyÊÇ»ùÓÚJava NIO client-serverµÄÍøÂçÓ¦Óÿò¼Ü£¬Ê¹ÓÃNetty¿ÉÒÔ¿ìËÙ¿ª·¢ÍøÂçÓ¦Óã¬ÀýÈç·þÎñÆ÷ºÍ¿Í»§¶ËЭÒé¡£NettyÌṩÁËÒ»ÖÖеķ½Ê½À´¿ª·¢ÍøÂçÓ¦ÓóÌÐò£¬ÕâÖÖеķ½Ê½Ê¹ËüºÜÈÝÒ×ʹÓú;ßÓкÜÇ¿µÄÀ©Õ¹ÐÔ¡£NettyµÄÄÚ²¿ÊµÏÖÊǺܸ´Ôӵ쬵«ÊÇNettyÌṩÁ˼òµ¥Ò×ÓõÄAPI´ÓÍøÂç´¦Àí´úÂëÖнâñîÒµÎñÂß¼­¡£NettyÊÇÍêÈ«»ùÓÚNIOʵÏֵģ¬ËùÒÔÕû¸öNetty¶¼ÊÇÒì²½µÄ¡£

ÍøÂçÓ¦ÓóÌÐòͨ³£ÐèÒªÓнϸߵĿÉÀ©Õ¹ÐÔ£¬ÎÞÂÛÊÇNetty»¹ÊÇÆäËûµÄ»ùÓÚJava NioµÄ¿ò¼Ü£¬¶¼»áÌṩ¿ÉÀ©Õ¹ÐԵĽâ¾ö·½°¸¡£NettyÖÐÒ»¸ö¹Ø¼ü×é³É²¿·ÖÊÇËüµÄÒì²½ÌØÐÔ£¬±¾Æ¬ÎÄÕ½«ÌÖÂÛͬ²½£¨×èÈû£©ºÍÒì²½£¨·Ç×èÈû£©µÄIOÀ´ËµÃ÷ΪʲôʹÓÃÒì²½´úÂë½â¾öÀ©Õ¹ÐÔÎÊÌâÒÔ¼°ÈçºÎʹÓÃÒì²½¡£

3.Netty¼Ü¹¹×é³É£¨½èÓÃÒ»ÏÂÍøÉϵÄͼƬ£©

4.HelloworldÈëÃÅ

ÔÚѧϰNetty֮ǰ£¬ÏÈÀ´»Ø¹ËÒ»ÏÂNIOµÄͨÐŲ½Ö裺

¢Ù´´½¨ServerSocketChannel£¬ÎªÆäÅäÖ÷Ç×èÈûģʽ¡£

¢Ú°ó¶¨¼àÌý£¬ÅäÖÃTCP²ÎÊý£¬Â¼Èëbacklog´óСµÈ¡£

¢Û´´½¨Ò»¸ö¶ÀÁ¢µÄIOỊ̈߳¬ÓÃÓÚÂÖѯ¶à·¸´ÓÃÆ÷Selector¡£

¢Ü´´½¨Selector£¬½«Ö®Ç°´´½¨µÄServerSocketChannel×¢²áµ½SelectorÉÏ£¬²¢ÉèÖüàÌý±êʶλSelectionKey.OP_ACCEPT¡£

¢ÝÆô¶¯IOỊ̈߳¬ÔÚÑ­»·ÌåÖÐÖ´ÐÐSelector.select()·½·¨£¬ÂÖѯ¾ÍÐ÷µÄͨµÀ¡£

¢Þµ±ÂÖѯµ½´¦ÓÚ¾ÍÐ÷״̬µÄͨµÀʱ£¬ÐèÒª½øÐвÙ×÷λÅжϣ¬Èç¹ûÊÇACCEPT״̬£¬ËµÃ÷ÊÇеĿͻ§¶Ë½ÓÈ룬Ôòµ÷ÓÃaccept·½·¨½ÓÊÕеĿͻ§¶Ë¡£

¢ßÉèÖÃнÓÈë¿Í»§¶ËµÄһЩ²ÎÊý£¬Èç·Ç×èÈû£¬²¢½«Æä¼ÌÐø×¢²áµ½SelectorÉÏ£¬ÉèÖüàÌý±êʶλµÈ¡£

¢àÈç¹ûÂÖѯµÄͨµÀ±êʶλÊÇREAD£¬Ôò½øÐжÁÈ¡£¬¹¹ÔìBuffer¶ÔÏóµÈ¡£

¢á¸üϸ½ÚµÄÎÊÌ⻹ÓÐÊý¾Ýû·¢ËÍÍê³É¼ÌÐø·¢Ë͵ÄÎÊÌâ......

ºÃÀ²£¬¿ªÊ¼Ñ§Ï°NettyÁË¡£ÏÈÈ¥http://netty.io/ÉÏÏÂÔØËùÓеÄNetty°ü¡£

NettyͨÐŵIJ½Ö裺

¢Ù´´½¨Á½¸öNIOÏß³Ì×飬һ¸öרÃÅÓÃÓÚÍøÂçʼþ´¦Àí£¨½ÓÊܿͻ§¶ËµÄÁ¬½Ó£©£¬ÁíÒ»¸öÔò½øÐÐÍøÂçͨÐŵĶÁд¡£

¢Ú´´½¨Ò»¸öServerBootstrap¶ÔÏó£¬ÅäÖÃNettyµÄһϵÁвÎÊý£¬ÀýÈç½ÓÊÜ´«³öÊý¾ÝµÄ»º´æ´óСµÈ¡£

¢Û´´½¨Ò»¸öÓÃÓÚʵ¼Ê´¦ÀíÊý¾ÝµÄÀàChannelInitializer£¬½øÐгõʼ»¯µÄ×¼±¸¹¤×÷£¬±ÈÈçÉèÖýÓÊÜ´«³öÊý¾ÝµÄ×Ö·û¼¯¡¢¸ñʽÒÔ¼°Êµ¼Ê´¦ÀíÊý¾ÝµÄ½Ó¿Ú¡£

¢Ü°ó¶¨¶Ë¿Ú£¬Ö´ÐÐͬ²½×èÈû·½·¨µÈ´ý·þÎñÆ÷¶ËÆô¶¯¼´¿É¡£

ºÃÁË£¬ËµÁËÄÇô¶à£¬ÏÂÃæ¾ÍÀ´HelloWorldÈëÃŰɣ¡

·þÎñÆ÷¶Ë£º

public class Server {

private int port;

public Server(int port) {
this.port = port;
}

public void run() {
EventLoopGroup bossGroup = new NioEventLoopGroup(); //ÓÃÓÚ´¦Àí·þÎñÆ÷¶Ë½ÓÊÕ¿Í»§¶ËÁ¬½Ó
EventLoopGroup workerGroup = new NioEventLoopGroup(); //½øÐÐÍøÂçͨÐÅ£¨¶Áд£©
try {
ServerBootstrap bootstrap = new ServerBootstrap(); //¸¨Öú¹¤¾ßÀ࣬ÓÃÓÚ·þÎñÆ÷ͨµÀµÄһϵÁÐÅäÖÃ
bootstrap.group(bossGroup, workerGroup) //°ó¶¨Á½¸öÏß³Ì×é
.channel(NioServerSocketChannel.class) //Ö¸¶¨NIOµÄģʽ
.childHandler(new ChannelInitializer<SocketChannel>() { //ÅäÖþßÌåµÄÊý¾Ý´¦Àí·½Ê½
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ServerHandler());
}
})
/**
* ¶ÔÓÚChannelOption.SO_BACKLOGµÄ½âÊÍ£º
* ·þÎñÆ÷¶ËTCPÄÚºËά»¤ÓÐÁ½¸ö¶ÓÁУ¬ÎÒÃdzÆÖ®ÎªA¡¢B¶ÓÁС£¿Í»§¶ËÏò·þÎñÆ÷¶Ëconnectʱ£¬»á·¢ËÍ´øÓÐSYN±êÖ¾µÄ°ü£¨µÚÒ»´ÎÎÕÊÖ£©£¬·þÎñÆ÷¶Ë
* ½ÓÊÕµ½¿Í»§¶Ë·¢Ë͵ÄSYNʱ£¬Ïò¿Í»§¶Ë·¢ËÍSYN ACKÈ·ÈÏ£¨µÚ¶þ´ÎÎÕÊÖ£©£¬´ËʱTCPÄÚºËÄ£¿é°Ñ¿Í»§¶ËÁ¬½Ó¼ÓÈëµ½A¶ÓÁÐÖУ¬È»ºó·þÎñÆ÷½ÓÊÕµ½
* ¿Í»§¶Ë·¢Ë͵ÄACKʱ£¨µÚÈý´ÎÎÕÊÖ£©£¬TCPÄÚºËÄ£¿é°Ñ¿Í»§¶ËÁ¬½Ó´ÓA¶ÓÁÐÒÆ¶¯µ½B¶ÓÁУ¬Á¬½ÓÍê³É£¬Ó¦ÓóÌÐòµÄaccept»á·µ»Ø¡£Ò²¾ÍÊÇ˵accept
* ´ÓB¶ÓÁÐÖÐÈ¡³öÍê³ÉÁËÈý´ÎÎÕÊÖµÄÁ¬½Ó¡£
* A¶ÓÁкÍB¶ÓÁеij¤¶ÈÖ®ºÍ¾ÍÊÇbacklog¡£µ±A¡¢B¶ÓÁеij¤¶ÈÖ®ºÍ´óÓÚChannelOption.SO_BACKLOGʱ£¬ÐµÄÁ¬½Ó½«»á±»TCPÄں˾ܾø¡£
* ËùÒÔ£¬Èç¹ûbacklog¹ýС£¬¿ÉÄÜ»á³öÏÖacceptËٶȸú²»ÉÏ£¬A¡¢B¶ÓÁÐÂúÁË£¬µ¼ÖÂеĿͻ§¶ËÎÞ·¨Á¬½Ó¡£Òª×¢ÒâµÄÊÇ£¬backlog¶Ô³ÌÐòÖ§³ÖµÄ
* Á¬½ÓÊý²¢ÎÞÓ°Ï죬backlogÓ°ÏìµÄÖ»ÊÇ»¹Ã»Óб»acceptÈ¡³öµÄÁ¬½Ó
*/
.option(ChannelOption.SO_BACKLOG, 128) //ÉèÖÃTCP»º³åÇø
.option(ChannelOption.SO_SNDBUF, 32 * 1024) //ÉèÖ÷¢ËÍÊý¾Ý»º³å´óС
.option(ChannelOption.SO_RCVBUF, 32 * 1024) //ÉèÖýÓÊÜÊý¾Ý»º³å´óС
.childOption(ChannelOption.SO_KEEPALIVE, true); //±£³ÖÁ¬½Ó
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new Server(8379).run();
}
}

ServerHandlerÀࣺ

public class ServerHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {

//do something msg
ByteBuf buf = (ByteBuf)msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
String request = new String(data, "utf-8");
System.out.println("Server: " + request);
//д¸ø¿Í»§¶Ë
String response = "ÎÒÊÇ·´À¡µÄÐÅÏ¢";
ctx.writeAndFlush(Unpooled.copiedBuffer ("888".getBytes()));
//.addListener(ChannelFutureListener.CLOSE);


}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

¿Í»§¶Ë£º

public class Client {

public static void main(String[] args) throws InterruptedException {
EventLoopGroup workerGroup = new NioEventLoopGroup();
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(new ChannelInitializer <SocketChannel>() {
@Override
protected void initChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast(new ClientHandler());
}
});
ChannelFuture future = bootstrap. connect("127.0.0.1", 8379).sync();
future.channel().writeAndFlush (Unpooled.copiedBuffer("777".getBytes()));
future.channel().closeFuture().sync();
workerGroup.shutdownGracefully();
}

}

ClientHandlerÀࣺ

public class ClientHandler extends ChannelHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
ByteBuf buf = (ByteBuf) msg;
byte[] data = new byte[buf.readableBytes()];
buf.readBytes(data);
System.out.println("Client£º" + new String(data).trim());
} finally {
ReferenceCountUtil.release(msg);
}
}


@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

}

ÔËÐнá¹û£º

5.TCPÕ³°ü¡¢²ð°üÎÊÌâ

ÊìϤTCP±à³ÌµÄ¿ÉÄܶ¼ÖªµÀ£¬ÎÞÂÛÊÇ·þÎñÆ÷¶Ë»¹Êǿͻ§¶Ë£¬µ±ÎÒÃǶÁÈ¡»òÕß·¢ËÍÊý¾ÝµÄʱºò£¬¶¼ÐèÒª¿¼ÂÇTCPµ×²ãµÄÕ³°ü/²ð°ü»úÖÆ¡£

TCPÊÇÒ»¸ö¡°Á÷¡±Ð­Ò飬ËùνÁ÷¾ÍÊÇûÓнçÏÞµÄÒÅ´«Êý¾Ý¡£´ó¼Ò¿ÉÒÔÏëÏóһϣ¬Èç¹ûºÓË®¾ÍºÃ±ÈÊý¾Ý£¬ËûÃÇÊÇÁ¬³ÉһƬµÄ£¬Ã»ÓзֽçÏߣ¬TCPµ×²ã²¢²»Á˽âÉϲãÒµÎñÊý¾ÝµÄ¾ßÌ庬Ò壬Ëü»á¸ù¾ÝTCP»º³åÇøµÄ¾ßÌåÇé¿ö½øÐаüµÄ»®·Ö£¬Ò²¾ÍÊÇ˵£¬ÔÚÒµÎñÉÏÒ»¸öÍêÕûµÄ°ü¿ÉÄܻᱻTCP·Ö³É¶à¸ö°ü½øÐз¢ËÍ£¬Ò²¿ÉÄܰѶà¸öС°ü·â×°³ÉÒ»¸ö´óµÄÊý¾Ý°ü·¢ËͳöÈ¥£¬Õâ¾ÍÊÇËùνµÄÕ³°ü/²ð°üÎÊÌâ¡£

½â¾ö·½°¸£º

¢ÙÏûÏ¢¶¨³¤£¬ÀýÈçÿ¸ö±¨ÎĵĴóС¹Ì¶¨Îª200¸ö×Ö½Ú£¬Èç¹û²»¹»£¬¿Õλ²¹¿Õ¸ñ¡£

¢ÚÔÚ°üβ²¿Ôö¼ÓÌØÊâ×Ö·û½øÐзָÀýÈç¼Ó»Ø³µµÈ¡£

¢Û½«ÏûÏ¢·ÖΪÏûϢͷºÍÏûÏ¢Ì壬ÔÚÏûϢͷÖаüº¬±íʾÏûÏ¢×ܳ¤¶ÈµÄ×ֶΣ¬È»ºó½øÐÐÒµÎñÂß¼­µÄ´¦Àí¡£

NettyÖнâ¾öTCPÕ³°ü/²ð°üµÄ·½·¨£º

¢Ù·Ö¸ô·ûÀࣺDelimiterBasedFrameDecoder£¨×Ô¶¨Òå·Ö¸ô·û£©

¢Ú¶¨³¤£ºFixedLengthFrameDecoder

6.Netty±à½âÂë¼¼Êõ

ͨ³£ÎÒÃÇҲϰ¹ß½«±àÂ루Encode£©³ÉΪÐòÁл¯£¬Ëü½«Êý¾ÝÐòÁл¯Îª×Ö½ÚÊý×飬ÓÃÓÚÍøÂç´«Êä¡¢Êý¾Ý³Ö¾Ã»¯»òÕ߯äËûÓÃ;¡£·´Ö®£¬½âÂ루Decode£©/·´ÐòÁл¯£¨deserialization£©

°Ñ´ÓÍøÂç¡¢´ÅÅ̵ȶÁÈ¡µÄ×Ö½ÚÊý×黹ԭ³Éԭʼ¶ÔÏó£¨Í¨³£ÊÇԭʼ¶ÔÏóµÄ¿½±´£©£¬ÒÔ·½±ãºóÐøµÄÒµÎñÂß¼­²Ù×÷¡£½øÐÐÔ¶³Ì¿ç½ø³Ì·þÎñµ÷ÓÃʱ£¨ÀýÈçRPCµ÷Óã©£¬ÐèҪʹÓÃÌØ¶¨µÄ±à½âÂë¼¼Êõ£¬¶ÔÐèÒª½øÐÐÍøÂç´«ÊäµÄ¶ÔÏó×ö±àÂë»òÕß½âÂ룬ÒÔ±ãÍê³ÉÔ¶³Ìµ÷Óá£

Ö÷Á÷µÄ±à½âÂë¿ò¼Ü£º

¢ÙJBossµÄMarshalling°ü

¢ÚgoogleµÄProtobuf

¢Û»ùÓÚProtobufµÄKyro

¢ÜMessagePack¿ò¼Ü

ÉÏ´úÂ룬һ¶Á¾Í¶®£¬×¢ÒâºìÉ«×ÖÌ岿·Ö¡£

·þÎñÆ÷¶Ë£º

public class Server {

public Server(int port) {
EventLoopGroup bossGroup = newNioEventLoopGroup();
EventLoopGroup workerGroup = newNioEventLoopGroup();
try {
ServerBootstrap bootstrap = newServerBootstrap();
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.handler(newLoggingHandler(LogLevel.INFO))
.childHandler(newChannelInitializer<SocketChannel>() {
@Override
protected voidinitChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast (new ServerHandler());
}
})
.option(ChannelOption.SO_BACKLOG,1024)
.option(ChannelOption.SO_RCVBUF, 32 * 1024)
.option(ChannelOption.SO_SNDBUF, 32 * 1024)
.option(ChannelOption.SO_KEEPALIVE, true);
ChannelFuture future = bootstrap.bind(port).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new Server(8765);
}
}

ServerHandlerÀࣺ

public classServerHandler extends ChannelHandlerAdapter {

@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
ctx.close();
}

@Override
public voidchannelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

@Override
public void channelRead(ChannelHandlerContextctx, Object msg) throws Exception {
Request request = (Request) msg;
System.out.println("Server:"+ request.getId() + "," + request.getName() + "," +request.getReqeustMessag());

Response response = new Response();
response.setId(request.getId());
response.setName("response "+ request.getId());
response.setResponseMessage("ÏìÓ¦ÄÚÈÝ£º" +request.getReqeustMessag());
byte[] unGizpData =GzipUtils.unGzip(request.getAttachment());
char separator = File.separatorChar;
FileOutputStream outputStream = newFileOutputStream(System.getProperty("user.dir") + separator +"recieve" + separator + "1.png");
outputStream.write(unGizpData);
outputStream.flush();
outputStream.close();
ctx.writeAndFlush(response);
}
}

¿Í»§¶Ë£º

public class Client {

public static void main(String[] args) {
EventLoopGroup workerGroup = newNioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.handler(newLoggingHandler(LogLevel.INFO))
.channel(NioSocketChannel.class)
.handler(newChannelInitializer<SocketChannel>() {
@Override
protected voidinitChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast (new ClientHandler());
}
});
ChannelFuture future =bootstrap.connect (new InetSocketAddress("127.0.01", 8765)).sync();
for(int i=1; i<=5; i++) {
Request request = newRequest();
request.setId(i);
request.setName("pro"+ i);
request.setReqeustMessag("Êý¾ÝÐÅÏ¢" + i);
//´«ÊäͼƬ
char separator =File.separatorChar;
File file = newFile(System.getProperty("user.dir") + separator + "source"+ separator + "2.jpg");
FileInputStream inputStream = newFileInputStream(file);
byte[] data = newbyte[inputStream.available()];
inputStream.read(data);
inputStream.close();
byte[] gzipData =GzipUtils.gzip(data);
request.setAttachment(gzipData);
future.channel().writeAndFlush(request);
}

future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}

}

ClientHandlerÀࣺ

public classClientHandler extends ChannelHandlerAdapter {
@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
super.exceptionCaught(ctx, cause);
}

@Override
public voidchannelActive(ChannelHandlerContext ctx) throws Exception {
super.channelActive(ctx);
}

@Override
public voidchannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
Response response = (Response) msg;
System.out.println("Client:"+ response.getId() + "," + response.getName() + "," +response.getResponseMessage());
}
}

Marshalling¹¤¾ßÀࣺ

public final classMarshallingCodeCFactory {

/**
* ´´½¨Jboss Marshalling½âÂëÆ÷MarshallingDecoder
* @return MarshallingDecoder
*/
public static MarshallingDecoderbuildMarshallingDecoder() {
//Ê×ÏÈͨ¹ýMarshalling¹¤¾ßÀàµÄ¾«Í¨·½·¨»ñÈ¡MarshallingʵÀý¶ÔÏó ²ÎÊýserial±êʶ´´½¨µÄÊÇjavaÐòÁл¯¹¤³§¶ÔÏó¡£
final MarshallerFactorymarshallerFactory =Marshalling.getProvidedMarshallerFactory("serial");
//´´½¨ÁËMarshallingConfiguration¶ÔÏó£¬ÅäÖÃÁ˰汾ºÅΪ5
final MarshallingConfigurationconfiguration = new MarshallingConfiguration();
configuration.setVersion(5);
//¸ù¾ÝmarshallerFactoryºÍconfiguration´´½¨provider
UnmarshallerProvider provider= new DefaultUnmarshallerProvider(marshallerFactory, configuration);
//¹¹½¨NettyµÄMarshallingDecoder¶ÔÏó£¬Á©¸ö²ÎÊý·Ö±ðΪproviderºÍµ¥¸öÏûÏ¢ÐòÁл¯ºóµÄ×î´ó³¤¶È
MarshallingDecoder decoder =new MarshallingDecoder(provider, 1024 * 1024);
return decoder;
}

/**
* ´´½¨Jboss Marshalling±àÂëÆ÷MarshallingEncoder
* @return MarshallingEncoder
*/
public static MarshallingEncoderbuildMarshallingEncoder() {
final MarshallerFactorymarshallerFactory =Marshalling.getProvidedMarshallerFactory("serial");
final MarshallingConfigurationconfiguration = new MarshallingConfiguration();
configuration.setVersion(5);
MarshallerProvider provider =new DefaultMarshallerProvider(marshallerFactory, configuration);
//¹¹½¨NettyµÄMarshallingEncoder¶ÔÏó£¬MarshallingEncoderÓÃÓÚʵÏÖÐòÁл¯½Ó¿ÚµÄPOJO¶ÔÏóÐòÁл¯Îª¶þ½øÖÆÊý×é
MarshallingEncoder encoder =new MarshallingEncoder(provider);
return encoder;
}
}

GizpѹËõÓë½âѹËõ¹¤¾ßÀࣺ

public classGzipUtils {
public static byte[] gzip(byte[] val)throws IOException {
ByteArrayOutputStream bos = newByteArrayOutputStream(val.length);
GZIPOutputStream gos = null;
try {
gos = new GZIPOutputStream(bos);
gos.write(val, 0, val.length);
gos.finish();
gos.flush();
bos.flush();
val = bos.toByteArray();
} finally {
if (gos != null)
gos.close();
if (bos != null)
bos.close();
}
return val;
}

public static byte[] unGzip(byte[] buf)throws IOException {
GZIPInputStream gzi = null;
ByteArrayOutputStream bos = null;
try {
gzi = new GZIPInputStream(newByteArrayInputStream(buf));
bos = newByteArrayOutputStream(buf.length);
int count = 0;
byte[] tmp = new byte[2048];
while ((count = gzi.read(tmp)) !=-1) {
bos.write(tmp, 0, count);
}
buf = bos.toByteArray();
} finally {
if (bos != null) {
bos.flush();
bos.close();
}
if (gzi != null)
gzi.close();
}
return buf;
}
}

7.×î¼Ñʵ¼ù

£¨1£©Êý¾ÝͨÐÅ

ÎÒÃÇÐèÒªÁ˽âÔÚÕæÕýÏîÄ¿ÖÐÈçºÎʹÓÃNetty£¬´óÌåÉ϶ÔÓÚһЩ²ÎÊýÉèÖö¼ÊǸù¾Ý·þÎñÆ÷ÐÔÄܾö¶¨µÄ¡£ÎÒÃÇÐèÒª¿¼ÂǵÄÎÊÌâÊÇÁ½Ì¨»úÆ÷£¨ÉõÖÁ¶ą̀£©Ê¹ÓÃNettyÔõÑù½øÐÐͨÐÅ¡£

´óÌåÉÏ·ÖΪÈýÖÖ£º

¢ÙʹÓó¤Á¬½ÓͨµÀ²»¶Ï¿ªµÄÐÎʽ½øÐÐͨÐÅ£¬Ò²¾ÍÊÇ·þÎñÆ÷ºÍ¿Í»§¶ËµÄͨµÀÒ»Ö±´¦ÓÚ¿ªÆô״̬£¬Èç¹û·þÎñÆ÷ÐÔÄÜ×ã¹»ºÃ£¬²¢ÇÒ¿Í»§¶ËÊýÁ¿Ò²±È½ÏÉϵÄÇé¿öÏ£¬ÍƼöÕâÖÖ·½Ê½¡£

¢ÚÒ»´ÎÐÔÅúÁ¿Ìá½»Êý¾Ý£¬²ÉÓöÌÁ¬½Ó·½Ê½¡£Ò²¾ÍÊÇ˵ÏȰÑÊý¾Ý±£´æµ½±¾µØÁÙʱ»º´æÇø»òÕßÁÙʱ±í£¬µ±´ïµ½½çֵʱ½øÐÐÒ»´ÎÐÔÅúÁ¿Ìá½»£¬ÓÖ»òÕ߸ù¾Ý¶¨Ê±ÈÎÎñÂÖѯÌá½»£¬

ÕâÖÖÇé¿öµÄ±×¶ËÊÇ×ö²»µ½ÊµÊ±ÐÔ´«Ê䣬¶ÔʵʱÐÔÒªÇ󲻸ߵÄÓ¦ÓóÌÐòÖÐÍÆ¼öʹÓá£

¢ÛʹÓÃÒ»ÖÖÌØÊâµÄ³¤Á¬½Ó£¬ÔÚijһָ¶¨Ê±¼ä¶ÎÄÚ£¬·þÎñÆ÷Óëij̨¿Í»§¶ËûÓÐÈκÎͨÐÅ£¬Ôò¶Ï¿ªÁ¬½Ó¡£Ï´ÎÁ¬½ÓÔòÊǿͻ§¶ËÏò·þÎñÆ÷·¢ËÍÇëÇóµÄʱºò£¬Ôٴν¨Á¢Á¬½Ó¡£

ÔÚÕâÀォ½éÉÜʹÓÃNettyʵÏÖµÚÈýÖÖ·½Ê½µÄÁ¬½Ó£¬µ«ÊÇÎÒÃÇÐèÒª¿¼ÂÇÁ½¸öÒòËØ£º

¢ÙÈçºÎÔÚ³¬Ê±£¨¼´·þÎñÆ÷ºÍ¿Í»§¶ËûÓÐÈκÎͨÐÅ£©ºó¹Ø±ÕͨµÀ£¿¹Ø±ÕͨµÀºóÓÖÈçºÎÔٴν¨Á¢Á¬½Ó£¿

¢Ú¿Í»§¶Ëå´»úʱ£¬ÎÒÃÇÎÞÐ迼ÂÇ£¬Ï´ÎÖØÆô¿Í»§¶ËÖ®ºó¾Í¿ÉÒÔÓë·þÎñÆ÷½¨Á¢Á¬½Ó£¬µ«·þÎñÆ÷å´»úʱ£¬¿Í»§¶ËÈçºÎÓë·þÎñÆ÷¶ËͨÐÅ£¿

·þÎñÆ÷¶Ë£ºÔö¼ÓÁ˺ìÉ«¿ò²¿·Ö

¿Í»§¶Ë£¨×¢ÒâºìÉ«×ÖÌ岿·Ö£©£º

public class Client {

private static class SingleHodler {
static final Client client = newClient();
}

public static Client getInstance() {
return SingleHodler.client;
}

private EventLoopGroup workerGroup;
private Bootstrap bootstrap;
private ChannelFuture future;

private Client() {
workerGroup = new NioEventLoopGroup();
bootstrap = new Bootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(newChannelInitializer<SocketChannel>() {
@Override
protected voidinitChannel(SocketChannel socketChannel) throws Exception {
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingEncoder());
socketChannel.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingDecoder());
socketChannel.pipeline().addLast (newReadTimeoutHandler(5)); //5ÃëºóδÓë·þÎñÆ÷ͨÐÅ£¬Ôò¶Ï¿ªÁ¬½Ó¡£
socketChannel.pipeline().addLast (new ClientHandler());
}
});
}

public void connect() {
try {
future =bootstrap.connect("127.0.0.1", 8765).sync();
} catch (InterruptedException e) {
e.printStackTrace();
}
}

public ChannelFuture getFuture() {
if(future == null ||!future.channel().isActive()) {
this.connect();
}
return future;
}

public static void main(String[] args)throws InterruptedException {
Client client = getInstance();
ChannelFuture future = client.getFuture();

for(int i=1; i<=3; i++) {
Message message = new Message(i,"pro" + i, "Êý¾ÝÐÅÏ¢" + i);
future.channel().writeAndFlush(message);
Thread.sleep(4000); //ÐÝÃß4ÃëºóÔÙ·¢ËÍÊý¾Ý
}

future.channel().closeFuture().sync();

new Thread(() -> {
try {
System.out.println("×ÓÏ߳̿ªÊ¼....");
ChannelFuture f =client.getFuture();
Message message = newMessage(4, "pro" + 4, "Êý¾ÝÐÅÏ¢" + 4);
f.channel().writeAndFlush(message);
f.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
}
}).start();

System.out.println("Ö÷Ïß³ÌÍ˳ö......");
}
}

ÆäËûµÄÀàÓë֮ǰµÄÒ»Ñù£¬Ã»Óб仯¡£

ÔËÐнá¹û£º

£¨2£©ÐÄÌø¼ì²â

ÎÒÃÇʹÓÃSocketͨÐÅÒ»°ã¾­³£»á´¦Àí¶à¸ö·þÎñÆ÷Ö®¼äµÄÐÄÌø¼ì²â£¬Ò»°ãÀ´½²ÎÒÃÇȥά»¤·þÎñÆ÷¼¯Èº£¬¿Ï¶¨ÒªÓÐһ̨»ò¶ą̀·þÎñÆ÷Ö÷»ú£¨Master£©£¬È»ºó»¹Ó¦¸ÃÓÐN̨£¨Slave£©£¬ÄÇôÎÒÃǵÄÖ÷»ú¿Ï¶¨ÒªÊ±Ê±¿Ì¿ÌÖªµÀ×Ô¼ºÏÂÃæµÄ´Ó·þÎñÆ÷µÄ¸÷·½ÃæÇé¿ö£¬È»ºó½øÐÐʵʱ¼à¿ØµÄ¹¦ÄÜ¡£Õâ¸öÔÚ·Ö²¼Ê½¼Ü¹¹Àï½»×öÐÄÌø¼ì²â»òÕßÐÄÌø¼à¿Ø¡£×î¼Ñ´¦Àí·½°¸ÊÇʹÓÃһЩͨÐÅ¿ò¼Ü½øÐÐʵÏÖ£¬Netty¾Í¿ÉÒÔ×öÕâÑùµÄÊ¡£

Server

public class Server {
public Server(int port) {
EventLoopGroup bossGroup = newNioEventLoopGroup();
EventLoopGroup workerGroup = newNioEventLoopGroup();
try {
ServerBootstrap bootstrap = newServerBootstrap();
bootstrap.group(bossGroup,workerGroup)
.channel(NioServerSocketChannel.class)
.childHandler(newChannelInitializer<SocketChannel>() {
@Override
protected voidinitChannel(SocketChannel sc) throws Exception {
sc.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast (new ServerHeartBeatHandler());
}
})
.handler(newLoggingHandler(LogLevel.INFO))
.option(ChannelOption.SO_BACKLOG, 1024);
ChannelFuture future =bootstrap.bind(new InetSocketAddress("127.0.0.1", port)).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}

public static void main(String[] args) {
new Server(8765);
}
}

ServerHeartBeatHandlerÀࣺ

public classServerHeartBeatHandler extends ChannelHandlerAdapter {

private static Map<String, String>AUTH_IP_MAP = new HashMap<>();
private static final String SUCCESS_KEY ="auth_success_key";

static {
AUTH_IP_MAP.put("192.168.3.176","1234");
}

private boolean auth(ChannelHandlerContextctx, Object msg) {
String[] rets = ((String)msg).split(",");
String auth = AUTH_IP_MAP.get(rets[0]);
if(auth != null &&auth.equals(rets[1])) {
ctx.writeAndFlush(SUCCESS_KEY);
return true;
} else {
ctx.writeAndFlush("authfailure!") .addListener(ChannelFutureListener.CLOSE);
return false;
}
}

@Override
public void channelRead(ChannelHandlerContextctx, Object msg) throws Exception {
if(msg instanceof String) {
auth(ctx, msg);
} else if(msg instanceof RequestInfo) {
RequestInfo info = (RequestInfo)msg;
System.out.println("----------------------------------------------");
System.out.println("µ±Ç°Ö÷»úip£º" +info.getIp());
System.out.println("µ±Ç°Ö÷»úcpu£ºÇé¿ö");
Map<String, Object> cpuMap =info.getCpuPercMap();
System.out.println("×ÜʹÓÃÂÊ£º" + cpuMap.get("combined"));
System.out.println("Óû§Ê¹ÓÃÂÊ£º" +cpuMap.get("user"));
System.out.println("ϵͳʹÓÃÂÊ£º" +cpuMap.get("sys"));
System.out.println("µÈ´ýÂÊ£º" +cpuMap.get("wait"));
System.out.println("¿ÕÏÐÂÊ£º" +cpuMap.get("idle"));
System.out.println("µ±Ç°Ö÷»úmemoryÇé¿ö£º");
Map<String, Object> memMap =info.getMemoryMap();
System.out.println("ÄÚ´æ×ÜÁ¿£º" +memMap.get("total"));
System.out.println("µ±Ç°ÄÚ´æÊ¹ÓÃÁ¿£º" +memMap.get("used"));
System.out.println("µ±Ç°ÄÚ´æÊ£ÓàÁ¿£º" +memMap.get("free"));
System.out.println("-----------------------------------------------");
ctx.writeAndFlush("inforeceived!");
} else {
ctx.writeAndFlush("connectfailure") .addListener(ChannelFutureListener.CLOSE);
}
}
}

ClientÀࣺ

public class Client {
public static void main(String[] args) {
EventLoopGroup workerGroup = newNioEventLoopGroup();
try {
Bootstrap bootstrap = newBootstrap();
bootstrap.group(workerGroup)
.channel(NioSocketChannel.class)
.handler(newChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannelsc) throws Exception {
sc.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingDecoder());
sc.pipeline().addLast (MarshallingCodeCFactory.buildMarshallingEncoder());
sc.pipeline().addLast (new ClientHeartBeatHandler());
}
});
ChannelFuture future =bootstrap.connect (new InetSocketAddress("127.0.0.1", 8765)).sync();
future.channel().closeFuture().sync();
} catch (Exception e) {
e.printStackTrace();
} finally {
workerGroup.shutdownGracefully();
}
}
}

ClientHeartBeatHandlerÀࣺ

public classClientHeartBeatHandler extends ChannelHandlerAdapter {

private ScheduledExecutorService scheduled= Executors.newScheduledThreadPool(1);
private ScheduledFuture<> heartBeat;
private InetAddress address;
private static final String SUCCESS_KEY ="auth_success_key";

@Override
public voidchannelActive(ChannelHandlerContext ctx) throws Exception {
address = InetAddress.getLocalHost();
String ip = address.getHostAddress();
String key = "1234";
String auth = ip + "," + key;
ctx.writeAndFlush(auth);
}

@Override
public voidexceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
cause.printStackTrace();
if(heartBeat != null) {
heartBeat.cancel(true);
heartBeat = null;
}
ctx.fireExceptionCaught(cause);
}

@Override
public voidchannelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
try {
if(msg instanceof String) {
String data = (String) msg;
if(SUCCESS_KEY.equals(data)) {
heartBeat =scheduled.scheduleWithFixedDelay(new HeartBeatTask(ctx), 0, 5,TimeUnit.SECONDS);
System.out.println(msg);
} else {
System.out.println(msg);
}
}
} finally {
ReferenceCountUtil.release(msg);
}
}

private class HeartBeatTask implements Runnable{
private final ChannelHandlerContextctx;

publicHeartBeatTask(ChannelHandlerContext ctx) {
this.ctx = ctx;
}

@Override
public void run() {
try {
RequestInfo requestInfo = newRequestInfo();
requestInfo.setIp(address.getHostAddress());
Sigar sigar = new Sigar();
CpuPerc cpuPerc =sigar.getCpuPerc();
Map<String, Object>cpuPercMap = new HashMap<>();
cpuPercMap.put("combined",cpuPerc.getCombined());
cpuPercMap.put("user", cpuPerc.getUser());
cpuPercMap.put("sys",cpuPerc.getSys());
cpuPercMap.put("wait", cpuPerc.getWait());
cpuPercMap.put("idle",cpuPerc.getIdle());

Mem mem = sigar.getMem();
Map<String, Object>memoryMap = new HashMap<>();
memoryMap.put("total", mem.getTotal() / (1024 * 1024));
memoryMap.put("used",mem.getUsed() / (1024 * 1024));
memoryMap.put("free",mem.getFree() / (1024 * 1024));

requestInfo.setCpuPercMap(cpuPercMap);
requestInfo.setMemoryMap(memoryMap);

ctx.writeAndFlush(requestInfo);
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

RequestInfoÀࣺ

public classRequestInfo implements Serializable {

private String ip ;
private Map<String, Object>cpuPercMap ;
private Map<String, Object>memoryMap;
//.. other field

public String getIp() {
return ip;
}

public void setIp(String ip) {
this.ip = ip;
}

public Map<String, Object>getCpuPercMap() {
return cpuPercMap;
}

public voidsetCpuPercMap(Map<String, Object> cpuPercMap) {
this.cpuPercMap = cpuPercMap;
}

public Map<String, Object>getMemoryMap() {
return memoryMap;
}

public void setMemoryMap(Map<String,Object> memoryMap) {
this.memoryMap = memoryMap;
}
}

MarshallingCodeCFactoryÀà¾Í²»Ìù³öÀ´ÁË£¬¸ú֮ǰµÄÒ»Ñù¡£

ÔËÐнá¹û£º

ÿ5Ãë·¢ËÍÒ»´ÎÊý¾Ýµ½·þÎñÆ÷¶Ë£¬ÕâÑùÖ÷·þÎñÆ÷¾Í¿ÉÒÔÖªµÀÿ̨´Ó·þÎñÆ÷µÄ״̬ÁË¡£µ±È»£¬ÕâÖ»ÊÇÒ»¸ö¼òµ¥µÄСÀý×Ó£¬ÕæÊµ»·¾³Öп϶¨ÐèÒª¸üÑϸñµÄУÑé¡£

 
   
2669 ´Îä¯ÀÀ       28
Ïà¹ØÎÄÕÂ

Java΢·þÎñÐÂÉú´úÖ®Nacos
ÉîÈëÀí½âJavaÖеÄÈÝÆ÷
JavaÈÝÆ÷Ïê½â
Java´úÂëÖÊÁ¿¼ì²é¹¤¾ß¼°Ê¹Óð¸Àý
Ïà¹ØÎĵµ

JavaÐÔÄÜÓÅ»¯
Spring¿ò¼Ü
SSM¿ò¼Ü¼òµ¥¼òÉÜ
´ÓÁ㿪ʼѧjava±à³Ì¾­µä
Ïà¹Ø¿Î³Ì

¸ßÐÔÄÜJava±à³ÌÓëϵͳÐÔÄÜÓÅ»¯
JavaEE¼Ü¹¹¡¢ Éè¼ÆÄ£Ê½¼°ÐÔÄܵ÷ÓÅ
Java±à³Ì»ù´¡µ½Ó¦Óÿª·¢
JAVAÐéÄâ»úÔ­ÀíÆÊÎö