KafkaMerged acknowledge flush frame (#258)
bmaidics committed Jun 26, 2023
1 parent ac40b89 commit f23cc04
Showing 20 changed files with 2,719 additions and 11 deletions.
@@ -1194,6 +1194,10 @@ private void onClientMessage(
final DataFW data = dataRO.wrap(buffer, index, index + length);
onClientInitialData(data);
break;
case FlushFW.TYPE_ID:
final FlushFW flush = flushRO.wrap(buffer, index, index + length);
onClientInitialFlush(flush);
break;
case EndFW.TYPE_ID:
final EndFW end = endRO.wrap(buffer, index, index + length);
onClientInitialEnd(end);
@@ -1275,6 +1279,25 @@ private void onClientInitialData(
doClientInitialWindow(traceId, noAck, noAck + initialBudgetMax);
}

private void onClientInitialFlush(
FlushFW flush)
{
final long sequence = flush.sequence();
final long acknowledge = flush.acknowledge();
final long traceId = flush.traceId();
final int reserved = flush.reserved();

assert acknowledge <= sequence;
assert sequence >= initialSeq;

initialSeq = sequence + reserved;

assert initialAck <= initialSeq;

final int noAck = (int) (initialSeq - initialAck);
doClientInitialWindow(traceId, noAck, noAck + initialBudgetMax);
}

private void onClientInitialEnd(
EndFW end)
{
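The new onClientInitialFlush handler above treats a FLUSH frame exactly like zero-payload DATA for flow control: the frame's reserved byte count advances initialSeq, and the handler immediately credits those bytes back through a window update so the producer's budget is restored. A minimal sketch of that arithmetic, with assumed names (InitialFlowControl, doWindow, and the budget constant are stand-ins, not Zilla's actual types):

// Hedged sketch, not Zilla's actual classes: shows how a FLUSH frame's
// reserved bytes consume the initial window and are credited straight back.
final class InitialFlowControl
{
    long initialSeq;
    long initialAck;
    static final int INITIAL_BUDGET_MAX = 65536; // assumed window size

    void onFlush(long sequence, long acknowledge, int reserved)
    {
        assert acknowledge <= sequence;
        assert sequence >= initialSeq;

        initialSeq = sequence + reserved; // FLUSH consumes window like empty DATA

        final int noAck = (int) (initialSeq - initialAck);
        doWindow(noAck, noAck + INITIAL_BUDGET_MAX); // reopen the window at once
    }

    void doWindow(int minimum, int maximum)
    {
        initialAck = initialSeq; // stands in for emitting a WINDOW frame upstream
    }
}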
@@ -1030,7 +1030,8 @@ private final class KafkaMergedStream
private int replyPad;
private long replyBudgetId;

private int nextNullKeyHash;
private int nextNullKeyHashData;
private int nextNullKeyHashFlush;
private int fetchStreamIndex;
private long mergedReplyBudgetId = NO_CREDITOR_INDEX;

@@ -1193,7 +1194,7 @@ private void onMergedInitialData(
final KafkaKeyFW key = kafkaMergedDataEx.key();
final KafkaOffsetFW partition = kafkaMergedDataEx.partition();
final int partitionId = partition.partitionId();
final int nextPartitionId = partitionId == DYNAMIC_PARTITION ? nextPartition(key) : partitionId;
final int nextPartitionId = partitionId == DYNAMIC_PARTITION ? nextPartitionData(key) : partitionId;

final KafkaUnmergedProduceStream newProducer = findProducePartitionLeader(nextPartitionId);
assert newProducer != null; // TODO
@@ -1218,11 +1219,21 @@ private KafkaOffsetType asMaximumOffset(
partitions.anyMatch(p -> p.latestOffset() != HISTORICAL.value()) ? LIVE : HISTORICAL;
}

private int nextPartition(
private int nextPartitionData(
KafkaKeyFW key)
{
final int partitionCount = leadersByPartitionId.size();
final int keyHash = key.length() != -1 ? defaultKeyHash(key) : nextNullKeyHash++;
final int keyHash = key.length() != -1 ? defaultKeyHash(key) : nextNullKeyHashData++;
final int partitionId = partitionCount > 0 ? (0x7fff_ffff & keyHash) % partitionCount : 0;

return partitionId;
}

private int nextPartitionFlush(
KafkaKeyFW key)
{
final int partitionCount = leadersByPartitionId.size();
final int keyHash = key.length() != -1 ? defaultKeyHash(key) : nextNullKeyHashFlush++;
final int partitionId = partitionCount > 0 ? (0x7fff_ffff & keyHash) % partitionCount : 0;

return partitionId;
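
Both partitioning variants apply the same rule: a non-null key is hashed, a null key draws the next value from a monotonically increasing counter (round-robin), and masking with 0x7fff_ffff clears the sign bit so the modulo result is never negative. Splitting the null-key counter into nextNullKeyHashData and nextNullKeyHashFlush lets DATA and FLUSH frames round-robin independently, so a null-keyed FLUSH can land on the same partition as the DATA frame it follows, assuming one FLUSH per DATA. A self-contained sketch of the rule (the byte-array hash is an assumption standing in for defaultKeyHash):

// Hedged sketch of the partition selection above; not the actual Zilla code.
int nextNullKeyHash;

int selectPartition(byte[] key, int partitionCount)
{
    final int keyHash = key != null
            ? java.util.Arrays.hashCode(key) // assumed stand-in for defaultKeyHash(key)
            : nextNullKeyHash++;             // null keys round-robin via a counter
    return partitionCount > 0
            ? (0x7fff_ffff & keyHash) % partitionCount // mask sign bit, then modulo
            : 0;
}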
@@ -1271,6 +1282,7 @@ private void onMergedInitialFlush(
{
final long traceId = flush.traceId();
final OctetsFW extension = flush.extension();
final int reserved = flush.reserved();
final ExtensionFW flushEx = extension.get(extensionRO::tryWrap);

final KafkaFlushExFW kafkaFlushEx = flushEx != null && flushEx.typeId() == kafkaTypeId ?
@@ -1310,6 +1322,23 @@ private void onMergedInitialFlush(
doFetchPartitionsIfNecessary(traceId);
doProducePartitionsIfNecessary(traceId);
}
else
{
if (hasProduceCapability(capabilities))
{
final KafkaOffsetFW partition = kafkaMergedFlushEx.partition();
final KafkaKeyFW key = kafkaMergedFlushEx.key();
if (partition != null)
{
final int partitionId = partition.partitionId();
final int nextPartitionId = partitionId == DYNAMIC_PARTITION ? nextPartitionFlush(key) : partitionId;

final KafkaUnmergedProduceStream producer = findProducePartitionLeader(nextPartitionId);
assert producer != null;
producer.doProduceInitialFlush(traceId, reserved, kafkaMergedFlushEx);
}
}
}

if (hasFetchCapability(capabilities) && !newFilters.equals(this.filters))
{
@@ -3051,6 +3080,28 @@ private void doProduceInitialData(
assert initialAck <= initialSeq;
}

private void doProduceInitialFlush(
long traceId,
int reserved,
KafkaMergedFlushExFW kafkaMergedFlushEx)
{
Flyweight newKafkaFlushEx = kafkaFlushExRW.wrap(extBuffer, 0, extBuffer.capacity())
.typeId(kafkaTypeId)
.produce(c ->
{
c.partition(kafkaMergedFlushEx.partition());
c.key(kafkaMergedFlushEx.key());
})
.build();

doFlush(receiver, merged.routedId, merged.resolvedId, initialId, initialSeq, initialAck, initialMax,
traceId, merged.authorization, reserved, newKafkaFlushEx);

initialSeq += reserved;

assert initialAck <= initialSeq;
}

private void doProduceInitialEndIfNecessary(
long traceId)
{
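In the hunk above, doProduceInitialFlush forwards the merged FLUSH downstream: it rebuilds the extension to carry only the partition and key from the merged flush, debits the reserved bytes from its own initial window (initialSeq += reserved), and emits the frame on the unmerged produce stream for the partition leader. The routing decision that precedes it in onMergedInitialFlush reduces to a small rule; a hedged sketch (DYNAMIC_PARTITION = -1 is an assumption, and selectPartition is the sketch shown earlier):

// Hedged sketch of the FLUSH routing rule; -1 for DYNAMIC_PARTITION is assumed.
static final int DYNAMIC_PARTITION = -1;

int resolveFlushPartition(int partitionId, byte[] key, int partitionCount)
{
    return partitionId == DYNAMIC_PARTITION
            ? selectPartition(key, partitionCount) // same rule as DATA frames
            : partitionId;                         // an explicit partition wins
}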
@@ -303,6 +303,36 @@ public void shouldFetchMergedMessagesWithIsolationReadCommitted() throws Exception
k3po.finish();
}

@Test
@Configuration("cache.yaml")
@Specification({
"${app}/merged.produce.flush/client",
"${app}/unmerged.produce.flush/server"})
public void shouldProduceMergedFlush() throws Exception
{
k3po.finish();
}

@Test
@Configuration("cache.yaml")
@Specification({
"${app}/merged.produce.flush.dynamic/client",
"${app}/unmerged.produce.flush.dynamic/server"})
public void shouldProduceMergedFlushDynamic() throws Exception
{
k3po.finish();
}

@Test
@Configuration("cache.yaml")
@Specification({
"${app}/merged.produce.flush.dynamic.hashed/client",
"${app}/unmerged.produce.flush.dynamic.hashed/server"})
public void shouldProduceMergedFlushDynamicHashed() throws Exception
{
k3po.finish();
}

@Test
@Configuration("cache.yaml")
@Specification({
@@ -411,6 +411,14 @@ private void onFlush(
channel.readExtBuffer(FLUSH, false).writeBytes(flushExtCopy);
}

final int reservedBytes = flush.reserved();
if (reservedBytes > 0)
{
channel.readBytes(sequence, reservedBytes);
channel.acknowledgeBytes(reservedBytes);
sender.doWindow(channel);
}

fireInputAdvised(channel, ADVISORY_FLUSH);
}

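The hunk above closes the loop on the receiving side: a FLUSH carrying reserved bytes is consumed like payload, acknowledged, and answered with a window so the sender's budget reopens. A hedged walk-through of the arithmetic for one FLUSH with reserved = 64 and an initial sequence of 1000:

    sender:   doProduceInitialFlush debits the frame, initialSeq 1000 -> 1064
    receiver: onFlush reads 64 reserved bytes, acknowledges 64, sends a window
    sender:   the window advances initialAck 1000 -> 1064, budget fully restored

Without the acknowledge and window, each FLUSH would permanently leak its reserved bytes from the window and the producer would eventually stall.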