实践netty

server

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.allinpay.io.framework.netty.socket.Constants;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.logging.LogLevel;
import io.netty.handler.logging.LoggingHandler;

public class NettyTcpServer {

	private static final Logger logger = LoggerFactory.getLogger(NettyTcpServer.class);

	private EventLoopGroup bossGroup;

	private EventLoopGroup workerGroup;

	private ServerBootstrap bootstrap;

	/**
	 * 服务端监听地址
	 */
	private String localHost;

	/**
	 * 服务端监听端口
	 */
	private int localPort;

	/**
	 * 同一个端口是否支持多连接
	 */
	private String whetherMultiConnection = Constants.SUPPORT_MULTI_CONNECTION;

	/**
	 * 服务端Channel,负责监听
	 */
	private Channel serverChannel;

	/**
	 * 服务端业务处理器
	 */
	private ChannelHandler serverChannelHandlerInitializer;

	/**
	 * 关闭服务
	 */
	public void close() {
		bossGroup.shutdownGracefully();
		workerGroup.shutdownGracefully();
		logger.info("Stopped Netty Tcp Server: " + localHost + ":" + localPort);
	}

	/**
	 * 初始化并启动监听
	 */
	public void init() {
		// 可指定线程数
		bossGroup = new NioEventLoopGroup();
        // 可指定线程数
		workerGroup = new NioEventLoopGroup();
		bootstrap = new ServerBootstrap();
		bootstrap.group(bossGroup, workerGroup).channel(NioServerSocketChannel.class);

		bootstrap.option(ChannelOption.SO_KEEPALIVE, true);
		bootstrap.handler(new LoggingHandler(LogLevel.DEBUG));

		bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
			@Override
			protected void initChannel(SocketChannel ch) throws Exception {
				ch.pipeline().addLast(new LoggingHandler(LogLevel.DEBUG));
				ch.pipeline().addLast(new ChannelHandlerAdapter() {
					@Override
					public void channelInactive(ChannelHandlerContext ctx) throws Exception {
						InetSocketAddress remoteInetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
						logger.info("连接被关闭:" + remoteInetSocketAddress.getAddress().getHostAddress() + ":"
								+ remoteInetSocketAddress.getPort() + " -> " + localHost + ":" + localPort);
						if (Constants.NOT_SUPPORT_MULTI_CONNECTION.equals(whetherMultiConnection)) {
							// 连接断开,开启绑定监听
							doBind();
						}
					}

					@Override
					public void channelActive(ChannelHandlerContext ctx) throws Exception {
						InetSocketAddress remoteInetSocketAddress = (InetSocketAddress) ctx.channel().remoteAddress();
						logger.info("被连接成功:" + remoteInetSocketAddress.getAddress().getHostAddress() + ":"
								+ remoteInetSocketAddress.getPort() + " -> " + localHost + ":" + localPort);
						if (Constants.NOT_SUPPORT_MULTI_CONNECTION.equals(whetherMultiConnection)) {
							// 连接建立,取消绑定监听
							unBind();
						}
					}
				}).addLast(serverChannelHandlerInitializer);
			}
		});

		doBind();
	}

	/**
	 * 执行绑定,开启监听
	 */
	protected void doBind() {
		logger.debug("start to listening: " + localHost + ":" + localPort);
		bootstrap.bind(localHost, localPort).addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture bindChannelFuture) throws Exception {
				if (bindChannelFuture.isSuccess()) {
					// 绑定成功,设置服务端监听Channel
					serverChannel = bindChannelFuture.channel();
					logger.info("start listening successfully: " + localHost + ":" + localPort);
				} else {
					logger.error("failed in starting listening : " + localHost + ":" + localPort,
							bindChannelFuture.cause());
					// 失败重试
					bindChannelFuture.channel().eventLoop().schedule(new Runnable() {
						@Override
						public void run() {
							doBind();
						}
					}, Constants.RE_BIND_INTERVAL, TimeUnit.SECONDS);
				}
			}
		});
	}

	/**
	 * 解除绑定,取消监听
	 */
	protected void unBind() {
		logger.debug("stop to listening: " + localHost + ":" + localPort);
		serverChannel.close().addListener(new ChannelFutureListener() {
			@Override
			public void operationComplete(ChannelFuture closeChannelFuture) throws Exception {
				if (closeChannelFuture.isSuccess()) {
					logger.info("stop listening successfully: " + localHost + ":" + localPort);
				} else {
					logger.error("failed in stopping listening: " + localHost + ":" + localPort,
							closeChannelFuture.cause());
				}
			}
		});
	}

	public String getLocalHost() {
		return localHost;
	}

	public void setLocalHost(String localHost) {
		this.localHost = localHost;
	}

	public int getLocalPort() {
		return localPort;
	}

	public void setLocalPort(int localPort) {
		this.localPort = localPort;
	}

	public ChannelHandler getServerChannelHandlerInitializer() {
		return serverChannelHandlerInitializer;
	}

	public void setServerChannelHandlerInitializer(ChannelHandler serverChannelHandlerInitializer) {
		this.serverChannelHandlerInitializer = serverChannelHandlerInitializer;
	}

	public String getWhetherMultiConnection() {
		return whetherMultiConnection;
	}

	public void setWhetherMultiConnection(String whetherMultiConnection) {
		this.whetherMultiConnection = whetherMultiConnection;
	}

}

