Skip to content

Commit

Permalink
feat: Bootstrap metastore for wal segments (#13550)
Browse files Browse the repository at this point in the history
  • Loading branch information
cyriltovena committed Jul 19, 2024
1 parent 960c034 commit 0b47498
Show file tree
Hide file tree
Showing 197 changed files with 91,730 additions and 3,507 deletions.
30 changes: 30 additions & 0 deletions docs/sources/shared/configuration.md
Original file line number Diff line number Diff line change
Expand Up @@ -845,6 +845,33 @@ compactor_grpc_client:
# a ring unless otherwise specified in the component's configuration section.
[memberlist: <memberlist>]

metastore:
# CLI flag: -metastore.data-dir
[data_dir: <string> | default = "./data-metastore/data"]

raft:
# CLI flag: -metastore.raft.dir
[dir: <string> | default = "./data-metastore/raft"]

# CLI flag: -metastore.raft.bootstrap-peers
[bootstrap_peers: <list of strings> | default = []]

# CLI flag: -metastore.raft.server-id
[server_id: <string> | default = "localhost:9099"]

# CLI flag: -metastore.raft.bind-address
[bind_address: <string> | default = "localhost:9099"]

# CLI flag: -metastore.raft.advertise-address
[advertise_address: <string> | default = "localhost:9099"]

metastore_client:
# CLI flag: -metastore.address
[address: <string> | default = "localhost:9095"]

# Configures the gRPC client used to communicate with the metastore.
[grpc_client_config: <grpc_client>]

# Configuration for 'runtime config' module, responsible for reloading runtime
# configuration file.
[runtime_config: <runtime_config>]
Expand Down Expand Up @@ -2598,6 +2625,8 @@ The `frontend_worker` configures the worker - running within the Loki querier -
# Configures the querier gRPC client used to communicate with the
# query-scheduler. This can't be used in conjunction with 'grpc_client_config'.
# The CLI flags prefix for this block configuration is:
# metastore.grpc-client-config
[query_scheduler_grpc_client: <grpc_client>]
```

Expand Down Expand Up @@ -2656,6 +2685,7 @@ The `grpc_client` block configures the gRPC client used to communicate between a
- `frontend.grpc-client-config`
- `ingester-rf1.client`
- `ingester.client`
- `metastore.grpc-client-config`
- `pattern-ingester.client`
- `querier.frontend-client`
- `querier.frontend-grpc-client`
Expand Down
8 changes: 7 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ require (
github.com/grafana/jsonparser v0.0.0-20240209175146-098958973a2d
github.com/grafana/loki/pkg/push v0.0.0-20231124142027-e52380921608
github.com/hashicorp/golang-lru/v2 v2.0.7
github.com/hashicorp/raft v1.7.0
github.com/hashicorp/raft-wal v0.4.1
github.com/heroku/x v0.0.61
github.com/influxdata/tdigest v0.0.2-0.20210216194612-fc98d27c9e8b
github.com/mitchellh/colorstring v0.0.0-20190213212951-d06e56a500db
Expand All @@ -151,10 +153,14 @@ require (
)

require (
github.com/benbjohnson/immutable v0.4.0 // indirect
github.com/containerd/containerd v1.7.20 // indirect
github.com/coreos/etcd v3.3.27+incompatible // indirect
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf // indirect
github.com/dlclark/regexp2 v1.4.0 // indirect
github.com/go-kit/kit v0.12.0 // indirect
github.com/go-ole/go-ole v1.2.6 // indirect
github.com/hashicorp/go-msgpack/v2 v2.1.1 // indirect
github.com/lufia/plan9stats v0.0.0-20211012122336-39d0f177ccd0 // indirect
github.com/pires/go-proxyproto v0.7.0 // indirect
github.com/power-devops/perfstat v0.0.0-20210106213030-5aafc221ea8c // indirect
Expand Down Expand Up @@ -266,7 +272,7 @@ require (
github.com/hashicorp/go-cleanhttp v0.5.2 // indirect
github.com/hashicorp/go-hclog v1.6.3 // indirect
github.com/hashicorp/go-immutable-radix v1.3.1 // indirect
github.com/hashicorp/go-msgpack v0.5.5 // indirect
github.com/hashicorp/go-msgpack v1.1.5 // indirect
github.com/hashicorp/go-multierror v1.1.1 // indirect
github.com/hashicorp/go-retryablehttp v0.7.7 // indirect
github.com/hashicorp/go-rootcerts v1.0.2 // indirect
Expand Down
18 changes: 16 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -388,6 +388,8 @@ github.com/benbjohnson/clock v1.0.3/go.mod h1:bGMdMPoPVvcYyt1gHDf4J2KE153Yf9BuiU
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/clock v1.3.5 h1:VvXlSJBzZpA/zum6Sj74hxwYI2DIxRWuNIoXAzHZz5o=
github.com/benbjohnson/clock v1.3.5/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/benbjohnson/immutable v0.4.0 h1:CTqXbEerYso8YzVPxmWxh2gnoRQbbB9X1quUC8+vGZA=
github.com/benbjohnson/immutable v0.4.0/go.mod h1:iAr8OjJGLnLmVUr9MZ/rz4PWUy6Ouc2JLYuMArmvAJM=
github.com/beorn7/perks v0.0.0-20180321164747-3a771d992973/go.mod h1:Dwedo/Wpr24TaqPxmxbtue+5NUziq4I4S80YR8gNf3Q=
github.com/beorn7/perks v1.0.0/go.mod h1:KWe93zE9D1o94FZ5RNwFwVgaQK1VOXiVxmqh+CedLV8=
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
Expand Down Expand Up @@ -465,6 +467,8 @@ github.com/coredns/coredns v1.6.6/go.mod h1:Bdcnka9HmKGYj12ZIDF3lpQSfDHSsMc85Wj9
github.com/coredns/federation v0.0.0-20190818181423-e032b096babe/go.mod h1:MoqTEFX8GlnKkyq8eBCF94VzkNAOgjdlCJ+Pz/oCLPk=
github.com/coreos/bbolt v1.3.2/go.mod h1:iRUV2dpdMOn7Bo10OQBFzIJO9kkE559Wcmn+qkEiiKk=
github.com/coreos/etcd v3.3.10+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/etcd v3.3.27+incompatible h1:QIudLb9KeBsE5zyYxd1mjzRSkzLg9Wf9QlRwFgd6oTA=
github.com/coreos/etcd v3.3.27+incompatible/go.mod h1:uF7uidLiAD3TWHmW31ZFd/JWoc32PjwdhPthX9715RE=
github.com/coreos/go-etcd v2.0.0+incompatible/go.mod h1:Jez6KQU2B/sWsbdaef3ED8NzMklzPG4d5KIOhIy30Tk=
github.com/coreos/go-oidc v2.1.0+incompatible/go.mod h1:CgnwVTmzoESiwO9qyAFEMiHoZ1nMCKZlZ9V6mm3/LKc=
github.com/coreos/go-semver v0.2.0/go.mod h1:nnelYz7RCh+5ahJtPPxZlU+153eP4D4r3EedlOD2RNk=
Expand All @@ -480,6 +484,8 @@ github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8
github.com/coreos/go-systemd/v22 v22.5.0/go.mod h1:Y58oyj3AT4RCenI/lSvhwexgC+NSVTIJ3seZv2GcEnc=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf h1:GOPo6vn/vTN+3IwZBvXX0y5doJfSC7My0cdzelyOCsQ=
github.com/coreos/pkg v0.0.0-20220810130054-c7d1c02cb6cf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/couchbase/go-couchbase v0.0.0-20180501122049-16db1f1fe037/go.mod h1:TWI8EKQMs5u5jLKW/tsb9VwauIrMIxQG1r5fMsswK5U=
github.com/couchbase/gomemcached v0.0.0-20180502221210-0da75df14530/go.mod h1:srVSlQLB8iXBVXHgnqemxUXqN6FCvClgCMPCsjBDR7c=
github.com/couchbase/goutils v0.0.0-20180530154633-e865a1461c8a/go.mod h1:BQwMFlJzDjFDG3DJUdU0KORxn88UlsOULuxLExMh3Hs=
Expand Down Expand Up @@ -1098,9 +1104,12 @@ github.com/hashicorp/go-immutable-radix v1.3.1/go.mod h1:0y9vanUI8NX6FsYoO3zeMjh
github.com/hashicorp/go-kms-wrapping/entropy v0.1.0/go.mod h1:d1g9WGtAunDNpek8jUIEJnBlbgKS1N2Q61QkHiZyR1g=
github.com/hashicorp/go-memdb v1.3.4/go.mod h1:uBTr1oQbtuMgd1SSGoR8YV27eT3sBHbYiNm53bMpgSg=
github.com/hashicorp/go-msgpack v0.5.3/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v0.5.5 h1:i9R9JSrqIz0QVLz3sz+i3YJdT7TTSLcfLLzJi9aZTuI=
github.com/hashicorp/go-msgpack v0.5.5/go.mod h1:ahLV/dePpqEmjfWmKiqvPkv/twdG7iPBM1vqhUKIvfM=
github.com/hashicorp/go-msgpack v1.1.5 h1:9byZdVjKTe5mce63pRVNP1L7UAmdHOTEMGehn6KvJWs=
github.com/hashicorp/go-msgpack v1.1.5/go.mod h1:gWVc3sv/wbDmR3rQsj1CAktEZzoz1YNK9NfGLXJ69/4=
github.com/hashicorp/go-msgpack/v2 v2.0.0/go.mod h1:JIxYkkFJRDDRSoWQBSh7s9QAVThq+82iWmUpmE4jKak=
github.com/hashicorp/go-msgpack/v2 v2.1.1 h1:xQEY9yB2wnHitoSzk/B9UjXWRQ67QKu5AOm8aFp8N3I=
github.com/hashicorp/go-msgpack/v2 v2.1.1/go.mod h1:upybraOAblm4S7rx0+jeNy+CWWhzywQsSRV5033mMu4=
github.com/hashicorp/go-multierror v1.0.0/go.mod h1:dHtQlpGsu+cZNNAkkCN/P3hoUDHhCYQXV3UM06sGGrk=
github.com/hashicorp/go-multierror v1.1.0/go.mod h1:spPvp8C1qA32ftKqdAHm4hHTbPw+vmowP0z+KUhOZdA=
github.com/hashicorp/go-multierror v1.1.1 h1:H5DkEtf6CXdFp0N0Em5UCwQpXMWke8IA0+lD48awMYo=
Expand Down Expand Up @@ -1156,11 +1165,15 @@ github.com/hashicorp/raft v1.1.0/go.mod h1:4Ak7FSPnuvmb0GV6vgIAJ4vYT4bek9bb6Q+7H
github.com/hashicorp/raft v1.1.1/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.2.0/go.mod h1:vPAJM8Asw6u8LxC3eJCUZmRP/E4QmUGE1R7g7k8sG/8=
github.com/hashicorp/raft v1.3.11/go.mod h1:J8naEwc6XaaCfts7+28whSeRvCqTd6e20BlCU3LtEO4=
github.com/hashicorp/raft v1.7.0 h1:4u24Qn6lQ6uwziM++UgsyiT64Q8GyRn43CV41qPiz1o=
github.com/hashicorp/raft v1.7.0/go.mod h1:N1sKh6Vn47mrWvEArQgILTyng8GoDRNYlgKyK7PMjs0=
github.com/hashicorp/raft-autopilot v0.1.6/go.mod h1:Af4jZBwaNOI+tXfIqIdbcAnh/UyyqIMj/pOISIfhArw=
github.com/hashicorp/raft-boltdb v0.0.0-20171010151810-6e5ba93211ea/go.mod h1:pNv7Wc3ycL6F5oOWn+tPGo2gWD4a5X+yp/ntwdKLjRk=
github.com/hashicorp/raft-boltdb v0.0.0-20210409134258-03c10cc3d4ea/go.mod h1:qRd6nFJYYS6Iqnc/8HcUmko2/2Gw8qTFEmxDLii6W5I=
github.com/hashicorp/raft-boltdb v0.0.0-20211202195631-7d34b9fb3f42/go.mod h1:wcXL8otVu5cpJVLjcmq7pmfdRCdaP+xnvu7WQcKJAhs=
github.com/hashicorp/raft-boltdb/v2 v2.2.2/go.mod h1:N8YgaZgNJLpZC+h+by7vDu5rzsRgONThTEeUS3zWbfY=
github.com/hashicorp/raft-wal v0.4.1 h1:aU8XZ6x8R9BAIB/83Z1dTDtXvDVmv9YVYeXxd/1QBSA=
github.com/hashicorp/raft-wal v0.4.1/go.mod h1:A6vP5o8hGOs1LHfC1Okh9xPwWDcmb6Vvuz/QyqUXlOE=
github.com/hashicorp/serf v0.8.1/go.mod h1:h/Ru6tmZazX7WO/GDmwdpS975F019L4t5ng5IgwbNrE=
github.com/hashicorp/serf v0.8.2/go.mod h1:6hOLApaqBFA1NXqRQAsxw9QxuDEvNxSQRwA/JwenrHc=
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
Expand Down Expand Up @@ -2400,8 +2413,9 @@ golang.zx2c4.com/wireguard v0.0.20200121/go.mod h1:P2HsVp8SKwZEufsnezXZA4GRX/T49
golang.zx2c4.com/wireguard/wgctrl v0.0.0-20200205215550-e35592f146e4/go.mod h1:UdS9frhv65KTfwxME1xE8+rHYoFpbm36gOud1GhBe9c=
gonum.org/v1/gonum v0.0.0-20180816165407-929014505bf4/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.0.0-20181121035319-3f7ecaa7e8ca/go.mod h1:Y+Yx5eoAFn32cQvJDxZx5Dpnq+c3wtXuadVZAcxbbBo=
gonum.org/v1/gonum v0.6.2 h1:4r+yNT0+8SWcOkXP+63H2zQbN+USnC73cjGUxnDF94Q=
gonum.org/v1/gonum v0.6.2/go.mod h1:9mxDZsDKxgMAuccQkewq682L+0eCu4dCN2yonUJTCLU=
gonum.org/v1/gonum v0.12.0 h1:xKuo6hzt+gMav00meVPUlXwSdoEJP46BR+wdxQEFK2o=
gonum.org/v1/gonum v0.12.0/go.mod h1:73TDxJfAAHeA8Mk9mf8NlIppyhQNo5GLTcYeqgo2lvY=
gonum.org/v1/netlib v0.0.0-20181029234149-ec6d1f5cefe6/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/netlib v0.0.0-20190313105609-8cb42192e0e0/go.mod h1:wa6Ws7BG/ESfp6dHfk7C6KdzKA7wR7u/rKwOGE66zvw=
gonum.org/v1/plot v0.0.0-20190515093506-e2840ee46a6b/go.mod h1:Wt8AAjI+ypCyYX3nZBvf6cAIx93T+c/OS2HFAYskSZc=
Expand Down
12 changes: 10 additions & 2 deletions pkg/ingester-rf1/flush.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/oklog/ulid"
"golang.org/x/net/context"

"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/storage/wal"
)

Expand Down Expand Up @@ -110,11 +111,18 @@ func (i *Ingester) flushSegment(ctx context.Context, j int, w *wal.SegmentWriter
return err
}

id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader)
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id.String()), buf); err != nil {
id := ulid.MustNew(ulid.Timestamp(time.Now()), rand.Reader).String()
if err := i.store.PutObject(ctx, fmt.Sprintf("loki-v2/wal/anon/"+id), buf); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("failed to put object: %w", err)
}

if _, err := i.metastoreClient.AddBlock(ctx, &metastorepb.AddBlockRequest{
Block: w.Meta(id),
}); err != nil {
i.metrics.flushFailuresTotal.Inc()
return fmt.Errorf("metastore add block: %w", err)
}

return nil
}
43 changes: 17 additions & 26 deletions pkg/ingester-rf1/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/opentracing/opentracing-go"

"github.com/grafana/loki/v3/pkg/ingester-rf1/clientpool"
"github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
"github.com/grafana/loki/v3/pkg/ingester-rf1/objstore"
"github.com/grafana/loki/v3/pkg/ingester/index"
"github.com/grafana/loki/v3/pkg/loghttp/push"
Expand Down Expand Up @@ -181,6 +182,7 @@ type Ingester struct {
lifecyclerWatcher *services.FailureWatcher

store Storage
metastoreClient metastorepb.MetastoreServiceClient
periodicConfigs []config.PeriodConfig

loopQuit chan struct{}
Expand Down Expand Up @@ -222,6 +224,7 @@ func New(cfg Config, clientConfig client.Config,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
limits Limits, configs *runtime.TenantConfigs,
metastoreClient metastorepb.MetastoreServiceClient,
registerer prometheus.Registerer,
writeFailuresCfg writefailures.Cfg,
metricsNamespace string,
Expand All @@ -247,19 +250,19 @@ func New(cfg Config, clientConfig client.Config,
}

i := &Ingester{
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
flushBuffers: make([]*bytes.Buffer, cfg.ConcurrentFlushes),
flushWorkersDone: sync.WaitGroup{},
loopQuit: make(chan struct{}),
tailersQuit: make(chan struct{}),
metrics: metrics,
// flushOnShutdownSwitch: &OnceSwitch{},
cfg: cfg,
logger: logger,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: storage,
periodicConfigs: periodConfigs,
flushBuffers: make([]*bytes.Buffer, cfg.ConcurrentFlushes),
flushWorkersDone: sync.WaitGroup{},
loopQuit: make(chan struct{}),
tailersQuit: make(chan struct{}),
metrics: metrics,
metastoreClient: metastoreClient,
terminateOnShutdown: false,
streamRateCalculator: NewStreamRateCalculator(),
writeLogManager: writefailures.NewManager(logger, registerer, writeFailuresCfg, configs, "ingester_rf1"),
Expand All @@ -285,19 +288,7 @@ func New(cfg Config, clientConfig client.Config,

i.setupAutoForget()

//if i.cfg.ChunkFilterer != nil {
// i.SetChunkFilterer(i.cfg.ChunkFilterer)
//}
//
//if i.cfg.PipelineWrapper != nil {
// i.SetPipelineWrapper(i.cfg.PipelineWrapper)
//}
//
//if i.cfg.SampleExtractorWrapper != nil {
// i.SetExtractorWrapper(i.cfg.SampleExtractorWrapper)
//}
//
//i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)
// i.recalculateOwnedStreams = newRecalculateOwnedStreams(i.getInstances, i.lifecycler.ID, i.readRing, cfg.OwnedStreamsCheckInterval, util_log.Logger)

return i, nil
}
Expand Down
82 changes: 82 additions & 0 deletions pkg/ingester-rf1/metastore/client/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package metastoreclient

import (
"flag"
"fmt"

"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/services"
"google.golang.org/grpc"

metastorepb "github.com/grafana/loki/v3/pkg/ingester-rf1/metastore/metastorepb"
)

type Config struct {
MetastoreAddress string `yaml:"address"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with the metastore."`
}

