Skip to content

Commit

Permalink
DRY up code
Browse files Browse the repository at this point in the history
  • Loading branch information
aduffeck committed Jun 20, 2023
1 parent a0181a6 commit 0d4a53f
Showing 1 changed file with 77 additions and 179 deletions.
256 changes: 77 additions & 179 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,73 +314,19 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()+".mpk")
fi, err := os.Stat(indexPath)
if err == nil {
allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string][]byte, error) {
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock user index to read")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read user index")
}
links := map[string][]byte{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse user index")
}
}
return links, nil
})
allMatches, err = fs.readSpaceIndex(indexPath, "by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime())
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error reading user index")
}
} else {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId())
fi, err := os.Stat(indexPath)
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId())
fi, err := os.Stat(dirIndexPath)
if err != nil {
return nil, err
}

allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string][]byte, error) {
links := map[string][]byte{}
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId())
m, err := filepath.Glob(dirIndexPath + "/*")
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = []byte(link)
}

// rewrite index as file
err = fs.writeIndex(indexPath+".mpk", links, nil)
if err != nil {
return nil, err
}
err = os.RemoveAll(dirIndexPath)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("user-id", requestedUserID.GetOpaqueId()).Msg("error migrating by user index, continuing")
}
return links, nil
})
allMatches, err = fs.migrateSpaceIndex(indexPath, dirIndexPath, "by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime())
if err != nil {
return nil, err
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("user-id", requestedUserID.GetOpaqueId()).Msg("error migrating by user index, continuing")
}
}

Expand All @@ -406,73 +352,20 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group+".mpk")
fi, err := os.Stat(indexPath)
if err == nil {
allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string][]byte, error) {
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock group index to read")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read group index")
}
links := map[string][]byte{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse group index")
}
}
return links, nil
})
allMatches, err = fs.readSpaceIndex(indexPath, "by-group-id:"+group, fi.ModTime())
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error reading group index")
}
} else {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
fi, err := os.Stat(indexPath)
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
fi, err := os.Stat(dirIndexPath)
if err != nil {
// no spaces for this group
continue
}
allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string][]byte, error) {
links := map[string][]byte{}
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
m, err := filepath.Glob(dirIndexPath + "/*")
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = []byte(link)
}

// rewrite index as file
err = fs.writeIndex(indexPath+".mpk", links, nil)
if err != nil {
return nil, err
}
err = os.RemoveAll(dirIndexPath)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", group).Msg("error migrating by group index, continuing")
}
return links, nil
})
allMatches, err = fs.migrateSpaceIndex(indexPath, dirIndexPath, "by-group-id:"+group, fi.ModTime())
if err != nil {
return nil, err
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", group).Msg("error migrating by group index, continuing")
}
}

Expand All @@ -498,75 +391,24 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}

for spaceType := range spaceTypes {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType+".mpk")
indexPath := filepath.Join(fs.o.Root, "indexes", "by-type:", spaceType+".mpk")

fi, err := os.Stat(indexPath)
if err == nil {
allMatches, err = fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string][]byte, error) {
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock type index to read")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read type index")
}
links := map[string][]byte{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse type index")
}
}
return links, nil
})
allMatches, err = fs.readSpaceIndex(indexPath, "by-type:"+spaceType, fi.ModTime())
if err != nil {
return nil, err
return nil, errors.Wrap(err, "error reading type index")
}
} else {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType)
fi, err := os.Stat(indexPath)
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-type:", spaceType)
fi, err := os.Stat(dirIndexPath)
if err != nil {
// no spaces for this type
continue
}

allMatches, err = fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string][]byte, error) {
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, "*")
m, err := filepath.Glob(dirIndexPath)
if err != nil {
return nil, err
}
links := map[string][]byte{}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = []byte(link)
}
// rewrite index as file
err = fs.writeIndex(indexPath+".mpk", links, nil)
if err != nil {
return nil, err
}
err = os.RemoveAll(dirIndexPath)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("spaceType", spaceType).Msg("error migrating by type index, continuing")
}
return links, nil
})
allMatches, err = fs.migrateSpaceIndex(indexPath, dirIndexPath, "by-type:"+spaceType, fi.ModTime())
if err != nil {
return nil, err
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("type", spaceType).Msg("error migrating by type index, continuing")
}
}

Expand Down Expand Up @@ -1479,3 +1321,59 @@ func canDeleteSpace(ctx context.Context, spaceID string, typ string, purge bool,

return errtypes.PermissionDenied(fmt.Sprintf("user is not allowed to delete space %s", n.ID))
}

func (fs *Decomposedfs) readSpaceIndex(indexPath, cacheKey string, mtime time.Time) (map[string][]byte, error) {
return fs.spaceIDCache.LoadOrStore(cacheKey, mtime, func() (map[string][]byte, error) {
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock index to read")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read index")
}
links := map[string][]byte{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse index")
}
}
return links, nil
})
}

func (fs *Decomposedfs) migrateSpaceIndex(indexPath, dirIndexPath, cacheKey string, mtime time.Time) (map[string][]byte, error) {
return fs.spaceIDCache.LoadOrStore(cacheKey, mtime, func() (map[string][]byte, error) {
links := map[string][]byte{}
m, err := filepath.Glob(dirIndexPath + "/*")
if err != nil {
return nil, err
}
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = []byte(link)
}

// rewrite index as file
err = fs.writeIndex(indexPath, links, nil)
if err != nil {
return nil, err
}
return links, os.RemoveAll(dirIndexPath)
})
}

0 comments on commit 0d4a53f

Please sign in to comment.