Skip to content

Commit

Permalink
Merge pull request #90 from jfallows/develop
Browse files Browse the repository at this point in the history
Scope topic partition leader info by both resolved binding and topic name
  • Loading branch information
jfallows committed Jul 12, 2022
2 parents 32069ce + e88c941 commit afaa9e3
Show file tree
Hide file tree
Showing 9 changed files with 42 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
Expand Down Expand Up @@ -225,7 +226,8 @@ public MessageConsumer newStream(
final KafkaFilterCondition condition = cursorFactory.asCondition(filters);
final long latestOffset = kafkaFetchBeginEx.partition().latestOffset();
final KafkaOffsetType maximumOffset = KafkaOffsetType.valueOf((byte) latestOffset);
final int leaderId = cacheRoute.leadersByPartitionId.get(partitionId);
final Int2IntHashMap leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topicName);
final int leaderId = leadersByPartitionId.get(partitionId);

newStream = new KafkaCacheClientFetchStream(
fanout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Long2ObjectHashMap;
import org.agrona.collections.MutableInteger;
import org.agrona.concurrent.UnsafeBuffer;
Expand Down Expand Up @@ -244,7 +245,8 @@ public MessageConsumer newStream(
fan = newFan;
}

final int leaderId = cacheRoute.leadersByPartitionId.get(partitionId);
final Int2IntHashMap leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topicName);
final int leaderId = leadersByPartitionId.get(partitionId);
newStream = new KafkaCacheClientProduceStream(
fan,
sender,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -390,6 +390,7 @@ final class KafkaCacheMetaFanout
private final KafkaCacheTopic topic;
private final List<KafkaCacheMetaStream> members;
private final KafkaCacheRoute cacheRoute;
private final Int2IntHashMap leadersByPartitionId;

private long initialId;
private long replyId;
Expand Down Expand Up @@ -418,6 +419,7 @@ private KafkaCacheMetaFanout(
this.topic = topic;
this.members = new ArrayList<>();
this.cacheRoute = supplyCacheRoute.apply(routeId);
this.leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topic.name());
}

