首页 > 学院 > 开发设计 > 正文

Netty(二)入门

2019-11-14 14:58:59
字体:
来源:转载
供稿:网友

在上篇《Netty(一)引题》中,分别对AIO,BIO,PIO,NIO进行了简单的阐述,并写了简单的demo。但是这里说的简单,我也只能呵呵了,特别是NIO、AIO(我全手打的,好麻烦)。
在开始netty开发TimeServer之前,先回顾下NIO进行服务端开发的步骤:

  • 1.创建ServerSocketChannel,配置它为非阻塞;
  • 2.绑定监听,配置TCP参数,如backlog大小;
  • 3.创建独立的IO线程,用于轮询多路复用器Selector;
  • 4.创建Selector,将之前创建的ServerSocketChannel注册到Selector上,监听SelectionKey.ACEPT;
  • 5.启动IO线程,在循环体中执行Selector.select()方法,轮询就绪的Channel;
  • 6.当轮询到处于就绪状态的Channel时,需要对其进行判断,如果是OP_ACCEPT状态,说明是新的客户端接入,则调用ServerSocketChannel.accept()方法接受新的客户端;
  • 7.设置新接入的客户端链接SocketChannel为非阻塞模式,配置其他的一些TCP参数;
  • 8.将SocketChannel注册到Selector,监听OP_READ操作位;
  • 9.如果轮询的Channel为OP_READ,则说明SocketChannel中有新的就绪的数据包需要读取,则构造ByteBuffer对象,读取数据包;
  • 10.如果轮询的Channel为OP_WRITE,说明还有数据没有发送完成,需要继续发送。

一个简单的NIO程序,需要经过繁琐的十多步操作才能完成最基本的消息读取和发送,这也是我学netty的原因,下面就看看使用netty是如何轻松写服务器的。
在这里,我使用IDEA 14 + Maven用netty写上篇中TimeServer的程序。这里我直接用Maven的pom.xml来直接下载netty的包(Maven是对依赖进行管理,支持自动化的测试、编译、构建的项目管理工具,具体的Maven请读者自行百度、google搜索)。

