首页 > 学院 > 开发设计 > 正文

基于Netty的RPC架构笔记3之线程模型源码分析(1)

2019-11-11 07:06:51
字体:
来源:转载
供稿:网友

      随着用户量上升,项目的架构也在不断的升级,由最开始的MVC的垂直架构(传统项目)到RPC架构(webservice,rest,netty,mina),再到SOA模型(dubbo),再到最近的微服务,又比如Tomcat6之前的IO模型都是BIO 也就是阻塞IO,到后来变成多路复用,也是阻塞IO。到非阻塞NIO,再到异步非阻塞AIO,

     言归正传,接着谈netty,传统IO是一个线程服务一个客户,后来通过netty,可以一个线程服务多个客户,下面的那个图展示的是netty的NIO通过引入多线程来提高性能,既一个线程负责一片用户

直接上代码

package com.cn;import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 启动函数 * */public class Start {	public static void main(String[] args) {				//初始化线程		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());				//获取服务类		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);				//绑定端口		bootstrap.bind(new InetSocketAddress(10101));				System.out.PRintln("start");	}}
package com.cn.pool;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicInteger;import com.cn.NioServerBoss;import com.cn.NioServerWorker;/** * selector线程管理者 * */public class NioSelectorRunnablePool {	/**	 * boss线程数组	 */	private final AtomicInteger bossIndex = new AtomicInteger();	private Boss[] bosses;	/**	 * worker线程数组	 */	private final AtomicInteger workerIndex = new AtomicInteger();	private Worker[] workeres;		public NioSelectorRunnablePool(Executor boss, Executor worker) {		initBoss(boss, 1);		initWorker(worker, Runtime.getRuntime().availableProcessors() * 2);	}	/**	 * 初始化boss线程	 * @param boss	 * @param count	 */	private void initBoss(Executor boss, int count) {		this.bosses = new NioServerBoss[count];		for (int i = 0; i < bosses.length; i++) {			bosses[i] = new NioServerBoss(boss, "boss thread " + (i+1), this);		}	}	/**	 * 初始化worker线程	 * @param worker	 * @param count	 */	private void initWorker(Executor worker, int count) {		this.workeres = new NioServerWorker[count];		for (int i = 0; i < workeres.length; i++) {			workeres[i] = new NioServerWorker(worker, "worker thread " + (i+1), this);		}	}	/**	 * 获取一个worker	 * @return	 */	public Worker nextWorker() {		 return workeres[Math.abs(workerIndex.getAndIncrement() % workeres.length)];	}	/**	 * 获取一个boss	 * @return	 */	public Boss nextBoss() {		 return bosses[Math.abs(bossIndex.getAndIncrement() % bosses.length)];	}}
package com.cn;import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服务类 * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;		public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {		this.selectorRunnablePool = selectorRunnablePool;	}		/**	 * 绑定端口	 * @param localAddress	 */	public void bind(final SocketAddress localAddress){		try {			// 获得一个ServerSocket通道			ServerSocketChannel serverChannel = ServerSocketChannel.open();			// 设置通道为非阻塞			serverChannel.configureBlocking(false);			// 将该通道对应的ServerSocket绑定到port端口			serverChannel.socket().bind(localAddress);						//获取一个boss线程			Boss nextBoss = selectorRunnablePool.nextBoss();			//向boss注册一个ServerSocket通道			nextBoss.registerAcceptChannelTask(serverChannel);		} catch (Exception e) {			e.printStackTrace();		}	}}
package com.cn.pool;import java.nio.channels.SocketChannel;/** * worker接口 * */public interface Worker {		/**	 * 加入一个新的客户端会话	 * @param channel	 */	public void registerNewChannelTask(SocketChannel channel);}
package com.cn.pool;import java.nio.channels.ServerSocketChannel;/** * boss接口 * */public interface Boss {		/**	 * 加入一个新的ServerSocket	 * @param serverChannel	 */	public void registerAcceptChannelTask(ServerSocketChannel serverChannel);}
package com.cn;import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector线程类 *  *  */public abstract class AbstractNioSelector implements Runnable {	/**	 * 线程池	 */	private final Executor executor;	/**	 * 选择器	 */	protected Selector selector;	/**	 * 选择器wakenUp状态标记	 */	protected final AtomicBoolean wakenUp = new AtomicBoolean();	/**	 * 任务队列	 */	private final Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<Runnable>();	/**	 * 线程名称	 */	private String threadName;		/**	 * 线程管理对象	 */	protected NioSelectorRunnablePool selectorRunnablePool;	AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		this.executor = executor;		this.threadName = threadName;		this.selectorRunnablePool = selectorRunnablePool;		openSelector();	}	/**	 * 获取selector并启动线程	 */	private void openSelector() {		try {			this.selector = Selector.open();		} catch (IOException e) {			throw new RuntimeException("Failed to create a selector.");		}		executor.execute(this);	}	@Override	public void run() {				Thread.currentThread().setName(this.threadName);		while (true) {			try {				wakenUp.set(false);				select(selector);				processTaskQueue();				process(selector);			} catch (Exception e) {				// ignore			}		}	}	/**	 * 注册一个任务并激活selector	 * 	 * @param task	 */	protected final void registerTask(Runnable task) {		taskQueue.add(task);		Selector selector = this.selector;		if (selector != null) {			if (wakenUp.compareAndSet(false, true)) {				selector.wakeup();			}		} else {			taskQueue.remove(task);		}	}	/**	 * 执行队列里的任务	 */	private void processTaskQueue() {		for (;;) {			final Runnable task = taskQueue.poll();			if (task == null) {				break;			}			task.run();		}	}		/**	 * 获取线程管理对象	 * @return	 */	public NioSelectorRunnablePool getSelectorRunnablePool() {		return selectorRunnablePool;	}	/**	 * select抽象方法	 * 	 * @param selector	 * @return	 * @throws IOException	 */	protected abstract int select(Selector selector) throws IOException;	/**	 * selector的业务处理	 * 	 * @param selector	 * @throws IOException	 */	protected abstract void process(Selector selector) throws IOException;}
package com.cn;import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss实现类 * */public class NioServerBoss extends AbstractNioSelector implements Boss{	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }                for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {            SelectionKey key = i.next();            i.remove();            ServerSocketChannel server = (ServerSocketChannel) key.channel();    		// 新客户端    		SocketChannel channel = server.accept();    		// 设置为非阻塞    		channel.configureBlocking(false);    		// 获取一个worker    		Worker nextworker = getSelectorRunnablePool().nextWorker();    		// 注册新客户端接入任务    		nextworker.registerNewChannelTask(channel);    		    		System.out.println("新客户端链接");        }	}			public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//注册serverChannel到selector					serverChannel.register(selector, SelectionKey.OP_ACCEPT);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}		@Override	protected int select(Selector selector) throws IOException {		return selector.select();	}}
package com.cn;import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker实现类 * */public class NioServerWorker extends AbstractNioSelector implements Worker{	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set<SelectionKey> selectedKeys = selector.selectedKeys();        if (selectedKeys.isEmpty()) {            return;        }        Iterator<SelectionKey> ite = this.selector.selectedKeys().iterator();		while (ite.hasNext()) {			SelectionKey key = (SelectionKey) ite.next();			// 移除,防止重复处理			ite.remove();						// 得到事件发生的Socket通道			SocketChannel channel = (SocketChannel) key.channel();						// 数据总长度			int ret = 0;			boolean failure = true;			ByteBuffer buffer = ByteBuffer.allocate(1024);			//读取数据			try {				ret = channel.read(buffer);				failure = false;			} catch (Exception e) {				// ignore			}			//判断是否连接已断开			if (ret <= 0 || failure) {				key.cancel();				System.out.println("客户端断开连接");	        }else{	        	 System.out.println("收到数据:" + new String(buffer.array()));	        	 	     		//回写数据	     		ByteBuffer outBuffer = ByteBuffer.wrap("收到/n".getBytes());	     		channel.write(outBuffer);// 将消息回送给客户端	        }		}	}	/**	 * 加入一个新的socket客户端	 */	public void registerNewChannelTask(final SocketChannel channel){		 final Selector selector = this.selector;		 registerTask(new Runnable() {			@Override			public void run() {				try {					//将客户端注册到selector中					channel.register(selector, SelectionKey.OP_READ);				} catch (ClosedChannelException e) {					e.printStackTrace();				}			}		});	}	@Override	protected int select(Selector selector) throws IOException {		return selector.select(500);	}	}


上一篇:CString类的完美总结

下一篇:Qt之QFtp

发表评论 共有条评论
用户名: 密码:
验证码: 匿名发表