Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Offset commit fixes #593

Merged
merged 3 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1294,10 +1294,10 @@ private void onConsumerInitialFlush(
kafkaFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

KafkaConsumerFlushExFW consumerFlushEx = kafkaFlushEx.consumer();
final KafkaOffsetFW partition = consumerFlushEx.partition();
final KafkaOffsetFW progress = consumerFlushEx.progress();
final int leaderEpoch = consumerFlushEx.leaderEpoch();

offsetCommit.onOffsetCommitRequest(traceId, authorization, partition, leaderEpoch);
offsetCommit.onOffsetCommitRequest(traceId, authorization, progress, leaderEpoch);
}

private void onConsumerInitialAbort(
Expand Down Expand Up @@ -1747,7 +1747,7 @@ private void onOffsetCommitResponse(
ex -> ex.set((b, o, l) -> kafkaFlushExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.consumer(c -> c
.partition(p -> p
.progress(p -> p
.partitionId(commit.partitionId)
.partitionOffset(commit.partitionOffset)
.metadata(commit.metadata)
Expand All @@ -1772,7 +1772,7 @@ private void doOffsetCommit(
.set((b, o, l) -> kafkaDataExRW.wrap(b, o, l)
.typeId(kafkaTypeId)
.offsetCommit(oc -> oc
.partition(p -> p.partitionId(commit.partitionId)
.progress(p -> p.partitionId(commit.partitionId)
.partitionOffset(commit.partitionOffset)
.metadata(commit.metadata))
.generationId(delegate.fanout.generationId)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -731,12 +731,12 @@ private void onApplicationData(
kafkaDataExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

final KafkaOffsetCommitDataExFW commitDataExFW = kafkaDataEx.offsetCommit();
final KafkaOffsetFW partition = commitDataExFW.partition();
final KafkaOffsetFW progress = commitDataExFW.progress();
final int generationId = commitDataExFW.generationId();
final int leaderEpoch = commitDataExFW.leaderEpoch();

client.onOffsetCommit(traceId, partition.partitionId(), partition.partitionOffset(),
generationId, leaderEpoch, partition.metadata().asString());
client.onOffsetCommit(traceId, progress.partitionId(), progress.partitionOffset(),
generationId, leaderEpoch, progress.metadata().asString());
}
}

Expand Down Expand Up @@ -1568,6 +1568,8 @@ private void onDecodePartition(
delegate.doApplicationWindow(traceId);

nextResponseId++;

doEncodeRequestIfNecessary(traceId, initialBudgetId);
}

private void cleanupNetwork(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1775,7 +1775,7 @@ private void doMergedConsumerReplyFlush(
{
final KafkaFlushExFW kafkaFlushExFW = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.merged(mc -> mc.consumer(c -> c.partition(partition)))
.merged(mc -> mc.consumer(c -> c.progress(partition)))
.build();

doFlush(sender, originId, routedId, replyId, replySeq, replyAck, replyMax,
Expand Down Expand Up @@ -2100,7 +2100,7 @@ private long nextFetchPartitionOffset(
if (!offsetsByPartitionId.isEmpty())
{
KafkaPartitionOffset kafkaPartitionOffset = offsetsByPartitionId.get(partitionId);
partitionOffset = kafkaPartitionOffset.partitionOffset + 1;
partitionOffset = kafkaPartitionOffset.partitionOffset;
}
else
{
Expand Down Expand Up @@ -2807,13 +2807,13 @@ private void doConsumerInitialFlush(
{
if (!KafkaState.initialClosed(state))
{
final KafkaOffsetFW offsetAck = consumer.partition();
final KafkaOffsetFW offsetAck = consumer.progress();
final KafkaPartitionOffset partitionOffset = merged.offsetsByPartitionId.get(offsetAck.partitionId());

final KafkaFlushExFW kafkaFlushExFW = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.consumer(c -> c
.partition(p -> p
.progress(p -> p
.partitionId(offsetAck.partitionId())
.partitionOffset(offsetAck.partitionOffset())
.metadata(offsetAck.metadata()))
Expand Down Expand Up @@ -2934,9 +2934,9 @@ private void onConsumerFanReplyFlush(
kafkaFlushExRO.tryWrap(extension.buffer(), extension.offset(), extension.limit()) : null;

KafkaConsumerFlushExFW consumerFlushEx = kafkaFlushEx.consumer();
final KafkaOffsetFW partition = consumerFlushEx.partition();
final KafkaOffsetFW progress = consumerFlushEx.progress();

merged.doMergedConsumerReplyFlush(traceId, partition);
merged.doMergedConsumerReplyFlush(traceId, progress);
}

private void onConsumerReplyData(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,16 @@ public void shouldUpdateTopicPartitionOffset() throws Exception
k3po.finish();
}

@Test
@Configuration("client.yaml")
@Specification({
"${app}/update.topic.partition.offsets/client",
"${net}/update.topic.partition.offsets/server"})
public void shouldUpdateTopicPartitionOffsets() throws Exception
{
k3po.finish();
}

@Test
@Configuration("client.yaml")
@Specification({
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2482,12 +2482,12 @@ private KafkaOffsetCommitDataExBuilder()
offsetCommitDataExRW.wrap(writeBuffer, KafkaDataExFW.FIELD_OFFSET_OFFSET_COMMIT, writeBuffer.capacity());
}

public KafkaOffsetCommitDataExBuilder partition(
public KafkaOffsetCommitDataExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
offsetCommitDataExRW.partition(p -> p
offsetCommitDataExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.metadata(metadata));
Expand Down Expand Up @@ -2737,30 +2737,30 @@ private KafkaMergedConsumerFlushExBuilder()
writeBuffer.capacity());
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset)
{
partition(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, null);
progress(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, null);
return this;
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
partition(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, metadata);
progress(partitionId, partitionOffset, DEFAULT_LATEST_OFFSET, metadata);
return this;
}

public KafkaMergedConsumerFlushExBuilder partition(
public KafkaMergedConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
long latestOffset,
String metadata)
{
mergedConsumerFlushExRW.partition(p -> p
mergedConsumerFlushExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.latestOffset(latestOffset)
Expand Down Expand Up @@ -2972,22 +2972,22 @@ private KafkaConsumerFlushExBuilder()
flushConsumerExRW.wrap(writeBuffer, KafkaFlushExFW.FIELD_OFFSET_CONSUMER, writeBuffer.capacity());
}

public KafkaConsumerFlushExBuilder partition(
public KafkaConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset)
{
flushConsumerExRW.partition(p -> p
flushConsumerExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset));
return this;
}

public KafkaConsumerFlushExBuilder partition(
public KafkaConsumerFlushExBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
{
flushConsumerExRW.partition(p -> p
flushConsumerExRW.progress(p -> p
.partitionId(partitionId)
.partitionOffset(partitionOffset)
.metadata(metadata));
Expand Down Expand Up @@ -4649,7 +4649,7 @@ private KafkaMergedConsumerFlushEx()
{
}

public KafkaMergedConsumerFlushEx partition(
public KafkaMergedConsumerFlushEx progress(
int partitionId,
long offset,
String metadata)
Expand All @@ -4662,7 +4662,7 @@ public KafkaMergedConsumerFlushEx partition(
return this;
}

public KafkaMergedConsumerFlushEx partition(
public KafkaMergedConsumerFlushEx progress(
int partitionId,
long offset)
{
Expand All @@ -4683,13 +4683,13 @@ private boolean match(
KafkaFlushExFW flushEx)
{
final KafkaMergedConsumerFlushExFW mergedFlushEx = flushEx.merged().consumer();
return matchPartition(mergedFlushEx);
return matchProgress(mergedFlushEx);
}

private boolean matchPartition(
private boolean matchProgress(
final KafkaMergedConsumerFlushExFW mergedFlush)
{
return partitionRW == null || partitionRW.build().equals(mergedFlush.partition());
return partitionRW == null || partitionRW.build().equals(mergedFlush.progress());
}
}
}
Expand Down Expand Up @@ -4878,15 +4878,15 @@ private KafkaConsumerFlushExMatchBuilder()
{
}

public KafkaConsumerFlushExMatchBuilder partition(
public KafkaConsumerFlushExMatchBuilder progress(
int partitionId,
long partitionOffset)
{
partition(partitionId, partitionOffset, null);
progress(partitionId, partitionOffset, null);
return this;
}

public KafkaConsumerFlushExMatchBuilder partition(
public KafkaConsumerFlushExMatchBuilder progress(
int partitionId,
long partitionOffset,
String metadata)
Expand Down Expand Up @@ -4919,14 +4919,14 @@ private boolean match(
KafkaFlushExFW flushEx)
{
KafkaConsumerFlushExFW consumerFlushEx = flushEx.consumer();
return matchPartition(consumerFlushEx) &&
return matchProgress(consumerFlushEx) &&
matchLeaderEpoch(consumerFlushEx);
}

private boolean matchPartition(
private boolean matchProgress(
final KafkaConsumerFlushExFW consumerFLushEx)
{
return partitionRW == null || partitionRW.build().equals(consumerFLushEx.partition());
return partitionRW == null || partitionRW.build().equals(consumerFLushEx.progress());
}

private boolean matchLeaderEpoch(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ scope kafka

struct KafkaMergedConsumerFlushEx
{
KafkaOffset partition;
KafkaOffset progress;
}

struct KafkaMergedFetchFlushEx
Expand Down Expand Up @@ -442,7 +442,7 @@ scope kafka

struct KafkaConsumerFlushEx
{
KafkaOffset partition;
KafkaOffset progress;
int32 leaderEpoch = -1;
}

Expand Down Expand Up @@ -474,7 +474,7 @@ scope kafka

struct KafkaOffsetCommitDataEx
{
KafkaOffset partition;
KafkaOffset progress;
int32 generationId;
int32 leaderEpoch;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,14 @@ read zilla:data.empty
write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.leaderEpoch(0)
.build()
.build()}

read advised zilla:flush ${kafka:matchFlushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ write flush
read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.leaderEpoch(0)
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ connected
write zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ connected
read zilla:data.ext ${kafka:dataEx()
.typeId(zilla:id("kafka"))
.offsetCommit()
.partition(0, 2, "test-meta")
.progress(0, 2, "test-meta")
.generationId(0)
.leaderEpoch(0)
.build()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,14 +48,14 @@ write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}

read advised zilla:flush ${kafka:matchFlushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,14 @@ read advised zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}

write advise zilla:flush ${kafka:flushEx()
.typeId(zilla:id("kafka"))
.merged()
.consumer()
.partition(0, 2, "test-meta")
.progress(0, 3, "test-meta")
.build()
.build()}
Loading