From 6b97c0567c4694ef1d06374d54bf582e541d215d Mon Sep 17 00:00:00 2001 From: Xinye Tao Date: Wed, 18 Aug 2021 12:28:24 +0800 Subject: [PATCH] reduce file lookups before purging and fix double referencing files (#250) This PR contains an optimization to reduce DB mutex held time when generating obsolete files purging jobs: Obsolete file candidates consist of `obsolete_files_` that are no longer referenced by any version in `VersionSet`, or all physical files in the DB directory if `doing_full_scan` is true. In the former case, `obsolete_files_` is guaranteed to be obsolete by protocol and thus has no need to be checked against the live files list. This PR avoids the unnecessary live file lookups when not doing a full scan. The optimization assumes solid reference counting for sst files, i.e. each physical SST file is represented by a unique `FileMetaData` structure. But this assumption is actually broken in the existing codebase: a trivially moved file will appear as a new file meta with the same file number. This patch also fixes this issue by forwarding moved old files in `VersionBuilder`. Aside from that, this PR also assumes an SST file won't be moved to a different path with the same file number, and will raise an error in that case. Finally, this PR shows impressive performance improvement in our own testing (thanks @5kbpers). We examined the patch in an artificial workload, where iterators are frequently created and released (active iterator count around 250, RocksDB write OPS around 20K). The logging shows that, before patching, the `AddLiveFiles()` call takes 100+ms and in that process copies 2500K file metas with 60K of them being unique. Test Plan: - Extend existing unit tests to simulate trivial file movement and incorrect file editing. 
Benchmark Results: [TPC-C 5K WH 1 KV 512 threads] Before: TPM: 23676.8, 50th(ms): 570.4, 90th(ms): 704.6, 95th(ms): 738.2, 99th(ms): 872.4, 99.9th(ms): 5637.1, Max(ms): 12884.9 After: TPM: 24395.1(3%+), 50th(ms): 570.4, 90th(ms): 704.6, 95th(ms): 738.2, 99th(ms): 838.9, 99.9th(ms): 1342.2(76%-), Max(ms): 2952.8(77%-) Signed-off-by: Xinye Tao --- HISTORY.md | 2 + db/builder.h | 2 +- db/compaction/compaction_picker_test.cc | 1 - db/db_filesnapshot.cc | 4 +- db/db_impl/db_impl_files.cc | 23 ++-- db/file_indexer.h | 2 +- db/forward_iterator.h | 2 +- db/job_context.h | 3 +- db/version_builder.cc | 149 +++++++++++++---------- db/version_builder.h | 3 +- db/version_builder_test.cc | 154 +++++++++++++++++++----- db/version_edit.h | 23 +++- db/version_set.cc | 77 +++++++----- db/version_set.h | 15 ++- db/version_set_test.cc | 11 +- 15 files changed, 314 insertions(+), 157 deletions(-) diff --git a/HISTORY.md b/HISTORY.md index 5325ef3cb11..09492305577 100644 --- a/HISTORY.md +++ b/HISTORY.md @@ -22,6 +22,8 @@ * Fix a bug in which a snapshot read could be affected by a DeleteRange after the snapshot (#6062). * `WriteBatchWithIndex::DeleteRange` returns `Status::NotSupported`. Previously it returned success even though reads on the batch did not account for range tombstones. The corresponding language bindings now cannot be used. In C, that includes `rocksdb_writebatch_wi_delete_range`, `rocksdb_writebatch_wi_delete_range_cf`, `rocksdb_writebatch_wi_delete_rangev`, and `rocksdb_writebatch_wi_delete_rangev_cf`. In Java, that includes `WriteBatchWithIndex::deleteRange`. +### Performance Improvements +* When gathering unreferenced obsolete files for purging, file metas associated with active versions will no longer be copied for double-check. Updated VersionBuilder to make sure each physical file is reference counted by at most one FileMetaData. 
## 6.4.6 (10/16/2019) * Fix a bug when partitioned filters and prefix search are used in conjunction, ::SeekForPrev could return invalid for an existing prefix. ::SeekForPrev might be called by the user, or internally on ::Prev, or within ::Seek if the return value involves Delete or a Merge operand. diff --git a/db/builder.h b/db/builder.h index 4fa56f50e34..1844efd60a6 100644 --- a/db/builder.h +++ b/db/builder.h @@ -25,7 +25,7 @@ namespace rocksdb { struct Options; -struct FileMetaData; +class FileMetaData; class Env; struct EnvOptions; diff --git a/db/compaction/compaction_picker_test.cc b/db/compaction/compaction_picker_test.cc index 89feae141a4..c75b14a7946 100644 --- a/db/compaction/compaction_picker_test.cc +++ b/db/compaction/compaction_picker_test.cc @@ -96,7 +96,6 @@ class CompactionPickerTest : public testing::Test { f->fd.largest_seqno = largest_seq; f->compensated_file_size = (compensated_file_size != 0) ? compensated_file_size : file_size; - f->refs = 0; vstorage_->AddFile(level, f); files_.emplace_back(f); file_map_.insert({file_number, {f, level}}); diff --git a/db/db_filesnapshot.cc b/db/db_filesnapshot.cc index 67d994f5568..a3bffe1dde0 100644 --- a/db/db_filesnapshot.cc +++ b/db/db_filesnapshot.cc @@ -119,7 +119,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, } // Make a set of all of the live *.sst files - std::vector live; + std::vector live; for (auto cfd : *versions_->GetColumnFamilySet()) { if (cfd->IsDropped()) { continue; @@ -133,7 +133,7 @@ Status DBImpl::GetLiveFiles(std::vector& ret, // create names of the live files. 
The names are not absolute // paths, instead they are relative to dbname_; for (const auto& live_file : live) { - ret.push_back(MakeTableFileName("", live_file.GetNumber())); + ret.push_back(MakeTableFileName("", live_file)); } ret.push_back(CurrentFileName("")); diff --git a/db/db_impl/db_impl_files.cc b/db/db_impl/db_impl_files.cc index dc6ffa437ed..a3cc0a92d96 100644 --- a/db/db_impl/db_impl_files.cc +++ b/db/db_impl/db_impl_files.cc @@ -35,11 +35,13 @@ uint64_t DBImpl::MinObsoleteSstNumberToKeep() { return std::numeric_limits::max(); } -// * Returns the list of live files in 'sst_live' // If it's doing full scan: // * Returns the list of all files in the filesystem in // 'full_scan_candidate_files'. -// Otherwise, gets obsolete files from VersionSet. +// * Returns the list of live files in 'sst_live'. +// Otherwise: +// * Gets obsolete files from VersionSet. +// // no_full_scan = true -- never do the full scan using GetChildren() // force = false -- don't force the full scan, except every // mutable_db_options_.delete_obsolete_files_period_micros @@ -103,7 +105,6 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, job_context->log_number = MinLogNumberToKeep(); job_context->prev_log_number = versions_->prev_log_number(); - versions_->AddLiveFiles(&job_context->sst_live); if (doing_the_full_scan) { InfoLogPrefix info_log_prefix(!immutable_db_options_.db_log_dir.empty(), dbname_); @@ -235,6 +236,9 @@ void DBImpl::FindObsoleteFiles(JobContext* job_context, bool force, log_recycle_files_.end()); if (job_context->HaveSomethingToDelete()) { ++pending_purge_obsolete_files_; + if (doing_the_full_scan) { + versions_->AddLiveFiles(&job_context->sst_live); + } } logs_to_free_.clear(); } @@ -303,12 +307,9 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // FindObsoleteFiles() should've populated this so nonzero assert(state.manifest_file_number != 0); - // Now, convert live list to an unordered map, WITHOUT mutex held; - // set 
is slow. - std::unordered_map sst_live_map; - for (const FileDescriptor& fd : state.sst_live) { - sst_live_map[fd.GetNumber()] = &fd; - } + // Now, convert lists to unordered sets, WITHOUT mutex held; set is slow. + std::unordered_set sst_live_set(state.sst_live.begin(), + state.sst_live.end()); std::unordered_set log_recycle_files_set( state.log_recycle_files.begin(), state.log_recycle_files.end()); @@ -407,7 +408,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { case kTableFile: // If the second condition is not there, this makes // DontDeletePendingOutputs fail - keep = (sst_live_map.find(number) != sst_live_map.end()) || + keep = (sst_live_set.find(number) != sst_live_set.end()) || number >= state.min_pending_output; if (!keep) { files_to_del.insert(number); @@ -422,7 +423,7 @@ void DBImpl::PurgeObsoleteFiles(JobContext& state, bool schedule_only) { // // TODO(yhchiang): carefully modify the third condition to safely // remove the temp options files. - keep = (sst_live_map.find(number) != sst_live_map.end()) || + keep = (sst_live_set.find(number) != sst_live_set.end()) || (number == state.pending_manifest_file_number) || (to_delete.find(kOptionsFileNamePrefix) != std::string::npos); break; diff --git a/db/file_indexer.h b/db/file_indexer.h index 2091f80292b..98180c1ce2e 100644 --- a/db/file_indexer.h +++ b/db/file_indexer.h @@ -19,7 +19,7 @@ namespace rocksdb { class Comparator; -struct FileMetaData; +class FileMetaData; struct FdWithKeyRange; struct FileLevel; diff --git a/db/forward_iterator.h b/db/forward_iterator.h index fb73f458edd..39121b92853 100644 --- a/db/forward_iterator.h +++ b/db/forward_iterator.h @@ -25,7 +25,7 @@ struct SuperVersion; class ColumnFamilyData; class ForwardLevelIterator; class VersionStorageInfo; -struct FileMetaData; +class FileMetaData; class MinIterComparator { public: diff --git a/db/job_context.h b/db/job_context.h index 3978fad33c9..64415a60cf2 100644 --- a/db/job_context.h +++ b/db/job_context.h 
@@ -140,7 +140,8 @@ struct JobContext { std::vector full_scan_candidate_files; // the list of all live sst files that cannot be deleted - std::vector sst_live; + // (filled only if we're doing full scan) + std::vector sst_live; // a list of sst files that we need to delete std::vector sst_delete_files; diff --git a/db/version_builder.cc b/db/version_builder.cc index 2692b04f8f5..467d16e12e7 100644 --- a/db/version_builder.cc +++ b/db/version_builder.cc @@ -78,8 +78,13 @@ class VersionBuilder::Rep { }; struct LevelState { - std::unordered_set deleted_files; - // Map from file number to file meta data. + // Files in base version that should be deleted. + std::unordered_set deleted_base_files; + // Files moved from base version. + // Those files will not be referenced by VersionBuilder. + std::unordered_map moved_files; + // Files added, must not intersect with moved_files. + // Those files will be referenced during the lifetime of VersionBuilder. std::unordered_map added_files; }; @@ -130,8 +135,7 @@ class VersionBuilder::Rep { } void UnrefFile(FileMetaData* f) { - f->refs--; - if (f->refs <= 0) { + if (f->Unref()) { if (f->table_reader_handle) { assert(table_cache_ != nullptr); table_cache_->ReleaseHandle(f->table_reader_handle); @@ -296,7 +300,13 @@ class VersionBuilder::Rep { add_files.erase(add_it); } - auto& del_files = level_state.deleted_files; + auto& moved_files = level_state.moved_files; + auto moved_it = moved_files.find(file_number); + if (moved_it != moved_files.end()) { + moved_files.erase(moved_it); + } + + auto& del_files = level_state.deleted_base_files; assert(del_files.find(file_number) == del_files.end()); del_files.emplace(file_number); @@ -333,19 +343,30 @@ class VersionBuilder::Rep { } auto& level_state = levels_[level]; - - auto& del_files = level_state.deleted_files; - auto del_it = del_files.find(file_number); - if (del_it != del_files.end()) { - del_files.erase(del_it); + // Try to reuse file meta from base version. 
+ FileMetaData* f = base_vstorage_->GetFileMetaDataByNumber(file_number); + std::unordered_map* container = nullptr; + if (f != nullptr) { + // This should be a file trivially moved to a new position. Make sure the + // two are the same physical file. + if (f->fd.GetPathId() != meta.fd.GetPathId()) { + std::ostringstream oss; + oss << "Cannot add table file #" << file_number << " to level " << level + << " by trivial move since it isn't trivial to move to a " + << "different path"; + return Status::Corruption("VersionBuilder", oss.str()); + } + // No need to Ref() this file held by base_vstorage_. + container = &level_state.moved_files; + } else { + f = new FileMetaData(meta); + // Will drop reference in dtor. + f->Ref(); + container = &level_state.added_files; } - FileMetaData* const f = new FileMetaData(meta); - f->refs = 1; - - auto& add_files = level_state.added_files; - assert(add_files.find(file_number) == add_files.end()); - add_files.emplace(file_number, f); + assert(container && container->find(file_number) == container->end()); + container->emplace(file_number, f); table_file_levels_[file_number] = level; @@ -403,39 +424,70 @@ class VersionBuilder::Rep { // Merge the set of added files with the set of pre-existing files. // Drop any deleted files. Store the result in *v. const auto& base_files = base_vstorage_->LevelFiles(level); + const auto& del_files = levels_[level].deleted_base_files; const auto& unordered_added_files = levels_[level].added_files; + const auto& unordered_moved_files = levels_[level].moved_files; + vstorage->Reserve(level, base_files.size() + unordered_added_files.size()); - // Sort added files for the level. - std::vector added_files; - added_files.reserve(unordered_added_files.size()); + // Sort delta files for the level. 
+ std::vector delta_files; + delta_files.reserve(unordered_added_files.size() + + unordered_moved_files.size()); for (const auto& pair : unordered_added_files) { - added_files.push_back(pair.second); + delta_files.push_back(pair.second); + } + for (const auto& pair : unordered_moved_files) { + // SaveTo will always be called under db mutex. + pair.second->being_moved_to = level; + delta_files.push_back(pair.second); } - std::sort(added_files.begin(), added_files.end(), cmp); + std::sort(delta_files.begin(), delta_files.end(), cmp); #ifndef NDEBUG - FileMetaData* prev_added_file = nullptr; - for (const auto& added : added_files) { - if (level > 0 && prev_added_file != nullptr) { + FileMetaData* prev_file = nullptr; + for (const auto& delta : delta_files) { + if (level > 0 && prev_file != nullptr) { assert(base_vstorage_->InternalComparator()->Compare( - prev_added_file->smallest, added->smallest) <= 0); + prev_file->smallest, delta->smallest) <= 0); } - prev_added_file = added; + prev_file = delta; } #endif auto base_iter = base_files.begin(); auto base_end = base_files.end(); - auto added_iter = added_files.begin(); - auto added_end = added_files.end(); - while (added_iter != added_end || base_iter != base_end) { - if (base_iter == base_end || - (added_iter != added_end && cmp(*added_iter, *base_iter))) { - MaybeAddFile(vstorage, level, *added_iter++); + auto delta_iter = delta_files.begin(); + auto delta_end = delta_files.end(); + + // Delta file supersedes base file because base is masked by + // deleted_base_files. + while (delta_iter != delta_end || base_iter != base_end) { + if (delta_iter == delta_end || + (base_iter != base_end && cmp(*base_iter, *delta_iter))) { + FileMetaData* f = *base_iter++; + const uint64_t file_number = f->fd.GetNumber(); + + if (del_files.find(file_number) != del_files.end()) { + // vstorage inherited base_vstorage_'s stats, need to account for + // deleted base files. 
+ vstorage->RemoveCurrentStats(f); + } else { +#ifndef NDEBUG + assert(unordered_added_files.find(file_number) == + unordered_added_files.end()); +#endif + vstorage->AddFile(level, f, info_log_); + } } else { - MaybeAddFile(vstorage, level, *base_iter++); + FileMetaData* f = *delta_iter++; + if (f->init_stats_from_file) { + // A moved file whose stats is inited by base_vstorage_ and then + // deleted from it. + vstorage->UpdateAccumulatedStats(f); + } + vstorage->AddFile(level, f, info_log_); } } } @@ -540,31 +592,6 @@ class VersionBuilder::Rep { } return Status::OK(); } - - void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f) { - const uint64_t file_number = f->fd.GetNumber(); - - const auto& level_state = levels_[level]; - - const auto& del_files = level_state.deleted_files; - const auto del_it = del_files.find(file_number); - - if (del_it != del_files.end()) { - // f is to-be-deleted table file - vstorage->RemoveCurrentStats(f); - } else { - const auto& add_files = level_state.added_files; - const auto add_it = add_files.find(file_number); - - // Note: if the file appears both in the base version and in the added - // list, the added FileMetaData supersedes the one in the base version. 
- if (add_it != add_files.end() && add_it->second != f) { - vstorage->RemoveCurrentStats(f); - } else { - vstorage->AddFile(level, f, info_log_); - } - } - } }; VersionBuilder::VersionBuilder(const EnvOptions& env_options, @@ -597,10 +624,4 @@ Status VersionBuilder::LoadTableHandlers( prefetch_index_and_filter_in_cache, is_initial_load, prefix_extractor); } - -void VersionBuilder::MaybeAddFile(VersionStorageInfo* vstorage, int level, - FileMetaData* f) { - rep_->MaybeAddFile(vstorage, level, f); -} - } // namespace rocksdb diff --git a/db/version_builder.h b/db/version_builder.h index bdd3b151ef3..504ee58c078 100644 --- a/db/version_builder.h +++ b/db/version_builder.h @@ -18,7 +18,7 @@ namespace rocksdb { class TableCache; class VersionStorageInfo; class VersionEdit; -struct FileMetaData; +class FileMetaData; class InternalStats; // A helper class so we can efficiently apply a whole sequence @@ -38,7 +38,6 @@ class VersionBuilder { bool prefetch_index_and_filter_in_cache, bool is_initial_load, const SliceTransform* prefix_extractor); - void MaybeAddFile(VersionStorageInfo* vstorage, int level, FileMetaData* f); private: class Rep; diff --git a/db/version_builder_test.cc b/db/version_builder_test.cc index 35c234fb918..2f049ef2a64 100644 --- a/db/version_builder_test.cc +++ b/db/version_builder_test.cc @@ -41,7 +41,7 @@ class VersionBuilderTest : public testing::Test { ~VersionBuilderTest() override { for (int i = 0; i < vstorage_.num_levels(); i++) { for (auto* f : vstorage_.LevelFiles(i)) { - if (--f->refs == 0) { + if (f->Unref()) { delete f; } } @@ -67,7 +67,6 @@ class VersionBuilderTest : public testing::Test { f->fd.smallest_seqno = smallest_seqno; f->fd.largest_seqno = largest_seqno; f->compensated_file_size = file_size; - f->refs = 0; f->num_entries = num_entries; f->num_deletions = num_deletions; vstorage_.AddFile(level, f); @@ -88,10 +87,30 @@ class VersionBuilderTest : public testing::Test { } }; +// Check that one file number is mapped to one unique 
FileMetaData in a series +// of versions. +struct FileReferenceChecker { + std::unordered_map files; + + bool Check(const VersionStorageInfo* vstorage) { + for (int i = 0; i < vstorage->num_levels(); i++) { + for (auto* f : vstorage->LevelFiles(i)) { + auto it = files.find(f->fd.GetNumber()); + if (it == files.end()) { + files[f->fd.GetNumber()] = f; + } else if (it->second != f) { + return false; + } + } + } + return true; + } +}; + void UnrefFilesInVersion(VersionStorageInfo* new_vstorage) { for (int i = 0; i < new_vstorage->num_levels(); i++) { for (auto* f : new_vstorage->LevelFiles(i)) { - if (--f->refs == 0) { + if (f->Unref()) { delete f; } } @@ -325,45 +344,121 @@ TEST_F(VersionBuilderTest, ApplyFileDeletionNotInLSMTree) { TEST_F(VersionBuilderTest, ApplyFileDeletionAndAddition) { constexpr int level = 1; constexpr uint64_t file_number = 2345; + constexpr uint64_t path_id = 0; + constexpr uint64_t file_size = 1024; constexpr char smallest[] = "bar"; constexpr char largest[] = "foo"; + constexpr SequenceNumber smallest_seq = 100; + constexpr SequenceNumber largest_seq = 100; + constexpr uint64_t num_entries = 1000; + constexpr uint64_t num_deletions = 0; + constexpr bool sampled = true; + constexpr SequenceNumber smallest_seqno = 1; + constexpr SequenceNumber largest_seqno = 1000; + constexpr bool marked_for_compaction = false; + constexpr bool force_consistency_checks = false; - Add(level, file_number, smallest, largest); + Add(level, file_number, smallest, largest, file_size, path_id, smallest_seq, + largest_seq, num_entries, num_deletions, sampled, smallest_seqno, + largest_seqno); EnvOptions env_options; constexpr TableCache* table_cache = nullptr; - VersionBuilder builder(env_options, table_cache, &vstorage_); - VersionEdit deletion; - deletion.DeleteFile(level, file_number); - ASSERT_OK(builder.Apply(&deletion)); - - VersionEdit addition; - - constexpr uint32_t path_id = 0; - constexpr uint64_t file_size = 10000; - constexpr SequenceNumber 
smallest_seqno = 100; - constexpr SequenceNumber largest_seqno = 1000; - constexpr bool marked_for_compaction = false; - - addition.AddFile(level, file_number, path_id, file_size, - GetInternalKey(smallest), GetInternalKey(largest), - smallest_seqno, largest_seqno, marked_for_compaction); - - ASSERT_OK(builder.Apply(&addition)); + { + VersionBuilder builder(env_options, table_cache, &vstorage_); + ASSERT_OK(builder.Apply(&deletion)); + VersionEdit addition; + addition.AddFile(level, file_number, path_id, file_size, + GetInternalKey("181", smallest_seq), + GetInternalKey("798", largest_seq), smallest_seqno, + largest_seqno, marked_for_compaction); + ASSERT_OK(builder.Apply(&addition)); + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, + kCompactionStyleLevel, &vstorage_, + force_consistency_checks); + ASSERT_OK(builder.SaveTo(&new_vstorage)); + ASSERT_EQ(new_vstorage.GetFileLocation(file_number).GetLevel(), level); + FileReferenceChecker checker; + ASSERT_TRUE(checker.Check(&vstorage_)); + ASSERT_TRUE(checker.Check(&new_vstorage)); + UnrefFilesInVersion(&new_vstorage); + } - constexpr bool force_consistency_checks = false; - VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, - kCompactionStyleLevel, &vstorage_, - force_consistency_checks); + { // Move to a higher level. 
+ VersionBuilder builder(env_options, table_cache, &vstorage_); + ASSERT_OK(builder.Apply(&deletion)); + VersionEdit addition; + addition.AddFile(level + 1, file_number, path_id, file_size, + GetInternalKey("181", smallest_seq), + GetInternalKey("798", largest_seq), smallest_seqno, + largest_seqno, marked_for_compaction); + ASSERT_OK(builder.Apply(&addition)); + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, + kCompactionStyleLevel, &vstorage_, + force_consistency_checks); + ASSERT_OK(builder.SaveTo(&new_vstorage)); + ASSERT_EQ(new_vstorage.GetFileLocation(file_number).GetLevel(), level + 1); + // File movement should not change key estimation. + ASSERT_EQ(vstorage_.GetEstimatedActiveKeys(), + new_vstorage.GetEstimatedActiveKeys()); + FileReferenceChecker checker; + ASSERT_TRUE(checker.Check(&vstorage_)); + ASSERT_TRUE(checker.Check(&new_vstorage)); + UnrefFilesInVersion(&new_vstorage); + } - ASSERT_OK(builder.SaveTo(&new_vstorage)); - ASSERT_EQ(new_vstorage.GetFileLocation(file_number).GetLevel(), level); + { // Move to a different path. + VersionBuilder builder(env_options, table_cache, &vstorage_); + ASSERT_OK(builder.Apply(&deletion)); + VersionEdit addition; + addition.AddFile(level, file_number, path_id + 1, file_size, + GetInternalKey("181", smallest_seq), + GetInternalKey("798", largest_seq), smallest_seqno, + largest_seqno, marked_for_compaction); + const Status s = builder.Apply(&addition); + ASSERT_TRUE(s.IsCorruption()); + ASSERT_TRUE( + std::strstr(s.getState(), + "Cannot add table file #2345 to level 1 by trivial " + "move since it isn't trivial to move to a different " + "path")); + } - UnrefFilesInVersion(&new_vstorage); + { // Move twice. 
+ VersionBuilder builder(env_options, table_cache, &vstorage_); + ASSERT_OK(builder.Apply(&deletion)); + VersionEdit addition_1; + addition_1.AddFile(level + 1, file_number, path_id, file_size, + GetInternalKey("181", smallest_seq), + GetInternalKey("798", largest_seq), smallest_seqno, + largest_seqno, marked_for_compaction); + VersionEdit deletion_1; + deletion_1.DeleteFile(level + 1, file_number); + VersionEdit addition_2; + addition_2.AddFile(level + 2, file_number, path_id, file_size, + GetInternalKey("181", smallest_seq), + GetInternalKey("798", largest_seq), smallest_seqno, + largest_seqno, marked_for_compaction); + ASSERT_OK(builder.Apply(&addition_1)); + ASSERT_OK(builder.Apply(&deletion_1)); + ASSERT_OK(builder.Apply(&addition_2)); + VersionStorageInfo new_vstorage(&icmp_, ucmp_, options_.num_levels, + kCompactionStyleLevel, &vstorage_, + force_consistency_checks); + ASSERT_OK(builder.SaveTo(&new_vstorage)); + ASSERT_EQ(new_vstorage.GetFileLocation(file_number).GetLevel(), level + 2); + // File movement should not change key estimation. + ASSERT_EQ(vstorage_.GetEstimatedActiveKeys(), + new_vstorage.GetEstimatedActiveKeys()); + FileReferenceChecker checker; + ASSERT_TRUE(checker.Check(&vstorage_)); + ASSERT_TRUE(checker.Check(&new_vstorage)); + UnrefFilesInVersion(&new_vstorage); + } } TEST_F(VersionBuilderTest, ApplyFileAdditionAlreadyInBase) { @@ -388,6 +483,7 @@ TEST_F(VersionBuilderTest, ApplyFileAdditionAlreadyInBase) { constexpr SequenceNumber largest_seqno = 1000; constexpr bool marked_for_compaction = false; + // Add an existing file. 
edit.AddFile(new_level, file_number, path_id, file_size, GetInternalKey(smallest), GetInternalKey(largest), smallest_seqno, largest_seqno, marked_for_compaction); diff --git a/db/version_edit.h b/db/version_edit.h index 0d55a4827ba..13f3e8b9e98 100644 --- a/db/version_edit.h +++ b/db/version_edit.h @@ -86,12 +86,13 @@ struct FileSampledStats { mutable std::atomic num_reads_sampled; }; -struct FileMetaData { +class FileMetaData { + public: FileDescriptor fd; InternalKey smallest; // Smallest internal key served by table InternalKey largest; // Largest internal key served by table - // Needs to be disposed when refs becomes 0. + // Needs to be disposed when refs_ becomes 0. Cache::Handle* table_reader_handle; FileSampledStats stats; @@ -109,9 +110,8 @@ struct FileMetaData { uint64_t raw_key_size; // total uncompressed key size. uint64_t raw_value_size; // total uncompressed value size. - int refs; // Reference count - bool being_compacted; // Is this file undergoing compaction? + int being_moved_to; // Is this file undergoing trivial move? bool init_stats_from_file; // true if the data-entry stats of this file // has initialized from file. @@ -125,10 +125,18 @@ struct FileMetaData { num_deletions(0), raw_key_size(0), raw_value_size(0), - refs(0), being_compacted(false), + being_moved_to(-1), init_stats_from_file(false), - marked_for_compaction(false) {} + marked_for_compaction(false), + refs_(0) {} + + void Ref() { ++refs_; } + + bool Unref() { + assert(refs_ > 0); + return --refs_ <= 0; + } // REQUIRED: Keys must be given to the function in sorted order (it expects // the last key to be the largest). 
@@ -173,6 +181,9 @@ struct FileMetaData { r.append("]"); return r; } + + private: + int refs_; // Reference count }; // A compressed copy of file meta data that just contain minimum data needed diff --git a/db/version_set.cc b/db/version_set.cc index c1df704625b..84c7041ec92 100644 --- a/db/version_set.cc +++ b/db/version_set.cc @@ -742,9 +742,7 @@ Version::~Version() { for (int level = 0; level < storage_info_.num_levels_; level++) { for (size_t i = 0; i < storage_info_.files_[level].size(); i++) { FileMetaData* f = storage_info_.files_[level][i]; - assert(f->refs > 0); - f->refs--; - if (f->refs <= 0) { + if (f->Unref()) { assert(cfd_ != nullptr); uint32_t path_id = f->fd.GetPathId(); assert(path_id < cfd_->ioptions()->cf_paths.size()); @@ -2297,6 +2295,7 @@ void VersionStorageInfo::EstimateCompactionBytesNeeded( namespace { uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, const MutableCFOptions& mutable_cf_options, + int level, const std::vector& files) { uint32_t ttl_expired_files_count = 0; @@ -2305,7 +2304,8 @@ uint32_t GetExpiredTtlFilesCount(const ImmutableCFOptions& ioptions, if (status.ok()) { const uint64_t current_time = static_cast(_current_time); for (auto f : files) { - if (!f->being_compacted && f->fd.table_reader != nullptr && + if ((!f->being_compacted || f->being_moved_to == level) && + f->fd.table_reader != nullptr && f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; @@ -2340,7 +2340,7 @@ void VersionStorageInfo::ComputeCompactionScore( int num_sorted_runs = 0; uint64_t total_size = 0; for (auto* f : files_[level]) { - if (!f->being_compacted) { + if (!f->being_compacted || f->being_moved_to == level) { total_size += f->compensated_file_size; num_sorted_runs++; } @@ -2350,7 +2350,8 @@ void VersionStorageInfo::ComputeCompactionScore( // compaction score for the whole DB. Adding other levels as if // they are L0 files. 
for (int i = 1; i < num_levels(); i++) { - if (!files_[i].empty() && !files_[i][0]->being_compacted) { + if (!files_[i].empty() && (!files_[i][0]->being_compacted || + files_[i][0]->being_moved_to == i)) { num_sorted_runs++; } } @@ -2366,10 +2367,10 @@ void VersionStorageInfo::ComputeCompactionScore( score); } if (mutable_cf_options.ttl > 0) { - score = std::max( - static_cast(GetExpiredTtlFilesCount( - immutable_cf_options, mutable_cf_options, files_[level])), - score); + score = std::max(static_cast(GetExpiredTtlFilesCount( + immutable_cf_options, mutable_cf_options, level, + files_[level])), + score); } } else { @@ -2388,7 +2389,7 @@ void VersionStorageInfo::ComputeCompactionScore( // Compute the ratio of current size to size limit. uint64_t level_bytes_no_compacting = 0; for (auto f : files_[level]) { - if (!f->being_compacted) { + if (!f->being_compacted || f->being_moved_to == level) { level_bytes_no_compacting += f->compensated_file_size; } } @@ -2441,7 +2442,8 @@ void VersionStorageInfo::ComputeFilesMarkedForCompaction() { for (int level = 0; level <= last_qualify_level; level++) { for (auto* f : files_[level]) { - if (!f->being_compacted && f->marked_for_compaction) { + if ((!f->being_compacted || f->being_moved_to == level) && + f->marked_for_compaction) { files_marked_for_compaction_.emplace_back(level, f); } } @@ -2463,7 +2465,8 @@ void VersionStorageInfo::ComputeExpiredTtlFiles( for (int level = 0; level < num_levels() - 1; level++) { for (auto f : files_[level]) { - if (!f->being_compacted && f->fd.table_reader != nullptr && + if ((!f->being_compacted || f->being_moved_to == level) && + f->fd.table_reader != nullptr && f->fd.table_reader->GetTableProperties() != nullptr) { auto creation_time = f->fd.table_reader->GetTableProperties()->creation_time; @@ -2493,7 +2496,8 @@ void VersionStorageInfo::ComputeFilesMarkedForPeriodicCompaction( for (int level = 0; level < num_levels(); level++) { for (auto f : files_[level]) { - if (!f->being_compacted && 
f->fd.table_reader != nullptr && + if ((!f->being_compacted || f->being_moved_to == level) && + f->fd.table_reader != nullptr && f->fd.table_reader->GetTableProperties() != nullptr) { // Compute a file's modification time in the following order: // 1. Use file_creation_time table property if it is > 0. @@ -2545,13 +2549,13 @@ bool CompareCompensatedSizeDescending(const Fsize& first, const Fsize& second) { } // anonymous namespace void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) { - auto* level_files = &files_[level]; + auto& level_files = files_[level]; // Must not overlap #ifndef NDEBUG - if (level > 0 && !level_files->empty() && + if (level > 0 && !level_files.empty() && internal_comparator_->Compare( - (*level_files)[level_files->size() - 1]->largest, f->smallest) >= 0) { - auto* f2 = (*level_files)[level_files->size() - 1]; + level_files[level_files.size() - 1]->largest, f->smallest) >= 0) { + auto* f2 = level_files[level_files.size() - 1]; if (info_log != nullptr) { Error(info_log, "Adding new file %" PRIu64 " range (%s, %s) to level %d but overlapping " @@ -2567,14 +2571,15 @@ void VersionStorageInfo::AddFile(int level, FileMetaData* f, Logger* info_log) { #else (void)info_log; #endif - f->refs++; - level_files->push_back(f); + level_files.push_back(f); + + f->Ref(); const uint64_t file_number = f->fd.GetNumber(); assert(file_locations_.find(file_number) == file_locations_.end()); file_locations_.emplace(file_number, - FileLocation(level, level_files->size() - 1)); + FileLocation(level, level_files.size() - 1)); } // Version::PrepareApply() need to be called before calling the function, or @@ -2617,6 +2622,11 @@ void VersionStorageInfo::SetFinalized() { if (LevelFiles(level).size() > 0) { assert(level < num_non_empty_levels()); } + for (auto* f : LevelFiles(level)) { + if (f->being_moved_to == level) { + f->being_moved_to = -1; + } + } } assert(compaction_level_.size() > 0); assert(compaction_level_.size() == 
compaction_score_.size()); @@ -3366,11 +3376,13 @@ bool VersionStorageInfo::RangeMightExistAfterSortedRun( return false; } -void Version::AddLiveFiles(std::vector* live) { - for (int level = 0; level < storage_info_.num_levels(); level++) { - const std::vector& files = storage_info_.files_[level]; - for (const auto& file : files) { - live->push_back(file->fd); +void Version::AddLiveFiles(std::vector* live_table_files) const { + assert(live_table_files); + for (int level = 0; level < storage_info_.num_levels(); ++level) { + const auto& level_files = storage_info_.LevelFiles(level); + for (const auto& meta : level_files) { + assert(meta); + live_table_files->emplace_back(meta->fd.GetNumber()); } } } @@ -5119,9 +5131,10 @@ uint64_t VersionSet::ApproximateSize(Version* v, const FdWithKeyRange& f, return result; } -void VersionSet::AddLiveFiles(std::vector* live_list) { +void VersionSet::AddLiveFiles(std::vector* live_table_files) const { + assert(live_table_files); // pre-calculate space requirement - int64_t total_files = 0; + int64_t total_table_files = 0; for (auto cfd : *column_family_set_) { if (!cfd->initialized()) { continue; @@ -5131,13 +5144,13 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { v = v->next_) { const auto* vstorage = v->storage_info(); for (int level = 0; level < vstorage->num_levels(); level++) { - total_files += vstorage->LevelFiles(level).size(); + total_table_files += vstorage->LevelFiles(level).size(); } } } // just one time extension to the right size - live_list->reserve(live_list->size() + static_cast(total_files)); + live_table_files->reserve(live_table_files->size() + total_table_files); for (auto cfd : *column_family_set_) { if (!cfd->initialized()) { @@ -5148,7 +5161,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { Version* dummy_versions = cfd->dummy_versions(); for (Version* v = dummy_versions->next_; v != dummy_versions; v = v->next_) { - v->AddLiveFiles(live_list); + v->AddLiveFiles(live_table_files); if (v 
== current) { found_current = true; } @@ -5156,7 +5169,7 @@ void VersionSet::AddLiveFiles(std::vector* live_list) { if (!found_current && current != nullptr) { // Should never happen unless it is a bug. assert(false); - current->AddLiveFiles(live_list); + current->AddLiveFiles(live_table_files); } } } diff --git a/db/version_set.h b/db/version_set.h index 88e2de6df7f..583080291a7 100644 --- a/db/version_set.h +++ b/db/version_set.h @@ -309,6 +309,17 @@ class VersionStorageInfo { return it->second; } + // REQUIRES: This version has been saved (see VersionSet::SaveTo) + FileMetaData* GetFileMetaDataByNumber(uint64_t file_number) const { + auto location = GetFileLocation(file_number); + + if (!location.IsValid()) { + return nullptr; + } + + return files_[location.GetLevel()][location.GetPosition()]; + } + const rocksdb::LevelFilesBrief& LevelFilesBrief(int level) const { assert(level < static_cast(level_files_brief_.size())); return level_files_brief_[level]; @@ -646,7 +657,7 @@ class Version { bool Unref(); // Add all files listed in the current version to *live. - void AddLiveFiles(std::vector* live); + void AddLiveFiles(std::vector* live) const; // Return a human readable string that describes this version's contents. std::string DebugString(bool hex = false, bool print_stats = false) const; @@ -1031,7 +1042,7 @@ Status DumpManifest(Options& options, std::string& dscname, const EnvOptions& env_options_compactions); // Add all files listed in any live version to *live. - void AddLiveFiles(std::vector* live_list); + void AddLiveFiles(std::vector* live_list) const; // Return the approximate size of data to be scanned for range [start, end) // in levels [start_level, end_level). 
If end_level == -1 it will search diff --git a/db/version_set_test.cc b/db/version_set_test.cc index 446ae94c720..d8dffaf00b1 100644 --- a/db/version_set_test.cc +++ b/db/version_set_test.cc @@ -118,7 +118,7 @@ class VersionStorageInfoTest : public testing::Test { ~VersionStorageInfoTest() override { for (int i = 0; i < vstorage_.num_levels(); i++) { for (auto* f : vstorage_.LevelFiles(i)) { - if (--f->refs == 0) { + if (f->Unref()) { delete f; } } @@ -133,7 +133,6 @@ class VersionStorageInfoTest : public testing::Test { f->smallest = GetInternalKey(smallest, 0); f->largest = GetInternalKey(largest, 0); f->compensated_file_size = file_size; - f->refs = 0; f->num_entries = 0; f->num_deletions = 0; vstorage_.AddFile(level, f); @@ -147,7 +146,6 @@ class VersionStorageInfoTest : public testing::Test { f->smallest = smallest; f->largest = largest; f->compensated_file_size = file_size; - f->refs = 0; f->num_entries = 0; f->num_deletions = 0; vstorage_.AddFile(level, f); @@ -409,7 +407,7 @@ TEST_F(VersionStorageInfoTest, GetOverlappingInputs) { 1, {"i", 0, kTypeValue}, {"j", 0, kTypeValue})); } -TEST_F(VersionStorageInfoTest, FileLocation) { +TEST_F(VersionStorageInfoTest, FileLocationAndMetaDataByNumber) { Add(0, 11U, "1", "2", 5000U); Add(0, 12U, "1", "2", 5000U); @@ -417,13 +415,18 @@ TEST_F(VersionStorageInfoTest, FileLocation) { ASSERT_EQ(vstorage_.GetFileLocation(11U), VersionStorageInfo::FileLocation(0, 0)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(11U), nullptr); + ASSERT_EQ(vstorage_.GetFileLocation(12U), VersionStorageInfo::FileLocation(0, 1)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(12U), nullptr); ASSERT_EQ(vstorage_.GetFileLocation(7U), VersionStorageInfo::FileLocation(2, 0)); + ASSERT_NE(vstorage_.GetFileMetaDataByNumber(7U), nullptr); ASSERT_FALSE(vstorage_.GetFileLocation(999U).IsValid()); + ASSERT_EQ(vstorage_.GetFileMetaDataByNumber(999U), nullptr); } class FindLevelFileTest : public testing::Test {