From c3f110531734d51c7e9fd91422b77cddd372b9db Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 28 Sep 2023 14:47:55 -0700 Subject: [PATCH 1/3] Fix publish timeout bug, increase default timeout --- .../mqtt/internal/MqttConfiguration.java | 2 +- .../internal/stream/MqttServerFactory.java | 1 + .../internal/stream/server/PublishIT.java | 15 +++ .../client.rpt | 102 ++++++++++++++++++ .../server.rpt | 96 +++++++++++++++++ .../mqtt/streams/application/PublishIT.java | 9 ++ 6 files changed, 224 insertions(+), 1 deletion(-) create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/client.rpt create mode 100644 specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/server.rpt diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java index cf23bf107f..9466f69f15 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java @@ -49,7 +49,7 @@ public class MqttConfiguration extends Configuration static { final ConfigurationDef config = new ConfigurationDef("zilla.binding.mqtt"); - PUBLISH_TIMEOUT = config.property("publish.timeout", TimeUnit.SECONDS.toSeconds(30)); + PUBLISH_TIMEOUT = config.property("publish.timeout", TimeUnit.SECONDS.toSeconds(2 * 60)); CONNECT_TIMEOUT = config.property("connect.timeout", TimeUnit.SECONDS.toSeconds(3)); CONNACK_TIMEOUT = config.property("connack.timeout", TimeUnit.SECONDS.toSeconds(3)); //TODO: better default values? diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 76324faac0..5cd1763789 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -3964,6 +3964,7 @@ private void doPublishAppEnd( if (!MqttState.initialClosed(state)) { doCancelPublishExpiration(); + publishStreams.remove(topicKey); doEnd(application, originId, routedId, initialId, initialSeq, initialAck, initialMax, traceId, sessionId, EMPTY_OCTETS); } diff --git a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/PublishIT.java b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/PublishIT.java index cb89fc9a97..247dfaee52 100644 --- a/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/PublishIT.java +++ b/runtime/binding-mqtt/src/test/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/server/PublishIT.java @@ -152,6 +152,21 @@ public void shouldPublishMultipleMessagesWithDelay() throws Exception k3po.finish(); } + @Test + @Configuration("server.yaml") + @Specification({ + "${net}/publish.multiple.messages.with.delay/client", + "${app}/publish.multiple.messages.timeout/server"}) + @Configure(name = PUBLISH_TIMEOUT_NAME, value = "1") + public void shouldPublishMultipleMessagesTimeout() throws Exception + { + k3po.start(); + k3po.awaitBarrier("PUBLISHED_MESSAGE_TWO"); + Thread.sleep(1500); + k3po.notifyBarrier("PUBLISH_MESSAGE_THREE"); + k3po.finish(); + } + @Test @Configuration("server.yaml") @Specification({ diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/client.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/client.rpt new file mode 100644 index 0000000000..e74dacf5a2 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/client.rpt @@ -0,0 +1,102 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +connect "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .qosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +read zilla:data.empty +read notify RECEIVED_SESSION_STATE + + +connect await RECEIVED_SESSION_STATE + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "message1" + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "test" +write notify SENT_MESSAGE_TWO + +write close + + +connect await SENT_MESSAGE_TWO + "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +write zilla:data.ext ${mqtt:dataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +write "message3" diff --git a/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/server.rpt b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/server.rpt new file mode 100644 index 0000000000..5a04de02e6 --- /dev/null +++ b/specs/binding-mqtt.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/streams/application/publish.multiple.messages.timeout/server.rpt @@ -0,0 +1,96 @@ +# +# Copyright 2021-2023 Aklivity Inc. +# +# Aklivity licenses this file to you under the Apache License, +# version 2.0 (the "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at: +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +# + +accept "zilla://streams/app0" + option zilla:window 8192 + option zilla:transmission "duplex" + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .clientId("client") + .build() + .build()} + +write zilla:begin.ext ${mqtt:beginEx() + .typeId(zilla:id("mqtt")) + .session() + .flags("CLEAN_START") + .qosMax(2) + .packetSizeMax(66560) + .capabilities("RETAIN", "WILDCARD", "SUBSCRIPTION_IDS", "SHARED_SUBSCRIPTIONS") + .clientId("client") + .build() + .build()} + +connected + +write zilla:data.empty +write flush + + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "message1" + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "test" + +read closed + +accepted + +read zilla:begin.ext ${mqtt:matchBeginEx() + .typeId(zilla:id("mqtt")) + .publish() + .clientId("client") + .topic("sensor/one") + .build() + .build()} + +connected + +read zilla:data.ext ${mqtt:matchDataEx() + .typeId(zilla:id("mqtt")) + .publish() + .build() + .build()} + +read "message3" diff --git a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java index 0380c418d8..89948529cb 100644 --- a/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java +++ b/specs/binding-mqtt.spec/src/test/java/io/aklivity/zilla/specs/binding/mqtt/streams/application/PublishIT.java @@ -63,6 +63,15 @@ public void shouldSendMultipleMessages() throws Exception k3po.finish(); } + @Test + @Specification({ + "${app}/publish.multiple.messages.timeout/client", + "${app}/publish.multiple.messages.timeout/server"}) + public void shouldPublishMultipleMessagesTimeout() throws Exception + { + k3po.finish(); + } + @Test @Specification({ "${app}/publish.with.user.property/client", From b332f6e33ca181aa19d4c36ca8d61fca8b87bb05 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Thu, 28 Sep 2023 15:52:57 -0700 Subject: [PATCH 2/3] Send only DISCONNECT at session takeover --- .../runtime/binding/mqtt/internal/stream/MqttServerFactory.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java index 5cd1763789..69061042f1 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/stream/MqttServerFactory.java @@ -2372,7 +2372,7 @@ private void onDecodeError( cleanupStreamsUsingAbort(traceId); break; } - if (connected) + if (connected || reasonCode == SESSION_TAKEN_OVER) { doEncodeDisconnect(traceId, authorization, reasonCode, null); } From f872391a82dca9dc697848746a2e4aed6b3d54f0 Mon Sep 17 00:00:00 2001 From: bmaidics Date: Mon, 2 Oct 2023 19:36:42 +0200 Subject: [PATCH 3/3] Fix PR feedback --- .../zilla/runtime/binding/mqtt/internal/MqttConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java index 9466f69f15..48f2ec524d 100644 --- a/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java +++ b/runtime/binding-mqtt/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/internal/MqttConfiguration.java @@ -49,7 +49,7 @@ public class MqttConfiguration extends Configuration static { final ConfigurationDef config = new ConfigurationDef("zilla.binding.mqtt"); - PUBLISH_TIMEOUT = config.property("publish.timeout", TimeUnit.SECONDS.toSeconds(2 * 60)); + PUBLISH_TIMEOUT = config.property("publish.timeout", TimeUnit.MINUTES.toSeconds(2)); CONNECT_TIMEOUT = config.property("connect.timeout", TimeUnit.SECONDS.toSeconds(3)); CONNACK_TIMEOUT = config.property("connack.timeout", TimeUnit.SECONDS.toSeconds(3)); //TODO: better default values?