Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

MQTT 3.1.1 support - specs #570

Merged
merged 6 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
The table of contents is too big for display.
Diff view
Diff view
  •  
  •  
  •  
Original file line number Diff line number Diff line change
Expand Up @@ -107,26 +107,26 @@
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.OctetsFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.String16FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.Varuint32FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnackFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnectFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttDisconnectFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnackV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttConnectV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttDisconnectV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPacketHeaderFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPacketType;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPingReqFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPingRespFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPropertiesFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPropertyFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPublishFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubackFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttPublishV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubackPayloadFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubscribeFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubackV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubscribePayloadFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubackFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttSubscribeV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubackPayloadFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubscribeFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubackV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubscribePayloadFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUnsubscribeV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttUserPropertyFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttWillFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.codec.MqttWillV5FW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.AbortFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.BeginFW;
import io.aklivity.zilla.runtime.binding.mqtt.internal.types.stream.DataFW;
Expand Down Expand Up @@ -244,20 +244,19 @@ public final class MqttClientFactory implements MqttStreamFactory
private final MqttBeginExFW.Builder mqttSessionBeginExRW = new MqttBeginExFW.Builder();
private final MqttDataExFW.Builder mqttPublishDataExRW = new MqttDataExFW.Builder();
private final MqttResetExFW.Builder mqttResetExRW = new MqttResetExFW.Builder();
private final MqttWillFW.Builder willMessageRW = new MqttWillFW.Builder();
private final MqttWillV5FW.Builder willMessageRW = new MqttWillV5FW.Builder();
private final MqttPacketHeaderFW mqttPacketHeaderRO = new MqttPacketHeaderFW();
private final MqttConnackFW mqttConnackRO = new MqttConnackFW();
private final MqttSubackFW mqttSubackRO = new MqttSubackFW();
private final MqttUnsubackFW mqttUnsubackRO = new MqttUnsubackFW();
private final MqttWillFW mqttWillRO = new MqttWillFW();
private final MqttConnackV5FW mqttConnackV5RO = new MqttConnackV5FW();
private final MqttSubackV5FW mqttSubackV5RO = new MqttSubackV5FW();
private final MqttUnsubackV5FW mqttUnsubackV5RO = new MqttUnsubackV5FW();
private final MqttWillMessageFW mqttWillMessageRO = new MqttWillMessageFW();
private final MqttPublishFW mqttPublishRO = new MqttPublishFW();
private final MqttPublishV5FW mqttPublishV5RO = new MqttPublishV5FW();
private final MqttSubackPayloadFW mqttSubackPayloadRO = new MqttSubackPayloadFW();
private final MqttUnsubackPayloadFW mqttUnsubackPayloadRO = new MqttUnsubackPayloadFW();
private final MqttSubscribePayloadFW.Builder mqttSubscribePayloadRW = new MqttSubscribePayloadFW.Builder();
private final MqttUnsubscribePayloadFW.Builder mqttUnsubscribePayloadRW = new MqttUnsubscribePayloadFW.Builder();
private final MqttPingRespFW mqttPingRespRO = new MqttPingRespFW();
private final MqttDisconnectFW mqttDisconnectRO = new MqttDisconnectFW();
private final MqttDisconnectV5FW mqttDisconnectV5RO = new MqttDisconnectV5FW();

private final OctetsFW octetsRO = new OctetsFW();
private final OctetsFW.Builder octetsRW = new OctetsFW.Builder();
Expand All @@ -274,12 +273,12 @@ public final class MqttClientFactory implements MqttStreamFactory

private final MqttPublishHeader mqttPublishHeaderRO = new MqttPublishHeader();

