Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Implement the data structure and partial commands of the Redis stream #745

Merged
merged 16 commits into from
Aug 17, 2022

Conversation

torwig
Copy link
Contributor

@torwig torwig commented Jul 21, 2022

Available commands:

  • XADD
  • XDEL
  • XINFO STREAM
  • XLEN
  • XRANGE
  • XREAD
  • XREVRANGE
  • XTRIM

Design notes
Link to the Redis data type description: https://redis.io/docs/manual/data-types/streams/
A stream is a sequence of entries.
Each entry has a unique ID in the form "1-2" where two 64-bit numbers are divided by a hyphen.
By default, the first number is set to a millisecond timestamp, and the second one is a so-called sequence number - for cases when more than one entry was added at the same millisecond.
The value of an entry is a set of key-value pairs.
In case of the command:
XADD s1 1-0 key1 val1 key2 val2
the ID is 1-0 and the value is key1 val1 key2 val2.
In RocksDB entries are represented as key-value pairs where the key is formed as:
key | version | entry-ID-milliseconds-value | entry-ID-sequence-number-value
and the value is encoded as:
key1-length(fixed-32) | key1 | val1-length(fixed-32) | val1 | key2-length(fixed-32) | key2 | val2-length(fixed-32) | val2.
Thanks to the structure of a key, all entries in a stream are sorted in chronological order.
As for value decoding: this is the first idea that came to my mind and maybe it's not very efficient because has an overhead (4 bytes on every argument).
Why did I introduce such a weird encoding scheme? Because if you are reading entries, Redis responds not with a single string:

1) 1) "s1"
   2) 1) 1) "1-0"
         2) 1) "key1"
            2) "val1"
            3) "key2"
            4) "val2"

Perhaps, command args can be joined with a ' '(space) into a single string and this string should be saved in RocksDB? After reading, it will be split while constructing the reponse. With this encoding scheme, I was thinking about the possible spaces inside arguments and how to deal with them?

Differences from Redis

  1. XTRIM and XADD with trim possibility: nearly exact trimming (via ~) is not possible due to implementation details (no radix tree here). However, LIMIT option is working while in Redis it is allowed only in combination with ~. LIMIT can be disallowed to be consistent with Redis protocol. I didn't do that because I want to hear opinions from kvrocks maintainers.

Replication is not implemented yet. Basically, I didn't test streams in a cluster configuration. Perhaps, the plain XREAD on a replica will work, but blocking XREAD that unblocks after XADD on the master - I'm sure that some code should be written. It would be greatly appreciated if maintainers provide me with some hints about how to implement this.

Consumer groups are not implemented. I'm thinking about the possible implementation.

Right now I'm looking for any maintainers' feedback from the adding-new-data-type perspective (maybe, I didn't add a new column family to some filter/checker/extractor, etc.) and information about proper replicating a stream from master to other nodes.

This closes #532

@caipengbo
Copy link
Contributor

Can you add some design documents so that people can understand your design ideas :)

@git-hulk
Copy link
Member

Thanks to @torwig great contribution.

For the replication part, Kvrocks will replicate the data after writing into db, so we needn't to do anything if it has no special case. For the new column family, I prefer to keeping less column family and feel free to add if we MUST.

@torwig
Copy link
Contributor Author

torwig commented Jul 22, 2022

@caipengbo Yes, sure, I will :)

@torwig
Copy link
Contributor Author

torwig commented Jul 22, 2022

@git-hulk I mean that if I run
XREAD BLOCK 0 STREAMS s1 $ on a replica (it will block)
and then execute
XADD s1 * k1 v1 on master
the XREAD will be unblocked and return the added entry.
It's very similar to publish-subscribe - there is a fan-out.
Does it require some additional code in processing replicated data?

@git-hulk
Copy link
Member

@git-hulk I mean that if I run XREAD BLOCK 0 STREAMS s1 $ on a replica (it will block) and then execute XADD s1 * k1 v1 on master the XREAD will be unblocked and return the added entry. It's very similar to publish-subscribe - there is a fan-out. Does it require some additional code in processing replicated data?

Yes, you are right. Kvrocks default replication only sends the data to replicas, we need to recognize those special write batch and wake up the waiting clients.

@torwig
Copy link
Contributor Author

torwig commented Jul 23, 2022

@caipengbo Added design notes.

@git-hulk
Copy link
Member

@caipengbo Added design notes.

Many thanks to @torwig detail explanation and bring this good feature into Kvrocks community, we will take a look recently.

@git-hulk
Copy link
Member

I took a look briefly and overall design is good to me. There are two small issues in design internal:

  1. I think we should save the entries number as well in the encoded value instead of depending on '\0', we should allow using '\0' in the string.
  2. We need to create the column family for stream in storage::open if wants to use the new column family, or it will fallback to the default column family.