private void onMetaFanoutMemberOpening(
Expand Down Expand Up @@ -445,7 +447,6 @@ private void onMetaFanoutMemberOpened(
long traceId,
KafkaCacheMetaStream member)
{
final Int2IntHashMap leadersByPartitionId = cacheRoute.leadersByPartitionId;
if (!leadersByPartitionId.isEmpty())
{
final KafkaDataExFW kafkaDataEx =
Expand Down Expand Up @@ -679,7 +680,6 @@ private void onMetaFanoutReplyData(
if (kafkaMetaDataEx != null)
{
final ArrayFW<KafkaPartitionFW> partitions = kafkaMetaDataEx.partitions();
final Int2IntHashMap leadersByPartitionId = cacheRoute.leadersByPartitionId;
leadersByPartitionId.clear();
partitions.forEach(p -> leadersByPartitionId.put(p.partitionId(), p.leaderId()));

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public final class KafkaCacheRoute
public final Long2ObjectHashMap<KafkaCacheClientProduceFan> clientProduceFansByTopicPartition;
public final Long2ObjectHashMap<KafkaCacheServerProduceFan> serverProduceFansByTopicPartition;
public final Long2ObjectHashMap<KafkaCacheClientBudget> clientBudgetsByTopic;
public final Int2IntHashMap leadersByPartitionId;
public final Int2ObjectHashMap<Int2IntHashMap> leadersByPartitionId;


public KafkaCacheRoute(
Expand All @@ -54,9 +54,16 @@ public KafkaCacheRoute(
this.clientProduceFansByTopicPartition = new Long2ObjectHashMap<>();
this.serverProduceFansByTopicPartition = new Long2ObjectHashMap<>();
this.clientBudgetsByTopic = new Long2ObjectHashMap<>();
this.leadersByPartitionId = new Int2IntHashMap(Integer.MIN_VALUE);
this.leadersByPartitionId = new Int2ObjectHashMap<>();
}

/**
 * Returns the partition-to-leader map for the given topic, creating and
 * registering an empty one on first use.
 *
 * @param topic  the kafka topic name whose leader map is requested
 * @return the {@code Int2IntHashMap} of partitionId to leaderId for the topic;
 *         lookups of unknown partitions yield {@code Integer.MIN_VALUE}
 */
public Int2IntHashMap supplyLeadersByPartitionId(
    String topic)
{
    final int key = topicKey(topic);
    Int2IntHashMap leaders = leadersByPartitionId.get(key);
    if (leaders == null)
    {
        // missing value sentinel is MIN_VALUE so absent partitions are detectable
        leaders = new Int2IntHashMap(Integer.MIN_VALUE);
        leadersByPartitionId.put(key, leaders);
    }
    return leaders;
}


public int topicKey(
String topic)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
Expand Down Expand Up @@ -219,7 +220,8 @@ public MessageConsumer newStream(
fanout = newFanout;
}

final int leaderId = cacheRoute.leadersByPartitionId.get(partitionId);
final Int2IntHashMap leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topicName);
final int leaderId = leadersByPartitionId.get(partitionId);

newStream = new KafkaCacheServerFetchStream(
fanout,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.concurrent.UnsafeBuffer;

import io.aklivity.zilla.runtime.binding.kafka.internal.KafkaBinding;
Expand Down Expand Up @@ -210,7 +211,8 @@ public MessageConsumer newStream(
fan = newFan;
}

final int leaderId = cacheRoute.leadersByPartitionId.get(partitionId);
final Int2IntHashMap leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topicName);
final int leaderId = leadersByPartitionId.get(partitionId);
final String cacheName = String.format("%s.%s", supplyNamespace.apply(routeId), supplyLocalName.apply(routeId));
final KafkaCache cache = supplyCache.apply(cacheName);
final KafkaCacheTopic topic = cache.supplyTopic(topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.agrona.DirectBuffer;
import org.agrona.MutableDirectBuffer;
import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.LongLongConsumer;
import org.agrona.concurrent.UnsafeBuffer;

Expand Down Expand Up @@ -1728,7 +1729,7 @@ private void onApplicationBegin(

state = KafkaState.openingInitial(state);

if (clientRoute.partitions.get(client.partitionId) != leaderId)
if (client.topicPartitions.get(client.partitionId) != leaderId)
{
client.network = MessageConsumer.NOOP;
cleanupApplication(traceId, ERROR_NOT_LEADER_FOR_PARTITION);
Expand Down Expand Up @@ -1983,6 +1984,7 @@ private final class KafkaFetchClient extends KafkaSaslClient
private MessageConsumer network;
private final KafkaFetchStream stream;
private final String topic;
private final Int2IntHashMap topicPartitions;
private final int partitionId;

private long nextOffset;
Expand Down Expand Up @@ -2041,6 +2043,7 @@ private final class KafkaFetchClient extends KafkaSaslClient
super(sasl, routeId);
this.stream = KafkaFetchStream.this;
this.topic = requireNonNull(topic);
this.topicPartitions = clientRoute.supplyPartitions(topic);
this.partitionId = partitionId;
this.nextOffset = initialOffset;
this.latestOffset = latestOffset;
Expand Down Expand Up @@ -2909,7 +2912,7 @@ private void onDecodeFetchResponse(
{
nextResponseId++;

if (clientRoute.partitions.get(partitionId) == leaderId)
if (topicPartitions.get(partitionId) == leaderId)
{
signaler.signalNow(routeId, initialId, SIGNAL_NEXT_REQUEST, 0);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -811,8 +811,8 @@ private final class KafkaMetaStream
this.initialId = initialId;
this.replyId = supplyReplyId.applyAsLong(initialId);
this.affinity = affinity;
this.client = new KafkaMetaClient(resolvedId, topic, sasl);
this.clientRoute = supplyClientRoute.apply(resolvedId);
this.client = new KafkaMetaClient(resolvedId, topic, sasl);
}

private void onApplication(
Expand Down Expand Up @@ -1086,6 +1086,7 @@ private final class KafkaMetaClient extends KafkaSaslClient

private MessageConsumer network;
private final String topic;
private final Int2IntHashMap topicPartitions;

private final Long2ObjectHashMap<KafkaBrokerInfo> newBrokers;
private final Int2IntHashMap newPartitions;
Expand Down Expand Up @@ -1130,6 +1131,7 @@ private final class KafkaMetaClient extends KafkaSaslClient
{
super(sasl, routeId);
this.topic = requireNonNull(topic);
this.topicPartitions = clientRoute.supplyPartitions(topic);
this.newBrokers = new Long2ObjectHashMap<>();
this.newPartitions = new Int2IntHashMap(-1);

Expand Down Expand Up @@ -1748,7 +1750,7 @@ private void onDecodeMetaResponse(
doApplicationBeginIfNecessary(traceId, authorization, topic);

// TODO: share partitions across cores
final Int2IntHashMap sharedPartitions = clientRoute.partitions;
final Int2IntHashMap sharedPartitions = topicPartitions;
if (!sharedPartitions.equals(newPartitions))
{
sharedPartitions.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,14 @@
package io.aklivity.zilla.runtime.binding.kafka.internal.stream;

import org.agrona.collections.Int2IntHashMap;
import org.agrona.collections.Int2ObjectHashMap;
import org.agrona.collections.Long2ObjectHashMap;

public final class KafkaClientRoute
{
public final long routeId;
public final Long2ObjectHashMap<KafkaBrokerInfo> brokers;
public final Int2IntHashMap partitions;
public final Int2ObjectHashMap<Int2IntHashMap> partitions;

public volatile long metaInitialId;

Expand All @@ -31,6 +32,13 @@ public KafkaClientRoute(
{
this.routeId = routeId;
this.brokers = new Long2ObjectHashMap<>();
this.partitions = new Int2IntHashMap(-1);
this.partitions = new Int2ObjectHashMap<>();
}

// Returns the partition-to-leader map for the given topic, creating an empty
// one (missing-value sentinel -1) on first use.
public Int2IntHashMap supplyPartitions(
    String topic)
{
    // intern() canonicalizes the topic string, so identityHashCode yields a
    // stable key for equal topic names within this JVM run.
    // NOTE(review): identity hash codes are not guaranteed unique — two
    // distinct interned topics could collide and share one partition map;
    // also verify this matches the topicKey(...) derivation used by
    // KafkaCacheRoute so both routes key topics consistently.
    int topicKey = System.identityHashCode(topic.intern());
    return partitions.computeIfAbsent(topicKey, k -> new Int2IntHashMap(-1));
}
}

0 comments on commit afaa9e3

Please sign in to comment.