From 1339019776f749eee34206a1cebbbe79f9504218 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 6 Oct 2023 20:00:15 -0700 Subject: [PATCH 1/4] 0 for no mqtt session expiry should be mapped to max value for the group stream --- .../mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java | 5 +++-- .../zilla/specs/binding/kafka/internal/KafkaFunctions.java | 4 ++-- .../unmerged.group.produce.invalid.partition/server.rpt | 2 +- .../merged/unmerged.group.produce.message.value/server.rpt | 2 +- .../session.connect.override.min.session.expiry/client.rpt | 2 +- .../session.connect.override.min.session.expiry/server.rpt | 2 +- 6 files changed, 9 insertions(+), 8 deletions(-) diff --git a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java index 50492e1efb..5046c0c512 100644 --- a/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java +++ b/runtime/binding-mqtt-kafka/src/main/java/io/aklivity/zilla/runtime/binding/mqtt/kafka/internal/stream/MqttKafkaSessionFactory.java @@ -3817,13 +3817,14 @@ private MessageConsumer newGroupStream( String16FW clientId, int sessionExpiryMs) { + final int timeout = sessionExpiryMs == 0 ? Integer.MAX_VALUE : sessionExpiryMs; + final KafkaBeginExFW kafkaBeginEx = kafkaBeginExRW.wrap(writeBuffer, BeginFW.FIELD_OFFSET_EXTENSION, writeBuffer.capacity()) .typeId(kafkaTypeId) - .group(g -> g.groupId(clientId).protocol(GROUP_PROTOCOL).timeout(sessionExpiryMs)) + .group(g -> g.groupId(clientId).protocol(GROUP_PROTOCOL).timeout(timeout)) .build(); - final BeginFW begin = beginRW.wrap(writeBuffer, 0, writeBuffer.capacity()) .originId(originId) .routedId(routedId) diff --git a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java index 0a3791c485..ef3aa8660d 100644 --- a/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java +++ b/specs/binding-kafka.spec/src/main/java/io/aklivity/zilla/specs/binding/kafka/internal/KafkaFunctions.java @@ -4462,7 +4462,7 @@ public final class KafkaGroupBeginExMatcherBuilder { private String16FW groupId; private String16FW protocol; - private int timeout; + private Integer timeout; private byte[] metadata; @@ -4529,7 +4529,7 @@ private boolean matchProtocol( private boolean matchTimeout( final KafkaGroupBeginExFW groupBeginExFW) { - return timeout == 0 || timeout == groupBeginExFW.timeout(); + return timeout == null || timeout == groupBeginExFW.timeout(); } private boolean matchMetadata( diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt index d6cb1fba53..e76764dcb8 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt @@ -116,7 +116,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(0) + .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test") diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt index d5ba82e974..305a7aff51 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt @@ -116,7 +116,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(0) + .timeout(45000) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test") diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt index f66932d8cd..a8e565b0e6 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt @@ -60,7 +60,7 @@ write zilla:begin.ext ${kafka:beginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(0) + .timeout("0x7fff_ffff") .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt index 627ce38229..6cf618be44 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt @@ -56,7 +56,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(0) + .timeout("0x7fff_ffff") .build() .build()} From e2948e2d63c47b5746b770d511be4614c08eb40f Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 6 Oct 2023 20:54:33 -0700 Subject: [PATCH 2/4] Fix typo --- .../session.connect.override.min.session.expiry/client.rpt | 2 +- .../session.connect.override.min.session.expiry/server.rpt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt index a8e565b0e6..cf97184844 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/client.rpt @@ -60,7 +60,7 @@ write zilla:begin.ext ${kafka:beginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout("0x7fff_ffff") + .timeout(2147483647) .build() .build()} diff --git a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt index 6cf618be44..7b5d8f7109 100644 --- a/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt +++ b/specs/binding-mqtt-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/mqtt/kafka/streams/kafka/session.connect.override.min.session.expiry/server.rpt @@ -56,7 +56,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout("0x7fff_ffff") + .timeout(2147483647) .build() .build()} From 24c1d42e10a592771de9185e562b387f80ab0584 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 6 Oct 2023 21:16:55 -0700 Subject: [PATCH 3/4] FIx typo --- .../merged/unmerged.group.produce.invalid.partition/client.rpt | 2 +- .../merged/unmerged.group.produce.invalid.partition/server.rpt | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt index 1acf4ae01b..6d4bb4e42a 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/client.rpt @@ -115,7 +115,7 @@ write zilla:begin.ext ${kafka:beginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(45000) + .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test") diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt index e76764dcb8..d6cb1fba53 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.invalid.partition/server.rpt @@ -116,7 +116,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(45000) + .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test") From 4613a0d9b16ddd57b5838ce5ace9fd0c7510ab59 Mon Sep 17 00:00:00 2001 From: Akram Yakubov Date: Fri, 6 Oct 2023 21:24:13 -0700 Subject: [PATCH 4/4] Fix typo --- .../runtime/binding/kafka/internal/stream/CacheMergedIT.java | 2 +- .../merged/unmerged.group.produce.message.value/client.rpt | 2 +- .../merged/unmerged.group.produce.message.value/server.rpt | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) diff --git a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java index e31c1f94c7..2769ccca46 100644 --- a/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java +++ b/runtime/binding-kafka/src/test/java/io/aklivity/zilla/runtime/binding/kafka/internal/stream/CacheMergedIT.java @@ -629,7 +629,7 @@ public void shouldRejectMessageForInvalidPartition() throws Exception @Specification({ "${app}/merged.group.produce.message.value/client", "${app}/unmerged.group.produce.message.value/server"}) - public void shouldProduceMergedMergedMessageValue() throws Exception + public void shouldProduceMergedMessageValue() throws Exception { k3po.finish(); } diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt index 9a9aec1d74..066d56c375 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/client.rpt @@ -115,7 +115,7 @@ write zilla:begin.ext ${kafka:beginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(45000) + .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test") diff --git a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt index 305a7aff51..d5ba82e974 100644 --- a/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt +++ b/specs/binding-kafka.spec/src/main/scripts/io/aklivity/zilla/specs/binding/kafka/streams/application/merged/unmerged.group.produce.message.value/server.rpt @@ -116,7 +116,7 @@ read zilla:begin.ext ${kafka:matchBeginEx() .group() .groupId("client-1") .protocol("highlander") - .timeout(45000) + .timeout(0) .metadata(kafka:memberMetadata() .consumerId("consumer-1") .topic("test")