diff --git a/CMakeLists.txt b/CMakeLists.txt
index 28538cc3d2c..ddd50d2d99c 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -125,6 +125,7 @@ include(cmake/rocksdb.cmake)
 include(cmake/libevent.cmake)
 include(cmake/fmt.cmake)
 include(cmake/jsoncons.cmake)
+include(cmake/hiredis.cmake)
 
 if (USE_LUAJIT)
   include(cmake/luajit.cmake)
@@ -154,6 +155,7 @@ list(APPEND EXTERNAL_LIBS tbb)
 list(APPEND EXTERNAL_LIBS jsoncons)
 list(APPEND EXTERNAL_LIBS Threads::Threads)
 list(APPEND EXTERNAL_LIBS ${Backtrace_LIBRARY})
+list(APPEND EXTERNAL_LIBS hiredis)
 
 # Add git sha to version.h
 find_package(Git REQUIRED)
diff --git a/cmake/hiredis.cmake b/cmake/hiredis.cmake
new file mode 100644
index 00000000000..99e6902ae3a
--- /dev/null
+++ b/cmake/hiredis.cmake
@@ -0,0 +1,36 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+include_guard()
+
+include(cmake/utils.cmake)
+
+FetchContent_DeclareGitHubWithMirror(hiredis
+  redis/hiredis v1.1.0
+  MD5=fee8ff6f43c155fcb0efd6b13835564b
+)
+
+FetchContent_MakeAvailableWithArgs(hiredis
+  BUILD_SHARED_LIBS=OFF
+  ENABLE_SSL=OFF
+  DISABLE_TESTS=ON
+  ENABLE_SSL_TESTS=OFF
+  ENABLE_EXAMPLES=OFF
+  ENABLE_ASYNC_TESTS=OFF
+)
+
+target_compile_options(hiredis PUBLIC "-Wno-c99-extensions")
diff --git a/kvrocks.conf b/kvrocks.conf
index 106f0d85882..1cfb05260da 100644
--- a/kvrocks.conf
+++ b/kvrocks.conf
@@ -501,7 +501,17 @@ compaction-checker-range 0-7
 #
 # rename-command KEYS ""
 
-################################ MIGRATE #####################################
+################################ MIGRATION #####################################
+# Slot migration supports two methods:
+# - redis_command: Migrate data using the Redis serialization protocol (RESP).
+# - raw_key_value: Migrate the storage engine's raw key-value data directly.
+#                  This method avoids the overhead of converting data into Redis
+#                  commands, reduces resource consumption, improves migration
+#                  efficiency, and allows a finer-grained rate limit.
+#
+# Default: redis_command
+migrate-type redis_command
+
 # If the network bandwidth is completely consumed by the migration task,
 # it will affect the availability of kvrocks. To avoid this situation,
 # migrate-speed is adopted to limit the migrating speed.
@@ -528,6 +538,18 @@ migrate-pipeline-size 16
 # Default: 10000
 migrate-sequence-gap 10000
 
+# The raw_key_value migration method sends data in batches. This option sets the
+# size of each batch.
+#
+# Default: 16kb
+migrate-batch-size-kb 16
+
+# Rate limit for the raw_key_value migration method, i.e. the maximum amount of
+# data that can be migrated per second. 0 means no limit.
+#
+# Default: 16M
+migrate-batch-rate-limit-mb 16
+
 ################################ ROCKSDB #####################################
 
 # Specify the capacity of metadata column family block cache.
A larger block cache diff --git a/src/cluster/migrate_batch.cc b/src/cluster/migrate_batch.cc new file mode 100644 index 00000000000..73d7680e388 --- /dev/null +++ b/src/cluster/migrate_batch.cc @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "migrate_batch.h" + +#include "hiredis.h" +#include "scope_exit.h" + +Status MigrateBatch::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) { + if (pending_entries_ == 0 && !prefix_logdata_.empty()) { + // add prefix_logdata_ when the entry is first putted + auto s = PutLogData(prefix_logdata_); + if (!s.IsOK()) { + return s; + } + } + auto s = write_batch_.Put(cf, key, value); + if (!s.ok()) { + return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())}; + } + + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +Status MigrateBatch::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) { + auto s = write_batch_.Delete(cf, key); + if (!s.ok()) { + return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())}; + } + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +Status MigrateBatch::PutLogData(const rocksdb::Slice &blob) { + auto s = write_batch_.PutLogData(blob); + if (!s.ok()) { + return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())}; + } + pending_entries_++; + entries_num_++; + return Status::OK(); +} + +void MigrateBatch::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; } + +Status MigrateBatch::Send() { + if (pending_entries_ == 0) { + return Status::OK(); + } + + auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())}; + } + + sent_bytes_ += write_batch_.GetDataSize(); + sent_batches_num_++; + pending_entries_ = 0; + write_batch_.Clear(); + return Status::OK(); +} + +Status MigrateBatch::sendBatchSetCmd(int16_t slot, redisContext *redis_context, + const rocksdb::WriteBatch &write_batch) { + if (redis_context == nullptr) { + return {Status::NotOK, "redis context is null"}; + } + + auto *reply = static_cast( + redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize())); + auto exit = MakeScopeExit([reply] { + if (reply != nullptr) { + freeReplyObject(reply); + } + }); + + if (redis_context->err != 0) { + return {Status::NotOK, std::string(redis_context->errstr)}; + } + + if (reply == nullptr) { + return {Status::NotOK, "get null reply"}; + } + + if (reply->type == REDIS_REPLY_ERROR) { + auto error_str = std::string(reply->str); + return {Status::NotOK, error_str}; + } 
+ return Status::OK(); +} diff --git a/src/cluster/migrate_batch.h b/src/cluster/migrate_batch.h new file mode 100644 index 00000000000..f636c0a58c4 --- /dev/null +++ b/src/cluster/migrate_batch.h @@ -0,0 +1,65 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include + +#include "status.h" + +struct redisContext; + +class MigrateBatch { + public: + MigrateBatch() = default; + MigrateBatch(int16_t slot, redisContext *dst_redis_context, uint32_t max_bytes) + : slot_(slot), dst_redis_context_(dst_redis_context), max_bytes_(max_bytes) {} + + ~MigrateBatch() = default; + + Status Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value); + Status Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key); + Status PutLogData(const rocksdb::Slice &blob); + void SetPrefixLogData(const std::string &prefix_logdata); + Status Send(); + + void SetMaxBytes(uint32_t max_bytes) { + if (max_bytes_ != max_bytes) max_bytes_ = max_bytes; + } + bool IsFull() const { return write_batch_.GetDataSize() >= max_bytes_; } + size_t GetDataSize() const { return write_batch_.GetDataSize(); } + uint64_t GetSentBytes() const { return sent_bytes_; } + uint32_t GetSentBatchesNum() const { return sent_batches_num_; } + uint32_t GetEntriesNum() const { return entries_num_; } + + private: + static Status sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch); + + rocksdb::WriteBatch write_batch_{}; + std::string prefix_logdata_{}; + uint64_t sent_bytes_ = 0; + uint32_t sent_batches_num_ = 0; + uint32_t entries_num_ = 0; + uint32_t pending_entries_ = 0; + + int16_t slot_; + redisContext *dst_redis_context_; + uint32_t max_bytes_; +}; diff --git a/src/cluster/migrate_iterator.cc b/src/cluster/migrate_iterator.cc new file mode 100644 index 00000000000..6d2e88dee3c --- /dev/null +++ b/src/cluster/migrate_iterator.cc @@ -0,0 +1,135 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +#include "cluster/migrate_iterator.h" + +#include "db_util.h" +#include "storage/redis_db.h" + +MigrateIterator::MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options) + : metadata_cf_(storage->GetCFHandle(engine::kMetadataColumnFamilyName)), + subkey_cf_(storage->GetCFHandle(engine::kSubkeyColumnFamilyName)), + zset_score_cf_(storage->GetCFHandle(engine::kZSetScoreColumnFamilyName)), + metadata_iter_(util::UniqueIterator(storage, read_options, metadata_cf_)), + subdata_iter_(util::UniqueIterator(storage, read_options, subkey_cf_)), + valid_(false), + metadata_(RedisType::kRedisNone, false) {} + +bool MigrateIterator::Valid() const { return valid_; } + +void MigrateIterator::Seek(const rocksdb::Slice &target) { + items_.clear(); + log_data_.clear(); + metadata_iter_->Reset(); + subdata_iter_->Reset(); + metakey_prefix_.clear(); + + metadata_iter_->Seek(target); + valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(target); + if (valid_) { + metakey_prefix_ = target.ToString(); + findMetaData(); + } +} + +void MigrateIterator::Next() { + assert(valid_); + valid_ = false; + items_.clear(); + + if (subdata_iter_->Valid()) { + subdata_iter_->Next(); + valid_ = subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_); + if (valid_) { + findSubData(); + } else { + subdata_iter_->Reset(); + } + } + + if (!valid_ && metadata_iter_->Valid()) { + metadata_iter_->Next(); + valid_ = metadata_iter_->Valid() && metadata_iter_->key().starts_with(metakey_prefix_); + if (valid_) { + findMetaData(); + } + } +} + +const std::vector &MigrateIterator::GetItems() const { + assert(valid_); + return items_; +} + +std::string MigrateIterator::GetLogData() const { + assert(valid_); + return log_data_; +} + +void MigrateIterator::findMetaData() { + assert(metadata_iter_->Valid()); + Metadata metadata(kRedisNone, false /* generate_version */); + metadata.Decode(metadata_iter_->value().ToString()); + RedisType redis_type = metadata.Type(); + metadata_ = metadata; + + redis::WriteBatchLogData log_data(redis_type); + if (redis_type == RedisType::kRedisList) { + log_data.SetArguments({std::to_string(RedisCommand::kRedisCmdRPush)}); + } + + log_data_ = log_data.Encode(); + + items_.push_back(MigrateItem{metadata_cf_, metadata_iter_->key().ToString(), metadata_iter_->value().ToString()}); + + subdata_iter_->Reset(); + + if (redis_type != RedisType::kRedisNone && redis_type != RedisType::kRedisString && metadata.size > 0) { + initSubData(); + } +} + +void MigrateIterator::initSubData() { + std::string prefix_subkey; + InternalKey(metadata_iter_->key(), "", metadata_.version, true /* slot_id_encoded */).Encode(&prefix_subkey); + subdata_iter_->Seek(prefix_subkey); + subkey_prefix_ = prefix_subkey; + if (subdata_iter_->Valid() && subdata_iter_->key().starts_with(subkey_prefix_)) { + findSubData(); + } +} + +void MigrateIterator::findSubData() { + assert(subdata_iter_->Valid()); + + items_.push_back(MigrateItem{subkey_cf_, subdata_iter_->key().ToString(), subdata_iter_->value().ToString()}); + if (metadata_.Type() == RedisType::kRedisZSet) { + InternalKey inkey(subdata_iter_->key(), true /* slot_id_encoded */); + std::string score_member, new_score_key; + score_member.append(subdata_iter_->value().ToString()); + score_member.append(inkey.GetSubKey().ToString()); + std::string ns, user_key; + + ExtractNamespaceKey(metadata_iter_->key(), &ns, &user_key, true /* slot_id_encoded */); + InternalKey(metadata_iter_->key(), score_member, metadata_.version, 
true /* cluster_enabled */) + .Encode(&new_score_key); + items_.push_back(MigrateItem{zset_score_cf_, new_score_key, std::string()}); + } +} diff --git a/src/cluster/migrate_iterator.h b/src/cluster/migrate_iterator.h new file mode 100644 index 00000000000..81ce51fe40d --- /dev/null +++ b/src/cluster/migrate_iterator.h @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include "storage/redis_metadata.h" +#include "storage/storage.h" + +struct MigrateItem { + rocksdb::ColumnFamilyHandle *cf; + std::string key; + std::string value; +}; + +class MigrateIterator { + public: + MigrateIterator(engine::Storage *storage, const rocksdb::ReadOptions &read_options); + bool Valid() const; + void Seek(const rocksdb::Slice &target); + void Next(); + const std::vector &GetItems() const; + std::string GetLogData() const; + + private: + void findMetaData(); + void initSubData(); + void findSubData(); + + rocksdb::ColumnFamilyHandle *metadata_cf_; + rocksdb::ColumnFamilyHandle *subkey_cf_; + rocksdb::ColumnFamilyHandle *zset_score_cf_; + // TODO: support stream cf + std::unique_ptr metadata_iter_; + std::unique_ptr subdata_iter_; + + bool valid_; + Metadata metadata_; + std::string metakey_prefix_; + std::string subkey_prefix_; + std::string log_data_; + std::vector items_; +}; diff --git a/src/cluster/slot_import.cc b/src/cluster/slot_import.cc index 363f9559140..58401557893 100644 --- a/src/cluster/slot_import.cc +++ b/src/cluster/slot_import.cc @@ -48,6 +48,8 @@ bool SlotImport::Start(int fd, int slot) { return false; } + storage_->DisableCompact(slot); + import_status_ = kImportStart; import_slot_ = slot; import_fd_ = fd; @@ -68,6 +70,8 @@ bool SlotImport::Success(int slot) { return false; } + storage_->ResetDisabledCompactSlot(); + import_status_ = kImportSuccess; import_fd_ = -1; @@ -81,6 +85,8 @@ bool SlotImport::Fail(int slot) { return false; } + storage_->ResetDisabledCompactSlot(); + // Clean imported slot data auto s = ClearKeysOfSlot(namespace_, slot); if (!s.ok()) { @@ -116,6 +122,8 @@ void SlotImport::StopForLinkError(int fd) { } } + storage_->ResetDisabledCompactSlot(); + LOG(INFO) << "[import] Stop importing for link error, slot: " << import_slot_; import_status_ = kImportFailed; import_fd_ = -1; diff --git a/src/cluster/slot_migrate.cc b/src/cluster/slot_migrate.cc index 786e5eb9384..3c8ad2e2e21 100644 --- a/src/cluster/slot_migrate.cc +++ b/src/cluster/slot_migrate.cc @@ -21,13 +21,16 @@ #include "slot_migrate.h" #include -#include #include "db_util.h" #include "event_util.h" #include "fmt/format.h" +#include "hiredis.h" #include "io_util.h" +#include "migrate_iterator.h" +#include "scope_exit.h" #include "storage/batch_extractor.h" +#include "storage/storage.h" #include "sync_migrate_context.h" 
#include "thread_util.h" #include "time_util.h" @@ -36,6 +39,7 @@ const char *errFailedToSendCommands = "failed to send commands to restore a key"; const char *errMigrationTaskCanceled = "key migration stopped due to a task cancellation"; const char *errFailedToSetImportStatus = "failed to set import status on destination node"; +const char *errUnsupportedMigrateType = "unsupported migrate type"; static std::map type_to_cmd = { {kRedisString, "set"}, {kRedisList, "rpush"}, {kRedisHash, "hmset"}, {kRedisSet, "sadd"}, @@ -71,6 +75,11 @@ SlotMigrator::SlotMigrator(Server *svr, int max_migration_speed, int max_pipelin seq_gap_limit_ = seq_gap_limit; } + migrate_batch_size_ = svr->GetConfig()->migrate_batch_size_kb * KiB; + migrate_batch_bytes_per_sec_ = svr->GetConfig()->migrate_batch_rate_limit_mb * MiB; + migrate_batch_rate_limiter_ = std::unique_ptr( + rocksdb::NewGenericRateLimiter(static_cast(migrate_batch_bytes_per_sec_))); + if (svr->IsSlave()) { SetStopMigrationFlag(true); } @@ -211,7 +220,7 @@ void SlotMigrator::runMigrationProcess() { break; } case SlotMigrationStage::kWAL: { - auto s = syncWal(); + auto s = syncWAL(); if (s.IsOK()) { LOG(INFO) << "[migrate] Succeed to sync from WAL for a slot " << migrating_slot_; current_stage_ = SlotMigrationStage::kSuccess; @@ -287,104 +296,57 @@ Status SlotMigrator::startMigration() { } } - // Set destination node import status to START - auto s = setImportStatusOnDstNode(*dst_fd_, kImportStart); - if (!s.IsOK()) { - return s.Prefixed(errFailedToSetImportStatus); - } - - LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", connect destination fd " << *dst_fd_; - - return Status::OK(); -} - -Status SlotMigrator::sendSnapshot() { - uint64_t migrated_key_cnt = 0; - uint64_t expired_key_cnt = 0; - uint64_t empty_key_cnt = 0; - std::string restore_cmds; - int16_t slot = migrating_slot_; - - LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot; - - rocksdb::ReadOptions read_options; - read_options.snapshot = slot_snapshot_; - storage_->SetReadOptions(read_options); - rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName); - auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle)); - - // Construct key prefix to iterate the keys belong to the target slot - std::string prefix; - ComposeSlotKeyPrefix(namespace_, slot, &prefix); - LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix; - - // Seek to the beginning of keys start with 'prefix' and iterate all these keys - for (iter->Seek(prefix); iter->Valid(); iter->Next()) { - // The migrating task has to be stopped, if server role is changed from master to slave - // or flush command (flushdb or flushall) is executed - if (stop_migration_) { - return {Status::NotOK, errMigrationTaskCanceled}; + if (GetMigrateType() == MigrateType::kRawKeyValue) { + int timeout = 3; + if (dst_redis_context_ == nullptr) { + auto s = util::CreateRedisContextFromConnectedFd(dst_fd_.Get(), timeout, &dst_redis_context_); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("create redis context failed, {}", s.Msg())}; + } } - - // Iteration is out of range - if (!iter->key().starts_with(prefix)) { - break; + // Check whether the source and target clock rollback too much + int64_t skew = 0; + auto s = getClockSkew(&skew); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("check timestamp diff with destination server failed, {}", s.Msg())}; } - // Get user key - std::string ns, user_key; - 
ExtractNamespaceKey(iter->key(), &ns, &user_key, true); - - // Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size - auto result = migrateOneKey(user_key, iter->value(), &restore_cmds); - if (!result.IsOK()) { - return {Status::NotOK, fmt::format("failed to migrate a key {}: {}", user_key, result.Msg())}; - } - - if (*result == KeyMigrationResult::kMigrated) { - LOG(INFO) << "[migrate] The key " << user_key << " successfully migrated"; - migrated_key_cnt++; - } else if (*result == KeyMigrationResult::kExpired) { - LOG(INFO) << "[migrate] The key " << user_key << " is expired"; - expired_key_cnt++; - } else if (*result == KeyMigrationResult::kUnderlyingStructEmpty) { - LOG(INFO) << "[migrate] The key " << user_key << " has no elements"; - empty_key_cnt++; - } else { - LOG(ERROR) << "[migrate] Migrated a key " << user_key << " with unexpected result: " << static_cast(*result); - return {Status::NotOK}; + if (skew < 0 && std::abs(skew) > kMaxClockBackoffUS) { + return {Status::NotOK, + fmt::format("destination server system clock backoff {} us, cannot migrate using raw key value", + std::to_string(std::abs(skew)))}; } } - // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent - // while iterating keys because its size could be less than max_pipeline_size_ - auto s = sendCmdsPipelineIfNeed(&restore_cmds, true); + // Set destination node import status to START + auto s = setImportStatusOnDstNode(*dst_fd_, kImportStart); if (!s.IsOK()) { - return s.Prefixed(errFailedToSendCommands); + return s.Prefixed(errFailedToSetImportStatus); } - LOG(INFO) << "[migrate] Succeed to migrate slot snapshot, slot: " << slot << ", Migrated keys: " << migrated_key_cnt - << ", Expired keys: " << expired_key_cnt << ", Emtpy keys: " << empty_key_cnt; + LOG(INFO) << "[migrate] Start migrating slot " << migrating_slot_ << ", connect destination fd " << *dst_fd_; return Status::OK(); } -Status SlotMigrator::syncWal() { - // Send incremental data from WAL circularly until new increment less than a certain amount - auto s = syncWalBeforeForbiddingSlot(); - if (!s.IsOK()) { - return s.Prefixed("failed to sync WAL before forbidding a slot"); +Status SlotMigrator::sendSnapshot() { + MigrateType migrate_type = GetMigrateType(); + if (migrate_type == MigrateType::kRedisCommand) { + return sendSnapshotByCmd(); + } else if (migrate_type == MigrateType::kRawKeyValue) { + return sendSnapshotByRawKV(); } + return {Status::NotOK, errUnsupportedMigrateType}; +} - setForbiddenSlot(migrating_slot_); - - // Send last incremental data - s = syncWalAfterForbiddingSlot(); - if (!s.IsOK()) { - return s.Prefixed("failed to sync WAL after forbidding a slot"); +Status SlotMigrator::syncWAL() { + MigrateType migrate_type = GetMigrateType(); + if (migrate_type == MigrateType::kRedisCommand) { + return syncWALByCmd(); + } else if (migrate_type == MigrateType::kRawKeyValue) { + return syncWALByRawKV(); } - - return Status::OK(); + return {Status::NotOK, errUnsupportedMigrateType}; } Status SlotMigrator::finishSuccessfulMigration() { @@ -582,6 +544,95 @@ Status SlotMigrator::checkMultipleResponses(int sock_fd, int total) { } } +Status SlotMigrator::sendSnapshotByCmd() { + uint64_t migrated_key_cnt = 0; + uint64_t expired_key_cnt = 0; + uint64_t empty_key_cnt = 0; + std::string restore_cmds; + int16_t slot = migrating_slot_; + + LOG(INFO) << "[migrate] Start migrating snapshot of slot " << slot; + + rocksdb::ReadOptions read_options; + 
read_options.snapshot = slot_snapshot_; + storage_->SetReadOptions(read_options); + rocksdb::ColumnFamilyHandle *cf_handle = storage_->GetCFHandle(engine::kMetadataColumnFamilyName); + auto iter = util::UniqueIterator(storage_->GetDB()->NewIterator(read_options, cf_handle)); + + // Construct key prefix to iterate the keys belong to the target slot + std::string prefix; + ComposeSlotKeyPrefix(namespace_, slot, &prefix); + LOG(INFO) << "[migrate] Iterate keys of slot, key's prefix: " << prefix; + + // Seek to the beginning of keys start with 'prefix' and iterate all these keys + for (iter->Seek(prefix); iter->Valid(); iter->Next()) { + // The migrating task has to be stopped, if server role is changed from master to slave + // or flush command (flushdb or flushall) is executed + if (stop_migration_) { + return {Status::NotOK, errMigrationTaskCanceled}; + } + + // Iteration is out of range + if (!iter->key().starts_with(prefix)) { + break; + } + + // Get user key + std::string ns, user_key; + ExtractNamespaceKey(iter->key(), &ns, &user_key, true); + + // Add key's constructed commands to restore_cmds, send pipeline or not according to task's max_pipeline_size + auto result = migrateOneKey(user_key, iter->value(), &restore_cmds); + if (!result.IsOK()) { + return {Status::NotOK, fmt::format("failed to migrate a key {}: {}", user_key, result.Msg())}; + } + + if (*result == KeyMigrationResult::kMigrated) { + LOG(INFO) << "[migrate] The key " << user_key << " successfully migrated"; + migrated_key_cnt++; + } else if (*result == KeyMigrationResult::kExpired) { + LOG(INFO) << "[migrate] The key " << user_key << " is expired"; + expired_key_cnt++; + } else if (*result == KeyMigrationResult::kUnderlyingStructEmpty) { + LOG(INFO) << "[migrate] The key " << user_key << " has no elements"; + empty_key_cnt++; + } else { + LOG(ERROR) << "[migrate] Migrated a key " << user_key << " with unexpected result: " << static_cast(*result); + return {Status::NotOK}; + } + } + + // It's necessary to send commands that are still in the pipeline since the final pipeline may not be sent + // while iterating keys because its size could be less than max_pipeline_size_ + auto s = sendCmdsPipelineIfNeed(&restore_cmds, true); + if (!s.IsOK()) { + return s.Prefixed(errFailedToSendCommands); + } + + LOG(INFO) << "[migrate] Succeed to migrate slot snapshot, slot: " << slot << ", Migrated keys: " << migrated_key_cnt + << ", Expired keys: " << expired_key_cnt << ", Emtpy keys: " << empty_key_cnt; + + return Status::OK(); +} + +Status SlotMigrator::syncWALByCmd() { + // Send incremental data from WAL circularly until new increment less than a certain amount + auto s = syncWALBeforeForbiddingSlot(); + if (!s.IsOK()) { + return s.Prefixed("failed to sync WAL before forbidding a slot"); + } + + setForbiddenSlot(migrating_slot_); + + // Send last incremental data + s = syncWALAfterForbiddingSlot(); + if (!s.IsOK()) { + return s.Prefixed("failed to sync WAL after forbidding a slot"); + } + + return Status::OK(); +} + StatusOr SlotMigrator::migrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata, std::string *restore_cmds) { @@ -1007,7 +1058,7 @@ Status SlotMigrator::migrateIncrementData(std::unique_ptrGetDB()->GetLatestSequenceNumber(); // No incremental data @@ -1113,3 +1164,230 @@ void SlotMigrator::resumeSyncCtx(const Status &migrate_result) { blocking_context_ = nullptr; } } + +void SlotMigrator::SetMigrateBatchRateLimit(size_t bytes_per_sec) { + migrate_batch_bytes_per_sec_ = + (bytes_per_sec == 0 || 
bytes_per_sec > kMaxMigrateBatchRate) ? kMaxMigrateBatchRate : bytes_per_sec; + migrate_batch_rate_limiter_->SetBytesPerSecond(static_cast(migrate_batch_bytes_per_sec_)); +} + +MigrateType SlotMigrator::GetMigrateType() { return static_cast(svr_->GetConfig()->migrate_type); } + +Status SlotMigrator::getClockSkew(int64_t *diff_us) { + assert(dst_redis_context_ != nullptr); + uint64_t send_timestamp = util::GetTimeStampUS(); + auto *reply = static_cast(redisCommand(dst_redis_context_, "TIME")); + uint64_t receive_timestamp = util::GetTimeStampUS(); + + auto exit = MakeScopeExit([reply] { + if (reply != nullptr) { + freeReplyObject(reply); + } + }); + + if (dst_redis_context_->err != 0) { + return {Status::NotOK, std::string(dst_redis_context_->errstr)}; + } + + if (reply == nullptr) { + return {Status::NotOK, "get null reply from TIME command"}; + } + + if (reply->type == REDIS_REPLY_ERROR) { + auto error_str = std::string(reply->str); + return {Status::NotOK, error_str}; + } + + if (reply->elements != 2) { + return {Status::NotOK, "get invalid reply from TIME command"}; + } + + uint64_t dst_timestamp = std::stoul(reply->element[0]->str) * 1000000 + std::stoul(reply->element[1]->str); + + *diff_us = static_cast(dst_timestamp - (receive_timestamp + send_timestamp) / 2); + + return Status::OK(); +} + +Status SlotMigrator::sendMigrateBatch(MigrateBatch *batch) { + // user may dynamically resize the batch, apply it when send data + batch->SetMaxBytes(migrate_batch_size_); + // rate limit + if (migrate_batch_bytes_per_sec_ < kMaxMigrateBatchRate) { + auto single_burst = migrate_batch_rate_limiter_->GetSingleBurstBytes(); + auto left = static_cast(batch->GetDataSize()); + while (left > 0) { + auto request_size = std::min(left, single_burst); + migrate_batch_rate_limiter_->Request(request_size, rocksdb::Env::IOPriority::IO_HIGH, nullptr); + left -= request_size; + } + } + return batch->Send(); +} + +Status SlotMigrator::sendSnapshotByRawKV() { + rocksdb::ReadOptions read_options; + read_options.snapshot = slot_snapshot_; + read_options.fill_cache = false; + read_options.total_order_seek = true; + + assert(dst_redis_context_ != nullptr); + + MigrateBatch migrate_batch(migrating_slot_.load(), dst_redis_context_, migrate_batch_size_.load()); + + uint64_t start_send_snapshot_time = util::GetTimeStampMS(); + + MigrateIterator migrate_iterator(storage_, read_options); + + LOG(INFO) << "[migrate] Migrating snapshot of slot " << migrating_slot_ << " by raw key value"; + + std::string prefix; + Status s; + ComposeSlotKeyPrefix(namespace_, migrating_slot_, &prefix); + + for (migrate_iterator.Seek(prefix); migrate_iterator.Valid(); migrate_iterator.Next()) { + auto items = migrate_iterator.GetItems(); + + for (const auto &item : items) { + if (item.cf == storage_->GetCFHandle(engine::kMetadataColumnFamilyName)) { + migrate_batch.SetPrefixLogData(migrate_iterator.GetLogData()); + } + s = migrate_batch.Put(item.cf, item.key, item.value); + if (!s) return s; + } + if (migrate_batch.IsFull()) { + s = sendMigrateBatch(&migrate_batch); + if (!s) return s; + } + } + + s = sendMigrateBatch(&migrate_batch); + + if (!s) return s; + + double rate = 0; + auto elapsed_time = util::GetTimeStampMS() - start_send_snapshot_time; + if (elapsed_time != 0) { + rate = + ((static_cast(migrate_batch.GetSentBytes()) / 1024.0) / (static_cast(elapsed_time) / 1000.0)); + } + LOG(INFO) << fmt::format( + "[migrate] Succeed to migrate snapshot, slot: {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, batches: " + "{}, entries: {}", + 
migrating_slot_.load(), elapsed_time, migrate_batch.GetSentBytes(), rate, migrate_batch.GetSentBatchesNum(), + migrate_batch.GetEntriesNum()); + return Status::OK(); +} + +Status SlotMigrator::syncWALByRawKV() { + int epoch = 1; + MigrateBatch migrate_batch(migrating_slot_, dst_redis_context_, migrate_batch_size_); + uint64_t start_sync_wal_time = util::GetTimeStampMS(); + LOG(INFO) << "[migrate] Syncing WAL of slot " << migrating_slot_ << " by raw key value"; + uint64_t wal_incremental_seq = 0; + while (epoch <= kMaxLoopTimes) { + if (catchedUpIncrementalWAL()) { + break; + } + wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber(); + auto s = migrateIncrementalDataByRawKV(wal_incremental_seq, &migrate_batch); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("migrate incremental data failed, {}", s.Msg())}; + } + LOG(INFO) << fmt::format("[migrate] Migrated incremental data, epoch: {}, seq from {} to {}", epoch, wal_begin_seq_, + wal_incremental_seq); + wal_begin_seq_ = wal_incremental_seq; + epoch++; + } + + setForbiddenSlot(migrating_slot_); + + wal_incremental_seq = storage_->GetDB()->GetLatestSequenceNumber(); + if (wal_incremental_seq > wal_begin_seq_) { + auto s = migrateIncrementalDataByRawKV(wal_incremental_seq, &migrate_batch); + if (!s.IsOK()) { + return {Status::NotOK, fmt::format("migrate last incremental data failed, {}", s.Msg())}; + } + LOG(INFO) << fmt::format("[migrate] Migrated last incremental data after set forbidden slot, seq from {} to {}", + wal_begin_seq_, wal_incremental_seq); + } + + double rate = 0; + auto elapsed_time = util::GetTimeStampMS() - start_sync_wal_time; + if (elapsed_time != 0) { + rate = + ((static_cast(migrate_batch.GetSentBytes()) / 1024.0) / (static_cast(elapsed_time) / 1000.0)); + } + LOG(INFO) << fmt::format( + "[migrate] Succeed to migrate incremental data, slot: {}, elapsed: {} ms, sent: {} bytes, rate: {:.2f} kb/s, " + "batches: {}, entries: {}", + migrating_slot_.load(), elapsed_time, migrate_batch.GetSentBytes(), rate, migrate_batch.GetSentBatchesNum(), + migrate_batch.GetEntriesNum()); + return Status::OK(); +} + +bool SlotMigrator::catchedUpIncrementalWAL() { + uint64_t gap = storage_->GetDB()->GetLatestSequenceNumber() - wal_begin_seq_; + if (gap <= seq_gap_limit_) { + LOG(INFO) << fmt::format("[migrate] Incremental data sequence gap: {}, less than limit: {}, set forbidden slot: {}", + gap, seq_gap_limit_, migrating_slot_.load()); + return true; + } + return false; +} + +Status SlotMigrator::migrateIncrementalDataByRawKV(uint64_t end_seq, MigrateBatch *migrate_batch) { + std::unique_ptr iter; + uint64_t next_seq = wal_begin_seq_ + 1; + auto s = storage_->GetWALIter(next_seq, &iter); + if (!s) return s; + while (true) { + if (stop_migration_) { + return {Status::NotOK, errMigrationTaskCanceled}; + } + if (!iter->Valid()) { + return {Status::NotOK, + fmt::format("WAL iterator is invalid, expected end seq: {}, next seq: {}", end_seq, next_seq)}; + } + auto batch = iter->GetBatch(); + if (batch.sequence != next_seq) { + return {Status::NotOK, + fmt::format("WAL data might be lost, expected next seq: {}, got {}", next_seq, batch.sequence)}; + } + if (!batch.writeBatchPtr) { + return {Status::NotOK, "WAL write batch empty"}; + } + + s = extractSlotDataFromWAL(migrating_slot_, batch.writeBatchPtr, migrate_batch); + if (!s) return s; + if (migrate_batch->IsFull()) { + s = sendMigrateBatch(migrate_batch); + if (!s) return s; + } + + next_seq = batch.sequence + batch.writeBatchPtr->Count(); + if (next_seq > end_seq) { + break; 
+ } + iter->Next(); + } + // send the remaining data + return sendMigrateBatch(migrate_batch); +} + +Status SlotMigrator::extractSlotDataFromWAL(int16_t slot, const std::unique_ptr &write_batch, + MigrateBatch *migrate_batch) { + // todo: stream, put it in SlotMigrateWriteBatchHandler + std::unordered_map cf_id_map = { + {static_cast(kColumnFamilyIDMetadata), storage_->GetCFHandle(engine::kMetadataColumnFamilyName)}, + {static_cast(kColumnFamilyIDDefault), storage_->GetCFHandle(engine::kSubkeyColumnFamilyName)}, + {static_cast(kColumnFamilyIDZSetScore), storage_->GetCFHandle(engine::kZSetScoreColumnFamilyName)}, + }; + + SlotMigrateWriteBatchHandler write_batch_handler(cf_id_map, slot, migrate_batch); + auto s = write_batch->Iterate(&write_batch_handler); + if (!s.ok()) { + return {Status::NotOK, fmt::format("encountered error while parsing WAL, {}", s.ToString())}; + } + return Status::OK(); +} diff --git a/src/cluster/slot_migrate.h b/src/cluster/slot_migrate.h index 5dc1559ccf0..cfe7d103cd4 100644 --- a/src/cluster/slot_migrate.h +++ b/src/cluster/slot_migrate.h @@ -22,6 +22,7 @@ #include #include +#include #include #include #include @@ -36,6 +37,7 @@ #include "config.h" #include "encoding.h" +#include "migrate_batch.h" #include "parse_util.h" #include "redis_slot.h" #include "server/server.h" @@ -51,6 +53,10 @@ enum class SlotMigrationStage { kNone, kStart, kSnapshot, kWAL, kSuccess, kFaile enum class KeyMigrationResult { kMigrated, kExpired, kUnderlyingStructEmpty }; +enum class MigrateType { kRedisCommand = 0, kRawKeyValue }; + +struct redisContext; + struct SlotMigrationJob { SlotMigrationJob(int slot_id, std::string dst_ip, int dst_port, int speed, int pipeline_size, int seq_gap) : slot_id(static_cast(slot_id)), @@ -94,11 +100,14 @@ class SlotMigrator : public redis::Database { void SetSequenceGapLimit(int value) { if (value > 0) seq_gap_limit_ = value; } + void SetMigrateBatchRateLimit(size_t bytes_per_sec); + void SetMigrateBatchSize(size_t size) { migrate_batch_size_ = size; }; void SetStopMigrationFlag(bool value) { stop_migration_ = value; } bool IsMigrationInProgress() const { return migration_state_ == MigrationState::kStarted; } SlotMigrationStage GetCurrentSlotMigrationStage() const { return current_stage_; } int16_t GetForbiddenSlot() const { return forbidden_slot_; } int16_t GetMigratingSlot() const { return migrating_slot_; } + MigrateType GetMigrateType(); void GetMigrationInfo(std::string *info) const; void CancelSyncCtx(); @@ -108,7 +117,7 @@ class SlotMigrator : public redis::Database { bool isTerminated() { return thread_state_ == ThreadState::Terminated; } Status startMigration(); Status sendSnapshot(); - Status syncWal(); + Status syncWAL(); Status finishSuccessfulMigration(); Status finishFailedMigration(); void clean(); @@ -118,6 +127,8 @@ class SlotMigrator : public redis::Database { Status checkSingleResponse(int sock_fd); Status checkMultipleResponses(int sock_fd, int total); + Status sendSnapshotByCmd(); + Status syncWALByCmd(); StatusOr migrateOneKey(const rocksdb::Slice &key, const rocksdb::Slice &encoded_metadata, std::string *restore_cmds); Status migrateSimpleKey(const rocksdb::Slice &key, const Metadata &metadata, const std::string &bytes, @@ -131,10 +142,19 @@ class SlotMigrator : public redis::Database { void applyMigrationSpeedLimit() const; Status generateCmdsFromBatch(rocksdb::BatchResult *batch, std::string *commands); Status migrateIncrementData(std::unique_ptr *iter, uint64_t end_seq); - Status syncWalBeforeForbiddingSlot(); - Status 
syncWalAfterForbiddingSlot(); + Status syncWALBeforeForbiddingSlot(); + Status syncWALAfterForbiddingSlot(); void setForbiddenSlot(int16_t slot); - std::unique_lock blockingLock() { return std::unique_lock(blocking_mutex_); } + + // migration of RawKV + Status getClockSkew(int64_t *diff_us); + Status sendMigrateBatch(MigrateBatch *batch); + Status sendSnapshotByRawKV(); + Status syncWALByRawKV(); + bool catchedUpIncrementalWAL(); + Status migrateIncrementalDataByRawKV(uint64_t end_seq, MigrateBatch *migrate_batch); + Status extractSlotDataFromWAL(int16_t slot, const std::unique_ptr &write_batch, + MigrateBatch *migrate_batch); void resumeSyncCtx(const Status &migrate_result); @@ -146,11 +166,17 @@ class SlotMigrator : public redis::Database { static const int kDefaultSequenceGapLimit = 10000; static const int kMaxItemsInCommand = 16; // number of items in every write command of complex keys static const int kMaxLoopTimes = 10; + static const int kMaxClockBackoffUS = 1000000; + static const size_t kMaxMigrateBatchRate = 1 * GiB; Server *svr_; int max_migration_speed_; int max_pipeline_size_; - int seq_gap_limit_; + uint64_t seq_gap_limit_; + + std::unique_ptr migrate_batch_rate_limiter_; + std::atomic migrate_batch_bytes_per_sec_; + std::atomic migrate_batch_size_; SlotMigrationStage current_stage_ = SlotMigrationStage::kNone; ParserState parser_state_ = ParserState::ArrayLen; @@ -169,6 +195,7 @@ class SlotMigrator : public redis::Database { std::string dst_ip_; int dst_port_ = -1; UniqueFD dst_fd_; + redisContext *dst_redis_context_ = nullptr; std::atomic forbidden_slot_ = -1; std::atomic migrating_slot_ = -1; diff --git a/src/commands/cmd_server.cc b/src/commands/cmd_server.cc index 18c56f86266..3acc9a7d8c9 100644 --- a/src/commands/cmd_server.cc +++ b/src/commands/cmd_server.cc @@ -969,6 +969,29 @@ class CommandStats : public Commander { } }; +class CommandBatchSet : public Commander { + public: + Status Parse(const std::vector &args) override { + slot_ = GET_OR_RET(ParseInt(args[1], 10)); + write_batch_ = args[2]; + return Commander::Parse(args); + } + + Status Execute(Server *svr, Connection *conn, std::string *output) override { + size_t size = write_batch_.size(); + auto s = svr->storage->ApplyWriteBatch(std::move(write_batch_)); + if (!s) { + return s; + } + *output = redis::Integer(size); + return Status::OK(); + } + + private: + int64_t slot_ = -1; + std::string write_batch_; +}; + REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loading", 0, 0, 0), MakeCmdAttr("ping", -1, "read-only", 0, 0, 0), MakeCmdAttr("select", 2, "read-only", 0, 0, 0), @@ -1000,6 +1023,7 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr("auth", 2, "read-only ok-loadin MakeCmdAttr("bgsave", 1, "read-only no-script", 0, 0, 0), MakeCmdAttr("flushbackup", 1, "read-only no-script", 0, 0, 0), MakeCmdAttr("slaveof", 3, "read-only exclusive no-script", 0, 0, 0), - MakeCmdAttr("stats", 1, "read-only", 0, 0, 0), ) + MakeCmdAttr("stats", 1, "read-only", 0, 0, 0), + MakeCmdAttr("batchset", 3, "write", 0, 0, 0), ) } // namespace redis diff --git a/src/common/io_util.cc b/src/common/io_util.cc index cb30a48047b..27f137a52c1 100644 --- a/src/common/io_util.cc +++ b/src/common/io_util.cc @@ -29,6 +29,8 @@ #include #include +#include "hiredis.h" + #ifdef __linux__ #include #endif @@ -404,4 +406,23 @@ Status Write(int fd, const std::string &data) { return WriteImpl(fd, data Status Pwrite(int fd, const std::string &data, off_t offset) { return WriteImpl(fd, data, offset); } +Status CreateRedisContextFromConnectedFd(int fd, 
int timeout, redisContext **redis_context) { + *redis_context = redisConnectFd(fd); + if (*redis_context == nullptr) { + return {Status::NotOK, "init failed"}; + } + + if ((*redis_context)->err != 0) { + auto error_str = std::string((*redis_context)->errstr); + redisFree(*redis_context); + return {Status::NotOK, error_str}; + } + + if (redisSetTimeout(*redis_context, timeval{timeout, 0}) != REDIS_OK) { + redisFree(*redis_context); + return {Status::NotOK, "set timeout failed"}; + } + return Status::OK(); +} + } // namespace util diff --git a/src/common/io_util.h b/src/common/io_util.h index 8391d2cc129..b0c0331681c 100644 --- a/src/common/io_util.h +++ b/src/common/io_util.h @@ -24,6 +24,8 @@ #include "status.h" +struct redisContext; + namespace util { sockaddr_in NewSockaddrInet(const std::string &host, uint32_t port); @@ -46,4 +48,6 @@ int AeWait(int fd, int mask, int milliseconds); Status Write(int fd, const std::string &data); Status Pwrite(int fd, const std::string &data, off_t offset); +Status CreateRedisContextFromConnectedFd(int fd, int response_timeout, redisContext **redis_context); + } // namespace util diff --git a/src/config/config.cc b/src/config/config.cc index 64cce3c4ba3..d99029f83c4 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -62,6 +62,9 @@ ConfigEnum log_levels[] = {{"info", google::INFO}, {"fatal", google::FATAL}, {nullptr, 0}}; +ConfigEnum migrate_types[] = {{"redis_command", static_cast(MigrateType::kRedisCommand)}, + {"raw_key_value", static_cast(MigrateType::kRawKeyValue)}}; + std::string TrimRocksDbPrefix(std::string s) { if (strncasecmp(s.data(), "rocksdb.", 8) != 0) return s; return s.substr(8, s.size() - 8); @@ -161,6 +164,9 @@ Config::Config() { {"migrate-speed", false, new IntField(&migrate_speed, 4096, 0, INT_MAX)}, {"migrate-pipeline-size", false, new IntField(&pipeline_size, 16, 1, INT_MAX)}, {"migrate-sequence-gap", false, new IntField(&sequence_gap, 10000, 1, INT_MAX)}, + {"migrate-type", false, new EnumField(&migrate_type, migrate_types, 0)}, + {"migrate-batch-size-kb", false, new IntField(&migrate_batch_size_kb, 16, 1, INT_MAX)}, + {"migrate-batch-rate-limit-mb", false, new IntField(&migrate_batch_rate_limit_mb, 16, 0, INT_MAX)}, {"unixsocket", true, new StringField(&unixsocket, "")}, {"unixsocketperm", true, new OctalField(&unixsocketperm, 0777, 1, INT_MAX)}, {"log-retention-days", false, new IntField(&log_retention_days, -1, -1, INT_MAX)}, @@ -466,6 +472,18 @@ void Config::initFieldCallback() { if (cluster_enabled) srv->slot_migrator->SetSequenceGapLimit(sequence_gap); return Status::OK(); }}, + {"migrate-batch-rate-limit-mb", + [this](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + srv->slot_migrator->SetMigrateBatchRateLimit(migrate_batch_rate_limit_mb * MiB); + return Status::OK(); + }}, + {"migrate-batch-size-kb", + [this](Server *srv, const std::string &k, const std::string &v) -> Status { + if (!srv) return Status::OK(); + srv->slot_migrator->SetMigrateBatchSize(migrate_batch_size_kb * KiB); + return Status::OK(); + }}, {"log-retention-days", [this](Server *srv, const std::string &k, const std::string &v) -> Status { if (!srv) return Status::OK(); diff --git a/src/config/config.h b/src/config/config.h index 3a06300bc26..87cd2a0c058 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -143,6 +143,9 @@ struct Config { int migrate_speed; int pipeline_size; int sequence_gap; + int migrate_type; + int migrate_batch_size_kb; + int migrate_batch_rate_limit_mb; bool 
redis_cursor_compatible = false; int log_retention_days; diff --git a/src/server/server.cc b/src/server/server.cc index a5604496b8b..6dbb8348535 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -824,6 +824,7 @@ void Server::GetRocksDBInfo(std::string *info) { db_job_mu_.lock(); string_stream << "is_bgsaving:" << (is_bgsave_in_progress_ ? "yes" : "no") << "\r\n"; string_stream << "is_compacting:" << (db_compacting_ ? "yes" : "no") << "\r\n"; + string_stream << "slot_disabled_compaction:" << storage->GetDisabledCompactSlot() << "\r\n"; db_job_mu_.unlock(); *info = string_stream.str(); diff --git a/src/storage/batch_extractor.cc b/src/storage/batch_extractor.cc index 16f02bbcd80..deabc8574a3 100644 --- a/src/storage/batch_extractor.cc +++ b/src/storage/batch_extractor.cc @@ -415,3 +415,67 @@ Status WriteBatchExtractor::ExtractStreamAddCommand(bool is_slot_id_encoded, con return Status::OK(); } + +void SlotMigrateWriteBatchHandler::LogData(const rocksdb::Slice &blob) { + if (ServerLogData::IsServerLogData(blob.data())) { + return; + } + + log_data_ = blob.ToString(); +} + +rocksdb::Status SlotMigrateWriteBatchHandler::PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) { + if (cf_id_map_.find(column_family_id) == cf_id_map_.end()) { + return rocksdb::Status::OK(); + } + uint16_t slot_id = 0; + ExtractSlotId(key, &slot_id); + if (slot_ != slot_id) { + return rocksdb::Status::OK(); + } + + if (!log_data_.empty()) { + auto s = migrate_batch_->PutLogData(log_data_); + if (!s) { + return rocksdb::Status::Aborted(s.Msg()); + } + log_data_.clear(); + } + + auto s = migrate_batch_->Put(cf_id_map_[column_family_id], key, value); + if (!s) { + return rocksdb::Status::Aborted(s.Msg()); + } + + return rocksdb::Status::OK(); +} + +rocksdb::Status SlotMigrateWriteBatchHandler::DeleteCF(uint32_t column_family_id, const Slice &key) { + if (cf_id_map_.find(column_family_id) == cf_id_map_.end()) { + return rocksdb::Status::OK(); + } + uint16_t slot_id = 0; + ExtractSlotId(key, &slot_id); + if (slot_ != slot_id) { + return rocksdb::Status::OK(); + } + + if (!log_data_.empty()) { + auto s = migrate_batch_->PutLogData(log_data_); + if (!s) { + return rocksdb::Status::Aborted(s.Msg()); + } + log_data_.clear(); + } + + auto s = migrate_batch_->Delete(cf_id_map_[column_family_id], key); + if (!s) { + return rocksdb::Status::Aborted(s.Msg()); + } + return rocksdb::Status::OK(); +} + +rocksdb::Status SlotMigrateWriteBatchHandler::DeleteRangeCF(uint32_t column_family_id, const Slice &begin_key, + const Slice &end_key) { + return rocksdb::Status::OK(); +} diff --git a/src/storage/batch_extractor.h b/src/storage/batch_extractor.h index 694cc3bfa63..10623ebedbd 100644 --- a/src/storage/batch_extractor.h +++ b/src/storage/batch_extractor.h @@ -52,3 +52,25 @@ class WriteBatchExtractor : public rocksdb::WriteBatch::Handler { int slot_id_; bool to_redis_; }; + +class MigrateBatch; + +// extractor for raw key value migrate, must be slot encoded +class SlotMigrateWriteBatchHandler : public rocksdb::WriteBatch::Handler { + public: + SlotMigrateWriteBatchHandler(const std::unordered_map &cf_id_map, + int16_t slot, MigrateBatch *migrate_batch) + : cf_id_map_(cf_id_map), slot_(slot), migrate_batch_(migrate_batch) {} + void LogData(const rocksdb::Slice &blob) override; + rocksdb::Status PutCF(uint32_t column_family_id, const Slice &key, const Slice &value) override; + + rocksdb::Status DeleteCF(uint32_t column_family_id, const Slice &key) override; + rocksdb::Status DeleteRangeCF(uint32_t 
column_family_id, const Slice &begin_key, const Slice &end_key) override; + + private: + std::string log_data_; + + std::unordered_map cf_id_map_; + int slot_; + MigrateBatch *migrate_batch_; +}; diff --git a/src/storage/compact_filter.cc b/src/storage/compact_filter.cc index 984c618bdbc..447ab2aee7f 100644 --- a/src/storage/compact_filter.cc +++ b/src/storage/compact_filter.cc @@ -43,6 +43,15 @@ bool MetadataFilter::Filter(int level, const Slice &key, const Slice &value, std << ", namespace: " << ns << ", key: " << user_key << ", err: " << s.ToString(); return false; } + + if (stor_->IsSlotIdEncoded()) { + uint16_t slot_id = 0; + ExtractSlotId(key, &slot_id); + if (stor_->IsCompactDisabled(slot_id)) { + return false; + } + } + DLOG(INFO) << "[compact_filter/metadata] " << "namespace: " << ns << ", key: " << user_key << ", result: " << (metadata.Expired() ? "deleted" : "reserved"); @@ -135,6 +144,10 @@ bool SubKeyFilter::Filter(int level, const Slice &key, const Slice &value, std:: return false; } + if (stor_->IsSlotIdEncoded() && stor_->IsCompactDisabled(ikey.GetSlotId())) { + return false; + } + return IsMetadataExpired(ikey, metadata) || (metadata.Type() == kRedisBitmap && redis::Bitmap::IsEmptySegment(value)); } diff --git a/src/storage/redis_db.cc b/src/storage/redis_db.cc index 1d2bb229f48..21dcdf6f49a 100644 --- a/src/storage/redis_db.cc +++ b/src/storage/redis_db.cc @@ -617,6 +617,8 @@ RedisType WriteBatchLogData::GetRedisType() { return type_; } std::vector *WriteBatchLogData::GetArguments() { return &args_; } +void WriteBatchLogData::SetArguments(std::vector &&args) { args_ = std::move(args); } + std::string WriteBatchLogData::Encode() { std::string ret = std::to_string(type_); for (const auto &arg : args_) { diff --git a/src/storage/redis_db.h b/src/storage/redis_db.h index cbe10192b2f..72abe9854f2 100644 --- a/src/storage/redis_db.h +++ b/src/storage/redis_db.h @@ -95,6 +95,7 @@ class WriteBatchLogData { RedisType GetRedisType(); std::vector *GetArguments(); + void SetArguments(std::vector &&args); std::string Encode(); Status Decode(const rocksdb::Slice &blob); diff --git a/src/storage/redis_metadata.cc b/src/storage/redis_metadata.cc index 125f3081c16..55da4e5f68e 100644 --- a/src/storage/redis_metadata.cc +++ b/src/storage/redis_metadata.cc @@ -75,6 +75,8 @@ Slice InternalKey::GetKey() const { return key_; } Slice InternalKey::GetSubKey() const { return sub_key_; } +uint16_t InternalKey::GetSlotId() const { return slotid_; } + uint64_t InternalKey::GetVersion() const { return version_; } void InternalKey::Encode(std::string *out) { @@ -124,6 +126,15 @@ void ExtractNamespaceKey(Slice ns_key, std::string *ns, std::string *key, bool s *key = ns_key.ToString(); } +// must slot encoded +void ExtractSlotId(Slice ns_key, uint16_t *slot_id) { + uint8_t namespace_size = 0; + GetFixed8(&ns_key, &namespace_size); + ns_key.remove_prefix(namespace_size); + + GetFixed16(&ns_key, slot_id); +} + void ComposeNamespaceKey(const Slice &ns, const Slice &key, std::string *ns_key, bool slot_id_encoded) { ns_key->clear(); diff --git a/src/storage/redis_metadata.h b/src/storage/redis_metadata.h index 5407ff80712..82f591bf3f7 100644 --- a/src/storage/redis_metadata.h +++ b/src/storage/redis_metadata.h @@ -74,6 +74,7 @@ struct KeyNumStats { }; void ExtractNamespaceKey(Slice ns_key, std::string *ns, std::string *key, bool slot_id_encoded); +void ExtractSlotId(Slice ns_key, uint16_t *slot_id); void ComposeNamespaceKey(const Slice &ns, const Slice &key, std::string *ns_key, bool slot_id_encoded); void 
ComposeSlotKeyPrefix(const Slice &ns, int slotid, std::string *output); @@ -86,6 +87,7 @@ class InternalKey { Slice GetNamespace() const; Slice GetKey() const; Slice GetSubKey() const; + uint16_t GetSlotId() const; uint64_t GetVersion() const; void Encode(std::string *out); bool operator==(const InternalKey &that) const; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index 65906b6fb55..921669b703f 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -36,6 +36,7 @@ #include #include +#include "cluster/cluster.h" #include "compact_filter.h" #include "db_util.h" #include "event_listener.h" @@ -1101,4 +1102,20 @@ bool Storage::ReplDataManager::FileExists(Storage *storage, const std::string &d return crc == tmp_crc; } +Status Storage::ApplyWriteBatch(std::string &&raw_batch) { + auto bat = rocksdb::WriteBatch(std::move(raw_batch)); + auto s = Write(rocksdb::WriteOptions(), &bat); + if (!s.ok()) { + return {Status::NotOK, s.ToString()}; + } + return Status::OK(); +} + +void Storage::DisableCompact(int slot_id) { + assert(Cluster::IsValidSlot(slot_id)); + disable_compact_slot_ = slot_id; +} + +void Storage::ResetDisabledCompactSlot() { disable_compact_slot_ = -1; } + } // namespace engine diff --git a/src/storage/storage.h b/src/storage/storage.h index ae4dc5777d1..49febb07318 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -177,6 +177,12 @@ class Storage { std::string GetReplIdFromWalBySeq(rocksdb::SequenceNumber seq); std::string GetReplIdFromDbEngine(); + Status ApplyWriteBatch(std::string &&raw_batch); + void DisableCompact(int slot_id); + void ResetDisabledCompactSlot(); + bool IsCompactDisabled(int slot_id) { return disable_compact_slot_ == slot_id; } + int GetDisabledCompactSlot() { return disable_compact_slot_; } + private: std::unique_ptr db_ = nullptr; std::string replid_; @@ -210,6 +216,8 @@ class Storage { rocksdb::WriteOptions write_opts_ = rocksdb::WriteOptions(); + std::atomic disable_compact_slot_{-1}; + rocksdb::Status writeToDB(const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates); };
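For reference, the following is a minimal sketch of how the sender-side pieces introduced by this patch (MigrateIterator, MigrateBatch, ComposeSlotKeyPrefix, and the hiredis connection) are meant to compose for a raw key-value snapshot migration. It mirrors the core loop of SlotMigrator::sendSnapshotByRawKV but leaves out rate limiting, cancellation checks, and logging; the helper name MigrateSlotSnapshotSketch and the caller-supplied storage, snapshot, dst_ctx, ns, slot, and batch_bytes arguments are illustrative assumptions, not part of the patch.

#include "cluster/migrate_batch.h"
#include "cluster/migrate_iterator.h"
#include "storage/redis_metadata.h"
#include "storage/storage.h"

// Hypothetical helper, not part of the patch: walks one slot of a consistent
// snapshot and ships its raw key-value entries to the destination node.
Status MigrateSlotSnapshotSketch(engine::Storage *storage, const rocksdb::Snapshot *snapshot,
                                 redisContext *dst_ctx, const std::string &ns, int16_t slot,
                                 uint32_t batch_bytes) {
  rocksdb::ReadOptions read_options;
  read_options.snapshot = snapshot;        // iterate a consistent snapshot of the slot
  read_options.fill_cache = false;
  read_options.total_order_seek = true;

  MigrateBatch batch(slot, dst_ctx, batch_bytes);
  MigrateIterator iter(storage, read_options);

  std::string prefix;
  ComposeSlotKeyPrefix(ns, slot, &prefix);  // namespace + slot id, the prefix MigrateIterator seeks to

  for (iter.Seek(prefix); iter.Valid(); iter.Next()) {
    for (const auto &item : iter.GetItems()) {
      // A metadata entry starts a new logical key, so attach its WriteBatchLogData
      // prefix before that key's entries are written into the batch.
      if (item.cf == storage->GetCFHandle(engine::kMetadataColumnFamilyName)) {
        batch.SetPrefixLogData(iter.GetLogData());
      }
      auto s = batch.Put(item.cf, item.key, item.value);
      if (!s.IsOK()) return s;
    }
    if (batch.IsFull()) {
      // Sends "BATCHSET <slot> <serialized rocksdb::WriteBatch>" over the hiredis context;
      // the destination applies it through Storage::ApplyWriteBatch.
      auto s = batch.Send();
      if (!s.IsOK()) return s;
    }
  }
  return batch.Send();  // flush the final, partially filled batch
}

Since migrate-type, migrate-batch-size-kb, and migrate-batch-rate-limit-mb are registered as non-readonly config fields, they should also be adjustable at runtime (for example, CONFIG SET migrate-type raw_key_value), with the two batch options propagated to SlotMigrator::SetMigrateBatchSize and SetMigrateBatchRateLimit through the field callbacks added in config.cc.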