private final MqttConnectFW.Builder mqttConnectRW = new MqttConnectFW.Builder();
private final MqttSubscribeFW.Builder mqttSubscribeRW = new MqttSubscribeFW.Builder();
private final MqttUnsubscribeFW.Builder mqttUnsubscribeRW = new MqttUnsubscribeFW.Builder();
private final MqttPublishFW.Builder mqttPublishRW = new MqttPublishFW.Builder();
private final MqttConnectV5FW.Builder mqttConnectV5RW = new MqttConnectV5FW.Builder();
private final MqttSubscribeV5FW.Builder mqttSubscribeV5RW = new MqttSubscribeV5FW.Builder();
private final MqttUnsubscribeV5FW.Builder mqttUnsubscribeV5RW = new MqttUnsubscribeV5FW.Builder();
private final MqttPublishV5FW.Builder mqttPublishV5RW = new MqttPublishV5FW.Builder();
private final MqttPingReqFW.Builder mqttPingReqRW = new MqttPingReqFW.Builder();
private final MqttDisconnectFW.Builder mqttDisconnectRW = new MqttDisconnectFW.Builder();
private final MqttDisconnectV5FW.Builder mqttDisconnectV5RW = new MqttDisconnectV5FW.Builder();
private final Array32FW.Builder<MqttUserPropertyFW.Builder, MqttUserPropertyFW> userPropertiesRW =
new Array32FW.Builder<>(new MqttUserPropertyFW.Builder(), new MqttUserPropertyFW());
private final Array32FW.Builder<Varuint32FW.Builder, Varuint32FW> subscriptionIdsRW =
Expand Down Expand Up @@ -781,7 +780,7 @@ private int decodeConnack(
{
int reasonCode = SUCCESS;

final MqttConnackFW connack = mqttConnackRO.tryWrap(buffer, offset, limit);
final MqttConnackV5FW connack = mqttConnackV5RO.tryWrap(buffer, offset, limit);
int flags = 0;
decode:
{
Expand Down Expand Up @@ -836,7 +835,7 @@ private int decodeSuback(
{
int reasonCode = SUCCESS;

final MqttSubackFW suback = mqttSubackRO.tryWrap(buffer, offset, limit);
final MqttSubackV5FW suback = mqttSubackV5RO.tryWrap(buffer, offset, limit);
decode:
{
if (suback == null)
Expand Down Expand Up @@ -881,7 +880,7 @@ private int decodeUnsuback(
{
int reasonCode = SUCCESS;

final MqttUnsubackFW unsuback = mqttUnsubackRO.tryWrap(buffer, offset, limit);
final MqttUnsubackV5FW unsuback = mqttUnsubackV5RO.tryWrap(buffer, offset, limit);
decode:
{
if (unsuback == null)
Expand Down Expand Up @@ -926,7 +925,7 @@ private int decodePublish(
if (length >= client.decodeablePacketBytes)
{
int reasonCode = SUCCESS;
final MqttPublishFW publish = mqttPublishRO.tryWrap(buffer, offset, offset + client.decodeablePacketBytes);
final MqttPublishV5FW publish = mqttPublishV5RO.tryWrap(buffer, offset, offset + client.decodeablePacketBytes);

final MqttPublishHeader mqttPublishHeader = mqttPublishHeaderRO.reset();

Expand Down Expand Up @@ -1091,7 +1090,7 @@ private int decodeDisconnect(
{
int reasonCode = NORMAL_DISCONNECT;

final MqttDisconnectFW disconnect = mqttDisconnectRO.tryWrap(buffer, offset, limit);
final MqttDisconnectV5FW disconnect = mqttDisconnectV5RO.tryWrap(buffer, offset, limit);
if (disconnect == null)
{
reasonCode = PROTOCOL_ERROR;
Expand Down Expand Up @@ -1509,7 +1508,7 @@ private int onDecodeConnack(
DirectBuffer buffer,
int progress,
int limit,
MqttConnackFW connack)
MqttConnackV5FW connack)
{
byte reasonCode;
decode:
Expand Down Expand Up @@ -1740,7 +1739,7 @@ private int onDecodeSuback(
DirectBuffer buffer,
int progress,
int limit,
MqttSubackFW suback)
MqttSubackV5FW suback)
{
final int packetId = suback.packetId();
final OctetsFW decodePayload = suback.payload();
Expand Down Expand Up @@ -1792,7 +1791,7 @@ private int onDecodeUnsuback(
DirectBuffer buffer,
int progress,
int limit,
MqttUnsubackFW unsuback)
MqttUnsubackV5FW unsuback)
{
final int packetId = unsuback.packetId();
final OctetsFW decodePayload = unsuback.payload();
Expand Down Expand Up @@ -1879,7 +1878,7 @@ private void onDecodePublish(
private void onDecodeDisconnect(
long traceId,
long authorization,
MqttDisconnectFW disconnect)
MqttDisconnectV5FW disconnect)
{
byte reasonCode = decodeDisconnectProperties(disconnect.properties());

Expand Down Expand Up @@ -2149,8 +2148,8 @@ private void doEncodePublish(
});

final int propertiesSize0 = propertiesSize.get();
final MqttPublishFW publish =
mqttPublishRW.wrap(writeBuffer, DataFW.FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
final MqttPublishV5FW publish =
mqttPublishV5RW.wrap(writeBuffer, DataFW.FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(publishNetworkTypeAndFlags)
.remainingLength(3 + topicLength + propertiesSize.get() + payloadSize + deferred)
.topicName(topic)
Expand Down Expand Up @@ -2212,7 +2211,7 @@ private void doEncodeConnect(
.build();
propertiesSize = mqttProperty.limit();
}
MqttWillFW will = null;
MqttWillV5FW will = null;
if (willMessage != null)
{
final int expiryInterval = willMessage.expiryInterval();
Expand Down Expand Up @@ -2283,8 +2282,8 @@ private void doEncodeConnect(
final int propertiesSize0 = propertiesSize;
final int willSize = will != null ? will.sizeof() : 0;
flags |= will != null ? (WILL_FLAG_MASK | ((willMessage.flags() & RETAIN_MASK) != 0 ? WILL_RETAIN_MASK : 0)) : 0;
final MqttConnectFW connect =
mqttConnectRW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
final MqttConnectV5FW connect =
mqttConnectV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(0x10)
.remainingLength(11 + propertiesSize0 + clientId.length() + 2 + willSize)
.protocolName(MQTT_PROTOCOL_NAME)
Expand Down Expand Up @@ -2342,8 +2341,8 @@ private void doEncodeSubscribe(
}

final OctetsFW encodePayload = octetsRO.wrap(encodeBuffer, encodeOffset, encodeProgress);
final MqttSubscribeFW subscribe =
mqttSubscribeRW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
final MqttSubscribeV5FW subscribe =
mqttSubscribeV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(0x82)
.remainingLength(3 + propertiesSize0 + encodePayload.sizeof())
.packetId(packetId)
Expand Down Expand Up @@ -2384,8 +2383,8 @@ private void doEncodeUnsubscribe(
}

final OctetsFW encodePayload = octetsRO.wrap(encodeBuffer, encodeOffset, encodeProgress);
final MqttUnsubscribeFW unsubscribe =
mqttUnsubscribeRW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
final MqttUnsubscribeV5FW unsubscribe =
mqttUnsubscribeV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(0xa2)
.remainingLength(3 + encodePayload.sizeof())
.packetId(packetId)
Expand Down Expand Up @@ -2420,8 +2419,8 @@ private void doEncodeDisconnect(
int propertiesSize = 0;

final int propertySize0 = propertiesSize;
final MqttDisconnectFW disconnect =
mqttDisconnectRW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
final MqttDisconnectV5FW disconnect =
mqttDisconnectV5RW.wrap(writeBuffer, FIELD_OFFSET_PAYLOAD, writeBuffer.capacity())
.typeAndFlags(0xe0)
.remainingLength(2 + propertySize0)
.reasonCode(reasonCode & 0xff)
Expand Down
Loading