Skip to content

Commit

Permalink
Pace up limiter before triggering write stall (#207)
Browse files Browse the repository at this point in the history
Proactively pace up auto-tuned rate limiter when LSM shape (L0-count/pending-bytes) is close to trigger write stall.

Test plan: sysbench wide_table 512 threads 100 columns

Signed-off-by: tabokie <xy.tao@outlook.com>
  • Loading branch information
tabokie committed Nov 16, 2020
1 parent 7093ba3 commit 355b496
Show file tree
Hide file tree
Showing 5 changed files with 35 additions and 6 deletions.
20 changes: 15 additions & 5 deletions db/column_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -485,7 +485,7 @@ ColumnFamilyData::ColumnFamilyData(
}
}

RecalculateWriteStallConditions(mutable_cf_options_);
RecalculateWriteStallConditions(mutable_cf_options_, ioptions_.rate_limiter);
}

// DB mutex held
Expand Down Expand Up @@ -657,7 +657,7 @@ int GetL0ThresholdSpeedupCompaction(int level0_file_num_compaction_trigger,
assert(level0_file_num_compaction_trigger <= level0_slowdown_writes_trigger);

if (level0_file_num_compaction_trigger < 0) {
return std::numeric_limits<int>::max();
return port::kMaxInt32;
}

const int64_t twice_level0_trigger =
Expand Down Expand Up @@ -720,7 +720,7 @@ ColumnFamilyData::GetWriteStallConditionAndCause(
}

WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options) {
const MutableCFOptions& mutable_cf_options, RateLimiter* rate_limiter) {
auto write_stall_condition = WriteStallCondition::kNormal;
if (current_ != nullptr) {
auto* vstorage = current_->storage_info();
Expand Down Expand Up @@ -875,6 +875,16 @@ WriteStallCondition ColumnFamilyData::RecalculateWriteStallConditions(
4);
}
}
if (rate_limiter) {
// pace up limiter when close to write stall
if (write_stall_condition != WriteStallCondition::kNormal ||
vstorage->l0_delay_trigger_count() >=
0.8 * mutable_cf_options.level0_slowdown_writes_trigger ||
vstorage->estimated_compaction_needed_bytes() >=
0.6 * mutable_cf_options.soft_pending_compaction_bytes_limit) {
rate_limiter->PaceUp();
}
}
prev_compaction_needed_bytes_ = compaction_needed_bytes;
}
return write_stall_condition;
Expand Down Expand Up @@ -1105,8 +1115,8 @@ void ColumnFamilyData::InstallSuperVersion(
super_version_ = new_superversion;
++super_version_number_;
super_version_->version_number = super_version_number_;
super_version_->write_stall_condition =
RecalculateWriteStallConditions(mutable_cf_options);
super_version_->write_stall_condition = RecalculateWriteStallConditions(
mutable_cf_options, ioptions_.rate_limiter);

if (old_superversion != nullptr) {
// Reset SuperVersions cached in thread local storage.
Expand Down
3 changes: 2 additions & 1 deletion db/column_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,8 @@ class ColumnFamilyData {
// DBImpl::MakeRoomForWrite function to decide, if it need to make
// a write stall
WriteStallCondition RecalculateWriteStallConditions(
const MutableCFOptions& mutable_cf_options);
const MutableCFOptions& mutable_cf_options,
RateLimiter* rate_limiter = nullptr);

void set_initialized() { initialized_.store(true); }

Expand Down
2 changes: 2 additions & 0 deletions include/rocksdb/rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,8 @@ class RateLimiter {
return true;
}

virtual void PaceUp() {}

protected:
Mode GetMode() { return mode_; }

Expand Down
13 changes: 13 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.cc
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ WriteAmpBasedRateLimiter::WriteAmpBasedRateLimiter(int64_t rate_bytes_per_sec,
tuned_time_(NowMicrosMonotonic(env_)),
duration_highpri_bytes_through_(0),
duration_bytes_through_(0),
should_pace_up_(false),
ratio_delta_(0) {
total_requests_[0] = 0;
total_requests_[1] = 0;
Expand Down Expand Up @@ -326,6 +327,12 @@ Status WriteAmpBasedRateLimiter::Tune() {
} else if (util < 95 && ratio_delta_ > 0) {
ratio_delta_ -= 1;
}
if (should_pace_up_.load(std::memory_order_relaxed)) {
if (ratio_delta_ < 60) {
ratio_delta_ += 60; // effect lasts for at least 60 * kSecondsPerTune = 1m
}
should_pace_up_.store(false, std::memory_order_relaxed);
}

int64_t new_bytes_per_sec =
(ratio + ratio_padding + ratio_delta_) *
Expand All @@ -344,6 +351,12 @@ Status WriteAmpBasedRateLimiter::Tune() {
return Status::OK();
}

void WriteAmpBasedRateLimiter::PaceUp() {
if (auto_tuned_) {
should_pace_up_.store(true, std::memory_order_relaxed);
}
}

RateLimiter* NewWriteAmpBasedRateLimiter(
int64_t rate_bytes_per_sec, int64_t refill_period_us /* = 100 * 1000 */,
int32_t fairness /* = 10 */,
Expand Down
3 changes: 3 additions & 0 deletions utilities/rate_limiters/write_amp_based_rate_limiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
return rate_bytes_per_sec_;
}

virtual void PaceUp() override;

private:
void Refill();
int64_t CalculateRefillBytesPerPeriod(int64_t rate_bytes_per_sec);
Expand Down Expand Up @@ -149,6 +151,7 @@ class WriteAmpBasedRateLimiter : public RateLimiter {
WindowSmoother<kLongTermWindowSize> long_term_highpri_bytes_sampler_;
WindowSmoother<kRecentSmoothWindowSize, kRecentSmoothWindowSize>
limit_bytes_sampler_;
std::atomic<bool> should_pace_up_;
int32_t ratio_delta_;
};

Expand Down

0 comments on commit 355b496

Please sign in to comment.