@torwig
Copy link
Contributor Author

torwig commented Jul 26, 2022

I took a look briefly and overall design is good to me. There are two small issues in design internal:

  1. I think we should save the entries number as well in the encoded value instead of depending on '\0', we should allow using '\0' in the string.
  2. We need to create the column family for stream in storage::open if wants to use the new column family, or it will fallback to the default column family.

Thank you for your review. I'll fix the issues.
Just one question for now: what do you mean by depending on '\0'? Currently, the encoded value consists of:
first-string-len | first-string | second-string-len | second string
as implemented here: https://github.com/apache/incubator-kvrocks/pull/745/files#diff-eee0baf52c379afb7e25739db14ec62c603d3c8f4c7f46af24e019319b2bb4beR135

@git-hulk
Copy link
Member

I took a look briefly and overall design is good to me. There are two small issues in design internal:

  1. I think we should save the entries number as well in the encoded value instead of depending on '\0', we should allow using '\0' in the string.
  2. We need to create the column family for stream in storage::open if wants to use the new column family, or it will fallback to the default column family.

Thank you for your review. I'll fix the issues. Just one question for now: what do you mean by depending on '\0'? Currently, the encoded value consists of: first-string-len | first-string | second-string-len | second string as implemented here: https://github.com/apache/incubator-kvrocks/pull/745/files#diff-eee0baf52c379afb7e25739db14ec62c603d3c8f4c7f46af24e019319b2bb4beR135

Yes, I mean we can also save the number of encoded value. As this example, the encoded value can be 2(4bytes)|first-string-len | first-string | second-string-len | second string which we can count the number of strings directly instead of depending of end of string. How do you think about this case?

@torwig
Copy link
Contributor Author

torwig commented Jul 27, 2022

@git-hulk Fixed possible crashes via access to command non-existing command arguments by index. Created column family for streams. I will use it also for replication purposes, to unblock blocked readers.

@torwig
Copy link
Contributor Author

torwig commented Jul 27, 2022

I took a look briefly and overall design is good to me. There are two small issues in design internal:

  1. I think we should save the entries number as well in the encoded value instead of depending on '\0', we should allow using '\0' in the string.
  2. We need to create the column family for stream in storage::open if wants to use the new column family, or it will fallback to the default column family.

Thank you for your review. I'll fix the issues. Just one question for now: what do you mean by depending on '\0'? Currently, the encoded value consists of: first-string-len | first-string | second-string-len | second string as implemented here: https://github.com/apache/incubator-kvrocks/pull/745/files#diff-eee0baf52c379afb7e25739db14ec62c603d3c8f4c7f46af24e019319b2bb4beR135

Yes, I mean we can also save the number of encoded value. As this example, the encoded value can be 2(4bytes)|first-string-len | first-string | second-string-len | second string which we can count the number of strings directly instead of depending of end of string. How do you think about this case?

I've got your idea about writing the number of strings to the encoded value. However, I didn't understand how this information can help us. I mean, if we know that there are 2 strings encoded, how this number can work in the decoding process? Could you please provide me with any example?
Currently, we read length len1, the read exactly len1 bytes as a string, then read len2, then read exactly len2 bytes as a string.

@git-hulk
Copy link
Member

I took a look briefly and overall design is good to me. There are two small issues in design internal:

  1. I think we should save the entries number as well in the encoded value instead of depending on '\0', we should allow using '\0' in the string.
  2. We need to create the column family for stream in storage::open if wants to use the new column family, or it will fallback to the default column family.

Thank you for your review. I'll fix the issues. Just one question for now: what do you mean by depending on '\0'? Currently, the encoded value consists of: first-string-len | first-string | second-string-len | second string as implemented here: https://github.com/apache/incubator-kvrocks/pull/745/files#diff-eee0baf52c379afb7e25739db14ec62c603d3c8f4c7f46af24e019319b2bb4beR135

Yes, I mean we can also save the number of encoded value. As this example, the encoded value can be 2(4bytes)|first-string-len | first-string | second-string-len | second string which we can count the number of strings directly instead of depending of end of string. How do you think about this case?

I've got your idea about writing the number of strings to the encoded value. However, I didn't understand how this information can help us. I mean, if we know that there are 2 strings encoded, how this number can work in the decoding process? Could you please provide me with any example? Currently, we read length len1, the read exactly len1 bytes as a string, then read len2, then read exactly len2 bytes as a string.

I rethink about this point, current implementation is ok since C++ string didn't depend on '\0' at all, so use the string length to determine where's value end is ok. Sorry that I misunderstand the implementation. Just disregard it.

@torwig
Copy link
Contributor Author

torwig commented Aug 2, 2022

@git-hulk Added batch handling on replicas to unblock clients that were blocked by the XREAD command by the respective XADD.

Copy link
Member

