From 406bfda3cc70a72b024e89d02cbca3526541fd5e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Tue, 20 Jun 2023 10:05:30 +0200 Subject: [PATCH] string to []byte, by type index MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../utils/decomposedfs/decomposedfs.go | 2 +- pkg/storage/utils/decomposedfs/spaces.go | 263 ++++++++++++------ 2 files changed, 174 insertions(+), 91 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index d105bea917d..6b847b2eb4a 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -101,7 +101,7 @@ type Decomposedfs struct { cache cache.StatCache UserCache *ttlcache.Cache - spaceIDCache mtimesyncedcache.Cache[string, map[string]string] + spaceIDCache mtimesyncedcache.Cache[string, map[string][]byte] } // NewDefault returns an instance with default components diff --git a/pkg/storage/utils/decomposedfs/spaces.go b/pkg/storage/utils/decomposedfs/spaces.go index aa816fb5252..28514628a10 100644 --- a/pkg/storage/utils/decomposedfs/spaces.go +++ b/pkg/storage/utils/decomposedfs/spaces.go @@ -300,13 +300,13 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide } matches := map[string]struct{}{} + var allMatches map[string][]byte if requestedUserID != nil { - allMatches := map[string]string{} indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()+".mpk") fi, err := os.Stat(indexPath) if err == nil { - allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string]string, error) { + allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string][]byte, error) { // Acquire a read log on the index file f, err := lockedfile.Open(indexPath) if err != nil { @@ -326,7 +326,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if err != nil { return nil, errors.Wrap(err, "unable to read user index") } - links := map[string]string{} + links := map[string][]byte{} if len(msgBytes) > 0 { err = msgpack.Unmarshal(msgBytes, &links) if err != nil { @@ -341,34 +341,36 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide } else { indexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()) fi, err := os.Stat(indexPath) - if err == nil { - allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string]string, error) { - links := map[string]string{} - dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()) - m, err := filepath.Glob(dirIndexPath + "/*") - if err != nil { - return nil, err - } - for _, match := range m { - link, err := os.Readlink(match) - if err != nil { - continue - } - links[match] = link - } + if err != nil { + return nil, err + } - // rewrite index as file - err = fs.writeIndex(indexPath+".mpk", links, nil) - if err != nil { - return nil, err - } - err = os.RemoveAll(dirIndexPath) + allMatches, err = fs.spaceIDCache.LoadOrStore("by-user-id:"+requestedUserID.GetOpaqueId(), fi.ModTime(), func() (map[string][]byte, error) { + links := map[string][]byte{} + dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-user-id", requestedUserID.GetOpaqueId()) + m, err := filepath.Glob(dirIndexPath + "/*") + if err != nil { + return nil, err + } + for _, match := range m { + link, err := os.Readlink(match) if err != nil { - appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("user-id", requestedUserID.GetOpaqueId()).Msg("error migrating by user index, continuing") + continue } - return links, nil - }) - } + links[match] = []byte(link) + } + + // rewrite index as file + err = fs.writeIndex(indexPath+".mpk", links, nil) + if err != nil { + return nil, err + } + err = os.RemoveAll(dirIndexPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("user-id", requestedUserID.GetOpaqueId()).Msg("error migrating by user index, continuing") + } + return links, nil + }) if err != nil { return nil, err } @@ -376,10 +378,10 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if nodeID == spaceIDAny { for _, match := range allMatches { - matches[match] = struct{}{} + matches[string(match)] = struct{}{} } } else { - matches[allMatches[nodeID]] = struct{}{} + matches[string(allMatches[nodeID])] = struct{}{} } // get Groups for userid @@ -396,7 +398,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide indexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group+".mpk") fi, err := os.Stat(indexPath) if err == nil { - allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) { + allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string][]byte, error) { // Acquire a read log on the index file f, err := lockedfile.Open(indexPath) if err != nil { @@ -416,7 +418,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if err != nil { return nil, errors.Wrap(err, "unable to read group index") } - links := map[string]string{} + links := map[string][]byte{} if len(msgBytes) > 0 { err = msgpack.Unmarshal(msgBytes, &links) if err != nil { @@ -435,8 +437,8 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide // no spaces for this group continue } - allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string]string, error) { - links := map[string]string{} + allMatches, err = fs.spaceIDCache.LoadOrStore("by-group-id:"+group, fi.ModTime(), func() (map[string][]byte, error) { + links := map[string][]byte{} dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-group-id", group) m, err := filepath.Glob(dirIndexPath + "/*") if err != nil { @@ -447,7 +449,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if err != nil { continue } - links[match] = link + links[match] = []byte(link) } // rewrite index as file @@ -468,51 +470,104 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide if nodeID == spaceIDAny { for _, match := range allMatches { - matches[match] = struct{}{} + matches[string(match)] = struct{}{} } } else { - matches[allMatches[nodeID]] = struct{}{} + matches[string(allMatches[nodeID])] = struct{}{} } } } if requestedUserID == nil { - for spaceType := range spaceTypes { - indexPath := filepath.Join(fs.o.Root, "indexes", "by-type") - if spaceType != spaceTypeAny { - indexPath = filepath.Join(indexPath, spaceType) + if _, ok := spaceTypes[spaceTypeAny]; ok { + // TODO do not hardcode dirs + spaceTypes = map[string]struct{}{ + "personal": {}, + "project": {}, + "share": {}, } + } + + for spaceType := range spaceTypes { + indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType+".mpk") fi, err := os.Stat(indexPath) - if err != nil { - continue - } - allMatches, err := fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string]string, error) { - path := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, "*") - m, err := filepath.Glob(path) + if err == nil { + allMatches, err = fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string][]byte, error) { + // Acquire a read log on the index file + f, err := lockedfile.Open(indexPath) + if err != nil { + return nil, errors.Wrap(err, "unable to lock type index to read") + } + defer func() { + rerr := f.Close() + + // if err is non nil we do not overwrite that + if err == nil { + err = rerr + } + }() + + // Read current state + msgBytes, err := io.ReadAll(f) + if err != nil { + return nil, errors.Wrap(err, "unable to read type index") + } + links := map[string][]byte{} + if len(msgBytes) > 0 { + err = msgpack.Unmarshal(msgBytes, &links) + if err != nil { + return nil, errors.Wrap(err, "unable to parse type index") + } + } + return links, nil + }) if err != nil { return nil, err } - matches := map[string]string{} - for _, match := range m { - link, err := os.Readlink(match) + } else { + indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType) + fi, err := os.Stat(indexPath) + if err != nil { + continue + } + + allMatches, err = fs.spaceIDCache.LoadOrStore("by-type:"+spaceType, fi.ModTime(), func() (map[string][]byte, error) { + dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, "*") + m, err := filepath.Glob(dirIndexPath) if err != nil { - continue + return nil, err } - matches[match] = link + links := map[string][]byte{} + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + links[match] = []byte(link) + } + // rewrite index as file + err = fs.writeIndex(indexPath+".mpk", links, nil) + if err != nil { + return nil, err + } + err = os.RemoveAll(dirIndexPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("spaceType", spaceType).Msg("error migrating by type index, continuing") + } + return links, nil + }) + if err != nil { + return nil, err } - return matches, nil - }) - if err != nil { - return nil, err } if nodeID == spaceIDAny { for _, match := range allMatches { - matches[match] = struct{}{} + matches[string(match)] = struct{}{} } } else { - matches[allMatches[nodeID]] = struct{}{} + matches[string(allMatches[nodeID])] = struct{}{} } } } @@ -605,7 +660,7 @@ func (fs *Decomposedfs) ListStorageSpaces(ctx context.Context, filter []*provide } -func (*Decomposedfs) writeIndex(indexPath string, links map[string]string, writer io.Writer) error { +func (*Decomposedfs) writeIndex(indexPath string, links map[string][]byte, writer io.Writer) error { if writer == nil { var err error // aquire writelock @@ -858,6 +913,7 @@ func (fs *Decomposedfs) DeleteStorageSpace(ctx context.Context, req *provider.De return err } // remove type index + // TODO invalidate ALL indexes in msgpack, not only by type spaceTypePath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, spaceID) if err := os.Remove(spaceTypePath); err != nil { return err @@ -919,7 +975,7 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str // Acquire a write log on the index file f, err := lockedfile.OpenFile(indexPath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { - return errors.Wrap(err, "unable to lock index to write") + return errors.Wrap(err, "unable to lock user index to write") } defer func() { rerr := f.Close() @@ -933,13 +989,13 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str // Read current state msgBytes, err := io.ReadAll(f) if err != nil { - return errors.Wrap(err, "unable to read index") + return errors.Wrap(err, "unable to read user index") } - links := map[string]string{} + links := map[string][]byte{} if len(msgBytes) > 0 { err = msgpack.Unmarshal(msgBytes, &links) if err != nil { - return errors.Wrap(err, "unable to parse index") + return errors.Wrap(err, "unable to parse user index") } } else { // try reading dir index path @@ -950,7 +1006,7 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str if err != nil { continue } - links[match] = link + links[match] = []byte(link) } err = os.RemoveAll(dirIndexPath) if err != nil { @@ -959,10 +1015,8 @@ func (fs *Decomposedfs) linkSpaceByUser(ctx context.Context, userID, spaceID str } // add new entry - links[spaceID] = "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2) - err = fs.writeIndex(indexPath, links, f) - - return err + links[spaceID] = []byte("../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)) + return fs.writeIndex(indexPath, links, f) } func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID string) error { @@ -979,7 +1033,7 @@ func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID s // Acquire a write log on the index file f, err := lockedfile.OpenFile(indexPath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { - return errors.Wrap(err, "unable to lock index to write") + return errors.Wrap(err, "unable to lock group index to write") } defer func() { rerr := f.Close() @@ -993,13 +1047,13 @@ func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID s // Read current state msgBytes, err := io.ReadAll(f) if err != nil { - return errors.Wrap(err, "unable to read index") + return errors.Wrap(err, "unable to read group index") } - links := map[string]string{} + links := map[string][]byte{} if len(msgBytes) > 0 { err = msgpack.Unmarshal(msgBytes, &links) if err != nil { - return errors.Wrap(err, "unable to parse index") + return errors.Wrap(err, "unable to parse group index") } } else { // try reading dir index path @@ -1010,7 +1064,7 @@ func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID s if err != nil { continue } - links[match] = link + links[match] = []byte(link) } err = os.RemoveAll(dirIndexPath) if err != nil { @@ -1019,10 +1073,8 @@ func (fs *Decomposedfs) linkSpaceByGroup(ctx context.Context, groupID, spaceID s } // add new entry - links[spaceID] = "../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2) - err = fs.writeIndex(indexPath, links, f) - - return err + links[spaceID] = []byte("../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)) + return fs.writeIndex(indexPath, links, f) } // TODO: implement linkSpaceByGroup @@ -1032,26 +1084,57 @@ func (fs *Decomposedfs) linkStorageSpaceType(ctx context.Context, spaceType stri return nil } // create space type dir - if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-type", spaceType), 0700); err != nil { + if err := os.MkdirAll(filepath.Join(fs.o.Root, "indexes", "by-type"), 0700); err != nil { return err } - // link space in spacetypes - err := os.Symlink("../../../spaces/"+lookup.Pathify(spaceID, 1, 2)+"/nodes/"+lookup.Pathify(spaceID, 4, 2), filepath.Join(fs.o.Root, "indexes", "by-type", spaceType, spaceID)) + indexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType+".mpk") + // Acquire a write log on the index file + f, err := lockedfile.OpenFile(indexPath, os.O_RDWR|os.O_CREATE, 0600) if err != nil { - if isAlreadyExists(err) { - appctx.GetLogger(ctx).Debug().Err(err).Str("space", spaceID).Str("spacetype", spaceType).Msg("symlink already exists") - // FIXME: is it ok to wipe this err if the symlink already exists? - } else { - // TODO how should we handle error cases here? - appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("spacetype", spaceType).Msg("could not create symlink") - return err + return errors.Wrap(err, "unable to lock type index to write") + } + defer func() { + rerr := f.Close() + + // if err is non nil we do not overwrite that + if err == nil { + err = rerr + } + }() + + // Read current state + msgBytes, err := io.ReadAll(f) + if err != nil { + return errors.Wrap(err, "unable to read type index") + } + links := map[string][]byte{} + if len(msgBytes) > 0 { + err = msgpack.Unmarshal(msgBytes, &links) + if err != nil { + return errors.Wrap(err, "unable to parse type index") + } + } else { + // try reading dir index path + dirIndexPath := filepath.Join(fs.o.Root, "indexes", "by-type", spaceType) + m, _ := filepath.Glob(dirIndexPath + "/*") + for _, match := range m { + link, err := os.Readlink(match) + if err != nil { + continue + } + links[match] = []byte(link) + } + err = os.RemoveAll(dirIndexPath) + if err != nil { + appctx.GetLogger(ctx).Error().Err(err).Str("space", spaceID).Str("spaceType", spaceType).Msg("error migrating by type index") } } - // touch index root to invalidate caches - now := time.Now() - return os.Chtimes(filepath.Join(fs.o.Root, "indexes", "by-type"), now, now) + // add new entry + links[spaceID] = []byte("../../../spaces/" + lookup.Pathify(spaceID, 1, 2) + "/nodes/" + lookup.Pathify(spaceID, 4, 2)) + + return fs.writeIndex(indexPath, links, f) } func (fs *Decomposedfs) storageSpaceFromNode(ctx context.Context, n *node.Node, checkPermissions bool) (*provider.StorageSpace, error) {