From 92195ae04b8454b93f73011c91d1190fd791709f Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 8 Feb 2023 12:38:04 +0100 Subject: [PATCH] sharpen userlog service Signed-off-by: jkoberg --- .drone.star | 1 + .../pkg/config/defaults/defaultconfig.go | 9 +- services/userlog/README.md | 25 +- services/userlog/pkg/command/root.go | 2 +- services/userlog/pkg/command/server.go | 44 ++- services/userlog/pkg/config/config.go | 14 +- .../pkg/config/defaults/defaultconfig.go | 5 + services/userlog/pkg/config/parser/parse.go | 5 + services/userlog/pkg/server/http/option.go | 18 ++ services/userlog/pkg/server/http/server.go | 48 +-- services/userlog/pkg/service/http.go | 118 ++++++++ services/userlog/pkg/service/options.go | 82 ++++++ services/userlog/pkg/service/service.go | 275 ++++++++++++++---- 13 files changed, 556 insertions(+), 90 deletions(-) create mode 100644 services/userlog/pkg/service/http.go create mode 100644 services/userlog/pkg/service/options.go diff --git a/.drone.star b/.drone.star index cd87aa5fb0e..610f6b59b05 100644 --- a/.drone.star +++ b/.drone.star @@ -79,6 +79,7 @@ config = { "services/storage-users", "services/store", "services/thumbnails", + "services/userlog", "services/users", "services/web", "services/webdav", diff --git a/services/proxy/pkg/config/defaults/defaultconfig.go b/services/proxy/pkg/config/defaults/defaultconfig.go index 140a57d8705..daf3a7534ec 100644 --- a/services/proxy/pkg/config/defaults/defaultconfig.go +++ b/services/proxy/pkg/config/defaults/defaultconfig.go @@ -101,6 +101,11 @@ func DefaultPolicies() []config.Policy { Endpoint: "/archiver", Service: "com.owncloud.web.frontend", }, + { + // reroute oc10 notifications endpoint to userlog service + Endpoint: "/ocs/v2.php/apps/notifications/api/v1/notifications", + Service: "com.owncloud.userlog.userlog", + }, { Type: config.RegexRoute, Endpoint: "/ocs/v[12].php/cloud/user/signing-key", // only `user/signing-key` is left in ocis-ocs @@ -202,10 +207,6 @@ func DefaultPolicies() []config.Policy { Endpoint: "/api/v0/settings", Service: "com.owncloud.web.settings", }, - { - Endpoint: "/api/v0/activities", - Service: "com.owncloud.userlog.userlog", - }, }, }, } diff --git a/services/userlog/README.md b/services/userlog/README.md index 612af80c3e4..b6478e3f90d 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -1,11 +1,28 @@ -# Userlog service +# Userlog Service -The `userlog` service provides a way to configure which events a user wants to be informed about and an API to retrieve them. +The `userlog` service is a mediator between the `eventhistory` service and clients who want to be informed about user related events. It provides an API to retrieve those. + +## Store + +The `userlog` service persists information via the configured store in `USERLOG_STORE_TYPE`. Possible stores are: + - `mem`: Basic in-memory store and the default. + - `ocmem`: Advanced in-memory store allowing max size. + - `redis`: Stores data in a configured redis cluster. + - `etcd`: Stores data in a configured etcd cluster. + - `nats-js`: Stores data using key-value-store feature of [nats jetstream](https://docs.nats.io/nats-concepts/jetstream/key-value-store) + - `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments. + +1. Note that in-memory stores are by nature not reboot persistent. +2. Though usually not necessary, a database name and a database table can be configured for event stores if the event store supports this. Generally not applicapable for stores of type `in-memory`. These settings are blank by default which means that the standard settings of the configured store applies. ## Configuring -The `userlog` service has hardcoded configuration for now. +For the time being, the configuration which user related events are of interest is hardcoded and cannot be changed. ## Retrieving -The `userlog` service provides an API to retrieve configured events. +The `userlog` service provides an API to retrieve configured events. For now this API is mostly following [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications) + +## Deleting + +To delete events for an user use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the ids to delete. diff --git a/services/userlog/pkg/command/root.go b/services/userlog/pkg/command/root.go index c9fb4b39be3..4a7e7983971 100644 --- a/services/userlog/pkg/command/root.go +++ b/services/userlog/pkg/command/root.go @@ -43,7 +43,7 @@ type SutureService struct { // NewSutureService creates a new userlog.SutureService func NewSutureService(cfg *ociscfg.Config) suture.Service { - cfg.Notifications.Commons = cfg.Commons + cfg.Userlog.Commons = cfg.Commons return SutureService{ cfg: cfg.Userlog, } diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 076eed54e41..cc5838b1818 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -3,25 +3,37 @@ package command import ( "context" "fmt" + "strings" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/store" "github.com/owncloud/ocis/v2/ocis-pkg/version" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser" "github.com/owncloud/ocis/v2/services/userlog/pkg/logging" "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" "github.com/owncloud/ocis/v2/services/userlog/pkg/server/http" "github.com/urfave/cli/v2" - "go-micro.dev/v4/store" ) // all events we care about var _registeredEvents = []events.Unmarshaller{ events.UploadReady{}, + events.ContainerCreated{}, + events.FileTouched{}, + events.FileDownloaded{}, + events.FileVersionRestored{}, + events.ItemMoved{}, + events.ItemTrashed{}, + events.ItemPurged{}, + events.ItemRestored{}, } // Server is the entrypoint for the server command. @@ -48,7 +60,9 @@ func Server(cfg *config.Config) *cli.Command { } return context.WithCancel(cfg.Context) }() + mtrcs := metrics.New() + mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) defer cancel() @@ -57,15 +71,27 @@ func Server(cfg *config.Config) *cli.Command { return err } - var st store.Store - switch cfg.Store.Type { - case "inmemory": - st = store.NewMemoryStore() - default: - return fmt.Errorf("unknown store '%s' configured", cfg.Store.Type) + st := store.Create( + store.Type(cfg.Store.Type), + store.Addresses(strings.Split(cfg.Store.Addresses, ",")...), + store.Database(cfg.Store.Database), + store.Table(cfg.Store.Table), + ) + + tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode) + if err != nil { + return err + } + gwclient, err := pool.GetGatewayServiceClient( + cfg.RevaGateway, + pool.WithTLSCACert(cfg.GRPCClientTLS.CACert), + pool.WithTLSMode(tm), + ) + if err != nil { + return fmt.Errorf("could not get reva client: %s", err) } - mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpc.DefaultClient()) { server, err := http.Server( @@ -75,6 +101,8 @@ func Server(cfg *config.Config) *cli.Command { http.Metrics(mtrcs), http.Store(st), http.Consumer(consumer), + http.Gateway(gwclient), + http.History(hClient), http.RegisteredEvents(_registeredEvents), ) diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 874c63fa7e8..acf98e1a27a 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -2,7 +2,6 @@ package config import ( "context" - "time" "github.com/owncloud/ocis/v2/ocis-pkg/shared" ) @@ -19,16 +18,21 @@ type Config struct { HTTP HTTP `yaml:"http"` GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"` - Events Events `yaml:"events"` - Store Store `yaml:"store"` + MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;USERLOG_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary to access resources from other services."` + RevaGateway string `yaml:"reva_gateway" env:"REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"` + Events Events `yaml:"events"` + Store Store `yaml:"store"` Context context.Context `yaml:"-"` } // Store configures the store to use type Store struct { - Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the store. Supported is inmemory"` - RecordExpiry time.Duration `yaml:"record_expiry" env:"USERLOG_RECORD_EXPIRY" desc:"time to life for events in the store"` + Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the userlog store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."` + Addresses string `yaml:"addresses" env:"USERLOG_STORE_ADDRESSES" desc:"A comma separated list of addresses to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how addresses are used is dependent on the library of the configured store."` + Database string `yaml:"database" env:"USERLOG_STORE_DATABASE" desc:"(optional) The database name the configured store should use. This has no effect when 'in-memory' stores are configured."` + Table string `yaml:"table" env:"USERLOG_STORE_TABLE" desc:"(optional) The database table the store should use. This has no effect when 'in-memory' stores are configured."` + Size int `yaml:"size" env:"USERLOG_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."` } // Events combines the configuration options for the event bus. diff --git a/services/userlog/pkg/config/defaults/defaultconfig.go b/services/userlog/pkg/config/defaults/defaultconfig.go index faae9fdde33..012baeadbfd 100644 --- a/services/userlog/pkg/config/defaults/defaultconfig.go +++ b/services/userlog/pkg/config/defaults/defaultconfig.go @@ -29,6 +29,7 @@ func DefaultConfig() *config.Config { Store: config.Store{ Type: "inmemory", }, + RevaGateway: shared.DefaultRevaConfig().Address, HTTP: config.HTTP{ Addr: "127.0.0.1:0", Root: "/", @@ -57,6 +58,10 @@ func EnsureDefaults(cfg *config.Config) { cfg.Log = &config.Log{} } + if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" { + cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey + } + if cfg.GRPCClientTLS == nil { cfg.GRPCClientTLS = &shared.GRPCClientTLS{} if cfg.Commons != nil && cfg.Commons.GRPCClientTLS != nil { diff --git a/services/userlog/pkg/config/parser/parse.go b/services/userlog/pkg/config/parser/parse.go index 254d4667c3e..b3218cec9b2 100644 --- a/services/userlog/pkg/config/parser/parse.go +++ b/services/userlog/pkg/config/parser/parse.go @@ -4,6 +4,7 @@ import ( "errors" ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/ocis-pkg/shared" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults" @@ -34,5 +35,9 @@ func ParseConfig(cfg *config.Config) error { // Validate validates the config func Validate(cfg *config.Config) error { + if cfg.MachineAuthAPIKey == "" { + return shared.MissingMachineAuthApiKeyError(cfg.Service.Name) + } + return nil } diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go index 905621f5673..11700b5c145 100644 --- a/services/userlog/pkg/server/http/option.go +++ b/services/userlog/pkg/server/http/option.go @@ -3,8 +3,10 @@ package http import ( "context" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" "github.com/urfave/cli/v2" @@ -24,6 +26,8 @@ type Options struct { Namespace string Store store.Store Consumer events.Consumer + GatewayClient gateway.GatewayAPIClient + HistoryClient ehsvc.EventHistoryService RegisteredEvents []events.Unmarshaller } @@ -94,6 +98,20 @@ func Consumer(consumer events.Consumer) Option { } } +// Gateway provides a function to configure the gateway client +func Gateway(gw gateway.GatewayAPIClient) Option { + return func(o *Options) { + o.GatewayClient = gw + } +} + +// History provides a function to configure the event history client +func History(h ehsvc.EventHistoryService) Option { + return func(o *Options) { + o.HistoryClient = h + } +} + // RegisteredEvents provides a function to register events func RegisteredEvents(evs []events.Unmarshaller) Option { return func(o *Options) { diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go index 38b5db78a51..e603f5294a0 100644 --- a/services/userlog/pkg/server/http/server.go +++ b/services/userlog/pkg/server/http/server.go @@ -3,6 +3,11 @@ package http import ( "fmt" + stdhttp "net/http" + + "github.com/go-chi/chi/v5" + chimiddleware "github.com/go-chi/chi/v5/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" "github.com/owncloud/ocis/v2/ocis-pkg/service/http" "github.com/owncloud/ocis/v2/ocis-pkg/version" svc "github.com/owncloud/ocis/v2/services/userlog/pkg/service" @@ -34,29 +39,36 @@ func Server(opts ...Option) (http.Service, error) { return http.Service{}, fmt.Errorf("could not initialize http service: %w", err) } - //middlewares := []func(stdhttp.Handler) stdhttp.Handler{ - //middleware.TraceContext, - //chimiddleware.RequestID, - //middleware.Version( - //"userlog", - //version.GetString(), - //), - //middleware.Logger( - //options.Logger, - //), - //} + middlewares := []func(stdhttp.Handler) stdhttp.Handler{ + middleware.TraceContext, + chimiddleware.RequestID, + middleware.Version( + "userlog", + version.GetString(), + ), + middleware.Logger( + options.Logger, + ), + middleware.ExtractAccountUUID(), + } + + mux := chi.NewMux() + mux.Use(middlewares...) - handle, err := svc.NewUserlogService(options.Config, options.Consumer, options.Store, options.RegisteredEvents) + handle, err := svc.NewUserlogService( + svc.Logger(options.Logger), + svc.Consumer(options.Consumer), + svc.Mux(mux), + svc.Store(options.Store), + svc.Config(options.Config), + svc.HistoryClient(options.HistoryClient), + svc.GatewayClient(options.GatewayClient), + svc.RegisteredEvents(options.RegisteredEvents), + ) if err != nil { return http.Service{}, err } - { - //handle = svc.NewInstrument(handle, options.Metrics) - //handle = svc.NewLogging(handle, options.Logger) - //handle = svc.NewTracing(handle) - } - if err := micro.RegisterHandler(service.Server(), handle); err != nil { return http.Service{}, err } diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go new file mode 100644 index 00000000000..30ff04b46e7 --- /dev/null +++ b/services/userlog/pkg/service/http.go @@ -0,0 +1,118 @@ +package service + +import ( + "encoding/json" + "fmt" + "net/http" + "time" + + revactx "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" + ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" +) + +// ServeHTTP fulfills Handler interface +func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ul.m.ServeHTTP(w, r) +} + +// HandleGetEvents is the GET handler for events +func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusUnauthorized) + return + } + userID := u.GetId().GetOpaqueId() + + evs, err := ul.GetEvents(r.Context(), userID) + if err != nil { + return + } + + resp := GetEventResponseOC10{} + for _, e := range evs { + resp.OCS.Data = append(resp.OCS.Data, ul.convertEvent(e)) + } + + resp.OCS.Meta.StatusCode = http.StatusOK + b, _ := json.Marshal(resp) + w.Write(b) +} + +// HandleDeleteEvents is the DELETE handler for events +func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + w.WriteHeader(http.StatusUnauthorized) + return + } + + var ids []string + if err := json.NewDecoder(r.Body).Decode(&ids); err != nil { + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := ul.DeleteEvents(u.GetId().GetOpaqueId(), ids); err != nil { + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (ul *UserlogService) convertEvent(event *ehmsg.Event) OC10Notification { + etype, ok := ul.registeredEvents[event.Type] + if !ok { + // this should not happen + return OC10Notification{} + } + + einterface, err := etype.Unmarshal(event.Event) + if err != nil { + // this shouldn't happen either + return OC10Notification{} + } + + noti := OC10Notification{ + EventID: event.Id, + Service: "userlog", + Timestamp: time.Now().Format(time.RFC3339Nano), + } + + switch ev := einterface.(type) { + case events.UploadReady: + ctx, _, _ := utils.Impersonate(ev.SpaceOwner, ul.gwClient, ul.cfg.MachineAuthAPIKey) + space, _ := ul.getSpace(ctx, ev.FileRef.GetResourceId().GetSpaceId()) + noti.UserID = ev.ExecutingUser.GetId().GetOpaqueId() + noti.Subject = "File uploaded" + noti.Message = fmt.Sprintf("File %s was uploaded to space %s by user %s", ev.Filename, space.GetName(), ev.ExecutingUser.GetUsername()) + } + + return noti +} + +// OC10Notification is the oc10 style representation of an event +// some fields are left out for simplicity +type OC10Notification struct { + EventID string `json:"notification_id"` + Service string `json:"app"` + Timestamp string `json:"datetime"` + UserID string `json:"user"` + Subject string `json:"subject"` + Message string `json:"message"` +} + +// GetEventResponseOC10 is the response from GET events endpoint in oc10 style +type GetEventResponseOC10 struct { + OCS struct { + Meta struct { + Message string `json:"message"` + Status string `json:"status"` + StatusCode int `json:"statuscode"` + } `json:"meta"` + Data []OC10Notification `json:"data"` + } `json:"ocs"` +} diff --git a/services/userlog/pkg/service/options.go b/services/userlog/pkg/service/options.go new file mode 100644 index 00000000000..b84c67725e1 --- /dev/null +++ b/services/userlog/pkg/service/options.go @@ -0,0 +1,82 @@ +package service + +import ( + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/go-chi/chi/v5" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "go-micro.dev/v4/store" +) + +// Option for the userlog service +type Option func(*Options) + +// Options for the userlog service +type Options struct { + Logger log.Logger + Consumer events.Consumer + Mux *chi.Mux + Store store.Store + Config *config.Config + HistoryClient ehsvc.EventHistoryService + GatewayClient gateway.GatewayAPIClient + RegisteredEvents []events.Unmarshaller +} + +// Logger configures a logger for the userlog service +func Logger(log log.Logger) Option { + return func(o *Options) { + o.Logger = log + } +} + +// Consumer configures an event consumer for the userlog service +func Consumer(c events.Consumer) Option { + return func(o *Options) { + o.Consumer = c + } +} + +// Mux defines the muxer for the userlog service +func Mux(m *chi.Mux) Option { + return func(o *Options) { + o.Mux = m + } +} + +// Store defines the store for the userlog service +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// Config adds the config for the userlog service +func Config(c *config.Config) Option { + return func(o *Options) { + o.Config = c + } +} + +// HistoryClient adds a grpc client for the eventhistory service +func HistoryClient(hc ehsvc.EventHistoryService) Option { + return func(o *Options) { + o.HistoryClient = hc + } +} + +// GatewayClient adds a grpc client for the gateway service +func GatewayClient(gwc gateway.GatewayAPIClient) Option { + return func(o *Options) { + o.GatewayClient = gwc + } +} + +// RegisteredEvents registers the events the service should listen to +func RegisteredEvents(e []events.Unmarshaller) Option { + return func(o *Options) { + o.RegisteredEvents = e + } +} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 06240d3f78d..587f48194ea 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -4,50 +4,72 @@ import ( "context" "encoding/json" "fmt" - "net/http" "reflect" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" - "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/go-chi/chi/v5" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "go-micro.dev/v4/store" ) -// Comment when you read this on review -var _adminid = "2502d8b8-a5e7-4ab3-b858-5aafae4a64a2" - // UserlogService is the service responsible for user activities type UserlogService struct { + log log.Logger ch <-chan events.Event + m *chi.Mux store store.Store cfg *config.Config historyClient ehsvc.EventHistoryService + gwClient gateway.GatewayAPIClient registeredEvents map[string]events.Unmarshaller } // NewUserlogService returns an EventHistory service -func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store.Store, registeredEvents []events.Unmarshaller) (*UserlogService, error) { - if consumer == nil || store == nil { - return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store) +func NewUserlogService(opts ...Option) (*UserlogService, error) { + o := &Options{} + for _, opt := range opts { + opt(o) + } + + if o.Consumer == nil || o.Store == nil { + return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store) } - ch, err := events.Consume(consumer, "userlog", registeredEvents...) + ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...) if err != nil { return nil, err } - grpcClient := grpc.DefaultClient() - grpcClient.Options() - c := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient) - - ul := &UserlogService{ch: ch, store: store, cfg: cfg, historyClient: c, registeredEvents: make(map[string]events.Unmarshaller)} + ul := &UserlogService{ + log: o.Logger, + ch: ch, + m: o.Mux, + store: o.Store, + cfg: o.Config, + historyClient: o.HistoryClient, + gwClient: o.GatewayClient, + registeredEvents: make(map[string]events.Unmarshaller), + } - for _, e := range registeredEvents { + for _, e := range o.RegisteredEvents { typ := reflect.TypeOf(e) ul.registeredEvents[typ.String()] = e } + ul.m.Route("/", func(r chi.Router) { + r.Get("/*", ul.HandleGetEvents) + r.Delete("/*", ul.HandleDeleteEvents) + }) + go ul.MemorizeEvents() return ul, nil @@ -56,18 +78,37 @@ func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store // MemorizeEvents stores eventIDs a user wants to receive func (ul *UserlogService) MemorizeEvents() { for event := range ul.ch { - switch event.Event.(type) { + var ( + spaceID string // we need the spaceid to inform other space members + spaceOwner *user.UserId // we need a space owner to query space members + requiredRole permissionChecker // we need to check the user has the required role to see the event + ) + switch e := event.Event.(type) { + case events.UploadReady: + spaceID = e.FileRef.GetResourceId().GetSpaceId() + spaceOwner = e.SpaceOwner + requiredRole = viewer default: - // for each event type we need to: - - // I) find users eligible to receive the event + ul.log.Error().Interface("event", e).Msg("unhandled event") + continue + } - // II) filter users who want to receive the event + // for each event type we need to: + // I) find users eligible to receive the event + users, err := ul.findEligibleUsers(spaceOwner, spaceID, requiredRole) + if err != nil { + ul.log.Error().Err(err).Str("spaceID", spaceID).Msg("failed to find eligible users") + continue + } - // III) store the eventID for each user + // II) filter users who want to receive the event + // This step is postponed for later. + // For now each user should get all events she is eligible to receive - // TEMP TESTING CODE - if err := ul.addEventToUser(_adminid, event.ID); err != nil { + // III) store the eventID for each user + for _, id := range users { + if err := ul.addEventsToUser(id, event.ID); err != nil { + ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") continue } } @@ -75,15 +116,15 @@ func (ul *UserlogService) MemorizeEvents() { } // GetEvents allows to retrieve events from the eventhistory by userid -func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]interface{}, error) { +func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) { rec, err := ul.store.Read(userid) - if err != nil { + if err != nil && err != store.ErrNotFound { return nil, err } if len(rec) == 0 { // no events available - return []interface{}{}, nil + return []*ehmsg.Event{}, nil } var eventIDs []string @@ -97,38 +138,40 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]inter return nil, err } - var events []interface{} - for _, e := range resp.Events { - ev, ok := ul.registeredEvents[e.Type] - if !ok { - // this should not happen but we handle it anyway - continue - } + return resp.Events, nil - event, err := ev.Unmarshal(e.Event) - if err != nil { - // this shouldn't happen either - continue - } +} - events = append(events, event) - } +// DeleteEvents will delete the specified events +func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { + return ul.removeEventsFromUser(userid, evids...) +} - return events, nil +func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error { + return ul.alterUserEventList(userid, func(ids []string) []string { + return append(ids, eventids...) + }) } -func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) { - evs, err := ul.GetEvents(r.Context(), _adminid) - if err != nil { - return +func (ul *UserlogService) removeEventsFromUser(userid string, eventids ...string) error { + toDelete := make(map[string]struct{}) + for _, e := range eventids { + toDelete[e] = struct{}{} } + return ul.alterUserEventList(userid, func(ids []string) []string { + var newids []string + for _, id := range ids { + if _, delete := toDelete[id]; delete { + continue + } - // TODO: format response - b, _ := json.Marshal(evs) - w.Write(b) + newids = append(newids, id) + } + return newids + }) } -func (ul *UserlogService) addEventToUser(userid string, eventid string) error { +func (ul *UserlogService) alterUserEventList(userid string, alter func([]string) []string) error { recs, err := ul.store.Read(userid) if err != nil && err != store.ErrNotFound { return err @@ -141,7 +184,7 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error { } } - ids = append(ids, eventid) + ids = alter(ids) b, err := json.Marshal(ids) if err != nil { @@ -152,4 +195,136 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error { Key: userid, Value: b, }) + +} + +func (ul *UserlogService) findEligibleUsers(spaceOwner *user.UserId, spaceID string, pc permissionChecker) ([]string, error) { + ctx, _, err := utils.Impersonate(spaceOwner, ul.gwClient, ul.cfg.MachineAuthAPIKey) + if err != nil { + return nil, err + } + + space, err := ul.getSpace(ctx, spaceID) + if err != nil { + return nil, err + } + + var users []string + switch space.SpaceType { + case "personal": + users = []string{space.GetOwner().GetId().GetOpaqueId()} + case "project": + if users, err = ul.gatherSpaceMembers(ctx, space, pc); err != nil { + return nil, err + } + default: + // TODO: shares? other space types? + return nil, fmt.Errorf("unsupported space type: %s", space.SpaceType) + } + + return users, nil +} + +func (ul *UserlogService) getSpace(ctx context.Context, spaceID string) (*storageprovider.StorageSpace, error) { + res, err := ul.gwClient.ListStorageSpaces(ctx, listStorageSpaceRequest(spaceID)) + if err != nil { + return nil, err + } + + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, fmt.Errorf("Unexpected status code while getting space: %v", res.GetStatus().GetCode()) + } + + if len(res.StorageSpaces) == 0 { + return nil, fmt.Errorf("error getting storage space %s: no space returned", spaceID) + } + + return res.StorageSpaces[0], nil +} + +func (ul *UserlogService) gatherSpaceMembers(ctx context.Context, space *storageprovider.StorageSpace, hasRequiredRole permissionChecker) ([]string, error) { + var permissionsMap map[string]*storageprovider.ResourcePermissions + if err := json.Unmarshal(space.Opaque.Map["grants"].GetValue(), &permissionsMap); err != nil { + return nil, err + } + + groupsMap := make(map[string]struct{}) + if opaqueGroups, ok := space.Opaque.Map["groups"]; ok { + _ = json.Unmarshal(opaqueGroups.GetValue(), &groupsMap) + } + + // we use a map to avoid duplicates + usermap := make(map[string]struct{}) + for id, perm := range permissionsMap { + if !hasRequiredRole(perm) { + // not alllowed to receive event + continue + } + + if _, isGroup := groupsMap[id]; !isGroup { + usermap[id] = struct{}{} + continue + } + + usrs, err := ul.resolveGroup(ctx, id) + if err != nil { + ul.log.Error().Err(err).Str("groupID", id).Msg("failed to resolve group") + continue + } + + for _, u := range usrs { + usermap[u] = struct{}{} + } + } + + var users []string + for id := range usermap { + users = append(users, id) + } + + return users, nil +} + +// resolves the users of a group +func (ul *UserlogService) resolveGroup(ctx context.Context, groupID string) ([]string, error) { + r, err := ul.gwClient.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: groupID}}) + if err != nil { + return nil, err + } + + if r.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, fmt.Errorf("unexpected status code from gateway client: %d", r.GetStatus().GetCode()) + } + + var userIDs []string + for _, m := range r.GetGroup().GetMembers() { + userIDs = append(userIDs, m.GetOpaqueId()) + } + + return userIDs, nil +} + +func listStorageSpaceRequest(spaceID string) *storageprovider.ListStorageSpacesRequest { + return &storageprovider.ListStorageSpacesRequest{ + Filters: []*storageprovider.ListStorageSpacesRequest_Filter{ + { + Type: storageprovider.ListStorageSpacesRequest_Filter_TYPE_ID, + Term: &storageprovider.ListStorageSpacesRequest_Filter_Id{ + Id: &storageprovider.StorageSpaceId{ + OpaqueId: spaceID, + }, + }, + }, + }, + } +} + +type permissionChecker func(*storageprovider.ResourcePermissions) bool + +func viewer(perms *storageprovider.ResourcePermissions) bool { + return perms.Stat +} + +func editor(perms *storageprovider.ResourcePermissions) bool { + return perms.InitiateFileUpload }