@PragmaTwice PragmaTwice left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Such a great work! Not familiar with redis stream, just comment for some little code smell.

src/redis_stream_base.h Outdated Show resolved Hide resolved
src/redis_stream_base.h Outdated Show resolved Hide resolved
src/redis_stream_base.h Outdated Show resolved Hide resolved
@git-hulk git-hulk requested review from PragmaTwice and removed request for git-hulk August 7, 2022 09:19
@git-hulk
Copy link
Member

git-hulk commented Aug 7, 2022

@git-hulk Added batch handling on replicas to unblock clients that were blocked by the XREAD command by the respective XADD.

Thanks for your great contribution, I will try and review the PR in a few days.

@torwig
Copy link
Contributor Author

torwig commented Aug 8, 2022

@git-hulk I've removed the LIMIT option from trim options to be compliant with Redis protocol.
By default, Redis uses exact trimming (or if = is specified). And in this case, the LIMIT option can't be used.
The LIMIT option can be used only with nearly exact trimming (if ~ is specified instead of =), but nearly exact trimming is bound to Redis implementation of a stream (there are nodes with elements), so it can't be applied with RocksDB.

Also, I will resolve some conflicts with the unstable branch in a few minutes.

git-hulk
git-hulk previously approved these changes Aug 14, 2022
@Alfejik Alfejik self-assigned this Aug 15, 2022
@Alfejik Alfejik added the enhancement type enhancement label Aug 15, 2022
@git-hulk git-hulk added feature type new feature release notes labels Aug 16, 2022
@git-hulk
Copy link
Member

@torwig Can you help to resolve the conflict?

@torwig torwig dismissed stale reviews from git-hulk, PragmaTwice, and caipengbo via 1572f71 August 16, 2022 06:59
@torwig
Copy link
Contributor Author

torwig commented Aug 16, 2022

@git-hulk Done

@git-hulk
Copy link
Member

git-hulk commented Aug 16, 2022

Thanks @torwig

We can merge this PR after one of @Alfejik @ShooterIT approved since the new commit only resolved command number conflict, so I think the previous approves are still valid.

Of course, guys can approve it again if you're free. cc @caipengbo @PragmaTwice

@tisonkun
Copy link
Member

I don't have time to go through this patch. Although, it seems this patch looks good to two reviewers. If we accept such a new feature, remember to track updates on our doc site :)

@git-hulk
Copy link
Member

I don't have time to go through this patch. Although, it seems this patch looks good to two reviewers. If we accept such a new feature, remember to track updates on our doc site :)

Thanks to @Tison warm remind. Will merge this PR if have no further feedback tomorrow, then update the doc site after merging.

@git-hulk
Copy link
Member

git-hulk commented Aug 17, 2022

Hello, thanks to @torwig brings this awesome feature for Kvrocks community and everyone who reviewed this PR. I will summary and merge this PR and welcome to create a new thread to further discussion. Cheers!!!

@git-hulk git-hulk merged commit 73092dc into apache:unstable Aug 17, 2022
@git-hulk
Copy link
Member

Thanks @torwig and @aleksraiden again.

@git-hulk git-hulk changed the title Add streams Implement the data structure and partial commands of the Redis stream Aug 17, 2022
@git-hulk git-hulk added major decision Requires project management committee consensus and removed enhancement type enhancement labels Aug 17, 2022
@ShooterIT
Copy link
Member

as we know, prefix bloom filter can improve the performance of range read, but current encoding make it hard to use since we can't know key length and how to set prefix length. One PR still is in draft #508

key | version | entry-ID-milliseconds-value | entry-ID-sequence-number-value

for the entry encoding, i think we can put version at the front, so that prefix bloom filter will be useful.

furthermore, maybe we should redesign the encoding for all complex data structure for better range read performance. Of course, we should keep compatibility, such as, adding an encoding version to distinguish different version for decoding, this way also allow us implement new features easily.

key1-length(fixed-32) | key1 | val1-length(fixed-32) | val1 | key2-length(fixed-32) | key2 | val2-length(fixed-32) | val2.

i think we use variable length encoding, such as EncodeVarint32 in leveldb/rocksdb, to save disk space.

@git-hulk
Copy link
Member

for the entry encoding, i think we can put version at the front, so that prefix bloom filter will be useful.

I think we can change all complex data structures at the next major release to make the prefix bloom filter more effective.

ShooterIT pushed a commit that referenced this pull request Aug 27, 2022
…791)

Changes PutFixed32 to PutVarint32 and GetFixed32 to GetVarint32,
this way could effectively save space usage for small entries.

Obviously, as ##745, this change makes it impossible to decode entries
if entries were encoded with PutFixed32 (an error will be returned).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
feature type new feature major decision Requires project management committee consensus release notes
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[QUESTION] Any plan to support Redis Stream?
7 participants