Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Track WAL in MANIFEST: LogAndApply WAL events to MANIFEST #7601

Closed
wants to merge 19 commits into from
Closed
Show file tree
Hide file tree
Changes from 18 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/sanity_check.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ jobs:
- name: Download clang-format-diff.py
uses: wei/wget@v1
with:
args: https://raw.githubusercontent.com/llvm-mirror/clang/master/tools/clang-format/clang-format-diff.py
args: https://raw.githubusercontent.com/llvm/llvm-project/master/clang/tools/clang-format/clang-format-diff.py

- name: Check format
run: VERBOSE_CHECK=1 make check-format
Expand Down
2 changes: 2 additions & 0 deletions HISTORY.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@
* Reverted a behavior change silently introduced in 6.14.2, in which the effects of the `ignore_unknown_options` flag (used in option parsing/loading functions) changed.
* Reverted a behavior change silently introduced in 6.14, in which options parsing/loading functions began returning `NotFound` instead of `InvalidArgument` for option names not available in the present version.
* Fixed MultiGet bugs it doesn't return valid data with user defined timestamp.
* Fixed a potential bug caused by evaluating `TableBuilder::NeedCompact()` before `TableBuilder::Finish()` in compaction job. For example, the `NeedCompact()` method of `CompactOnDeletionCollector` returned by built-in `CompactOnDeletionCollectorFactory` requires `BlockBasedTable::Finish()` to return the correct result. The bug can cause a compaction-generated file not to be marked for future compaction based on deletion ratio.

### Public API Change
* Deprecate `BlockBasedTableOptions::pin_l0_filter_and_index_blocks_in_cache` and `BlockBasedTableOptions::pin_top_level_index_and_filter`. These options still take effect until users migrate to the replacement APIs in `BlockBasedTableOptions::metadata_cache_options`. Migration guidance can be found in the API comments on the deprecated options.
* Add new API `DB::VerifyFileChecksums` to verify SST file checksum with corresponding entries in the MANIFEST if present. Current implementation requires scanning and recomputing file checksums.

