reduce file lookups before purging and fix double referencing files (#250)

This PR contains an optimization to reduce the DB mutex hold time when generating obsolete file purging jobs:

Obsolete file candidates consist of the files in `obsolete_files_` that are no longer referenced by any version in `VersionSet`, or of all physical files in the DB directory if `doing_full_scan` is true. In the former case, files in `obsolete_files_` are guaranteed to be obsolete by protocol and thus don't need to be checked against the live file list. This PR skips those unnecessary live file lookups when not doing a full scan (a simplified sketch follows).
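
For context, here is a minimal, self-contained sketch of the purge-path change, using made-up stand-in types (`JobContextSketch`, `VersionSetSketch`) rather than the real RocksDB classes; the actual logic lives in `DBImpl::FindObsoleteFiles` in the diff below.

```cpp
#include <cstdint>
#include <unordered_set>
#include <vector>

// Hypothetical stand-ins, for illustration only.
struct JobContextSketch {
  std::vector<uint64_t> sst_live;          // live SST numbers (full scan only)
  std::vector<uint64_t> sst_delete_files;  // files already known to be obsolete
};

struct VersionSetSketch {
  std::unordered_set<uint64_t> live_file_numbers;

  // Copies every live file number; in RocksDB this runs with the DB mutex
  // held and is linear in the number of file metas across all versions.
  void AddLiveFiles(std::vector<uint64_t>* out) const {
    out->insert(out->end(), live_file_numbers.begin(), live_file_numbers.end());
  }
};

void FindObsoleteFilesSketch(VersionSetSketch& versions, JobContextSketch* ctx,
                             bool doing_the_full_scan) {
  // Files handed over through obsolete_files_ are obsolete by protocol and go
  // straight into sst_delete_files; they never need a live-file lookup.
  if (doing_the_full_scan) {
    // Only a full directory scan has to cross-check arbitrary directory
    // entries against the live list, so only then is the copy paid for.
    versions.AddLiveFiles(&ctx->sst_live);
  }
}
```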

The optimization assumes solid reference counting for SST files, i.e. each physical SST file is represented by a unique `FileMetaData` structure. But this assumption is actually broken in the existing codebase: a trivially moved file appears as a new file meta with the same file number. This patch also fixes that issue by forwarding moved old files in `VersionBuilder`, as modeled in the sketch below.
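
A condensed, hypothetical model of the `VersionBuilder` fix (the names `FileMetaSketch` and `VersionBuilderSketch` are invented for this example; the real change is in `db/version_builder.cc` below): a file number already present in the base version is forwarded as a "moved" file instead of being wrapped in a second `FileMetaData`.

```cpp
#include <cstdint>
#include <unordered_map>

// Invented minimal file meta; the real FileMetaData carries far more state.
struct FileMetaSketch {
  uint64_t file_number = 0;
  int refs = 0;
  void Ref() { ++refs; }
};

class VersionBuilderSketch {
 public:
  ~VersionBuilderSketch() {
    for (auto& level : levels_) {
      for (auto& entry : level.second.added_files) {
        delete entry.second;  // drop the builder's reference (simplified)
      }
    }
  }

  // base_meta is the meta owned by the base version for this file number,
  // or nullptr if the file is genuinely new.
  void ApplyFileAddition(int level, uint64_t file_number,
                         FileMetaSketch* base_meta) {
    if (base_meta != nullptr) {
      // Trivial move: forward the base version's existing meta. It is not
      // Ref()'d here because the base version keeps it alive until SaveTo(),
      // so the physical file stays represented by exactly one meta.
      levels_[level].moved_files[file_number] = base_meta;
    } else {
      auto* f = new FileMetaSketch();
      f->file_number = file_number;
      f->Ref();  // the builder owns one reference to newly added files
      levels_[level].added_files[file_number] = f;
    }
  }

 private:
  struct LevelState {
    std::unordered_map<uint64_t, FileMetaSketch*> moved_files;
    std::unordered_map<uint64_t, FileMetaSketch*> added_files;
  };
  std::unordered_map<int, LevelState> levels_;
};
```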

Aside from that, this PR also assumes an SST file won't be moved to a different path while keeping the same file number, and raises an error in that case.
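
The corresponding safety check, again as a simplified sketch with an invented free-function signature (the real code is in `VersionBuilder::Rep::ApplyFileAddition` and returns `Status::Corruption`):

```cpp
#include <cstdint>
#include <sstream>
#include <string>

// Returns an error message if a "trivially moved" file claims a different
// path id than the base version's copy, or an empty string if the move is OK.
std::string CheckTrivialMovePath(uint64_t file_number, int level,
                                 uint32_t base_path_id, uint32_t new_path_id) {
  if (base_path_id != new_path_id) {
    std::ostringstream oss;
    oss << "Cannot add table file #" << file_number << " to level " << level
        << " by trivial move since it isn't trivial to move to a different path";
    return oss.str();  // the real code wraps this in Status::Corruption(...)
  }
  return std::string();
}
```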

Finally, this PR shows an impressive performance improvement in our own testing (thanks @5kbpers). We examined the patch under an artificial workload in which iterators are frequently created and released (active iterator count around 250, RocksDB write OPS around 20K). The logging shows that before the patch, an `AddLiveFiles()` call takes 100+ ms and in the process copies 2500K file metas, only 60K of which are unique.

Test Plan:
- Extend existing unit tests to simulate trivial file movement and incorrect file editing.

Benchmark Results:
[TPC-C 5K WH 1 KV 512 threads]
Before: TPM: 23676.8, 50th(ms): 570.4, 90th(ms): 704.6, 95th(ms): 738.2, 99th(ms): 872.4, 99.9th(ms): 5637.1, Max(ms): 12884.9
After: TPM: 24395.1(3%+), 50th(ms): 570.4, 90th(ms): 704.6, 95th(ms): 738.2, 99th(ms): 838.9, 99.9th(ms): 1342.2(76%-), Max(ms): 2952.8(77%-)

Signed-off-by: Xinye Tao <xy.tao@outlook.com>
tabokie committed Aug 18, 2021
1 parent 3de4cc2 commit 6b97c05
Showing 15 changed files with 314 additions and 157 deletions.
2 changes: 2 additions & 0 deletions HISTORY.md
@@ -22,6 +22,8 @@
* Fix a bug in which a snapshot read could be affected by a DeleteRange after the snapshot (#6062).
* `WriteBatchWithIndex::DeleteRange` returns `Status::NotSupported`. Previously it returned success even though reads on the batch did not account for range tombstones. The corresponding language bindings now cannot be used. In C, that includes `rocksdb_writebatch_wi_delete_range`, `rocksdb_writebatch_wi_delete_range_cf`, `rocksdb_writebatch_wi_delete_rangev`, and `rocksdb_writebatch_wi_delete_rangev_cf`. In Java, that includes `WriteBatchWithIndex::deleteRange`.

### Performance Improvements
* When gathering unreferenced obsolete files for purging, file metas associated with active versions will no longer be copied for double-check. Updated VersionBuilder to make sure each physical file is reference counted by at most one FileMetaData.

## 6.4.6 (10/16/2019)
* Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand.
2 changes: 1 addition & 1 deletion db/builder.h
@@ -25,7 +25,7 @@
namespace rocksdb {

struct Options;
struct FileMetaData;
class FileMetaData;

class Env;
struct EnvOptions;
1 change: 0 additions & 1 deletion db/compaction/compaction_picker_test.cc
@@ -96,7 +96,6 @@ class CompactionPickerTest : public testing::Test {
f->fd.largest_seqno = largest_seq;
f->compensated_file_size =
(compensated_file_size != 0) ? compensated_file_size : file_size;
f->refs = 0;
vstorage_->AddFile(level, f);
files_.emplace_back(f);
file_map_.insert({file_number, {f, level}});
4 changes: 2 additions & 2 deletions db/db_filesnapshot.cc
@@ -119,7 +119,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
}

// Make a set of all of the live *.sst files
std::vector<FileDescriptor> live;
std::vector<uint64_t> live;
for (auto cfd : *versions_->GetColumnFamilySet()) {
if (cfd->IsDropped()) {
continue;
@@ -133,7 +133,7 @@ Status DBImpl::GetLiveFiles(std::vector<std::string>& ret,
// create names of the live files. The names are not absolute
// paths, instead they are relative to dbname_;
for (const auto& live_file : live) {
ret.push_back(MakeTableFileName("", live_file.GetNumber()));
ret.push_back(MakeTableFileName("", live_file));
}

ret.push_back(CurrentFileName(""));
23 changes: 12 additions & 11 deletions db/db_impl/db_impl_files.cc
@@ -35,11 +35,13 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() {
return std::numeric_limits<uint64_t>::max();
}

// * Returns the list of live files in 'sst_live'
// If it's doing full scan:
// * Returns the list of all files in the filesystem in
// 'full_scan_candidate_files'.
// Otherwise, gets obsolete files from VersionSet.
// * Returns the list of live files in 'sst_live'.
// Otherwise:
// * Gets obsolete files from VersionSet.
//
// no_full_scan = true -- never do the full scan using GetChildren()
// force = false -- don't force the full scan, except every
// mutable_db_options_.delete_obsolete_files_period_micros
@@ -103,7 +105,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
job_context->log_number = MinLogNumberToKeep();
job_context->prev_log_number = versions_->prev_log_number();

versions_->AddLiveFiles(&job_context->sst_live);
if (doing_the_full_scan) {
InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(),
dbname_);
@@ -235,6 +236,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force,
log_recycle_files_.end());
if (job_context->HaveSomethingToDelete()) {
++pending_purge_obsolete_files_;
if (doing_the_full_scan) {
versions_->AddLiveFiles(&job_context->sst_live);
}
}
logs_to_free_.clear();
}
@@ -303,12 +307,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
// FindObsoleteFiles() should've populated this so nonzero
assert(state.manifest_file_number != 0);

// Now, convert live list to an unordered map, WITHOUT mutex held;
// set is slow.
std::unordered_map<uint64_t, const FileDescriptor*> sst_live_map;
for (const FileDescriptor& fd : state.sst_live) {
sst_live_map[fd.GetNumber()] = &fd;
}
// Now, convert lists to unordered sets, WITHOUT mutex held; set is slow.
std::unordered_set<uint64_t> sst_live_set(state.sst_live.begin(),
state.sst_live.end());
std::unordered_set<uint64_t> log_recycle_files_set(
state.log_recycle_files.begin(), state.log_recycle_files.end());

@@ -407,7 +408,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
case kTableFile:
// If the second condition is not there, this makes
// DontDeletePendingOutputs fail
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
number >= state.min_pending_output;
if (!keep) {
files_to_del.insert(number);
@@ -422,7 +423,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) {
//
// TODO(yhchiang): carefully modify the third condition to safely
// remove the temp options files.
keep = (sst_live_map.find(number) != sst_live_map.end()) ||
keep = (sst_live_set.find(number) != sst_live_set.end()) ||
(number == state.pending_manifest_file_number) ||
(to_delete.find(kOptionsFileNamePrefix) != std::string::npos);
break;
2 changes: 1 addition & 1 deletion db/file_indexer.h
@@ -19,7 +19,7 @@
namespace rocksdb {

class Comparator;
struct FileMetaData;
class FileMetaData;
struct FdWithKeyRange;
struct FileLevel;

2 changes: 1 addition & 1 deletion db/forward_iterator.h
@@ -25,7 +25,7 @@ struct SuperVersion;
class ColumnFamilyData;
class ForwardLevelIterator;
class VersionStorageInfo;
struct FileMetaData;
class FileMetaData;

class MinIterComparator {
public:
3 changes: 2 additions & 1 deletion db/job_context.h
@@ -140,7 +140,8 @@ struct JobContext {
std::vector<CandidateFileInfo> full_scan_candidate_files;

// the list of all live sst files that cannot be deleted
std::vector<FileDescriptor> sst_live;
// (filled only if we're doing full scan)
std::vector<uint64_t> sst_live;

// a list of sst files that we need to delete
std::vector<ObsoleteFileInfo> sst_delete_files;
149 changes: 85 additions & 64 deletions db/version_builder.cc
@@ -78,8 +78,13 @@ class VersionBuilder::Rep {
};

struct LevelState {
std::unordered_set<uint64_t> deleted_files;
// Map from file number to file meta data.
// Files in base version that should be deleted.
std::unordered_set<uint64_t> deleted_base_files;
// Files moved from base version.
// Those files will not be referenced by VersionBuilder.
std::unordered_map<uint64_t, FileMetaData*> moved_files;
// Files added, must not intersect with moved_files.
// Those files will be referenced during the lifetime of VersionBuilder.
std::unordered_map<uint64_t, FileMetaData*> added_files;
};

@@ -130,8 +135,7 @@ class VersionBuilder::Rep {
}

void UnrefFile(FileMetaData* f) {
f->refs--;
if (f->refs <= 0) {
if (f->Unref()) {
if (f->table_reader_handle) {
assert(table_cache_ != nullptr);
table_cache_->ReleaseHandle(f->table_reader_handle);
@@ -296,7 +300,13 @@ class VersionBuilder::Rep {
add_files.erase(add_it);
}

auto& del_files = level_state.deleted_files;
auto& moved_files = level_state.moved_files;
auto moved_it = moved_files.find(file_number);
if (moved_it != moved_files.end()) {
moved_files.erase(moved_it);
}

auto& del_files = level_state.deleted_base_files;
assert(del_files.find(file_number) == del_files.end());
del_files.emplace(file_number);

@@ -333,19 +343,30 @@ class VersionBuilder::Rep {
}

auto& level_state = levels_[level];

auto& del_files = level_state.deleted_files;
auto del_it = del_files.find(file_number);
if (del_it != del_files.end()) {
del_files.erase(del_it);
// Try to reuse file meta from base version.
FileMetaData* f = base_vstorage_->GetFileMetaDataByNumber(file_number);
std::unordered_map<uint64_t, FileMetaData*>* container = nullptr;
if (f != nullptr) {
// This should be a file trivially moved to a new position. Make sure the
// two are the same physical file.
if (f->fd.GetPathId() != meta.fd.GetPathId()) {
std::ostringstream oss;
oss << "Cannot add table file #" << file_number << " to level " << level
<< " by trivial move since it isn't trivial to move to a "
<< "different path";
return Status::Corruption("VersionBuilder", oss.str());
}
// No need to Ref() this file held by base_vstorage_.
container = &level_state.moved_files;
} else {
f = new FileMetaData(meta);
// Will drop reference in dtor.
f->Ref();
container = &level_state.added_files;
}

FileMetaData* const f = new FileMetaData(meta);
f->refs = 1;

auto& add_files = level_state.added_files;
assert(add_files.find(file_number) == add_files.end());
add_files.emplace(file_number, f);
assert(container && container->find(file_number) == container->end());
container->emplace(file_number, f);

table_file_levels_[file_number] = level;

@@ -403,39 +424,70 @@ class VersionBuilder::Rep {
// Merge the set of added files with the set of pre-existing files.
// Drop any deleted files. Store the result in *v.
const auto& base_files = base_vstorage_->LevelFiles(level);
const auto& del_files = levels_[level].deleted_base_files;
const auto& unordered_added_files = levels_[level].added_files;
const auto& unordered_moved_files = levels_[level].moved_files;

vstorage->Reserve(level,
base_files.size() + unordered_added_files.size());

// Sort added files for the level.
std::vector<FileMetaData*> added_files;
added_files.reserve(unordered_added_files.size());
// Sort delta files for the level.
std::vector<FileMetaData*> delta_files;
delta_files.reserve(unordered_added_files.size() +
unordered_moved_files.size());
for (const auto& pair : unordered_added_files) {
added_files.push_back(pair.second);
delta_files.push_back(pair.second);
}
for (const auto& pair : unordered_moved_files) {
// SaveTo will always be called under db mutex.
pair.second->being_moved_to = level;
delta_files.push_back(pair.second);
}
std::sort(added_files.begin(), added_files.end(), cmp);
std::sort(delta_files.begin(), delta_files.end(), cmp);

#ifndef NDEBUG
FileMetaData* prev_added_file = nullptr;
for (const auto& added : added_files) {
if (level > 0 && prev_added_file != nullptr) {
FileMetaData* prev_file = nullptr;
for (const auto& delta : delta_files) {
if (level > 0 && prev_file != nullptr) {
assert(base_vstorage_->InternalComparator()->Compare(
prev_added_file->smallest, added->smallest) <= 0);
prev_file->smallest, delta->smallest) <= 0);
}
prev_added_file = added;
prev_file = delta;
}
#endif

auto base_iter = base_files.begin();
auto base_end = base_files.end();
auto added_iter = added_files.begin();
auto added_end = added_files.end();
while (added_iter != added_end || base_iter != base_end) {
if (base_iter == base_end ||
(added_iter != added_end && cmp(*added_iter, *base_iter))) {
MaybeAddFile(vstorage, level, *added_iter++);
auto delta_iter = delta_files.begin();
auto delta_end = delta_files.end();

// Delta file supersedes base file because base is masked by
// deleted_base_files.
while (delta_iter != delta_end || base_iter != base_end) {
if (delta_iter == delta_end ||
(base_iter != base_end && cmp(*base_iter, *delta_iter))) {
FileMetaData* f = *base_iter++;
const uint64_t file_number = f->fd.GetNumber();

if (del_files.find(file_number) != del_files.end()) {
// vstorage inherited base_vstorage_'s stats, need to account for
// deleted base files.
vstorage->RemoveCurrentStats(f);
} else {
#ifndef NDEBUG
assert(unordered_added_files.find(file_number) ==
unordered_added_files.end());
#endif
vstorage->AddFile(level, f, info_log_);
}
} else {
MaybeAddFile(vstorage, level, *base_iter++);
FileMetaData* f = *delta_iter++;
if (f->init_stats_from_file) {
// A moved file whose stats is inited by base_vstorage_ and then
// deleted from it.
vstorage->UpdateAccumulatedStats(f);
}
vstorage->AddFile(level, f, info_log_);
}
}
}
@@ -540,31 +592,6 @@ class VersionBuilder::Rep {
}
return Status::OK();
}

void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) {
const uint64_t file_number = f->fd.GetNumber();

const auto& level_state = levels_[level];

const auto& del_files = level_state.deleted_files;
const auto del_it = del_files.find(file_number);

if (del_it != del_files.end()) {
// f is to-be-deleted table file
vstorage->RemoveCurrentStats(f);
} else {
const auto& add_files = level_state.added_files;
const auto add_it = add_files.find(file_number);

// Note: if the file appears both in the base version and in the added
// list, the added FileMetaData supersedes the one in the base version.
if (add_it != add_files.end() && add_it->second != f) {
vstorage->RemoveCurrentStats(f);
} else {
vstorage->AddFile(level, f, info_log_);
}
}
}
};

VersionBuilder::VersionBuilder(const EnvOptions& env_options,
@@ -597,10 +624,4 @@ Status VersionBuilder::LoadTableHandlers(
prefetch_index_and_filter_in_cache,
is_initial_load, prefix_extractor);
}

void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level,
FileMetaData* f) {
rep_->MaybeAddFile(vstorage, level, f);
}

} // namespace rocksdb
3 changes: 1 addition & 2 deletions db/version_builder.h
@@ -18,7 +18,7 @@ namespace rocksdb {
class TableCache;
class VersionStorageInfo;
class VersionEdit;
struct FileMetaData;
class FileMetaData;
class InternalStats;

// A helper class so we can efficiently apply a whole sequence
@@ -38,7 +38,6 @@ class VersionBuilder {
bool prefetch_index_and_filter_in_cache,
bool is_initial_load,
const SliceTransform* prefix_extractor);
void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f);

private:
class Rep;