package chat.server.netty;
|
|
import static io.netty.channel.ChannelFutureListener.FIRE_EXCEPTION_ON_FAILURE;
|
|
import java.util.concurrent.TimeUnit;
|
|
import org.apache.log4j.Logger;
|
|
import chat.consts.BrokerConstants;
|
import chat.server.Configer;
|
import chat.server.moquette.BytesMetricsCollector;
|
import chat.server.moquette.MessageMetricsCollector;
|
import io.netty.bootstrap.ServerBootstrap;
|
import io.netty.channel.Channel;
|
import io.netty.channel.ChannelFuture;
|
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.EventLoopGroup;
|
import io.netty.channel.epoll.EpollEventLoopGroup;
|
import io.netty.channel.epoll.EpollServerSocketChannel;
|
import io.netty.channel.nio.NioEventLoopGroup;
|
import io.netty.channel.socket.ServerSocketChannel;
|
import io.netty.channel.socket.SocketChannel;
|
import io.netty.channel.socket.nio.NioServerSocketChannel;
|
import io.netty.util.concurrent.Future;
|
|
public class NettyAcceptor {
|
|
private static Logger logger;
|
|
private static NettyAcceptor instance;
|
private EventLoopGroup bossGroup;
|
private EventLoopGroup workerGroup;
|
|
private Class<? extends ServerSocketChannel> channelClass;
|
private ChannelFuture httpChannelFuture;
|
private ChannelFuture tcpChannelFuture;
|
private ChannelFuture sslTCPChannelFuture;
|
private ChannelFuture wsChannelFuture;
|
|
private MessageMetricsCollector messageMetricsCollector;
|
private BytesMetricsCollector bytesMetricsCollector;
|
|
|
static {
|
logger = Logger.getLogger(NettyAcceptor.class);
|
}
|
|
private NettyAcceptor() {
|
|
}
|
|
public static synchronized NettyAcceptor getInstance() {
|
if (instance == null) {
|
instance = new NettyAcceptor();
|
}
|
|
return instance;
|
}
|
|
public static void start() throws Exception {
|
instance = getInstance();
|
instance.doStart();
|
}
|
|
public static void stop() {
|
try {
|
instance = getInstance();
|
instance.doStop();
|
}
|
catch(Exception e) {
|
logger.error(e);
|
}
|
}
|
|
public void doStart() throws Exception {
|
logger.info("Initializing Netty acceptor...");
|
|
//1.
|
initLocalVariable();
|
|
//2. HTTP 短连接
|
initializeHttpTransport();
|
|
//3. TCP 长连接
|
initializePlainTCPTransport();
|
|
//4. SSL 长连接
|
initializeSSLTCPTransport();
|
|
//5. WebSocket 长连接
|
initializeWSSTransport();
|
|
logger.info("Netty acceptor initialized");
|
}
|
|
public void doStop() {
|
logger.info("Closing Netty acceptor...");
|
|
//1.
|
closeOneChannel(httpChannelFuture);
|
closeOneChannel(tcpChannelFuture);
|
closeOneChannel(sslTCPChannelFuture);
|
closeOneChannel(wsChannelFuture);
|
|
//2.
|
closeOneEventLoopGroup("worker group", workerGroup);
|
closeOneEventLoopGroup("boss group", bossGroup);
|
}
|
|
private void initLocalVariable() {
|
//1.
|
boolean epoll = Configer.getBoolean(BrokerConstants.NETTY_EPOLL_PROPERTY_NAME, false);
|
if (epoll) {
|
// 由于目前只支持TCP MQTT, 所以bossGroup的线程数配置为1
|
logger.info("Netty is using Epoll");
|
bossGroup = new EpollEventLoopGroup(1);
|
workerGroup = new EpollEventLoopGroup();
|
channelClass = EpollServerSocketChannel.class;
|
}
|
else {
|
logger.info("Netty is using NIO");
|
bossGroup = new NioEventLoopGroup(1);
|
workerGroup = new NioEventLoopGroup();
|
channelClass = NioServerSocketChannel.class;
|
}
|
|
//2.
|
bytesMetricsCollector = new BytesMetricsCollector();
|
messageMetricsCollector = new MessageMetricsCollector();
|
}
|
|
private void initializeHttpTransport() throws Exception {
|
String protocol = "HTTP";
|
Integer port = Configer.getInteger("http_port", null);
|
|
if (port == null) {
|
logger.info("skip server, Protocol=" + protocol);
|
return;
|
}
|
|
//2. create bootstrap
|
logger.info("Initializing server. Protocol=" + protocol);
|
NettyHttpChannelInitializer initializer = new NettyHttpChannelInitializer();
|
ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);
|
|
//3. Bind and start to accept incoming connections.
|
httpChannelFuture = bootStrap.bind(port);
|
httpChannelFuture.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
|
|
logger.info("server bounded. Protocol=" + protocol + ": " + port);
|
}
|
|
private void initializePlainTCPTransport() throws Exception {
|
//1. load host and port
|
String protocol = "TCP";
|
Integer port = Configer.getInteger("port", null);
|
|
if (port == null) {
|
logger.info("skip server, Protocol=" + protocol);
|
return;
|
}
|
|
//2. create bootstrap
|
NettyTCPChannelInitializer initializer = new NettyTCPChannelInitializer();
|
initializer.setMessageMetricsCollector(messageMetricsCollector);
|
initializer.setBytesMetricsCollector(bytesMetricsCollector);
|
|
ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);
|
|
//3. Bind and start to accept incoming connections.
|
tcpChannelFuture = bootStrap.bind(port);
|
tcpChannelFuture.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
|
|
logger.info("server bounded. Protocol=" + protocol + ": " + port);
|
}
|
|
private void initializeSSLTCPTransport() throws InterruptedException {
|
//1. load host and port
|
String protocol = "SSL";
|
Integer port = Configer.getInteger("ssl_port", null);
|
|
if (port == null) {
|
logger.info("skip server, Protocol=" + protocol);
|
return;
|
}
|
|
//2. create bootstrap
|
NettySSLChannelInitializer initializer = new NettySSLChannelInitializer();
|
initializer.setMessageMetricsCollector(messageMetricsCollector);
|
initializer.setBytesMetricsCollector(bytesMetricsCollector);
|
|
ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);
|
|
//3. Bind and start to accept incoming connections.
|
sslTCPChannelFuture = bootStrap.bind(port);
|
sslTCPChannelFuture.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
|
|
logger.info("server bounded. Protocol=" + protocol + ": " + port);
|
}
|
|
private void initializeWSSTransport() throws Exception {
|
//1. load host and port
|
String protocol = "Secure Websocket";
|
Integer port = Configer.getInteger("ws_port", null);
|
|
if (port == null) {
|
logger.info("skip server, Protocol=" + protocol);
|
return;
|
}
|
|
//2. create bootstrap
|
ChannelInitializer<SocketChannel> initializer = new NettyWSChannelInitializer();
|
ServerBootstrap bootStrap = new NettyServerBootstrap(this, initializer);
|
|
//3. Bind and start to accept incoming connections.
|
wsChannelFuture = bootStrap.bind(port);
|
wsChannelFuture.sync().addListener(FIRE_EXCEPTION_ON_FAILURE);
|
|
logger.info("server bounded. Protocol=" + protocol + ": " + port);
|
}
|
|
private void closeOneChannel(ChannelFuture channelFuture) {
|
if (channelFuture == null) {
|
return;
|
}
|
|
Channel channel = channelFuture.channel();
|
try {
|
channel.close();
|
} catch (Exception e) {
|
}
|
|
try {
|
channel.closeFuture().sync();
|
} catch (Exception e) {
|
}
|
}
|
|
private void closeOneEventLoopGroup(String name, EventLoopGroup loopGroup) {
|
if (loopGroup == null) {
|
return;
|
}
|
|
//1. close gracefully
|
Future<?> waiter = loopGroup.shutdownGracefully();
|
|
logger.info("Waiting for" + name + " event loop groups to terminate...");
|
try {
|
waiter.await(3, TimeUnit.SECONDS);
|
}
|
catch (InterruptedException iex) {
|
logger.warn("An InterruptedException was caught while waiting for event loops to terminate...");
|
}
|
|
//2. close force
|
if (!loopGroup.isTerminated()) {
|
logger.warn("Forcing shutdown " + name + "...");
|
loopGroup.shutdownGracefully(0L, 0L, TimeUnit.MILLISECONDS);
|
}
|
}
|
|
public EventLoopGroup getBossGroup() {
|
return bossGroup;
|
}
|
|
public EventLoopGroup getWorkerGroup() {
|
return workerGroup;
|
}
|
|
public Class<? extends ServerSocketChannel> getChannelClass() {
|
return channelClass;
|
}
|
|
}
|