Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support Kafka consumer groups #262

Merged
merged 80 commits into from
Jul 28, 2023

Conversation

akrambek
Copy link
Contributor

@akrambek akrambek commented May 26, 2023

Description

Please include a summary of the changes and the related issue. Please also include relevant motivation and context. List any dependencies that are required for this change.

Fixes #215

@@ -321,6 +321,7 @@ scope kafka

struct KafkaGroupBeginEx
{
string16 topic;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need topic on KafkaGroupBeginEx?
Isn't the consumer group independent of any specific topic?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Forgot to remove it sorry.

final long traceId = data.traceId();
final long budgetId = data.budgetId();

coordinatorClient.doSyncRequest(traceId, budgetId, data.payload());
Copy link
Contributor Author

@akrambek akrambek May 30, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is what I am not sure. Whether to start from join request or from doSync

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the case where the leader is making assignments to the members, right?

First, we need to verify and reject if not the leader, as Kafka will reject it anyway.
Then we need to doSyncRequest to update the assignments, but the leader is already in the group, so no need to proactively re-Join the group, i think, agree?

Copy link
Contributor Author

@akrambek akrambek May 31, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the case where the leader is making assignments to the members, right?

Yes

yeah that's what I was thinking

Copy link
Contributor

@jfallows jfallows left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'll also need to add KafkaCacheServerGroupFactory and KafkaCacheClientGroupFactory so that this will work from the cache client side of the pipeline.

{
string16 memberId;
uint32 length;
octets[length] topicPartition;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Given that the format of the assignment is opaque, suggest we call this value instead of topicPartition.

read 10 # size
${newRequestId}
0 # Throttle time
0s # No error
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: all our other scripts use lower case for the comments, please follow the same lowercase pattern for comments in any new scripts you create and fix the comments in these scripts too for completeness when you get a moment.

"groupId":
{
"title": "ConsumerGroupId",
"type": "string"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What is the scenario where we need to configure a route by groupId in zilla.yaml?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah I think we don't really need it and it should be probably the options in mqtt-kafka binding

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good, please remove it, including any associated config parsing and routing code.

final long traceId = data.traceId();
final long budgetId = data.budgetId();

coordinatorClient.doSyncRequest(traceId, budgetId, data.payload());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this is the case where the leader is making assignments to the members, right?

First, we need to verify and reject if not the leader, as Kafka will reject it anyway.
Then we need to doSyncRequest to update the assignments, but the leader is already in the group, so no need to proactively re-Join the group, i think, agree?

@jfallows jfallows changed the title Consumer group Support Kafka consumer groups Jul 25, 2023
Configuration config)
{
return () -> String.format("%s-%s", "zilla", UUID.randomUUID());
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should the "zilla" part be coming from the client.id so that if the client id is configured, we see the effect here for the group instance id as well?

Note: we have access to the config, so it's easy to extract it as a local final variable before using it in the lambda.

sender.accept(reset.typeId(), reset.buffer(), reset.offset(), reset.sizeof());
}

final class KafkaCacheServerGroupNet
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming to KafkaCacheGroupNet

}
}

private final class KafkaCacheServerGroupApp
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming to KafkaCacheGroupApp

stream.streamCleanup(traceId, traceId);
}

KafkaGroupStream kafkaGroupStream = new KafkaGroupStream(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest rename variable to groupStream (or group) as we already in Kafka context.

}
}

private final class GroupIdentifier
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming to GroupMembership with fields instanceId and memberIds.

@@ -0,0 +1,176 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming scenario to coordinator.reject.invalid.consumer.

@@ -0,0 +1,150 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest renaming scenario to coordinator.not.available.

@@ -0,0 +1,41 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does connected mean here?

Suggest renaming scenario to client.sent.write.abort.before.coordinator.response.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Makes sesnse

@@ -0,0 +1,195 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest rename scenario to rebalance.protocol.highlander

@@ -0,0 +1,165 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: each of the other network script categories indicate the versions of the Kafka wire protocol api(s) in use, even if multiple - for example unmerged.p3.f5.d0.m5. 😄

Suggest doing the same for group category, also for group.sasl category.

@@ -100,6 +105,11 @@ public class KafkaConfiguration extends Configuration
KAFKA_CACHE_CLIENT_TRAILERS_SIZE_MAX = config.property("cache.client.trailers.size.max", 256);
KAFKA_CLIENT_SASL_SCRAM_NONCE = config.property(NonceSupplier.class, "client.sasl.scram.nonce",
KafkaConfiguration::decodeNonceSupplier, KafkaConfiguration::defaultNonceSupplier);
KAFKA_CLIENT_GROUP_INSTANCE_ID = config.property("client.group.instance.id", UUID.randomUUID().toString());
KAFKA_CLIENT_GROUP_REBALANCE_TIMEOUT = config.property("client.group.rebalance.timeout", 4000);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be of type Duration, with default PT4S (period time 4 seconds).

{
return Duration.parse(timeout);
}

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please inline this lambda as (c, v) -> Duration.parse(v) in the _PROPERTY assignment above and remove this method.

KAFKA_CLIENT_GROUP_REBALANCE_TIMEOUT = config.property(Duration.class, "client.group.rebalance.timeout",
KafkaConfiguration::rebalanceTimeoutDuration, "PT4S");
KAFKA_CLIENT_ID = config.property("client.id", "zilla");
KAFKA_CLIENT_INSTANCE_ID_SUPPLIER = config.property(InstanceIdSupplier.class, "client.instance.id.supplier",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove .supplier from property name, remove _SUPPLIER from constant name, and remove Supplier suffix from property accessor method below.

What is the difference between "client.group.instance.id" and "client.instance.id.supplier"?

If they represent the same concept then we just need "client.group.instance.id" of type InstanceIdSupplier and default to a supplier that generates UUID.randomUUID().toString().

@@ -20,6 +20,7 @@

import java.nio.file.Path;
import java.nio.file.Paths;
import java.time.Duration;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this import.

@@ -0,0 +1,69 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please see comment above.

read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.group()
.leaderId("memberid-1")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sorry to nitpick, but this should be memberId-1 not memberid-1, right?

@@ -0,0 +1,195 @@
#
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Rename scenario?

<dependency>
<groupId>org.jboss.byteman</groupId>
<artifactId>byteman</artifactId>
</dependency>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please remove this dependency, it should be unnecessary. We only use it for TCP binding to force behavior of the socket APIs when simulating different network interactions.

jfallows
jfallows previously approved these changes Jul 27, 2023
@jfallows jfallows merged commit 80e2091 into aklivity:develop Jul 28, 2023
2 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support kafka consumer groups
2 participants