Skip to content

Commit

Permalink
fix: prevent retention service from hanging (#25064)
Browse files Browse the repository at this point in the history
Fix issue that can cause the retention service to hang waiting on a
`Shard.Close` call. When this occurs, no other shards will be deleted
by the retention service. This is usually noticed as an increase in
disk usage because old shards are not cleaned up.

The fix adds two new methods to `Store`, `SetShardNewReadersBlocked`
and `InUse`. `InUse` can be used to poll if a shard has active readers,
which the retention service uses to skip over in-use shards to prevent
the service from hanging. `SetShardNewReadersBlocked` determines if
new read access may be granted to a shard. This is required to prevent
race conditions around the use of `InUse` and the deletion of shards.

If the retention service skips over a shard because it is in-use, the
shard will be checked again the next time the retention service is run.
It can be deleted on subsequent checks if it is no longer in-use. If
the shard is stuck in-use, the retention service will not be able to
delete that shard, which can be observed in the logs for manual
intervention. Other shards can still be deleted by the retention service
even if a shard is stuck with readers.

closes: #25063
(cherry picked from commit b4bd607)
  • Loading branch information
gwossum committed Jun 13, 2024
1 parent 9c84e6c commit c7a591b
Show file tree
Hide file tree
Showing 14 changed files with 696 additions and 119 deletions.
4 changes: 2 additions & 2 deletions cmd/influx_tools/compact/command.go
Original file line number Diff line number Diff line change
Expand Up @@ -276,12 +276,12 @@ func (sc *shardCompactor) NextGeneration() int {
panic("not implemented")
}

// NOTE(review): this span is a rendered unified diff, not compilable Go.
// The first signature line is the pre-change version and the second is the
// post-change version (which adds an error return); likewise the two
// trailing return statements are the removed and added sides of the diff.
func (sc *shardCompactor) TSMReader(path string) *tsm1.TSMReader {
func (sc *shardCompactor) TSMReader(path string) (*tsm1.TSMReader, error) {
	// Look up a previously registered reader for this path; nil if absent.
	r := sc.files[path]
	if r != nil {
		// Presumably takes a reference so the reader stays open while in
		// use — TODO confirm against tsm1.TSMReader.Ref semantics.
		r.Ref()
	}
	return r
	return r, nil
}

func (sc *shardCompactor) String() string {
Expand Down
76 changes: 42 additions & 34 deletions internal/tsdb_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,40 +14,42 @@ import (

// TSDBStoreMock is a mockable implementation of tsdb.Store.
//
// NOTE(review): this span is a rendered unified diff of the struct, not
// compilable Go. The first run of fields is the pre-change (removed) field
// list and the second run is the post-change (added) list. The post-change
// list adds SetShardNewReadersBlockedFn and ShardInUseFn, backing the new
// SetShardNewReadersBlocked and ShardInUse mock methods.
type TSDBStoreMock struct {
	// --- pre-change field list (removed side of the diff) ---
	BackupShardFn func(id uint64, since time.Time, w io.Writer) error
	BackupSeriesFileFn func(database string, w io.Writer) error
	ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
	CloseFn func() error
	CreateShardFn func(database, policy string, shardID uint64, enabled bool) error
	CreateShardSnapshotFn func(id uint64) (string, error)
	DatabasesFn func() []string
	DeleteDatabaseFn func(name string) error
	DeleteMeasurementFn func(database, name string) error
	DeleteRetentionPolicyFn func(database, name string) error
	DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShardFn func(id uint64) error
	DiskSizeFn func() (int64, error)
	ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
	ImportShardFn func(id uint64, r io.Reader) error
	// NOTE(review): "measuments" is a typo for "measurements" in the named
	// results below (behavior-neutral in Go; kept byte-identical here).
	MeasurementSeriesCountsFn func(database string) (measuments int, series int)
	MeasurementsCardinalityFn func(database string) (int64, error)
	MeasurementNamesFn func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error)
	OpenFn func() error
	PathFn func() string
	RestoreShardFn func(id uint64, r io.Reader) error
	SeriesCardinalityFn func(database string) (int64, error)
	SetShardEnabledFn func(shardID uint64, enabled bool) error
	ShardFn func(id uint64) *tsdb.Shard
	ShardGroupFn func(ids []uint64) tsdb.ShardGroup
	ShardIDsFn func() []uint64
	ShardNFn func() int
	ShardRelativePathFn func(id uint64) (string, error)
	ShardsFn func(ids []uint64) []*tsdb.Shard
	StatisticsFn func(tags map[string]string) []models.Statistic
	TagKeysFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValuesFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
	WithLoggerFn func(log *zap.Logger)
	WriteToShardFn func(ctx tsdb.WriteContext, shardID uint64, points []models.Point) error
	// --- post-change field list (added side of the diff) ---
	BackupShardFn func(id uint64, since time.Time, w io.Writer) error
	BackupSeriesFileFn func(database string, w io.Writer) error
	ExportShardFn func(id uint64, ExportStart time.Time, ExportEnd time.Time, w io.Writer) error
	CloseFn func() error
	CreateShardFn func(database, policy string, shardID uint64, enabled bool) error
	CreateShardSnapshotFn func(id uint64) (string, error)
	DatabasesFn func() []string
	DeleteDatabaseFn func(name string) error
	DeleteMeasurementFn func(database, name string) error
	DeleteRetentionPolicyFn func(database, name string) error
	DeleteSeriesFn func(database string, sources []influxql.Source, condition influxql.Expr) error
	DeleteShardFn func(id uint64) error
	DiskSizeFn func() (int64, error)
	ExpandSourcesFn func(sources influxql.Sources) (influxql.Sources, error)
	ImportShardFn func(id uint64, r io.Reader) error
	MeasurementSeriesCountsFn func(database string) (measuments int, series int)
	MeasurementsCardinalityFn func(database string) (int64, error)
	MeasurementNamesFn func(auth query.FineAuthorizer, database string, cond influxql.Expr) ([][]byte, error)
	OpenFn func() error
	PathFn func() string
	RestoreShardFn func(id uint64, r io.Reader) error
	SeriesCardinalityFn func(database string) (int64, error)
	SetShardEnabledFn func(shardID uint64, enabled bool) error
	// New: controls whether new read access may be granted to a shard.
	SetShardNewReadersBlockedFn func(shardID uint64, blocked bool) error
	ShardFn func(id uint64) *tsdb.Shard
	ShardGroupFn func(ids []uint64) tsdb.ShardGroup
	ShardIDsFn func() []uint64
	// New: polls whether a shard currently has active readers.
	ShardInUseFn func(shardID uint64) (bool, error)
	ShardNFn func() int
	ShardRelativePathFn func(id uint64) (string, error)
	ShardsFn func(ids []uint64) []*tsdb.Shard
	StatisticsFn func(tags map[string]string) []models.Statistic
	TagKeysFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagKeys, error)
	TagValuesFn func(auth query.FineAuthorizer, shardIDs []uint64, cond influxql.Expr) ([]tsdb.TagValues, error)
	WithLoggerFn func(log *zap.Logger)
	WriteToShardFn func(ctx tsdb.WriteContext, shardID uint64, points []models.Point) error
}

func (s *TSDBStoreMock) BackupShard(id uint64, since time.Time, w io.Writer) error {
Expand Down Expand Up @@ -117,6 +119,9 @@ func (s *TSDBStoreMock) SeriesCardinality(ctx context.Context, database string)
// SetShardEnabled delegates to the mock's SetShardEnabledFn.
func (s *TSDBStoreMock) SetShardEnabled(shardID uint64, enabled bool) error {
	err := s.SetShardEnabledFn(shardID, enabled)
	return err
}
// SetShardNewReadersBlocked delegates to the mock's SetShardNewReadersBlockedFn.
func (s *TSDBStoreMock) SetShardNewReadersBlocked(shardID uint64, blocked bool) error {
	err := s.SetShardNewReadersBlockedFn(shardID, blocked)
	return err
}
// Shard delegates to the mock's ShardFn.
func (s *TSDBStoreMock) Shard(id uint64) *tsdb.Shard {
	sh := s.ShardFn(id)
	return sh
}
Expand All @@ -126,6 +131,9 @@ func (s *TSDBStoreMock) ShardGroup(ids []uint64) tsdb.ShardGroup {
// ShardIDs delegates to the mock's ShardIDsFn.
func (s *TSDBStoreMock) ShardIDs() []uint64 {
	ids := s.ShardIDsFn()
	return ids
}
// ShardInUse delegates to the mock's ShardInUseFn.
func (s *TSDBStoreMock) ShardInUse(shardID uint64) (bool, error) {
	inUse, err := s.ShardInUseFn(shardID)
	return inUse, err
}
// ShardN delegates to the mock's ShardNFn.
func (s *TSDBStoreMock) ShardN() int {
	n := s.ShardNFn()
	return n
}
Expand Down
165 changes: 95 additions & 70 deletions services/retention/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@ type Service struct {
TSDBStore interface {
ShardIDs() []uint64
DeleteShard(shardID uint64) error

SetShardNewReadersBlocked(shardID uint64, blocked bool) error
ShardInUse(shardID uint64) (bool, error)
}

// DropShardRef is a function that takes a shard ID and removes the
Expand Down Expand Up @@ -131,18 +134,6 @@ func (s *Service) DeletionCheck() {
}
deletedShardIDs := make(map[uint64]deletionInfo)

dropShardMetaRef := func(id uint64, info deletionInfo) error {
if err := s.DropShardMetaRef(id, info.owners); err != nil {
log.Error("Failed to drop shard meta reference",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp),
zap.Error(err))
return err
}
return nil
}

// Mark down if an error occurred during this function so we can inform the
// user that we will try again on the next interval.
// Without the message, they may see the error message and assume they
Expand All @@ -160,25 +151,26 @@ func (s *Service) DeletionCheck() {

// Determine all shards that have expired and need to be deleted.
for _, g := range r.ExpiredShardGroups(time.Now().UTC()) {
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
log.Info("Failed to delete shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name),
zap.Error(err))
retryNeeded = true
continue
}
func() {
log, logEnd := logger.NewOperation(log, "Deleting expired shard group", "retention_delete_expired_shard_group",
logger.Database(d.Name), logger.ShardGroup(g.ID), logger.RetentionPolicy(r.Name))
defer logEnd()
if err := s.MetaClient.DeleteShardGroup(d.Name, r.Name, g.ID); err != nil {
log.Error("Failed to delete shard group", zap.Error(err))
retryNeeded = true
return
}

log.Info("Deleted shard group",
logger.Database(d.Name),
logger.ShardGroup(g.ID),
logger.RetentionPolicy(r.Name))
log.Info("Deleted shard group")

// Store all the shard IDs that may possibly need to be removed locally.
for _, sh := range g.Shards {
deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
}
// Store all the shard IDs that may possibly need to be removed locally.
groupShards := make([]uint64, len(g.Shards))
for _, sh := range g.Shards {
groupShards = append(groupShards, sh.ID)
deletedShardIDs[sh.ID] = newDeletionInfo(d.Name, r.Name, sh)
}
log.Info("Group's shards will be removed from local storage if found", zap.Uint64s("shards", groupShards))
}()
}
}
}
Expand All @@ -187,58 +179,91 @@ func (s *Service) DeletionCheck() {
for _, id := range s.TSDBStore.ShardIDs() {
if info, ok := deletedShardIDs[id]; ok {
delete(deletedShardIDs, id)
log.Info("Attempting deletion of shard from store",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
if err := s.TSDBStore.DeleteShard(id); err != nil {
log.Error("Failed to delete shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp),
zap.Error(err))
if errors.Is(err, tsdb.ErrShardNotFound) {
// At first you wouldn't think this could happen, we're iterating over shards
// in the store. However, if this has been a very long running operation the
// shard could have been dropped from the store while we were working on other shards.
log.Warn("Shard does not exist in store, continuing retention removal",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
} else {
retryNeeded = true
continue

err := func() (rErr error) {
log, logEnd := logger.NewOperation(log, "Deleting shard from shard group deleted based on retention policy", "retention_delete_shard",
logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp))
defer func() {
if rErr != nil {
// Log the error before logEnd().
log.Error("Error deleting shard", zap.Error(rErr))
}
logEnd()
}()

// Block new readers for shard and check if it is in-use before deleting. This is to prevent
// an issue where a shard that is stuck in-use will block the retention service.
if err := s.TSDBStore.SetShardNewReadersBlocked(id, true); err != nil {
return fmt.Errorf("error blocking new readers for shard: %w", err)
}
}
log.Info("Deleted shard",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
if err := dropShardMetaRef(id, info); err != nil {
// removeShardMetaReference already logged the error.
defer func() {
if rErr != nil && !errors.Is(rErr, tsdb.ErrShardNotFound) {
log.Info("Unblocking new readers for shard after delete failed")
if unblockErr := s.TSDBStore.SetShardNewReadersBlocked(id, false); unblockErr != nil {
log.Error("Error unblocking new readers for shard", zap.Error(unblockErr))
}
}
}()

// We should only try to delete shards that are not in-use.
if inUse, err := s.TSDBStore.ShardInUse(id); err != nil {
return fmt.Errorf("error checking if shard is in-use: %w", err)
} else if inUse {
return errors.New("can not delete an in-use shard")
}

// Now it's time to delete the shard
if err := s.TSDBStore.DeleteShard(id); err != nil {
return fmt.Errorf("error deleting shard from store: %w", err)
}
log.Info("Deleted shard")
return nil
}()
// Check for error deleting the shard from the TSDB. Continue onto DropShardMetaRef if the
// error was tsdb.ErrShardNotFound. We got here because the shard was in the metadata,
// but it wasn't really in the store, so try deleting it out of the metadata.
if err != nil && !errors.Is(err, tsdb.ErrShardNotFound) {
// Logging of error was handled by the lambda in a defer so that it is within
// the operation instead of after the operation.
retryNeeded = true
continue
}

func() {
log, logEnd := logger.NewOperation(log, "Dropping shard meta references", "retention_drop_refs",
logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
defer logEnd()
if err := s.DropShardMetaRef(id, info.owners); err != nil {
log.Error("Error dropping shard meta reference", zap.Error(err))
retryNeeded = true
return
}
}()
}
}

// Check for expired phantom shards that exist in the metadata but not in the store.
for id, info := range deletedShardIDs {
log.Error("Expired phantom shard detected during retention check, removing from metadata",
logger.Database(info.db),
logger.Shard(id),
logger.RetentionPolicy(info.rp))
if err := dropShardMetaRef(id, info); err != nil {
// removeShardMetaReference already logged the error.
retryNeeded = true
continue
}
func() {
log, logEnd := logger.NewOperation(log, "Drop phantom shard references", "retention_drop_phantom_refs",
logger.Database(info.db), logger.Shard(id), logger.RetentionPolicy(info.rp), zap.Uint64s("owners", info.owners))
defer logEnd()
log.Warn("Expired phantom shard detected during retention check, removing from metadata")
if err := s.DropShardMetaRef(id, info.owners); err != nil {
log.Error("Error dropping shard meta reference for phantom shard", zap.Error(err))
retryNeeded = true
}
}()
}

if err := s.MetaClient.PruneShardGroups(); err != nil {
log.Info("Problem pruning shard groups", zap.Error(err))
retryNeeded = true
}
func() {
log, logEnd := logger.NewOperation(log, "Pruning shard groups after retention check", "retention_prune_shard_groups")
defer logEnd()
if err := s.MetaClient.PruneShardGroups(); err != nil {
log.Error("Error pruning shard groups", zap.Error(err))
retryNeeded = true
}
}()

if retryNeeded {
log.Info("One or more errors occurred during shard deletion and will be retried on the next check", logger.DurationLiteral("check_interval", time.Duration(s.config.CheckInterval)))
Expand Down
Loading

0 comments on commit c7a591b

Please sign in to comment.