package chat.server.netty;

import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;

import java.util.concurrent.TimeUnit;

import org.apache.log4j.Logger;

import chat.consts.BrokerConstants;
import chat.server.Configer;
import chat.server.moquette.BytesMetricsCollector;
import chat.server.moquette.MessageMetricsCollector;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.epoll.EpollEventLoopGroup;
import io.netty.channel.epoll.EpollServerSocketChannel;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.ServerSocketChannel;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.concurrent.Future;

/**
 * Singleton that owns the Netty event loop groups and binds the server
 * transports (HTTP, plain TCP, SSL TCP, WebSocket) whose ports are present in
 * the configuration read through {@link Configer}.
 *
 * <p>A transport whose port key is not configured is skipped. Use
 * {@link #start()} / {@link #stop()} for the shared instance.
 */
public class NettyAcceptor {

    private static final Logger logger = Logger.getLogger(NettyAcceptor.class);

    private static NettyAcceptor instance;

    private EventLoopGroup bossGroup;
    private EventLoopGroup workerGroup;
    // Kept as a raw Class because getChannelClass() exposes it as a raw type to callers.
    private Class channelClass;

    // One bind future per transport; a field stays null when its transport is skipped.
    private ChannelFuture httpChannelFuture;
    private ChannelFuture tcpChannelFuture;
    private ChannelFuture sslTCPChannelFuture;
    private ChannelFuture wsChannelFuture;

    private MessageMetricsCollector messageMetricsCollector;
    private BytesMetricsCollector bytesMetricsCollector;

    private NettyAcceptor() {
    }

    /**
     * Returns the shared acceptor, creating it on first use.
     *
     * @return the singleton instance, never {@code null}
     */
    public static synchronized NettyAcceptor getInstance() {
        if (instance == null) {
            instance = new NettyAcceptor();
        }
        return instance;
    }

    /**
     * Starts the shared acceptor.
     *
     * @throws Exception if any configured transport fails to initialize or bind
     */
    public static void start() throws Exception {
        getInstance().doStart();
    }

    /**
     * Stops the shared acceptor. Failures are logged and never propagated.
     */
    public static void stop() {
        try {
            getInstance().doStop();
        } catch (Exception e) {
            // Pass the throwable separately so log4j records the stack trace.
            logger.error("Failed to stop Netty acceptor", e);
        }
    }

    /**
     * Creates the event loop groups and metrics collectors, then binds every
     * configured transport in order: HTTP, TCP, SSL, WebSocket.
     *
     * <p>If a bind fails the exception propagates immediately; resources that
     * were already allocated are not released here, so the caller should invoke
     * {@link #doStop()} (or {@link #stop()}) to clean up.
     *
     * @throws Exception if a transport fails to initialize or bind
     */
    public void doStart() throws Exception {
        logger.info("Initializing Netty acceptor...");

        // 1. Event loop groups, channel class and metrics collectors.
        initLocalVariable();

        // 2. HTTP, short-lived connections.
        initializeHttpTransport();

        // 3. TCP, long-lived connections.
        initializePlainTCPTransport();

        // 4. SSL, long-lived connections.
        initializeSSLTCPTransport();

        // 5. WebSocket, long-lived connections.
        initializeWSSTransport();

        logger.info("Netty acceptor initialized");
    }

    /**
     * Closes every bound server channel and then shuts down the worker and
     * boss event loop groups. Transports that were never bound are ignored.
     */
    public void doStop() {
        logger.info("Closing Netty acceptor...");

        // 1. Close the server channels.
        closeOneChannel(httpChannelFuture);
        closeOneChannel(tcpChannelFuture);
        closeOneChannel(sslTCPChannelFuture);
        closeOneChannel(wsChannelFuture);

        // 2. Shut down the event loops, workers first.
        closeOneEventLoopGroup("worker group", workerGroup);
        closeOneEventLoopGroup("boss group", bossGroup);
    }

    /**
     * Chooses the Epoll or NIO implementation based on
     * {@link BrokerConstants#NETTY_EPOLL_PROPERTY_NAME} (default {@code false})
     * and creates the metrics collectors shared by the TCP and SSL transports.
     */
    private void initLocalVariable() {
        // 1. Event loop groups and server channel class.
        boolean epoll = Configer.getBoolean(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false);
        if (epoll) {
            // Only TCP MQTT is supported for now, so the boss group uses a single thread.
            logger.info("Netty is using Epoll");
            bossGroup = new EpollEventLoopGroup(1);
            workerGroup = new EpollEventLoopGroup();
            channelClass = EpollServerSocketChannel.class;
        } else {
            logger.info("Netty is using NIO");
            bossGroup = new NioEventLoopGroup(1);
            workerGroup = new NioEventLoopGroup();
            channelClass = NioServerSocketChannel.class;
        }

        // 2. Metrics collectors.
        bytesMetricsCollector = new BytesMetricsCollector();
        messageMetricsCollector = new MessageMetricsCollector();
    }

    /**
     * Binds the HTTP transport on the port configured under {@code http_port}.
     *
     * @throws Exception if the initializer cannot be created or the bind fails
     */
    private void initializeHttpTransport() throws Exception {
        // 1. Load the port.
        String protocol = "HTTP";
        Integer port = configuredPort(protocol, "http_port");
        if (port == null) {
            return;
        }

        // 2. Create the bootstrap.
        logger.info("Initializing server. Protocol=" + protocol);
        NettyHttpChannelInitializer initializer = new NettyHttpChannelInitializer();
        ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);

