From d56565555b828cdc4ab9f9a3b6a913334943524a Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 7 Feb 2023 14:57:33 +0100 Subject: [PATCH 1/3] introduce userlog service Signed-off-by: jkoberg --- Makefile | 1 + ocis-pkg/config/config.go | 2 + ocis-pkg/config/defaultconfig.go | 2 + ocis/pkg/runtime/service/service.go | 2 + .../pkg/config/defaults/defaultconfig.go | 4 + services/userlog/Makefile | 37 +++++ services/userlog/README.md | 11 ++ services/userlog/cmd/userlog/main.go | 14 ++ services/userlog/pkg/command/health.go | 18 ++ services/userlog/pkg/command/root.go | 59 +++++++ services/userlog/pkg/command/server.go | 101 ++++++++++++ services/userlog/pkg/command/version.go | 19 +++ services/userlog/pkg/config/config.go | 58 +++++++ services/userlog/pkg/config/debug.go | 9 + .../pkg/config/defaults/defaultconfig.go | 78 +++++++++ services/userlog/pkg/config/log.go | 9 + services/userlog/pkg/config/parser/parse.go | 38 +++++ services/userlog/pkg/config/service.go | 6 + services/userlog/pkg/logging/logging.go | 17 ++ services/userlog/pkg/metrics/metrics.go | 35 ++++ services/userlog/pkg/server/http/option.go | 102 ++++++++++++ services/userlog/pkg/server/http/server.go | 65 ++++++++ services/userlog/pkg/service/service.go | 155 ++++++++++++++++++ services/userlog/pkg/service/service_test.go | 3 + 24 files changed, 845 insertions(+) create mode 100644 services/userlog/Makefile create mode 100644 services/userlog/README.md create mode 100644 services/userlog/cmd/userlog/main.go create mode 100644 services/userlog/pkg/command/health.go create mode 100644 services/userlog/pkg/command/root.go create mode 100644 services/userlog/pkg/command/server.go create mode 100644 services/userlog/pkg/command/version.go create mode 100644 services/userlog/pkg/config/config.go create mode 100644 services/userlog/pkg/config/debug.go create mode 100644 services/userlog/pkg/config/defaults/defaultconfig.go create mode 100644 services/userlog/pkg/config/log.go create mode 100644 services/userlog/pkg/config/parser/parse.go create mode 100644 services/userlog/pkg/config/service.go create mode 100644 services/userlog/pkg/logging/logging.go create mode 100644 services/userlog/pkg/metrics/metrics.go create mode 100644 services/userlog/pkg/server/http/option.go create mode 100644 services/userlog/pkg/server/http/server.go create mode 100644 services/userlog/pkg/service/service.go create mode 100644 services/userlog/pkg/service/service_test.go diff --git a/Makefile b/Makefile index b32ec49a085..7e5a0a2205e 100644 --- a/Makefile +++ b/Makefile @@ -44,6 +44,7 @@ OCIS_MODULES = \ services/storage-users \ services/store \ services/thumbnails \ + services/userlog \ services/users \ services/web \ services/webdav\ diff --git a/ocis-pkg/config/config.go b/ocis-pkg/config/config.go index 76ba265f073..a9ab12f6c80 100644 --- a/ocis-pkg/config/config.go +++ b/ocis-pkg/config/config.go @@ -31,6 +31,7 @@ import ( storageusers "github.com/owncloud/ocis/v2/services/storage-users/pkg/config" store "github.com/owncloud/ocis/v2/services/store/pkg/config" thumbnails "github.com/owncloud/ocis/v2/services/thumbnails/pkg/config" + userlog "github.com/owncloud/ocis/v2/services/userlog/pkg/config" users "github.com/owncloud/ocis/v2/services/users/pkg/config" web "github.com/owncloud/ocis/v2/services/web/pkg/config" webdav "github.com/owncloud/ocis/v2/services/webdav/pkg/config" @@ -106,6 +107,7 @@ type Config struct { StorageUsers *storageusers.Config `yaml:"storage_users"` Store *store.Config `yaml:"store"` Thumbnails *thumbnails.Config `yaml:"thumbnails"` + Userlog *userlog.Config `yaml:"userlog"` Users *users.Config `yaml:"users"` Web *web.Config `yaml:"web"` WebDAV *webdav.Config `yaml:"webdav"` diff --git a/ocis-pkg/config/defaultconfig.go b/ocis-pkg/config/defaultconfig.go index 67d056eaf59..434c42e139c 100644 --- a/ocis-pkg/config/defaultconfig.go +++ b/ocis-pkg/config/defaultconfig.go @@ -29,6 +29,7 @@ import ( storageusers "github.com/owncloud/ocis/v2/services/storage-users/pkg/config/defaults" store "github.com/owncloud/ocis/v2/services/store/pkg/config/defaults" thumbnails "github.com/owncloud/ocis/v2/services/thumbnails/pkg/config/defaults" + userlog "github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults" users "github.com/owncloud/ocis/v2/services/users/pkg/config/defaults" web "github.com/owncloud/ocis/v2/services/web/pkg/config/defaults" webdav "github.com/owncloud/ocis/v2/services/webdav/pkg/config/defaults" @@ -71,6 +72,7 @@ func DefaultConfig() *Config { StorageUsers: storageusers.DefaultConfig(), Store: store.DefaultConfig(), Thumbnails: thumbnails.DefaultConfig(), + Userlog: userlog.DefaultConfig(), Users: users.DefaultConfig(), Web: web.DefaultConfig(), WebDAV: webdav.DefaultConfig(), diff --git a/ocis/pkg/runtime/service/service.go b/ocis/pkg/runtime/service/service.go index 517d6418d0b..fe7060c05b5 100644 --- a/ocis/pkg/runtime/service/service.go +++ b/ocis/pkg/runtime/service/service.go @@ -46,6 +46,7 @@ import ( storageusers "github.com/owncloud/ocis/v2/services/storage-users/pkg/command" store "github.com/owncloud/ocis/v2/services/store/pkg/command" thumbnails "github.com/owncloud/ocis/v2/services/thumbnails/pkg/command" + userlog "github.com/owncloud/ocis/v2/services/userlog/pkg/command" users "github.com/owncloud/ocis/v2/services/users/pkg/command" web "github.com/owncloud/ocis/v2/services/web/pkg/command" webdav "github.com/owncloud/ocis/v2/services/webdav/pkg/command" @@ -131,6 +132,7 @@ func NewService(options ...Option) (*Service, error) { s.ServicesRegistry[opts.Config.Search.Service.Name] = search.NewSutureService s.ServicesRegistry[opts.Config.Postprocessing.Service.Name] = postprocessing.NewSutureService s.ServicesRegistry[opts.Config.EventHistory.Service.Name] = eventhistory.NewSutureService + s.ServicesRegistry[opts.Config.Userlog.Service.Name] = userlog.NewSutureService // populate delayed services s.Delayed[opts.Config.Sharing.Service.Name] = sharing.NewSutureService diff --git a/services/proxy/pkg/config/defaults/defaultconfig.go b/services/proxy/pkg/config/defaults/defaultconfig.go index af5a712049e..140a57d8705 100644 --- a/services/proxy/pkg/config/defaults/defaultconfig.go +++ b/services/proxy/pkg/config/defaults/defaultconfig.go @@ -202,6 +202,10 @@ func DefaultPolicies() []config.Policy { Endpoint: "/api/v0/settings", Service: "com.owncloud.web.settings", }, + { + Endpoint: "/api/v0/activities", + Service: "com.owncloud.userlog.userlog", + }, }, }, } diff --git a/services/userlog/Makefile b/services/userlog/Makefile new file mode 100644 index 00000000000..b1f19d550b9 --- /dev/null +++ b/services/userlog/Makefile @@ -0,0 +1,37 @@ +SHELL := bash +NAME := userlog + +include ../../.make/recursion.mk + +############ tooling ############ +ifneq (, $(shell command -v go 2> /dev/null)) # suppress `command not found warnings` for non go targets in CI +include ../../.bingo/Variables.mk +endif + +############ go tooling ############ +include ../../.make/go.mk + +############ release ############ +include ../../.make/release.mk + +############ docs generate ############ +include ../../.make/docs.mk + +.PHONY: docs-generate +docs-generate: config-docs-generate + +############ generate ############ +include ../../.make/generate.mk + +.PHONY: ci-go-generate +ci-go-generate: # CI runs ci-node-generate automatically before this target + +.PHONY: ci-node-generate +ci-node-generate: + +############ licenses ############ +.PHONY: ci-node-check-licenses +ci-node-check-licenses: + +.PHONY: ci-node-save-licenses +ci-node-save-licenses: diff --git a/services/userlog/README.md b/services/userlog/README.md new file mode 100644 index 00000000000..612af80c3e4 --- /dev/null +++ b/services/userlog/README.md @@ -0,0 +1,11 @@ +# Userlog service + +The `userlog` service provides a way to configure which events a user wants to be informed about and an API to retrieve them. + +## Configuring + +The `userlog` service has hardcoded configuration for now. + +## Retrieving + +The `userlog` service provides an API to retrieve configured events. diff --git a/services/userlog/cmd/userlog/main.go b/services/userlog/cmd/userlog/main.go new file mode 100644 index 00000000000..efdb7ae4b8d --- /dev/null +++ b/services/userlog/cmd/userlog/main.go @@ -0,0 +1,14 @@ +package main + +import ( + "os" + + "github.com/owncloud/ocis/v2/services/userlog/pkg/command" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults" +) + +func main() { + if err := command.Execute(defaults.DefaultConfig()); err != nil { + os.Exit(1) + } +} diff --git a/services/userlog/pkg/command/health.go b/services/userlog/pkg/command/health.go new file mode 100644 index 00000000000..6ef36534645 --- /dev/null +++ b/services/userlog/pkg/command/health.go @@ -0,0 +1,18 @@ +package command + +import ( + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/urfave/cli/v2" +) + +// Health is the entrypoint for the health command. +func Health(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "health", + Usage: "Check health status", + Action: func(c *cli.Context) error { + // Not implemented + return nil + }, + } +} diff --git a/services/userlog/pkg/command/root.go b/services/userlog/pkg/command/root.go new file mode 100644 index 00000000000..c9fb4b39be3 --- /dev/null +++ b/services/userlog/pkg/command/root.go @@ -0,0 +1,59 @@ +package command + +import ( + "context" + "os" + + "github.com/owncloud/ocis/v2/ocis-pkg/clihelper" + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/thejerf/suture/v4" + "github.com/urfave/cli/v2" +) + +// GetCommands provides all commands for this service +func GetCommands(cfg *config.Config) cli.Commands { + return []*cli.Command{ + // start this service + Server(cfg), + + // interaction with this service + + // infos about this service + Health(cfg), + Version(cfg), + } +} + +// Execute is the entry point for the userlog command. +func Execute(cfg *config.Config) error { + app := clihelper.DefaultApp(&cli.App{ + Name: "userlog", + Usage: "starts userlog service", + Commands: GetCommands(cfg), + }) + + return app.Run(os.Args) +} + +// SutureService allows for the userlog command to be embedded and supervised by a suture supervisor tree. +type SutureService struct { + cfg *config.Config +} + +// NewSutureService creates a new userlog.SutureService +func NewSutureService(cfg *ociscfg.Config) suture.Service { + cfg.Notifications.Commons = cfg.Commons + return SutureService{ + cfg: cfg.Userlog, + } +} + +func (s SutureService) Serve(ctx context.Context) error { + s.cfg.Context = ctx + if err := Execute(s.cfg); err != nil { + return err + } + + return nil +} diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go new file mode 100644 index 00000000000..076eed54e41 --- /dev/null +++ b/services/userlog/pkg/command/server.go @@ -0,0 +1,101 @@ +package command + +import ( + "context" + "fmt" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/oklog/run" + "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser" + "github.com/owncloud/ocis/v2/services/userlog/pkg/logging" + "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" + "github.com/owncloud/ocis/v2/services/userlog/pkg/server/http" + "github.com/urfave/cli/v2" + "go-micro.dev/v4/store" +) + +// all events we care about +var _registeredEvents = []events.Unmarshaller{ + events.UploadReady{}, +} + +// Server is the entrypoint for the server command. +func Server(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "server", + Usage: fmt.Sprintf("start the %s service without runtime (unsupervised mode)", cfg.Service.Name), + Category: "server", + Before: func(c *cli.Context) error { + return configlog.ReturnFatal(parser.ParseConfig(cfg)) + }, + Action: func(c *cli.Context) error { + logger := logging.Configure(cfg.Service.Name, cfg.Log) + + err := ogrpc.Configure(ogrpc.GetClientOptions(cfg.GRPCClientTLS)...) + if err != nil { + return err + } + + gr := run.Group{} + ctx, cancel := func() (context.Context, context.CancelFunc) { + if cfg.Context == nil { + return context.WithCancel(context.Background()) + } + return context.WithCancel(cfg.Context) + }() + mtrcs := metrics.New() + + defer cancel() + + consumer, err := stream.NatsFromConfig(stream.NatsConfig(cfg.Events)) + if err != nil { + return err + } + + var st store.Store + switch cfg.Store.Type { + case "inmemory": + st = store.NewMemoryStore() + default: + return fmt.Errorf("unknown store '%s' configured", cfg.Store.Type) + } + + mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + + { + server, err := http.Server( + http.Logger(logger), + http.Context(ctx), + http.Config(cfg), + http.Metrics(mtrcs), + http.Store(st), + http.Consumer(consumer), + http.RegisteredEvents(_registeredEvents), + ) + + if err != nil { + logger.Info().Err(err).Str("transport", "http").Msg("Failed to initialize server") + return err + } + + gr.Add(func() error { + return server.Run() + }, func(err error) { + logger.Error(). + Str("transport", "http"). + Err(err). + Msg("Shutting down server") + + cancel() + }) + } + + return gr.Run() + }, + } +} diff --git a/services/userlog/pkg/command/version.go b/services/userlog/pkg/command/version.go new file mode 100644 index 00000000000..fbee1e06e27 --- /dev/null +++ b/services/userlog/pkg/command/version.go @@ -0,0 +1,19 @@ +package command + +import ( + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/urfave/cli/v2" +) + +// Version prints the service versions of all running instances. +func Version(cfg *config.Config) *cli.Command { + return &cli.Command{ + Name: "version", + Usage: "print the version of this binary and the running service instances", + Category: "info", + Action: func(c *cli.Context) error { + // not implemented + return nil + }, + } +} diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go new file mode 100644 index 00000000000..874c63fa7e8 --- /dev/null +++ b/services/userlog/pkg/config/config.go @@ -0,0 +1,58 @@ +package config + +import ( + "context" + "time" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" +) + +// Config combines all available configuration parts. +type Config struct { + Commons *shared.Commons `yaml:"-"` // don't use this directly as configuration for a service + + Service Service `yaml:"-"` + + Log *Log `yaml:"log"` + Debug Debug `yaml:"debug"` + + HTTP HTTP `yaml:"http"` + GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"` + + Events Events `yaml:"events"` + Store Store `yaml:"store"` + + Context context.Context `yaml:"-"` +} + +// Store configures the store to use +type Store struct { + Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the store. Supported is inmemory"` + RecordExpiry time.Duration `yaml:"record_expiry" env:"USERLOG_RECORD_EXPIRY" desc:"time to life for events in the store"` +} + +// Events combines the configuration options for the event bus. +type Events struct { + Endpoint string `yaml:"endpoint" env:"USERLOG_EVENTS_ENDPOINT" desc:"The address of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture."` + Cluster string `yaml:"cluster" env:"USERLOG_EVENTS_CLUSTER" desc:"The clusterID of the event system. The event system is the message queuing service. It is used as message broker for the microservice architecture. Mandatory when using NATS as event system."` + TLSInsecure bool `yaml:"tls_insecure" env:"OCIS_INSECURE;USERLOG_EVENTS_TLS_INSECURE" desc:"Whether to verify the server TLS certificates."` + TLSRootCACertificate string `yaml:"tls_root_ca_certificate" env:"USERLOG_EVENTS_TLS_ROOT_CA_CERTIFICATE" desc:"The root CA certificate used to validate the server's TLS certificate. If provided NOTIFICATIONS_EVENTS_TLS_INSECURE will be seen as false."` + EnableTLS bool `yaml:"enable_tls" env:"OCIS_EVENTS_ENABLE_TLS;USERLOG_EVENTS_ENABLE_TLS" desc:"Enable TLS for the connection to the events broker. The events broker is the ocis service which receives and delivers events between the services.."` +} + +// CORS defines the available cors configuration. +type CORS struct { + AllowedOrigins []string `yaml:"allow_origins" env:"OCIS_CORS_ALLOW_ORIGINS;USERLOG_CORS_ALLOW_ORIGINS" desc:"A comma-separated list of allowed CORS origins. See following chapter for more details: *Access-Control-Allow-Origin* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Origin"` + AllowedMethods []string `yaml:"allow_methods" env:"OCIS_CORS_ALLOW_METHODS;USERLOG_CORS_ALLOW_METHODS" desc:"A comma-separated list of allowed CORS methods. See following chapter for more details: *Access-Control-Request-Method* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Method"` + AllowedHeaders []string `yaml:"allow_headers" env:"OCIS_CORS_ALLOW_HEADERS;USERLOG_CORS_ALLOW_HEADERS" desc:"A comma-separated list of allowed CORS headers. See following chapter for more details: *Access-Control-Request-Headers* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Request-Headers."` + AllowCredentials bool `yaml:"allow_credentials" env:"OCIS_CORS_ALLOW_CREDENTIALS;USERLOG_CORS_ALLOW_CREDENTIALS" desc:"Allow credentials for CORS.See following chapter for more details: *Access-Control-Allow-Credentials* at https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Access-Control-Allow-Credentials."` +} + +// HTTP defines the available http configuration. +type HTTP struct { + Addr string `yaml:"addr" env:"USERLOG_HTTP_ADDR" desc:"The bind address of the HTTP service."` + Namespace string `yaml:"-"` + Root string `yaml:"root" env:"USERLOG_HTTP_ROOT" desc:"Subdirectory that serves as the root for this HTTP service."` + CORS CORS `yaml:"cors"` + TLS shared.HTTPServiceTLS `yaml:"tls"` +} diff --git a/services/userlog/pkg/config/debug.go b/services/userlog/pkg/config/debug.go new file mode 100644 index 00000000000..d29ac49163a --- /dev/null +++ b/services/userlog/pkg/config/debug.go @@ -0,0 +1,9 @@ +package config + +// Debug defines the available debug configuration. +type Debug struct { + Addr string `yaml:"addr" env:"USERLOG_DEBUG_ADDR" desc:"Bind address of the debug server, where metrics, health, config and debug endpoints will be exposed."` + Token string `yaml:"token" env:"USERLOG_DEBUG_TOKEN" desc:"Token to secure the metrics endpoint."` + Pprof bool `yaml:"pprof" env:"USERLOG_DEBUG_PPROF" desc:"Enables pprof, which can be used for profiling."` + Zpages bool `yaml:"zpages" env:"USERLOG_DEBUG_ZPAGES" desc:"Enables zpages, which can be used for collecting and viewing in-memory traces."` +} diff --git a/services/userlog/pkg/config/defaults/defaultconfig.go b/services/userlog/pkg/config/defaults/defaultconfig.go new file mode 100644 index 00000000000..faae9fdde33 --- /dev/null +++ b/services/userlog/pkg/config/defaults/defaultconfig.go @@ -0,0 +1,78 @@ +package defaults + +import ( + "strings" + + "github.com/owncloud/ocis/v2/ocis-pkg/shared" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" +) + +// FullDefaultConfig returns the full default config +func FullDefaultConfig() *config.Config { + cfg := DefaultConfig() + EnsureDefaults(cfg) + Sanitize(cfg) + return cfg +} + +// DefaultConfig return the default configuration +func DefaultConfig() *config.Config { + return &config.Config{ + Service: config.Service{ + Name: "userlog", + }, + Events: config.Events{ + Endpoint: "127.0.0.1:9233", + Cluster: "ocis-cluster", + EnableTLS: false, + }, + Store: config.Store{ + Type: "inmemory", + }, + HTTP: config.HTTP{ + Addr: "127.0.0.1:0", + Root: "/", + Namespace: "com.owncloud.userlog", + CORS: config.CORS{ + AllowedOrigins: []string{"*"}, + AllowedMethods: []string{"GET"}, + AllowedHeaders: []string{"Authorization", "Origin", "Content-Type", "Accept", "X-Requested-With"}, + AllowCredentials: true, + }, + }, + } +} + +// EnsureDefaults ensures the config contains default values +func EnsureDefaults(cfg *config.Config) { + // provide with defaults for shared logging, since we need a valid destination address for "envdecode". + if cfg.Log == nil && cfg.Commons != nil && cfg.Commons.Log != nil { + cfg.Log = &config.Log{ + Level: cfg.Commons.Log.Level, + Pretty: cfg.Commons.Log.Pretty, + Color: cfg.Commons.Log.Color, + File: cfg.Commons.Log.File, + } + } else if cfg.Log == nil { + cfg.Log = &config.Log{} + } + + if cfg.GRPCClientTLS == nil { + cfg.GRPCClientTLS = &shared.GRPCClientTLS{} + if cfg.Commons != nil && cfg.Commons.GRPCClientTLS != nil { + cfg.GRPCClientTLS = cfg.Commons.GRPCClientTLS + } + } + + if cfg.Commons != nil { + cfg.HTTP.TLS = cfg.Commons.HTTPServiceTLS + } +} + +// Sanitize sanitizes the config +func Sanitize(cfg *config.Config) { + // sanitize config + if cfg.HTTP.Root != "/" { + cfg.HTTP.Root = strings.TrimSuffix(cfg.HTTP.Root, "/") + } +} diff --git a/services/userlog/pkg/config/log.go b/services/userlog/pkg/config/log.go new file mode 100644 index 00000000000..9e9098aa5b4 --- /dev/null +++ b/services/userlog/pkg/config/log.go @@ -0,0 +1,9 @@ +package config + +// Log defines the available log configuration. +type Log struct { + Level string `mapstructure:"level" env:"OCIS_LOG_LEVEL;USERLOG_LOG_LEVEL" desc:"The log level. Valid values are: \"panic\", \"fatal\", \"error\", \"warn\", \"info\", \"debug\", \"trace\"."` + Pretty bool `mapstructure:"pretty" env:"OCIS_LOG_PRETTY;USERLOG_LOG_PRETTY" desc:"Activates pretty log output."` + Color bool `mapstructure:"color" env:"OCIS_LOG_COLOR;USERLOG_LOG_COLOR" desc:"Activates colorized log output."` + File string `mapstructure:"file" env:"OCIS_LOG_FILE;USERLOG_LOG_FILE" desc:"The path to the log file. Activates logging to this file if set."` +} diff --git a/services/userlog/pkg/config/parser/parse.go b/services/userlog/pkg/config/parser/parse.go new file mode 100644 index 00000000000..254d4667c3e --- /dev/null +++ b/services/userlog/pkg/config/parser/parse.go @@ -0,0 +1,38 @@ +package parser + +import ( + "errors" + + ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults" + + "github.com/owncloud/ocis/v2/ocis-pkg/config/envdecode" +) + +// ParseConfig loads configuration from known paths. +func ParseConfig(cfg *config.Config) error { + _, err := ociscfg.BindSourcesToStructs(cfg.Service.Name, cfg) + if err != nil { + return err + } + + defaults.EnsureDefaults(cfg) + + // load all env variables relevant to the config in the current context. + if err := envdecode.Decode(cfg); err != nil { + // no environment variable set for this config is an expected "error" + if !errors.Is(err, envdecode.ErrNoTargetFieldsAreSet) { + return err + } + } + + defaults.Sanitize(cfg) + + return Validate(cfg) +} + +// Validate validates the config +func Validate(cfg *config.Config) error { + return nil +} diff --git a/services/userlog/pkg/config/service.go b/services/userlog/pkg/config/service.go new file mode 100644 index 00000000000..d1eac383f0b --- /dev/null +++ b/services/userlog/pkg/config/service.go @@ -0,0 +1,6 @@ +package config + +// Service defines the available service configuration. +type Service struct { + Name string `yaml:"-"` +} diff --git a/services/userlog/pkg/logging/logging.go b/services/userlog/pkg/logging/logging.go new file mode 100644 index 00000000000..691170093df --- /dev/null +++ b/services/userlog/pkg/logging/logging.go @@ -0,0 +1,17 @@ +package logging + +import ( + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" +) + +// LoggerFromConfig initializes a service-specific logger instance. +func Configure(name string, cfg *config.Log) log.Logger { + return log.NewLogger( + log.Name(name), + log.Level(cfg.Level), + log.Pretty(cfg.Pretty), + log.Color(cfg.Color), + log.File(cfg.File), + ) +} diff --git a/services/userlog/pkg/metrics/metrics.go b/services/userlog/pkg/metrics/metrics.go new file mode 100644 index 00000000000..1c34c6f9ff3 --- /dev/null +++ b/services/userlog/pkg/metrics/metrics.go @@ -0,0 +1,35 @@ +package metrics + +import "github.com/prometheus/client_golang/prometheus" + +var ( + // Namespace defines the namespace for the defines metrics. + Namespace = "ocis" + + // Subsystem defines the subsystem for the defines metrics. + Subsystem = "userlog" +) + +// Metrics defines the available metrics of this service. +type Metrics struct { + BuildInfo *prometheus.GaugeVec +} + +// New initializes the available metrics. +func New() *Metrics { + m := &Metrics{ + BuildInfo: prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Namespace: Namespace, + Subsystem: Subsystem, + Name: "build_info", + Help: "Build information", + }, []string{"version"}), + } + + _ = prometheus.Register( + m.BuildInfo, + ) + + // TODO: implement metrics + return m +} diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go new file mode 100644 index 00000000000..905621f5673 --- /dev/null +++ b/services/userlog/pkg/server/http/option.go @@ -0,0 +1,102 @@ +package http + +import ( + "context" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" + "github.com/urfave/cli/v2" + "go-micro.dev/v4/store" +) + +// Option defines a single option function. +type Option func(o *Options) + +// Options defines the available options for this package. +type Options struct { + Logger log.Logger + Context context.Context + Config *config.Config + Metrics *metrics.Metrics + Flags []cli.Flag + Namespace string + Store store.Store + Consumer events.Consumer + RegisteredEvents []events.Unmarshaller +} + +// newOptions initializes the available default options. +func newOptions(opts ...Option) Options { + opt := Options{} + + for _, o := range opts { + o(&opt) + } + + return opt +} + +// Logger provides a function to set the logger option. +func Logger(val log.Logger) Option { + return func(o *Options) { + o.Logger = val + } +} + +// Context provides a function to set the context option. +func Context(val context.Context) Option { + return func(o *Options) { + o.Context = val + } +} + +// Config provides a function to set the config option. +func Config(val *config.Config) Option { + return func(o *Options) { + o.Config = val + } +} + +// Metrics provides a function to set the metrics option. +func Metrics(val *metrics.Metrics) Option { + return func(o *Options) { + o.Metrics = val + } +} + +// Flags provides a function to set the flags option. +func Flags(val []cli.Flag) Option { + return func(o *Options) { + o.Flags = append(o.Flags, val...) + } +} + +// Namespace provides a function to set the Namespace option. +func Namespace(val string) Option { + return func(o *Options) { + o.Namespace = val + } +} + +// Store provides a function to configure the store +func Store(store store.Store) Option { + return func(o *Options) { + o.Store = store + } +} + +// Consumer provides a function to configure the consumer +func Consumer(consumer events.Consumer) Option { + return func(o *Options) { + o.Consumer = consumer + } +} + +// RegisteredEvents provides a function to register events +func RegisteredEvents(evs []events.Unmarshaller) Option { + return func(o *Options) { + o.RegisteredEvents = evs + } +} diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go new file mode 100644 index 00000000000..38b5db78a51 --- /dev/null +++ b/services/userlog/pkg/server/http/server.go @@ -0,0 +1,65 @@ +package http + +import ( + "fmt" + + "github.com/owncloud/ocis/v2/ocis-pkg/service/http" + "github.com/owncloud/ocis/v2/ocis-pkg/version" + svc "github.com/owncloud/ocis/v2/services/userlog/pkg/service" + "go-micro.dev/v4" +) + +// Service is the service interface +type Service interface { +} + +// Server initializes the http service and server. +func Server(opts ...Option) (http.Service, error) { + options := newOptions(opts...) + + service, err := http.NewService( + http.TLSConfig(options.Config.HTTP.TLS), + http.Logger(options.Logger), + http.Namespace(options.Config.HTTP.Namespace), + http.Name("userlog"), + http.Version(version.GetString()), + http.Address(options.Config.HTTP.Addr), + http.Context(options.Context), + http.Flags(options.Flags...), + ) + if err != nil { + options.Logger.Error(). + Err(err). + Msg("Error initializing http service") + return http.Service{}, fmt.Errorf("could not initialize http service: %w", err) + } + + //middlewares := []func(stdhttp.Handler) stdhttp.Handler{ + //middleware.TraceContext, + //chimiddleware.RequestID, + //middleware.Version( + //"userlog", + //version.GetString(), + //), + //middleware.Logger( + //options.Logger, + //), + //} + + handle, err := svc.NewUserlogService(options.Config, options.Consumer, options.Store, options.RegisteredEvents) + if err != nil { + return http.Service{}, err + } + + { + //handle = svc.NewInstrument(handle, options.Metrics) + //handle = svc.NewLogging(handle, options.Logger) + //handle = svc.NewTracing(handle) + } + + if err := micro.RegisterHandler(service.Server(), handle); err != nil { + return http.Service{}, err + } + + return service, nil +} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go new file mode 100644 index 00000000000..06240d3f78d --- /dev/null +++ b/services/userlog/pkg/service/service.go @@ -0,0 +1,155 @@ +package service + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "reflect" + + "github.com/cs3org/reva/v2/pkg/events" + "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "go-micro.dev/v4/store" +) + +// Comment when you read this on review +var _adminid = "2502d8b8-a5e7-4ab3-b858-5aafae4a64a2" + +// UserlogService is the service responsible for user activities +type UserlogService struct { + ch <-chan events.Event + store store.Store + cfg *config.Config + historyClient ehsvc.EventHistoryService + registeredEvents map[string]events.Unmarshaller +} + +// NewUserlogService returns an EventHistory service +func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store.Store, registeredEvents []events.Unmarshaller) (*UserlogService, error) { + if consumer == nil || store == nil { + return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store) + } + + ch, err := events.Consume(consumer, "userlog", registeredEvents...) + if err != nil { + return nil, err + } + + grpcClient := grpc.DefaultClient() + grpcClient.Options() + c := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient) + + ul := &UserlogService{ch: ch, store: store, cfg: cfg, historyClient: c, registeredEvents: make(map[string]events.Unmarshaller)} + + for _, e := range registeredEvents { + typ := reflect.TypeOf(e) + ul.registeredEvents[typ.String()] = e + } + + go ul.MemorizeEvents() + + return ul, nil +} + +// MemorizeEvents stores eventIDs a user wants to receive +func (ul *UserlogService) MemorizeEvents() { + for event := range ul.ch { + switch event.Event.(type) { + default: + // for each event type we need to: + + // I) find users eligible to receive the event + + // II) filter users who want to receive the event + + // III) store the eventID for each user + + // TEMP TESTING CODE + if err := ul.addEventToUser(_adminid, event.ID); err != nil { + continue + } + } + } +} + +// GetEvents allows to retrieve events from the eventhistory by userid +func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]interface{}, error) { + rec, err := ul.store.Read(userid) + if err != nil { + return nil, err + } + + if len(rec) == 0 { + // no events available + return []interface{}{}, nil + } + + var eventIDs []string + if err := json.Unmarshal(rec[0].Value, &eventIDs); err != nil { + // this should never happen + return nil, err + } + + resp, err := ul.historyClient.GetEvents(ctx, &ehsvc.GetEventsRequest{Ids: eventIDs}) + if err != nil { + return nil, err + } + + var events []interface{} + for _, e := range resp.Events { + ev, ok := ul.registeredEvents[e.Type] + if !ok { + // this should not happen but we handle it anyway + continue + } + + event, err := ev.Unmarshal(e.Event) + if err != nil { + // this shouldn't happen either + continue + } + + events = append(events, event) + } + + return events, nil +} + +func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + evs, err := ul.GetEvents(r.Context(), _adminid) + if err != nil { + return + } + + // TODO: format response + b, _ := json.Marshal(evs) + w.Write(b) +} + +func (ul *UserlogService) addEventToUser(userid string, eventid string) error { + recs, err := ul.store.Read(userid) + if err != nil && err != store.ErrNotFound { + return err + } + + var ids []string + if len(recs) > 0 { + if err := json.Unmarshal(recs[0].Value, &ids); err != nil { + return err + } + } + + ids = append(ids, eventid) + + b, err := json.Marshal(ids) + if err != nil { + return err + } + + return ul.store.Write(&store.Record{ + Key: userid, + Value: b, + }) +} diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go new file mode 100644 index 00000000000..944a1f69f35 --- /dev/null +++ b/services/userlog/pkg/service/service_test.go @@ -0,0 +1,3 @@ +package service_test + +// tests here From d7f57f3a50cb2a27c969aba306f4921074029df7 Mon Sep 17 00:00:00 2001 From: jkoberg Date: Tue, 21 Feb 2023 10:27:27 +0100 Subject: [PATCH 2/3] changelog Signed-off-by: jkoberg --- changelog/unreleased/userlog-service.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 changelog/unreleased/userlog-service.md diff --git a/changelog/unreleased/userlog-service.md b/changelog/unreleased/userlog-service.md new file mode 100644 index 00000000000..d9f100f9e33 --- /dev/null +++ b/changelog/unreleased/userlog-service.md @@ -0,0 +1,5 @@ +Enhancement: Userlog Service + +Introduces userlog service. It stores eventIDs the user is interested in and provides an API to retrieve the events. + +https://github.com/owncloud/ocis/pull/5610 From a9561d85c8eae29d172aa67f4053b9fb7aaf362c Mon Sep 17 00:00:00 2001 From: jkoberg Date: Wed, 8 Feb 2023 12:38:04 +0100 Subject: [PATCH 3/3] sharpen userlog service Signed-off-by: jkoberg --- .drone.star | 1 + .../pkg/config/defaults/defaultconfig.go | 9 +- services/userlog/Makefile | 3 +- services/userlog/README.md | 30 +- .../userlog/mocks/event_history_service.go | 63 +++ services/userlog/pkg/command/root.go | 2 +- services/userlog/pkg/command/server.go | 65 ++- services/userlog/pkg/config/config.go | 14 +- .../pkg/config/defaults/defaultconfig.go | 7 +- services/userlog/pkg/config/parser/parse.go | 5 + services/userlog/pkg/server/http/option.go | 18 + services/userlog/pkg/server/http/server.go | 48 ++- services/userlog/pkg/service/http.go | 233 +++++++++++ services/userlog/pkg/service/options.go | 82 ++++ services/userlog/pkg/service/service.go | 379 +++++++++++++++--- .../userlog/pkg/service/service_suit_test.go | 13 + services/userlog/pkg/service/service_test.go | 142 ++++++- 17 files changed, 1024 insertions(+), 90 deletions(-) create mode 100644 services/userlog/mocks/event_history_service.go create mode 100644 services/userlog/pkg/service/http.go create mode 100644 services/userlog/pkg/service/options.go create mode 100644 services/userlog/pkg/service/service_suit_test.go diff --git a/.drone.star b/.drone.star index cd87aa5fb0e..610f6b59b05 100644 --- a/.drone.star +++ b/.drone.star @@ -79,6 +79,7 @@ config = { "services/storage-users", "services/store", "services/thumbnails", + "services/userlog", "services/users", "services/web", "services/webdav", diff --git a/services/proxy/pkg/config/defaults/defaultconfig.go b/services/proxy/pkg/config/defaults/defaultconfig.go index 140a57d8705..daf3a7534ec 100644 --- a/services/proxy/pkg/config/defaults/defaultconfig.go +++ b/services/proxy/pkg/config/defaults/defaultconfig.go @@ -101,6 +101,11 @@ func DefaultPolicies() []config.Policy { Endpoint: "/archiver", Service: "com.owncloud.web.frontend", }, + { + // reroute oc10 notifications endpoint to userlog service + Endpoint: "/ocs/v2.php/apps/notifications/api/v1/notifications", + Service: "com.owncloud.userlog.userlog", + }, { Type: config.RegexRoute, Endpoint: "/ocs/v[12].php/cloud/user/signing-key", // only `user/signing-key` is left in ocis-ocs @@ -202,10 +207,6 @@ func DefaultPolicies() []config.Policy { Endpoint: "/api/v0/settings", Service: "com.owncloud.web.settings", }, - { - Endpoint: "/api/v0/activities", - Service: "com.owncloud.userlog.userlog", - }, }, }, } diff --git a/services/userlog/Makefile b/services/userlog/Makefile index b1f19d550b9..12a1d0a7d60 100644 --- a/services/userlog/Makefile +++ b/services/userlog/Makefile @@ -24,7 +24,8 @@ docs-generate: config-docs-generate include ../../.make/generate.mk .PHONY: ci-go-generate -ci-go-generate: # CI runs ci-node-generate automatically before this target +ci-go-generate: $(MOCKERY) # CI runs ci-node-generate automatically before this target + $(MOCKERY) --dir ../../protogen/gen/ocis/services/eventhistory/v0 --case underscore --name EventHistoryService .PHONY: ci-node-generate ci-node-generate: diff --git a/services/userlog/README.md b/services/userlog/README.md index 612af80c3e4..352e76fa9c5 100644 --- a/services/userlog/README.md +++ b/services/userlog/README.md @@ -1,11 +1,33 @@ -# Userlog service +# Userlog Service -The `userlog` service provides a way to configure which events a user wants to be informed about and an API to retrieve them. +The `userlog` service is a mediator between the `eventhistory` service and clients who want to be informed about user related events. It provides an API to retrieve those. + +## Prerequisites + +Running the `userlog` service without running the `eventhistory` service is not possible. + +## Storing + +The `userlog` service persists information via the configured store in `USERLOG_STORE_TYPE`. Possible stores are: + - `mem`: Basic in-memory store and the default. + - `ocmem`: Advanced in-memory store allowing max size. + - `redis`: Stores data in a configured redis cluster. + - `etcd`: Stores data in a configured etcd cluster. + - `nats-js`: Stores data using key-value-store feature of [nats jetstream](https://docs.nats.io/nats-concepts/jetstream/key-value-store) + - `noop`: Stores nothing. Useful for testing. Not recommended in productive enviroments. + +1. Note that in-memory stores are by nature not reboot persistent. +2. Though usually not necessary, a database name and a database table can be configured for event stores if the event store supports this. Generally not applicapable for stores of type `in-memory`. These settings are blank by default which means that the standard settings of the configured store applies. +3. The userlog service can be scaled if not using `in-memory` stores and the stores are configured identically over all instances. ## Configuring -The `userlog` service has hardcoded configuration for now. +For the time being, the configuration which user related events are of interest is hardcoded and cannot be changed. ## Retrieving -The `userlog` service provides an API to retrieve configured events. +The `userlog` service provides an API to retrieve configured events. For now, this API is mostly following the [oc10 notification GET API](https://doc.owncloud.com/server/next/developer_manual/core/apis/ocs-notification-endpoint-v1.html#get-user-notifications). + +## Deleting + +To delete events for an user, use a `DELETE` request to `ocs/v2.php/apps/notifications/api/v1/notifications` containing the IDs to delete. diff --git a/services/userlog/mocks/event_history_service.go b/services/userlog/mocks/event_history_service.go new file mode 100644 index 00000000000..cfcee315c54 --- /dev/null +++ b/services/userlog/mocks/event_history_service.go @@ -0,0 +1,63 @@ +// Code generated by mockery v2.14.1. DO NOT EDIT. + +package mocks + +import ( + context "context" + + client "go-micro.dev/v4/client" + + mock "github.com/stretchr/testify/mock" + + v0 "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" +) + +// EventHistoryService is an autogenerated mock type for the EventHistoryService type +type EventHistoryService struct { + mock.Mock +} + +// GetEvents provides a mock function with given fields: ctx, in, opts +func (_m *EventHistoryService) GetEvents(ctx context.Context, in *v0.GetEventsRequest, opts ...client.CallOption) (*v0.GetEventsResponse, error) { + _va := make([]interface{}, len(opts)) + for _i := range opts { + _va[_i] = opts[_i] + } + var _ca []interface{} + _ca = append(_ca, ctx, in) + _ca = append(_ca, _va...) + ret := _m.Called(_ca...) + + var r0 *v0.GetEventsResponse + if rf, ok := ret.Get(0).(func(context.Context, *v0.GetEventsRequest, ...client.CallOption) *v0.GetEventsResponse); ok { + r0 = rf(ctx, in, opts...) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(*v0.GetEventsResponse) + } + } + + var r1 error + if rf, ok := ret.Get(1).(func(context.Context, *v0.GetEventsRequest, ...client.CallOption) error); ok { + r1 = rf(ctx, in, opts...) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +type mockConstructorTestingTNewEventHistoryService interface { + mock.TestingT + Cleanup(func()) +} + +// NewEventHistoryService creates a new instance of EventHistoryService. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +func NewEventHistoryService(t mockConstructorTestingTNewEventHistoryService) *EventHistoryService { + mock := &EventHistoryService{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/services/userlog/pkg/command/root.go b/services/userlog/pkg/command/root.go index c9fb4b39be3..4a7e7983971 100644 --- a/services/userlog/pkg/command/root.go +++ b/services/userlog/pkg/command/root.go @@ -43,7 +43,7 @@ type SutureService struct { // NewSutureService creates a new userlog.SutureService func NewSutureService(cfg *ociscfg.Config) suture.Service { - cfg.Notifications.Commons = cfg.Commons + cfg.Userlog.Commons = cfg.Commons return SutureService{ cfg: cfg.Userlog, } diff --git a/services/userlog/pkg/command/server.go b/services/userlog/pkg/command/server.go index 076eed54e41..7ca044d297d 100644 --- a/services/userlog/pkg/command/server.go +++ b/services/userlog/pkg/command/server.go @@ -3,25 +3,58 @@ package command import ( "context" "fmt" + "strings" "github.com/cs3org/reva/v2/pkg/events" "github.com/cs3org/reva/v2/pkg/events/stream" + "github.com/cs3org/reva/v2/pkg/rgrpc/todo/pool" "github.com/oklog/run" "github.com/owncloud/ocis/v2/ocis-pkg/config/configlog" + "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" ogrpc "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/owncloud/ocis/v2/ocis-pkg/store" "github.com/owncloud/ocis/v2/ocis-pkg/version" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/config/parser" "github.com/owncloud/ocis/v2/services/userlog/pkg/logging" "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" "github.com/owncloud/ocis/v2/services/userlog/pkg/server/http" "github.com/urfave/cli/v2" - "go-micro.dev/v4/store" ) // all events we care about var _registeredEvents = []events.Unmarshaller{ + // file related events.UploadReady{}, + events.ContainerCreated{}, + events.FileTouched{}, + events.FileDownloaded{}, + events.FileVersionRestored{}, + events.ItemMoved{}, + events.ItemTrashed{}, + events.ItemPurged{}, + events.ItemRestored{}, + + // space related + events.SpaceCreated{}, + events.SpaceRenamed{}, + events.SpaceEnabled{}, + events.SpaceDisabled{}, + events.SpaceDeleted{}, + events.SpaceShared{}, + events.SpaceUnshared{}, + events.SpaceUpdated{}, + events.SpaceMembershipExpired{}, + + // share related + events.ShareCreated{}, + // events.ShareRemoved{}, // TODO: ShareRemoved doesn't hold sharee information + events.ShareUpdated{}, + events.ShareExpired{}, + events.LinkCreated{}, + // events.LinkRemoved{}, // TODO: LinkRemoved doesn't hold sharee information + events.LinkUpdated{}, } // Server is the entrypoint for the server command. @@ -48,7 +81,9 @@ func Server(cfg *config.Config) *cli.Command { } return context.WithCancel(cfg.Context) }() + mtrcs := metrics.New() + mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) defer cancel() @@ -57,15 +92,27 @@ func Server(cfg *config.Config) *cli.Command { return err } - var st store.Store - switch cfg.Store.Type { - case "inmemory": - st = store.NewMemoryStore() - default: - return fmt.Errorf("unknown store '%s' configured", cfg.Store.Type) + st := store.Create( + store.Type(cfg.Store.Type), + store.Addresses(strings.Split(cfg.Store.Addresses, ",")...), + store.Database(cfg.Store.Database), + store.Table(cfg.Store.Table), + ) + + tm, err := pool.StringToTLSMode(cfg.GRPCClientTLS.Mode) + if err != nil { + return err + } + gwclient, err := pool.GetGatewayServiceClient( + cfg.RevaGateway, + pool.WithTLSCACert(cfg.GRPCClientTLS.CACert), + pool.WithTLSMode(tm), + ) + if err != nil { + return fmt.Errorf("could not get reva client: %s", err) } - mtrcs.BuildInfo.WithLabelValues(version.GetString()).Set(1) + hClient := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpc.DefaultClient()) { server, err := http.Server( @@ -75,6 +122,8 @@ func Server(cfg *config.Config) *cli.Command { http.Metrics(mtrcs), http.Store(st), http.Consumer(consumer), + http.Gateway(gwclient), + http.History(hClient), http.RegisteredEvents(_registeredEvents), ) diff --git a/services/userlog/pkg/config/config.go b/services/userlog/pkg/config/config.go index 874c63fa7e8..acf98e1a27a 100644 --- a/services/userlog/pkg/config/config.go +++ b/services/userlog/pkg/config/config.go @@ -2,7 +2,6 @@ package config import ( "context" - "time" "github.com/owncloud/ocis/v2/ocis-pkg/shared" ) @@ -19,16 +18,21 @@ type Config struct { HTTP HTTP `yaml:"http"` GRPCClientTLS *shared.GRPCClientTLS `yaml:"grpc_client_tls"` - Events Events `yaml:"events"` - Store Store `yaml:"store"` + MachineAuthAPIKey string `yaml:"machine_auth_api_key" env:"OCIS_MACHINE_AUTH_API_KEY;USERLOG_MACHINE_AUTH_API_KEY" desc:"Machine auth API key used to validate internal requests necessary to access resources from other services."` + RevaGateway string `yaml:"reva_gateway" env:"REVA_GATEWAY" desc:"CS3 gateway used to look up user metadata"` + Events Events `yaml:"events"` + Store Store `yaml:"store"` Context context.Context `yaml:"-"` } // Store configures the store to use type Store struct { - Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the store. Supported is inmemory"` - RecordExpiry time.Duration `yaml:"record_expiry" env:"USERLOG_RECORD_EXPIRY" desc:"time to life for events in the store"` + Type string `yaml:"type" env:"USERLOG_STORE_TYPE" desc:"The type of the userlog store. Supported values are: 'mem', 'ocmem', 'etcd', 'redis', 'nats-js', 'noop'. See the text description for details."` + Addresses string `yaml:"addresses" env:"USERLOG_STORE_ADDRESSES" desc:"A comma separated list of addresses to access the configured store. This has no effect when 'in-memory' stores are configured. Note that the behaviour how addresses are used is dependent on the library of the configured store."` + Database string `yaml:"database" env:"USERLOG_STORE_DATABASE" desc:"(optional) The database name the configured store should use. This has no effect when 'in-memory' stores are configured."` + Table string `yaml:"table" env:"USERLOG_STORE_TABLE" desc:"(optional) The database table the store should use. This has no effect when 'in-memory' stores are configured."` + Size int `yaml:"size" env:"USERLOG_STORE_SIZE" desc:"The maximum quantity of items in the store. Only applies when store type 'ocmem' is configured. Defaults to 512."` } // Events combines the configuration options for the event bus. diff --git a/services/userlog/pkg/config/defaults/defaultconfig.go b/services/userlog/pkg/config/defaults/defaultconfig.go index faae9fdde33..6e6993fb6d4 100644 --- a/services/userlog/pkg/config/defaults/defaultconfig.go +++ b/services/userlog/pkg/config/defaults/defaultconfig.go @@ -27,8 +27,9 @@ func DefaultConfig() *config.Config { EnableTLS: false, }, Store: config.Store{ - Type: "inmemory", + Type: "mem", }, + RevaGateway: shared.DefaultRevaConfig().Address, HTTP: config.HTTP{ Addr: "127.0.0.1:0", Root: "/", @@ -57,6 +58,10 @@ func EnsureDefaults(cfg *config.Config) { cfg.Log = &config.Log{} } + if cfg.MachineAuthAPIKey == "" && cfg.Commons != nil && cfg.Commons.MachineAuthAPIKey != "" { + cfg.MachineAuthAPIKey = cfg.Commons.MachineAuthAPIKey + } + if cfg.GRPCClientTLS == nil { cfg.GRPCClientTLS = &shared.GRPCClientTLS{} if cfg.Commons != nil && cfg.Commons.GRPCClientTLS != nil { diff --git a/services/userlog/pkg/config/parser/parse.go b/services/userlog/pkg/config/parser/parse.go index 254d4667c3e..b3218cec9b2 100644 --- a/services/userlog/pkg/config/parser/parse.go +++ b/services/userlog/pkg/config/parser/parse.go @@ -4,6 +4,7 @@ import ( "errors" ociscfg "github.com/owncloud/ocis/v2/ocis-pkg/config" + "github.com/owncloud/ocis/v2/ocis-pkg/shared" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/config/defaults" @@ -34,5 +35,9 @@ func ParseConfig(cfg *config.Config) error { // Validate validates the config func Validate(cfg *config.Config) error { + if cfg.MachineAuthAPIKey == "" { + return shared.MissingMachineAuthApiKeyError(cfg.Service.Name) + } + return nil } diff --git a/services/userlog/pkg/server/http/option.go b/services/userlog/pkg/server/http/option.go index 905621f5673..11700b5c145 100644 --- a/services/userlog/pkg/server/http/option.go +++ b/services/userlog/pkg/server/http/option.go @@ -3,8 +3,10 @@ package http import ( "context" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" "github.com/cs3org/reva/v2/pkg/events" "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "github.com/owncloud/ocis/v2/services/userlog/pkg/metrics" "github.com/urfave/cli/v2" @@ -24,6 +26,8 @@ type Options struct { Namespace string Store store.Store Consumer events.Consumer + GatewayClient gateway.GatewayAPIClient + HistoryClient ehsvc.EventHistoryService RegisteredEvents []events.Unmarshaller } @@ -94,6 +98,20 @@ func Consumer(consumer events.Consumer) Option { } } +// Gateway provides a function to configure the gateway client +func Gateway(gw gateway.GatewayAPIClient) Option { + return func(o *Options) { + o.GatewayClient = gw + } +} + +// History provides a function to configure the event history client +func History(h ehsvc.EventHistoryService) Option { + return func(o *Options) { + o.HistoryClient = h + } +} + // RegisteredEvents provides a function to register events func RegisteredEvents(evs []events.Unmarshaller) Option { return func(o *Options) { diff --git a/services/userlog/pkg/server/http/server.go b/services/userlog/pkg/server/http/server.go index 38b5db78a51..e603f5294a0 100644 --- a/services/userlog/pkg/server/http/server.go +++ b/services/userlog/pkg/server/http/server.go @@ -3,6 +3,11 @@ package http import ( "fmt" + stdhttp "net/http" + + "github.com/go-chi/chi/v5" + chimiddleware "github.com/go-chi/chi/v5/middleware" + "github.com/owncloud/ocis/v2/ocis-pkg/middleware" "github.com/owncloud/ocis/v2/ocis-pkg/service/http" "github.com/owncloud/ocis/v2/ocis-pkg/version" svc "github.com/owncloud/ocis/v2/services/userlog/pkg/service" @@ -34,29 +39,36 @@ func Server(opts ...Option) (http.Service, error) { return http.Service{}, fmt.Errorf("could not initialize http service: %w", err) } - //middlewares := []func(stdhttp.Handler) stdhttp.Handler{ - //middleware.TraceContext, - //chimiddleware.RequestID, - //middleware.Version( - //"userlog", - //version.GetString(), - //), - //middleware.Logger( - //options.Logger, - //), - //} + middlewares := []func(stdhttp.Handler) stdhttp.Handler{ + middleware.TraceContext, + chimiddleware.RequestID, + middleware.Version( + "userlog", + version.GetString(), + ), + middleware.Logger( + options.Logger, + ), + middleware.ExtractAccountUUID(), + } + + mux := chi.NewMux() + mux.Use(middlewares...) - handle, err := svc.NewUserlogService(options.Config, options.Consumer, options.Store, options.RegisteredEvents) + handle, err := svc.NewUserlogService( + svc.Logger(options.Logger), + svc.Consumer(options.Consumer), + svc.Mux(mux), + svc.Store(options.Store), + svc.Config(options.Config), + svc.HistoryClient(options.HistoryClient), + svc.GatewayClient(options.GatewayClient), + svc.RegisteredEvents(options.RegisteredEvents), + ) if err != nil { return http.Service{}, err } - { - //handle = svc.NewInstrument(handle, options.Metrics) - //handle = svc.NewLogging(handle, options.Logger) - //handle = svc.NewTracing(handle) - } - if err := micro.RegisterHandler(service.Server(), handle); err != nil { return http.Service{}, err } diff --git a/services/userlog/pkg/service/http.go b/services/userlog/pkg/service/http.go new file mode 100644 index 00000000000..65cbd1251dc --- /dev/null +++ b/services/userlog/pkg/service/http.go @@ -0,0 +1,233 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "net/http" + "time" + + revactx "github.com/cs3org/reva/v2/pkg/ctx" + "github.com/cs3org/reva/v2/pkg/events" + ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" +) + +// ServeHTTP fulfills Handler interface +func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) { + ul.m.ServeHTTP(w, r) +} + +// HandleGetEvents is the GET handler for events +func (ul *UserlogService) HandleGetEvents(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + ul.log.Error().Int("returned statuscode", http.StatusUnauthorized).Msg("user unauthorized") + w.WriteHeader(http.StatusUnauthorized) + return + } + + evs, err := ul.GetEvents(r.Context(), u.GetId().GetOpaqueId()) + if err != nil { + ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("get events failed") + w.WriteHeader(http.StatusInternalServerError) + return + } + + resp := GetEventResponseOC10{} + for _, e := range evs { + noti, err := ul.convertEvent(r.Context(), e) + if err != nil { + ul.log.Error().Err(err).Str("eventid", e.Id).Str("eventtype", e.Type).Msg("failed to convert event") + continue + } + + resp.OCS.Data = append(resp.OCS.Data, noti) + } + + resp.OCS.Meta.StatusCode = http.StatusOK + b, _ := json.Marshal(resp) + w.Write(b) +} + +// HandleDeleteEvents is the DELETE handler for events +func (ul *UserlogService) HandleDeleteEvents(w http.ResponseWriter, r *http.Request) { + u, ok := revactx.ContextGetUser(r.Context()) + if !ok { + ul.log.Error().Int("returned statuscode", http.StatusUnauthorized).Msg("user unauthorized") + w.WriteHeader(http.StatusUnauthorized) + return + } + + var ids []string + if err := json.NewDecoder(r.Body).Decode(&ids); err != nil { + ul.log.Error().Err(err).Int("returned statuscode", http.StatusBadRequest).Msg("request body is malformed") + w.WriteHeader(http.StatusBadRequest) + return + } + + if err := ul.DeleteEvents(u.GetId().GetOpaqueId(), ids); err != nil { + ul.log.Error().Err(err).Int("returned statuscode", http.StatusInternalServerError).Msg("delete events failed") + w.WriteHeader(http.StatusInternalServerError) + return + } + + w.WriteHeader(http.StatusOK) +} + +func (ul *UserlogService) convertEvent(ctx context.Context, event *ehmsg.Event) (OC10Notification, error) { + etype, ok := ul.registeredEvents[event.Type] + if !ok { + // this should not happen + return OC10Notification{}, errors.New("eventtype not registered") + } + + einterface, err := etype.Unmarshal(event.Event) + if err != nil { + // this shouldn't happen either + return OC10Notification{}, errors.New("cant unmarshal event") + } + + noti := OC10Notification{ + EventID: event.Id, + Service: "userlog", + Timestamp: time.Now().Format(time.RFC3339Nano), + } + + switch ev := einterface.(type) { + // file related + case events.UploadReady: + space, _ := ul.getSpace(ctx, ev.FileRef.GetResourceId().GetSpaceId()) + noti.UserID = ev.ExecutingUser.GetId().GetOpaqueId() + noti.Subject = "File uploaded" + noti.Message = fmt.Sprintf("File '%s' was uploaded to space '%s' by user '%s'", ev.Filename, space.GetName(), ev.ExecutingUser.GetUsername()) + case events.ContainerCreated: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Folder created" + noti.Message = fmt.Sprintf("Folder '%s' was created", ev.Ref.GetPath()) + case events.FileTouched: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File touched" + noti.Message = fmt.Sprintf("File '%s' was touched", ev.Ref.GetPath()) + case events.FileDownloaded: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File downloaded" + noti.Message = fmt.Sprintf("File '%s' was downloaded", ev.Ref.GetPath()) + case events.FileVersionRestored: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File version restored" + noti.Message = fmt.Sprintf("An older version of file '%s' was restored", ev.Ref.GetPath()) + case events.ItemMoved: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File moved" + noti.Message = fmt.Sprintf("File '%s' was moved from '%s'", ev.Ref.GetPath(), ev.OldReference.GetPath()) + case events.ItemTrashed: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File trashed" + noti.Message = fmt.Sprintf("File '%s' was trashed", ev.Ref.GetPath()) + case events.ItemPurged: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File purged" + noti.Message = fmt.Sprintf("File '%s' was purged", ev.Ref.GetPath()) + case events.ItemRestored: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "File restored" + noti.Message = fmt.Sprintf("File '%s' was restored", ev.Ref.GetPath()) + + // space related + case events.SpaceCreated: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space created" + noti.Message = fmt.Sprintf("Space '%s' was created", ev.Name) + case events.SpaceRenamed: + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space renamed" + noti.Message = fmt.Sprintf("Space '%s' was renamed", ev.Name) + case events.SpaceEnabled: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space enabled" + noti.Message = fmt.Sprintf("Space '%s' was renamed", space.Name) + case events.SpaceDisabled: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space disabled" + noti.Message = fmt.Sprintf("Space '%s' was disabled", space.Name) + case events.SpaceDeleted: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space deleted" + noti.Message = fmt.Sprintf("Space '%s' was deleted", space.Name) + case events.SpaceShared: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space shared" + noti.Message = fmt.Sprintf("Space '%s' was shared", space.Name) + case events.SpaceUnshared: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space unshared" + noti.Message = fmt.Sprintf("Space '%s' was unshared", space.Name) + case events.SpaceUpdated: + space, _ := ul.getSpace(ctx, ev.ID.GetOpaqueId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Space updated" + noti.Message = fmt.Sprintf("Space '%s' was updated", space.Name) + case events.SpaceMembershipExpired: + space, _ := ul.getSpace(ctx, ev.SpaceID.GetOpaqueId()) + noti.UserID = "" + noti.Subject = "Space membership expired" + noti.Message = fmt.Sprintf("A spacemembership for space '%s' has expired", space.Name) + + // share related + case events.ShareCreated: + space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Share received" + noti.Message = fmt.Sprintf("A file was shared in space %s", space.Name) + case events.ShareUpdated: + space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Share updated" + noti.Message = fmt.Sprintf("A share was updated in space %s", space.Name) + case events.ShareExpired: + space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId()) + noti.Subject = "Share expired" + noti.Message = fmt.Sprintf("A share has expired in space %s", space.Name) + case events.LinkCreated: + space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Share received" + noti.Message = fmt.Sprintf("A link was created in space %s", space.Name) + case events.LinkUpdated: + space, _ := ul.getSpace(ctx, ev.ItemID.GetSpaceId()) + noti.UserID = ev.Executant.GetOpaqueId() + noti.Subject = "Share received" + noti.Message = fmt.Sprintf("A link was updated in space %s", space.Name) + } + + return noti, nil +} + +// OC10Notification is the oc10 style representation of an event +// some fields are left out for simplicity +type OC10Notification struct { + EventID string `json:"notification_id"` + Service string `json:"app"` + Timestamp string `json:"datetime"` + UserID string `json:"user"` + Subject string `json:"subject"` + Message string `json:"message"` +} + +// GetEventResponseOC10 is the response from GET events endpoint in oc10 style +type GetEventResponseOC10 struct { + OCS struct { + Meta struct { + Message string `json:"message"` + Status string `json:"status"` + StatusCode int `json:"statuscode"` + } `json:"meta"` + Data []OC10Notification `json:"data"` + } `json:"ocs"` +} diff --git a/services/userlog/pkg/service/options.go b/services/userlog/pkg/service/options.go new file mode 100644 index 00000000000..b84c67725e1 --- /dev/null +++ b/services/userlog/pkg/service/options.go @@ -0,0 +1,82 @@ +package service + +import ( + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/go-chi/chi/v5" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "go-micro.dev/v4/store" +) + +// Option for the userlog service +type Option func(*Options) + +// Options for the userlog service +type Options struct { + Logger log.Logger + Consumer events.Consumer + Mux *chi.Mux + Store store.Store + Config *config.Config + HistoryClient ehsvc.EventHistoryService + GatewayClient gateway.GatewayAPIClient + RegisteredEvents []events.Unmarshaller +} + +// Logger configures a logger for the userlog service +func Logger(log log.Logger) Option { + return func(o *Options) { + o.Logger = log + } +} + +// Consumer configures an event consumer for the userlog service +func Consumer(c events.Consumer) Option { + return func(o *Options) { + o.Consumer = c + } +} + +// Mux defines the muxer for the userlog service +func Mux(m *chi.Mux) Option { + return func(o *Options) { + o.Mux = m + } +} + +// Store defines the store for the userlog service +func Store(s store.Store) Option { + return func(o *Options) { + o.Store = s + } +} + +// Config adds the config for the userlog service +func Config(c *config.Config) Option { + return func(o *Options) { + o.Config = c + } +} + +// HistoryClient adds a grpc client for the eventhistory service +func HistoryClient(hc ehsvc.EventHistoryService) Option { + return func(o *Options) { + o.HistoryClient = hc + } +} + +// GatewayClient adds a grpc client for the gateway service +func GatewayClient(gwc gateway.GatewayAPIClient) Option { + return func(o *Options) { + o.GatewayClient = gwc + } +} + +// RegisteredEvents registers the events the service should listen to +func RegisteredEvents(e []events.Unmarshaller) Option { + return func(o *Options) { + o.RegisteredEvents = e + } +} diff --git a/services/userlog/pkg/service/service.go b/services/userlog/pkg/service/service.go index 06240d3f78d..4d54f53317e 100644 --- a/services/userlog/pkg/service/service.go +++ b/services/userlog/pkg/service/service.go @@ -3,51 +3,74 @@ package service import ( "context" "encoding/json" + "errors" "fmt" - "net/http" "reflect" + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + group "github.com/cs3org/go-cs3apis/cs3/identity/group/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + storageprovider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" "github.com/cs3org/reva/v2/pkg/events" - "github.com/owncloud/ocis/v2/ocis-pkg/service/grpc" + "github.com/cs3org/reva/v2/pkg/utils" + "github.com/go-chi/chi/v5" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" "github.com/owncloud/ocis/v2/services/userlog/pkg/config" "go-micro.dev/v4/store" ) -// Comment when you read this on review -var _adminid = "2502d8b8-a5e7-4ab3-b858-5aafae4a64a2" - // UserlogService is the service responsible for user activities type UserlogService struct { + log log.Logger ch <-chan events.Event + m *chi.Mux store store.Store cfg *config.Config historyClient ehsvc.EventHistoryService + gwClient gateway.GatewayAPIClient registeredEvents map[string]events.Unmarshaller } // NewUserlogService returns an EventHistory service -func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store.Store, registeredEvents []events.Unmarshaller) (*UserlogService, error) { - if consumer == nil || store == nil { - return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", consumer, store) +func NewUserlogService(opts ...Option) (*UserlogService, error) { + o := &Options{} + for _, opt := range opts { + opt(o) + } + + if o.Consumer == nil || o.Store == nil { + return nil, fmt.Errorf("Need non nil consumer (%v) and store (%v) to work properly", o.Consumer, o.Store) } - ch, err := events.Consume(consumer, "userlog", registeredEvents...) + ch, err := events.Consume(o.Consumer, "userlog", o.RegisteredEvents...) if err != nil { return nil, err } - grpcClient := grpc.DefaultClient() - grpcClient.Options() - c := ehsvc.NewEventHistoryService("com.owncloud.api.eventhistory", grpcClient) - - ul := &UserlogService{ch: ch, store: store, cfg: cfg, historyClient: c, registeredEvents: make(map[string]events.Unmarshaller)} + ul := &UserlogService{ + log: o.Logger, + ch: ch, + m: o.Mux, + store: o.Store, + cfg: o.Config, + historyClient: o.HistoryClient, + gwClient: o.GatewayClient, + registeredEvents: make(map[string]events.Unmarshaller), + } - for _, e := range registeredEvents { + for _, e := range o.RegisteredEvents { typ := reflect.TypeOf(e) ul.registeredEvents[typ.String()] = e } + ul.m.Route("/", func(r chi.Router) { + r.Get("/*", ul.HandleGetEvents) + r.Delete("/*", ul.HandleDeleteEvents) + }) + go ul.MemorizeEvents() return ul, nil @@ -56,18 +79,83 @@ func NewUserlogService(cfg *config.Config, consumer events.Consumer, store store // MemorizeEvents stores eventIDs a user wants to receive func (ul *UserlogService) MemorizeEvents() { for event := range ul.ch { - switch event.Event.(type) { + // for each event we need to: + // I) find users eligible to receive the event + var ( + users []string + err error + ) + switch e := event.Event.(type) { default: - // for each event type we need to: + err = errors.New("unhandled event") - // I) find users eligible to receive the event + // file related + case events.UploadReady: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.FileRef.GetResourceId().GetSpaceId(), viewer) + case events.ContainerCreated: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer) + case events.FileTouched: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer) + case events.FileDownloaded: + users, err = ul.findSpaceMembers(ul.impersonate(e.Owner), e.Ref.GetResourceId().GetSpaceId(), viewer) // no space owner in event + case events.FileVersionRestored: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), editor) + case events.ItemMoved: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer) + case events.ItemTrashed: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer) + case events.ItemPurged: + users, err = ul.findSpaceMembers(ul.impersonate(e.Owner), e.Ref.GetResourceId().GetSpaceId(), editor) // no space owner in event + case events.ItemRestored: + users, err = ul.findSpaceMembers(ul.impersonate(e.SpaceOwner), e.Ref.GetResourceId().GetSpaceId(), viewer) - // II) filter users who want to receive the event + // space related // TODO: how to find spaceadmins? + case events.SpaceCreated: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer) + case events.SpaceRenamed: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer) + case events.SpaceEnabled: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), viewer) + case events.SpaceDisabled: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager) + case events.SpaceDeleted: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager) + case events.SpaceShared: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager) + case events.SpaceUnshared: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager) + case events.SpaceUpdated: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ID.GetOpaqueId(), manager) + case events.SpaceMembershipExpired: + users, err = ul.resolveShare(ul.impersonate(e.SpaceOwner), e.GranteeUserID, e.GranteeGroupID, e.SpaceID.GetOpaqueId()) - // III) store the eventID for each user + // share related + case events.ShareCreated: + users, err = ul.resolveShare(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId()) + case events.ShareUpdated: + users, err = ul.resolveShare(ul.impersonate(e.Executant), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId()) + case events.ShareExpired: + users, err = ul.resolveShare(ul.impersonate(e.ShareOwner), e.GranteeUserID, e.GranteeGroupID, e.ItemID.GetSpaceId()) + case events.LinkCreated: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ItemID.GetOpaqueId(), editor) + case events.LinkUpdated: + users, err = ul.findSpaceMembers(ul.impersonate(e.Executant), e.ItemID.GetOpaqueId(), editor) - // TEMP TESTING CODE - if err := ul.addEventToUser(_adminid, event.ID); err != nil { + } + + if err != nil { + ul.log.Error().Err(err).Interface("event", event).Msg("error gathering members for event") + continue + } + + // II) filter users who want to receive the event + // This step is postponed for later. + // For now each user should get all events she is eligible to receive + + // III) store the eventID for each user + for _, id := range users { + if err := ul.addEventsToUser(id, event.ID); err != nil { + ul.log.Error().Err(err).Str("userID", id).Str("eventid", event.ID).Msg("failed to store event for user") continue } } @@ -75,20 +163,21 @@ func (ul *UserlogService) MemorizeEvents() { } // GetEvents allows to retrieve events from the eventhistory by userid -func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]interface{}, error) { +func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]*ehmsg.Event, error) { rec, err := ul.store.Read(userid) - if err != nil { + if err != nil && err != store.ErrNotFound { + ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to read record from database") return nil, err } if len(rec) == 0 { // no events available - return []interface{}{}, nil + return []*ehmsg.Event{}, nil } var eventIDs []string if err := json.Unmarshal(rec[0].Value, &eventIDs); err != nil { - // this should never happen + ul.log.Fatal().Err(err).Str("userid", userid).Msg("failed to umarshal record from database") return nil, err } @@ -97,38 +186,74 @@ func (ul *UserlogService) GetEvents(ctx context.Context, userid string) ([]inter return nil, err } - var events []interface{} - for _, e := range resp.Events { - ev, ok := ul.registeredEvents[e.Type] - if !ok { - // this should not happen but we handle it anyway - continue + // remove expired events from list asynchronously + go func() { + if err := ul.removeExpiredEvents(userid, eventIDs, resp.Events); err != nil { + ul.log.Error().Err(err).Str("userid", userid).Msg("could not remove expired events from user") } - event, err := ev.Unmarshal(e.Event) - if err != nil { - // this shouldn't happen either - continue - } + }() + + return resp.Events, nil + +} - events = append(events, event) +// DeleteEvents will delete the specified events +func (ul *UserlogService) DeleteEvents(userid string, evids []string) error { + toDelete := make(map[string]struct{}) + for _, e := range evids { + toDelete[e] = struct{}{} } - return events, nil + return ul.alterUserEventList(userid, func(ids []string) []string { + var newids []string + for _, id := range ids { + if _, delete := toDelete[id]; delete { + continue + } + + newids = append(newids, id) + } + return newids + }) } -func (ul *UserlogService) ServeHTTP(w http.ResponseWriter, r *http.Request) { - evs, err := ul.GetEvents(r.Context(), _adminid) +func (ul *UserlogService) impersonate(u *user.UserId) context.Context { + ctx, _, err := utils.Impersonate(u, ul.gwClient, ul.cfg.MachineAuthAPIKey) if err != nil { - return + ul.log.Error().Err(err).Str("userid", u.GetOpaqueId()).Msg("failed to impersonate user") + return context.Background() + } + return ctx +} + +func (ul *UserlogService) addEventsToUser(userid string, eventids ...string) error { + return ul.alterUserEventList(userid, func(ids []string) []string { + return append(ids, eventids...) + }) +} + +func (ul *UserlogService) removeExpiredEvents(userid string, all []string, received []*ehmsg.Event) error { + exists := make(map[string]struct{}, len(received)) + for _, e := range received { + exists[e.Id] = struct{}{} + } + + var toDelete []string + for _, eid := range all { + if _, ok := exists[eid]; !ok { + toDelete = append(toDelete, eid) + } } - // TODO: format response - b, _ := json.Marshal(evs) - w.Write(b) + if len(toDelete) == 0 { + return nil + } + + return ul.DeleteEvents(userid, toDelete) } -func (ul *UserlogService) addEventToUser(userid string, eventid string) error { +func (ul *UserlogService) alterUserEventList(userid string, alter func([]string) []string) error { recs, err := ul.store.Read(userid) if err != nil && err != store.ErrNotFound { return err @@ -141,7 +266,12 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error { } } - ids = append(ids, eventid) + ids = alter(ids) + + // store reacts unforseeable when trying to store nil values + if len(ids) == 0 { + return ul.store.Delete(userid) + } b, err := json.Marshal(ids) if err != nil { @@ -153,3 +283,158 @@ func (ul *UserlogService) addEventToUser(userid string, eventid string) error { Value: b, }) } + +// we need the spaceid to inform other space members +// we need an owner to query space members +// we need to check the user has the required role to see the event +func (ul *UserlogService) findSpaceMembers(ctx context.Context, spaceID string, requiredRole permissionChecker) ([]string, error) { + space, err := ul.getSpace(ctx, spaceID) + if err != nil { + return nil, err + } + + var users []string + switch space.SpaceType { + case "personal": + users = []string{space.GetOwner().GetId().GetOpaqueId()} + case "project": + if users, err = ul.gatherSpaceMembers(ctx, space, requiredRole); err != nil { + return nil, err + } + default: + // TODO: shares? other space types? + return nil, fmt.Errorf("unsupported space type: %s", space.SpaceType) + } + + return users, nil +} + +func (ul *UserlogService) getSpace(ctx context.Context, spaceID string) (*storageprovider.StorageSpace, error) { + res, err := ul.gwClient.ListStorageSpaces(ctx, listStorageSpaceRequest(spaceID)) + if err != nil { + return nil, err + } + + if res.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, fmt.Errorf("Unexpected status code while getting space: %v", res.GetStatus().GetCode()) + } + + if len(res.StorageSpaces) == 0 { + return nil, fmt.Errorf("error getting storage space %s: no space returned", spaceID) + } + + return res.StorageSpaces[0], nil +} + +func (ul *UserlogService) gatherSpaceMembers(ctx context.Context, space *storageprovider.StorageSpace, hasRequiredRole permissionChecker) ([]string, error) { + var permissionsMap map[string]*storageprovider.ResourcePermissions + if err := utils.ReadJSONFromOpaque(space.GetOpaque(), "grants", &permissionsMap); err != nil { + return nil, err + } + + groupsMap := make(map[string]struct{}) + if opaqueGroups, ok := space.Opaque.Map["groups"]; ok { + _ = json.Unmarshal(opaqueGroups.GetValue(), &groupsMap) + } + + // we use a map to avoid duplicates + usermap := make(map[string]struct{}) + for id, perm := range permissionsMap { + if !hasRequiredRole(perm) { + // not allowed to receive event + continue + } + + if _, isGroup := groupsMap[id]; !isGroup { + usermap[id] = struct{}{} + continue + } + + usrs, err := ul.resolveGroup(ctx, id) + if err != nil { + ul.log.Error().Err(err).Str("groupID", id).Msg("failed to resolve group") + continue + } + + for _, u := range usrs { + usermap[u] = struct{}{} + } + } + + var users []string + for id := range usermap { + users = append(users, id) + } + + return users, nil +} + +// resolves the users of a group +func (ul *UserlogService) resolveGroup(ctx context.Context, groupID string) ([]string, error) { + r, err := ul.gwClient.GetGroup(ctx, &group.GetGroupRequest{GroupId: &group.GroupId{OpaqueId: groupID}}) + if err != nil { + return nil, err + } + + if r.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, fmt.Errorf("unexpected status code from gateway client: %d", r.GetStatus().GetCode()) + } + + var userIDs []string + for _, m := range r.GetGroup().GetMembers() { + userIDs = append(userIDs, m.GetOpaqueId()) + } + + return userIDs, nil +} + +func (ul *UserlogService) resolveID(ctx context.Context, userid *user.UserId, groupid *group.GroupId) ([]string, error) { + if userid != nil { + return []string{userid.GetOpaqueId()}, nil + } + + return ul.resolveGroup(ctx, groupid.GetOpaqueId()) +} + +func (ul *UserlogService) resolveShare(ctx context.Context, userid *user.UserId, groupid *group.GroupId, spaceid string) ([]string, error) { + users, err := ul.resolveID(ctx, userid, groupid) + if err != nil { + return nil, err + } + + usr, err := ul.findSpaceMembers(ctx, spaceid, editor) + if err != nil { + return nil, err + } + + return append(users, usr...), nil +} + +func listStorageSpaceRequest(spaceID string) *storageprovider.ListStorageSpacesRequest { + return &storageprovider.ListStorageSpacesRequest{ + Filters: []*storageprovider.ListStorageSpacesRequest_Filter{ + { + Type: storageprovider.ListStorageSpacesRequest_Filter_TYPE_ID, + Term: &storageprovider.ListStorageSpacesRequest_Filter_Id{ + Id: &storageprovider.StorageSpaceId{ + OpaqueId: spaceID, + }, + }, + }, + }, + } +} + +type permissionChecker func(*storageprovider.ResourcePermissions) bool + +func viewer(perms *storageprovider.ResourcePermissions) bool { + return perms.Stat +} + +func editor(perms *storageprovider.ResourcePermissions) bool { + return perms.InitiateFileUpload +} + +func manager(perms *storageprovider.ResourcePermissions) bool { + return perms.DenyGrant +} diff --git a/services/userlog/pkg/service/service_suit_test.go b/services/userlog/pkg/service/service_suit_test.go new file mode 100644 index 00000000000..77d1081a82d --- /dev/null +++ b/services/userlog/pkg/service/service_suit_test.go @@ -0,0 +1,13 @@ +package service_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestSearch(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Userlog service Suite") +} diff --git a/services/userlog/pkg/service/service_test.go b/services/userlog/pkg/service/service_test.go index 944a1f69f35..724d4c7d92a 100644 --- a/services/userlog/pkg/service/service_test.go +++ b/services/userlog/pkg/service/service_test.go @@ -1,3 +1,143 @@ package service_test -// tests here +import ( + "context" + "encoding/json" + "reflect" + "time" + + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + "github.com/cs3org/reva/v2/pkg/events" + "github.com/cs3org/reva/v2/pkg/utils" + cs3mocks "github.com/cs3org/reva/v2/tests/cs3mocks/mocks" + "github.com/go-chi/chi/v5" + "github.com/google/uuid" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/owncloud/ocis/v2/ocis-pkg/log" + "github.com/owncloud/ocis/v2/ocis-pkg/store" + ehmsg "github.com/owncloud/ocis/v2/protogen/gen/ocis/messages/eventhistory/v0" + ehsvc "github.com/owncloud/ocis/v2/protogen/gen/ocis/services/eventhistory/v0" + "github.com/owncloud/ocis/v2/services/userlog/mocks" + "github.com/owncloud/ocis/v2/services/userlog/pkg/config" + "github.com/owncloud/ocis/v2/services/userlog/pkg/service" + "github.com/test-go/testify/mock" + microevents "go-micro.dev/v4/events" + microstore "go-micro.dev/v4/store" +) + +var _ = Describe("UserlogService", func() { + var ( + cfg = &config.Config{} + + ul *service.UserlogService + bus testBus + sto microstore.Store + + gwc cs3mocks.GatewayAPIClient + ehc mocks.EventHistoryService + ) + + BeforeEach(func() { + var err error + sto = store.Create() + bus = testBus(make(chan events.Event)) + o := utils.AppendJSONToOpaque(nil, "grants", map[string]*provider.ResourcePermissions{"userid": {Stat: true}}) + gwc.On("ListStorageSpaces", mock.Anything, mock.Anything).Return(&provider.ListStorageSpacesResponse{StorageSpaces: []*provider.StorageSpace{ + { + Opaque: o, + SpaceType: "project", + }, + }, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) + gwc.On("GetUser", mock.Anything, mock.Anything).Return(&user.GetUserResponse{User: &user.User{Id: &user.UserId{OpaqueId: "userid"}}, Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) + gwc.On("Authenticate", mock.Anything, mock.Anything).Return(&gateway.AuthenticateResponse{Status: &rpc.Status{Code: rpc.Code_CODE_OK}}, nil) + + ul, err = service.NewUserlogService( + service.Config(cfg), + service.Consumer(bus), + service.Store(sto), + service.Logger(log.NewLogger()), + service.Mux(chi.NewMux()), + service.GatewayClient(&gwc), + service.HistoryClient(&ehc), + service.RegisteredEvents([]events.Unmarshaller{ + events.UploadReady{}, + }), + ) + Expect(err).ToNot(HaveOccurred()) + + }) + + It("it stores, returns and deletes a couple of events", func() { + ids := make(map[string]struct{}) + ids[bus.Publish(events.SpaceCreated{Executant: &user.UserId{OpaqueId: "userid"}})] = struct{}{} + ids[bus.Publish(events.UploadReady{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{} + ids[bus.Publish(events.ContainerCreated{SpaceOwner: &user.UserId{OpaqueId: "userid"}})] = struct{}{} + + time.Sleep(500 * time.Millisecond) + + var events []*ehmsg.Event + for id := range ids { + events = append(events, &ehmsg.Event{Id: id}) + } + + ehc.On("GetEvents", mock.Anything, mock.Anything).Return(&ehsvc.GetEventsResponse{Events: events}, nil) + + evs, err := ul.GetEvents(context.Background(), "userid") + Expect(err).ToNot(HaveOccurred()) + Expect(len(evs)).To(Equal(len(ids))) + + var evids []string + for _, e := range evs { + _, exists := ids[e.Id] + Expect(exists).To(BeTrue()) + delete(ids, e.Id) + evids = append(evids, e.Id) + } + + Expect(len(ids)).To(Equal(0)) + err = ul.DeleteEvents("userid", evids) + Expect(err).ToNot(HaveOccurred()) + + evs, err = ul.GetEvents(context.Background(), "userid") + Expect(err).ToNot(HaveOccurred()) + Expect(len(evs)).To(Equal(0)) + }) + + AfterEach(func() { + close(bus) + }) +}) + +type testBus chan events.Event + +func (tb testBus) Consume(_ string, _ ...microevents.ConsumeOption) (<-chan microevents.Event, error) { + ch := make(chan microevents.Event) + go func() { + for ev := range tb { + b, _ := json.Marshal(ev.Event) + ch <- microevents.Event{ + Payload: b, + Metadata: map[string]string{ + events.MetadatakeyEventID: ev.ID, + events.MetadatakeyEventType: ev.Type, + }, + } + } + }() + return ch, nil +} + +func (tb testBus) Publish(e interface{}) string { + ev := events.Event{ + ID: uuid.New().String(), + Type: reflect.TypeOf(e).String(), + Event: e, + } + + tb <- ev + return ev.ID +}