Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add LMove command #577

Merged
merged 6 commits into from
May 19, 2022
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Changelog
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
# Version 999.999.999

New features
- Add LMove command

ShooterIT marked this conversation as resolved.
Show resolved Hide resolved
# Version 1.3.1

New features
Expand Down
22 changes: 22 additions & 0 deletions src/lock_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

#include <thread>
#include <string>
#include <set>

LockManager::LockManager(int hash_power): hash_power_(hash_power) {
hash_mask_ = (1U << hash_power) - 1;
Expand Down Expand Up @@ -51,3 +52,24 @@ void LockManager::Lock(const rocksdb::Slice &key) {
void LockManager::UnLock(const rocksdb::Slice &key) {
mutex_pool_[hash(key)]->unlock();
}

std::vector<std::mutex *> LockManager::MultiGet(const std::vector<std::string> &keys) {
std::vector<std::mutex *> locks;
std::set<unsigned, std::greater<unsigned>> to_acquire_indexes;
// We are using the `set` to avoid retrieving the mutex, as well as guarantee to retrieve
// the order of locks.
//
// For example, we need lock the key `A` and `B` and they have the same lock hash
// index, it will be deadlock if lock the same mutex twice. Besides, we also need
// to order the mutex before acquiring locks since different threads may acquire
// same keys with different order.
for (const auto &key : keys) {
to_acquire_indexes.insert(hash(key));
}

locks.reserve(to_acquire_indexes.size());
for (const auto &index : to_acquire_indexes) {
locks.emplace_back(mutex_pool_[index]);
}
return locks;
}
25 changes: 25 additions & 0 deletions src/lock_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@

#include <mutex>
#include <vector>
#include <string>
#include <functional>

#include <rocksdb/db.h>

Expand All @@ -33,6 +35,7 @@ class LockManager {
unsigned Size();
void Lock(const rocksdb::Slice &key);
void UnLock(const rocksdb::Slice &key);
std::vector<std::mutex *> MultiGet(const std::vector<std::string> &keys);

private:
int hash_power_;
Expand All @@ -55,3 +58,25 @@ class LockGuard {
LockManager *lock_mgr_ = nullptr;
rocksdb::Slice key_;
};

class MultiLockGuard {
public:
explicit MultiLockGuard(LockManager *lock_mgr, const std::vector<std::string> &keys):
lock_mgr_(lock_mgr) {
locks_ = lock_mgr_->MultiGet(keys);
for (const auto &iter : locks_) {
iter->lock();
}
}

~MultiLockGuard() {
// Lock with order `A B C` and unlock should be `C B A`
ShooterIT marked this conversation as resolved.
Show resolved Hide resolved
for (auto iter = locks_.rbegin(); iter != locks_.rend(); ++iter) {
(*iter)->unlock();
}
}

private:
LockManager *lock_mgr_ = nullptr;
std::vector<std::mutex*> locks_;
};
33 changes: 33 additions & 0 deletions src/redis_cmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1802,6 +1802,38 @@ class CommandRPopLPUSH : public Commander {
}
};

class CommandLMove : public Commander {
public:
Status Parse(const std::vector<std::string> &args) override {
auto arg_val = Util::ToLower(args_[3]);
if (arg_val != "left" && arg_val != "right") {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
src_left_ = arg_val == "left";
arg_val = Util::ToLower(args_[4]);
if (arg_val != "left" && arg_val != "right") {
return Status(Status::RedisParseErr, errInvalidSyntax);
}
dst_left_ = arg_val == "left";
return Status::OK();
}

Status Execute(Server *svr, Connection *conn, std::string *output) override {
Redis::List list_db(svr->storage_, conn->GetNamespace());
std::string elem;
auto s = list_db.LMove(args_[1], args_[2], src_left_, dst_left_, &elem);
if (!s.ok() && !s.IsNotFound()) {
return Status(Status::RedisExecErr, s.ToString());
}
*output = s.IsNotFound() ? Redis::NilString() : Redis::BulkString(elem);
return Status::OK();
}

private:
bool src_left_;
bool dst_left_;
};

class CommandSAdd : public Commander {
public:
Status Execute(Server *svr, Connection *conn, std::string *output) override {
Expand Down Expand Up @@ -4708,6 +4740,7 @@ CommandAttributes redisCommandTable[] = {
ADD_CMD("llen", 2, "read-only", 1, 1, 1, CommandLLen),
ADD_CMD("lset", 4, "write", 1, 1, 1, CommandLSet),
ADD_CMD("rpoplpush", 3, "write", 1, 2, 1, CommandRPopLPUSH),
ADD_CMD("lmove", 5, "write", 1, 2, 1, CommandLMove),

ADD_CMD("sadd", -3, "write", 1, 1, 1, CommandSAdd),
ADD_CMD("srem", -3, "write", 1, 1, 1, CommandSRem),
Expand Down
133 changes: 133 additions & 0 deletions src/redis_list.cc
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,139 @@ rocksdb::Status List::RPopLPush(const Slice &src, const Slice &dst, std::string
return s;
}

rocksdb::Status List::LMove(const rocksdb::Slice &src, const rocksdb::Slice &dst,
bool src_left, bool dst_left, std::string *elem) {
if (src == dst) {
return lmoveOnSingleList(src, src_left, dst_left, elem);
}
return lmoveOnTwoLists(src, dst, src_left, dst_left, elem);
}

rocksdb::Status List::lmoveOnSingleList(const rocksdb::Slice &src, bool src_left, bool dst_left, std::string *elem) {
std::string ns_key;
AppendNamespacePrefix(src, &ns_key);

LockGuard guard(storage_->GetLockManager(), ns_key);
ListMetadata metadata(false);
rocksdb::Status s = GetMetadata(ns_key, &metadata);
if (!s.ok()) {
return s;
}

elem->clear();

uint64_t curr_index = src_left ? metadata.head : metadata.tail - 1;
std::string curr_index_buf;
PutFixed64(&curr_index_buf, curr_index);
std::string curr_sub_key;
InternalKey(ns_key, curr_index_buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&curr_sub_key);
s = db_->Get(rocksdb::ReadOptions(), curr_sub_key, elem);
if (!s.ok()) {
return s;
}

if (src_left == dst_left) {
// no-op
return rocksdb::Status::OK();
}

if (metadata.size == 1) {
// if there is only one element in the list - do nothing, just get it
return rocksdb::Status::OK();
}

rocksdb::WriteBatch batch;
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove)});
batch.PutLogData(log_data.Encode());

batch.Delete(curr_sub_key);

if (src_left) {
++metadata.head;
++metadata.tail;
} else {
--metadata.head;
--metadata.tail;
}

uint64_t new_index = src_left ? metadata.tail - 1 : metadata.head;
std::string new_index_buf;
PutFixed64(&new_index_buf, new_index);
std::string new_sub_key;
InternalKey(ns_key, new_index_buf, metadata.version, storage_->IsSlotIdEncoded()).Encode(&new_sub_key);
batch.Put(new_sub_key, *elem);

std::string bytes;
metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, ns_key, bytes);

return storage_->Write(rocksdb::WriteOptions(), &batch);
}

rocksdb::Status List::lmoveOnTwoLists(const rocksdb::Slice &src, const rocksdb::Slice &dst,
bool src_left, bool dst_left, std::string *elem) {
std::string src_ns_key;
AppendNamespacePrefix(src, &src_ns_key);
std::string dst_ns_key;
AppendNamespacePrefix(dst, &dst_ns_key);

std::vector<std::string> lock_keys{src_ns_key, dst_ns_key};
MultiLockGuard guard(storage_->GetLockManager(), lock_keys);
ListMetadata src_metadata(false);
auto s = GetMetadata(src_ns_key, &src_metadata);
if (!s.ok()) {
return s;
}

ListMetadata dst_metadata(false);
s = GetMetadata(dst_ns_key, &dst_metadata);
if (!s.ok() && !s.IsNotFound()) {
return s;
}

elem->clear();

rocksdb::WriteBatch batch;
WriteBatchLogData log_data(kRedisList, {std::to_string(kRedisCmdLMove)});
batch.PutLogData(log_data.Encode());

uint64_t src_index = src_left ? src_metadata.head : src_metadata.tail - 1;
std::string src_buf;
PutFixed64(&src_buf, src_index);
std::string src_sub_key;
InternalKey(src_ns_key, src_buf, src_metadata.version, storage_->IsSlotIdEncoded()).Encode(&src_sub_key);
s = db_->Get(rocksdb::ReadOptions(), src_sub_key, elem);
if (!s.ok()) {
return s;
}

batch.Delete(src_sub_key);
if (src_metadata.size == 1) {
batch.Delete(metadata_cf_handle_, src_ns_key);
} else {
std::string bytes;
src_metadata.size -= 1;
src_left ? ++src_metadata.head : --src_metadata.tail;
src_metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, src_ns_key, bytes);
}

uint64_t dst_index = dst_left ? dst_metadata.head - 1 : dst_metadata.tail;
std::string dst_buf;
PutFixed64(&dst_buf, dst_index);
std::string dst_sub_key;
InternalKey(dst_ns_key, dst_buf, dst_metadata.version, storage_->IsSlotIdEncoded()).Encode(&dst_sub_key);
batch.Put(dst_sub_key, *elem);
dst_left ? --dst_metadata.head : ++dst_metadata.tail;

std::string bytes;
dst_metadata.size += 1;
dst_metadata.Encode(&bytes);
batch.Put(metadata_cf_handle_, dst_ns_key, bytes);

return storage_->Write(rocksdb::WriteOptions(), &batch);
}

// Caution: trim the big list may block the server
rocksdb::Status List::Trim(const Slice &user_key, int start, int stop) {
uint32_t trim_cnt = 0;
Expand Down
3 changes: 3 additions & 0 deletions src/redis_list.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,12 +40,15 @@ class List : public Database {
rocksdb::Status Rem(const Slice &user_key, int count, const Slice &elem, int *ret);
rocksdb::Status Index(const Slice &user_key, int index, std::string *elem);
rocksdb::Status RPopLPush(const Slice &src, const Slice &dst, std::string *elem);
rocksdb::Status LMove(const Slice &src, const Slice &dst, bool src_left, bool dst_left, std::string *elem);
rocksdb::Status Push(const Slice &user_key, const std::vector<Slice> &elems, bool left, int *ret);
rocksdb::Status PushX(const Slice &user_key, const std::vector<Slice> &elems, bool left, int *ret);
rocksdb::Status Range(const Slice &user_key, int start, int stop, std::vector<std::string> *elems);

private:
rocksdb::Status GetMetadata(const Slice &ns_key, ListMetadata *metadata);
rocksdb::Status push(const Slice &user_key, std::vector<Slice> elems, bool create_if_missing, bool left, int *ret);
rocksdb::Status lmoveOnSingleList(const Slice &src, bool src_left, bool dst_left, std::string *elem);
rocksdb::Status lmoveOnTwoLists(const Slice &src, const Slice &dst, bool src_left, bool dst_left, std::string *elem);
};
} // namespace Redis
1 change: 1 addition & 0 deletions src/redis_metadata.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ enum RedisCommand {
kRedisCmdExpire,
kRedisCmdSetBit,
kRedisCmdBitOp,
kRedisCmdLMove,
};

const std::vector<std::string> RedisTypeNames = {
Expand Down
23 changes: 23 additions & 0 deletions tests/rwlock_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,29 @@ TEST(LockManager, LockKey) {
}
}

TEST(LockManager, LockMultiKeys) {
LockManager lock_manager(2);

std::vector<std::string> keys1 = {"a0", "a1", "a2", "a3", "a4", "a5", "a6", "a7"};
std::vector<std::string> keys2 = {"a7", "a6", "a5", "a4", "a3", "a2", "a1", "a0"};

std::thread ths[10];
for(int i = 0; i < 10; i++) {
if (i % 2 == 0) {
ths[i] = std::thread([&lock_manager, &keys1]() {
MultiLockGuard(&lock_manager, keys1);
});
} else {
ths[i] = std::thread([&lock_manager, &keys2]() {
MultiLockGuard(&lock_manager, keys2);
});
}
}
for (auto & th : ths) {
th.join();
}
}

TEST(ReadWriteLock, ReadLockGurad) {
RWLock::ReadWriteLock rwlock;
int val = 1;
Expand Down
Loading