From dd5f382b92ebb454db169d7ad41a9ef09bf64c4d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?J=C3=B6rn=20Friedrich=20Dreyer?= Date: Fri, 8 Sep 2023 17:50:28 +0200 Subject: [PATCH] move more functions to upload package MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Jörn Friedrich Dreyer --- .../utils/decomposedfs/decomposedfs.go | 308 +-------------- pkg/storage/utils/decomposedfs/upload.go | 374 ++++++++---------- .../utils/decomposedfs/upload/processing.go | 319 +++++++++++++++ .../utils/decomposedfs/upload/upload.go | 50 +++ 4 files changed, 533 insertions(+), 518 deletions(-) diff --git a/pkg/storage/utils/decomposedfs/decomposedfs.go b/pkg/storage/utils/decomposedfs/decomposedfs.go index 5f84a835ee3..86c1323ad5f 100644 --- a/pkg/storage/utils/decomposedfs/decomposedfs.go +++ b/pkg/storage/utils/decomposedfs/decomposedfs.go @@ -38,7 +38,6 @@ import ( "github.com/aws/aws-sdk-go/aws/credentials" "github.com/aws/aws-sdk-go/aws/session" "github.com/aws/aws-sdk-go/service/s3" - user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" rpcv1beta1 "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" ctxpkg "github.com/cs3org/reva/v2/pkg/ctx" @@ -46,7 +45,6 @@ import ( "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/logger" "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" - "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus" "github.com/cs3org/reva/v2/pkg/rhttp/datatx/utils/download" "github.com/cs3org/reva/v2/pkg/storage" "github.com/cs3org/reva/v2/pkg/storage/cache" @@ -258,317 +256,13 @@ func New(o *options.Options, lu *lookup.Lookup, p Permissions, tp Tree, es event } for i := 0; i < o.Events.NumConsumers; i++ { - go fs.Postprocessing(ch) + go upload.Postprocessing(lu, tp, fs.cache, es, tusDataStore, blobstore, fs.downloadURL, ch) } } return fs, nil } -// Postprocessing starts the postprocessing result collector -func (fs 
*Decomposedfs) Postprocessing(ch <-chan events.Event) { - ctx := context.TODO() // we should pass the trace id in the event and initialize the trace provider here - ctx, span := tracer.Start(ctx, "Postprocessing") - defer span.End() - log := logger.New() - for event := range ch { - switch ev := event.Event.(type) { - case events.PostprocessingFinished: - up, err := fs.tusDataStore.GetUpload(ctx, ev.UploadID) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue // NOTE: since we can't get the upload, we can't delete the blob - } - info, err := up.GetInfo(ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") - continue // NOTE: since we can't get the upload, we can't delete the blob - } - - var ( - failed bool - keepUpload bool - ) - - var sizeDiff int64 - // propagate sizeDiff after failed postprocessing - - n, err := upload.ReadNode(ctx, fs.lu, info) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID). - Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). - Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). - Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). - Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). - Str("name", info.MetaData[tus.CS3Prefix+"filename"]). - Msg("could not read revision node") - continue - } - - switch ev.Outcome { - default: - log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") - fallthrough - case events.PPOutcomeAbort: - failed = true - keepUpload = true - case events.PPOutcomeContinue: - if err := upload.Finalize(ctx, fs.blobstore, info, n); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") - keepUpload = true // should we keep the upload when assembling failed? 
- failed = true - } - sizeDiff, err = upload.SetNodeToRevision(ctx, fs.lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"]) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") - keepUpload = true // should we keep the upload when assembling failed? - failed = true - } - case events.PPOutcomeDelete: - failed = true - } - - getParent := func() *node.Node { - p, err := n.Parent(ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") - return nil - } - return p - } - - now := time.Now() - if failed { - // propagate sizeDiff after failed postprocessing - if err := fs.tp.Propagate(ctx, n, 0); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") - } - - } else if p := getParent(); p != nil { - // update parent tmtime to propagate etag change after successful postprocessing - _ = p.SetTMTime(ctx, &now) - if err := fs.tp.Propagate(ctx, p, sizeDiff); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") - } - } - - fs.Cleanup(ctx, n, info, failed) - if !keepUpload { - if tup, ok := up.(tusHandler.TerminatableUpload); ok { - tup.Terminate(ctx) - } - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - - if err := events.Publish( - ctx, - fs.stream, - events.UploadReady{ - UploadID: ev.UploadID, - Failed: failed, - ExecutingUser: &user.User{ - Id: &user.UserId{ - Type: user.UserType(user.UserType_value[info.MetaData[tus.CS3Prefix+"ExecutantType"]]), - Idp: info.MetaData[tus.CS3Prefix+"ExecutantIdp"], - OpaqueId: info.MetaData[tus.CS3Prefix+"ExecutantId"], - }, - Username: info.MetaData[tus.CS3Prefix+"ExecutantUserName"], - }, - Filename: ev.Filename, - FileRef: &provider.Reference{ - ResourceId: &provider.ResourceId{ - StorageId: info.MetaData[tus.CS3Prefix+"providerID"], 
- SpaceId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], - OpaqueId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], - }, - // FIXME this seems wrong, path is not really relative to space root - // actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root so soarch can index the path - // hm is that robust? what if the file is moved? shouldn't we store the parent id, then? - Path: utils.MakeRelativePath(filepath.Join(info.MetaData[tus.CS3Prefix+"dir"], info.MetaData[tus.CS3Prefix+"filename"])), - }, - Timestamp: utils.TimeToTS(now), - SpaceOwner: n.SpaceOwnerOrManager(ctx), - }, - ); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") - } - case events.RestartPostprocessing: - up, err := fs.tusDataStore.GetUpload(ctx, ev.UploadID) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue // NOTE: since we can't get the upload, we can't restart postprocessing - } - info, err := up.GetInfo(ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") - continue // NOTE: since we can't get the upload, we can't restart postprocessing - } - - n, err := upload.ReadNode(ctx, fs.lu, info) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID). - Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). - Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). - Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). - Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). - Str("name", info.MetaData[tus.CS3Prefix+"filename"]). 
- Msg("could not read revision node") - continue - } - - s, err := fs.downloadURL(ctx, ev.UploadID) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") - continue - } - // restart postprocessing - if err := events.Publish(ctx, fs.stream, events.BytesReceived{ - UploadID: info.ID, - URL: s, - SpaceOwner: n.SpaceOwnerOrManager(ctx), - ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? - ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, - Filename: info.MetaData[tus.CS3Prefix+"filename"], - Filesize: uint64(info.Size), - }); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") - } - case events.PostprocessingStepFinished: - if ev.FinishedStep != events.PPStepAntivirus { - // atm we are only interested in antivirus results - continue - } - - res := ev.Result.(events.VirusscanResult) - if res.ErrorMsg != "" { - // scan failed somehow - // Should we handle this here? - continue - } - - var n *node.Node - switch ev.UploadID { - case "": - // uploadid is empty -> this was an on-demand scan - /* ON DEMAND SCANNING NOT SUPPORTED ATM - ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) - ref := &provider.Reference{ResourceId: ev.ResourceID} - - no, err := fs.lu.NodeFromResource(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") - continue - - } - n = no - if ev.Outcome == events.PPOutcomeDelete { - // antivir wants us to delete the file. We must obey and need to - - // check if there a previous versions existing - revs, err := fs.ListRevisions(ctx, ref) - if len(revs) == 0 { - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. 
Fallback to delete file") - } - - // no versions -> trash file - err := fs.Delete(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") - continue - } - - // now purge it from the recycle bin - if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - continue - } - - // we have versions - find the newest - versions := make(map[uint64]string) // remember all versions - we need them later - var nv uint64 - for _, v := range revs { - versions[v.Mtime] = v.Key - if v.Mtime > nv { - nv = v.Mtime - } - } - - // restore newest version - if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") - continue - } - - // now find infected version - revs, err = fs.ListRevisions(ctx, ref) - if err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") - } - - for _, v := range revs { - // we looking for a version that was previously not there - if _, ok := versions[v.Mtime]; ok { - continue - } - - if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { - log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") - } - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - continue - } - */ - default: - // uploadid is not empty -> this is an async upload - up, err := 
fs.tusDataStore.GetUpload(ctx, ev.UploadID) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") - continue - } - info, err := up.GetInfo(ctx) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") - continue - } - - // scan data should be set on the node revision not the node ... then when postprocessing finishes we need to copy the state to the node - - n, err = upload.ReadNode(ctx, fs.lu, info) - if err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID). - Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). - Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). - Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). - Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). - Str("name", info.MetaData[tus.CS3Prefix+"filename"]). - Msg("could not read revision node") - continue - } - } - - if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { - log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") - continue - } - - // remove cache entry in gateway - fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) - default: - log.Error().Interface("event", ev).Msg("Unknown event") - } - } -} - // Shutdown shuts down the storage func (fs *Decomposedfs) Shutdown(ctx context.Context) error { return nil diff --git a/pkg/storage/utils/decomposedfs/upload.go b/pkg/storage/utils/decomposedfs/upload.go index 2a1644eaa06..b511be43caf 100644 --- a/pkg/storage/utils/decomposedfs/upload.go +++ b/pkg/storage/utils/decomposedfs/upload.go @@ -58,6 +58,166 @@ import ( var _idRegexp = regexp.MustCompile(".*/([^/]+).info") +// InitiateUpload returns upload ids corresponding to different protocols it supports +// It creates a node for new files to persist the fileid for the new child. 
+// TODO read optional content for small files in this request +// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? +// FIXME metadata is actually used to carry all kinds of headers +func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, headers map[string]string) (map[string]string, error) { + log := appctx.GetLogger(ctx) + + n, err := fs.lu.NodeFromResource(ctx, ref) + switch err.(type) { + case nil: + // ok + case errtypes.IsNotFound: + return nil, errtypes.PreconditionFailed(err.Error()) + default: + return nil, err + } + + // permissions are checked in NewUpload below + + relative, err := fs.lu.Path(ctx, n, node.NoCheck) + if err != nil { + return nil, err + } + + tusMetadata := tusd.MetaData{} + + // checksum is sent as tus Upload-Checksum header and should not magically become a metadata property + if checksum, ok := headers["checksum"]; ok { + parts := strings.SplitN(checksum, " ", 2) + if len(parts) != 2 { + return nil, errtypes.BadRequest("invalid checksum format. 
must be '[algorithm] [checksum]'") + } + switch parts[0] { + case "sha1", "md5", "adler32": + tusMetadata[tus.CS3Prefix+"checksum"] = checksum + default: + return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) + } + } + + // if mtime has been set via tus metadata, expose it as tus metadata + if ocmtime, ok := headers["mtime"]; ok { + if ocmtime != "null" { + tusMetadata[tus.TusPrefix+"mtime"] = ocmtime + } + } + + _, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(uploadLength)) + if err != nil { + return nil, err + } + + // check permissions + var ( + checkNode *node.Node + path string + ) + if n.Exists { + // check permissions of file to be overwritten + checkNode = n + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }}) + } else { + // check permissions of parent + parent, perr := n.Parent(ctx) + if perr != nil { + return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) + } + checkNode = parent + path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ + SpaceId: checkNode.SpaceID, + OpaqueId: checkNode.ID, + }, Path: n.Name}) + } + rp, err := fs.p.AssemblePermissions(ctx, checkNode) // context does not have a user? + switch { + case err != nil: + return nil, err + case !rp.InitiateFileUpload: + return nil, errtypes.PermissionDenied(path) + } + + // are we trying to overwrite a folder with a file? + if n.Exists && n.IsDir(ctx) { + return nil, errtypes.PreconditionFailed("resource is not a file") + } + + // check lock + // FIXME we cannot check the lock of a new file, because it would have to use the name ... 
+ if err := n.CheckLock(ctx); err != nil { + return nil, err + } + + usr := ctxpkg.ContextMustGetUser(ctx) + + info := tusd.FileInfo{ + MetaData: tusMetadata, + Size: uploadLength, + } + if lockID, ok := ctxpkg.ContextGetLockID(ctx); ok { + info.MetaData[tus.CS3Prefix+"lockid"] = lockID + } + info.MetaData[tus.CS3Prefix+"dir"] = filepath.Dir(relative) + info.MetaData[tus.CS3Prefix+"filename"] = n.Name + info.MetaData[tus.CS3Prefix+"SpaceRoot"] = n.SpaceRoot.ID + info.MetaData[tus.CS3Prefix+"SpaceOwnerOrManager"] = n.SpaceOwnerOrManager(ctx).GetOpaqueId() + info.MetaData[tus.CS3Prefix+"providerID"] = headers["providerID"] + + info.MetaData[tus.CS3Prefix+"RevisionTime"] = time.Now().UTC().Format(time.RFC3339Nano) + info.MetaData[tus.CS3Prefix+"NodeId"] = n.ID + info.MetaData[tus.CS3Prefix+"NodeParentId"] = n.ParentID + + info.MetaData[tus.CS3Prefix+"ExecutantIdp"] = usr.Id.Idp + info.MetaData[tus.CS3Prefix+"ExecutantId"] = usr.Id.OpaqueId + info.MetaData[tus.CS3Prefix+"ExecutantType"] = utils.UserTypeToString(usr.Id.Type) + info.MetaData[tus.CS3Prefix+"ExecutantUserName"] = usr.Username + + info.MetaData[tus.CS3Prefix+"LogLevel"] = log.GetLevel().String() + + // expires has been set by the storageprovider, do not expose as metadata. It is sent as a tus Upload-Expires header + if expiration, ok := headers["expires"]; ok { + if expiration != "null" { // TODO this is set by the storageprovider ... it cannot be set by cliensts, so it can never be the string 'null' ... or can it??? + info.MetaData[tus.CS3Prefix+"expires"] = expiration + } + } + // only check preconditions if they are not empty + // do not expose as metadata + if headers["if-match"] != "" { + info.MetaData[tus.CS3Prefix+"if-match"] = headers["if-match"] // TODO drop? 
+ } + if headers["if-none-match"] != "" { + info.MetaData[tus.CS3Prefix+"if-none-match"] = headers["if-none-match"] + } + if headers["if-unmodified-since"] != "" { + info.MetaData[tus.CS3Prefix+"if-unmodified-since"] = headers["if-unmodified-since"] + } + + if info.MetaData[tus.CS3Prefix+"if-none-match"] == "*" && n.Exists { + return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name)) + } + + // create the upload + upload, err := fs.tusDataStore.NewUpload(ctx, info) + if err != nil { + return nil, err + } + + info, _ = upload.GetInfo(ctx) + + log.Debug().Interface("node", n).Interface("headers", headers).Msg("Decomposedfs: initiated upload") + + return map[string]string{ + "simple": info.ID, + "tus": info.ID, + }, nil +} + // PreFinishResponseCallback is called by the tus datatx, after all bytes have been transferred func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { ctx := context.TODO() @@ -144,7 +304,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { n, err := upload.AddRevisionToNode(ctx, fs.lu, info, attrs) if err != nil { - fs.Cleanup(ctx, n, info, true) + upload.Cleanup(ctx, fs.lu, n, info, true) if tup, ok := up.(tusHandler.TerminatableUpload); ok { tup.Terminate(ctx) } @@ -182,7 +342,7 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { if !fs.o.AsyncFileUploads { // handle postprocessing synchronously err = upload.Finalize(ctx, fs.blobstore, info, n) // moving or copying the blob only reads the blobid, no need to change the revision nodes nodeid - fs.Cleanup(ctx, n, info, err != nil) + upload.Cleanup(ctx, fs.lu, n, info, err != nil) if tup, ok := up.(tusHandler.TerminatableUpload); ok { tup.Terminate(ctx) } @@ -200,56 +360,6 @@ func (fs *Decomposedfs) PreFinishResponseCallback(hook tusd.HookEvent) error { return fs.tp.Propagate(ctx, n, sizeDiff) } -// Cleanup cleans the upload -func (fs *Decomposedfs) Cleanup(ctx context.Context, n 
*node.Node, info tusd.FileInfo, failure bool) { - ctx, span := tracer.Start(ctx, "Cleanup") - defer span.End() - - if n != nil { // node can be nil when there was an error before it was created (eg. checksum-mismatch) - if failure { - fs.removeRevision(ctx, n, info.MetaData[tus.CS3Prefix+"RevisionTime"]) - } - // unset processing status - if err := n.UnmarkProcessing(ctx, info.ID); err != nil { - log := appctx.GetLogger(ctx) - log.Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed") - } - } -} - -// removeRevision cleans up after the upload is finished -func (fs *Decomposedfs) removeRevision(ctx context.Context, n *node.Node, revision string) { - log := appctx.GetLogger(ctx) - nodePath := n.InternalPath() - revisionPath := nodePath + node.RevisionIDDelimiter + revision - // remove revision - if err := utils.RemoveItem(revisionPath); err != nil { - log.Info().Str("path", revisionPath).Err(err).Msg("removing revision failed") - } - // purge revision metadata to clean up cache - if err := fs.lu.MetadataBackend().Purge(revisionPath); err != nil { - log.Info().Str("path", revisionPath).Err(err).Msg("purging revision metadata failed") - } - - if n.BlobID == "" { - // no old version was present - remove child entry symlink from directory - src := filepath.Join(n.ParentPath(), n.Name) - if err := os.Remove(src); err != nil { - log.Info().Str("path", n.ParentPath()).Err(err).Msg("removing node from parent failed") - } - - // delete node - if err := utils.RemoveItem(nodePath); err != nil { - log.Info().Str("path", nodePath).Err(err).Msg("removing node failed") - } - - // purge node metadata to clean up cache - if err := fs.lu.MetadataBackend().Purge(nodePath); err != nil { - log.Info().Str("path", nodePath).Err(err).Msg("purging node metadata failed") - } - } -} - // URL returns a url to download an upload func (fs *Decomposedfs) downloadURL(_ context.Context, id string) (string, error) { type transferClaims struct { @@ -384,165 +494,7 @@ func (fs 
*Decomposedfs) Upload(ctx context.Context, ref *provider.Reference, r i return ri, nil } -// InitiateUpload returns upload ids corresponding to different protocols it supports -// It creates a node for new files to persist the fileid for the new child. -// TODO read optional content for small files in this request -// TODO InitiateUpload (and Upload) needs a way to receive the expected checksum. Maybe in metadata as 'checksum' => 'sha1 aeosvp45w5xaeoe' = lowercase, space separated? -// FIXME metadata is actually used to carry all kinds of headers -func (fs *Decomposedfs) InitiateUpload(ctx context.Context, ref *provider.Reference, uploadLength int64, headers map[string]string) (map[string]string, error) { - log := appctx.GetLogger(ctx) - - n, err := fs.lu.NodeFromResource(ctx, ref) - switch err.(type) { - case nil: - // ok - case errtypes.IsNotFound: - return nil, errtypes.PreconditionFailed(err.Error()) - default: - return nil, err - } - - // permissions are checked in NewUpload below - - relative, err := fs.lu.Path(ctx, n, node.NoCheck) - if err != nil { - return nil, err - } - - tusMetadata := tusd.MetaData{} - - // checksum is sent as tus Upload-Checksum header and should not magically become a metadata property - if checksum, ok := headers["checksum"]; ok { - parts := strings.SplitN(checksum, " ", 2) - if len(parts) != 2 { - return nil, errtypes.BadRequest("invalid checksum format. 
must be '[algorithm] [checksum]'") - } - switch parts[0] { - case "sha1", "md5", "adler32": - tusMetadata[tus.CS3Prefix+"checksum"] = checksum - default: - return nil, errtypes.BadRequest("unsupported checksum algorithm: " + parts[0]) - } - } - - // if mtime has been set via tus metadata, expose it as tus metadata - if ocmtime, ok := headers["mtime"]; ok { - if ocmtime != "null" { - tusMetadata[tus.TusPrefix+"mtime"] = ocmtime - } - } - - _, err = node.CheckQuota(ctx, n.SpaceRoot, n.Exists, uint64(n.Blobsize), uint64(uploadLength)) - if err != nil { - return nil, err - } - - // check permissions - var ( - checkNode *node.Node - path string - ) - if n.Exists { - // check permissions of file to be overwritten - checkNode = n - path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: checkNode.SpaceID, - OpaqueId: checkNode.ID, - }}) - } else { - // check permissions of parent - parent, perr := n.Parent(ctx) - if perr != nil { - return nil, errors.Wrap(perr, "Decomposedfs: error getting parent "+n.ParentID) - } - checkNode = parent - path, _ = storagespace.FormatReference(&provider.Reference{ResourceId: &provider.ResourceId{ - SpaceId: checkNode.SpaceID, - OpaqueId: checkNode.ID, - }, Path: n.Name}) - } - rp, err := fs.p.AssemblePermissions(ctx, checkNode) // context does not have a user? - switch { - case err != nil: - return nil, err - case !rp.InitiateFileUpload: - return nil, errtypes.PermissionDenied(path) - } - - // are we trying to overwrite a folder with a file? - if n.Exists && n.IsDir(ctx) { - return nil, errtypes.PreconditionFailed("resource is not a file") - } - - // check lock - // FIXME we cannot check the lock of a new file, because it would have to use the name ... 
- if err := n.CheckLock(ctx); err != nil { - return nil, err - } - - usr := ctxpkg.ContextMustGetUser(ctx) - - info := tusd.FileInfo{ - MetaData: tusMetadata, - Size: uploadLength, - } - if lockID, ok := ctxpkg.ContextGetLockID(ctx); ok { - info.MetaData[tus.CS3Prefix+"lockid"] = lockID - } - info.MetaData[tus.CS3Prefix+"dir"] = filepath.Dir(relative) - info.MetaData[tus.CS3Prefix+"filename"] = n.Name - info.MetaData[tus.CS3Prefix+"SpaceRoot"] = n.SpaceRoot.ID - info.MetaData[tus.CS3Prefix+"SpaceOwnerOrManager"] = n.SpaceOwnerOrManager(ctx).GetOpaqueId() - info.MetaData[tus.CS3Prefix+"providerID"] = headers["providerID"] - - info.MetaData[tus.CS3Prefix+"RevisionTime"] = time.Now().UTC().Format(time.RFC3339Nano) - info.MetaData[tus.CS3Prefix+"NodeId"] = n.ID - info.MetaData[tus.CS3Prefix+"NodeParentId"] = n.ParentID - - info.MetaData[tus.CS3Prefix+"ExecutantIdp"] = usr.Id.Idp - info.MetaData[tus.CS3Prefix+"ExecutantId"] = usr.Id.OpaqueId - info.MetaData[tus.CS3Prefix+"ExecutantType"] = utils.UserTypeToString(usr.Id.Type) - info.MetaData[tus.CS3Prefix+"ExecutantUserName"] = usr.Username - - info.MetaData[tus.CS3Prefix+"LogLevel"] = log.GetLevel().String() - - // expires has been set by the storageprovider, do not expose as metadata. It is sent as a tus Upload-Expires header - if expiration, ok := headers["expires"]; ok { - if expiration != "null" { // TODO this is set by the storageprovider ... it cannot be set by cliensts, so it can never be the string 'null' ... or can it??? - info.MetaData[tus.CS3Prefix+"expires"] = expiration - } - } - // only check preconditions if they are not empty - // do not expose as metadata - if headers["if-match"] != "" { - info.MetaData[tus.CS3Prefix+"if-match"] = headers["if-match"] // TODO drop? 
- } - if headers["if-none-match"] != "" { - info.MetaData[tus.CS3Prefix+"if-none-match"] = headers["if-none-match"] - } - if headers["if-unmodified-since"] != "" { - info.MetaData[tus.CS3Prefix+"if-unmodified-since"] = headers["if-unmodified-since"] - } - - if info.MetaData[tus.CS3Prefix+"if-none-match"] == "*" && n.Exists { - return nil, errtypes.Aborted(fmt.Sprintf("parent %s already has a child %s", n.ID, n.Name)) - } - - // create the upload - upload, err := fs.tusDataStore.NewUpload(ctx, info) - if err != nil { - return nil, err - } - - info, _ = upload.GetInfo(ctx) - - log.Debug().Interface("node", n).Interface("headers", headers).Msg("Decomposedfs: initiated upload") - - return map[string]string{ - "simple": info.ID, - "tus": info.ID, - }, nil -} +// FIXME all the below functions should needs a dedicated package ... the tusd datastore interface has no way of listing uploads, so we need to extend them // ListUploads returns a list of all incomplete uploads func (fs *Decomposedfs) ListUploads() ([]tusd.FileInfo, error) { diff --git a/pkg/storage/utils/decomposedfs/upload/processing.go b/pkg/storage/utils/decomposedfs/upload/processing.go index 365562e486d..bb3130c7954 100644 --- a/pkg/storage/utils/decomposedfs/upload/processing.go +++ b/pkg/storage/utils/decomposedfs/upload/processing.go @@ -20,12 +20,331 @@ package upload import ( "context" + "path/filepath" + "time" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/logger" + "github.com/cs3org/reva/v2/pkg/rhttp/datatx/manager/tus" + "github.com/cs3org/reva/v2/pkg/storage/cache" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/lookup" "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/node" + "github.com/cs3org/reva/v2/pkg/storage/utils/decomposedfs/tree" + "github.com/cs3org/reva/v2/pkg/utils" + tusd "github.com/tus/tusd/pkg/handler" ) 
// PermissionsChecker defines an interface for checking permissions on a Node type PermissionsChecker interface { AssemblePermissions(ctx context.Context, n *node.Node) (ap provider.ResourcePermissions, err error) } + +type Propagator interface { + Propagate(ctx context.Context, node *node.Node, sizeDiff int64) (err error) +} + +// Postprocessing starts the postprocessing result collector +func Postprocessing(lu *lookup.Lookup, propagator Propagator, cache cache.StatCache, es events.Stream, tusDataStore tusd.DataStore, blobstore tree.Blobstore, downloadURLfunc func(ctx context.Context, id string) (string, error), ch <-chan events.Event) { + ctx := context.TODO() // we should pass the trace id in the event and initialize the trace provider here + ctx, span := tracer.Start(ctx, "Postprocessing") + defer span.End() + log := logger.New() + for event := range ch { + switch ev := event.Event.(type) { + case events.PostprocessingFinished: + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't delete the blob + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue // NOTE: since we can't get the upload, we can't delete the blob + } + + var ( + failed bool + keepUpload bool + ) + + var sizeDiff int64 + // propagate sizeDiff after failed postprocessing + + n, err := ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). + Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). + Str("name", info.MetaData[tus.CS3Prefix+"filename"]). 
+ Msg("could not read revision node") + continue + } + + switch ev.Outcome { + default: + log.Error().Str("outcome", string(ev.Outcome)).Str("uploadID", ev.UploadID).Msg("unknown postprocessing outcome - aborting") + fallthrough + case events.PPOutcomeAbort: + failed = true + keepUpload = true + case events.PPOutcomeContinue: + if err := Finalize(ctx, blobstore, info, n); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + keepUpload = true // should we keep the upload when assembling failed? + failed = true + } + sizeDiff, err = SetNodeToRevision(ctx, lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"]) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not finalize upload") + keepUpload = true // should we keep the upload when assembling failed? + failed = true + } + case events.PPOutcomeDelete: + failed = true + } + + getParent := func() *node.Node { + p, err := n.Parent(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not read parent") + return nil + } + return p + } + + now := time.Now() + if failed { + // propagate sizeDiff after failed postprocessing + if err := propagator.Propagate(ctx, n, 0); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate tree size change") + } + + } else if p := getParent(); p != nil { + // update parent tmtime to propagate etag change after successful postprocessing + _ = p.SetTMTime(ctx, &now) + if err := propagator.Propagate(ctx, p, sizeDiff); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not propagate etag change") + } + } + + Cleanup(ctx, lu, n, info, failed) + if !keepUpload { + if tup, ok := up.(tusd.TerminatableUpload); ok { + tup.Terminate(ctx) + } + } + + // remove cache entry in gateway + cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + + if err := events.Publish( + ctx, + es, + 
events.UploadReady{ + UploadID: ev.UploadID, + Failed: failed, + ExecutingUser: &user.User{ + Id: &user.UserId{ + Type: user.UserType(user.UserType_value[info.MetaData[tus.CS3Prefix+"ExecutantType"]]), + Idp: info.MetaData[tus.CS3Prefix+"ExecutantIdp"], + OpaqueId: info.MetaData[tus.CS3Prefix+"ExecutantId"], + }, + Username: info.MetaData[tus.CS3Prefix+"ExecutantUserName"], + }, + Filename: ev.Filename, + FileRef: &provider.Reference{ + ResourceId: &provider.ResourceId{ + StorageId: info.MetaData[tus.CS3Prefix+"providerID"], + SpaceId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], + OpaqueId: info.MetaData[tus.CS3Prefix+"SpaceRoot"], + }, + // FIXME this seems wrong, path is not really relative to space root + // actually it is: InitiateUpload calls fs.lu.Path to get the path relative to the root so search can index the path + // hm is that robust? what if the file is moved? shouldn't we store the parent id, then? + Path: utils.MakeRelativePath(filepath.Join(info.MetaData[tus.CS3Prefix+"dir"], info.MetaData[tus.CS3Prefix+"filename"])), + }, + Timestamp: utils.TimeToTS(now), + SpaceOwner: n.SpaceOwnerOrManager(ctx), + }, + ); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish UploadReady event") + } + case events.RestartPostprocessing: + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue // NOTE: since we can't get the upload, we can't restart postprocessing + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue // NOTE: since we can't get the upload, we can't restart postprocessing + } + + n, err := ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). 
+ Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). + Str("name", info.MetaData[tus.CS3Prefix+"filename"]). + Msg("could not read revision node") + continue + } + + s, err := downloadURLfunc(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("could not create url") + continue + } + // restart postprocessing + if err := events.Publish(ctx, es, events.BytesReceived{ + UploadID: info.ID, + URL: s, + SpaceOwner: n.SpaceOwnerOrManager(ctx), + ExecutingUser: &user.User{Id: &user.UserId{OpaqueId: "postprocessing-restart"}}, // send nil instead? + ResourceID: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}, + Filename: info.MetaData[tus.CS3Prefix+"filename"], + Filesize: uint64(info.Size), + }); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to publish BytesReceived event") + } + case events.PostprocessingStepFinished: + if ev.FinishedStep != events.PPStepAntivirus { + // atm we are only interested in antivirus results + continue + } + + res := ev.Result.(events.VirusscanResult) + if res.ErrorMsg != "" { + // scan failed somehow + // Should we handle this here? + continue + } + + var n *node.Node + switch ev.UploadID { + case "": + // uploadid is empty -> this was an on-demand scan + /* ON DEMAND SCANNING NOT SUPPORTED ATM + ctx := ctxpkg.ContextSetUser(context.Background(), ev.ExecutingUser) + ref := &provider.Reference{ResourceId: ev.ResourceID} + + no, err := fs.lu.NodeFromResource(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to get node after scan") + continue + + } + n = no + if ev.Outcome == events.PPOutcomeDelete { + // antivir wants us to delete the file. 
We must obey and need to + + // check if there a previous versions existing + revs, err := fs.ListRevisions(ctx, ref) + if len(revs) == 0 { + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to list revisions. Fallback to delete file") + } + + // no versions -> trash file + err := fs.Delete(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to delete infected resource") + continue + } + + // now purge it from the recycle bin + if err := fs.PurgeRecycleItem(ctx, &provider.Reference{ResourceId: &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.SpaceID}}, n.ID, "/"); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Failed to purge infected resource from trash") + } + + // remove cache entry in gateway + fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + + // we have versions - find the newest + versions := make(map[uint64]string) // remember all versions - we need them later + var nv uint64 + for _, v := range revs { + versions[v.Mtime] = v.Key + if v.Mtime > nv { + nv = v.Mtime + } + } + + // restore newest version + if err := fs.RestoreRevision(ctx, ref, versions[nv]); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", versions[nv]).Msg("Failed to restore revision") + continue + } + + // now find infected version + revs, err = fs.ListRevisions(ctx, ref) + if err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Msg("Error listing revisions after restore") + } + + for _, v := range revs { + // we looking for a version that was previously not there + if _, ok := versions[v.Mtime]; ok { + continue + } + + if err := fs.DeleteRevision(ctx, ref, v.Key); err != nil { + log.Error().Err(err).Interface("resourceID", ev.ResourceID).Str("revision", v.Key).Msg("Failed to delete revision") + } + } + + // remove 
cache entry in gateway + fs.cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + continue + } + */ + default: + // uploadid is not empty -> this is an async upload + up, err := tusDataStore.GetUpload(ctx, ev.UploadID) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload") + continue + } + info, err := up.GetInfo(ctx) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Msg("Failed to get upload info") + continue + } + + // scan data should be set on the node revision not the node ... then when postprocessing finishes we need to copy the state to the node + + n, err = ReadNode(ctx, lu, info) + if err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID). + Str("space", info.MetaData[tus.CS3Prefix+"SpaceRoot"]). + Str("parent", info.MetaData[tus.CS3Prefix+"NodeParentId"]). + Str("node", info.MetaData[tus.CS3Prefix+"NodeId"]). + Str("revision", info.MetaData[tus.CS3Prefix+"RevisionTime"]). + Str("name", info.MetaData[tus.CS3Prefix+"filename"]). 
+ Msg("could not read revision node") + continue + } + } + + if err := n.SetScanData(ctx, res.Description, res.Scandate); err != nil { + log.Error().Err(err).Str("uploadID", ev.UploadID).Interface("resourceID", res.ResourceID).Msg("Failed to set scan results") + continue + } + + // remove cache entry in gateway + cache.RemoveStatContext(ctx, ev.ExecutingUser.GetId(), &provider.ResourceId{SpaceId: n.SpaceID, OpaqueId: n.ID}) + default: + log.Error().Interface("event", ev).Msg("Unknown event") + } + } +} diff --git a/pkg/storage/utils/decomposedfs/upload/upload.go b/pkg/storage/utils/decomposedfs/upload/upload.go index e7b626b71b8..d5edde5726b 100644 --- a/pkg/storage/utils/decomposedfs/upload/upload.go +++ b/pkg/storage/utils/decomposedfs/upload/upload.go @@ -495,6 +495,56 @@ func ReadNode(ctx context.Context, lu *lookup.Lookup, info tusd.FileInfo) (*node return n, nil } +// Cleanup cleans the upload +func Cleanup(ctx context.Context, lu *lookup.Lookup, n *node.Node, info tusd.FileInfo, failure bool) { + ctx, span := tracer.Start(ctx, "Cleanup") + defer span.End() + + if n != nil { // node can be nil when there was an error before it was created (eg. 
checksum-mismatch) + if failure { + removeRevision(ctx, lu, n, info.MetaData[tus.CS3Prefix+"RevisionTime"]) + } + // unset processing status + if err := n.UnmarkProcessing(ctx, info.ID); err != nil { + log := appctx.GetLogger(ctx) + log.Info().Str("path", n.InternalPath()).Err(err).Msg("unmarking processing failed") + } + } +} + +// removeRevision cleans up after the upload is finished +func removeRevision(ctx context.Context, lu *lookup.Lookup, n *node.Node, revision string) { + log := appctx.GetLogger(ctx) + nodePath := n.InternalPath() + revisionPath := nodePath + node.RevisionIDDelimiter + revision + // remove revision + if err := utils.RemoveItem(revisionPath); err != nil { + log.Info().Str("path", revisionPath).Err(err).Msg("removing revision failed") + } + // purge revision metadata to clean up cache + if err := lu.MetadataBackend().Purge(revisionPath); err != nil { + log.Info().Str("path", revisionPath).Err(err).Msg("purging revision metadata failed") + } + + if n.BlobID == "" { + // no old version was present - remove child entry symlink from directory + src := filepath.Join(n.ParentPath(), n.Name) + if err := os.Remove(src); err != nil { + log.Info().Str("path", n.ParentPath()).Err(err).Msg("removing node from parent failed") + } + + // delete node + if err := utils.RemoveItem(nodePath); err != nil { + log.Info().Str("path", nodePath).Err(err).Msg("removing node failed") + } + + // purge node metadata to clean up cache + if err := lu.MetadataBackend().Purge(nodePath); err != nil { + log.Info().Str("path", nodePath).Err(err).Msg("purging node metadata failed") + } + } +} + // Finalize finalizes the upload (eg moves the file to the internal destination) func Finalize(ctx context.Context, blobstore tree.Blobstore, info tusd.FileInfo, n *node.Node) error { _, span := tracer.Start(ctx, "Finalize")