### Behavior Changes
* The dictionary compression settings specified in `ColumnFamilyOptions::compression_opts` now additionally affect files generated by flush and compaction to non-bottommost level. Previously those settings at most affected files generated by compaction to bottommost level, depending on whether `ColumnFamilyOptions::bottommost_compression_opts` overrode them. Users who relied on dictionary compression settings in `ColumnFamilyOptions::compression_opts` affecting only the bottommost level can keep the behavior by moving their dictionary settings to `ColumnFamilyOptions::bottommost_compression_opts` and setting its `enabled` flag.
Expand Down
8 changes: 4 additions & 4 deletions build_tools/format-diff.sh
Original file line number Diff line number Diff line change
Expand Up @@ -52,15 +52,15 @@ else
else
echo "You didn't have clang-format-diff.py and/or clang-format available in your computer!"
echo "You can download clang-format-diff.py by running: "
echo " curl --location http://goo.gl/iUW1u2 -o ${CLANG_FORMAT_DIFF}"
echo " curl --location https://tinyurl.com/y2kvokof -o ${REPO_ROOT}/clang-format-diff.py"
echo "You can download clang-format by running:"
echo " brew install clang-format"
echo " Or"
echo " apt install clang-format"
echo " This might work too:"
echo " yum install git-clang-format"
echo "Then, move both files (i.e. ${CLANG_FORMAT_DIFF} and clang-format) to some directory within PATH=${PATH}"
echo "and make sure ${CLANG_FORMAT_DIFF} is executable."
echo "Then make sure clang-format is available and executable from \$PATH:"
echo " clang-format --version"
exit 128
fi
# Check argparse pre-req on interpreter, or it will fail
Expand All @@ -82,7 +82,7 @@ else
echo "You have clang-format-diff.py for Python 2 but are using a Python 3"
echo "interpreter (${PYTHON:-python3})."
echo "You can download clang-format-diff.py for Python 3 by running: "
echo " curl --location http://goo.gl/iUW1u2 -o ${REPO_ROOT}/clang-format-diff.py"
echo " curl --location https://tinyurl.com/y2kvokof -o ${REPO_ROOT}/clang-format-diff.py"
exit 130
fi
CLANG_FORMAT_DIFF="${PYTHON:-python3} $CFD_PATH"
Expand Down
5 changes: 3 additions & 2 deletions db/column_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,10 +35,10 @@ namespace ROCKSDB_NAMESPACE {
static const int kValueSize = 1000;

// counts how many operations were performed
class EnvCounter : public EnvWrapper {
class EnvCounter : public SpecialEnv {
public:
explicit EnvCounter(Env* base)
: EnvWrapper(base), num_new_writable_file_(0) {}
: SpecialEnv(base), num_new_writable_file_(0) {}
int GetNumberOfNewWritableFileCalls() {
return num_new_writable_file_;
}
Expand Down Expand Up @@ -68,6 +68,7 @@ class ColumnFamilyTestBase : public testing::Test {
#endif // !ROCKSDB_LITE
EXPECT_NE(nullptr, base_env);
env_ = new EnvCounter(base_env);
env_->skip_fsync_ = true;
dbname_ = test::PerThreadDBPath("column_family_test");
db_options_.create_if_missing = true;
db_options_.fail_if_options_file_error = true;
Expand Down
2 changes: 1 addition & 1 deletion db/compaction/compaction_job.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1439,7 +1439,6 @@ Status CompactionJob::FinishCompactionOutputFile(
ExtractInternalKeyFooter(meta->smallest.Encode()) !=
PackSequenceAndType(0, kTypeRangeDeletion));
}
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
const uint64_t current_entries = sub_compact->builder->NumEntries();
if (s.ok()) {
Expand All @@ -1454,6 +1453,7 @@ Status CompactionJob::FinishCompactionOutputFile(
const uint64_t current_bytes = sub_compact->builder->FileSize();
if (s.ok()) {
meta->fd.file_size = current_bytes;
meta->marked_for_compaction = sub_compact->builder->NeedCompact();
}
sub_compact->current_output()->finished = true;
sub_compact->total_bytes += current_bytes;
Expand Down
46 changes: 44 additions & 2 deletions db/corruption_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@

namespace ROCKSDB_NAMESPACE {

static const int kValueSize = 1000;
static constexpr int kValueSize = 1000;

class CorruptionTest : public testing::Test {
public:
Expand Down Expand Up @@ -68,9 +68,16 @@ class CorruptionTest : public testing::Test {
}

~CorruptionTest() override {
SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->LoadDependency({});
SyncPoint::GetInstance()->ClearAllCallBacks();
delete db_;
db_ = nullptr;
DestroyDB(dbname_, Options());
if (getenv("KEEP_DB")) {
fprintf(stdout, "db is still at %s\n", dbname_.c_str());
} else {
EXPECT_OK(DestroyDB(dbname_, Options()));
}
}

void CloseDb() {
Expand Down Expand Up @@ -825,6 +832,41 @@ TEST_F(CorruptionTest, DisableKeyOrderCheck) {
ROCKSDB_NAMESPACE::SyncPoint::GetInstance()->ClearAllCallBacks();
}

TEST_F(CorruptionTest, VerifyWholeTableChecksum) {
CloseDb();
Options options;
options.env = &env_;
ASSERT_OK(DestroyDB(dbname_, options));
options.create_if_missing = true;
options.file_checksum_gen_factory =
ROCKSDB_NAMESPACE::GetFileChecksumGenCrc32cFactory();
Reopen(&options);

Build(10, 5);

ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
CloseDb();

// Corrupt the first byte of each table file, this must be data block.
Corrupt(kTableFile, 0, 1);

ASSERT_OK(TryReopen(&options));

SyncPoint::GetInstance()->DisableProcessing();
SyncPoint::GetInstance()->ClearAllCallBacks();
int count{0};
SyncPoint::GetInstance()->SetCallBack(
"DBImpl::VerifySstFileChecksum:mismatch", [&](void* arg) {
auto* s = reinterpret_cast<Status*>(arg);
assert(s);
++count;
ASSERT_NOK(*s);
});
SyncPoint::GetInstance()->EnableProcessing();
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsCorruption());
ASSERT_EQ(1, count);
}

} // namespace ROCKSDB_NAMESPACE

int main(int argc, char** argv) {
Expand Down
22 changes: 22 additions & 0 deletions db/db_basic_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -3304,6 +3304,28 @@ TEST_F(DBBasicTest, ManifestWriteFailure) {
Reopen(options);
}

#ifndef ROCKSDB_LITE
TEST_F(DBBasicTest, VerifyFileChecksums) {
Options options = GetDefaultOptions();
options.create_if_missing = true;
options.env = env_;
DestroyAndReopen(options);
ASSERT_OK(Put("a", "value"));
ASSERT_OK(Flush());
ASSERT_TRUE(db_->VerifyFileChecksums(ReadOptions()).IsInvalidArgument());

options.file_checksum_gen_factory = GetFileChecksumGenCrc32cFactory();
Reopen(options);
ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));

// Write an L0 with checksum computed.
ASSERT_OK(Put("b", "value"));
ASSERT_OK(Flush());

ASSERT_OK(db_->VerifyFileChecksums(ReadOptions()));
}
#endif // !ROCKSDB_LITE

// A test class for intercepting random reads and injecting artificial
// delays. Used for testing the deadline/timeout feature
class DBBasicTestDeadline
Expand Down
116 changes: 102 additions & 14 deletions db/db_impl/db_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
#include <cstdio>
#include <map>
#include <set>
#include <sstream>
#include <stdexcept>
#include <string>
#include <unordered_map>
Expand Down Expand Up @@ -1282,7 +1283,11 @@ Status DBImpl::SyncWAL() {
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:1");
{
InstrumentedMutexLock l(&mutex_);
MarkLogsSynced(current_log_number, need_log_dir_sync, status);
if (status.ok()) {
status = MarkLogsSynced(current_log_number, need_log_dir_sync);
} else {
MarkLogsNotSynced(current_log_number);
}
}
TEST_SYNC_POINT("DBImpl::SyncWAL:BeforeMarkLogsSynced:2");

Expand All @@ -1308,27 +1313,53 @@ Status DBImpl::UnlockWAL() {
return Status::OK();
}

void DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir,
const Status& status) {
Status DBImpl::MarkLogsSynced(uint64_t up_to, bool synced_dir) {
mutex_.AssertHeld();
if (synced_dir && logfile_number_ == up_to && status.ok()) {
if (synced_dir && logfile_number_ == up_to) {
log_dir_synced_ = true;
}
VersionEdit synced_wals;
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;) {
auto& log = *it;
assert(log.getting_synced);
if (status.ok() && logs_.size() > 1) {
logs_to_free_.push_back(log.ReleaseWriter());
auto& wal = *it;
assert(wal.getting_synced);
if (logs_.size() > 1) {
if (immutable_db_options_.track_and_verify_wals_in_manifest) {
synced_wals.AddWal(wal.number,
WalMetadata(wal.writer->file()->GetFileSize()));
ltamasi marked this conversation as resolved.
Show resolved Hide resolved
}
logs_to_free_.push_back(wal.ReleaseWriter());
// To modify logs_ both mutex_ and log_write_mutex_ must be held
InstrumentedMutexLock l(&log_write_mutex_);
it = logs_.erase(it);
} else {
log.getting_synced = false;
wal.getting_synced = false;
++it;
}
}
assert(!status.ok() || logs_.empty() || logs_[0].number > up_to ||
assert(logs_.empty() || logs_[0].number > up_to ||
(logs_.size() == 1 && !logs_[0].getting_synced));

Status s;
if (synced_wals.IsWalAddition()) {
// not empty, write to MANIFEST.
s = versions_->LogAndApplyToDefaultColumnFamily(&synced_wals, &mutex_);
if (!s.ok() && versions_->io_status().IsIOError()) {
s = error_handler_.SetBGError(versions_->io_status(),
BackgroundErrorReason::kManifestWrite);
}
}
log_sync_cv_.SignalAll();
return s;
}

void DBImpl::MarkLogsNotSynced(uint64_t up_to) {
mutex_.AssertHeld();
for (auto it = logs_.begin(); it != logs_.end() && it->number <= up_to;
++it) {
auto& wal = *it;
assert(wal.getting_synced);
wal.getting_synced = false;
}
log_sync_cv_.SignalAll();
}

Expand Down Expand Up @@ -4727,8 +4758,29 @@ Status DBImpl::CreateColumnFamilyWithImport(
return status;
}

Status DBImpl::VerifyFileChecksums(const ReadOptions& read_options) {
return VerifyChecksumInternal(read_options, /*use_file_checksum=*/true);
}

Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
return VerifyChecksumInternal(read_options, /*use_file_checksum=*/false);
}

Status DBImpl::VerifyChecksumInternal(const ReadOptions& read_options,
bool use_file_checksum) {
Status s;

if (use_file_checksum) {
FileChecksumGenFactory* const file_checksum_gen_factory =
immutable_db_options_.file_checksum_gen_factory.get();
if (!file_checksum_gen_factory) {
s = Status::InvalidArgument(
"Cannot verify file checksum if options.file_checksum_gen_factory is "
"null");
return s;
}
}

std::vector<ColumnFamilyData*> cfd_list;
{
InstrumentedMutexLock l(&mutex_);
Expand All @@ -4743,23 +4795,31 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
for (auto cfd : cfd_list) {
sv_list.push_back(cfd->GetReferencedSuperVersion(this));
}

for (auto& sv : sv_list) {
VersionStorageInfo* vstorage = sv->current->storage_info();
ColumnFamilyData* cfd = sv->current->cfd();
Options opts;
{
if (!use_file_checksum) {
InstrumentedMutexLock l(&mutex_);
opts = Options(BuildDBOptions(immutable_db_options_, mutable_db_options_),
cfd->GetLatestCFOptions());
}
for (int i = 0; i < vstorage->num_non_empty_levels() && s.ok(); i++) {
for (size_t j = 0; j < vstorage->LevelFilesBrief(i).num_files && s.ok();
j++) {
const auto& fd = vstorage->LevelFilesBrief(i).files[j].fd;
const auto& fd_with_krange = vstorage->LevelFilesBrief(i).files[j];
const auto& fd = fd_with_krange.fd;
const FileMetaData* fmeta = fd_with_krange.file_metadata;
assert(fmeta);
std::string fname = TableFileName(cfd->ioptions()->cf_paths,
fd.GetNumber(), fd.GetPathId());
s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
read_options, fname);
if (use_file_checksum) {
s = VerifySstFileChecksum(*fmeta, fname, read_options);
} else {
s = ROCKSDB_NAMESPACE::VerifySstFileChecksum(opts, file_options_,
read_options, fname);
}
}
}
if (!s.ok()) {
Expand Down Expand Up @@ -4790,6 +4850,34 @@ Status DBImpl::VerifyChecksum(const ReadOptions& read_options) {
return s;
}

Status DBImpl::VerifySstFileChecksum(const FileMetaData& fmeta,
const std::string& fname,
const ReadOptions& read_options) {
Status s;
if (fmeta.file_checksum == kUnknownFileChecksum) {
return s;
}
std::string file_checksum;
std::string func_name;
s = ROCKSDB_NAMESPACE::GenerateOneFileChecksum(
fs_.get(), fname, immutable_db_options_.file_checksum_gen_factory.get(),
fmeta.file_checksum_func_name, &file_checksum, &func_name,
read_options.readahead_size, immutable_db_options_.allow_mmap_reads,
io_tracer_);
if (s.ok()) {
assert(fmeta.file_checksum_func_name == func_name);
if (file_checksum != fmeta.file_checksum) {
std::ostringstream oss;
oss << fname << " file checksum mismatch, ";
oss << "expecting " << Slice(fmeta.file_checksum).ToString(/*hex=*/true);
oss << ", but actual " << Slice(file_checksum).ToString(/*hex=*/true);
s = Status::Corruption(oss.str());
TEST_SYNC_POINT_CALLBACK("DBImpl::VerifySstFileChecksum:mismatch", &s);
}
}
return s;
}

void DBImpl::NotifyOnExternalFileIngested(
ColumnFamilyData* cfd, const ExternalSstFileIngestionJob& ingestion_job) {
if (immutable_db_options_.listeners.empty()) {
Expand Down
Loading