1.netty 客户端到服务端数据交互
2.源码分析
1.服务端启动
b.bind(8099).sync() 服务启动调用bind方法,然后到register方法,此时EventLoop执行一个任务,任务是调用register0
EventLoop.execute方法添加任务到队列中, 并开启Reactor线程
线程开启计算策略
public int calculateStrategy(IntSupplier selectSupplier, boolean hasTasks) throws Exception {
return hasTasks ? selectSupplier.get() : SelectStrategy.SELECT;
}
private final IntSupplier selectNowSupplier = new IntSupplier() {
@Override
public int get() throws Exception {
return selectNow();
}
};
selectSupplier.get selectNow方法 。策略计算完成后 如果不符合判断语句,则执行runalltasks。在前面启动线程的时候 添加了一个runnable,任务是register0方法。执行register方法将通道注册到selector上
没有任务后线程将阻塞在KQueue.poll(kqfd, pollArrayAddress, MAX_KEVENTS, to);上 等待客户端连接
strategy>0 处理key
else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
processSelectedKeys();
} finally {
// Ensure we always run tasks.
final long ioTime = System.nanoTime() - ioStartTime;
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
处理读写事件
读message ,accept socket 并包装成NioSocketChannel,然后继续调用
pipeline.fireChannelRead(readBuf.get(i)); ->执行AbstractChannelHandlerContext.invokeChannelRead() ,循环执行handler的channelRead方法
执行到ServerBootstrap$ServerBootstrapAcceptor.channelRead方法时,会将当前通道注册到workGroup上
2.客户端启动
客户端启动和服务端启动基本一致,当处理processSelectedKey时,会进入链接事件
if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
3.类的继承关系
NioEventLoop继承关系
Promise继承关系,Promise扩展了Future,添加了异步通知机制
4.ByteBuf
public static void main(String[] args) {
// 创建一个大小为 5 ByteBuf
ByteBuf buf = Unpooled.buffer(5);
// 写入数据
buf.writeBytes(new byte[]{1,3,3});
// 读取数据
if (buf.isReadable()) {
System.out.println(buf.readByte());
}
buf.discardReadBytes();
buf.writeBytes(new byte[]{2});
// 释放资源
buf.release();
}
分享到: