Skip to content

Commit

Permalink
Track WAL in MANIFEST: LogAndApply WAL events to MANIFEST (facebook#7601
Browse files Browse the repository at this point in the history
)

Summary:
When a WAL is synced, an edit is written to MANIFEST.
After flushing memtables, the obsoleted WALs are piggybacked to MANIFEST while writing the new L0 files to MANIFEST.

Pull Request resolved: facebook#7601

Test Plan:
`track_and_verify_wals_in_manifest` is enabled by default for all tests extending `DBBasicTest`, and in db_stress_test.
Unit test `wal_edit_test`, `version_edit_test`, and `version_set_test` are also updated.
Watch all tests to pass.

Reviewed By: ltamasi

Differential Revision: D24553957

Pulled By: cheng-chang

fbshipit-source-id: 66a569ff1bdced38e22900bd240b73113906e040
  • Loading branch information
Cheng Chang authored and codingrhythm committed Mar 5, 2021
1 parent 95d423d commit ae3682d
Show file tree
Hide file tree
Showing 19 changed files with 263 additions and 155 deletions.
50 changes: 40 additions & 10 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1283,7 +1283,11 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{
InstrumentedMutexLock l(&mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
if (status.ok()) {
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
} else {
MarkLogsNotSynced(current_log_number);
}
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

Expand All @@ -1309,27 +1313,53 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
const Status& status) {
Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to && status.ok()) {
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& log = *it;
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter());
auto& wal = *it;
assert(wal.getting_synced);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
}
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
log.getting_synced = false;
wal.getting_synced = false;
++it;
}
}
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));

Status s;
if (synced_wals.IsWalAddition()) {
// not empty, write to MANIFEST.
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}
log_sync_cv_.SignalAll();
return s;
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
assert(wal.getting_synced);
wal.getting_synced = false;
}
log_sync_cv_.SignalAll();
}

Expand Down
14 changes: 11 additions & 3 deletions db/db_impl/db_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -1702,7 +1702,9 @@ class DBImpl : public DB {
std::unique_ptr<TaskLimiterToken>* token, LogBuffer* log_buffer);

// helper function to call after some of the logs_ were synced
void MarkLogsSynced(uint64_t up_to, bool synced_dir, const Status& status);
Status MarkLogsSynced(uint64_t up_to, bool synced_dir);
// WALs with log number up to up_to are not synced successfully.
void MarkLogsNotSynced(uint64_t up_to);

SnapshotImpl* GetSnapshotImpl(bool is_write_conflict_boundary,
bool lock = true);
Expand Down Expand Up @@ -2204,12 +2206,18 @@ extern CompressionType GetCompressionFlush(
// `memtables_to_flush`) will be flushed and thus will not depend on any WAL
// file.
// The function is only applicable to 2pc mode.
extern uint64_t PrecomputeMinLogNumberToKeep(
extern uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker);

// In non-2PC mode, WALs with log number < the returned number can be
// deleted after the cfd_to_flush column family is flushed successfully.
extern uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list);