func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.MetastoreAddress, "metastore.address", "localhost:9095", "")
cfg.GRPCClientConfig.RegisterFlagsWithPrefix("metastore.grpc-client-config", f)
}

func (cfg *Config) Validate() error {
if cfg.MetastoreAddress == "" {
return fmt.Errorf("metastore.address is required")
}
return cfg.GRPCClientConfig.Validate()
}

type Client struct {
metastorepb.MetastoreServiceClient
service services.Service
conn *grpc.ClientConn
config Config
}

func New(config Config) (c *Client, err error) {
c = &Client{config: config}
c.conn, err = dial(c.config)
if err != nil {
return nil, err
}
c.MetastoreServiceClient = metastorepb.NewMetastoreServiceClient(c.conn)
c.service = services.NewIdleService(nil, c.stopping)
return c, nil
}

func (c *Client) stopping(error) error { return c.conn.Close() }

func (c *Client) Service() services.Service { return c.service }

func dial(cfg Config) (*grpc.ClientConn, error) {
if err := cfg.Validate(); err != nil {
return nil, err
}
options, err := cfg.GRPCClientConfig.DialOption(nil, nil)
if err != nil {
return nil, err
}
// TODO: https://github.com/grpc/grpc-proto/blob/master/grpc/service_config/service_config.proto
options = append(options, grpc.WithDefaultServiceConfig(grpcServiceConfig))
return grpc.Dial(cfg.MetastoreAddress, options...)
}

