Skip to content

Commit

Permalink
Add Iterator and Append method for WriteBatch (#177)
Browse files Browse the repository at this point in the history
I want to use format of rocksdb::WriteBatch to encode key-value pairs of TiKV, and I need an more effective method to copy data from Entry to WriteBatch directly so that I could avoid CPU cost of decode.

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
  • Loading branch information
Little-Wallace committed Aug 18, 2020
1 parent 1a540da commit 9b54926
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 0 deletions.
36 changes: 36 additions & 0 deletions db/write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2040,6 +2040,16 @@ Status WriteBatchInternal::SetContents(WriteBatch* b, const Slice& contents) {
return Status::OK();
}

Status WriteBatchInternal::AppendContents(WriteBatch* dst,
const Slice& content) {
size_t src_len = content.size() - WriteBatchInternal::kHeader;
SetCount(dst, Count(dst) + DecodeFixed32(content.data() + 8));
assert(content.size() >= WriteBatchInternal::kHeader);
dst->rep_.append(content.data() + WriteBatchInternal::kHeader, src_len);
dst->content_flags_.store(ContentFlags::DEFERRED, std::memory_order_relaxed);
return Status::OK();
}

Status WriteBatchInternal::Append(WriteBatch* dst, const WriteBatch* src,
const bool wal_only) {
size_t src_len;
Expand Down Expand Up @@ -2076,4 +2086,30 @@ size_t WriteBatchInternal::AppendedByteSize(size_t leftByteSize,
}
}

void WriteBatch::Iterator::SeekToFirst() {
input_ = rep_;
if (input_.size() < WriteBatchInternal::kHeader) {
valid_ = false;
return;
}
input_.remove_prefix(WriteBatchInternal::kHeader);
valid_ = true;
Next();
}

void WriteBatch::Iterator::Next() {
if (input_.empty() || !valid_) {
valid_ = false;
return;
}
Slice blob, xid;
Status s = ReadRecordFromWriteBatch(&input_, &tag_, &column_family_, &key_,
&value_, &blob, &xid);
valid_ = s.ok();
}

int WriteBatch::WriteBatchRef::Count() const {
return DecodeFixed32(rep_.data() + 8);
}

} // namespace rocksdb
2 changes: 2 additions & 0 deletions db/write_batch_internal.h
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ class WriteBatchInternal {
static Status Append(WriteBatch* dst, const WriteBatch* src,
const bool WAL_only = false);

static Status AppendContents(WriteBatch* dst, const Slice& content);

// Returns the byte size of appending a WriteBatch with ByteSize
// leftByteSize and a WriteBatch with ByteSize rightByteSize
static size_t AppendedByteSize(size_t leftByteSize, size_t rightByteSize);
Expand Down
40 changes: 40 additions & 0 deletions include/rocksdb/write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -276,6 +276,8 @@ class WriteBatch : public WriteBatchBase {
virtual bool WriteBeforePrepare() const { return false; }
};
Status Iterate(Handler* handler) const;
class Iterator;
Iterator* NewIterator() const { return new Iterator(rep_); }

// Retrieve the serialized version of this batch.
const std::string& Data() const { return rep_; }
Expand Down Expand Up @@ -372,6 +374,44 @@ class WriteBatch : public WriteBatchBase {
const size_t timestamp_size_;

// Intentionally copyable
public:
class Iterator {
private:
Slice rep_;
Slice input_;
Slice key_;
Slice value_;
uint32_t column_family_;
char tag_;
bool valid_;

public:
explicit Iterator(const Slice& rep) : rep_(rep), valid_(false) {}

bool Valid() const { return valid_; }

Slice Key() const { return key_; }

Slice Value() const { return value_; }

uint32_t GetColumnFamilyId() const { return column_family_; }

char GetValueType() const { return tag_; };

void SeekToFirst();

void Next();
};
class WriteBatchRef {
public:
explicit WriteBatchRef(const Slice& rep) : rep_(rep) {}
Iterator* NewIterator() const { return new Iterator(rep_); }

int Count() const;

private:
const Slice& rep_;
};
};

} // namespace rocksdb

0 comments on commit 9b54926

Please sign in to comment.