package chat.server.moquette;
|
|
import java.util.List;
|
|
import org.apache.log4j.Logger;
|
|
import chat.server.moquette.message.MqttConnAckMessage;
|
import chat.server.moquette.message.MqttConnectAckPayload;
|
import chat.server.moquette.message.MqttConnectMessage;
|
import chat.server.moquette.message.MqttConnectPayload;
|
import chat.server.moquette.message.MqttConnectVariableHeader;
|
import chat.server.moquette.message.MqttFixedHeader;
|
import chat.server.moquette.message.MqttMessage;
|
import chat.server.moquette.message.MqttMessageIdVariableHeader;
|
import chat.server.moquette.message.MqttPublishMessage;
|
import chat.server.moquette.message.MqttPublishVariableHeader;
|
import chat.server.moquette.message.MqttSubAckMessage;
|
import chat.server.moquette.message.MqttSubscribeMessage;
|
import chat.server.moquette.message.MqttSubscribePayload;
|
import chat.server.moquette.message.MqttTopicSubscription;
|
import chat.server.moquette.message.MqttUnsubscribeMessage;
|
import chat.server.moquette.message.MqttUnsubscribePayload;
|
import io.netty.buffer.ByteBuf;
|
import io.netty.buffer.ByteBufAllocator;
|
import io.netty.channel.ChannelHandler;
|
import io.netty.channel.ChannelHandlerContext;
|
import io.netty.handler.codec.MessageToMessageEncoder;
|
import io.netty.util.CharsetUtil;
|
import io.netty.util.internal.EmptyArrays;
|
|
public class MqttEncodHandler extends MessageToMessageEncoder<MqttMessage> {
|
|
public static final MqttEncodHandler INSTANCE = new MqttEncodHandler();
|
|
private static Logger logger = Logger.getLogger(MqttEncodHandler.class);
|
|
public MqttEncodHandler() { }
|
|
public static ChannelHandler getInstance() {
|
return INSTANCE;
|
}
|
|
@Override
|
protected void encode(ChannelHandlerContext ctx, MqttMessage msg, List<Object> out) throws Exception {
|
//logger.debug("begin encode");
|
out.add(doEncode(ctx.alloc(), msg));
|
}
|
|
/**
|
* This is the main encoding method.
|
* It's only visible for testing.
|
*
|
* @param byteBufAllocator Allocates ByteBuf
|
* @param message MQTT message to encode
|
* @return ByteBuf with encoded bytes
|
*/
|
static ByteBuf doEncode(ByteBufAllocator byteBufAllocator, MqttMessage message) {
|
try {
|
switch (message.fixedHeader().messageType()) {
|
case CONNECT:
|
logger.debug("encode CONNECT");
|
return encodeConnectMessage(byteBufAllocator, (MqttConnectMessage) message);
|
|
case CONNACK:
|
logger.debug("encode CONNACK");
|
return encodeConnAckMessage(byteBufAllocator, (MqttConnAckMessage) message);
|
|
case PUBLISH:
|
logger.debug("encode PUBLISH");
|
return encodePublishMessage(byteBufAllocator, (MqttPublishMessage) message);
|
|
case SUBSCRIBE:
|
logger.debug("encode SUBSCRIBE");
|
return encodeSubscribeMessage(byteBufAllocator, (MqttSubscribeMessage) message);
|
|
case UNSUBSCRIBE:
|
logger.debug("encode UNSUBSCRIBE");
|
return encodeUnsubscribeMessage(byteBufAllocator, (MqttUnsubscribeMessage) message);
|
|
case SUBACK:
|
logger.debug("encode SUBACK");
|
return encodeSubAckMessage(byteBufAllocator, (MqttSubAckMessage) message);
|
|
case UNSUBACK:
|
case PUBACK:
|
case PUBREC:
|
case PUBREL:
|
case PUBCOMP:
|
logger.debug("encode PUBCOMP");
|
return encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(byteBufAllocator, message);
|
|
case PINGREQ:
|
case PINGRESP:
|
case DISCONNECT:
|
logger.debug("encode DISCONNECT");
|
return encodeMessageWithOnlySingleByteFixedHeader(byteBufAllocator, message);
|
|
default:
|
throw new IllegalArgumentException(
|
"Unknown message type: " + message.fixedHeader().messageType().value());
|
}
|
}
|
catch (Exception e) {
|
e.printStackTrace();
|
return null;
|
}
|
}
|
|
private static ByteBuf encodeConnectMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttConnectMessage message) {
|
logger.debug("encode encodeConnectMessage");
|
|
int payloadBufferSize = 0;
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
MqttConnectVariableHeader variableHeader = message.variableHeader();
|
MqttConnectPayload payload = message.payload();
|
MqttVersion mqttVersion = MqttVersion.fromProtocolNameAndLevel(variableHeader.name(),
|
(byte) variableHeader.version());
|
|
// Client id
|
String clientIdentifier = payload.clientIdentifier();
|
if (!MqttCodecUtil.isValidClientId(mqttVersion, clientIdentifier)) {
|
throw new MqttIdentifierRejectedException("invalid clientIdentifier: " + clientIdentifier);
|
}
|
byte[] clientIdentifierBytes = encodeStringUtf8(clientIdentifier);
|
payloadBufferSize += 2 + clientIdentifierBytes.length;
|
|
// Will topic and message
|
String willTopic = payload.willTopic();
|
byte[] willTopicBytes = willTopic != null ? encodeStringUtf8(willTopic) : EmptyArrays.EMPTY_BYTES;
|
String willMessage = payload.willMessage();
|
byte[] willMessageBytes = willMessage != null ? encodeStringUtf8(willMessage) : EmptyArrays.EMPTY_BYTES;
|
if (variableHeader.isWillFlag()) {
|
payloadBufferSize += 2 + willTopicBytes.length;
|
payloadBufferSize += 2 + willMessageBytes.length;
|
}
|
|
String userName = payload.userName();
|
byte[] userNameBytes = userName != null ? encodeStringUtf8(userName) : EmptyArrays.EMPTY_BYTES;
|
if (variableHeader.hasUserName()) {
|
payloadBufferSize += 2 + userNameBytes.length;
|
}
|
|
byte[] passwordBytes = payload.password();
|
if (variableHeader.hasPassword()) {
|
payloadBufferSize += 2 + passwordBytes.length;
|
}
|
|
// Fixed header
|
byte[] protocolNameBytes = mqttVersion.protocolNameBytes();
|
int variableHeaderBufferSize = 2 + protocolNameBytes.length + 4;
|
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
writeVariableLengthInt(buf, variablePartSize);
|
|
buf.writeShort(protocolNameBytes.length);
|
buf.writeBytes(protocolNameBytes);
|
|
buf.writeByte(variableHeader.version());
|
buf.writeByte(getConnVariableHeaderFlag(variableHeader));
|
buf.writeShort(variableHeader.keepAliveTimeSeconds());
|
|
// Payload
|
buf.writeShort(clientIdentifierBytes.length);
|
buf.writeBytes(clientIdentifierBytes, 0, clientIdentifierBytes.length);
|
if (variableHeader.isWillFlag()) {
|
buf.writeShort(willTopicBytes.length);
|
buf.writeBytes(willTopicBytes, 0, willTopicBytes.length);
|
buf.writeShort(willMessageBytes.length);
|
buf.writeBytes(willMessageBytes, 0, willMessageBytes.length);
|
}
|
if (variableHeader.hasUserName()) {
|
buf.writeShort(userNameBytes.length);
|
buf.writeBytes(userNameBytes, 0, userNameBytes.length);
|
}
|
if (variableHeader.hasPassword()) {
|
buf.writeShort(passwordBytes.length);
|
buf.writeBytes(passwordBytes, 0, passwordBytes.length);
|
}
|
return buf;
|
}
|
|
private static int getConnVariableHeaderFlag(MqttConnectVariableHeader variableHeader) {
|
logger.debug("encode getConnVariableHeaderFlag");
|
|
int flagByte = 0;
|
if (variableHeader.hasUserName()) {
|
flagByte |= 0x80;
|
}
|
if (variableHeader.hasPassword()) {
|
flagByte |= 0x40;
|
}
|
if (variableHeader.isWillRetain()) {
|
flagByte |= 0x20;
|
}
|
flagByte |= (variableHeader.willQos() & 0x03) << 3;
|
if (variableHeader.isWillFlag()) {
|
flagByte |= 0x04;
|
}
|
if (variableHeader.isCleanSession()) {
|
flagByte |= 0x02;
|
}
|
return flagByte;
|
}
|
|
private static ByteBuf encodeConnAckMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttConnAckMessage message) {
|
logger.debug("encode encodeConnAckMessage");
|
|
int length = 4;
|
MqttConnectAckPayload payload = null;
|
if (message.payload() != null) {
|
payload = (MqttConnectAckPayload)message.payload();
|
if (payload.getData() != null && payload.getData().length > 0) {
|
length += payload.getData().length;
|
}
|
}
|
ByteBuf buf = byteBufAllocator.buffer(length);
|
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
|
buf.writeByte(length - 2);
|
buf.writeByte(message.variableHeader().isSessionPresent() ? 0x01 : 0x00);
|
buf.writeByte(message.variableHeader().connectReturnCode().byteValue());
|
if (length > 4) {
|
buf.writeBytes(payload.getData());
|
}
|
|
return buf;
|
}
|
|
private static ByteBuf encodeSubscribeMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttSubscribeMessage message) {
|
logger.debug("encode encodeSubscribeMessage");
|
|
int variableHeaderBufferSize = 2;
|
int payloadBufferSize = 0;
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
|
MqttSubscribePayload payload = message.payload();
|
|
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
|
String topicName = topic.topicName();
|
byte[] topicNameBytes = encodeStringUtf8(topicName);
|
payloadBufferSize += 2 + topicNameBytes.length;
|
payloadBufferSize += 1;
|
}
|
|
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
writeVariableLengthInt(buf, variablePartSize);
|
|
// Variable Header
|
int messageId = variableHeader.messageId();
|
buf.writeShort(messageId);
|
|
// Payload
|
for (MqttTopicSubscription topic : payload.topicSubscriptions()) {
|
String topicName = topic.topicName();
|
byte[] topicNameBytes = encodeStringUtf8(topicName);
|
buf.writeShort(topicNameBytes.length);
|
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
|
buf.writeByte(topic.qualityOfService().value());
|
}
|
|
return buf;
|
}
|
|
private static ByteBuf encodeUnsubscribeMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttUnsubscribeMessage message) {
|
logger.debug("encode encodeUnsubscribeMessage");
|
|
int variableHeaderBufferSize = 2;
|
int payloadBufferSize = 0;
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
MqttMessageIdVariableHeader variableHeader = message.variableHeader();
|
MqttUnsubscribePayload payload = message.payload();
|
|
for (String topicName : payload.topics()) {
|
byte[] topicNameBytes = encodeStringUtf8(topicName);
|
payloadBufferSize += 2 + topicNameBytes.length;
|
}
|
|
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
writeVariableLengthInt(buf, variablePartSize);
|
|
// Variable Header
|
int messageId = variableHeader.messageId();
|
buf.writeShort(messageId);
|
|
// Payload
|
for (String topicName : payload.topics()) {
|
byte[] topicNameBytes = encodeStringUtf8(topicName);
|
buf.writeShort(topicNameBytes.length);
|
buf.writeBytes(topicNameBytes, 0, topicNameBytes.length);
|
}
|
|
return buf;
|
}
|
|
private static ByteBuf encodeSubAckMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttSubAckMessage message) {
|
logger.debug("encode encodeSubAckMessage");
|
|
int variableHeaderBufferSize = 2;
|
int payloadBufferSize = message.payload().grantedQoSLevels().size();
|
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
|
buf.writeByte(getFixedHeaderByte1(message.fixedHeader()));
|
writeVariableLengthInt(buf, variablePartSize);
|
buf.writeShort(message.variableHeader().messageId());
|
for (int qos : message.payload().grantedQoSLevels()) {
|
buf.writeByte(qos);
|
}
|
|
return buf;
|
}
|
|
@SuppressWarnings("deprecation")
|
private static ByteBuf encodePublishMessage(
|
ByteBufAllocator byteBufAllocator,
|
MqttPublishMessage message) {
|
logger.debug("encode encodePublishMessage");
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
MqttPublishVariableHeader variableHeader = message.variableHeader();
|
ByteBuf payload = message.payload().duplicate();
|
|
String topicName = variableHeader.topicName();
|
byte[] topicNameBytes = encodeStringUtf8(topicName);
|
|
int variableHeaderBufferSize = 2 + topicNameBytes.length +
|
(mqttFixedHeader.qosLevel().value() > 0 ? 2 : 0);
|
int payloadBufferSize = payload.readableBytes();
|
int variablePartSize = variableHeaderBufferSize + payloadBufferSize;
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variablePartSize);
|
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variablePartSize);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
writeVariableLengthInt(buf, variablePartSize);
|
buf.writeShort(topicNameBytes.length);
|
buf.writeBytes(topicNameBytes);
|
if (mqttFixedHeader.qosLevel().value() > 0) {
|
buf.writeShort(variableHeader.messageId());
|
}
|
buf.writeBytes(payload);
|
|
return buf;
|
}
|
|
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeaderAndMessageId(
|
ByteBufAllocator byteBufAllocator,
|
MqttMessage message) {
|
logger.debug("encode encodeMessageWithOnlySingleByteFixedHeaderAndMessageId");
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
MqttMessageIdVariableHeader variableHeader = (MqttMessageIdVariableHeader) message.variableHeader();
|
int msgId = variableHeader.messageId();
|
|
int variableHeaderBufferSize = 2; // variable part only has a message id
|
|
ByteBuf payload = null;
|
if (message.payload() != null) {
|
payload = ((ByteBuf)message.payload()).duplicate();
|
variableHeaderBufferSize += payload.readableBytes();
|
}
|
int fixedHeaderBufferSize = 1 + getVariableLengthInt(variableHeaderBufferSize);
|
ByteBuf buf = byteBufAllocator.buffer(fixedHeaderBufferSize + variableHeaderBufferSize);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
writeVariableLengthInt(buf, variableHeaderBufferSize);
|
buf.writeShort(msgId);
|
if (payload != null) {
|
buf.writeBytes(payload);
|
}
|
|
return buf;
|
}
|
|
private static ByteBuf encodeMessageWithOnlySingleByteFixedHeader(
|
ByteBufAllocator byteBufAllocator,
|
MqttMessage message) {
|
//logger.debug("encode encodeMessageWithOnlySingleByteFixedHeader");
|
|
MqttFixedHeader mqttFixedHeader = message.fixedHeader();
|
ByteBuf buf = byteBufAllocator.buffer(2);
|
buf.writeByte(getFixedHeaderByte1(mqttFixedHeader));
|
buf.writeByte(0);
|
|
return buf;
|
}
|
|
private static int getFixedHeaderByte1(MqttFixedHeader header) {
|
//logger.debug("encode getFixedHeaderByte1");
|
|
int ret = 0;
|
ret |= header.messageType().value() << 4;
|
if (header.isDup()) {
|
ret |= 0x08;
|
}
|
ret |= header.qosLevel().value() << 1;
|
if (header.isRetain()) {
|
ret |= 0x01;
|
}
|
return ret;
|
}
|
|
private static void writeVariableLengthInt(ByteBuf buf, int num) {
|
//logger.debug("encode writeVariableLengthInt");
|
|
do {
|
int digit = num % 128;
|
num /= 128;
|
if (num > 0) {
|
digit |= 0x80;
|
}
|
buf.writeByte(digit);
|
} while (num > 0);
|
}
|
|
private static int getVariableLengthInt(int num) {
|
int count = 0;
|
do {
|
num /= 128;
|
count++;
|
} while (num > 0);
|
return count;
|
}
|
|
private static byte[] encodeStringUtf8(String s) {
|
//logger.debug("encode encodeStringUtf8");
|
return s.getBytes(CharsetUtil.UTF_8);
|
}
|
|
|
}
|