diff --git a/db/column_family.cc b/db/column_family.cc index 4adf0611b0f..a84f69d9c54 100644 --- a/db/column_family.cc +++ b/db/column_family.cc @@ -485,7 +485,7 @@ ColumnFamilyData::ColumnFamilyData( } } - RecalculateWriteStallConditions(mutable_cf_options_); + RecalculateWriteStallConditions(mutable_cf_options_, ioptions_.rate_limiter); } // DB mutex held @@ -657,7 +657,7 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger, assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger); if (level0_file_num_compaction_trigger < 0) { - return std::numeric_limits::max(); + return port::kMaxInt32; } const int64_t twice_level0_trigger = @@ -720,7 +720,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause( } WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( - const MutableCFOptions& mutable_cf_options) { + const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) { auto write_stall_condition = WriteStallCondition::kNormal; if (current_ != nullptr) { auto* vstorage = current_->storage_info(); @@ -875,6 +875,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions( 4); } } + if (rate_limiter) { + // pace up limiter when close to write stall + if (write_stall_condition != WriteStallCondition::kNormal || + vstorage->l0_delay_trigger_count() >= + 0.8 * mutable_cf_options.level0_slowdown_writes_trigger || + vstorage->estimated_compaction_needed_bytes() >= + 0.6 * mutable_cf_options.soft_pending_compaction_bytes_limit) { + rate_limiter->PaceUp(); + } + } prev_compaction_needed_bytes_ = compaction_needed_bytes; } return write_stall_condition; @@ -1105,8 +1115,8 @@ void ColumnFamilyData::InstallSuperVersion( super_version_ = new_superversion; ++super_version_number_; super_version_->version_number = super_version_number_; - super_version_->write_stall_condition = - RecalculateWriteStallConditions(mutable_cf_options); + super_version_->write_stall_condition = RecalculateWriteStallConditions( + mutable_cf_options, ioptions_.rate_limiter); if (old_superversion != nullptr) { // Reset SuperVersions cached in thread local storage. diff --git a/db/column_family.h b/db/column_family.h index 8180f0be26a..90e605c3c0b 100644 --- a/db/column_family.h +++ b/db/column_family.h @@ -479,7 +479,8 @@ class ColumnFamilyData { // DBImpl::MakeRoomForWrite function to decide, if it need to make // a write stall WriteStallCondition RecalculateWriteStallConditions( - const MutableCFOptions& mutable_cf_options); + const MutableCFOptions& mutable_cf_options, + RateLimiter* rate_limiter = nullptr); void set_initialized() { initialized_.store(true); } diff --git a/include/rocksdb/rate_limiter.h b/include/rocksdb/rate_limiter.h index 8ae5e6d11f3..1fdbc29619f 100644 --- a/include/rocksdb/rate_limiter.h +++ b/include/rocksdb/rate_limiter.h @@ -101,6 +101,8 @@ class RateLimiter { return true; } + virtual void PaceUp() {} + protected: Mode GetMode() { return mode_; } diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter.cc b/utilities/rate_limiters/write_amp_based_rate_limiter.cc index 18deb46b1a1..ebf1f34bd56 100644 --- a/utilities/rate_limiters/write_amp_based_rate_limiter.cc +++ b/utilities/rate_limiters/write_amp_based_rate_limiter.cc @@ -51,6 +51,7 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec, tuned_time_(NowMicrosMonotonic(env_)), duration_highpri_bytes_through_(0), duration_bytes_through_(0), + should_pace_up_(false), ratio_delta_(0) { total_requests_[0] = 0; total_requests_[1] = 0; @@ -326,6 +327,12 @@ Status WriteAmpBasedRateLimiter::Tune() { } else if (util < 95 && ratio_delta_ > 0) { ratio_delta_ -= 1; } + if (should_pace_up_.load(std::memory_order_relaxed)) { + if (ratio_delta_ < 60) { + ratio_delta_ += 60; // effect lasts for at least 60 * kSecondsPerTune = 1m + } + should_pace_up_.store(false, std::memory_order_relaxed); + } int64_t new_bytes_per_sec = (ratio + ratio_padding + ratio_delta_) * @@ -344,6 +351,12 @@ Status WriteAmpBasedRateLimiter::Tune() { return Status::OK(); } +void WriteAmpBasedRateLimiter::PaceUp() { + if (auto_tuned_) { + should_pace_up_.store(true, std::memory_order_relaxed); + } +} + RateLimiter* NewWriteAmpBasedRateLimiter( int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */, int32_t fairness /* = 10 */, diff --git a/utilities/rate_limiters/write_amp_based_rate_limiter.h b/utilities/rate_limiters/write_amp_based_rate_limiter.h index 029925b7779..8f59d90b3b2 100644 --- a/utilities/rate_limiters/write_amp_based_rate_limiter.h +++ b/utilities/rate_limiters/write_amp_based_rate_limiter.h @@ -67,6 +67,8 @@ class WriteAmpBasedRateLimiter : public RateLimiter { return rate_bytes_per_sec_; } + virtual void PaceUp() override; + private: void Refill(); int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec); @@ -149,6 +151,7 @@ class WriteAmpBasedRateLimiter : public RateLimiter { WindowSmoother long_term_highpri_bytes_sampler_; WindowSmoother limit_bytes_sampler_; + std::atomic should_pace_up_; int32_t ratio_delta_; };