package chat.server.netty; import chat.server.Configer; import chat.server.im.IMDispatcher; import chat.server.moquette.BytesMetricsCollector; import chat.server.moquette.MessageMetricsCollector; import chat.server.moquette.MqttBytesMetricsHandler; import chat.server.moquette.MqttDecodHandler; import chat.server.moquette.MqttEncodHandler; import chat.server.moquette.MqttIdleTimeoutHandler; import chat.server.moquette.MqttMessageLoggerHandler; import chat.server.moquette.MqttMessageMetricsHandler; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelInitializer; import io.netty.channel.ChannelPipeline; import io.netty.channel.socket.SocketChannel; import io.netty.handler.codec.http.HttpObjectAggregator; import io.netty.handler.codec.http.HttpRequestDecoder; import io.netty.handler.codec.http.HttpResponseEncoder; import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler; import io.netty.handler.timeout.IdleStateHandler; public class NettyWSChannelInitializer extends ChannelInitializer { private static final String MQTT_SUBPROTOCOL_CSV_LIST = "mqtt, mqttv3.1, mqttv3.1.1"; private MessageMetricsCollector messageMetricsCollector; private BytesMetricsCollector bytesMetricsCollector; @Override protected void initChannel(SocketChannel channel) throws Exception { int timeoutSeconds = Configer.getInteger("timeout", 10); ChannelPipeline pipeline = channel.pipeline(); ChannelHandler bytesMetricsHandler = new MqttBytesMetricsHandler(bytesMetricsCollector); ChannelHandler idleStateHandler = new IdleStateHandler(timeoutSeconds, 0, 0); ChannelHandler timeoutHandler = new MqttIdleTimeoutHandler(); ChannelHandler sslHandler = createSSLHandler(); ChannelHandler httpEncoder = new HttpResponseEncoder(); ChannelHandler httpDecoder = new HttpRequestDecoder(); ChannelHandler aggregator = new HttpObjectAggregator(65536); ChannelHandler webSocketHandler = new WebSocketServerProtocolHandler("/org/fusesource/mqtt", MQTT_SUBPROTOCOL_CSV_LIST); ChannelHandler decoder = new MqttDecodHandler(); ChannelHandler encoder = MqttEncodHandler.getInstance(); ChannelHandler messageMetricsHandler = new MqttMessageMetricsHandler(messageMetricsCollector); ChannelHandler messageLogger = new MqttMessageLoggerHandler(); ChannelHandler dispatcher = new IMDispatcher(); pipeline.addFirst("bytemetrics", bytesMetricsHandler); pipeline.addLast("idleStateHandler", idleStateHandler); pipeline.addAfter("idleStateHandler", "idleEventHandler", timeoutHandler); pipeline.addLast("ssl", sslHandler); pipeline.addLast("httpEncoder", httpEncoder); pipeline.addLast("httpDecoder", httpDecoder); pipeline.addLast("aggregator", aggregator); pipeline.addLast("webSocketHandler", webSocketHandler); pipeline.addLast("ws2bytebufDecoder", new WebSocketFrameToByteBufDecoder()); pipeline.addLast("bytebuf2wsEncoder", new ByteBufToWebSocketFrameEncoder()); pipeline.addLast("decoder", decoder); pipeline.addLast("encoder", encoder); pipeline.addLast("metrics", messageMetricsHandler); pipeline.addLast("messageLogger", messageLogger); pipeline.addLast("handler", dispatcher); } public void setMessageMetricsCollector(MessageMetricsCollector messageMetricsCollector) { this.messageMetricsCollector = messageMetricsCollector; } public void setBytesMetricsCollector(BytesMetricsCollector bytesMetricsCollector) { this.bytesMetricsCollector = bytesMetricsCollector; } private ChannelHandler createSSLHandler() { /* * SSLContext sslContext = sslCtxCreator.initSSLContext(); SSLEngine sslEngine = * sslContext.createSSLEngine(); sslEngine.setUseClientMode(false); * sslEngine.setNeedClientAuth(false); * * return new SslHandler(sslEngine); */ return null; } }