package chat.server.moquette; import java.util.List; import org.apache.log4j.Logger; import chat.server.moquette.message.MqttConnAckMessage; import chat.server.moquette.message.MqttConnectAckPayload; import chat.server.moquette.message.MqttConnectMessage; import chat.server.moquette.message.MqttConnectPayload; import chat.server.moquette.message.MqttConnectVariableHeader; import chat.server.moquette.message.MqttFixedHeader; import chat.server.moquette.message.MqttMessage; import chat.server.moquette.message.MqttMessageIdVariableHeader; import chat.server.moquette.message.MqttPublishMessage; import chat.server.moquette.message.MqttPublishVariableHeader; import chat.server.moquette.message.MqttSubAckMessage; import chat.server.moquette.message.MqttSubscribeMessage; import chat.server.moquette.message.MqttSubscribePayload; import chat.server.moquette.message.MqttTopicSubscription; import chat.server.moquette.message.MqttUnsubscribeMessage; import chat.server.moquette.message.MqttUnsubscribePayload; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufAllocator; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelHandlerContext; import io.netty.handler.codec.MessageToMessageEncoder; import io.netty.util.CharsetUtil; import io.netty.util.internal.EmptyArrays; public class MqttEncodHandler extends MessageToMessageEncoder { public static final MqttEncodHandler INSTANCE = new MqttEncodHandler(); private static Logger logger = Logger.getLogger(MqttEncodHandler.class); public MqttEncodHandler() { } public static ChannelHandler getInstance() { return INSTANCE; } @Override protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List out) throws Exception { //logger.debug("begin encode"); out.add(doEncode(ctx.alloc(), msg)); } /** * This is the main encoding method. * It's only visible for testing. * * @param byteBufAllocator Allocates ByteBuf * @param message MQTT message to encode * @return ByteBuf with encoded bytes */ static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) { try { switch (message.fixedHeader().messageType()) { case CONNECT: logger.debug("encode CONNECT"); return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message); case CONNACK: logger.debug("encode CONNACK"); return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message); case PUBLISH: logger.debug("encode PUBLISH"); return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message); case SUBSCRIBE: logger.debug("encode SUBSCRIBE"); return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message); case UNSUBSCRIBE: logger.debug("encode UNSUBSCRIBE"); return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message); case SUBACK: logger.debug("encode SUBACK"); return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message); case UNSUBACK: case PUBACK: case PUBREC: case PUBREL: case PUBCOMP: logger.debug("encode PUBCOMP"); return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message); case PINGREQ: case PINGRESP: case DISCONNECT: logger.debug("encode DISCONNECT"); return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message); default: throw new IllegalArgumentException( "Unknown message type: " + message.fixedHeader().messageType().value()); } } catch (Exception e) { e.printStackTrace(); return null; } } private static ByteBuf encodeConnectMessage( ByteBufAllocator byteBufAllocator, MqttConnectMessage message) { logger.debug("encode encodeConnectMessage"); int payloadBufferSize = 0; MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttConnectVariableHeader variableHeader = message.variableHeader(); MqttConnectPayload payload = message.payload(); MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(), (byte) variableHeader.version()); // Client id String clientIdentifier = payload.clientIdentifier(); if (!MqttCodecUtil.isValidClientId(mqttVersion, clientIdentifier)) { throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier); } byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier); payloadBufferSize += 2 + clientIdentifierBytes.length; // Will topic and message String willTopic = payload.willTopic(); byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES; String willMessage = payload.willMessage(); byte[] willMessageBytes = willMessage != null ? encodeStringUtf8(willMessage) : EmptyArrays.EMPTY_BYTES; if (variableHeader.isWillFlag()) { payloadBufferSize += 2 + willTopicBytes.length; payloadBufferSize += 2 + willMessageBytes.length; } String userName = payload.userName(); byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES; if (variableHeader.hasUserName()) { payloadBufferSize += 2 + userNameBytes.length; } byte[] passwordBytes = payload.password(); if (variableHeader.hasPassword()) { payloadBufferSize += 2 + passwordBytes.length; } // Fixed header byte[] protocolNameBytes = mqttVersion.protocolNameBytes(); int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4; int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); writeVariableLengthInt(buf, variablePartSize); buf.writeShort(protocolNameBytes.length); buf.writeBytes(protocolNameBytes); buf.writeByte(variableHeader.version()); buf.writeByte(getConnVariableHeaderFlag(variableHeader)); buf.writeShort(variableHeader.keepAliveTimeSeconds()); // Payload buf.writeShort(clientIdentifierBytes.length); buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length); if (variableHeader.isWillFlag()) { buf.writeShort(willTopicBytes.length); buf.writeBytes(willTopicBytes, 0, willTopicBytes.length); buf.writeShort(willMessageBytes.length); buf.writeBytes(willMessageBytes, 0, willMessageBytes.length); } if (variableHeader.hasUserName()) { buf.writeShort(userNameBytes.length); buf.writeBytes(userNameBytes, 0, userNameBytes.length); } if (variableHeader.hasPassword()) { buf.writeShort(passwordBytes.length); buf.writeBytes(passwordBytes, 0, passwordBytes.length); } return buf; } private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) { logger.debug("encode getConnVariableHeaderFlag"); int flagByte = 0; if (variableHeader.hasUserName()) { flagByte |= 0x80; } if (variableHeader.hasPassword()) { flagByte |= 0x40; } if (variableHeader.isWillRetain()) { flagByte |= 0x20; } flagByte |= (variableHeader.willQos() & 0x03) << 3; if (variableHeader.isWillFlag()) { flagByte |= 0x04; } if (variableHeader.isCleanSession()) { flagByte |= 0x02; } return flagByte; } private static ByteBuf encodeConnAckMessage( ByteBufAllocator byteBufAllocator, MqttConnAckMessage message) { logger.debug("encode encodeConnAckMessage"); int length = 4; MqttConnectAckPayload payload = null; if (message.payload() != null) { payload = (MqttConnectAckPayload)message.payload(); if (payload.getData() != null && payload.getData().length > 0) { length += payload.getData().length; } } ByteBuf buf = byteBufAllocator.buffer(length); buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); buf.writeByte(length - 2); buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00); buf.writeByte(message.variableHeader().connectReturnCode().byteValue()); if (length > 4) { buf.writeBytes(payload.getData()); } return buf; } private static ByteBuf encodeSubscribeMessage( ByteBufAllocator byteBufAllocator, MqttSubscribeMessage message) { logger.debug("encode encodeSubscribeMessage"); int variableHeaderBufferSize = 2; int payloadBufferSize = 0; MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttMessageIdVariableHeader variableHeader = message.variableHeader(); MqttSubscribePayload payload = message.payload(); for (MqttTopicSubscription topic : payload.topicSubscriptions()) { String topicName = topic.topicName(); byte[] topicNameBytes = encodeStringUtf8(topicName); payloadBufferSize += 2 + topicNameBytes.length; payloadBufferSize += 1; } int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); writeVariableLengthInt(buf, variablePartSize); // Variable Header int messageId = variableHeader.messageId(); buf.writeShort(messageId); // Payload for (MqttTopicSubscription topic : payload.topicSubscriptions()) { String topicName = topic.topicName(); byte[] topicNameBytes = encodeStringUtf8(topicName); buf.writeShort(topicNameBytes.length); buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); buf.writeByte(topic.qualityOfService().value()); } return buf; } private static ByteBuf encodeUnsubscribeMessage( ByteBufAllocator byteBufAllocator, MqttUnsubscribeMessage message) { logger.debug("encode encodeUnsubscribeMessage"); int variableHeaderBufferSize = 2; int payloadBufferSize = 0; MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttMessageIdVariableHeader variableHeader = message.variableHeader(); MqttUnsubscribePayload payload = message.payload(); for (String topicName : payload.topics()) { byte[] topicNameBytes = encodeStringUtf8(topicName); payloadBufferSize += 2 + topicNameBytes.length; } int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); writeVariableLengthInt(buf, variablePartSize); // Variable Header int messageId = variableHeader.messageId(); buf.writeShort(messageId); // Payload for (String topicName : payload.topics()) { byte[] topicNameBytes = encodeStringUtf8(topicName); buf.writeShort(topicNameBytes.length); buf.writeBytes(topicNameBytes, 0, topicNameBytes.length); } return buf; } private static ByteBuf encodeSubAckMessage( ByteBufAllocator byteBufAllocator, MqttSubAckMessage message) { logger.debug("encode encodeSubAckMessage"); int variableHeaderBufferSize = 2; int payloadBufferSize = message.payload().grantedQoSLevels().size(); int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); buf.writeByte(getFixedHeaderByte1(message.fixedHeader())); writeVariableLengthInt(buf, variablePartSize); buf.writeShort(message.variableHeader().messageId()); for (int qos : message.payload().grantedQoSLevels()) { buf.writeByte(qos); } return buf; } @SuppressWarnings("deprecation") private static ByteBuf encodePublishMessage( ByteBufAllocator byteBufAllocator, MqttPublishMessage message) { logger.debug("encode encodePublishMessage"); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttPublishVariableHeader variableHeader = message.variableHeader(); ByteBuf payload = message.payload().duplicate(); String topicName = variableHeader.topicName(); byte[] topicNameBytes = encodeStringUtf8(topicName); int variableHeaderBufferSize = 2 + topicNameBytes.length + (mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0); int payloadBufferSize = payload.readableBytes(); int variablePartSize = variableHeaderBufferSize + payloadBufferSize; int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); writeVariableLengthInt(buf, variablePartSize); buf.writeShort(topicNameBytes.length); buf.writeBytes(topicNameBytes); if (mqttFixedHeader.qosLevel().value() > 0) { buf.writeShort(variableHeader.messageId()); } buf.writeBytes(payload); return buf; } private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId( ByteBufAllocator byteBufAllocator, MqttMessage message) { logger.debug("encode encodeMessageWithOnlySingleByteFixedHeaderAndMessageId"); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader(); int msgId = variableHeader.messageId(); int variableHeaderBufferSize = 2; // variable part only has a message id ByteBuf payload = null; if (message.payload() != null) { payload = ((ByteBuf)message.payload()).duplicate(); variableHeaderBufferSize += payload.readableBytes(); } int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize); ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); writeVariableLengthInt(buf, variableHeaderBufferSize); buf.writeShort(msgId); if (payload != null) { buf.writeBytes(payload); } return buf; } private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader( ByteBufAllocator byteBufAllocator, MqttMessage message) { //logger.debug("encode encodeMessageWithOnlySingleByteFixedHeader"); MqttFixedHeader mqttFixedHeader = message.fixedHeader(); ByteBuf buf = byteBufAllocator.buffer(2); buf.writeByte(getFixedHeaderByte1(mqttFixedHeader)); buf.writeByte(0); return buf; } private static int getFixedHeaderByte1(MqttFixedHeader header) { //logger.debug("encode getFixedHeaderByte1"); int ret = 0; ret |= header.messageType().value() << 4; if (header.isDup()) { ret |= 0x08; } ret |= header.qosLevel().value() << 1; if (header.isRetain()) { ret |= 0x01; } return ret; } private static void writeVariableLengthInt(ByteBuf buf, int num) { //logger.debug("encode writeVariableLengthInt"); do { int digit = num % 128; num /= 128; if (num > 0) { digit |= 0x80; } buf.writeByte(digit); } while (num > 0); } private static int getVariableLengthInt(int num) { int count = 0; do { num /= 128; count++; } while (num > 0); return count; } private static byte[] encodeStringUtf8(String s) { //logger.debug("encode encodeStringUtf8"); return s.getBytes(CharsetUtil.UTF_8); } }