博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Netty3 核心代码
阅读量:3724 次
发布时间:2019-05-22

本文共 9684 字,大约阅读时间需要 32 分钟。

Netty3 核心代码

更多干货

概述

Netty 的核心代码基于 NIO 实现多线程模型。下面是简化版的 Netty 代码。

代码

Boss

import java.nio.channels.ServerSocketChannel;

/**
 * Boss interface.
 */
public interface Boss {

    /**
     * Adds a new server socket channel to this boss.
     *
     * @param serverChannel the server socket channel to add
     */
    void registerAcceptChannelTask(ServerSocketChannel serverChannel);
}

Worker

import java.nio.channels.SocketChannel;

/**
 * Worker interface.
 */
public interface Worker {

    /**
     * Adds a new client session to this worker.
     *
     * @param channel the client socket channel to add
     */
    void registerNewChannelTask(SocketChannel channel);
}

NioSelectorRunnablePool

import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import com.cn.NioServerBoss;
import com.cn.NioServerWorker;

/**
 * Owns the boss and worker selector runnables and hands them out one at a time.
 */
public class NioSelectorRunnablePool {

    /** Counter used to choose the next boss. */
    private final AtomicInteger bossIndex = new AtomicInteger();

    /** Boss array. */
    private Boss[] bosses;

    /** Counter used to choose the next worker. */
    private final AtomicInteger workerIndex = new AtomicInteger();

    /** Worker array. */
    private Worker[] workeres;

    /**
     * Creates one boss and (available processors * 2) workers.
     *
     * @param boss   executor passed to every boss
     * @param worker executor passed to every worker
     */
    public NioSelectorRunnablePool(Executor boss, Executor worker) {
        initBoss(boss, 1);
        final int workerCount = Runtime.getRuntime().availableProcessors() * 2;
        initWorker(worker, workerCount);
    }

    /**
     * Fills the boss array.
     *
     * @param boss  executor passed to every boss
     * @param count number of bosses to create
     */
    private void initBoss(Executor boss, int count) {
        this.bosses = new NioServerBoss[count];
        // Names are 1-based: "boss thread 1", "boss thread 2", ...
        for (int number = 1; number <= bosses.length; number++) {
            bosses[number - 1] = new NioServerBoss(boss, "boss thread " + number, this);
        }
    }

    /**
     * Fills the worker array.
     *
     * @param worker executor passed to every worker
     * @param count  number of workers to create
     */
    private void initWorker(Executor worker, int count) {
        this.workeres = new NioServerWorker[count];
        // Names are 1-based: "worker thread 1", "worker thread 2", ...
        for (int number = 1; number <= workeres.length; number++) {
            workeres[number - 1] = new NioServerWorker(worker, "worker thread " + number, this);
        }
    }

    /**
     * Returns the next worker, advancing the worker counter.
     *
     * @return a worker from the pool
     */
    public Worker nextWorker() {
        return workeres[nextSlot(workerIndex, workeres.length)];
    }

    /**
     * Returns the next boss, advancing the boss counter.
     *
     * @return a boss from the pool
     */
    public Boss nextBoss() {
        return bosses[nextSlot(bossIndex, bosses.length)];
    }

    /**
     * Advances the counter and maps its previous value onto an array slot.
     * The absolute value keeps the slot non-negative once the counter wraps
     * around to negative values.
     */
    private static int nextSlot(AtomicInteger counter, int length) {
        final int ticket = counter.getAndIncrement();
        return Math.abs(ticket % length);
    }
}

AbstractNioSelector

