Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support eager evaluation of all Kafka filters #212

Merged
merged 10 commits into from
Apr 26, 2023

Conversation

ankitk-me
Copy link
Contributor

@ankitk-me ankitk-me commented Apr 19, 2023

Fix #209

final KafkaMergedDataExFW mergedDataEx)
{
return filters == null || filters == mergedDataEx.filters();
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Need to call matchFilters from match above.

.header("name", "value")
.build()
.build()
.build();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use further indentation to show nested .build() objects.

@@ -962,6 +963,93 @@ public String toString()
}
}

private static final class EagerOr extends KafkaFilterCondition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As discussed, suggest creating a superclass Or and subsclasses LazyOr and EagerOr for improved readability.

Only the .test(...) method implementation will differ between LazyOr and EagerOr, so everything else can be implemented in the Or superclass.

@@ -36,6 +36,7 @@ scope internal
kafka::KafkaHeader[] trailers;
uint32 paddingLen;
octets[paddingLen] padding;
int64 filters;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This change needs to be reverted as KafkaCacheEntry is shared across multiple readers, whereas filters mask applies to each individual cache client stream.

@@ -440,6 +441,7 @@ public void shouldGenerateMergedDataExtension()
.typeId(0x01)
.merged()
.timestamp(12345678L)
.filters(0L)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The example should probably show a non-zero filter mask, because filter mask zero means the message didn't match and therefore wouldn't be delivered.

@@ -219,6 +226,7 @@ scope kafka
{
int32 deferred = 0; // INIT only (TODO: move to DATA frame)
int64 timestamp = 0; // INIT only
int64 filters = 0; // INIT only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest we default to -1 instead, as filters mask 0 means no match.

@@ -269,6 +278,7 @@ scope kafka
int64 timestamp = 0; // INIT only
int32 headersSizeMax = 4; // INIT only
int64 producerId = -1; // INIT only
int64 filters = 0; // INIT only
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggest we default to -1 instead, as filters mask 0 means no match.

@ankitk-me ankitk-me marked this pull request as ready for review April 20, 2023 20:53
KafkaCacheEntryFW cacheEntry)
{
return cacheEntry != null;
return cacheEntry != null ? 1L : 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need to be -1 to generate the default, non-zero value for None filter?

@@ -876,11 +881,11 @@ public String toString()
}
}

private static final class Or extends KafkaFilterCondition
private static class Or extends KafkaFilterCondition
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's mark this abstract too.

public long test(KafkaCacheEntryFW cacheEntry)
{
return 0L;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove method, now that class is abstract.


@Override
public long test(
KafkaCacheEntryFW cacheEntry)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Remove extra indentation.

if (result != 0L)
{
accept |= i + 1;
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's rework this code so that it can simplify to accept |= condition.test(cacheEntry);.

That requires the condition to return the relevant bitmask for the filter index it is on, not just 0 or 1.

Therefore, when each filter condition is created for non-Or conditions, it needs to know the filter index, so it can compute 1 << index as bitmask to return when .test(...) is successful, instead of 1.

}

private static KafkaFilterCondition.Key initNullKeyInfo(
long mask,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before adding the mask, we used this as a shared instance for all null key conditions, but now we need the mask to be correct per client, so the shared part should shift to the nullKeyRO and we now need to create the mask-specific Key instance.

{
nextEntry = null;
}

filters = nextEntry != null ? condition.test(nextEntry) : 0L;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please call condition.test(nextEntry) just once.

@@ -71,7 +73,7 @@ public KafkaCacheCursorFactory(
{
this.writeBuffer = writeBuffer;
this.checksum = new CRC32C();
this.nullKeyInfo = initNullKeyInfo(checksum);
this.nullKeyRO = initNullKeyRO();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We were previously using this approach to avoid copying the null key value for each client filter.
I don't see where we are using nullKeyRO after the changes.

@ankitk-me ankitk-me self-assigned this Apr 25, 2023
@@ -71,7 +72,6 @@ public KafkaCacheCursorFactory(
{
this.writeBuffer = writeBuffer;
this.checksum = new CRC32C();
this.nullKeyInfo = initNullKeyInfo(checksum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think you might have misinterpreted the feedback. 🙂

I was saying that we want to preserve the original optimization of not copying null keys all the time, just the approach is slightly different now because we cannot pre-create the entire filter condition.


return value == null ?
new KafkaFilterCondition.Key(mask, checksum, nullKeyRO) : new KafkaFilterCondition.Key(mask, checksum, key);

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is closer, but we still make the copy of nullKeyRO each time we construct a new KafkaFilterCondition.Key.

Follow the constructor hierarchy up through super to see the copy in Equals.

@jfallows jfallows merged commit b928955 into aklivity:develop Apr 26, 2023
@ankitk-me ankitk-me deleted the eagerEvaluation branch May 26, 2023 10:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Support eager evaluation of all kafka filters and indicate which filters matched
2 participants