在上篇《Netty(一)引题》中,分别对AIO,BIO,PIO,NIO进行了简单的阐述,并写了简单的demo。但是这里说的简单,我也只能呵呵了,特别是NIO、AIO(我全手打的,好麻烦)。
在开始netty开发TimeServer之前,先回顾下NIO进行服务端开发的步骤:
一个简单的NIO程序,需要经过繁琐的十多步操作才能完成最基本的消息读取和发送,这也是我学netty的原因,下面就看看使用netty是如何轻松写服务器的。
在这里,我使用IDEA 14 + Maven用netty写上篇中TimeServer的程序。这里我直接用Maven的pom.xml来直接下载netty的包(Maven是对依赖进行管理,支持自动化的测试、编译、构建的项目管理工具,具体的Maven请读者自行百度、google搜索)。
/* TimeServer */
1 public class TimeServer { 2 public void bind(int port)throws Exception{ 3 /* 配置服务端的NIO线程组 */ 4 // NioEventLoopGroup类 是个线程组,包含一组NIO线程,用于网络事件的处理 5 // (实际上它就是Reactor线程组)。 6 // 创建的2个线程组,1个是服务端接收客户端的连接,另一个是进行SocketChannel的 7 // 网络读写 8 EventLoopGroup bossGroup = new NioEventLoopGroup(); 9 EventLoopGroup WorkerGroup = new NioEventLoopGroup();10 11 try {12 // ServerBootstrap 类,是启动NIO服务器的辅助启动类13 ServerBootstrap b = new ServerBootstrap();14 b.group(bossGroup,WorkerGroup)15 .channel(NioServerSocketChannel.class)16 .option(ChannelOption.SO_BACKLOG,1024)17 .childHandler(new ChildChannelHandler());18 19 // 绑定端口,同步等待成功20 ChannelFuture f= b.bind(port).sync();21 22 // 等待服务端监听端口关闭23 f.channel().closeFuture().sync();24 }finally {25 // 释放线程池资源26 bossGroup.shutdownGracefully();27 WorkerGroup.shutdownGracefully();28 }29 }30 31 PRivate class ChildChannelHandler extends ChannelInitializer<SocketChannel>{32 @Override33 protected void initChannel(SocketChannel arg0)throws Exception{34 arg0.pipeline().addLast(new TimeServerHandler());35 }36 }37 38 public static void main(String[]args)throws Exception{39 int port = 8080;40 if(args!=null && args.length>0){41 try {42 port = Integer.valueOf(args[0]);43 }44 catch (NumberFormatException ex){}45 }46 new TimeServer().bind(port);47 }48 }
1 public class TimeServerHandler extends ChannelHandlerAdapter{ 2 // 用于网络的读写操作 3 @Override 4 public void channelRead(ChannelHandlerContext ctx,Object msg) 5 throws Exception{ 6 ByteBuf buf = (ByteBuf)msg; 7 byte[]req = new byte[buf.readableBytes()]; 8 buf.readBytes(req); 9 String body = new String(req,"UTF-8");10 System.out.println("the time server order : " + body);11 12 String currentTime = "QUERY TIME ORDER".equalsIgnoreCase(body)?new Date(13 System.currentTimeMillis()).toString():"BAD ORDER";14 ByteBuf resp = Unpooled.copiedBuffer(currentTime.getBytes());15 ctx.write(resp);16 }17 18 @Override19 public void channelReadComplete(ChannelHandlerContext ctx)throws Exception{20 ctx.flush(); // 它的作用是把消息发送队列中的消息写入SocketChannel中发送给对方21 // 为了防止频繁的唤醒Selector进行消息发送,Netty的write方法,并不直接将消息写入SocketChannel中22 // 调用write方法只是把待发送的消息发到缓冲区中,再调用flush,将发送缓冲区中的消息23 // 全部写到SocketChannel中。24 }25 26 @Override27 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){28 ctx.close();29 }30 }
/* TimeClient */
1 public class TimeClient { 2 public void connect(String host,int port)throws Exception{ 3 // 配置服务端的NIO线程组 4 EventLoopGroup group = new NioEventLoopGroup(); 5 6 try { 7 // Bootstrap 类,是启动NIO服务器的辅助启动类 8 Bootstrap b = new Bootstrap(); 9 b.group(group).channel(NioSocketChannel.class)10 .option(ChannelOption.TCP_NODELAY,true)11 .handler(new ChannelInitializer<SocketChannel>() {12 @Override13 public void initChannel(SocketChannel ch)14 throws Exception{15 ch.pipeline().addLast(new TimeClientHandler());16 }17 });18 19 // 发起异步连接操作20 ChannelFuture f= b.connect(host,port).sync();21 22 // 等待客服端链路关闭23 f.channel().closeFuture().sync();24 }finally {25 group.shutdownGracefully();26 }27 }28 29 public static void main(String[]args)throws Exception{30 int port = 8080;31 if(args!=null && args.length>0){32 try {33 port = Integer.valueOf(args[0]);34 }35 catch (NumberFormatException ex){}36 }37 new TimeClient().connect("127.0.0.1",port);38 }39 }
1 public class TimeClientHandler extends ChannelHandlerAdapter{ 2 3 // 写日志 4 private static final Logger logger = 5 Logger.getLogger(TimeClientHandler.class.getName()); 6 7 private final ByteBuf firstMessage; 8 9 public TimeClientHandler(){10 byte[] req = "QUERY TIME ORDER".getBytes();11 firstMessage = Unpooled.buffer(req.length);12 firstMessage.writeBytes(req);13 }14 15 @Override16 public void channelRead(ChannelHandlerContext ctx,Object msg)17 throws Exception{18 ByteBuf buf = (ByteBuf)msg;19 byte[]req = new byte[buf.readableBytes()];20 buf.readBytes(req);21 String body = new String(req,"UTF-8");22 System.out.println("Now is : " + body);23 }24 25 @Override26 public void channelActive(ChannelHandlerContext ctx){27 // 当客户端和服务端建立tcp成功之后,Netty的NIO线程会调用channelActive28 // 发送查询时间的指令给服务端。29 // 调用ChannelHandlerContext的writeAndFlush方法,将请求消息发送给服务端30 // 当服务端应答时,channelRead方法被调用31 ctx.writeAndFlush(firstMessage);32 }33 34 @Override35 public void exceptionCaught(ChannelHandlerContext ctx,Throwable cause){36 logger.warning("message from:"+cause.getMessage());37 ctx.close();38 }39 }
本例子没有考虑读半包的处理,对于功能演示和测试,本例子没问题,但是如果进行性能或者压力测试,就不能正常工作了。在下一节会弄正确处理半包消息的例子。
项目在源码在src/main/java/Netty/下,分为客户端和服务端。
源码下载:GitHub地址:https://github.com/orange1438/Netty_Course
题外话:虽然文章全是我纯手打,没任何复制,但是文章大多数内容来自《Netty权威指南》,我也是顺便学习的。之前我做C++服务端,因为狗血的面试C++,结果公司系统居然是java的,无耐我所在的重庆,C++少得可怜,所以只有在公司里学java了。当然,有epoll,select,事件驱动,TCP/IP概念的小伙伴来说,学这个netty,还是挺简单的。
新闻热点
疑难解答