Support Kafka consumer groups #262

Merged Jul 28, 2023 (80 commits)
Changes from 68 commits
e49a458  Checkpoint (Apr 29, 2023)
1dea9a5  Checkpoint (Apr 29, 2023)
4209230  Checkpoint (Apr 29, 2023)
8f9ae54  Checkpoint (May 1, 2023)
872762e  Combine consumer group handshake (May 1, 2023)
860a780  Rename (May 1, 2023)
b8c4a5a  Checkpoint (May 2, 2023)
a955eee  Checkpoint (May 2, 2023)
7107968  Checkpoint (May 5, 2023)
6f8e5fb  Checkpoint (May 8, 2023)
2e67dd0  Add unit tests (May 8, 2023)
ad7444d  Merge branch 'feature/consumer-group-idl' into feature/consumer-group (May 8, 2023)
919a0d4  Checkpoint (May 9, 2023)
306543b  Checkpoint (May 9, 2023)
c1ce5a3  Merge branch 'develop' into feature/consumer-group (May 10, 2023)
b8dd0bb  Checkpoint (May 10, 2023)
0f542fe  Checkpoint (May 11, 2023)
175bf8b  Checkpoint (May 11, 2023)
ed3e339  Checkpoint (May 12, 2023)
802cff7  Checkpoint (May 12, 2023)
5e6a742  Merge branch 'develop' into feature/consumer-group (May 12, 2023)
df8da88  Checkpoint (May 26, 2023)
9729be6  Initial implementation of consumer group (May 26, 2023)
30ba518  Fix typo (May 30, 2023)
af4eaa9  Revert back changes (May 30, 2023)
d5beb2c  Checkpoint (Jun 8, 2023)
2471375  Checkpoint (Jun 9, 2023)
50690a9  Add client (Jun 12, 2023)
e6571fa  Checkpoint (Jun 14, 2023)
ce207fe  Checkpoint (Jun 14, 2023)
3e32394  Checkpoint (Jun 15, 2023)
32a0a3c  Add flush support in consumer group (Jun 19, 2023)
c64f9cf  Support leave group request (Jun 23, 2023)
bf7826e  Fix typo (Jun 23, 2023)
999fd4d  Upgrade request version (Jun 23, 2023)
18c1331  Move group related configs to KafkaConfiguration (Jun 26, 2023)
816e0ac  Checkpoint (Jun 27, 2023)
275d7f3  Checkpoint (Jun 28, 2023)
bb94a7d  Checkpoint (Jun 28, 2023)
ebf2050  Fix typo (Jun 29, 2023)
08315ea  Merge branch 'develop' into feature/consumer-group (Jun 29, 2023)
d71dff2  Fix MemberMetadata frame (Jun 29, 2023)
95afdbe  Fix generated id (Jun 30, 2023)
975a846  Fix consumer group (Jul 3, 2023)
f9494b5  More fixes (Jul 7, 2023)
4f2a1f5  Apply feedback from PR (Jul 7, 2023)
89a7988  Fix compiler errors (Jul 7, 2023)
04eed7c  Fix remaining bugs (Jul 12, 2023)
635eb92  Testing (Jul 19, 2023)
542940c  Send begin reply after sending join request (Jul 20, 2023)
8ceb90a  Fix typo (Jul 20, 2023)
d0abbe4  Fix typo (Jul 20, 2023)
a986141  Code refactoring (Jul 20, 2023)
e1c986f  Fix scripts (Jul 21, 2023)
42ded2f  Checkpoint (Jul 24, 2023)
dfbd251  Checkpoint (Jul 24, 2023)
e7f7912  Fix typo (Jul 24, 2023)
58e5bd5  Checkpoint (Jul 24, 2023)
515047a  Fix typo (Jul 24, 2023)
c1c0e51  Handle the same group client coming to the same core id (Jul 24, 2023)
e9f2d44  Checkpoint (Jul 25, 2023)
9df825a  Checkpoint (Jul 25, 2023)
6f2a664  Add more tests (Jul 25, 2023)
5b431c4  Merge branch 'develop' into feature/consumer-group (Jul 25, 2023)
8c435cb  Fix typo (Jul 25, 2023)
2a52135  Add sasl test (Jul 25, 2023)
2aeceec  Fix typo (Jul 25, 2023)
8e80f42  Fix script bug (Jul 25, 2023)
85d03ce  Checkpoint (Jul 26, 2023)
9add48f  Checkpoint (Jul 26, 2023)
23ffbfe  Checkpoint (Jul 26, 2023)
344dee5  Checkpoint (Jul 26, 2023)
002d4c1  Fix configuration decoder (Jul 26, 2023)
8c85e39  Apply changes (Jul 27, 2023)
bb07f7d  Apply feedback from PR (Jul 27, 2023)
ef5090b  Remove unnecessary code (Jul 27, 2023)
eddfbea  Fix remaining issues (Jul 27, 2023)
ae6eef7  Remove extra configuration (Jul 27, 2023)
a71ace4  Fix typo (Jul 28, 2023)
d87d676  Fix another typo (Jul 28, 2023)
@@ -87,6 +87,8 @@
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaFetchDataExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaFetchFlushExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaFlushExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaGroupBeginExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaGroupDataExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedBeginExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedDataExFW;
import io.aklivity.zilla.runtime.command.log.internal.types.stream.KafkaMergedFlushExFW;
@@ -890,6 +892,9 @@ private void onKafkaBeginEx(
case KafkaBeginExFW.KIND_DESCRIBE:
onKafkaDescribeBeginEx(offset, timestamp, kafkaBeginEx.describe());
break;
case KafkaBeginExFW.KIND_GROUP:
onKafkaGroupBeginEx(offset, timestamp, kafkaBeginEx.group());
break;
case KafkaBeginExFW.KIND_FETCH:
onKafkaFetchBeginEx(offset, timestamp, kafkaBeginEx.fetch());
break;
@@ -946,6 +951,19 @@ private void onKafkaDescribeBeginEx(
configs.forEach(c -> out.printf(verboseFormat, index, offset, timestamp, c.asString()));
}

private void onKafkaGroupBeginEx(
int offset,
long timestamp,
KafkaGroupBeginExFW group)
{
String16FW groupId = group.groupId();
String16FW protocol = group.protocol();
int timeout = group.timeout();

out.printf(verboseFormat, index, offset, timestamp, format("[group] %s %s %d",
groupId.asString(), protocol.asString(), timeout));
}

private void onKafkaFetchBeginEx(
int offset,
long timestamp,
@@ -1062,6 +1080,9 @@ private void onKafkaDataEx(
case KafkaDataExFW.KIND_DESCRIBE:
onKafkaDescribeDataEx(offset, timestamp, kafkaDataEx.describe());
break;
case KafkaDataExFW.KIND_GROUP:
onKafkaGroupDataEx(offset, timestamp, kafkaDataEx.group());
break;
case KafkaDataExFW.KIND_FETCH:
onKafkaFetchDataEx(offset, timestamp, kafkaDataEx.fetch());
break;
@@ -1089,6 +1110,17 @@ private void onKafkaDescribeDataEx(
format("%s: %s", c.name().asString(), c.value().asString())));
}

private void onKafkaGroupDataEx(
int offset,
long timestamp,
KafkaGroupDataExFW group)
{
String16FW leader = group.leaderId();
String16FW member = group.memberId();

out.printf(verboseFormat, index, offset, timestamp, format("[group] %s %s", leader.asString(), member.asString()));
}

private void onKafkaFetchDataEx(
int offset,
long timestamp,
@@ -23,6 +23,7 @@
import java.math.BigInteger;
import java.nio.file.Path;
import java.security.SecureRandom;
import java.util.UUID;
import java.util.function.Supplier;

import org.agrona.LangUtil;
@@ -63,6 +64,10 @@ public class KafkaConfiguration extends Configuration
public static final IntPropertyDef KAFKA_CACHE_CLIENT_TRAILERS_SIZE_MAX;
public static final IntPropertyDef KAFKA_CACHE_SERVER_RECONNECT_DELAY;
public static final PropertyDef<NonceSupplier> KAFKA_CLIENT_SASL_SCRAM_NONCE;
public static final PropertyDef<String> KAFKA_CLIENT_GROUP_INSTANCE_ID;
public static final IntPropertyDef KAFKA_CLIENT_GROUP_REBALANCE_TIMEOUT;
public static final PropertyDef<String> KAFKA_CLIENT_ID;
public static final PropertyDef<InstanceIdSupplier> KAFKA_CLIENT_INSTANCE_ID_SUPPLIER;

private static final ConfigurationDef KAFKA_CONFIG;

@@ -100,6 +105,11 @@ public class KafkaConfiguration extends Configuration
KAFKA_CACHE_CLIENT_TRAILERS_SIZE_MAX = config.property("cache.client.trailers.size.max", 256);
KAFKA_CLIENT_SASL_SCRAM_NONCE = config.property(NonceSupplier.class, "client.sasl.scram.nonce",
KafkaConfiguration::decodeNonceSupplier, KafkaConfiguration::defaultNonceSupplier);
KAFKA_CLIENT_GROUP_INSTANCE_ID = config.property("client.group.instance.id", UUID.randomUUID().toString());
KAFKA_CLIENT_GROUP_REBALANCE_TIMEOUT = config.property("client.group.rebalance.timeout", 4000);
Review comment (Contributor): This should be of type Duration, with default PT4S (an ISO-8601 duration of 4 seconds).
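For reference, PT4S is an ISO-8601 duration string that java.time.Duration parses directly, so a Duration-typed property defaulting to PT4S would resolve to the same 4000 ms the current integer default encodes. A minimal sketch of that parsing (not Zilla's Configuration API, just the conversion the comment relies on):

```java
import java.time.Duration;

public class RebalanceTimeoutSketch
{
    // Parse the ISO-8601 default suggested in the review comment
    public static long rebalanceTimeoutMillis(String iso8601)
    {
        return Duration.parse(iso8601).toMillis();
    }

    public static void main(String[] args)
    {
        // PT4S is 4 seconds, matching the current integer default of 4000
        System.out.println(rebalanceTimeoutMillis("PT4S"));
    }
}
```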

KAFKA_CLIENT_ID = config.property("client.id", "zilla");
KAFKA_CLIENT_INSTANCE_ID_SUPPLIER = config.property(InstanceIdSupplier.class, "client.instance.id.supplier",
Review comment (Contributor): Please remove .supplier from the property name, remove _SUPPLIER from the constant name, and remove the Supplier suffix from the property accessor method below.

What is the difference between "client.group.instance.id" and "client.instance.id.supplier"? If they represent the same concept, then we just need "client.group.instance.id" of type InstanceIdSupplier, defaulting to a supplier that generates UUID.randomUUID().toString().

KafkaConfiguration::decodeInstanceIdSupplier, KafkaConfiguration::defaultInstanceIdSupplier);
KAFKA_CONFIG = config;
}

@@ -248,6 +258,21 @@ public int cacheClientTrailersSizeMax()
return KAFKA_CACHE_CLIENT_TRAILERS_SIZE_MAX.getAsInt(this);
}

public String clientGroupInstanceId()
{
return KAFKA_CLIENT_GROUP_INSTANCE_ID.get(this);
}

public String clientId()
{
return KAFKA_CLIENT_ID.get(this);
}

public int clientGroupRebalanceTimeout()
{
return KAFKA_CLIENT_GROUP_REBALANCE_TIMEOUT.getAsInt(this);
}

private static Path cacheDirectory(
Configuration config,
String cacheDirectory)
@@ -267,6 +292,11 @@ public Supplier<String> nonceSupplier()
return KAFKA_CLIENT_SASL_SCRAM_NONCE.get(this)::get;
}

public Supplier<String> clientInstanceIdSupplier()
{
return KAFKA_CLIENT_INSTANCE_ID_SUPPLIER.get(this)::get;
}

@FunctionalInterface
private interface NonceSupplier
{
@@ -315,4 +345,22 @@ private static NonceSupplier defaultNonceSupplier(
return () ->
new BigInteger(130, new SecureRandom()).toString(Character.MAX_RADIX);
}

@FunctionalInterface
private interface InstanceIdSupplier extends Supplier<String>
{
}

private static InstanceIdSupplier decodeInstanceIdSupplier(
Configuration config,
String value)
{
return () -> String.format("%s-%s", "zilla", UUID.randomUUID());
}

private static InstanceIdSupplier defaultInstanceIdSupplier(
Configuration config)
{
return () -> String.format("%s-%s", "zilla", UUID.randomUUID());
}
Review comment (Contributor): Should the "zilla" part come from client.id, so that if the client id is configured, we see the effect here for the group instance id as well?

Note: we have access to the config, so it's easy to extract it as a local final variable before using it in the lambda.
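The suggestion above can be sketched by reading the configured client id once and capturing it in the lambda instead of hard-coding "zilla". Here `clientId` is a stand-in for the value the config would supply via KAFKA_CLIENT_ID, not Zilla's actual accessor:

```java
import java.util.UUID;
import java.util.function.Supplier;

public class InstanceIdSketch
{
    // Build an instance-id supplier whose prefix follows the configured
    // client id, so changing client.id also changes the group instance id
    public static Supplier<String> instanceIdSupplier(String clientId)
    {
        final String prefix = clientId; // extract once, then capture in the lambda
        return () -> String.format("%s-%s", prefix, UUID.randomUUID());
    }

    public static void main(String[] args)
    {
        System.out.println(instanceIdSupplier("zilla").get());
    }
}
```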

}
@@ -45,7 +45,18 @@ public KafkaRouteConfig resolve(
String topic)
{
return routes.stream()
-        .filter(r -> r.authorized(authorization) && r.matches(topic))
+        .filter(r -> r.authorized(authorization) && r.matches(topic, null))
.findFirst()
.orElse(null);
}

public KafkaRouteConfig resolve(
long authorization,
String topic,
String groupId)
{
return routes.stream()
.filter(r -> r.authorized(authorization) && r.matches(topic, groupId))
.findFirst()
.orElse(null);
}
@@ -20,10 +20,13 @@
public final class KafkaConditionConfig extends ConditionConfig
{
public final String topic;
public final String groupId;

public KafkaConditionConfig(
-    String topic)
+    String topic,
+    String groupId)
{
this.topic = topic;
this.groupId = groupId;
}
}
@@ -27,6 +27,7 @@
public final class KafkaConditionConfigAdapter implements ConditionConfigAdapterSpi, JsonbAdapter<ConditionConfig, JsonObject>
{
private static final String TOPIC_NAME = "topic";
private static final String GROUP_ID_NAME = "groupId";

@Override
public String type()
@@ -47,6 +48,11 @@ public JsonObject adaptToJson(
object.add(TOPIC_NAME, kafkaCondition.topic);
}

if (kafkaCondition.groupId != null)
{
object.add(GROUP_ID_NAME, kafkaCondition.groupId);
}

return object.build();
}

@@ -58,6 +64,10 @@ public ConditionConfig adaptFromJson(
? object.getString(TOPIC_NAME)
: null;

-    return new KafkaConditionConfig(topic);
+    String groupId = object.containsKey(GROUP_ID_NAME)
+        ? object.getString(GROUP_ID_NAME)
+        : null;
+
+    return new KafkaConditionConfig(topic, groupId);
}
}
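Based on the adapter above, a condition carrying both fields would serialize to a JSON object with "topic" and "groupId" keys; the values here are illustrative (wildcards are supported by the matcher that consumes this config):

```json
{
    "topic": "events-*",
    "groupId": "billing-*"
}
```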
@@ -21,17 +21,20 @@
public final class KafkaConditionMatcher
{
private final Matcher topicMatch;
private final Matcher groupIdMatch;

public KafkaConditionMatcher(
KafkaConditionConfig condition)
{
this.topicMatch = condition.topic != null ? asMatcher(condition.topic) : null;
this.groupIdMatch = condition.groupId != null ? asMatcher(condition.groupId) : null;
}

public boolean matches(
-    String topic)
+    String topic,
+    String groupId)
{
-    return matchesTopic(topic);
+    return matchesTopic(topic) && matchesGroupId(groupId);
}

private boolean matchesTopic(
@@ -40,9 +43,18 @@ private boolean matchesTopic(
return this.topicMatch == null || this.topicMatch.reset(topic).matches();
}

private boolean matchesGroupId(
String groupId)
{
return this.groupIdMatch == null || this.groupIdMatch.reset(groupId).matches();
}

private static Matcher asMatcher(
String wildcard)
{
-    return Pattern.compile(wildcard.replace(".", "\\.").replace("*", ".*")).matcher("");
+    return Pattern.compile(wildcard
+        .replace(".", "\\.")
+        .replace("*", ".*"))
+        .matcher("");
}
}
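asMatcher treats the condition as a glob-style wildcard: literal dots are escaped and each * becomes the regex .*. A standalone sketch of the same translation:

```java
import java.util.regex.Pattern;

public class WildcardSketch
{
    // Same translation as asMatcher: escape '.', turn '*' into '.*'
    public static boolean wildcardMatches(String wildcard, String value)
    {
        Pattern pattern = Pattern.compile(wildcard
            .replace(".", "\\.")
            .replace("*", ".*"));
        return pattern.matcher(value).matches();
    }

    public static void main(String[] args)
    {
        System.out.println(wildcardMatches("events-*", "events-1")); // true
        System.out.println(wildcardMatches("events-*", "logs-1"));   // false
    }
}
```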
@@ -50,8 +50,9 @@ boolean authorized(
}

boolean matches(
-    String topic)
+    String topic,
+    String groupId)
{
-    return when.isEmpty() || when.stream().anyMatch(m -> m.matches(topic));
+    return when.isEmpty() || when.stream().anyMatch(m -> m.matches(topic, groupId));
}
}
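The matches logic above means a route with no when conditions accepts every topic/group pair, and otherwise any single matching condition is enough. A standalone sketch of that isEmpty/anyMatch pattern, using plain predicates as stand-ins for KafkaConditionMatcher:

```java
import java.util.List;
import java.util.function.Predicate;

public class RouteMatchSketch
{
    // Mirrors KafkaRouteConfig.matches: an empty condition list matches
    // everything; otherwise one matching condition is sufficient
    public static boolean matches(List<Predicate<String>> when, String topic)
    {
        return when.isEmpty() || when.stream().anyMatch(m -> m.test(topic));
    }

    public static void main(String[] args)
    {
        System.out.println(matches(List.of(), "any-topic"));                        // true
        System.out.println(matches(List.of(t -> t.startsWith("events")), "logs"));  // false
    }
}
```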
@@ -64,6 +64,8 @@ public KafkaCacheClientFactory(
final KafkaCacheClientDescribeFactory cacheDescribeFactory = new KafkaCacheClientDescribeFactory(
config, context, bindings::get, supplyCacheRoute);

final KafkaCacheGroupFactory cacheGroupFactory = new KafkaCacheGroupFactory(config, context, bindings::get);

Review comment (Contributor): If the implementations of KafkaCacheServerGroupFactory and KafkaCacheClientGroupFactory are identical, then recommend keeping just one implementation called KafkaCacheGroupFactory and using it in both KafkaCacheServerFactory and KafkaCacheClientFactory, similar to our approach for KafkaCacheMetaFactory.

Reply (Contributor Author): Makes sense, thanks.

final KafkaCacheClientFetchFactory cacheFetchFactory = new KafkaCacheClientFetchFactory(
config, context, bindings::get, accountant::supplyDebitor, supplyCache, supplyCacheRoute);

@@ -76,6 +78,7 @@ public KafkaCacheClientFactory(
final Int2ObjectHashMap<BindingHandler> factories = new Int2ObjectHashMap<>();
factories.put(KafkaBeginExFW.KIND_META, cacheMetaFactory);
factories.put(KafkaBeginExFW.KIND_DESCRIBE, cacheDescribeFactory);
factories.put(KafkaBeginExFW.KIND_GROUP, cacheGroupFactory);
factories.put(KafkaBeginExFW.KIND_FETCH, cacheFetchFactory);
factories.put(KafkaBeginExFW.KIND_PRODUCE, cacheProduceFactory);
factories.put(KafkaBeginExFW.KIND_MERGED, cacheMergedFactory);