Skip to content

Commit

Permalink
Merge branch 'unstable' into move
Browse files Browse the repository at this point in the history
  • Loading branch information
Chiro11 committed Apr 5, 2024
2 parents 7626a8a + d7b8450 commit a7c4c38
Show file tree
Hide file tree
Showing 7 changed files with 72 additions and 67 deletions.
4 changes: 2 additions & 2 deletions cmake/snappy.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@ include_guard()
include(cmake/utils.cmake)

FetchContent_DeclareGitHubWithMirror(snappy
google/snappy f725f6766bfc62418c6491b504c8e5865ec99412
MD5=17a982c9b0c667b3744e1fecba0046f7
google/snappy 1.2.0
MD5=555f4af7472ce585c12709121808a060
)

FetchContent_MakeAvailableWithArgs(snappy
Expand Down
65 changes: 0 additions & 65 deletions src/commands/cmd_server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1207,70 +1207,6 @@ class CommandRdb : public Commander {
uint32_t db_index_ = 0;
};

class CommandAnalyze : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
if (args.size() <= 1) return {Status::RedisExecErr, errInvalidSyntax};
for (unsigned int i = 1; i < args.size(); ++i) {
command_args_.push_back(args[i]);
}
return Status::OK();
}
Status Execute(Server *srv, Connection *conn, std::string *output) override {
auto commands = redis::CommandTable::Get();
auto cmd_iter = commands->find(util::ToLower(command_args_[0]));
if (cmd_iter == commands->end()) {
// unsupported redis command
return {Status::RedisExecErr, errInvalidSyntax};
}
auto redis_cmd = cmd_iter->second;
auto cmd = redis_cmd->factory();
cmd->SetAttributes(redis_cmd);
cmd->SetArgs(command_args_);

if (!cmd->GetAttributes()->CheckArity(static_cast<int>(command_args_.size()))) {
*output = redis::Error("ERR wrong number of arguments");
return {Status::RedisExecErr, errWrongNumOfArguments};
}

auto s = cmd->Parse(command_args_);
if (!s.IsOK()) {
return s;
}

auto prev_perf_level = rocksdb::GetPerfLevel();
rocksdb::SetPerfLevel(rocksdb::PerfLevel::kEnableTimeExceptForMutex);
rocksdb::get_perf_context()->Reset();
rocksdb::get_iostats_context()->Reset();

std::string command_output;
s = cmd->Execute(srv, conn, &command_output);
if (!s.IsOK()) {
return s;
}

if (command_output[0] == '-') {
*output = command_output;
return s;
}

std::string perf_context = rocksdb::get_perf_context()->ToString(true);
std::string iostats_context = rocksdb::get_iostats_context()->ToString(true);
rocksdb::get_perf_context()->Reset();
rocksdb::get_iostats_context()->Reset();
rocksdb::SetPerfLevel(prev_perf_level);

*output = redis::MultiLen(3); // command output + perf context + iostats context
*output += command_output;
*output += redis::BulkString(perf_context);
*output += redis::BulkString(iostats_context);
return Status::OK();
}

private:
std::vector<std::string> command_args_;
};

