package chat.server.moquette; import org.apache.log4j.Logger; import io.netty.channel.Channel; import io.netty.channel.ChannelDuplexHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.channel.ChannelPromise; import io.netty.util.Attribute; import io.netty.util.AttributeKey; public class MqttMessageMetricsHandler extends ChannelDuplexHandler { private static Logger logger; private static AttributeKey ATTR_KEY_METRICS; private static AttributeKey ATTR_KEY_USERNAME; private MessageMetricsCollector collector; static { logger = Logger.getLogger(MqttMessageMetricsHandler.class); ATTR_KEY_METRICS = AttributeKey.valueOf("MessageMetrics"); ATTR_KEY_USERNAME = AttributeKey.valueOf("username"); } public MqttMessageMetricsHandler(MessageMetricsCollector collector) { this.collector = collector; } @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { Attribute attr = ctx.channel().attr(ATTR_KEY_METRICS); attr.set(new Metrics()); super.channelActive(ctx); } @Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { Metrics metrics = ctx.channel().attr(ATTR_KEY_METRICS).get(); metrics.incrementRead(1); ctx.fireChannelRead(msg); } @Override public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { Metrics metrics = ctx.channel().attr(ATTR_KEY_METRICS).get(); metrics.incrementWrote(1); ctx.write(msg, promise); } @Override public void close(ChannelHandlerContext ctx, ChannelPromise promise) throws Exception { Metrics metrics = ctx.channel().attr(ATTR_KEY_METRICS).get(); String userId = ctx.channel().attr(ATTR_KEY_USERNAME).get(); if (userId == null) { userId = ""; } logger.info("channel<" + userId + "> closing after " + metrics); collector.sumReadMessages(metrics.readLength()); collector.sumWroteMessages(metrics.wroteLength()); super.close(ctx, promise); } public static Metrics getMessageMetrics(Channel channel) { return channel.attr(ATTR_KEY_METRICS).get(); } }