import java.io.IOException;import java.nio.channels.Selector;import java.util.Queue;import java.util.concurrent.ConcurrentLinkedQueue;import java.util.concurrent.Executor;import java.util.concurrent.atomic.AtomicBoolean;import com.cn.pool.NioSelectorRunnablePool;/** * 抽象selector线程类 * * * */public abstract class AbstractNioSelector implements Runnable {	/**	 * 线程池	 */	private final Executor executor;	/**	 * 选择器	 */	protected Selector selector;	/**	 * 选择器wakenUp状态标记	 */	protected final AtomicBoolean wakenUp = new AtomicBoolean();	/**	 * 任务队列	 */	private final Queue
taskQueue = new ConcurrentLinkedQueue
(); /** * 线程名称 */ private String threadName; /** * 线程管理对象 */ protected NioSelectorRunnablePool selectorRunnablePool; AbstractNioSelector(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) { this.executor = executor; this.threadName = threadName; this.selectorRunnablePool = selectorRunnablePool; openSelector(); } /** * 获取selector并启动线程 */ private void openSelector() { try { this.selector = Selector.open(); } catch (IOException e) { throw new RuntimeException("Failed to create a selector."); } executor.execute(this); } @Override public void run() { Thread.currentThread().setName(this.threadName); while (true) { try { wakenUp.set(false); select(selector); processTaskQueue(); process(selector); } catch (Exception e) { // ignore } } } /** * 注册一个任务并激活selector * * @param task */ protected final void registerTask(Runnable task) { taskQueue.add(task); Selector selector = this.selector; if (selector != null) { if (wakenUp.compareAndSet(false, true)) { selector.wakeup(); } } else { taskQueue.remove(task); } } /** * 执行队列里的任务 */ private void processTaskQueue() { for (;;) { final Runnable task = taskQueue.poll(); if (task == null) { break; } task.run(); } } /** * 获取线程管理对象 * @return */ public NioSelectorRunnablePool getSelectorRunnablePool() { return selectorRunnablePool; } /** * select抽象方法 * * @param selector * @return * @throws IOException */ protected abstract int select(Selector selector) throws IOException; /** * selector的业务处理 * * @param selector * @throws IOException */ protected abstract void process(Selector selector) throws IOException;}

NioServerBoss

import java.io.IOException;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.ServerSocketChannel;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * boss实现类 * * */public class NioServerBoss extends AbstractNioSelector implements Boss{	public NioServerBoss(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set
selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } for (Iterator
i = selectedKeys.iterator(); i.hasNext();) { SelectionKey key = i.next(); i.remove(); ServerSocketChannel server = (ServerSocketChannel) key.channel(); // 新客户端 SocketChannel channel = server.accept(); // 设置为非阻塞 channel.configureBlocking(false); // 获取一个worker Worker nextworker = getSelectorRunnablePool().nextWorker(); // 注册新客户端接入任务 nextworker.registerNewChannelTask(channel); System.out.println("新客户端链接"); } } public void registerAcceptChannelTask(final ServerSocketChannel serverChannel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //注册serverChannel到selector serverChannel.register(selector, SelectionKey.OP_ACCEPT); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(); }}

NioServerWorker

import java.io.IOException;import java.nio.ByteBuffer;import java.nio.channels.ClosedChannelException;import java.nio.channels.SelectionKey;import java.nio.channels.Selector;import java.nio.channels.SocketChannel;import java.util.Iterator;import java.util.Set;import java.util.concurrent.Executor;import com.cn.pool.NioSelectorRunnablePool;import com.cn.pool.Worker;/** * worker实现类 * * */public class NioServerWorker extends AbstractNioSelector implements Worker{	public NioServerWorker(Executor executor, String threadName, NioSelectorRunnablePool selectorRunnablePool) {		super(executor, threadName, selectorRunnablePool);	}	@Override	protected void process(Selector selector) throws IOException {		Set
selectedKeys = selector.selectedKeys(); if (selectedKeys.isEmpty()) { return; } Iterator
ite = this.selector.selectedKeys().iterator(); while (ite.hasNext()) { SelectionKey key = (SelectionKey) ite.next(); // 移除,防止重复处理 ite.remove(); // 得到事件发生的Socket通道 SocketChannel channel = (SocketChannel) key.channel(); // 数据总长度 int ret = 0; boolean failure = true; ByteBuffer buffer = ByteBuffer.allocate(1024); //读取数据 try { ret = channel.read(buffer); failure = false; } catch (Exception e) { // ignore } //判断是否连接已断开 if (ret <= 0 || failure) { key.cancel(); System.out.println("客户端断开连接"); }else{ System.out.println("收到数据:" + new String(buffer.array())); //回写数据 ByteBuffer outBuffer = ByteBuffer.wrap("收到\n".getBytes()); channel.write(outBuffer);// 将消息回送给客户端 } } } /** * 加入一个新的socket客户端 */ public void registerNewChannelTask(final SocketChannel channel){ final Selector selector = this.selector; registerTask(new Runnable() { @Override public void run() { try { //将客户端注册到selector中 channel.register(selector, SelectionKey.OP_READ); } catch (ClosedChannelException e) { e.printStackTrace(); } } }); } @Override protected int select(Selector selector) throws IOException { return selector.select(500); }}

ServerBootstrap

import java.net.SocketAddress;import java.nio.channels.ServerSocketChannel;import com.cn.pool.Boss;import com.cn.pool.NioSelectorRunnablePool;/** * 服务类 * * */public class ServerBootstrap {private NioSelectorRunnablePool selectorRunnablePool;	public ServerBootstrap(NioSelectorRunnablePool selectorRunnablePool) {		this.selectorRunnablePool = selectorRunnablePool;	}	/**	 * 绑定端口	 * @param localAddress	 */	public void bind(final SocketAddress localAddress){		try {			// 获得一个ServerSocket通道			ServerSocketChannel serverChannel = ServerSocketChannel.open();			// 设置通道为非阻塞			serverChannel.configureBlocking(false);			// 将该通道对应的ServerSocket绑定到port端口			serverChannel.socket().bind(localAddress);			//获取一个boss线程			Boss nextBoss = selectorRunnablePool.nextBoss();			//向boss注册一个ServerSocket通道			nextBoss.registerAcceptChannelTask(serverChannel);		} catch (Exception e) {			e.printStackTrace();		}	}}

Start

import java.net.InetSocketAddress;import java.util.concurrent.Executors;import com.cn.pool.NioSelectorRunnablePool;/** * 启动函数 * * */public class Start {	public static void main(String[] args) {		//初始化线程		NioSelectorRunnablePool nioSelectorRunnablePool = new NioSelectorRunnablePool(Executors.newCachedThreadPool(), Executors.newCachedThreadPool());		//获取服务类		ServerBootstrap bootstrap = new ServerBootstrap(nioSelectorRunnablePool);		//绑定端口		bootstrap.bind(new InetSocketAddress(10101));		System.out.println("start");	}}

转载地址:http://rdlnn.baihongyu.com/

你可能感兴趣的文章
线程池C++面向对象封装
查看>>
opencv -- 基础
查看>>
opencv--简小开发
查看>>
vs2019使用呢数据库远程连接Linux(完成版)
查看>>
01-Linux内核编程-设备和文件IO
查看>>
安装与使用说明-IDM
查看>>
1创建数据库
查看>>
狂神SSM数据库
查看>>
实现EasyExcel对Excel进行读操作(测试)
查看>>
课程分类添加功能
查看>>
教育项目课程模块2
查看>>
教育项目课程模块3(课程分类前端实现)
查看>>
课程管理需求+后端接口
查看>>
3搭建service模块
查看>>
4service_edu模块
查看>>
1查询所有讲师
查看>>
2逻辑删除
查看>>
3整合swagger进行接口测试
查看>>
4统一返回数据格式
查看>>
5使用R方法
查看>>