class CommandReset : public Commander {
public:
Status Execute(Server *srv, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -1372,7 +1308,6 @@ REDIS_REGISTER_COMMANDS(MakeCmdAttr<CommandAuth>("auth", 2, "read-only ok-loadin
MakeCmdAttr<CommandSlaveOf>("slaveof", 3, "read-only exclusive no-script", 0, 0, 0),
MakeCmdAttr<CommandStats>("stats", 1, "read-only", 0, 0, 0),
MakeCmdAttr<CommandRdb>("rdb", -3, "write exclusive", 0, 0, 0),
MakeCmdAttr<CommandAnalyze>("analyze", -1, "", 0, 0, 0),
MakeCmdAttr<CommandReset>("reset", 1, "ok-loading multi no-script pub-sub", 0, 0, 0),
MakeCmdAttr<CommandApplyBatch>("applybatch", -2, "write no-multi", 0, 0, 0), )
} // namespace redis
3 changes: 3 additions & 0 deletions src/server/namespace.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ bool Namespace::IsAllowModify() const {

Status Namespace::LoadAndRewrite() {
auto config = storage_->GetConfig();
// Namespace is NOT allowed in the cluster mode, so we don't need to rewrite here.
if (config->cluster_enabled) return Status::OK();

// Load from the configuration file first
tokens_ = config->load_tokens;

Expand Down
3 changes: 3 additions & 0 deletions src/storage/redis_pubsub.cc
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@
namespace redis {

rocksdb::Status PubSub::Publish(const Slice &channel, const Slice &value) {
if (storage_->GetConfig()->IsSlave()) {
return rocksdb::Status::NotSupported("can't publish to db in slave mode");
}
auto batch = storage_->GetWriteBatchBase();
batch->Put(pubsub_cf_handle_, channel, value);
return storage_->Write(storage_->DefaultWriteOptions(), batch->GetWriteBatch());
Expand Down
3 changes: 3 additions & 0 deletions src/storage/storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -863,6 +863,9 @@ ObserverOrUniquePtr<rocksdb::WriteBatchBase> Storage::GetWriteBatchBase() {
}

Status Storage::WriteToPropagateCF(const std::string &key, const std::string &value) {
if (config_->IsSlave()) {
return {Status::NotOK, "cannot write to propagate column family in slave mode"};
}
auto batch = GetWriteBatchBase();
auto cf = GetCFHandle(kPropagateColumnFamilyName);
batch->Put(cf, key, value);
Expand Down
58 changes: 58 additions & 0 deletions tests/gocase/integration/replication/replication_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,64 @@ import (
"github.com/stretchr/testify/require"
)

func TestClusterReplication(t *testing.T) {
ctx := context.Background()

masterSrv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"})
defer func() { masterSrv.Close() }()
masterClient := masterSrv.NewClient()
defer func() { require.NoError(t, masterClient.Close()) }()
masterNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00"
require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterNodeID).Err())

replicaSrv := util.StartServer(t, map[string]string{
"cluster-enabled": "yes",
// enabled the replication namespace to reproduce the issue #2214
"repl-namespace-enabled": "yes",
})
defer func() { replicaSrv.Close() }()
replicaClient := replicaSrv.NewClient()
// allow to run the read-only command in the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())
defer func() { require.NoError(t, replicaClient.Close()) }()
replicaNodeID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01"
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODEID", replicaNodeID).Err())

clusterNodes := fmt.Sprintf("%s 127.0.0.1 %d master - 0-16383", masterNodeID, masterSrv.Port())
clusterNodes = fmt.Sprintf("%s\n%s 127.0.0.1 %d slave %s", clusterNodes, replicaNodeID, replicaSrv.Port(), masterNodeID)

require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())
require.NoError(t, replicaClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err())

t.Run("Cluster replication should work", func(t *testing.T) {
util.WaitForSync(t, replicaClient)
require.Equal(t, "slave", util.FindInfoEntry(replicaClient, "role"))
masterClient.Set(ctx, "k0", "v0", 0)
masterClient.LPush(ctx, "k1", "e0", "e1", "e2")
util.WaitForOffsetSync(t, masterClient, replicaClient)

require.Equal(t, "v0", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, []string{"e2", "e1", "e0"}, replicaClient.LRange(ctx, "k1", 0, -1).Val())
})

t.Run("Cluster replication should work normally after restart(issue #2214)", func(t *testing.T) {
replicaSrv.Close()
masterClient.Set(ctx, "k0", "v1", 0)
masterClient.HSet(ctx, "k2", "f0", "v0", "f1", "v1")

// start the replica server again
replicaSrv.Start()
_ = replicaClient.Close()
replicaClient = replicaSrv.NewClient()
// allow to run the read-only command in the replica
require.NoError(t, replicaClient.ReadOnly(ctx).Err())

util.WaitForOffsetSync(t, masterClient, replicaClient)
require.Equal(t, "v1", replicaClient.Get(ctx, "k0").Val())
require.Equal(t, map[string]string{"f0": "v0", "f1": "v1"}, replicaClient.HGetAll(ctx, "k2").Val())
})
}

func TestReplicationWithHostname(t *testing.T) {
srvA := util.StartServer(t, map[string]string{})
defer srvA.Close()
Expand Down
3 changes: 3 additions & 0 deletions tests/gocase/util/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,10 @@ func (s *KvrocksServer) close(keepDir bool) {

func (s *KvrocksServer) Restart() {
s.close(true)
s.Start()
}

func (s *KvrocksServer) Start() {
b := *binPath
require.NotEmpty(s.t, b, "please set the binary path by `-binPath`")
cmd := exec.Command(b)
Expand Down

0 comments on commit a7c4c38

Please sign in to comment.