Support eager evaluation of all Kafka filters #212

Merged · 10 commits · Apr 26, 2023
Changes from 7 commits
@@ -53,6 +53,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.ArrayFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaFilterFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaHeaderFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaIsolation;
@@ -196,6 +197,7 @@ public MessageConsumer newStream(
         final String16FW beginTopic = kafkaFetchBeginEx.topic();
         final KafkaOffsetFW progress = kafkaFetchBeginEx.partition();
         final ArrayFW<KafkaFilterFW> filters = kafkaFetchBeginEx.filters();
+        final KafkaEvaluation evaluation = kafkaFetchBeginEx.evaluation().get();
         final KafkaIsolation isolation = kafkaFetchBeginEx.isolation().get();
         final KafkaDeltaType deltaType = kafkaFetchBeginEx.deltaType().get();
         final String topicName = beginTopic.asString();
@@ -236,7 +238,7 @@ public MessageConsumer newStream(
             fanout = newFanout;
         }

-        final KafkaFilterCondition condition = cursorFactory.asCondition(filters);
+        final KafkaFilterCondition condition = cursorFactory.asCondition(filters, evaluation);
         final long latestOffset = kafkaFetchBeginEx.partition().latestOffset();
         final KafkaOffsetType maximumOffset = KafkaOffsetType.valueOf((byte) latestOffset);
         final Int2IntHashMap leadersByPartitionId = cacheRoute.supplyLeadersByPartitionId(topicName);
@@ -1218,6 +1220,7 @@ private void doClientReplyData(

             final long partitionOffset = nextEntry.offset$();
             final long timestamp = nextEntry.timestamp();
+            final long filters = cursor.filters;
             final long ownerId = nextEntry.ownerId();
             final int entryFlags = nextEntry.flags();
             final KafkaKeyFW key = nextEntry.key();
@@ -1289,11 +1292,11 @@ private void doClientReplyData(
             switch (flags & ~FLAG_SKIP)
             {
             case FLAG_INIT | FLAG_FIN:
-                doClientReplyDataFull(traceId, timestamp, ownerId, key, headers, deltaType, ancestor, fragment,
+                doClientReplyDataFull(traceId, timestamp, ownerId, filters, key, headers, deltaType, ancestor, fragment,
                     reserved, flags, partitionId, partitionOffset, stableOffset, latestOffset);
                 break;
             case FLAG_INIT:
-                doClientReplyDataInit(traceId, deferred, timestamp, ownerId, key, deltaType, ancestor, fragment,
+                doClientReplyDataInit(traceId, deferred, timestamp, ownerId, filters, key, deltaType, ancestor, fragment,
                     reserved, length, flags, partitionId, partitionOffset, stableOffset, latestOffset);
                 break;
             case FLAG_NONE:
@@ -1322,6 +1325,7 @@ private void doClientReplyDataFull(
         long traceId,
         long timestamp,
         long producerId,
+        long filters,
         KafkaKeyFW key,
         ArrayFW<KafkaHeaderFW> headers,
         KafkaDeltaType deltaType,
@@ -1340,6 +1344,7 @@ private void doClientReplyDataFull(
             .typeId(kafkaTypeId)
             .fetch(f -> f.timestamp(timestamp)
                 .producerId(producerId)
+                .filters(filters)
                 .partition(p -> p.partitionId(partitionId)
                     .partitionOffset(partitionOffset)
                     .stableOffset(stableOffset)
@@ -1367,6 +1372,7 @@ private void doClientReplyDataInit(
         int deferred,
         long timestamp,
         long producerId,
+        long filters,
         KafkaKeyFW key,
         KafkaDeltaType deltaType,
         long ancestorOffset,
@@ -1386,6 +1392,7 @@ private void doClientReplyDataInit(
             .fetch(f -> f.deferred(deferred)
                 .timestamp(timestamp)
                 .producerId(producerId)
+                .filters(filters)
                 .partition(p -> p.partitionId(partitionId)
                     .partitionOffset(partitionOffset)
                     .stableOffset(stableOffset)
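
What the change does, in miniature: the fetch BEGIN extension now carries an
evaluation mode that is passed into cursorFactory.asCondition(...), and each
reply data frame forwards cursor.filters as a long. The sketch below is a
hypothetical, self-contained model of those semantics, not Zilla's internal
API: under LAZY evaluation a message can be accepted on the first matching
filter, while EAGER evaluation tests every filter so the full bitmask of
matching filters can travel with the message. The enum values and the
bitmask interpretation are assumptions inferred from this diff.

import java.util.List;
import java.util.function.Predicate;

class FilterEvaluationSketch
{
    enum KafkaEvaluation { LAZY, EAGER }

    // Bit i of the returned mask is set when filter i matched; under LAZY the
    // scan stops at the first hit, so at most one bit is ever set.
    static long matchedFilters(
        List<Predicate<String>> filters,
        String message,
        KafkaEvaluation evaluation)
    {
        long matched = 0L;
        for (int i = 0; i < filters.size(); i++)
        {
            if (filters.get(i).test(message))
            {
                matched |= 1L << i;
                if (evaluation == KafkaEvaluation.LAZY)
                {
                    break;
                }
            }
        }
        return matched;
    }

    public static void main(String[] args)
    {
        List<Predicate<String>> filters =
            List.of(s -> s.contains("a"), s -> s.contains("b"));
        System.out.println(matchedFilters(filters, "ab", KafkaEvaluation.LAZY));  // 1
        System.out.println(matchedFilters(filters, "ab", KafkaEvaluation.EAGER)); // 3
    }
}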
--------------------------------------------------------------------------------
@@ -50,6 +50,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaFilterFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaHeaderFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaKeyFW;
@@ -532,7 +533,10 @@ private KafkaCacheClientProduceFan(
         this.topicName = topicName;
         this.members = new Long2ObjectHashMap<>();
         this.defaultOffset = KafkaOffsetType.LIVE;
-        this.cursor = cursorFactory.newCursor(cursorFactory.asCondition(EMPTY_FILTER), KafkaDeltaType.NONE);
+        this.cursor = cursorFactory.newCursor(
+            cursorFactory
+                .asCondition(EMPTY_FILTER, KafkaEvaluation.LAZY),
+            KafkaDeltaType.NONE);

         partition.newHeadIfNecessary(0L);

@@ -1158,7 +1162,10 @@ private final class KafkaCacheClientProduceStream
             long leaderId,
             long authorization)
         {
-            this.cursor = cursorFactory.newCursor(cursorFactory.asCondition(EMPTY_FILTER), KafkaDeltaType.NONE);
+            this.cursor = cursorFactory.newCursor(
+                cursorFactory
+                    .asCondition(EMPTY_FILTER, KafkaEvaluation.LAZY),
+                KafkaDeltaType.NONE);
             this.entryMark = new MutableInteger(0);
             this.position = new MutableInteger(0);
             this.fan = fan;
--------------------------------------------------------------------------------
@@ -52,6 +52,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.Flyweight;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaAckMode;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluation;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaFilterFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaHeaderFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaKeyFW;
@@ -1023,7 +1024,10 @@ private final class KafkaCacheServerProduceStream
             KafkaCachePartition partition)
         {
             this.partition = partition;
-            this.cursor = cursorFactory.newCursor(cursorFactory.asCondition(EMPTY_FILTER), KafkaDeltaType.NONE);
+            this.cursor = cursorFactory.newCursor(
+                cursorFactory
+                    .asCondition(EMPTY_FILTER, KafkaEvaluation.LAZY),
+                KafkaDeltaType.NONE);
             this.fan = fan;
             this.sender = sender;
             this.originId = originId;
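
The same construction is now repeated in all three produce paths above: those
cursors see no filters, so they are built over EMPTY_FILTER with
KafkaEvaluation.LAZY (with nothing to test, lazy and eager evaluation are
equivalent, and LAZY avoids needless work). A hypothetical helper, not part of
this PR, that would capture the repeated shape using the internal types
visible in this diff (the KafkaCacheCursor return type is an assumption):

    private KafkaCacheCursor newUnfilteredCursor()
    {
        // No filters to evaluate, so LAZY is the cheapest correct choice.
        return cursorFactory.newCursor(
            cursorFactory.asCondition(EMPTY_FILTER, KafkaEvaluation.LAZY),
            KafkaDeltaType.NONE);
    }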
--------------------------------------------------------------------------------
@@ -51,6 +51,7 @@
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaConfigFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaDeltaType;
+import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaEvaluationFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaFilterFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaHeaderFW;
 import io.aklivity.zilla.runtime.binding.kafka.internal.types.KafkaHeadersFW;
@@ -1013,6 +1014,7 @@ private final class KafkaMergedStream

     private KafkaOffsetType maximumOffset;
     private List<KafkaMergedFilter> filters;
+    private KafkaEvaluationFW evaluation;

     private int state;
     private KafkaCapabilities capabilities;
@@ -1144,6 +1146,7 @@ private void onMergedInitialBegin(

         this.maximumOffset = asMaximumOffset(mergedBeginEx.partitions());
         this.filters = asMergedFilters(filters);
+        this.evaluation = mergedBeginEx.evaluation();

         describeStream.doDescribeInitialBegin(traceId);
     }
@@ -1494,13 +1497,15 @@ private void doMergedReplyData(
         final KafkaKeyFW key = kafkaFetchDataEx.key();
         final ArrayFW<KafkaHeaderFW> headers = kafkaFetchDataEx.headers();
         final KafkaDeltaFW delta = kafkaFetchDataEx.delta();
+        final long filters = kafkaFetchDataEx.filters();

         nextOffsetsById.put(partitionId, partitionOffset + 1);

         newKafkaDataEx = kafkaDataExRW.wrap(extBuffer, 0, extBuffer.capacity())
             .typeId(kafkaTypeId)
             .merged(f -> f.deferred(deferred)
                 .timestamp(timestamp)
+                .filters(filters)
                 .partition(p -> p.partitionId(partitionId)
                     .partitionOffset(partitionOffset)
                     .latestOffset(latestOffset))
@@ -2553,6 +2558,7 @@ private void doFetchInitialBegin(
                     .partitionOffset(partitionOffset)
                     .latestOffset(merged.maximumOffset.value()))
                 .filters(fs -> merged.filters.forEach(mf -> fs.item(i -> setFetchFilter(i, mf))))
+                .evaluation(merged.evaluation)
                 .isolation(i -> i.set(merged.isolation))
                 .deltaType(t -> t.set(merged.deltaType)))
             .build()
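
On the receiving side, the merged data extension now exposes that same long.
Below is a minimal, self-contained sketch of how an application could
interpret it, assuming (as this diff suggests but does not spell out) that
bit i corresponds to the i-th filter supplied in the BEGIN extension; the
accessor path in the comment is hypothetical.

class MatchedFilterMask
{
    public static void main(String[] args)
    {
        long filters = 0b101L; // e.g. kafkaDataEx.merged().filters() (hypothetical accessor path)
        int filterCount = 3;   // filters registered at BEGIN, for example

        for (int i = 0; i < filterCount; i++)
        {
            if ((filters & (1L << i)) != 0L)
            {
                System.out.println("message matched filter " + i);
            }
        }
    }
}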
--------------------------------------------------------------------------------
@@ -554,6 +554,21 @@ public void shouldReceiveMessagesWithHeaderOrHeaderFilter() throws Exception
         k3po.finish();
     }

+    @Test
+    @Configuration("cache.yaml")
+    @Specification({
+        "${app}/filter.header.or.header.eager/client",
+        "${app}/filter.none/server"})
+    @ScriptProperty("serverAddress \"zilla://streams/app1\"")
+    public void shouldReceiveMessagesWithHeaderOrHeaderEagerFilter() throws Exception
+    {
+        partition.append(1L);
+        k3po.start();
+        k3po.awaitBarrier("RECEIVED_MESSAGE_2");
+        k3po.notifyBarrier("SEND_MESSAGE_3");
+        k3po.finish();
+    }
+
     @Test
     @Configuration("cache.yaml")
     @Specification({
@@ -710,6 +725,21 @@ public void shouldReceiveMessagesWithHeadersManyFilter() throws Exception
         k3po.finish();
     }

+    @Test
+    @Configuration("cache.yaml")
+    @Specification({
+        "${app}/filter.headers.many.eager/client",
+        "${app}/filter.none/server"})
+    @ScriptProperty("serverAddress \"zilla://streams/app1\"")
+    public void shouldReceiveMessagesWithHeadersManyEagerFilter() throws Exception
+    {
+        partition.append(1L);
+        k3po.start();
+        k3po.awaitBarrier("RECEIVED_MESSAGE_2");
+        k3po.notifyBarrier("SEND_MESSAGE_3");
+        k3po.finish();
+    }
+
     @Test
     @Configuration("cache.yaml")
     @Specification({
--------------------------------------------------------------------------------
@@ -170,6 +170,17 @@ public void shouldFetchMergedMessagesWithKeyOrHeaderFilter() throws Exception
         k3po.finish();
     }

+    @Ignore("requires k3po parallel reads")
+    @Test
+    @Configuration("cache.options.merged.yaml")
+    @Specification({
+        "${app}/merged.fetch.filter.key.or.header.eager/client",
+        "${app}/unmerged.fetch.filter.none/server"})
+    public void shouldFetchMergedMessagesWithKeyOrHeaderEagerFilter() throws Exception
+    {
+        k3po.finish();
+    }
+
     @Ignore("GitHub Actions")
     @Test
     @Configuration("cache.options.merged.yaml")