Skip to content

Commit

Permalink
Implement pipelined commit (#267) (#284)
Browse files Browse the repository at this point in the history
* add multibatch write into memtable (#131)

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>

Support writing multiple WriteBatches into RocksDB together. These WriteBatches are assigned sequence numbers in order and pushed into a queue. If a thread is waiting for some state, it can steal a job from the work queue.

Signed-off-by: tabokie <xy.tao@outlook.com>

* Improve Multi Batch Write (#154)

* perform like normal pipelined write

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Signed-off-by: tabokie <xy.tao@outlook.com>

* pass enable_multi_thread_write to BuildDBOptions (#170)

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Signed-off-by: tabokie <xy.tao@outlook.com>

* Fix life time of `memtable_write_group` (#171)

* fix life time of memtable_write_group

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Signed-off-by: tabokie <xy.tao@outlook.com>

* Commit pipeline when write a WriteBatch for linearizability (#267)

* support commit pipeline

Signed-off-by: Little-Wallace <bupt2013211450@gmail.com>
Signed-off-by: tabokie <xy.tao@outlook.com>

* format

Signed-off-by: tabokie <xy.tao@outlook.com>

* remove useless code

Signed-off-by: tabokie <xy.tao@outlook.com>

Co-authored-by: Wallace <bupt2013211450@gmail.com>
  • Loading branch information
tabokie and Little-Wallace committed May 12, 2022
1 parent 5ba6eec commit d3e4138
Show file tree
Hide file tree
Showing 19 changed files with 333 additions and 5 deletions.
6 changes: 6 additions & 0 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1396,6 +1396,12 @@ Status ColumnFamilyData::ValidateOptions(
"FIFO compaction only supported with max_open_files = -1.");
}

if (db_options.enable_pipelined_commit &&
cf_options.max_successive_merges > 0) {
return Status::NotSupported(
"Multi thread write is only supported with no successive merges");
}

return s;
}

Expand Down
10 changes: 9 additions & 1 deletion db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1310,6 +1310,13 @@ class DBImpl : public DB {
size_t batch_cnt = 0,
PreReleaseCallback* pre_release_callback = nullptr);

// Write path selected by WriteImpl() when enable_pipelined_commit is set and
// the memtable is not disabled. The group leader assigns sequence numbers,
// enqueues commit requests and writes the WAL; each writer then inserts its
// own batch into the memtable and commits through PebbleWriteCommit().
// `seq_used`, if non-null, receives the sequence assigned to this writer.
Status PebbleWriteImpl(const WriteOptions& write_options,
WriteBatch* my_batch,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
uint64_t* seq_used = nullptr);
// Marks `request` as applied, hands it to
// write_thread_.ExitWaitSequenceCommit(), and decrements
// pending_memtable_writes_, notifying switch_cv_ when it reaches zero.
void PebbleWriteCommit(CommitRequest* request);

Status PipelinedWriteImpl(const WriteOptions& options, WriteBatch* updates,
WriteCallback* callback = nullptr,
uint64_t* log_used = nullptr, uint64_t log_ref = 0,
Expand Down Expand Up @@ -1710,7 +1717,8 @@ class DBImpl : public DB {
mutex_.Lock();
}

if (!immutable_db_options_.unordered_write) {
if (!immutable_db_options_.unordered_write &&
!immutable_db_options_.enable_pipelined_commit) {
// Then the writes are finished before the next write group starts
return;
}
Expand Down
10 changes: 10 additions & 0 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,16 @@ DBOptions SanitizeOptions(const std::string& dbname, const DBOptions& src,
result.avoid_flush_during_recovery = false;
}

// Pipelined commit does not support two write queues or 2PC writes.
if (result.two_write_queues || result.allow_2pc) {
result.enable_pipelined_commit = false;
}

if (result.enable_pipelined_commit) {
result.enable_pipelined_write = false;
result.allow_concurrent_memtable_write = true;
}

#ifndef ROCKSDB_LITE
ImmutableDBOptions immutable_db_options(result);
if (!immutable_db_options.IsWalDirSameAsDBPath()) {
Expand Down
159 changes: 159 additions & 0 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,156 @@ Status DBImpl::WriteWithCallback(const WriteOptions& write_options,
}
#endif // ROCKSDB_LITE

// Completes the commit of `request`: flags it as applied, passes it to
// write_thread_.ExitWaitSequenceCommit() together with
// versions_->last_sequence_, and releases one pending memtable write.
void DBImpl::PebbleWriteCommit(CommitRequest* request) {
  request->applied.store(true, std::memory_order_release);
  write_thread_.ExitWaitSequenceCommit(request, &versions_->last_sequence_);

  // fetch_sub returns the value held before the decrement, so a result of 1
  // means this call brought the counter down to zero.
  const bool drained = pending_memtable_writes_.fetch_sub(1) == 1;
  if (!drained) {
    return;
  }

  // switch_cv_ waiters block until pending_memtable_writes_ == 0. The counter
  // is not updated under switch_mutex_, so acquire the mutex before notifying:
  // that guarantees the waiter is already in its waiting state and cannot miss
  // the update.
  std::lock_guard<std::mutex> lck(switch_mutex_);
  switch_cv_.notify_all();
}

// Write path used when enable_pipelined_commit is set.
//
// Every caller joins a batch group. The group leader:
//   1. runs PreprocessWrite() under mutex_,
//   2. assigns consecutive sequence numbers to the batches of its group,
//   3. registers every non-empty batch in the commit queue
//      (EnterCommitQueue), recording the last sequence of that batch in
//      request->commit_lsn,
//   4. writes the whole group to the WAL (unless disableWAL is set).
// After the leader exits the group, every writer whose request was enqueued
// (commit_lsn != 0) inserts its own batch into the memtable with concurrent
// memtable writes enabled and then calls PebbleWriteCommit().
//
// Parameters:
//   write_options - options of this write; `sync` and `disableWAL` decide
//                   whether the WAL is written and synced.
//   my_batch      - the batch to write.
//   callback      - optional write callback, forwarded to the Writer.
//   log_used      - forwarded to WriteToWAL() by the group leader only.
//                   NOTE(review): followers never populate it in this
//                   function - confirm whether callers rely on it.
//   log_ref       - forwarded to the Writer.
//   seq_used      - if non-null, receives writer.sequence.
//
// Returns writer.status (replaced by writer.FinalStatus() when that is not OK
// after the memtable insertion).
//
// Accounting invariant: pending_memtable_writes_ is incremented exactly once
// for every request that entered the commit queue, and PebbleWriteCommit()
// decrements it exactly once for each of them, on success and on failure.
Status DBImpl::PebbleWriteImpl(const WriteOptions& write_options,
                               WriteBatch* my_batch, WriteCallback* callback,
                               uint64_t* log_used, uint64_t log_ref,
                               uint64_t* seq_used) {
  PERF_TIMER_GUARD(write_pre_and_post_process_time);
  StopWatch write_sw(immutable_db_options_.clock,
                     immutable_db_options_.statistics.get(), DB_WRITE);
  CommitRequest request;
  WriteThread::Writer writer(write_options, my_batch, callback, log_ref,
                             false /*disable_memtable*/);
  writer.request = &request;
  write_thread_.JoinBatchGroup(&writer);

  WriteContext write_context;
  if (writer.state == WriteThread::STATE_GROUP_LEADER) {
    WriteThread::WriteGroup wal_write_group;
    mutex_.Lock();
    if (writer.callback && !writer.callback->AllowWriteBatching()) {
      WaitForPendingWrites();
    }
    bool need_log_sync = !write_options.disableWAL && write_options.sync;
    bool need_log_dir_sync = need_log_sync && !log_dir_synced_;
    PERF_TIMER_STOP(write_pre_and_post_process_time);
    writer.status =
        PreprocessWrite(write_options, &need_log_sync, &write_context);
    PERF_TIMER_START(write_pre_and_post_process_time);
    log::Writer* log_writer = logs_.back().writer;
    mutex_.Unlock();

    // This can set non-OK status if callback fail.
    last_batch_group_size_ =
        write_thread_.EnterAsBatchGroupLeader(&writer, &wal_write_group);
    const SequenceNumber current_sequence =
        write_thread_.UpdateLastSequence(versions_->LastSequence()) + 1;
    size_t total_count = 0;
    size_t total_byte_size = 0;
    auto stats = default_cf_internal_stats_;
    // Number of requests pushed into the commit queue by this leader.
    size_t memtable_write_cnt = 0;
    if (writer.status.ok()) {
      SequenceNumber next_sequence = current_sequence;
      for (auto w : wal_write_group) {
        if (w->CheckCallback(this)) {
          if (w->ShouldWriteToMemtable()) {
            w->sequence = next_sequence;
            size_t count = WriteBatchInternal::Count(w->batch);
            if (count > 0) {
              w->request->commit_lsn = next_sequence + count - 1;
              write_thread_.EnterCommitQueue(w->request);
              // Only enqueued requests reach PebbleWriteCommit() later, so
              // only they may be counted as pending memtable writes. Counting
              // empty batches would leave pending_memtable_writes_ above zero
              // forever.
              memtable_write_cnt++;
            }
            next_sequence += count;
            total_count += count;
          }
          total_byte_size = WriteBatchInternal::AppendedByteSize(
              total_byte_size, WriteBatchInternal::ByteSize(w->batch));
        }
      }
      if (writer.disable_wal) {
        has_unpersisted_data_.store(true, std::memory_order_relaxed);
      }
      write_thread_.UpdateLastSequence(current_sequence + total_count - 1);
      // Key/byte statistics for the whole group are recorded here, once.
      stats->AddDBStats(InternalStats::kIntStatsNumKeysWritten, total_count);
      RecordTick(stats_, NUMBER_KEYS_WRITTEN, total_count);
      stats->AddDBStats(InternalStats::kIntStatsBytesWritten, total_byte_size);
      RecordTick(stats_, BYTES_WRITTEN, total_byte_size);
      RecordInHistogram(stats_, BYTES_PER_WRITE, total_byte_size);

      PERF_TIMER_STOP(write_pre_and_post_process_time);
      if (!write_options.disableWAL) {
        PERF_TIMER_GUARD(write_wal_time);
        stats->AddDBStats(InternalStats::kIntStatsWriteDoneBySelf, 1);
        RecordTick(stats_, WRITE_DONE_BY_SELF, 1);
        if (wal_write_group.size > 1) {
          stats->AddDBStats(InternalStats::kIntStatsWriteDoneByOther,
                            wal_write_group.size - 1);

          RecordTick(stats_, WRITE_DONE_BY_OTHER, wal_write_group.size - 1);
        }
        writer.status =
            WriteToWAL(wal_write_group, log_writer, log_used, need_log_sync,
                       need_log_dir_sync, current_sequence);
      }
    }
    if (!writer.CallbackFailed()) {
      WriteStatusCheck(writer.status);
    }

    if (need_log_sync) {
      mutex_.Lock();
      if (writer.status.ok()) {
        writer.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
      } else {
        MarkLogsNotSynced(logfile_number_);
      }
      mutex_.Unlock();
    }
    // Account for every enqueued request regardless of the WAL outcome: on
    // failure the writers still call PebbleWriteCommit(), which decrements
    // the counter, so skipping the increment here would make it underflow.
    pending_memtable_writes_ += memtable_write_cnt;
    write_thread_.ExitAsBatchGroupLeader(wal_write_group, writer.status);
  }

  if (seq_used != nullptr) {
    *seq_used = writer.sequence;
  }
  TEST_SYNC_POINT("DBImpl::WriteImpl:CommitAfterWriteWAL");

  // commit_lsn != 0 means the leader put this writer's request into the
  // commit queue.
  const bool enqueued = writer.request->commit_lsn != 0;
  if (enqueued && writer.status.ok()) {
    TEST_SYNC_POINT("DBImpl::WriteImpl:BeforePipelineWriteMemtable");
    PERF_TIMER_GUARD(write_memtable_time);
    // NUMBER_KEYS_WRITTEN was already recorded by the group leader for the
    // whole group; recording it again per writer would double-count.

    ColumnFamilyMemTablesImpl column_family_memtables(
        versions_->GetColumnFamilySet());
    writer.status = WriteBatchInternal::InsertInto(
        &writer, writer.sequence, &column_family_memtables, &flush_scheduler_,
        &trim_history_scheduler_, write_options.ignore_missing_column_families,
        0 /*log_number*/, this, true /*concurrent_memtable_writes*/);

    WriteStatusCheck(writer.status);

    if (!writer.FinalStatus().ok()) {
      writer.status = writer.FinalStatus();
    }
    PebbleWriteCommit(writer.request);
  } else if (enqueued) {
    // The write failed after the request was enqueued: it must still leave
    // the commit queue and release its pending-write slot.
    PebbleWriteCommit(writer.request);
  } else {
    // Never enqueued: nothing to commit, just flag the request as applied.
    writer.request->applied.store(true, std::memory_order_release);
  }
  return writer.status;
}

// The main write queue. This is the only write queue that updates LastSequence.
// When using one write queue, the same sequence also indicates the last
// published sequence.
Expand Down Expand Up @@ -94,6 +244,10 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (two_write_queues_ && immutable_db_options_.enable_pipelined_commit) {
return Status::NotSupported(
"pipelined_writes is not compatible with concurrent prepares");
}
if (seq_per_batch_ && immutable_db_options_.enable_pipelined_write) {
// TODO(yiwu): update pipeline write with seq_per_batch and batch_cnt
return Status::NotSupported(
Expand Down Expand Up @@ -153,6 +307,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,
return status;
}

if (immutable_db_options_.enable_pipelined_commit && !disable_memtable) {
return PebbleWriteImpl(write_options, my_batch, callback, log_used, log_ref,
seq_used);
}

if (immutable_db_options_.enable_pipelined_write) {
return PipelinedWriteImpl(write_options, my_batch, callback, log_used,
log_ref, disable_memtable, seq_used);
Expand Down
3 changes: 3 additions & 0 deletions db/db_properties_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,9 @@ TEST_F(DBPropertiesTest, Empty) {
options.write_buffer_size = 100000; // Small write buffer
options.allow_concurrent_memtable_write = false;
options = CurrentOptions(options);
if (options.enable_pipelined_commit) {
continue;
}
CreateAndReopenWithCF({"pikachu"}, options);

std::string num;
Expand Down
6 changes: 6 additions & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -556,6 +556,12 @@ Options DBTestBase::GetOptions(
options.enable_pipelined_write = true;
break;
}
case kCommitPipeline: {
options.enable_pipelined_commit = true;
options.enable_pipelined_write = false;
options.two_write_queues = false;
break;
}
case kConcurrentWALWrites: {
// This options optimize 2PC commit path
options.two_write_queues = true;
Expand Down
1 change: 1 addition & 0 deletions db/db_test_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -865,6 +865,7 @@ class DBTestBase : public testing::Test {
kConcurrentSkipList = 28,
kPipelinedWrite = 29,
kConcurrentWALWrites = 30,
kCommitPipeline = 31,
kDirectIO,
kLevelSubcompactions,
kBlockBasedTableWithIndexRestartInterval,
Expand Down
3 changes: 2 additions & 1 deletion db/db_write_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -454,7 +454,8 @@ TEST_P(DBWriteTest, ConcurrentlyDisabledWAL) {
// Runs every DBWriteTest case once per listed option configuration,
// including the commit-pipeline configuration (kCommitPipeline).
INSTANTIATE_TEST_CASE_P(DBWriteTestInstance, DBWriteTest,
testing::Values(DBTestBase::kDefault,
DBTestBase::kConcurrentWALWrites,
DBTestBase::kPipelinedWrite,
DBTestBase::kCommitPipeline));

} // namespace ROCKSDB_NAMESPACE

Expand Down
30 changes: 30 additions & 0 deletions db/external_sst_file_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1723,6 +1723,36 @@ TEST_F(ExternalSSTFileTest, WithUnorderedWrite) {
SyncPoint::GetInstance()->ClearAllCallBacks();
}

// Ingests an external SST file while a write issued with
// enable_pipelined_commit is between its WAL stage and its memtable stage,
// and checks that the ingested value for "bar" is the one that is read back.
TEST_F(ExternalSSTFileTest, WithCommitPipeline) {
  SyncPoint::GetInstance()->DisableProcessing();
  // Ordering enforced through sync points (each pair is
  // {predecessor, successor}):
  //  - the main thread starts the ingestion only after the writer thread has
  //    passed the WAL stage;
  //  - the writer thread starts its memtable stage only after
  //    WaitForPendingWrites has reached its BeforeBlock point.
  SyncPoint::GetInstance()->LoadDependency(
      {{"DBImpl::WriteImpl:CommitAfterWriteWAL",
        "ExternalSSTFileTest::WithCommitPipeline:WaitWriteWAL"},
       {"DBImpl::WaitForPendingWrites:BeforeBlock",
        "DBImpl::WriteImpl:BeforePipelineWriteMemtable"}});
  // The ingestion is expected to report that a flush is needed.
  SyncPoint::GetInstance()->SetCallBack(
      "DBImpl::IngestExternalFile:NeedFlush", [&](void* need_flush) {
        ASSERT_TRUE(*reinterpret_cast<bool*>(need_flush));
      });

  Options options = CurrentOptions();
  options.unordered_write = false;
  options.enable_pipelined_commit = true;
  DestroyAndReopen(options);
  // Check the returned Status instead of dropping it, so a failed write is
  // reported at its source.
  ASSERT_OK(Put("foo", "v1"));
  SyncPoint::GetInstance()->EnableProcessing();
  port::Thread writer([&]() { ASSERT_OK(Put("bar", "v2")); });

  TEST_SYNC_POINT("ExternalSSTFileTest::WithCommitPipeline:WaitWriteWAL");
  ASSERT_OK(GenerateAndAddExternalFile(options, {{"bar", "v3"}}, -1,
                                       true /* allow_global_seqno */));
  ASSERT_EQ(Get("bar"), "v3");

  writer.join();
  SyncPoint::GetInstance()->DisableProcessing();
  SyncPoint::GetInstance()->ClearAllCallBacks();
}

#if !defined(ROCKSDB_VALGRIND_RUN) || defined(ROCKSDB_FULL_VALGRIND_RUN)
TEST_P(ExternalSSTFileTest, IngestFileWithGlobalSeqnoRandomized) {
env_->skip_fsync_ = true;
Expand Down
6 changes: 6 additions & 0 deletions db/write_callback_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -160,12 +160,18 @@ TEST_P(WriteCallbackPTest, WriteWithCallbackTest) {
if (options.enable_pipelined_write && options.two_write_queues) {
continue;
}
if (options.enable_pipelined_commit && options.two_write_queues) {
continue;
}
if (options.unordered_write && !options.allow_concurrent_memtable_write) {
continue;
}
if (options.unordered_write && options.enable_pipelined_write) {
continue;
}
if (options.unordered_write && options.enable_pipelined_commit) {
continue;
}

ReadOptions read_options;
DB* db;
Expand Down
32 changes: 32 additions & 0 deletions db/write_thread.cc
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
// (found in the LICENSE.Apache file in the root directory).

#include "db/write_thread.h"

#include <chrono>
#include <thread>

#include "db/column_family.h"
#include "monitoring/perf_context_imp.h"
#include "port/port.h"
Expand Down Expand Up @@ -493,6 +495,7 @@ size_t WriteThread::EnterAsBatchGroupLeader(Writer* leader,
write_group->last_writer = w;
write_group->size++;
}

TEST_SYNC_POINT_CALLBACK("WriteThread::EnterAsBatchGroupLeader:End", w);
return size;
}
Expand Down Expand Up @@ -799,4 +802,33 @@ void WriteThread::WaitForMemTableWriters() {
newest_memtable_writer_.store(nullptr);
}

// Nothing to set up beyond default member initialization.
RequestQueue::RequestQueue() = default;

// The queue stores raw CommitRequest pointers and does not delete them.
RequestQueue::~RequestQueue() = default;

// Appends `req` to the tail of the commit queue while holding commit_mu_.
void RequestQueue::Enter(CommitRequest* req) {
  std::lock_guard<std::mutex> lock(commit_mu_);
  requests_.push_back(req);
}

// Blocks until `req` has been committed by another thread or has reached the
// head of the queue. In the latter case this thread commits `req` and every
// directly following request that is already applied: for each one it stores
// the request's commit_lsn into `*commit_sequence`, marks it committed and
// pops it. All waiters are then woken so they can re-check their state.
//
// `req`               - the caller's request.
// `commit_sequence`   - receives the commit_lsn of each request committed by
//                       this call, in queue order.
void RequestQueue::CommitSequenceAwait(CommitRequest* req,
                                       std::atomic<uint64_t>* commit_sequence) {
  std::unique_lock<std::mutex> guard(commit_mu_);
  commit_cv_.wait(guard, [&] {
    return requests_.empty() || requests_.front() == req || req->committed;
  });
  if (req->committed) {
    // Another thread already committed this request on our behalf.
    return;
  }
  // Calling front() on an empty container is undefined behavior, so check
  // emptiness explicitly before comparing the head with `req`.
  if (requests_.empty() || requests_.front() != req) {
    return;
  }
  while (!requests_.empty() &&
         requests_.front()->applied.load(std::memory_order_acquire)) {
    CommitRequest* current = requests_.front();
    commit_sequence->store(current->commit_lsn, std::memory_order_release);
    current->committed = true;
    requests_.pop_front();
  }
  commit_cv_.notify_all();
}

} // namespace ROCKSDB_NAMESPACE
Loading

0 comments on commit d3e4138

Please sign in to comment.