diff --git a/DEPS.bzl b/DEPS.bzl index 9b5b58ca8c866..442749f514a5e 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -3582,8 +3582,8 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sum = "h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo=", - version = "v2.0.4", + sum = "h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM=", + version = "v2.0.5-0.20230110071533-f313ddf58d73", ) go_repository( name = "com_github_tikv_pd_client", diff --git a/build/nogo_config.json b/build/nogo_config.json index 3aecc81b0519e..97a1a1feed50e 100644 --- a/build/nogo_config.json +++ b/build/nogo_config.json @@ -405,7 +405,8 @@ "parser/": "parser code", "meta/": "parser code", "extension/": "extension code", - "resourcemanager/": "resourcemanager code" + "resourcemanager/": "resourcemanager code", + "keyspace": "keyspace code" } }, "shift": { @@ -767,7 +768,8 @@ "server/conn_stmt.go": "server/conn_stmt.go", "server/conn_test.go": "server/conn_test.go", "extension/": "extension code", - "resourcemanager/": "resourcemanager code" + "resourcemanager/": "resourcemanager code", + "keyspace/": "keyspace code" } }, "SA2000": { diff --git a/config/config.go b/config/config.go index bc25b8c9b9ec3..54ed0cf44c0c6 100644 --- a/config/config.go +++ b/config/config.go @@ -89,6 +89,8 @@ const ( DefTempDir = "/tmp/tidb" // DefAuthTokenRefreshInterval is the default time interval to refresh tidb auth token. DefAuthTokenRefreshInterval = time.Hour + // EnvVarKeyspaceName is the system env name for keyspace name. + EnvVarKeyspaceName = "KEYSPACE_NAME" ) // Valid config maps @@ -183,6 +185,7 @@ type Config struct { VersionComment string `toml:"version-comment" json:"version-comment"` TiDBEdition string `toml:"tidb-edition" json:"tidb-edition"` TiDBReleaseVersion string `toml:"tidb-release-version" json:"tidb-release-version"` + KeyspaceName string `toml:"keyspace-name" json:"keyspace-name"` Log Log `toml:"log" json:"log"` Instance Instance `toml:"instance" json:"instance"` Security Security `toml:"security" json:"security"` @@ -1457,3 +1460,9 @@ func ContainHiddenConfig(s string) bool { } return false } + +// GetGlobalKeyspaceName is used to get global keyspace name +// from config file or command line. +func GetGlobalKeyspaceName() string { + return GetGlobalConfig().KeyspaceName +} diff --git a/config/config_test.go b/config/config_test.go index 9a6d12a284817..4bd0911661e11 100644 --- a/config/config_test.go +++ b/config/config_test.go @@ -1288,3 +1288,18 @@ func TestStatsLoadLimit(t *testing.T) { checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit, true) checkQueueSizeValid(DefMaxOfStatsLoadQueueSizeLimit+1, false) } + +func TestGetGlobalKeyspaceName(t *testing.T) { + conf := NewConfig() + require.Empty(t, conf.KeyspaceName) + + UpdateGlobal(func(conf *Config) { + conf.KeyspaceName = "test" + }) + + require.Equal(t, "test", GetGlobalKeyspaceName()) + + UpdateGlobal(func(conf *Config) { + conf.KeyspaceName = "" + }) +} diff --git a/domain/BUILD.bazel b/domain/BUILD.bazel index f04948ddfe709..6d5306beff501 100644 --- a/domain/BUILD.bazel +++ b/domain/BUILD.bazel @@ -32,6 +32,7 @@ go_library( "//errno", "//infoschema", "//infoschema/perfschema", + "//keyspace", "//kv", "//meta", "//metrics", @@ -55,6 +56,7 @@ go_library( "//util/dbterror", "//util/domainutil", "//util/engine", + "//util/etcd", "//util/execdetails", "//util/expensivequery", "//util/logutil", diff --git a/domain/domain.go b/domain/domain.go index 5f6b0ce3a08a2..a24de789515bc 100644 --- a/domain/domain.go +++ b/domain/domain.go @@ -45,6 +45,7 @@ import ( "github.com/pingcap/tidb/errno" "github.com/pingcap/tidb/infoschema" "github.com/pingcap/tidb/infoschema/perfschema" + "github.com/pingcap/tidb/keyspace" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/meta" "github.com/pingcap/tidb/metrics" @@ -65,6 +66,7 @@ import ( "github.com/pingcap/tidb/util/dbterror" "github.com/pingcap/tidb/util/domainutil" "github.com/pingcap/tidb/util/engine" + "github.com/pingcap/tidb/util/etcd" "github.com/pingcap/tidb/util/expensivequery" "github.com/pingcap/tidb/util/logutil" "github.com/pingcap/tidb/util/memory" @@ -965,6 +967,9 @@ func (do *Domain) Init( if err != nil { return errors.Trace(err) } + + etcd.SetEtcdCliByNamespace(cli, keyspace.MakeKeyspaceEtcdNamespace(do.store.GetCodec())) + do.etcdClient = cli } } diff --git a/go.mod b/go.mod index 1af0523464e97..65440ab9d18cb 100644 --- a/go.mod +++ b/go.mod @@ -90,7 +90,7 @@ require ( github.com/stretchr/testify v1.8.0 github.com/tdakkota/asciicheck v0.1.1 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.4 + github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 github.com/twmb/murmur3 v1.1.3 diff --git a/go.sum b/go.sum index f7e1d15fa689c..e34b4c5935340 100644 --- a/go.sum +++ b/go.sum @@ -936,8 +936,8 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.4 h1:cPtMXTExqjzk8L40qhrgB/mXiBXKP5LRU0vwjtI2Xxo= -github.com/tikv/client-go/v2 v2.0.4/go.mod h1:v52O5zDtv2BBus4lm5yrSQhxGW4Z4RaXWfg0U1Kuyqo= +github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73 h1:RI6bs9TDIIJ96N0lR5uZoGO8QNot4qS/1l+Mobx0InM= +github.com/tikv/client-go/v2 v2.0.5-0.20230110071533-f313ddf58d73/go.mod h1:dO/2a/xi/EO3eVv9xN5G1VFtd/hythzgTeeCbW5SWuI= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07 h1:ckPpxKcl75mO2N6a4cJXiZH43hvcHPpqc9dh1TmH1nc= github.com/tikv/pd/client v0.0.0-20221031025758-80f0d8ca4d07/go.mod h1:CipBxPfxPUME+BImx9MUYXCnAVLS3VJUr3mnSJwh40A= github.com/timakin/bodyclose v0.0.0-20210704033933-f49887972144 h1:kl4KhGNsJIbDHS9/4U9yQo1UcPQM0kOMJHn29EoH/Ro= diff --git a/keyspace/BUILD.bazel b/keyspace/BUILD.bazel new file mode 100644 index 0000000000000..a536722a018d7 --- /dev/null +++ b/keyspace/BUILD.bazel @@ -0,0 +1,12 @@ +load("@io_bazel_rules_go//go:def.bzl", "go_library") + +go_library( + name = "keyspace", + srcs = ["keyspace.go"], + importpath = "github.com/pingcap/tidb/keyspace", + visibility = ["//visibility:public"], + deps = [ + "@com_github_pingcap_kvproto//pkg/kvrpcpb", + "@com_github_tikv_client_go_v2//tikv", + ], +) diff --git a/keyspace/keyspace.go b/keyspace/keyspace.go new file mode 100644 index 0000000000000..103d0f742cee7 --- /dev/null +++ b/keyspace/keyspace.go @@ -0,0 +1,38 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package keyspace + +import ( + "fmt" + + "github.com/pingcap/kvproto/pkg/kvrpcpb" + "github.com/tikv/client-go/v2/tikv" +) + +const ( + // tidbKeyspaceEtcdPathPrefix is the keyspace prefix for etcd namespace + tidbKeyspaceEtcdPathPrefix = "/keyspaces/tidb/" +) + +// CodecV1 represents api v1 codec. +var CodecV1 = tikv.NewCodecV1(tikv.ModeTxn) + +// MakeKeyspaceEtcdNamespace return the keyspace prefix path for etcd namespace +func MakeKeyspaceEtcdNamespace(c tikv.Codec) string { + if c.GetAPIVersion() == kvrpcpb.APIVersion_V1 { + return "" + } + return fmt.Sprintf(tidbKeyspaceEtcdPathPrefix+"%d", c.GetKeyspaceID()) +} diff --git a/kv/interface_mock_test.go b/kv/interface_mock_test.go index 561c0aa12baaf..a4eb9b8a71f7d 100644 --- a/kv/interface_mock_test.go +++ b/kv/interface_mock_test.go @@ -187,6 +187,10 @@ func newMockTxn() Transaction { // mockStorage is used to start a must commit-failed txn. type mockStorage struct{} +func (s *mockStorage) GetCodec() tikv.Codec { + return nil +} + func (s *mockStorage) Begin(opts ...tikv.TxnOption) (Transaction, error) { return newMockTxn(), nil } diff --git a/kv/kv.go b/kv/kv.go index 4c855c0938308..346b6a4d25d02 100644 --- a/kv/kv.go +++ b/kv/kv.go @@ -651,6 +651,8 @@ type Storage interface { GetMinSafeTS(txnScope string) uint64 // GetLockWaits return all lock wait information GetLockWaits() ([]*deadlockpb.WaitForEntry, error) + // GetCodec gets the codec of the storage. + GetCodec() tikv.Codec } // EtcdBackend is used for judging a storage is a real TiKV. diff --git a/server/http_status.go b/server/http_status.go index fef13b2c3fd2f..b32237c376f7a 100644 --- a/server/http_status.go +++ b/server/http_status.go @@ -461,8 +461,14 @@ func (s *Server) startStatusServerAndRPCServer(serverMux *http.ServeMux) { grpcServer := NewRPCServer(s.cfg, s.dom, s) service.RegisterChannelzServiceToServer(grpcServer) if s.cfg.Store == "tikv" { + keyspaceName := config.GetGlobalKeyspaceName() for { - fullPath := fmt.Sprintf("tikv://%s", s.cfg.Path) + var fullPath string + if keyspaceName == "" { + fullPath = fmt.Sprintf("%s://%s", s.cfg.Store, s.cfg.Path) + } else { + fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", s.cfg.Store, s.cfg.Path, keyspaceName) + } store, err := store.New(fullPath) if err != nil { logutil.BgLogger().Error("new tikv store fail", zap.Error(err)) diff --git a/store/BUILD.bazel b/store/BUILD.bazel index dc33aa14eea94..bf8faa0dde9b9 100644 --- a/store/BUILD.bazel +++ b/store/BUILD.bazel @@ -10,6 +10,7 @@ go_library( "//util", "//util/logutil", "@com_github_pingcap_errors//:errors", + "@com_github_pingcap_kvproto//pkg/pdpb", "@org_uber_go_zap//:zap", ], ) diff --git a/store/copr/coprocessor_test.go b/store/copr/coprocessor_test.go index 7790e8f7661fc..4f47bf454d270 100644 --- a/store/copr/coprocessor_test.go +++ b/store/copr/coprocessor_test.go @@ -39,7 +39,7 @@ func TestBuildTasksWithoutBuckets(t *testing.T) { }() _, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) @@ -168,7 +168,7 @@ func TestBuildTasksByBuckets(t *testing.T) { cluster.SplitRegionBuckets(regionIDs[0], [][]byte{{}, {'c'}, {'g'}, {'k'}, {'n'}}, regionIDs[0]) cluster.SplitRegionBuckets(regionIDs[1], [][]byte{{'n'}, {'t'}, {'x'}}, regionIDs[1]) cluster.SplitRegionBuckets(regionIDs[2], [][]byte{{'x'}, {}}, regionIDs[2]) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) @@ -363,7 +363,7 @@ func TestSplitRegionRanges(t *testing.T) { }() testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) @@ -425,7 +425,7 @@ func TestRebuild(t *testing.T) { }() storeID, regionIDs, peerIDs := testutils.BootstrapWithMultiRegions(cluster, []byte("m")) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() @@ -488,7 +488,7 @@ func TestBuildPagingTasks(t *testing.T) { }() _, regionIDs, _ := testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) @@ -667,7 +667,7 @@ func TestBuildCopTasksWithRowCountHint(t *testing.T) { require.NoError(t, err) }() _, _, _ = testutils.BootstrapWithMultiRegions(cluster, []byte("g"), []byte("n"), []byte("t")) - pdCli := &tikv.CodecPDClient{Client: pdClient} + pdCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) defer pdCli.Close() cache := NewRegionCache(tikv.NewRegionCache(pdCli)) defer cache.Close() diff --git a/store/driver/tikv_driver.go b/store/driver/tikv_driver.go index e1ba5d121608f..bf0f1272184dd 100644 --- a/store/driver/tikv_driver.go +++ b/store/driver/tikv_driver.go @@ -88,6 +88,7 @@ func WithPDClientConfig(client config.PDClient) Option { // TiKVDriver implements engine TiKV. type TiKVDriver struct { + keyspaceName string pdConfig config.PDClient security config.Security tikvConfig config.TiKVClient @@ -117,7 +118,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, mc.Lock() defer mc.Unlock() d.setDefaultAndOptions(options...) - etcdAddrs, disableGC, err := config.ParsePath(path) + etcdAddrs, disableGC, keyspaceName, err := config.ParsePath(path) if err != nil { return nil, errors.Trace(err) } @@ -157,11 +158,39 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, return nil, errors.Trace(err) } - pdClient := tikv.CodecPDClient{Client: pdCli} - s, err := tikv.NewKVStore(uuid, &pdClient, spkv, tikv.NewRPCClient(tikv.WithSecurity(d.security))) + // ---------------- keyspace logic ---------------- + var ( + pdClient *tikv.CodecPDClient + ) + + if keyspaceName == "" { + logutil.BgLogger().Info("using API V1.") + pdClient = tikv.NewCodecPDClient(tikv.ModeTxn, pdCli) + } else { + logutil.BgLogger().Info("using API V2.", zap.String("keyspaceName", keyspaceName)) + pdClient, err = tikv.NewCodecPDClientWithKeyspace(tikv.ModeTxn, pdCli, keyspaceName) + if err != nil { + return nil, errors.Trace(err) + } + // If there's setting keyspace-name, then skipped GC worker logic. + // It needs a group of special tidb nodes to execute GC worker logic. + // TODO: remove this restriction while merged keyspace GC worker logic. + disableGC = true + } + + codec := pdClient.GetCodec() + + rpcClient := tikv.NewRPCClient( + tikv.WithSecurity(d.security), + tikv.WithCodec(codec), + ) + + s, err := tikv.NewKVStore(uuid, pdClient, spkv, rpcClient) if err != nil { return nil, errors.Trace(err) } + + // ---------------- keyspace logic ---------------- if d.txnLocalLatches.Enabled { s.EnableTxnLocalLatches(d.txnLocalLatches.Capacity) } @@ -178,6 +207,7 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (kv.Storage, memCache: kv.NewCacheDB(), enableGC: !disableGC, coprStore: coprStore, + codec: codec, } mc.cache[uuid] = store @@ -192,6 +222,7 @@ type tikvStore struct { enableGC bool gcWorker *gcworker.GCWorker coprStore *copr.Store + codec tikv.Codec } // Name gets the name of the storage engine @@ -343,3 +374,7 @@ func (s *tikvStore) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { } return result, nil } + +func (s *tikvStore) GetCodec() tikv.Codec { + return s.codec +} diff --git a/store/helper/helper.go b/store/helper/helper.go index 8d584b216d9ad..c4fe7c7cc38f0 100644 --- a/store/helper/helper.go +++ b/store/helper/helper.go @@ -78,6 +78,7 @@ type Storage interface { Closed() <-chan struct{} GetMinSafeTS(txnScope string) uint64 GetLockWaits() ([]*deadlockpb.WaitForEntry, error) + GetCodec() tikv.Codec } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. diff --git a/store/mockstore/mockstorage/storage.go b/store/mockstore/mockstorage/storage.go index a85b46166631f..6a05a78fef0ff 100644 --- a/store/mockstore/mockstorage/storage.go +++ b/store/mockstore/mockstorage/storage.go @@ -117,6 +117,12 @@ func (s *mockStorage) Close() error { return s.KVStore.Close() } +func (s *mockStorage) GetCodec() tikv.Codec { + pdClient := s.KVStore.GetPDClient() + pdCodecCli := tikv.NewCodecPDClient(tikv.ModeTxn, pdClient) + return pdCodecCli.GetCodec() +} + // MockLockWaitSetter is used to set the mocked lock wait information, which helps implementing tests that uses the // GetLockWaits function. type MockLockWaitSetter interface { diff --git a/store/store.go b/store/store.go index d4b51f025d824..cbc91a4fce259 100644 --- a/store/store.go +++ b/store/store.go @@ -20,6 +20,7 @@ import ( "sync" "github.com/pingcap/errors" + "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/tidb/kv" "github.com/pingcap/tidb/util" "github.com/pingcap/tidb/util/logutil" @@ -74,7 +75,7 @@ func newStoreWithRetry(path string, maxRetries int) (kv.Storage, error) { err = util.RunWithRetry(maxRetries, util.RetryInterval, func() (bool, error) { logutil.BgLogger().Info("new store", zap.String("path", path)) s, err = d.Open(path) - return kv.IsTxnRetryableError(err), err + return isNewStoreRetryableError(err), err }) if err == nil { @@ -91,3 +92,32 @@ func loadDriver(name string) (kv.Driver, bool) { d, ok := stores[name] return d, ok } + +// isOpenRetryableError check if the new store operation should be retried under given error +// currently, it should be retried if: +// +// Transaction conflict and is retryable (kv.IsTxnRetryableError) +// PD is not bootstrapped at the time of request +// Keyspace requested does not exist (request prior to PD keyspace pre-split) +func isNewStoreRetryableError(err error) bool { + if err == nil { + return false + } + return kv.IsTxnRetryableError(err) || IsNotBootstrappedError(err) || IsKeyspaceNotExistError(err) +} + +// IsNotBootstrappedError returns true if the error is pd not bootstrapped error. +func IsNotBootstrappedError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), pdpb.ErrorType_NOT_BOOTSTRAPPED.String()) +} + +// IsKeyspaceNotExistError returns true the error is caused by keyspace not exists. +func IsKeyspaceNotExistError(err error) bool { + if err == nil { + return false + } + return strings.Contains(err.Error(), pdpb.ErrorType_ENTRY_NOT_FOUND.String()) +} diff --git a/tidb-server/main.go b/tidb-server/main.go index 05e16e67db8be..41050db2e1f10 100644 --- a/tidb-server/main.go +++ b/tidb-server/main.go @@ -124,6 +124,7 @@ const ( nmInitializeInsecure = "initialize-insecure" nmInitializeSQLFile = "initialize-sql-file" nmDisconnectOnExpiredPassword = "disconnect-on-expired-password" + nmKeyspaceName = "keyspace-name" ) var ( @@ -172,6 +173,7 @@ var ( initializeInsecure = flagBoolean(nmInitializeInsecure, true, "bootstrap tidb-server in insecure mode") initializeSQLFile = flag.String(nmInitializeSQLFile, "", "SQL file to execute on first bootstrap") disconnectOnExpiredPassword = flagBoolean(nmDisconnectOnExpiredPassword, true, "the server disconnects the client when the password is expired") + keyspaceName = flag.String(nmKeyspaceName, "", "keyspace name.") ) func main() { @@ -214,8 +216,11 @@ func main() { printInfo() setupBinlogClient() setupMetrics() + + keyspaceName := config.GetGlobalKeyspaceName() + resourcemanager.GlobalResourceManager.Start() - storage, dom := createStoreAndDomain() + storage, dom := createStoreAndDomain(keyspaceName) svr := createServer(storage, dom) // Register error API is not thread-safe, the caller MUST NOT register errors after initialization. @@ -306,9 +311,14 @@ func registerMetrics() { } } -func createStoreAndDomain() (kv.Storage, *domain.Domain) { +func createStoreAndDomain(keyspaceName string) (kv.Storage, *domain.Domain) { cfg := config.GetGlobalConfig() - fullPath := fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) + var fullPath string + if keyspaceName == "" { + fullPath = fmt.Sprintf("%s://%s", cfg.Store, cfg.Path) + } else { + fullPath = fmt.Sprintf("%s://%s?keyspaceName=%s", cfg.Store, cfg.Path, keyspaceName) + } var err error storage, err := kvstore.New(fullPath) terror.MustNil(err) @@ -565,6 +575,10 @@ func overrideConfig(cfg *config.Config) { } cfg.InitializeSQLFile = *initializeSQLFile } + + if actualFlags[nmKeyspaceName] { + cfg.KeyspaceName = *keyspaceName + } } func setVersions() { diff --git a/util/etcd/BUILD.bazel b/util/etcd/BUILD.bazel index 65b3e7a016047..f832e254d74a7 100644 --- a/util/etcd/BUILD.bazel +++ b/util/etcd/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "@com_github_pingcap_errors//:errors", "@io_etcd_go_etcd_client_v3//:client", + "@io_etcd_go_etcd_client_v3//namespace", ], ) diff --git a/util/etcd/etcd.go b/util/etcd/etcd.go index 6735adbb9c12a..167889849bf36 100644 --- a/util/etcd/etcd.go +++ b/util/etcd/etcd.go @@ -24,6 +24,7 @@ import ( "github.com/pingcap/errors" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/client/v3/namespace" ) // Node organizes the ectd query result as a Trie tree @@ -333,3 +334,10 @@ func keyWithPrefix(prefix, key string) string { return path.Join(prefix, key) } + +// SetEtcdCliByNamespace is used to add an etcd namespace prefix before etcd path. +func SetEtcdCliByNamespace(cli *clientv3.Client, namespacePrefix string) { + cli.KV = namespace.NewKV(cli.KV, namespacePrefix) + cli.Watcher = namespace.NewWatcher(cli.Watcher, namespacePrefix) + cli.Lease = namespace.NewLease(cli.Lease, namespacePrefix) +} diff --git a/util/etcd/etcd_test.go b/util/etcd/etcd_test.go index c99b43bf46841..f98c1393d5f6c 100644 --- a/util/etcd/etcd_test.go +++ b/util/etcd/etcd_test.go @@ -395,3 +395,30 @@ func testSetup(t *testing.T) (context.Context, *Client, *integration.ClusterV3) etcd := NewClient(cluster.RandClient(), "binlog") return context.Background(), etcd, cluster } + +func testSetupOriginal(t *testing.T) (context.Context, *clientv3.Client, *integration.ClusterV3) { + cluster := integration.NewClusterV3(t, &integration.ClusterConfig{Size: 1}) + return context.Background(), cluster.RandClient(), cluster +} + +func TestSetEtcdCliByNamespace(t *testing.T) { + integration.BeforeTest(t) + ctx, origEtcdCli, etcdMockCluster := testSetupOriginal(t) + defer etcdMockCluster.Terminate(t) + + namespacePrefix := "testNamespace/" + key := "testkey" + obj := "test" + + unprefixedKV := origEtcdCli.KV + cliNamespace := origEtcdCli + SetEtcdCliByNamespace(cliNamespace, namespacePrefix) + + _, err := cliNamespace.Put(ctx, key, obj) + require.NoError(t, err) + + // verify that kv pair is empty before set + getResp, err := unprefixedKV.Get(ctx, namespacePrefix+key) + require.NoError(t, err) + require.Len(t, getResp.Kvs, 1) +} diff --git a/util/mock/store.go b/util/mock/store.go index 3e5784fdb4d5a..ea7ca8e55fa3f 100644 --- a/util/mock/store.go +++ b/util/mock/store.go @@ -80,3 +80,8 @@ func (*Store) GetMinSafeTS(_ string) uint64 { func (*Store) GetLockWaits() ([]*deadlockpb.WaitForEntry, error) { return nil, nil } + +// GetCodec implements kv.Storage interface. +func (*Store) GetCodec() tikv.Codec { + return nil +}