Slot migration based on raw key value
caipengbo committed Jun 30, 2023
1 parent 1239dba commit d1f8c6c
Showing 25 changed files with 1,046 additions and 93 deletions.
2 changes: 2 additions & 0 deletions CMakeLists.txt
@@ -125,6 +125,7 @@ include(cmake/rocksdb.cmake)
include(cmake/libevent.cmake)
include(cmake/fmt.cmake)
include(cmake/jsoncons.cmake)
include(cmake/hiredis.cmake)

if (USE_LUAJIT)
include(cmake/luajit.cmake)
@@ -154,6 +155,7 @@ list(APPEND EXTERNAL_LIBS tbb)
list(APPEND EXTERNAL_LIBS jsoncons)
list(APPEND EXTERNAL_LIBS Threads::Threads)
list(APPEND EXTERNAL_LIBS ${Backtrace_LIBRARY})
list(APPEND EXTERNAL_LIBS hiredis)

# Add git sha to version.h
find_package(Git REQUIRED)
36 changes: 36 additions & 0 deletions cmake/hiredis.cmake
@@ -0,0 +1,36 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

include_guard()

include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(hiredis
redis/hiredis v1.1.0
MD5=fee8ff6f43c155fcb0efd6b13835564b
)

FetchContent_MakeAvailableWithArgs(hiredis
BUILD_SHARED_LIBS=OFF
ENABLE_SSL=OFF
DISABLE_TESTS=ON
ENABLE_SSL_TESTS=OFF
ENABLE_EXAMPLES=OFF
ENABLE_ASYNC_TESTS=OFF
)

target_compile_options(hiredis PUBLIC "-Wno-c99-extensions")
24 changes: 23 additions & 1 deletion kvrocks.conf
@@ -501,7 +501,17 @@ compaction-checker-range 0-7
#
# rename-command KEYS ""

################################ MIGRATE #####################################
################################ MIGRATION #####################################
# Slot migration supports two methods:
# - redis_command: Migrate data by sending Redis commands over the Redis
#                  serialization protocol (RESP).
# - raw_key_value: Migrate the raw key-value data of the storage engine
#                  directly. This method avoids the overhead of converting
#                  data into Redis commands, reduces resource consumption,
#                  improves migration efficiency, and allows a finer-grained
#                  rate limit.
#
# Default: redis_command
migrate-type redis_command

# If the network bandwidth is completely consumed by the migration task,
# it will affect the availability of kvrocks. To avoid this situation,
# migrate-speed is adopted to limit the migrating speed.
@@ -528,6 +538,18 @@ migrate-pipeline-size 16
# Default: 10000
migrate-sequence-gap 10000

# The raw_key_value migration method sends data in batches. This option sets the
# size of each batch.
#
# Default: 16KB
migrate-batch-size-kb 16

# Rate limit for raw_key_value migration, specifying the maximum amount of data
# that can be migrated per second. 0 means no limit.
#
# Default: 16MB
migrate-batch-rate-limit-mb 16
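
# For example, to migrate with raw_key_value using a larger batch and a higher
# rate limit, the three options above could be combined as follows (the values
# here are illustrative, not recommendations from this commit):
#
# migrate-type raw_key_value
# migrate-batch-size-kb 32
# migrate-batch-rate-limit-mb 64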

################################ ROCKSDB #####################################

# Specify the capacity of metadata column family block cache. A larger block cache
Expand Down
110 changes: 110 additions & 0 deletions src/cluster/migrate_batch.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,110 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#include "migrate_batch.h"

#include "hiredis.h"
#include "scope_exit.h"

Status MigrateBatch::Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value) {
  if (pending_entries_ == 0 && !prefix_logdata_.empty()) {
    // Write prefix_logdata_ before the first entry is added to the batch
    auto s = PutLogData(prefix_logdata_);
    if (!s.IsOK()) {
      return s;
    }
  }
  auto s = write_batch_.Put(cf, key, value);
  if (!s.ok()) {
    return {Status::NotOK, fmt::format("put key value to migrate batch failed, {}", s.ToString())};
  }

  pending_entries_++;
  entries_num_++;
  return Status::OK();
}

Status MigrateBatch::Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key) {
  auto s = write_batch_.Delete(cf, key);
  if (!s.ok()) {
    return {Status::NotOK, fmt::format("delete key from migrate batch failed, {}", s.ToString())};
  }
  pending_entries_++;
  entries_num_++;
  return Status::OK();
}

Status MigrateBatch::PutLogData(const rocksdb::Slice &blob) {
  auto s = write_batch_.PutLogData(blob);
  if (!s.ok()) {
    return {Status::NotOK, fmt::format("put log data to migrate batch failed, {}", s.ToString())};
  }
  pending_entries_++;
  entries_num_++;
  return Status::OK();
}

void MigrateBatch::SetPrefixLogData(const std::string &prefix_logdata) { prefix_logdata_ = prefix_logdata; }

Status MigrateBatch::Send() {
  if (pending_entries_ == 0) {
    return Status::OK();
  }

  auto s = sendBatchSetCmd(slot_, dst_redis_context_, write_batch_);
  if (!s.IsOK()) {
    return {Status::NotOK, fmt::format("BATCHSET command failed, {}", s.Msg())};
  }

  sent_bytes_ += write_batch_.GetDataSize();
  sent_batches_num_++;
  pending_entries_ = 0;
  write_batch_.Clear();
  return Status::OK();
}

Status MigrateBatch::sendBatchSetCmd(int16_t slot, redisContext *redis_context,
                                     const rocksdb::WriteBatch &write_batch) {
  if (redis_context == nullptr) {
    return {Status::NotOK, "redis context is null"};
  }

  auto *reply = static_cast<redisReply *>(
      redisCommand(redis_context, "BATCHSET %d %b", slot, write_batch.Data().c_str(), write_batch.GetDataSize()));
  auto exit = MakeScopeExit([reply] {
    if (reply != nullptr) {
      freeReplyObject(reply);
    }
  });

  if (redis_context->err != 0) {
    return {Status::NotOK, std::string(redis_context->errstr)};
  }

  if (reply == nullptr) {
    return {Status::NotOK, "get null reply"};
  }

  if (reply->type == REDIS_REPLY_ERROR) {
    auto error_str = std::string(reply->str);
    return {Status::NotOK, error_str};
  }
  return Status::OK();
}
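
MigrateBatch assumes an already-connected hiredis context for the destination node; this excerpt of the commit does not show how that connection is established. A minimal sketch using the standard hiredis API follows; the host, port, and timeout are illustrative placeholders, not values taken from this commit.

#include <sys/time.h>

#include <cstdio>

#include "hiredis.h"

// Illustrative sketch: connect to the destination node and return a context
// suitable for passing to MigrateBatch. All values are placeholders.
redisContext *ConnectDestinationSketch(const char *host, int port) {
  struct timeval timeout = {1, 500000};  // 1.5 second connect timeout
  redisContext *ctx = redisConnectWithTimeout(host, port, timeout);
  if (ctx == nullptr || ctx->err != 0) {
    if (ctx != nullptr) {
      std::fprintf(stderr, "connect failed: %s\n", ctx->errstr);
      redisFree(ctx);
    }
    return nullptr;
  }
  return ctx;
}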
65 changes: 65 additions & 0 deletions src/cluster/migrate_batch.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

#pragma once

#include <rocksdb/write_batch.h>

#include <string>

#include "status.h"

struct redisContext;

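// Accumulates raw key-value entries for a single slot in a rocksdb::WriteBatch
// and sends them to the destination node with the BATCHSET command over a
// hiredis connection. Send() flushes the pending entries; IsFull() reports
// whether the accumulated data has reached the configured max_bytes threshold.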
class MigrateBatch {
 public:
  MigrateBatch() = default;
  MigrateBatch(int16_t slot, redisContext *dst_redis_context, uint32_t max_bytes)
      : slot_(slot), dst_redis_context_(dst_redis_context), max_bytes_(max_bytes) {}

  ~MigrateBatch() = default;

  Status Put(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key, const rocksdb::Slice &value);
  Status Delete(rocksdb::ColumnFamilyHandle *cf, const rocksdb::Slice &key);
  Status PutLogData(const rocksdb::Slice &blob);
  void SetPrefixLogData(const std::string &prefix_logdata);
  Status Send();

  void SetMaxBytes(uint32_t max_bytes) {
    if (max_bytes_ != max_bytes) max_bytes_ = max_bytes;
  }
  bool IsFull() const { return write_batch_.GetDataSize() >= max_bytes_; }
  size_t GetDataSize() const { return write_batch_.GetDataSize(); }
  uint64_t GetSentBytes() const { return sent_bytes_; }
  uint32_t GetSentBatchesNum() const { return sent_batches_num_; }
  uint32_t GetEntriesNum() const { return entries_num_; }

 private:
  static Status sendBatchSetCmd(int16_t slot, redisContext *redis_context, const rocksdb::WriteBatch &write_batch);

  rocksdb::WriteBatch write_batch_{};
  std::string prefix_logdata_{};
  uint64_t sent_bytes_ = 0;
  uint32_t sent_batches_num_ = 0;
  uint32_t entries_num_ = 0;
  uint32_t pending_entries_ = 0;

  int16_t slot_ = -1;
  redisContext *dst_redis_context_ = nullptr;
  uint32_t max_bytes_ = 0;
};
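
For orientation, here is a hedged sketch of how a caller might drive MigrateBatch while iterating a slot's keys. The iterator, column family handle, connection, and the 16 KiB threshold are assumptions made for illustration; the actual migration driver presumably lives in the other changed files of this commit, which are not shown in this excerpt.

#include <rocksdb/db.h>

#include "migrate_batch.h"

// Illustrative only: stream one slot's raw entries into a MigrateBatch,
// flushing whenever the batch reaches its size threshold.
Status MigrateSlotSketch(int16_t slot, rocksdb::Iterator *it, rocksdb::ColumnFamilyHandle *cf, redisContext *ctx) {
  MigrateBatch batch(slot, ctx, 16 * 1024);  // send once ~16 KiB is pending
  for (; it->Valid(); it->Next()) {
    auto s = batch.Put(cf, it->key(), it->value());
    if (!s.IsOK()) return s;
    if (batch.IsFull()) {
      s = batch.Send();  // ships the pending entries in one BATCHSET command
      if (!s.IsOK()) return s;
    }
  }
  return batch.Send();  // flush whatever remains
}

The accessor methods (GetSentBytes, GetSentBatchesNum, GetEntriesNum) can be used by such a driver for progress reporting or metrics.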