        // 3. Bind and start to accept incoming connections.
        httpChannelFuture = bootStrap.bind(port);
        awaitBound(httpChannelFuture, protocol, port);
    }

    /**
     * Binds the plain TCP transport on the port configured under {@code port}.
     *
     * @throws Exception if the initializer cannot be created or the bind fails
     */
    private void initializePlainTCPTransport() throws Exception {
        // 1. Load the port.
        String protocol = "TCP";
        Integer port = configuredPort(protocol, "port");
        if (port == null) {
            return;
        }

        // 2. Create the bootstrap.
        NettyTCPChannelInitializer initializer = new NettyTCPChannelInitializer();
        initializer.setMessageMetricsCollector(messageMetricsCollector);
        initializer.setBytesMetricsCollector(bytesMetricsCollector);
        ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);

        // 3. Bind and start to accept incoming connections.
        tcpChannelFuture = bootStrap.bind(port);
        awaitBound(tcpChannelFuture, protocol, port);
    }

    /**
     * Binds the SSL TCP transport on the port configured under {@code ssl_port}.
     *
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    private void initializeSSLTCPTransport() throws InterruptedException {
        // 1. Load the port.
        String protocol = "SSL";
        Integer port = configuredPort(protocol, "ssl_port");
        if (port == null) {
            return;
        }

        // 2. Create the bootstrap.
        NettySSLChannelInitializer initializer = new NettySSLChannelInitializer();
        initializer.setMessageMetricsCollector(messageMetricsCollector);
        initializer.setBytesMetricsCollector(bytesMetricsCollector);
        ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);

        // 3. Bind and start to accept incoming connections.
        sslTCPChannelFuture = bootStrap.bind(port);
        awaitBound(sslTCPChannelFuture, protocol, port);
    }

    /**
     * Binds the WebSocket transport on the port configured under {@code ws_port}.
     *
     * <p>NOTE(review): the protocol label says "Secure Websocket" while the
     * configuration key is {@code ws_port}; confirm which one is intended.
     *
     * @throws Exception if the initializer cannot be created or the bind fails
     */
    private void initializeWSSTransport() throws Exception {
        // 1. Load the port.
        String protocol = "Secure Websocket";
        Integer port = configuredPort(protocol, "ws_port");
        if (port == null) {
            return;
        }

        // 2. Create the bootstrap.
        ChannelInitializer initializer = new NettyWSChannelInitializer();
        ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);

        // 3. Bind and start to accept incoming connections.
        wsChannelFuture = bootStrap.bind(port);
        awaitBound(wsChannelFuture, protocol, port);
    }

    /**
     * Reads a transport's port from the configuration and logs when the
     * transport is skipped because no port is configured.
     *
     * @param protocol label used in log messages
     * @param portKey configuration key holding the port
     * @return the configured port, or {@code null} when the key is absent
     */
    private Integer configuredPort(String protocol, String portKey) {
        Integer port = Configer.getInteger(portKey, null);
        if (port == null) {
            logger.info("skip server, Protocol=" + protocol);
        }
        return port;
    }

    /**
     * Blocks until the given bind future completes and logs the bound port.
     *
     * @param bindFuture future returned by {@code ServerBootstrap.bind}
     * @param protocol label used in log messages
     * @param port port being bound
     * @throws InterruptedException if interrupted while waiting for the bind
     */
    private void awaitBound(ChannelFuture bindFuture, String protocol, int port)
            throws InterruptedException {
        // sync() rethrows a bind failure, so the listener is attached only after
        // the wait has returned normally (original ordering preserved).
        bindFuture.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
        logger.info("server bounded. Protocol=" + protocol + ": " + port);
    }

    /**
     * Closes the channel behind a bind future and waits for the close to
     * finish. Best effort: problems are logged, never thrown.
     *
     * @param channelFuture bind future of a transport, or {@code null} if the
     *        transport was never bound
     */
    private void closeOneChannel(ChannelFuture channelFuture) {
        if (channelFuture == null) {
            return;
        }

        Channel channel = channelFuture.channel();
        try {
            channel.close();
            channel.closeFuture().sync();
        } catch (InterruptedException e) {
            // Restore the flag so callers further up can observe the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("Interrupted while waiting for channel to close: " + channel, e);
        } catch (Exception e) {
            logger.warn("Failed to close channel: " + channel, e);
        }
    }

    /**
     * Requests a graceful shutdown of an event loop group, waits up to three
     * seconds, and issues a zero-timeout shutdown request if the group has not
     * terminated by then.
     *
     * @param name label used in log messages
     * @param loopGroup group to shut down, or {@code null} if never created
     */
    private void closeOneEventLoopGroup(String name, EventLoopGroup loopGroup) {
        if (loopGroup == null) {
            return;
        }

        // 1. Graceful shutdown with a bounded wait.
        Future<?> waiter = loopGroup.shutdownGracefully();
        logger.info("Waiting for " + name + " event loop groups to terminate...");
        try {
            waiter.await(3, TimeUnit.SECONDS);
        } catch (InterruptedException iex) {
            // Restore the flag so callers further up can observe the interrupt.
            Thread.currentThread().interrupt();
            logger.warn("An InterruptedException was caught while waiting for event loops to terminate...");
        }

        // 2. Forced shutdown if the group is still alive.
        if (!loopGroup.isTerminated()) {
            logger.warn("Forcing shutdown " + name + "...");
            loopGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
        }
    }

    /**
     * @return the boss event loop group, or {@code null} before {@link #doStart()}
     */
    public EventLoopGroup getBossGroup() {
        return bossGroup;
    }

    /**
     * @return the worker event loop group, or {@code null} before {@link #doStart()}
     */
    public EventLoopGroup getWorkerGroup() {
        return workerGroup;
    }

    /**
     * @return the server channel class matching the selected transport
     *         (Epoll or NIO), or {@code null} before {@link #doStart()}
     */
    public Class getChannelClass() {
        return channelClass;
    }
}