package chat.server.netty;
|
|
import chat.server.Configer;
|
import chat.server.im.IMDispatcher;
|
import chat.server.moquette.BytesMetricsCollector;
|
import chat.server.moquette.MessageMetricsCollector;
|
import chat.server.moquette.MqttBytesMetricsHandler;
|
import chat.server.moquette.MqttDecodHandler;
|
import chat.server.moquette.MqttEncodHandler;
|
import chat.server.moquette.MqttIdleTimeoutHandler;
|
import chat.server.moquette.MqttMessageLoggerHandler;
|
import chat.server.moquette.MqttMessageMetricsHandler;
|
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelInitializer;
|
import io.netty.channel.ChannelPipeline;
|
import io.netty.channel.socket.SocketChannel;
|
import io.netty.handler.codec.http.HttpObjectAggregator;
|
import io.netty.handler.codec.http.HttpRequestDecoder;
|
import io.netty.handler.codec.http.HttpResponseEncoder;
|
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
|
import io.netty.handler.timeout.IdleStateHandler;
|
|
|
public class NettyWSChannelInitializer extends ChannelInitializer<SocketChannel> {
|
|
private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1";
|
private MessageMetricsCollector messageMetricsCollector;
|
private BytesMetricsCollector bytesMetricsCollector;
|
|
@Override
|
protected void initChannel(SocketChannel channel) throws Exception {
|
int timeoutSeconds = Configer.getInteger("timeout", 10);
|
|
ChannelPipeline pipeline = channel.pipeline();
|
|
ChannelHandler bytesMetricsHandler = new MqttBytesMetricsHandler(bytesMetricsCollector);
|
ChannelHandler idleStateHandler = new IdleStateHandler(timeoutSeconds, 0, 0);
|
ChannelHandler timeoutHandler = new MqttIdleTimeoutHandler();
|
|
ChannelHandler sslHandler = createSSLHandler();
|
ChannelHandler httpEncoder = new HttpResponseEncoder();
|
ChannelHandler httpDecoder = new HttpRequestDecoder();
|
ChannelHandler aggregator = new HttpObjectAggregator(65536);
|
ChannelHandler webSocketHandler = new WebSocketServerProtocolHandler("/org/fusesource/mqtt", MQTT_SUBPROTOCOL_CSV_LIST);
|
|
ChannelHandler decoder = new MqttDecodHandler();
|
ChannelHandler encoder = MqttEncodHandler.getInstance();
|
ChannelHandler messageMetricsHandler = new MqttMessageMetricsHandler(messageMetricsCollector);
|
ChannelHandler messageLogger = new MqttMessageLoggerHandler();
|
ChannelHandler dispatcher = new IMDispatcher();
|
|
pipeline.addFirst("bytemetrics", bytesMetricsHandler);
|
pipeline.addLast("idleStateHandler", idleStateHandler);
|
pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler);
|
|
pipeline.addLast("ssl", sslHandler);
|
pipeline.addLast("httpEncoder", httpEncoder);
|
pipeline.addLast("httpDecoder", httpDecoder);
|
pipeline.addLast("aggregator", aggregator);
|
pipeline.addLast("webSocketHandler", webSocketHandler);
|
pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder());
|
pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder());
|
|
pipeline.addLast("decoder", decoder);
|
pipeline.addLast("encoder", encoder);
|
pipeline.addLast("metrics", messageMetricsHandler);
|
pipeline.addLast("messageLogger", messageLogger);
|
pipeline.addLast("handler", dispatcher);
|
}
|
|
public void setMessageMetricsCollector(MessageMetricsCollector messageMetricsCollector) {
|
this.messageMetricsCollector = messageMetricsCollector;
|
}
|
|
public void setBytesMetricsCollector(BytesMetricsCollector bytesMetricsCollector) {
|
this.bytesMetricsCollector = bytesMetricsCollector;
|
}
|
|
private ChannelHandler createSSLHandler() {
|
/*
|
* SSLContext sslContext = sslCtxCreator.initSSLContext(); SSLEngine sslEngine =
|
* sslContext.createSSLEngine(); sslEngine.setUseClientMode(false);
|
* sslEngine.setNeedClientAuth(false);
|
*
|
* return new SslHandler(sslEngine);
|
*/
|
return null;
|
}
|
}
|