const grpcServiceConfig = `{
"healthCheckConfig": {
"serviceName": "metastorepb.MetastoreService.RaftLeader"
},
"loadBalancingPolicy":"round_robin",
"methodConfig": [{
"name": [{"service": "metastorepb.MetastoreService"}],
"waitForReady": true,
"retryPolicy": {
"MaxAttempts": 4,
"InitialBackoff": ".01s",
"MaxBackoff": ".01s",
"BackoffMultiplier": 1.0,
"RetryableStatusCodes": [ "UNAVAILABLE" ]
}
}]
}`
33 changes: 33 additions & 0 deletions pkg/ingester-rf1/metastore/health/health.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package health

import (
"github.com/grafana/dskit/services"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
)

type Service interface {
SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus)
}

type noopService struct{}

var NoOpService = noopService{}

func (noopService) SetServingStatus(string, grpc_health_v1.HealthCheckResponse_ServingStatus) {}

func NewGRPCHealthService() *GRPCHealthService {
s := health.NewServer()
return &GRPCHealthService{
Server: s,
Service: services.NewIdleService(nil, func(error) error {
s.Shutdown()
return nil
}),
}
}

type GRPCHealthService struct {
services.Service
*health.Server
}
Loading

0 comments on commit 0b47498

Please sign in to comment.