/* TimeServer */

 1 public class TimeServer { 2     public void bind(int port)throws Exception{ 3         /* 配置服务端的NIO线程组 */ 4         // NioEventLoopGroup类 是个线程组,包含一组NIO线程,用于网络事件的处理 5         // (实际上它就是Reactor线程组)。 6         // 创建的2个线程组,1个是服务端接收客户端的连接,另一个是进行SocketChannel的 7         // 网络读写 8         EventLoopGroup bossGroup = new NioEventLoopGroup(); 9         EventLoopGroup WorkerGroup = new NioEventLoopGroup();10 11         try {12             // ServerBootstrap 类,是启动NIO服务器的辅助启动类13             ServerBootstrap b = new ServerBootstrap();14             b.group(bossGroup,WorkerGroup)15                     .channel(NioServerSocketChannel.class)16                     .option(ChannelOption.SO_BACKLOG,1024)17                     .childHandler(new ChildChannelHandler());18 19             // 绑定端口,同步等待成功20             ChannelFuture f= b.bind(port).sync();21 22             // 等待服务端监听端口关闭23             f.channel().closeFuture().sync();24         }finally {25             // 释放线程池资源26             bossGroup.shutdownGracefully();27             WorkerGroup.shutdownGracefully();28         }29     }30 31     PRivate class ChildChannelHandler extends ChannelInitializer<SocketChannel>{32         @Override33         protected  void initChannel(SocketChannel arg0)throws Exception{34             arg0.pipeline().addLast(new TimeServerHandler());35         }36     }37 38     public static void main(String[]args)throws Exception{39         int port = 8080;40         if(args!=null && args.length>0){41             try {42                 port = Integer.valueOf(args[0]);43             }44             catch (NumberFormatException ex){}45         }46         new TimeServer().bind(port);47     }48 }
 1 public class TimeServerHandler extends ChannelHandlerAdapter{ 2     // 用于网络的读写操作 3     @Override 4     public void channelRead(ChannelHandlerContext ctx,Object msg) 5             throws Exception{ 6         ByteBuf buf = (ByteBuf)msg; 7         byte[]req = new byte[buf.readableBytes()]; 8         buf.readBytes(req); 9         String body = new String(req,"UTF-8");10         System.out.println("the time server order : " + body);11 12         String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(13                 System.currentTimeMillis()).toString():"BAD ORDER";14         ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());15         ctx.write(resp);16     }17 18     @Override19     public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{20         ctx.flush();   // 它的作用是把消息发送队列中的消息写入SocketChannel中发送给对方21         // 为了防止频繁的唤醒Selector进行消息发送,Netty的write方法,并不直接将消息写入SocketChannel中22         // 调用write方法只是把待发送的消息发到缓冲区中,再调用flush,将发送缓冲区中的消息23         // 全部写到SocketChannel中。24     }25 26     @Override27     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){28         ctx.close();29     }30 }

 

/* TimeClient */

 1 public class TimeClient { 2     public void connect(String host,int port)throws Exception{ 3         // 配置服务端的NIO线程组 4         EventLoopGroup group = new NioEventLoopGroup(); 5  6         try { 7             // Bootstrap 类,是启动NIO服务器的辅助启动类 8             Bootstrap b = new Bootstrap(); 9             b.group(group).channel(NioSocketChannel.class)10                     .option(ChannelOption.TCP_NODELAY,true)11                     .handler(new ChannelInitializer<SocketChannel>() {12                         @Override13                         public void initChannel(SocketChannel ch)14                                 throws Exception{15                             ch.pipeline().addLast(new TimeClientHandler());16                         }17                     });18 19             // 发起异步连接操作20             ChannelFuture f= b.connect(host,port).sync();21 22             // 等待客服端链路关闭23             f.channel().closeFuture().sync();24         }finally {25             group.shutdownGracefully();26         }27     }28 29     public static void main(String[]args)throws Exception{30         int port = 8080;31         if(args!=null && args.length>0){32             try {33                 port = Integer.valueOf(args[0]);34             }35             catch (NumberFormatException ex){}36         }37         new TimeClient().connect("127.0.0.1",port);38     }39 }
 1 public class TimeClientHandler extends ChannelHandlerAdapter{ 2  3     // 写日志 4     private static final Logger logger = 5             Logger.getLogger(TimeClientHandler.class.getName()); 6  7     private final ByteBuf firstMessage; 8  9     public TimeClientHandler(){10         byte[] req = "QUERY TIME ORDER".getBytes();11         firstMessage = Unpooled.buffer(req.length);12         firstMessage.writeBytes(req);13     }14 15     @Override16     public void channelRead(ChannelHandlerContext ctx,Object msg)17             throws Exception{18         ByteBuf buf = (ByteBuf)msg;19         byte[]req = new byte[buf.readableBytes()];20         buf.readBytes(req);21         String body = new String(req,"UTF-8");22         System.out.println("Now is : " + body);23     }24 25     @Override26     public void channelActive(ChannelHandlerContext ctx){27         // 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive28         // 发送查询时间的指令给服务端。29         // 调用ChannelHandlerContext的writeAndFlush方法,将请求消息发送给服务端30         // 当服务端应答时,channelRead方法被调用31         ctx.writeAndFlush(firstMessage);32     }33 34     @Override35     public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){36         logger.warning("message from:"+cause.getMessage());37         ctx.close();38     }39 }

 

本例子没有考虑读半包的处理,对于功能演示和测试,本例子没问题,但是如果进行性能或者压力测试,就不能正常工作了。在下一节会弄正确处理半包消息的例子。

项目在源码在src/main/java/Netty/下,分为客户端和服务端。

源码下载:GitHub地址:https://github.com/orange1438/Netty_Course

题外话:虽然文章全是我纯手打,没任何复制,但是文章大多数内容来自《Netty权威指南》,我也是顺便学习的。之前我做C++服务端,因为狗血的面试C++,结果公司系统居然是java的,无耐我所在的重庆,C++少得可怜,所以只有在公司里学java了。当然,有epoll,select,事件驱动,TCP/IP概念的小伙伴来说,学这个netty,还是挺简单的。


发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表