EpollServerSocketChannel

client

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
import java.net.InetSocketAddress;
import java.util.concurrent.TimeUnit;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

public class NettyTcpClient {

	private static final Logger logger = LoggerFactory.getLogger(NettyTcpClient.class);

	/**
	 * 远程服务端地址
	 */
	private String remoteServerHost;

	/**
	 * 远程服务端端口
	 */
	private int remoteServerPort;

	/**
	 * 重连时间间隔,单位:毫秒
	 * 
	 * 注:小于等于0则不需要重连
	 */
	private long reConnnectInterval = 3 * 1000L;

	private volatile EventLoopGroup workerGroup;

	private volatile Bootstrap bootstrap;

	private ChannelHandler clientChannelHandlerInitializer;

	public void close() {
		workerGroup.shutdownGracefully();
	}

	public void init() {
        // 可指定线程数
		workerGroup = new NioEventLoopGroup();
		bootstrap = new Bootstrap();
		bootstrap.group(workerGroup).channel(NioSocketChannel.class);

		bootstrap.handler(new ChannelInitializer<SocketChannel>() {
			@Override
			public void initChannel(SocketChannel ch) throws Exception {
				// 定时重连
				ch.pipeline().addLast(new ChannelHandlerAdapter() {
					@Override
					public void channelInactive(ChannelHandlerContext ctx) throws Exception {
						logger.error("连接被关闭" + ctx.channel().toString());
						scheduleConnect();
					}

					@Override
					public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
						logger.warn("发生异常,关闭连接" + ctx.channel().toString(), cause);
						// 断开连接
						ctx.close();
					}
				});
				ch.pipeline().addLast(clientChannelHandlerInitializer);
			}
		});
		doConnect();
	}

	private void doConnect() {
		logger.info("开始连接:" + getServerInfo());
		bootstrap.connect(new InetSocketAddress(remoteServerHost, remoteServerPort)).addListener(new ChannelFutureListener() {
			public void operationComplete(ChannelFuture f) throws Exception {
				if (f.isSuccess()) {
					InetSocketAddress localInetSocketAddress = (InetSocketAddress) f.channel().localAddress();
					logger.info("连接成功:" + localInetSocketAddress.getAddress().getHostAddress() + ":"
							+ localInetSocketAddress.getPort() + " -> " + getServerInfo());
				} else {
					logger.error("连接失败:" + getServerInfo() + ",原因:" + f.cause());
					// 启动连接失败时定时重连
					scheduleConnect();
				}
			}
		});
	}

	/**
	 * 定时重连
	 */
	private void scheduleConnect() {
		if (reConnnectInterval > 0) {
			workerGroup.schedule(new Runnable() {
				@Override
				public void run() {
					doConnect();
				}
			}, reConnnectInterval, TimeUnit.MILLISECONDS);
		}
	}

	private String getServerInfo() {
		return String.format("%s:%d", remoteServerHost, remoteServerPort);
	}

	public EventLoopGroup getWorkerGroup() {
		return workerGroup;
	}

	public Bootstrap getBootstrap() {
		return bootstrap;
	}

	public String getRemoteServerHost() {
		return remoteServerHost;
	}

	public void setRemoteServerHost(String remoteServerHost) {
		this.remoteServerHost = remoteServerHost;
	}

	public int getRemoteServerPort() {
		return remoteServerPort;
	}

	public void setRemoteServerPort(int remoteServerPort) {
		this.remoteServerPort = remoteServerPort;
	}

	public long getReConnnectInterval() {
		return reConnnectInterval;
	}

	public void setReConnnectInterval(long reConnnectInterval) {
		this.reConnnectInterval = reConnnectInterval;
	}

	public ChannelHandler getClientChannelHandlerInitializer() {
		return clientChannelHandlerInitializer;
	}

	public void setClientChannelHandlerInitializer(ChannelHandler clientChannelHandlerInitializer) {
		this.clientChannelHandlerInitializer = clientChannelHandlerInitializer;
	}

}

NioEventLoop

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
@Override
    protected void run() {
}

private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
        final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
        if (!k.isValid()) {
            final EventLoop eventLoop;
            try {
                eventLoop = ch.eventLoop();
            } catch (Throwable ignored) {
                // If the channel implementation throws an exception because there is no event loop, we ignore this
                // because we are only trying to determine if ch is registered to this event loop and thus has authority
                // to close ch.
                return;
            }
            // Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
            // and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
            // still healthy and should not be closed.
            // See https://github.com/netty/netty/issues/5125
            if (eventLoop == this) {
                // close the channel if the key is not valid anymore
                unsafe.close(unsafe.voidPromise());
            }
            return;
        }

        try {
            int readyOps = k.readyOps();
            // We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
            // the NIO JDK channel implementation may throw a NotYetConnectedException.
            if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
                // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
                // See https://github.com/netty/netty/issues/924
                int ops = k.interestOps();
                ops &= ~SelectionKey.OP_CONNECT;
                k.interestOps(ops);

                unsafe.finishConnect();
            }

            // Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
                ch.unsafe().forceFlush();
            }

            // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
            // to a spin loop
            if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
                unsafe.read();
            }
        } catch (CancelledKeyException ignored) {
            unsafe.close(unsafe.voidPromise());
        }
    }

EpollSocketChannel