Skip to content

Commit

Permalink
Allows Kvrocks to listen to only the unix socket (#809)
Browse files Browse the repository at this point in the history
If unix socket is specified, don't listen default TCP if addr:port wasn't explicitly set.
  • Loading branch information
torwig committed Sep 6, 2022
1 parent 9c64cb2 commit dbd968b
Show file tree
Hide file tree
Showing 7 changed files with 28 additions and 16 deletions.
7 changes: 3 additions & 4 deletions kvrocks.conf
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
################################ GENERAL #####################################

# By default kvrocks listens for connections from all the network interfaces
# available on the server. It is possible to listen to just one or multiple
# interfaces using the "bind" configuration directive, followed by one or
# more IP addresses.
# By default kvrocks listens for connections from localhost interface.
# It is possible to listen to just one or multiple interfaces using
# the "bind" configuration directive, followed by one or more IP addresses.
#
# Examples:
#
Expand Down
21 changes: 15 additions & 6 deletions src/config.cc
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ const char *errNotEnableBlobDB = "Must set rocksdb.enable_blob_files to yes firs
const char *errNotSetLevelCompactionDynamicLevelBytes =
"Must set rocksdb.level_compaction_dynamic_level_bytes yes first.";

const char *kDefaultBindAddress = "127.0.0.1";

configEnum compression_type_enum[] = {
{"no", rocksdb::CompressionType::kNoCompression},
{"snappy", rocksdb::CompressionType::kSnappyCompression},
Expand All @@ -59,8 +61,6 @@ configEnum supervised_mode_enum[] = {
{nullptr, 0}
};

ConfigField::~ConfigField() = default;

std::string trimRocksDBPrefix(std::string s) {
if (strncasecmp(s.data(), "rocksdb.", 8)) return s;
return s.substr(8, s.size()-8);
Expand Down Expand Up @@ -88,14 +88,13 @@ Config::Config() {
bool readonly;
std::unique_ptr<ConfigField> field;

FieldWrapper(std::string name, bool readonly,
ConfigField* field)
FieldWrapper(std::string name, bool readonly, ConfigField *field)
: name(std::move(name)), readonly(readonly), field(field) {}
};
FieldWrapper fields[] = {
{"daemonize", true, new YesNoField(&daemonize, false)},
{"bind", true, new StringField(&binds_, "127.0.0.1")},
{"port", true, new IntField(&port, 6666, 1, 65535)},
{"bind", true, new StringField(&binds_, "")},
{"port", true, new IntField(&port, kDefaultPort, 1, 65535)},
{"workers", true, new IntField(&workers, 8, 1, 256)},
{"timeout", false, new IntField(&timeout, 0, 0, INT_MAX)},
{"tcp-backlog", true, new IntField(&backlog, 511, 0, INT_MAX)},
Expand Down Expand Up @@ -539,6 +538,16 @@ Status Config::finish() {
if ((cluster_enabled) && !tokens.empty()) {
return Status(Status::NotOK, "enabled cluster mode wasn't allowed while the namespace exists");
}
if (unixsocket.empty() && binds.size() == 0) {
binds.emplace_back(kDefaultBindAddress);
}
if (cluster_enabled && binds.size() == 0) {
return Status(Status::NotOK, "node is in cluster mode, but TCP listen address "
"wasn't specified via configuration file");
}
if (master_port != 0 && binds.size() == 0) {
return Status(Status::NotOK, "replication doesn't supports unix socket");
}
if (db_dir.empty()) db_dir = dir + "/db";
if (backup_dir.empty()) backup_dir = dir + "/backup";
if (log_dir.empty()) log_dir = dir;
Expand Down
4 changes: 3 additions & 1 deletion src/config.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ class Storage;
const size_t KiB = 1024L;
const size_t MiB = 1024L * KiB;
const size_t GiB = 1024L * MiB;
const int kDefaultPort = 6666;

extern const char *kDefaultNamespace;

Expand All @@ -64,7 +65,8 @@ struct Config{
public:
Config();
~Config() = default;
int port = 6666;

int port = 0;
int workers = 0;
int timeout = 0;
int loglevel = 0;
Expand Down
2 changes: 1 addition & 1 deletion src/config_type.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ const char *configEnumGetName(configEnum *ce, int val);
class ConfigField {
public:
ConfigField() = default;
virtual ~ConfigField() = 0;
virtual ~ConfigField() = default;
virtual std::string ToString() = 0;
virtual Status Set(const std::string &v) = 0;
virtual Status ToNumber(int64_t *n) { return Status(Status::NotOK, "not supported"); }
Expand Down
4 changes: 2 additions & 2 deletions src/main.cc
Original file line number Diff line number Diff line change
Expand Up @@ -285,11 +285,11 @@ int main(int argc, char* argv[]) {
exit(1);
}
initGoogleLog(&config);
LOG(INFO)<< "Version: " << VERSION << " @" << GIT_COMMIT << std::endl;
LOG(INFO) << "Version: " << VERSION << " @" << GIT_COMMIT << std::endl;
// Tricky: We don't expect that different instances running on the same port,
// but the server use REUSE_PORT to support the multi listeners. So we connect
// the listen port to check if the port has already listened or not.
if (Util::IsPortInUse(config.port)) {
if (config.binds.size() != 0 && Util::IsPortInUse(config.port)) {
LOG(ERROR)<< "Could not create server TCP since the specified port["
<< config.port << "] is already in use" << std::endl;
exit(1);
Expand Down
3 changes: 2 additions & 1 deletion src/server.cc
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,11 @@ Server::Server(Engine::Storage *storage, Config *config) :
if (!config->unixsocket.empty() && i == 0) {
Status s = worker->ListenUnixSocket(config->unixsocket, config->unixsocketperm, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[server] Failed to listen on unix socket: "<< config->unixsocket
LOG(ERROR) << "[server] Failed to listen on unix socket: " << config->unixsocket
<< ", encounter error: " << s.Msg();
exit(1);
}
LOG(INFO) << "[server] Listening on unix socket: " << config->unixsocket;
}
worker_threads_.emplace_back(Util::MakeUnique<WorkerThread>(std::move(worker)));
}
Expand Down
3 changes: 2 additions & 1 deletion src/worker.cc
Original file line number Diff line number Diff line change
Expand Up @@ -50,10 +50,11 @@ Worker::Worker(Server *svr, Config *config, bool repl) : svr_(svr) {
for (const auto &bind : binds) {
s = listenTCP(bind, port, config->backlog);
if (!s.IsOK()) {
LOG(ERROR) << "[worker] Failed to listen on: "<< bind << ":" << port
LOG(ERROR) << "[worker] Failed to listen on: " << bind << ":" << port
<< ", encounter error: " << s.Msg();
exit(1);
}
LOG(INFO) << "[worker] Listening on: " << bind << ":" << port;
}
}

Expand Down

0 comments on commit dbd968b

Please sign in to comment.