Skip to content

Commit

Permalink
convert by-group-id index to messagepack
Browse files Browse the repository at this point in the history
Signed-off-by: Jörn Friedrich Dreyer <jfd@butonic.de>
  • Loading branch information
butonic committed Jun 19, 2023
1 parent 9b53a67 commit 8602052
Showing 1 changed file with 115 additions and 31 deletions.
146 changes: 115 additions & 31 deletions pkg/storage/utils/decomposedfs/spaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -310,7 +310,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock index to read")
return nil, errors.Wrap(err, "unable to lock user index to read")
}
defer func() {
rerr := f.Close()
Expand All @@ -324,13 +324,13 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read index")
return nil, errors.Wrap(err, "unable to read user index")
}
links := map[string]string{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse index")
return nil, errors.Wrap(err, "unable to parse user index")
}
}
return links, nil
Expand Down Expand Up @@ -393,29 +393,77 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide
}

for _, group := range user.Groups {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group+".mpk")
fi, err := os.Stat(indexPath)
if err != nil {
continue
}
allMatches, err := fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) {
path := filepath.Join(fs.o.Root, "indexes", "by-group-id", group, "*")
m, err := filepath.Glob(path)
if err == nil {
allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) {
// Acquire a read log on the index file
f, err := lockedfile.Open(indexPath)
if err != nil {
return nil, errors.Wrap(err, "unable to lock group index to read")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return nil, errors.Wrap(err, "unable to read group index")
}
links := map[string]string{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return nil, errors.Wrap(err, "unable to parse group index")
}
}
return links, nil
})
if err != nil {
return nil, err
}
matches := map[string]string{}
for _, match := range m {
link, err := os.Readlink(match)
} else {
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
fi, err := os.Stat(indexPath)
if err != nil {
// no spaces for this group
continue
}
allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) {
links := map[string]string{}
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group)
m, err := filepath.Glob(dirIndexPath + "/*")
if err != nil {
continue
return nil, err
}
matches[match] = link
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = link
}

// rewrite index as file
err = fs.writeIndex(indexPath+".mpk", links, nil)
if err != nil {
return nil, err
}
err = os.RemoveAll(dirIndexPath)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", group).Msg("error migrating by group index, continuing")
}
return links, nil
})
if err != nil {
return nil, err
}
return matches, nil
})
if err != nil {
return nil, err
}

if nodeID == spaceIDAny {
Expand Down Expand Up @@ -921,24 +969,60 @@ func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID s
if groupID == "" {
return nil
}
// create group index dir
// TODO: pathify groupid
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-group-id", groupID), 0700); err != nil {

// create user index dir
if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-group-id"), 0700); err != nil {
return err
}

err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "indexes/by-group-id", groupID, spaceID))
indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", groupID+".mpk")
// Acquire a write log on the index file
f, err := lockedfile.OpenFile(indexPath, os.O_RDWR|os.O_CREATE, 0600)
if err != nil {
if isAlreadyExists(err) {
appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("symlink already exists")
// FIXME: is it ok to wipe this err if the symlink already exists?
err = nil //nolint
} else {
// TODO how should we handle error cases here?
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("could not create symlink")
return errors.Wrap(err, "unable to lock index to write")
}
defer func() {
rerr := f.Close()

// if err is non nil we do not overwrite that
if err == nil {
err = rerr
}
}()

// Read current state
msgBytes, err := io.ReadAll(f)
if err != nil {
return errors.Wrap(err, "unable to read index")
}
return nil
links := map[string]string{}
if len(msgBytes) > 0 {
err = msgpack.Unmarshal(msgBytes, &links)
if err != nil {
return errors.Wrap(err, "unable to parse index")
}
} else {
// try reading dir index path
dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", groupID)
m, _ := filepath.Glob(dirIndexPath + "/*")
for _, match := range m {
link, err := os.Readlink(match)
if err != nil {
continue
}
links[match] = link
}
err = os.RemoveAll(dirIndexPath)
if err != nil {
appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("group-id", groupID).Msg("error migrating by group index")
}
}

// add new entry
links[spaceID] = "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)
err = fs.writeIndex(indexPath, links, f)

return err
}

// TODO: implement linkSpaceByGroup
Expand Down

0 comments on commit 8602052

Please sign in to comment.