// `cfd_to_flush` is the column family whose memtable will be flushed and thus
// will not depend on any WAL file. nullptr means no memtable is being flushed.
// The function is only applicable to 2pc mode.
Expand Down
6 changes: 5 additions & 1 deletion db/db_impl/db_impl_compaction_flush.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,11 @@ IOStatus DBImpl::SyncClosedLogs(JobContext* job_context) {

// "number <= current_log_number - 1" is equivalent to
// "number < current_log_number".
MarkLogsSynced(current_log_number - 1, true, io_s);
if (io_s.ok()) {
io_s = status_to_io_status(MarkLogsSynced(current_log_number - 1, true));
} else {
MarkLogsNotSynced(current_log_number - 1);
}
if (!io_s.ok()) {
if (total_log_size_ > 0) {
error_handler_.SetBGError(io_s, BackgroundErrorReason::kFlush)
Expand Down
26 changes: 18 additions & 8 deletions db/db_impl/db_impl_files.cc
Original file line number Diff line number Diff line change
Expand Up @@ -680,16 +680,10 @@ uint64_t FindMinPrepLogReferencedByMemTable(
return min_log;
}

uint64_t PrecomputeMinLogNumberToKeep(
uint64_t PrecomputeMinLogNumberToKeepNon2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
autovector<VersionEdit*> edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
const autovector<VersionEdit*>& edit_list) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
// Calculate updated min_log_number_to_keep
// Since the function should only be called in 2pc mode, log number in
// the version edit should be sufficient.

// Precompute the min log number containing unflushed data for the column
// family being flushed (`cfd_to_flush`).
Expand All @@ -713,6 +707,22 @@ uint64_t PrecomputeMinLogNumberToKeep(
min_log_number_to_keep =
std::min(cf_min_log_number_to_keep, min_log_number_to_keep);
}
return min_log_number_to_keep;
}

uint64_t PrecomputeMinLogNumberToKeep2PC(
VersionSet* vset, const ColumnFamilyData& cfd_to_flush,
const autovector<VersionEdit*>& edit_list,
const autovector<MemTable*>& memtables_to_flush,
LogsWithPrepTracker* prep_tracker) {
assert(vset != nullptr);
assert(prep_tracker != nullptr);
// Calculate updated min_log_number_to_keep
// Since the function should only be called in 2pc mode, log number in
// the version edit should be sufficient.

uint64_t min_log_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, cfd_to_flush, edit_list);

// if are 2pc we must consider logs containing prepared
// sections of outstanding transactions.
Expand Down
14 changes: 8 additions & 6 deletions db/db_impl/db_impl_open.cc
Original file line number Diff line number Diff line change
Expand Up @@ -589,18 +589,20 @@ Status DBImpl::Recover(
}

if (immutable_db_options_.track_and_verify_wals_in_manifest) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
if (!immutable_db_options_.best_efforts_recovery) {
// Verify WALs in MANIFEST.
s = versions_->GetWalSet().CheckWals(env_, wal_files);
} // else since best effort recovery does not recover from WALs, no need
// to check WALs.
} else if (!versions_->GetWalSet().GetWals().empty()) {
// Tracking is disabled, clear previously tracked WALs from MANIFEST,
// otherwise, in the future, if WAL tracking is enabled again,
// since the WALs deleted when WAL tracking is disabled are not persisted
// into MANIFEST, WAL check may fail.
VersionEdit edit;
for (const auto& wal : versions_->GetWalSet().GetWals()) {
WalNumber number = wal.first;
edit.DeleteWal(number);
}
WalNumber max_wal_number =
versions_->GetWalSet().GetWals().rbegin()->first;
edit.DeleteWalsBefore(max_wal_number + 1);
s = versions_->LogAndApplyToDefaultColumnFamily(&edit, &mutex_);
}
if (!s.ok()) {
Expand Down
12 changes: 10 additions & 2 deletions db/db_impl/db_impl_write.cc
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,11 @@ Status DBImpl::WriteImpl(const WriteOptions& write_options,

if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, status);
if (status.ok()) {
status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
} else {
MarkLogsNotSynced(logfile_number_);
}
mutex_.Unlock();
// Requesting sync with two_write_queues_ is expected to be very rare. We
// hence provide a simple implementation that is not necessarily efficient.
Expand Down Expand Up @@ -551,7 +555,11 @@ Status DBImpl::PipelinedWriteImpl(const WriteOptions& write_options,

if (need_log_sync) {
mutex_.Lock();
MarkLogsSynced(logfile_number_, need_log_dir_sync, w.status);
if (w.status.ok()) {
w.status = MarkLogsSynced(logfile_number_, need_log_dir_sync);
} else {
MarkLogsNotSynced(logfile_number_);
}
mutex_.Unlock();
}

Expand Down
2 changes: 1 addition & 1 deletion db/db_range_del_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ TEST_F(DBRangeDelTest, MaxCompactionBytesCutsOutputFiles) {
// Want max_compaction_bytes to trigger the end of compaction output file, not
// target_file_size_base, so make the latter much bigger
opts.target_file_size_base = 100 * opts.max_compaction_bytes;
Reopen(opts);
DestroyAndReopen(opts);

// snapshot protects range tombstone from dropping due to becoming obsolete.
const Snapshot* snapshot = db_->GetSnapshot();
Expand Down
1 change: 1 addition & 0 deletions db/db_test_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,7 @@ Options DBTestBase::GetDefaultOptions() const {
options.wal_recovery_mode = WALRecoveryMode::kTolerateCorruptedTailRecords;
options.compaction_pri = CompactionPri::kByCompensatedSize;
options.env = env_;
options.track_and_verify_wals_in_manifest = true;
return options;
}

Expand Down
53 changes: 48 additions & 5 deletions db/memtable_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -473,12 +473,27 @@ Status MemTableList::TryInstallMemtableFlushResults(

// TODO(myabandeh): Not sure how batch_count could be 0 here.
if (batch_count > 0) {
uint64_t min_wal_number_to_keep = 0;
if (vset->db_options()->allow_2pc) {
assert(edit_list.size() > 0);
min_wal_number_to_keep = PrecomputeMinLogNumberToKeep2PC(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker);
// We piggyback the information of earliest log file to keep in the
// manifest entry for the last file flushed.
edit_list.back()->SetMinLogNumberToKeep(PrecomputeMinLogNumberToKeep(
vset, *cfd, edit_list, memtables_to_flush, prep_tracker));
edit_list.back()->SetMinLogNumberToKeep(min_wal_number_to_keep);
} else {
min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfd, edit_list);
}

std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_list.push_back(wal_deletion.get());
}
}

const auto manifest_write_cb = [this, cfd, batch_count, log_buffer,
Expand Down Expand Up @@ -704,6 +719,10 @@ Status InstallMemtableAtomicFlushResults(
if (imm_lists != nullptr) {
assert(imm_lists->size() == num);
}
if (num == 0) {
return Status::OK();
}

for (size_t k = 0; k != num; ++k) {
#ifndef NDEBUG
const auto* imm =
Expand Down Expand Up @@ -732,12 +751,36 @@ Status InstallMemtableAtomicFlushResults(
++num_entries;
edit_lists.emplace_back(edits);
}

// TODO(cc): after https://github.com/facebook/rocksdb/pull/7570, handle 2pc
// here.
std::unique_ptr<VersionEdit> wal_deletion;
if (vset->db_options()->track_and_verify_wals_in_manifest) {
uint64_t min_wal_number_to_keep =
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[0], edit_lists[0]);
for (size_t i = 1; i < cfds.size(); i++) {
min_wal_number_to_keep = std::min(
min_wal_number_to_keep,
PrecomputeMinLogNumberToKeepNon2PC(vset, *cfds[i], edit_lists[i]));
}
const auto& wals = vset->GetWalSet().GetWals();
if (!wals.empty() && min_wal_number_to_keep > wals.begin()->first) {
wal_deletion.reset(new VersionEdit);
wal_deletion->DeleteWalsBefore(min_wal_number_to_keep);
edit_lists.back().push_back(wal_deletion.get());
++num_entries;
}
}

// Mark the version edits as an atomic group if the number of version edits
// exceeds 1.
if (cfds.size() > 1) {
for (auto& edits : edit_lists) {
assert(edits.size() == 1);
edits[0]->MarkAtomicGroup(--num_entries);
for (size_t i = 0; i < edit_lists.size(); i++) {
assert((edit_lists[i].size() == 1) ||
((edit_lists[i].size() == 2) && (i == edit_lists.size() - 1)));
for (auto& e : edit_lists[i]) {
e->MarkAtomicGroup(--num_entries);
}
}
assert(0 == num_entries);
}
Expand Down
29 changes: 11 additions & 18 deletions db/version_edit.cc
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void VersionEdit::Clear() {
blob_file_additions_.clear();
blob_file_garbages_.clear();
wal_additions_.clear();
wal_deletions_.clear();
wal_deletion_.Reset();
column_family_ = 0;
is_column_family_add_ = false;
is_column_family_drop_ = false;
Expand Down Expand Up @@ -229,9 +229,9 @@ bool VersionEdit::EncodeTo(std::string* dst) const {
wal_addition.EncodeTo(dst);
}

for (const auto& wal_deletion : wal_deletions_) {
if (!wal_deletion_.IsEmpty()) {
PutVarint32(dst, kWalDeletion);
wal_deletion.EncodeTo(dst);
wal_deletion_.EncodeTo(dst);
}

// 0 is default and does not need to be explicitly written
Expand Down Expand Up @@ -576,7 +576,7 @@ Status VersionEdit::DecodeFrom(const Slice& src) {
return s;
}

wal_deletions_.emplace_back(std::move(wal_deletion));
wal_deletion_ = std::move(wal_deletion);
break;
}

Expand Down Expand Up @@ -725,9 +725,9 @@ std::string VersionEdit::DebugString(bool hex_key) const {
r.append(wal_addition.DebugString());
}

for (const auto& wal_deletion : wal_deletions_) {
if (!wal_deletion_.IsEmpty()) {
r.append("\n WalDeletion: ");
r.append(wal_deletion.DebugString());
r.append(wal_deletion_.DebugString());
}

r.append("\n ColumnFamily: ");
Expand Down Expand Up @@ -854,18 +854,11 @@ std::string VersionEdit::DebugJSON(int edit_num, bool hex_key) const {
jw.EndArray();
}

if (!wal_deletions_.empty()) {
jw << "WalDeletions";

jw.StartArray();

for (const auto& wal_deletion : wal_deletions_) {
jw.StartArrayedObject();
jw << wal_deletion;
jw.EndArrayedObject();
}

jw.EndArray();
if (!wal_deletion_.IsEmpty()) {
jw << "WalDeletion";
jw.StartObject();
jw << wal_deletion_;
jw.EndObject();
}

jw << "ColumnFamily" << column_family_;
Expand Down
Loading

0 comments on commit ae3682d

Please sign in to comment.