diff --git a/changelog/unreleased/metadata-storage.md b/changelog/unreleased/metadata-storage.md new file mode 100644 index 0000000000..c11c2d7516 --- /dev/null +++ b/changelog/unreleased/metadata-storage.md @@ -0,0 +1,5 @@ +Enhancement: Add metadata storage layer and indexer + +We ported over and enhanced the metadata storage layer and indexer from ocis-pkg so that it can be used by reva services as well. + +https://github.com/cs3org/reva/pull/2585 diff --git a/go.mod b/go.mod index 91e986b526..6fc98f28d2 100644 --- a/go.mod +++ b/go.mod @@ -4,6 +4,7 @@ require ( bou.ke/monkey v1.0.2 contrib.go.opencensus.io/exporter/prometheus v0.4.0 github.com/BurntSushi/toml v1.0.0 + github.com/CiscoM31/godata v1.0.5 // indirect github.com/Masterminds/goutils v1.1.1 // indirect github.com/Masterminds/semver v1.5.0 // indirect github.com/Masterminds/sprig v2.22.0+incompatible @@ -41,6 +42,7 @@ require ( github.com/hashicorp/go-hclog v1.1.0 github.com/hashicorp/go-plugin v1.4.3 github.com/huandu/xstrings v1.3.2 // indirect + github.com/iancoleman/strcase v0.2.0 // indirect github.com/jedib0t/go-pretty v4.3.0+incompatible github.com/juliangruber/go-intersect v1.1.0 github.com/mattn/go-sqlite3 v1.14.10 diff --git a/go.sum b/go.sum index d3e0e5f0a8..a38a81384a 100644 --- a/go.sum +++ b/go.sum @@ -82,6 +82,8 @@ github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03 github.com/BurntSushi/toml v1.0.0 h1:dtDWrepsVPfW9H/4y7dDgFc2MBUSeJhlaDtK13CxFlU= github.com/BurntSushi/toml v1.0.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/CiscoM31/godata v1.0.5 h1:AITXpa/5ybXEq59A0nqUGiS7ZXVJnQtFw5o09tyN/UA= +github.com/CiscoM31/godata v1.0.5/go.mod h1:wcmFm66GMdOE316TgwFO1wo5ainCvTK26omd93oZf2M= github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/HdrHistogram/hdrhistogram-go 
v1.1.2/go.mod h1:yDgFjdqOqDEKOvasDdhWNXYg9BVp4O+o5f6V/ehm6Oo= github.com/Masterminds/goutils v1.1.1 h1:5nUrii3FMTL5diU80unEVvNevw1nH4+ZV4DSLVJLSYI= @@ -597,6 +599,8 @@ github.com/hashicorp/yamux v0.0.0-20180604194846-3520598351bb/go.mod h1:+NfK9FKe github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/huandu/xstrings v1.3.2 h1:L18LIDzqlW6xN2rEkpdV8+oL/IXWJ1APd+vsdYy4Wdw= github.com/huandu/xstrings v1.3.2/go.mod h1:y5/lhBue+AyNmUVz9RLU9xbLR0o4KIIExikq4ovT0aE= +github.com/iancoleman/strcase v0.2.0 h1:05I4QRnGpI0m37iZQRuskXh+w77mr6Z41lwQzuHLwW0= +github.com/iancoleman/strcase v0.2.0/go.mod h1:iwCmte+B7n89clKwxIoIXy/HfoL7AsD47ZCWhYzw7ho= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/iij/doapi v0.0.0-20190504054126-0bbf12d6d7df/go.mod h1:QMZY7/J/KSQEhKWFeDesPjMj+wCHReeknARU3wqlyN4= diff --git a/pkg/storage/utils/indexer/errors/errors.go b/pkg/storage/utils/indexer/errors/errors.go new file mode 100644 index 0000000000..07d6a9255a --- /dev/null +++ b/pkg/storage/utils/indexer/errors/errors.go @@ -0,0 +1,57 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package errors + +import ( + "fmt" + + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" +) + +// AlreadyExistsErr implements the Error interface. +type AlreadyExistsErr struct { + TypeName, Value string + IndexBy option.IndexBy +} + +func (e *AlreadyExistsErr) Error() string { + return fmt.Sprintf("%s with %s=%s does already exist", e.TypeName, e.IndexBy.String(), e.Value) +} + +// IsAlreadyExistsErr checks whether an error is of type AlreadyExistsErr. +func IsAlreadyExistsErr(e error) bool { + _, ok := e.(*AlreadyExistsErr) + return ok +} + +// NotFoundErr implements the Error interface. +type NotFoundErr struct { + TypeName, Value string + IndexBy option.IndexBy +} + +func (e *NotFoundErr) Error() string { + return fmt.Sprintf("%s with %s=%s not found", e.TypeName, e.IndexBy.String(), e.Value) +} + +// IsNotFoundErr checks whether an error is of type IsNotFoundErr. +func IsNotFoundErr(e error) bool { + _, ok := e.(*NotFoundErr) + return ok +} diff --git a/pkg/storage/utils/indexer/helper.go b/pkg/storage/utils/indexer/helper.go new file mode 100644 index 0000000000..242f321ee2 --- /dev/null +++ b/pkg/storage/utils/indexer/helper.go @@ -0,0 +1,32 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
// dedup returns s with duplicate values removed, keeping the first
// occurrence of each value in its original order. A nil or empty input
// yields a nil result.
func dedup(s []string) []string {
	seen := make(map[string]struct{}, len(s))
	var unique []string
	for _, v := range s {
		if _, dup := seen[v]; dup {
			continue
		}
		seen[v] = struct{}{}
		unique = append(unique, v)
	}
	return unique
}
"github.com/onsi/gomega" + "github.com/onsi/gomega/gmeasure" +) + +var _ = Describe("Helper", func() { + Describe("dedup", func() { + It("dedups reasonably fast", func() { + experiment := gmeasure.NewExperiment("deduplicating string slices") + AddReportEntry(experiment.Name, experiment) + + experiment.Sample(func(idx int) { + slice := []string{} + for i := 0; i < 900; i++ { + slice = append(slice, strconv.Itoa(i)) + } + for i := 0; i < 100; i++ { + slice = append(slice, strconv.Itoa(i)) + } + experiment.MeasureDuration("repagination", func() { + dedupped := dedup(slice) + Expect(len(dedupped)).To(Equal(900)) + }) + }, gmeasure.SamplingConfig{N: 100000, Duration: 10 * time.Second}) + + repaginationStats := experiment.GetStats("repagination") + medianDuration := repaginationStats.DurationFor(gmeasure.StatMedian) + Expect(medianDuration).To(BeNumerically("<", 1200*time.Microsecond)) + }) + }) +}) diff --git a/pkg/storage/utils/indexer/index/autoincrement.go b/pkg/storage/utils/indexer/index/autoincrement.go new file mode 100644 index 0000000000..3c58ed25e0 --- /dev/null +++ b/pkg/storage/utils/indexer/index/autoincrement.go @@ -0,0 +1,221 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +package index + +import ( + "context" + "os" + "path" + "path/filepath" + "sort" + "strconv" + "strings" + + idxerrs "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + metadata "github.com/cs3org/reva/pkg/storage/utils/metadata" +) + +// Autoincrement are fields for an index of type autoincrement. +type Autoincrement struct { + indexBy option.IndexBy + typeName string + filesDir string + indexBaseDir string + indexRootDir string + + bound *option.Bound + storage metadata.Storage +} + +// NewAutoincrementIndex instantiates a new AutoincrementIndex instance. +func NewAutoincrementIndex(storage metadata.Storage, o ...option.Option) Index { + opts := &option.Options{} + for _, opt := range o { + opt(opts) + } + + u := &Autoincrement{ + storage: storage, + indexBy: opts.IndexBy, + typeName: opts.TypeName, + filesDir: opts.FilesDir, + bound: opts.Bound, + indexBaseDir: path.Join(opts.Prefix, "index."+storage.Backend()), + indexRootDir: path.Join(opts.Prefix, "index."+storage.Backend(), strings.Join([]string{"autoincrement", opts.TypeName, opts.IndexBy.String()}, ".")), + } + + return u +} + +// Init initializes an autoincrement index. +func (idx *Autoincrement) Init() error { + if err := idx.storage.MakeDirIfNotExist(context.Background(), idx.indexBaseDir); err != nil { + return err + } + + return idx.storage.MakeDirIfNotExist(context.Background(), idx.indexRootDir) +} + +// Lookup exact lookup by value. +func (idx *Autoincrement) Lookup(v string) ([]string, error) { + searchPath := path.Join(idx.indexRootDir, v) + oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath) + if err != nil { + if os.IsNotExist(err) { + err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v} + } + + return nil, err + } + + return []string{oldname}, nil +} + +// Add a new value to the index. 
+func (idx *Autoincrement) Add(id, v string) (string, error) { + var newName string + if v == "" { + next, err := idx.next() + if err != nil { + return "", err + } + newName = path.Join(idx.indexRootDir, strconv.Itoa(next)) + } else { + newName = path.Join(idx.indexRootDir, v) + } + if err := idx.storage.CreateSymlink(context.Background(), id, newName); err != nil { + if os.IsExist(err) { + return "", &idxerrs.AlreadyExistsErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v} + } + + return "", err + } + + return newName, nil +} + +// Remove a value v from an index. +func (idx *Autoincrement) Remove(_ string, v string) error { + if v == "" { + return nil + } + searchPath := path.Join(idx.indexRootDir, v) + _, err := idx.storage.ResolveSymlink(context.Background(), searchPath) + if err != nil { + if os.IsNotExist(err) { + err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v} + } + + return err + } + + deletePath := path.Join("/", idx.indexRootDir, v) + return idx.storage.Delete(context.Background(), deletePath) +} + +// Update index from to . +func (idx *Autoincrement) Update(id, oldV, newV string) error { + if err := idx.Remove(id, oldV); err != nil { + return err + } + + _, err := idx.Add(id, newV) + return err +} + +// Search allows for glob search on the index. +func (idx *Autoincrement) Search(pattern string) ([]string, error) { + paths, err := idx.storage.ReadDir(context.Background(), idx.indexRootDir) + if err != nil { + return nil, err + } + + searchPath := idx.indexRootDir + matches := make([]string, 0) + for _, p := range paths { + if found, err := filepath.Match(pattern, path.Base(p)); found { + if err != nil { + return nil, err + } + + oldPath, err := idx.storage.ResolveSymlink(context.Background(), path.Join(searchPath, path.Base(p))) + if err != nil { + return nil, err + } + matches = append(matches, oldPath) + } + } + + return matches, nil +} + +// CaseInsensitive undocumented. 
+func (idx *Autoincrement) CaseInsensitive() bool { + return false +} + +// IndexBy undocumented. +func (idx *Autoincrement) IndexBy() option.IndexBy { + return idx.indexBy +} + +// TypeName undocumented. +func (idx *Autoincrement) TypeName() string { + return idx.typeName +} + +// FilesDir undocumented. +func (idx *Autoincrement) FilesDir() string { + return idx.filesDir +} + +func (idx *Autoincrement) next() (int, error) { + paths, err := idx.storage.ReadDir(context.Background(), idx.indexRootDir) + + if err != nil { + return -1, err + } + + if len(paths) == 0 { + return int(idx.bound.Lower), nil + } + + sort.Slice(paths, func(i, j int) bool { + a, _ := strconv.Atoi(path.Base(paths[i])) + b, _ := strconv.Atoi(path.Base(paths[j])) + return a < b + }) + + latest, err := strconv.Atoi(path.Base(paths[len(paths)-1])) // would returning a string be a better interface? + if err != nil { + return -1, err + } + + if int64(latest) < idx.bound.Lower { + return int(idx.bound.Lower), nil + } + + return latest + 1, nil +} + +// Delete deletes the index folder from its storage. +func (idx *Autoincrement) Delete() error { + return idx.storage.Delete(context.Background(), idx.indexRootDir) +} diff --git a/pkg/storage/utils/indexer/index/autoincrement_test.go b/pkg/storage/utils/indexer/index/autoincrement_test.go new file mode 100644 index 0000000000..fb0aafbfa4 --- /dev/null +++ b/pkg/storage/utils/indexer/index/autoincrement_test.go @@ -0,0 +1,235 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index_test + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + + "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + metadata "github.com/cs3org/reva/pkg/storage/utils/metadata" + "github.com/stretchr/testify/assert" +) + +func TestNext(t *testing.T) { + scenarios := []struct { + name string + expected int + indexBy option.IndexBy + }{ + { + name: "get next value", + expected: 0, + indexBy: option.IndexByField("Number"), + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + tmpDir, err := createTmpDirStr() + assert.NoError(t, err) + dataDir := filepath.Join(tmpDir, "data") + + err = os.MkdirAll(dataDir, 0777) + assert.NoError(t, err) + + storage, err := metadata.NewDiskStorage(dataDir) + assert.NoError(t, err) + + i := index.NewAutoincrementIndex( + storage, + option.WithBounds(&option.Bound{ + Lower: 0, + Upper: 0, + }), + option.WithFilesDir(dataDir), + option.WithTypeName("LambdaType"), + option.WithIndexBy(scenario.indexBy), + ) + + err = i.Init() + assert.NoError(t, err) + + tmpFile, err := os.Create(filepath.Join(tmpDir, "data", "test-example")) + assert.NoError(t, err) + assert.NoError(t, tmpFile.Close()) + + oldName, err := i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "0", filepath.Base(oldName)) + + oldName, err = i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "1", filepath.Base(oldName)) + + oldName, err = i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "2", filepath.Base(oldName)) + t.Log(oldName) + + _ = os.RemoveAll(tmpDir) + }) + } +} + +func 
TestLowerBound(t *testing.T) { + scenarios := []struct { + name string + expected int + indexBy option.IndexBy + entity interface{} + }{ + { + name: "get next value with a lower bound specified", + expected: 0, + indexBy: option.IndexByField("Number"), + }, + } + + for _, scenario := range scenarios { + t.Run(scenario.name, func(t *testing.T) { + tmpDir, err := createTmpDirStr() + assert.NoError(t, err) + dataDir := filepath.Join(tmpDir, "data") + + err = os.MkdirAll(dataDir, 0777) + assert.NoError(t, err) + + storage, err := metadata.NewDiskStorage(dataDir) + assert.NoError(t, err) + + i := index.NewAutoincrementIndex( + storage, + option.WithBounds(&option.Bound{ + Lower: 1000, + }), + option.WithFilesDir(dataDir), + option.WithTypeName("LambdaType"), + option.WithIndexBy(scenario.indexBy), + ) + + err = i.Init() + assert.NoError(t, err) + + tmpFile, err := os.Create(filepath.Join(tmpDir, "data", "test-example")) + assert.NoError(t, err) + assert.NoError(t, tmpFile.Close()) + + oldName, err := i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "1000", filepath.Base(oldName)) + + oldName, err = i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "1001", filepath.Base(oldName)) + + oldName, err = i.Add("test-example", "") + assert.NoError(t, err) + assert.Equal(t, "1002", filepath.Base(oldName)) + t.Log(oldName) + + _ = os.RemoveAll(tmpDir) + }) + } +} + +func TestAdd(t *testing.T) { + tmpDir, err := createTmpDirStr() + assert.NoError(t, err) + dataDir := filepath.Join(tmpDir, "data") + + err = os.MkdirAll(dataDir, 0777) + assert.NoError(t, err) + + storage, err := metadata.NewDiskStorage(dataDir) + assert.NoError(t, err) + + tmpFile, err := os.Create(filepath.Join(tmpDir, "data", "test-example")) + assert.NoError(t, err) + assert.NoError(t, tmpFile.Close()) + + i := index.NewAutoincrementIndex( + storage, + option.WithBounds(&option.Bound{ + Lower: 0, + Upper: 0, + }), + option.WithFilesDir(filepath.Join(tmpDir, "data")), + 
option.WithTypeName("owncloud.Account"), + option.WithIndexBy(option.IndexByField("UidNumber")), + ) + + err = i.Init() + assert.NoError(t, err) + + _, err = i.Add("test-example", "") + if err != nil { + t.Error(err) + } +} + +func BenchmarkAdd(b *testing.B) { + tmpDir, err := createTmpDirStr() + assert.NoError(b, err) + dataDir := filepath.Join(tmpDir, "data") + + err = os.MkdirAll(dataDir, 0777) + assert.NoError(b, err) + + storage, err := metadata.NewDiskStorage(dataDir) + assert.NoError(b, err) + + tmpFile, err := os.Create(filepath.Join(tmpDir, "data", "test-example")) + assert.NoError(b, err) + assert.NoError(b, tmpFile.Close()) + + i := index.NewAutoincrementIndex( + storage, + option.WithBounds(&option.Bound{ + Lower: 0, + Upper: 0, + }), + option.WithFilesDir(filepath.Join(tmpDir, "data")), + option.WithTypeName("LambdaType"), + option.WithIndexBy(option.IndexByField("Number")), + ) + + err = i.Init() + assert.NoError(b, err) + + for n := 0; n < b.N; n++ { + _, err := i.Add("test-example", "") + if err != nil { + b.Error(err) + } + assert.NoError(b, err) + } +} + +func createTmpDirStr() (string, error) { + name, err := ioutil.TempDir("/tmp", "testfiles-*") + if err != nil { + return "", err + } + + return name, nil +} diff --git a/pkg/storage/utils/indexer/index/index.go b/pkg/storage/utils/indexer/index/index.go new file mode 100644 index 0000000000..1af6fbff72 --- /dev/null +++ b/pkg/storage/utils/indexer/index/index.go @@ -0,0 +1,37 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index + +import "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + +// Index can be implemented to create new indexer-strategies. See Unique for example. +// Each indexer implementation is bound to one data-column (IndexBy) and a data-type (TypeName) +type Index interface { + Init() error + Lookup(v string) ([]string, error) + Add(id, v string) (string, error) + Remove(id string, v string) error + Update(id, oldV, newV string) error + Search(pattern string) ([]string, error) + CaseInsensitive() bool + IndexBy() option.IndexBy + TypeName() string + FilesDir() string + Delete() error // Delete deletes the index folder from its storage. +} diff --git a/pkg/storage/utils/indexer/index/non_unique.go b/pkg/storage/utils/indexer/index/non_unique.go new file mode 100644 index 0000000000..8c88f3cdfc --- /dev/null +++ b/pkg/storage/utils/indexer/index/non_unique.go @@ -0,0 +1,235 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index + +import ( + "context" + "os" + "path" + "path/filepath" + "strings" + + idxerrs "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + metadata "github.com/cs3org/reva/pkg/storage/utils/metadata" +) + +// NonUnique are fields for an index of type non_unique. +type NonUnique struct { + caseInsensitive bool + indexBy option.IndexBy + typeName string + filesDir string + indexBaseDir string + indexRootDir string + + storage metadata.Storage +} + +// NewNonUniqueIndexWithOptions instantiates a new NonUniqueIndex instance. +// /tmp/ocis/accounts/index.cs3/Pets/Bro* +// ├── Brown/ +// │ └── rebef-123 -> /tmp/testfiles-395764020/pets/rebef-123 +// ├── Green/ +// │ ├── goefe-789 -> /tmp/testfiles-395764020/pets/goefe-789 +// │ └── xadaf-189 -> /tmp/testfiles-395764020/pets/xadaf-189 +// └── White/ +// └── wefwe-456 -> /tmp/testfiles-395764020/pets/wefwe-456 +func NewNonUniqueIndexWithOptions(storage metadata.Storage, o ...option.Option) Index { + opts := &option.Options{} + for _, opt := range o { + opt(opts) + } + + return &NonUnique{ + storage: storage, + caseInsensitive: opts.CaseInsensitive, + indexBy: opts.IndexBy, + typeName: opts.TypeName, + filesDir: opts.FilesDir, + indexBaseDir: path.Join(opts.Prefix, "index."+storage.Backend()), + indexRootDir: path.Join(opts.Prefix, "index."+storage.Backend(), strings.Join([]string{"non_unique", opts.TypeName, opts.IndexBy.String()}, ".")), + } +} + +// Init initializes a non_unique index. 
+func (idx *NonUnique) Init() error { + if err := idx.storage.MakeDirIfNotExist(context.Background(), idx.indexBaseDir); err != nil { + return err + } + + return idx.storage.MakeDirIfNotExist(context.Background(), idx.indexRootDir) +} + +// Lookup exact lookup by value. +func (idx *NonUnique) Lookup(v string) ([]string, error) { + if idx.caseInsensitive { + v = strings.ToLower(v) + } + paths, err := idx.storage.ReadDir(context.Background(), path.Join("/", idx.indexRootDir, v)) + if err != nil { + return nil, err + } + + var matches = make([]string, 0) + for _, p := range paths { + matches = append(matches, path.Base(p)) + } + + return matches, nil +} + +// Add a new value to the index. +func (idx *NonUnique) Add(id, v string) (string, error) { + if v == "" { + return "", nil + } + if idx.caseInsensitive { + v = strings.ToLower(v) + } + + newName := path.Join(idx.indexRootDir, v) + if err := idx.storage.MakeDirIfNotExist(context.Background(), newName); err != nil { + return "", err + } + + if err := idx.storage.CreateSymlink(context.Background(), id, path.Join(newName, id)); err != nil { + if os.IsExist(err) { + return "", &idxerrs.AlreadyExistsErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v} + } + + return "", err + } + + return newName, nil +} + +// Remove a value v from an index. +func (idx *NonUnique) Remove(id string, v string) error { + if v == "" { + return nil + } + if idx.caseInsensitive { + v = strings.ToLower(v) + } + + deletePath := path.Join("/", idx.indexRootDir, v, id) + err := idx.storage.Delete(context.Background(), deletePath) + if err != nil { + return err + } + + toStat := path.Join("/", idx.indexRootDir, v) + infos, err := idx.storage.ReadDir(context.Background(), toStat) + if err != nil { + return err + } + + if len(infos) == 0 { + deletePath = path.Join("/", idx.indexRootDir, v) + err := idx.storage.Delete(context.Background(), deletePath) + if err != nil { + return err + } + } + + return nil +} + +// Update index from to . 
+func (idx *NonUnique) Update(id, oldV, newV string) error { + if idx.caseInsensitive { + oldV = strings.ToLower(oldV) + newV = strings.ToLower(newV) + } + + if err := idx.Remove(id, oldV); err != nil { + return err + } + + if _, err := idx.Add(id, newV); err != nil { + return err + } + + return nil +} + +// Search allows for glob search on the index. +func (idx *NonUnique) Search(pattern string) ([]string, error) { + if idx.caseInsensitive { + pattern = strings.ToLower(pattern) + } + + foldersMatched := make([]string, 0) + matches := make([]string, 0) + paths, err := idx.storage.ReadDir(context.Background(), idx.indexRootDir) + + if err != nil { + return nil, err + } + + for _, p := range paths { + if found, err := filepath.Match(pattern, path.Base(p)); found { + if err != nil { + return nil, err + } + + foldersMatched = append(foldersMatched, p) + } + } + + for i := range foldersMatched { + paths, _ := idx.storage.ReadDir(context.Background(), foldersMatched[i]) + + for _, p := range paths { + matches = append(matches, path.Base(p)) + } + } + + if len(matches) == 0 { + return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: pattern} + } + + return matches, nil +} + +// CaseInsensitive undocumented. +func (idx *NonUnique) CaseInsensitive() bool { + return idx.caseInsensitive +} + +// IndexBy undocumented. +func (idx *NonUnique) IndexBy() option.IndexBy { + return idx.indexBy +} + +// TypeName undocumented. +func (idx *NonUnique) TypeName() string { + return idx.typeName +} + +// FilesDir undocumented. +func (idx *NonUnique) FilesDir() string { + return idx.filesDir +} + +// Delete deletes the index folder from its storage. 
+func (idx *NonUnique) Delete() error { + return idx.storage.Delete(context.Background(), idx.indexRootDir) +} diff --git a/pkg/storage/utils/indexer/index/non_unique_test.go b/pkg/storage/utils/indexer/index/non_unique_test.go new file mode 100644 index 0000000000..b9dac61142 --- /dev/null +++ b/pkg/storage/utils/indexer/index/non_unique_test.go @@ -0,0 +1,128 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index_test + +import ( + "fmt" + "os" + "path" + "testing" + + "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + . 
"github.com/cs3org/reva/pkg/storage/utils/indexer/test" + "github.com/cs3org/reva/pkg/storage/utils/metadata" + "github.com/stretchr/testify/assert" +) + +func TestNonUniqueIndexAdd(t *testing.T) { + sut, dataPath := getNonUniqueIdxSut(t, Pet{}, option.IndexByField("Color")) + + ids, err := sut.Lookup("Green") + assert.NoError(t, err) + assert.EqualValues(t, []string{"goefe-789", "xadaf-189"}, ids) + + ids, err = sut.Lookup("White") + assert.NoError(t, err) + assert.EqualValues(t, []string{"wefwe-456"}, ids) + + ids, err = sut.Lookup("Cyan") + assert.Error(t, err) + assert.Nil(t, ids) + + _ = os.RemoveAll(dataPath) + +} + +func TestNonUniqueIndexUpdate(t *testing.T) { + sut, dataPath := getNonUniqueIdxSut(t, Pet{}, option.IndexByField("Color")) + + err := sut.Update("goefe-789", "Green", "Black") + assert.NoError(t, err) + + err = sut.Update("xadaf-189", "Green", "Black") + assert.NoError(t, err) + + assert.DirExists(t, path.Join(dataPath, fmt.Sprintf("index.disk/non_unique.%v.Color/Black", GetTypeFQN(Pet{})))) + assert.NoDirExists(t, path.Join(dataPath, fmt.Sprintf("index.disk/non_unique.%v.Color/Green", GetTypeFQN(Pet{})))) + + _ = os.RemoveAll(dataPath) +} + +func TestNonUniqueIndexDelete(t *testing.T) { + sut, dataPath := getNonUniqueIdxSut(t, Pet{}, option.IndexByField("Color")) + assert.FileExists(t, path.Join(dataPath, fmt.Sprintf("index.disk/non_unique.%v.Color/Green/goefe-789", GetTypeFQN(Pet{})))) + + err := sut.Remove("goefe-789", "Green") + assert.NoError(t, err) + assert.NoFileExists(t, path.Join(dataPath, fmt.Sprintf("index.disk/non_unique.%v.Color/Green/goefe-789", GetTypeFQN(Pet{})))) + assert.FileExists(t, path.Join(dataPath, fmt.Sprintf("index.disk/non_unique.%v.Color/Green/xadaf-189", GetTypeFQN(Pet{})))) + + _ = os.RemoveAll(dataPath) +} + +func TestNonUniqueIndexSearch(t *testing.T) { + sut, dataPath := getNonUniqueIdxSut(t, Pet{}, option.IndexByField("Email")) + + res, err := sut.Search("Gr*") + + assert.NoError(t, err) + assert.Len(t, res, 2) 
+ + assert.Equal(t, "goefe-789", path.Base(res[0])) + assert.Equal(t, "xadaf-189", path.Base(res[1])) + + _, err = sut.Search("does-not-exist@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.NotFoundErr{}, err) + + _ = os.RemoveAll(dataPath) +} + +// entity: used to get the fully qualified name for the index root path. +func getNonUniqueIdxSut(t *testing.T, entity interface{}, indexBy option.IndexBy) (index.Index, string) { + dataPath, _ := WriteIndexTestData(Data, "ID", "") + storage, err := metadata.NewDiskStorage(dataPath) + if err != nil { + t.Fatal(err) + } + + sut := index.NewNonUniqueIndexWithOptions( + storage, + option.WithTypeName(GetTypeFQN(entity)), + option.WithIndexBy(indexBy), + option.WithFilesDir(path.Join(dataPath, "pets")), + ) + err = sut.Init() + if err != nil { + t.Fatal(err) + } + + for _, u := range Data["pets"] { + pkVal := ValueOf(u, "ID") + idxByVal := ValueOf(u, "Color") + _, err := sut.Add(pkVal, idxByVal) + if err != nil { + t.Fatal(err) + } + } + + return sut, dataPath +} diff --git a/pkg/storage/utils/indexer/index/unique.go b/pkg/storage/utils/indexer/index/unique.go new file mode 100644 index 0000000000..40f0f0014e --- /dev/null +++ b/pkg/storage/utils/indexer/index/unique.go @@ -0,0 +1,211 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index + +import ( + "context" + "os" + "path" + "path/filepath" + "strings" + + idxerrs "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + metadata "github.com/cs3org/reva/pkg/storage/utils/metadata" +) + +// Unique are fields for an index of type unique. +type Unique struct { + caseInsensitive bool + indexBy option.IndexBy + typeName string + filesDir string + indexBaseDir string + indexRootDir string + + storage metadata.Storage +} + +// NewUniqueIndexWithOptions instantiates a new UniqueIndex instance. Init() should be +// called afterward to ensure correct on-disk structure. +func NewUniqueIndexWithOptions(storage metadata.Storage, o ...option.Option) Index { + opts := &option.Options{} + for _, opt := range o { + opt(opts) + } + + u := &Unique{ + storage: storage, + caseInsensitive: opts.CaseInsensitive, + indexBy: opts.IndexBy, + typeName: opts.TypeName, + filesDir: opts.FilesDir, + indexBaseDir: path.Join(opts.Prefix, "index."+storage.Backend()), + indexRootDir: path.Join(opts.Prefix, "index."+storage.Backend(), strings.Join([]string{"unique", opts.TypeName, opts.IndexBy.String()}, ".")), + } + + return u +} + +// Init initializes a unique index. +func (idx *Unique) Init() error { + if err := idx.storage.MakeDirIfNotExist(context.Background(), idx.indexBaseDir); err != nil { + return err + } + + return idx.storage.MakeDirIfNotExist(context.Background(), idx.indexRootDir) +} + +// Lookup exact lookup by value. 
+func (idx *Unique) Lookup(v string) ([]string, error) {
+	if idx.caseInsensitive {
+		v = strings.ToLower(v)
+	}
+	searchPath := path.Join(idx.indexRootDir, v)
+	oldname, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
+		}
+
+		return nil, err
+	}
+
+	return []string{oldname}, nil
+}
+
+// Add adds the value v for the entity id to the index and returns the path to
+// the root document. An empty value is a no-op. A value that is already
+// indexed yields an AlreadyExistsErr (unique constraint).
+func (idx *Unique) Add(id, v string) (string, error) {
+	if v == "" {
+		return "", nil
+	}
+	if idx.caseInsensitive {
+		v = strings.ToLower(v)
+	}
+	target := path.Join(idx.filesDir, id)
+	newName := path.Join(idx.indexRootDir, v)
+	if err := idx.storage.CreateSymlink(context.Background(), target, newName); err != nil {
+		if os.IsExist(err) {
+			return "", &idxerrs.AlreadyExistsErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
+		}
+
+		return "", err
+	}
+
+	return newName, nil
+}
+
+// Remove removes the value v from the index. The first (id) argument is
+// ignored because unique indices are keyed by the value alone. Returns a
+// NotFoundErr when v is not indexed.
+func (idx *Unique) Remove(_ string, v string) error {
+	if v == "" {
+		return nil
+	}
+	if idx.caseInsensitive {
+		v = strings.ToLower(v)
+	}
+	searchPath := path.Join(idx.indexRootDir, v)
+	_, err := idx.storage.ResolveSymlink(context.Background(), searchPath)
+	if err != nil {
+		if os.IsNotExist(err) {
+			err = &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: v}
+		}
+
+		return err
+	}
+
+	// NOTE(review): the delete path is rooted with "/" while the lookup path
+	// above is not — confirm both resolve to the same entry on every storage
+	// backend.
+	deletePath := path.Join("/", idx.indexRootDir, v)
+	return idx.storage.Delete(context.Background(), deletePath)
+}
+
+// Update updates the index entry for id from value <oldV> to value <newV>.
+// The old entry is removed first; if that fails (e.g. with a NotFoundErr)
+// the new value is not added.
+func (idx *Unique) Update(id, oldV, newV string) error {
+	if idx.caseInsensitive {
+		oldV = strings.ToLower(oldV)
+		newV = strings.ToLower(newV)
+	}
+
+	if err := idx.Remove(id, oldV); err != nil {
+		return err
+	}
+
+	if _, err := idx.Add(id, newV); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+// Search allows for glob search on the index.
+func (idx *Unique) Search(pattern string) ([]string, error) { + if idx.caseInsensitive { + pattern = strings.ToLower(pattern) + } + + paths, err := idx.storage.ReadDir(context.Background(), idx.indexRootDir) + if err != nil { + return nil, err + } + + searchPath := idx.indexRootDir + matches := make([]string, 0) + for _, p := range paths { + if found, err := filepath.Match(pattern, path.Base(p)); found { + if err != nil { + return nil, err + } + + oldPath, err := idx.storage.ResolveSymlink(context.Background(), path.Join(searchPath, path.Base(p))) + if err != nil { + return nil, err + } + matches = append(matches, oldPath) + } + } + + if len(matches) == 0 { + return nil, &idxerrs.NotFoundErr{TypeName: idx.typeName, IndexBy: idx.indexBy, Value: pattern} + } + + return matches, nil +} + +// CaseInsensitive undocumented. +func (idx *Unique) CaseInsensitive() bool { + return idx.caseInsensitive +} + +// IndexBy undocumented. +func (idx *Unique) IndexBy() option.IndexBy { + return idx.indexBy +} + +// TypeName undocumented. +func (idx *Unique) TypeName() string { + return idx.typeName +} + +// FilesDir undocumented. +func (idx *Unique) FilesDir() string { + return idx.filesDir +} + +// Delete deletes the index folder from its storage. +func (idx *Unique) Delete() error { + return idx.storage.Delete(context.Background(), idx.indexRootDir) +} diff --git a/pkg/storage/utils/indexer/index/unique_test.go b/pkg/storage/utils/indexer/index/unique_test.go new file mode 100644 index 0000000000..1677d0d0cb --- /dev/null +++ b/pkg/storage/utils/indexer/index/unique_test.go @@ -0,0 +1,143 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package index_test + +import ( + "os" + "path" + "testing" + + "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + . "github.com/cs3org/reva/pkg/storage/utils/indexer/test" + "github.com/cs3org/reva/pkg/storage/utils/metadata" + "github.com/stretchr/testify/assert" +) + +func TestUniqueLookupSingleEntry(t *testing.T) { + uniq, dataDir := getUniqueIdxSut(t, option.IndexByField("Email"), User{}) + filesDir := path.Join(dataDir, "users") + + t.Log("existing lookup") + resultPath, err := uniq.Lookup("mikey@example.com") + assert.NoError(t, err) + + assert.Equal(t, []string{path.Join(filesDir, "abcdefg-123")}, resultPath) + + t.Log("non-existing lookup") + resultPath, err = uniq.Lookup("doesnotExists@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.NotFoundErr{}, err) + assert.Empty(t, resultPath) + + _ = os.RemoveAll(dataDir) + +} + +func TestUniqueUniqueConstraint(t *testing.T) { + uniq, dataDir := getUniqueIdxSut(t, option.IndexByField("Email"), User{}) + + _, err := uniq.Add("abcdefg-123", "mikey@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.AlreadyExistsErr{}, err) + + _ = os.RemoveAll(dataDir) +} + +func TestUniqueRemove(t *testing.T) { + uniq, dataDir := getUniqueIdxSut(t, 
option.IndexByField("Email"), User{}) + + err := uniq.Remove("", "mikey@example.com") + assert.NoError(t, err) + + _, err = uniq.Lookup("mikey@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.NotFoundErr{}, err) + + _ = os.RemoveAll(dataDir) +} + +func TestUniqueUpdate(t *testing.T) { + uniq, dataDir := getUniqueIdxSut(t, option.IndexByField("Email"), User{}) + + t.Log("successful update") + err := uniq.Update("", "mikey@example.com", "mikey2@example.com") + assert.NoError(t, err) + + t.Log("failed update because not found") + err = uniq.Update("", "nonexisting@example.com", "something2@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.NotFoundErr{}, err) + + _ = os.RemoveAll(dataDir) +} + +func TestUniqueIndexSearch(t *testing.T) { + sut, dataDir := getUniqueIdxSut(t, option.IndexByField("Email"), User{}) + + res, err := sut.Search("j*@example.com") + + assert.NoError(t, err) + assert.Len(t, res, 2) + + assert.Equal(t, "ewf4ofk-555", path.Base(res[0])) + assert.Equal(t, "rulan54-777", path.Base(res[1])) + + _, err = sut.Search("does-not-exist@example.com") + assert.Error(t, err) + assert.IsType(t, &errors.NotFoundErr{}, err) + + _ = os.RemoveAll(dataDir) +} + +func TestErrors(t *testing.T) { + assert.True(t, errors.IsAlreadyExistsErr(&errors.AlreadyExistsErr{})) + assert.True(t, errors.IsNotFoundErr(&errors.NotFoundErr{})) +} + +func getUniqueIdxSut(t *testing.T, indexBy option.IndexBy, entityType interface{}) (index.Index, string) { + dataPath, _ := WriteIndexTestData(Data, "ID", "") + storage, err := metadata.NewDiskStorage(dataPath) + if err != nil { + t.Fatal(err) + } + + sut := index.NewUniqueIndexWithOptions( + storage, + option.WithTypeName(GetTypeFQN(entityType)), + option.WithIndexBy(indexBy), + option.WithFilesDir(path.Join(dataPath, "users")), + ) + err = sut.Init() + if err != nil { + t.Fatal(err) + } + + for _, u := range Data["users"] { + pkVal := ValueOf(u, "ID") + idxByVal := ValueOf(u, "Email") + _, err := 
sut.Add(pkVal, idxByVal) + if err != nil { + t.Fatal(err) + } + } + + return sut, dataPath +} diff --git a/pkg/storage/utils/indexer/indexer.go b/pkg/storage/utils/indexer/indexer.go new file mode 100644 index 0000000000..3e136696ab --- /dev/null +++ b/pkg/storage/utils/indexer/indexer.go @@ -0,0 +1,433 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +// Package indexer provides symlink-based indexer for on-disk document-directories. +package indexer + +import ( + "context" + "errors" + "fmt" + "path" + "strings" + + "github.com/CiscoM31/godata" + "github.com/iancoleman/strcase" + + errorspkg "github.com/cs3org/reva/pkg/storage/utils/indexer/errors" + "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + "github.com/cs3org/reva/pkg/storage/utils/metadata" + "github.com/cs3org/reva/pkg/storage/utils/sync" +) + +// Indexer is a facade to configure and query over multiple indices. +type Indexer struct { + storage metadata.Storage + indices typeMap + mu sync.NamedRWMutex +} + +// IdxAddResult represents the result of an Add call on an index +type IdxAddResult struct { + Field, Value string +} + +// CreateIndexer creates a new Indexer. 
+func CreateIndexer(storage metadata.Storage) *Indexer { + return &Indexer{ + storage: storage, + indices: typeMap{}, + mu: sync.NewNamedRWMutex(), + } +} + +// Reset takes care of deleting all indices from storage and from the internal map of indices +func (i *Indexer) Reset() error { + for j := range i.indices { + for _, indices := range i.indices[j].IndicesByField { + for _, idx := range indices { + err := idx.Delete() + if err != nil { + return err + } + } + } + delete(i.indices, j) + } + + return nil +} + +// AddIndex adds a new index to the indexer receiver. +func (i *Indexer) AddIndex(t interface{}, indexBy option.IndexBy, pkName, entityDirName, indexType string, bound *option.Bound, caseInsensitive bool) error { + var idx index.Index + + var f func(metadata.Storage, ...option.Option) index.Index + switch indexType { + case "unique": + f = index.NewUniqueIndexWithOptions + case "non_unique": + f = index.NewNonUniqueIndexWithOptions + case "autoincrement": + f = index.NewAutoincrementIndex + default: + return fmt.Errorf("invalid index type: %s", indexType) + } + idx = f( + i.storage, + option.CaseInsensitive(caseInsensitive), + option.WithBounds(bound), + option.WithIndexBy(indexBy), + option.WithTypeName(getTypeFQN(t)), + ) + + i.indices.addIndex(getTypeFQN(t), pkName, idx) + return idx.Init() +} + +// Add a new entry to the indexer +func (i *Indexer) Add(t interface{}) ([]IdxAddResult, error) { + typeName := getTypeFQN(t) + + i.mu.Lock(typeName) + defer i.mu.Unlock(typeName) + + var results []IdxAddResult + if fields, ok := i.indices[typeName]; ok { + for _, indices := range fields.IndicesByField { + for _, idx := range indices { + pkVal, err := valueOf(t, option.IndexByField(fields.PKFieldName)) + if err != nil { + return []IdxAddResult{}, err + } + idxByVal, err := valueOf(t, idx.IndexBy()) + if err != nil { + return []IdxAddResult{}, err + } + value, err := idx.Add(pkVal, idxByVal) + if err != nil { + return []IdxAddResult{}, err + } + if value == "" { + 
continue + } + results = append(results, IdxAddResult{Field: idx.IndexBy().String(), Value: value}) + } + } + } + + return results, nil +} + +// FindBy finds a value on an index by field and value. +func (i *Indexer) FindBy(t interface{}, findBy, val string) ([]string, error) { + typeName := getTypeFQN(t) + + i.mu.RLock(typeName) + defer i.mu.RUnlock(typeName) + + resultPaths := make([]string, 0) + if fields, ok := i.indices[typeName]; ok { + for _, idx := range fields.IndicesByField[strcase.ToCamel(findBy)] { + idxVal := val + res, err := idx.Lookup(idxVal) + if err != nil { + if errorspkg.IsNotFoundErr(err) { + continue + } + + if err != nil { + return nil, err + } + } + + resultPaths = append(resultPaths, res...) + + } + } + + result := make([]string, 0, len(resultPaths)) + for _, v := range resultPaths { + result = append(result, path.Base(v)) + } + + return result, nil +} + +// Delete deletes all indexed fields of a given type t on the Indexer. +func (i *Indexer) Delete(t interface{}) error { + typeName := getTypeFQN(t) + + i.mu.Lock(typeName) + defer i.mu.Unlock(typeName) + + if fields, ok := i.indices[typeName]; ok { + for _, indices := range fields.IndicesByField { + for _, idx := range indices { + pkVal, err := valueOf(t, option.IndexByField(fields.PKFieldName)) + if err != nil { + return err + } + idxByVal, err := valueOf(t, idx.IndexBy()) + if err != nil { + return err + } + if err := idx.Remove(pkVal, idxByVal); err != nil { + return err + } + } + } + } + + return nil +} + +// FindByPartial allows for glob search across all indexes. 
+func (i *Indexer) FindByPartial(t interface{}, field string, pattern string) ([]string, error) {
+	typeName := getTypeFQN(t)
+
+	i.mu.RLock(typeName)
+	defer i.mu.RUnlock(typeName)
+
+	resultPaths := make([]string, 0)
+	if fields, ok := i.indices[typeName]; ok {
+		for _, idx := range fields.IndicesByField[strcase.ToCamel(field)] {
+			res, err := idx.Search(pattern)
+			if err != nil {
+				// Indices without a match are skipped; any other error aborts.
+				if errorspkg.IsNotFoundErr(err) {
+					continue
+				}
+				return nil, err
+			}
+
+			resultPaths = append(resultPaths, res...)
+		}
+	}
+
+	result := make([]string, 0, len(resultPaths))
+	for _, v := range resultPaths {
+		result = append(result, path.Base(v))
+	}
+
+	return result, nil
+}
+
+// Update updates all indexes of an entity from the state <from> to the state
+// <to>. Both values must be of the same type. Per field: an empty old value
+// adds the new one, an empty new value removes the old one, otherwise the
+// entry is updated in place. Unchanged fields are skipped.
+func (i *Indexer) Update(from, to interface{}) error {
+	typeNameFrom := getTypeFQN(from)
+
+	i.mu.Lock(typeNameFrom)
+	defer i.mu.Unlock(typeNameFrom)
+
+	if typeNameTo := getTypeFQN(to); typeNameFrom != typeNameTo {
+		return fmt.Errorf("update types do not match: from %v to %v", typeNameFrom, typeNameTo)
+	}
+
+	if fields, ok := i.indices[typeNameFrom]; ok {
+		for fName, indices := range fields.IndicesByField {
+			oldV, err := valueOf(from, option.IndexByField(fName))
+			if err != nil {
+				return err
+			}
+			newV, err := valueOf(to, option.IndexByField(fName))
+			if err != nil {
+				return err
+			}
+			pkVal, err := valueOf(from, option.IndexByField(fields.PKFieldName))
+			if err != nil {
+				return err
+			}
+			// The value comparison is loop-invariant, so it is hoisted out of
+			// the per-index loop below.
+			if oldV == newV {
+				continue
+			}
+			for _, idx := range indices {
+				if oldV == "" {
+					if _, err := idx.Add(pkVal, newV); err != nil {
+						return err
+					}
+					continue
+				}
+				if newV == "" {
+					if err := idx.Remove(pkVal, oldV); err != nil {
+						return err
+					}
+					continue
+				}
+				if err := idx.Update(pkVal, oldV, newV); err != nil {
+					return err
+				}
+			}
+		}
+	}
+
+	return nil
+}
+
+// Query parses an OData query into something our indexer.Index understands and resolves it.
+func (i *Indexer) Query(ctx context.Context, t interface{}, q string) ([]string, error) { + query, err := godata.ParseFilterString(ctx, q) + if err != nil { + return nil, err + } + + tree := newQueryTree() + if err := buildTreeFromOdataQuery(query.Tree, &tree); err != nil { + return nil, err + } + + results := make([]string, 0) + if err := i.resolveTree(t, &tree, &results); err != nil { + return nil, err + } + + return results, nil +} + +// t is used to infer the indexed field names. When building an index search query, field names have to respect Golang +// conventions and be in PascalCase. For a better overview on this contemplate reading the reflection package under the +// indexer directory. Traversal of the tree happens in a pre-order fashion. +// TODO implement logic for `and` operators. +func (i *Indexer) resolveTree(t interface{}, tree *queryTree, partials *[]string) error { + if partials == nil { + return errors.New("return value cannot be nil: partials") + } + + if tree.left != nil { + _ = i.resolveTree(t, tree.left, partials) + } + + if tree.right != nil { + _ = i.resolveTree(t, tree.right, partials) + } + + // by the time we're here we reached a leaf node. + if tree.token != nil { + switch tree.token.filterType { + case "FindBy": + operand, err := sanitizeInput(tree.token.operands) + if err != nil { + return err + } + + r, err := i.FindBy(t, operand.field, operand.value) + if err != nil { + return err + } + + *partials = append(*partials, r...) + case "FindByPartial": + operand, err := sanitizeInput(tree.token.operands) + if err != nil { + return err + } + + r, err := i.FindByPartial(t, operand.field, fmt.Sprintf("%v*", operand.value)) + if err != nil { + return err + } + + *partials = append(*partials, r...) 
+ default: + return fmt.Errorf("unsupported filter: %v", tree.token.filterType) + } + } + + *partials = dedup(*partials) + return nil +} + +type indexerTuple struct { + field, value string +} + +// sanitizeInput returns a tuple of fieldName + value to be applied on indexer.Index filters. +func sanitizeInput(operands []string) (*indexerTuple, error) { + if len(operands) != 2 { + return nil, fmt.Errorf("invalid number of operands for filter function: got %v expected 2", len(operands)) + } + + // field names are Go public types and by design they are in PascalCase, therefore we need to adhere to this rules. + // for further information on this have a look at the reflection package. + f := strcase.ToCamel(operands[0]) + + // remove single quotes from value. + v := strings.ReplaceAll(operands[1], "'", "") + return &indexerTuple{ + field: f, + value: v, + }, nil +} + +// buildTreeFromOdataQuery builds an indexer.queryTree out of a GOData ParseNode. The purpose of this intermediate tree +// is to transform godata operators and functions into supported operations on our index. At the time of this writing +// we only support `FindBy` and `FindByPartial` queries as these are the only implemented filters on indexer.Index(es). +func buildTreeFromOdataQuery(root *godata.ParseNode, tree *queryTree) error { + if root.Token.Type == godata.ExpressionTokenFunc { // i.e "startswith", "contains" + switch root.Token.Value { + case "startswith": + token := token{ + operator: root.Token.Value, + filterType: "FindByPartial", + // TODO sanitize the number of operands it the expected one. 
+ operands: []string{ + root.Children[0].Token.Value, // field name, i.e: Name + root.Children[1].Token.Value, // field value, i.e: Jac + }, + } + + tree.insert(&token) + default: + return errors.New("operation not supported") + } + } + + if root.Token.Type == godata.ExpressionTokenLogical { + switch root.Token.Value { + case "or": + tree.insert(&token{operator: root.Token.Value}) + for _, child := range root.Children { + if err := buildTreeFromOdataQuery(child, tree.left); err != nil { + return err + } + } + case "eq": + tree.insert(&token{ + operator: root.Token.Value, + filterType: "FindBy", + operands: []string{ + root.Children[0].Token.Value, + root.Children[1].Token.Value, + }, + }) + for _, child := range root.Children { + if err := buildTreeFromOdataQuery(child, tree.left); err != nil { + return err + } + } + default: + return errors.New("operator not supported") + } + } + return nil +} diff --git a/pkg/storage/utils/indexer/indexer_suite_test.go b/pkg/storage/utils/indexer/indexer_suite_test.go new file mode 100644 index 0000000000..6edb502e70 --- /dev/null +++ b/pkg/storage/utils/indexer/indexer_suite_test.go @@ -0,0 +1,31 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +package indexer_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestIndexer(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Indexer Suite") +} diff --git a/pkg/storage/utils/indexer/indexer_test.go b/pkg/storage/utils/indexer/indexer_test.go new file mode 100644 index 0000000000..3d4a50cce6 --- /dev/null +++ b/pkg/storage/utils/indexer/indexer_test.go @@ -0,0 +1,293 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package indexer + +import ( + "context" + "os" + "path" + "testing" + + _ "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" + . 
"github.com/cs3org/reva/pkg/storage/utils/indexer/test" + "github.com/cs3org/reva/pkg/storage/utils/metadata" + "github.com/stretchr/testify/assert" +) + +func TestIndexer_Disk_FindByWithUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&User{}, option.IndexByField("UserName"), "ID", "users", "unique", nil, false) + assert.NoError(t, err) + + u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"} + _, err = indexer.Add(u) + assert.NoError(t, err) + + res, err := indexer.FindBy(User{}, "UserName", "mikey") + assert.NoError(t, err) + t.Log(res) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_AddWithUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&User{}, option.IndexByField("UserName"), "ID", "users", "unique", nil, false) + assert.NoError(t, err) + + u := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"} + _, err = indexer.Add(u) + assert.NoError(t, err) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_AddWithNonUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&Pet{}, option.IndexByField("Kind"), "ID", "pets", "non_unique", nil, false) + assert.NoError(t, err) + + pet1 := Pet{ID: "goefe-789", Kind: "Hog", Color: "Green", Name: "Dicky"} + pet2 := Pet{ID: "xadaf-189", Kind: "Hog", Color: "Green", Name: "Ricky"} + + _, err = indexer.Add(pet1) + assert.NoError(t, err) + + _, err = indexer.Add(pet2) + assert.NoError(t, err) + + res, err := indexer.FindBy(Pet{}, "Kind", "Hog") + assert.NoError(t, err) + + t.Log(res) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_AddWithAutoincrementIndex(t *testing.T) { + dataDir, err := 
WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&User{}, option.IndexByField("UID"), "ID", "users", "autoincrement", &option.Bound{Lower: 5}, false) + assert.NoError(t, err) + + res1, err := indexer.Add(Data["users"][0]) + assert.NoError(t, err) + assert.Equal(t, "UID", res1[0].Field) + assert.Equal(t, "5", path.Base(res1[0].Value)) + + res2, err := indexer.Add(Data["users"][1]) + assert.NoError(t, err) + assert.Equal(t, "UID", res2[0].Field) + assert.Equal(t, "6", path.Base(res2[0].Value)) + + resFindBy, err := indexer.FindBy(User{}, "UID", "6") + assert.NoError(t, err) + assert.Equal(t, "hijklmn-456", resFindBy[0]) + t.Log(resFindBy) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_DeleteWithNonUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&Pet{}, option.IndexByField("Kind"), "ID", "pets", "non_unique", nil, false) + assert.NoError(t, err) + + pet1 := Pet{ID: "goefe-789", Kind: "Hog", Color: "Green", Name: "Dicky"} + pet2 := Pet{ID: "xadaf-189", Kind: "Hog", Color: "Green", Name: "Ricky"} + + _, err = indexer.Add(pet1) + assert.NoError(t, err) + + _, err = indexer.Add(pet2) + assert.NoError(t, err) + + err = indexer.Delete(pet2) + assert.NoError(t, err) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_SearchWithNonUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&Pet{}, option.IndexByField("Name"), "ID", "pets", "non_unique", nil, false) + assert.NoError(t, err) + + pet1 := Pet{ID: "goefe-789", Kind: "Hog", Color: "Green", Name: "Dicky"} + pet2 := Pet{ID: "xadaf-189", Kind: "Hog", Color: "Green", Name: "Ricky"} + + _, err = indexer.Add(pet1) + assert.NoError(t, err) + + _, err = indexer.Add(pet2) + assert.NoError(t, err) 
+ + res, err := indexer.FindByPartial(pet2, "Name", "*ky") + assert.NoError(t, err) + + t.Log(res) + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_UpdateWithUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&User{}, option.IndexByField("UserName"), "ID", "users", "unique", nil, false) + assert.NoError(t, err) + + err = indexer.AddIndex(&User{}, option.IndexByField("Email"), "ID", "users", "unique", nil, false) + assert.NoError(t, err) + + user1 := &User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"} + user2 := &User{ID: "hijklmn-456", UserName: "frank", Email: "frank@example.com"} + + _, err = indexer.Add(user1) + assert.NoError(t, err) + + _, err = indexer.Add(user2) + assert.NoError(t, err) + + err = indexer.Update(user1, &User{ + ID: "abcdefg-123", + UserName: "mikey-new", + Email: "mikey@example.com", + }) + assert.NoError(t, err) + v, err1 := indexer.FindBy(&User{}, "UserName", "mikey-new") + assert.NoError(t, err1) + assert.Len(t, v, 1) + v, err2 := indexer.FindBy(&User{}, "UserName", "mikey") + assert.NoError(t, err2) + assert.Len(t, v, 0) + + err1 = indexer.Update(&User{ + ID: "abcdefg-123", + UserName: "mikey-new", + Email: "mikey@example.com", + }, &User{ + ID: "abcdefg-123", + UserName: "mikey-newest", + Email: "mikey-new@example.com", + }) + assert.NoError(t, err1) + fbUserName, err2 := indexer.FindBy(&User{}, "UserName", "mikey-newest") + assert.NoError(t, err2) + assert.Len(t, fbUserName, 1) + fbEmail, err3 := indexer.FindBy(&User{}, "Email", "mikey-new@example.com") + assert.NoError(t, err3) + assert.Len(t, fbEmail, 1) + + _ = os.RemoveAll(dataDir) +} + +func TestIndexer_Disk_UpdateWithNonUniqueIndex(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + + err = indexer.AddIndex(&Pet{}, option.IndexByField("Name"), "ID", 
"pets", "non_unique", nil, false) + assert.NoError(t, err) + + pet1 := Pet{ID: "goefe-789", Kind: "Hog", Color: "Green", Name: "Dicky"} + pet2 := Pet{ID: "xadaf-189", Kind: "Hog", Color: "Green", Name: "Ricky"} + + _, err = indexer.Add(pet1) + assert.NoError(t, err) + + _, err = indexer.Add(pet2) + assert.NoError(t, err) + + _ = os.RemoveAll(dataDir) +} + +func TestQueryDiskImpl(t *testing.T) { + dataDir, err := WriteIndexTestData(Data, "ID", "") + assert.NoError(t, err) + indexer := createDiskIndexer(dataDir) + ctx := context.Background() + + err = indexer.AddIndex(&Account{}, option.IndexByField("OnPremisesSamAccountName"), "ID", "accounts", "non_unique", nil, false) + assert.NoError(t, err) + + err = indexer.AddIndex(&Account{}, option.IndexByField("Mail"), "ID", "accounts", "non_unique", nil, false) + assert.NoError(t, err) + + err = indexer.AddIndex(&Account{}, option.IndexByField("ID"), "ID", "accounts", "non_unique", nil, false) + assert.NoError(t, err) + + acc := Account{ + ID: "ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2", + Mail: "spooky@skeletons.org", + OnPremisesSamAccountName: "MrDootDoot", + } + + _, err = indexer.Add(acc) + assert.NoError(t, err) + + r, err := indexer.Query(ctx, &Account{}, "on_premises_sam_account_name eq 'MrDootDoot'") // this query will match both pets. + assert.NoError(t, err) + assert.Equal(t, []string{"ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2"}, r) + + r, err = indexer.Query(ctx, &Account{}, "mail eq 'spooky@skeletons.org'") // this query will match both pets. + assert.NoError(t, err) + assert.Equal(t, []string{"ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2"}, r) + + r, err = indexer.Query(ctx, &Account{}, "on_premises_sam_account_name eq 'MrDootDoot' or mail eq 'spooky@skeletons.org'") // this query will match both pets. + assert.NoError(t, err) + assert.Equal(t, []string{"ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2"}, r) + + r, err = indexer.Query(ctx, &Account{}, "startswith(on_premises_sam_account_name,'MrDoo')") // this query will match both pets. 
+ assert.NoError(t, err) + assert.Equal(t, []string{"ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2"}, r) + + r, err = indexer.Query(ctx, &Account{}, "id eq 'ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2' or on_premises_sam_account_name eq 'MrDootDoot'") // this query will match both pets. + assert.NoError(t, err) + assert.Equal(t, []string{"ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2"}, r) + + _ = os.RemoveAll(dataDir) +} + +func createDiskIndexer(dataDir string) *Indexer { + storage, err := metadata.NewDiskStorage(dataDir) + if err != nil { + return nil + } + + return CreateIndexer(storage) +} diff --git a/pkg/storage/utils/indexer/map.go b/pkg/storage/utils/indexer/map.go new file mode 100644 index 0000000000..f792677059 --- /dev/null +++ b/pkg/storage/utils/indexer/map.go @@ -0,0 +1,45 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package indexer + +import "github.com/cs3org/reva/pkg/storage/utils/indexer/index" + +// typeMap stores the indexer layout at runtime. 
type typeMap map[tName]typeMapping

// tName is the fully qualified name of an indexed type (see getTypeFQN).
type tName = string

// fieldName is the name of the struct field an index is registered for.
type fieldName = string

// typeMapping couples a type's primary-key field name with all indices
// registered for its fields.
type typeMapping struct {
	PKFieldName    string
	IndicesByField map[fieldName][]index.Index
}

// addIndex registers idx for typeName under the field the index indexes by.
// The first registration for a type also records pkName as its primary key.
// NOTE(review): on subsequent registrations pkName is ignored and the stored
// PKFieldName is kept as-is — confirm callers always pass the same primary
// key for a given type.
func (m typeMap) addIndex(typeName string, pkName string, idx index.Index) {
	if val, ok := m[typeName]; ok {
		// val is a copy of the struct, but IndicesByField is a map (reference
		// type), so the assignment below is visible through m[typeName] too.
		val.IndicesByField[idx.IndexBy().String()] = append(val.IndicesByField[idx.IndexBy().String()], idx)
		return
	}
	m[typeName] = typeMapping{
		PKFieldName: pkName,
		IndicesByField: map[string][]index.Index{
			idx.IndexBy().String(): {idx},
		},
	}
}

// Option defines a single option function.
type Option func(o *Options)

// IndexBy defines how the data is being indexed: by a plain struct field
// (IndexByField) or by a custom function (IndexByFunc).
type IndexBy interface {
	String() string
}

// IndexByField represents the field that's being used to index the data by.
type IndexByField string

// String returns a string representation (the field name itself).
func (ibf IndexByField) String() string {
	return string(ibf)
}

// IndexByFunc represents a function that's being used to index the data by.
type IndexByFunc struct {
	Name string
	Func func(v interface{}) (string, error)
}

// String returns a string representation (the function's registered name).
func (ibf IndexByFunc) String() string {
	return ibf.Name
}

// Bound represents a lower and upper bound range for an index.
// todo: if we would like to provide an upper bound then we would need to deal with ranges, in which case this is why the
// upper bound attribute is here.
type Bound struct {
	Lower, Upper int64
}

// Options defines the available options for this package.
type Options struct {
	CaseInsensitive bool
	Bound           *Bound

	TypeName string
	IndexBy  IndexBy
	FilesDir string
	Prefix   string

	Storage metadata.Storage
}

// CaseInsensitive sets the CaseInsensitive field.
func CaseInsensitive(val bool) Option {
	return func(o *Options) {
		o.CaseInsensitive = val
	}
}

// WithBounds sets the Bound field.
func WithBounds(val *Bound) Option {
	return func(o *Options) {
		o.Bound = val
	}
}

// WithTypeName sets the TypeName option.
func WithTypeName(val string) Option {
	return func(o *Options) {
		o.TypeName = val
	}
}

// WithIndexBy sets the option IndexBy.
func WithIndexBy(val IndexBy) Option {
	return func(o *Options) {
		o.IndexBy = val
	}
}

// WithFilesDir sets the option FilesDir.
func WithFilesDir(val string) Option {
	return func(o *Options) {
		o.FilesDir = val
	}
}

// queryTree is a binary tree of tokens built from a parsed OData filter
// expression; it is evaluated against the registered indices.
type queryTree struct {
	token *token     // operator/operand payload of this node; nil on the bare root
	root  bool       // true only for the node created by newQueryTree
	left  *queryTree // LHS subtree, populated first by insert
	right *queryTree // RHS subtree, populated once the LHS is taken
}

// token to be resolved by the index
type token struct {
	operator   string // original OData operator. i.e: 'startswith', `or`, `and`.
	filterType string // equivalent operator from OData -> indexer i.e FindByPartial or FindBy.
	operands   []string
}

// newQueryTree constructs a new tree with a root node.
func newQueryTree() queryTree {
	return queryTree{
		root: true,
	}
}

// insert populates the LHS of the tree first; if that is not possible it
// fills the RHS.
// NOTE(review): on the root node insert always (re)assigns the LHS — confirm
// a root never receives more than one direct insert, otherwise the earlier
// token is silently overwritten.
+func (t *queryTree) insert(tkn *token) { + if t != nil && t.root { + t.left = &queryTree{token: tkn} + return + } + + if t.left == nil { + t.left = &queryTree{token: tkn} + return + } + + if t.left != nil && t.right == nil { + t.right = &queryTree{token: tkn} + return + } +} diff --git a/pkg/storage/utils/indexer/reflect.go b/pkg/storage/utils/indexer/reflect.go new file mode 100644 index 0000000000..47d02eb3f1 --- /dev/null +++ b/pkg/storage/utils/indexer/reflect.go @@ -0,0 +1,86 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
// getType dereferences pointers and interfaces until it reaches the concrete
// value of v and returns it. It errors when the chain ends in an invalid
// value (e.g. a nil pointer).
func getType(v interface{}) (reflect.Value, error) {
	rv := reflect.ValueOf(v)
	for rv.Kind() == reflect.Ptr || rv.Kind() == reflect.Interface {
		rv = rv.Elem()
	}
	if !rv.IsValid() {
		return reflect.Value{}, errors.New("failed to read value via reflection")
	}

	return rv, nil
}

// getTypeFQN returns a dotted fully qualified type name, e.g.
// "github.com.cs3org.reva.pkg.storage.utils.indexer.someT".
// NOTE(review): the error from getType is discarded; calling Type() on an
// invalid value would panic — confirm callers only pass valid struct values.
func getTypeFQN(t interface{}) string {
	typ, _ := getType(t)
	typeName := path.Join(typ.Type().PkgPath(), typ.Type().Name())
	typeName = strings.ReplaceAll(typeName, "/", ".")
	return typeName
}

// valueOf extracts the index value of v according to indexBy: either by
// reading the named (possibly dotted) field, or by applying the supplied
// index function.
func valueOf(v interface{}, indexBy option.IndexBy) (string, error) {
	switch idxBy := indexBy.(type) {
	case option.IndexByField:
		return valueOfField(v, string(idxBy))
	case option.IndexByFunc:
		return idxBy.Func(v)
	default:
		return "", fmt.Errorf("unknown indexBy type")
	}
}

// valueOfField resolves a dotted field path (e.g. "Nested.Deeply.Val") on v
// and renders the leaf value as a string. String fields are returned as-is
// and zero values render as the empty string.
// NOTE(review): any other non-zero leaf is assumed to be an integer kind —
// f.Int() panics for e.g. bool or float fields. Confirm indexed fields are
// only strings or ints.
func valueOfField(v interface{}, field string) (string, error) {
	parts := strings.Split(field, ".")
	for i, part := range parts {
		r := reflect.ValueOf(v)
		if r.Kind() == reflect.Ptr {
			r = r.Elem()
		}
		f := reflect.Indirect(r).FieldByName(part)
		if f.Kind() == reflect.Ptr {
			f = f.Elem()
		}

		switch {
		case f.Kind() == reflect.Struct && i != len(parts)-1:
			// descend into the nested struct and resolve the next path part
			v = f.Interface()
		case f.Kind() == reflect.String:
			return f.String(), nil
		case f.IsZero():
			return "", nil
		default:
			return strconv.Itoa(int(f.Int())), nil
		}
	}
	return "", nil
}
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package indexer + +import ( + "fmt" + "testing" + + "github.com/cs3org/reva/pkg/storage/utils/indexer/option" +) + +func Test_getTypeFQN(t *testing.T) { + type someT struct{} + + type args struct { + t interface{} + } + tests := []struct { + name string + args args + want string + }{ + {name: "ByValue", args: args{&someT{}}, want: "github.com.cs3org.reva.pkg.storage.utils.indexer.someT"}, + {name: "ByRef", args: args{someT{}}, want: "github.com.cs3org.reva.pkg.storage.utils.indexer.someT"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := getTypeFQN(tt.args.t); got != tt.want { + t.Errorf("getTypeFQN() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_valueOf(t *testing.T) { + type nestedDeeplyT struct { + Val string + } + type nestedT struct { + Deeply nestedDeeplyT + } + type someT struct { + val string + Nested nestedT + } + type args struct { + v interface{} + indexBy option.IndexBy + } + tests := []struct { + name string + args args + want string + }{ + {name: "ByValue", args: args{v: someT{val: "hello"}, indexBy: option.IndexByField("val")}, want: "hello"}, + {name: "ByRef", args: args{v: &someT{val: "hello"}, indexBy: option.IndexByField("val")}, want: "hello"}, + {name: "nested", args: args{v: &someT{Nested: nestedT{Deeply: nestedDeeplyT{Val: "nestedHello"}}}, indexBy: 
option.IndexByField("Nested.Deeply.Val")}, want: "nestedHello"}, + {name: "using a indexFunc", args: args{v: &someT{Nested: nestedT{Deeply: nestedDeeplyT{Val: "nestedHello"}}}, indexBy: option.IndexByFunc{ + Name: "neestedDeeplyVal", + Func: func(i interface{}) (string, error) { + t, ok := i.(*someT) + if !ok { + return "", fmt.Errorf("booo") + } + return t.Nested.Deeply.Val, nil + }, + }}, want: "nestedHello"}, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got, err := valueOf(tt.args.v, tt.args.indexBy); got != tt.want || err != nil { + t.Errorf("valueOf() = %v, want %v", got, tt.want) + } + }) + } +} diff --git a/pkg/storage/utils/indexer/test/data.go b/pkg/storage/utils/indexer/test/data.go new file mode 100644 index 0000000000..94e51fc5d2 --- /dev/null +++ b/pkg/storage/utils/indexer/test/data.go @@ -0,0 +1,120 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package test + +import ( + "encoding/json" + "io/ioutil" + "os" + "path" +) + +// User is a user. +type User struct { + ID, UserName, Email string + UID int +} + +// Pet is a pet. +type Pet struct { + ID, Kind, Color, Name string + UID int +} + +// Account mocks an ocis account. 
+type Account struct { + ID string + OnPremisesSamAccountName string + Mail string +} + +// Data mock data. +var Data = map[string][]interface{}{ + "users": { + User{ID: "abcdefg-123", UserName: "mikey", Email: "mikey@example.com"}, + User{ID: "hijklmn-456", UserName: "frank", Email: "frank@example.com"}, + User{ID: "ewf4ofk-555", UserName: "jacky", Email: "jacky@example.com"}, + User{ID: "rulan54-777", UserName: "jones", Email: "jones@example.com"}, + }, + "pets": { + Pet{ID: "rebef-123", Kind: "Dog", Color: "Brown", Name: "Waldo"}, + Pet{ID: "wefwe-456", Kind: "Cat", Color: "White", Name: "Snowy"}, + Pet{ID: "goefe-789", Kind: "Hog", Color: "Green", Name: "Dicky"}, + Pet{ID: "xadaf-189", Kind: "Hog", Color: "Green", Name: "Ricky"}, + }, + "accounts": { + Account{ID: "ba5b6e54-e29d-4b2b-8cc4-0a0b958140d2", Mail: "spooky@skeletons.org", OnPremisesSamAccountName: "MrDootDoot"}, + }, +} + +// WriteIndexTestData writes mock data to disk. +func WriteIndexTestData(m map[string][]interface{}, privateKey, dir string) (string, error) { + rootDir, err := getRootDir(dir) + if err != nil { + return "", err + } + + err = writeData(m, privateKey, rootDir) + if err != nil { + return "", err + } + + return rootDir, nil +} + +// getRootDir allows for some minimal behavior on destination on disk. Testing the cs3 api behavior locally means +// keeping track of where the cs3 data lives on disk, this function allows for multiplexing whether or not to use a +// temporary folder or an already defined one. +func getRootDir(dir string) (string, error) { + var rootDir string + var err error + + if dir != "" { + rootDir = dir + } else { + rootDir, err = CreateTmpDir() + if err != nil { + return "", err + } + } + return rootDir, nil +} + +// writeData writes test data to disk on rootDir location Marshaled as json. 
+func writeData(m map[string][]interface{}, privateKey string, rootDir string) error { + for dirName := range m { + fileTypePath := path.Join(rootDir, dirName) + + if err := os.MkdirAll(fileTypePath, 0755); err != nil { + return err + } + for _, u := range m[dirName] { + data, err := json.Marshal(u) + if err != nil { + return err + } + + pkVal := ValueOf(u, privateKey) + if err := ioutil.WriteFile(path.Join(fileTypePath, pkVal), data, 0600); err != nil { + return err + } + } + } + return nil +} diff --git a/pkg/storage/utils/indexer/test/helpers.go b/pkg/storage/utils/indexer/test/helpers.go new file mode 100644 index 0000000000..b7d9f97f90 --- /dev/null +++ b/pkg/storage/utils/indexer/test/helpers.go @@ -0,0 +1,66 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package test + +import ( + "errors" + "io/ioutil" + "path" + "reflect" + "strings" +) + +// CreateTmpDir creates a temporary dir for tests data. +func CreateTmpDir() (string, error) { + name, err := ioutil.TempDir("/tmp", "testfiles-") + if err != nil { + return "", err + } + + return name, nil +} + +// ValueOf gets the value of a type v on a given field . 
func ValueOf(v interface{}, field string) string {
	target := reflect.Indirect(reflect.ValueOf(v))
	return target.FieldByName(field).String()
}

// getType dereferences pointers and interfaces until the concrete value of v
// is reached; it errors when the chain ends in an invalid value.
func getType(v interface{}) (reflect.Value, error) {
	val := reflect.ValueOf(v)
	for val.Kind() == reflect.Ptr || val.Kind() == reflect.Interface {
		val = val.Elem()
	}
	if !val.IsValid() {
		return reflect.Value{}, errors.New("failed to read value via reflection")
	}
	return val, nil
}

// GetTypeFQN formats a valid name from a type. This is a duplication of the already existing function in the
// indexer package, but since there is a circular dependency we chose to duplicate it.
func GetTypeFQN(t interface{}) string {
	typ, _ := getType(t)
	fqn := path.Join(typ.Type().PkgPath(), typ.Type().Name())
	return strings.ReplaceAll(fqn, "/", ".")
}
+ +package metadata + +import ( + "bytes" + "context" + "errors" + "fmt" + "io/ioutil" + "net/http" + "os" + + gateway "github.com/cs3org/go-cs3apis/cs3/gateway/v1beta1" + user "github.com/cs3org/go-cs3apis/cs3/identity/user/v1beta1" + rpc "github.com/cs3org/go-cs3apis/cs3/rpc/v1beta1" + provider "github.com/cs3org/go-cs3apis/cs3/storage/provider/v1beta1" + types "github.com/cs3org/go-cs3apis/cs3/types/v1beta1" + ctxpkg "github.com/cs3org/reva/pkg/ctx" + "github.com/cs3org/reva/pkg/errtypes" + "github.com/cs3org/reva/pkg/rgrpc/status" + "github.com/cs3org/reva/pkg/rgrpc/todo/pool" + "github.com/cs3org/reva/pkg/utils" + "google.golang.org/grpc/metadata" +) + +// CS3 represents a metadata storage with a cs3 storage backend +type CS3 struct { + providerAddr string + gatewayAddr string + serviceUser *user.User + machineAuthAPIKey string + dataGatewayClient *http.Client + SpaceRoot *provider.ResourceId +} + +// NewCS3Storage returns a new cs3 storage instance +func NewCS3Storage(gwAddr, providerAddr, serviceUser, machineAuthAPIKey string) (s Storage, err error) { + c := http.DefaultClient + + return &CS3{ + providerAddr: providerAddr, + gatewayAddr: gwAddr, + dataGatewayClient: c, + machineAuthAPIKey: machineAuthAPIKey, + serviceUser: &user.User{ + Id: &user.UserId{ + OpaqueId: serviceUser, + }, + }, + }, nil +} + +// Backend returns the backend name of the storage +func (cs3 *CS3) Backend() string { + return "cs3" +} + +// Init creates the metadata space +func (cs3 *CS3) Init(ctx context.Context, spaceid string) (err error) { + client, err := cs3.providerClient() + if err != nil { + return err + } + + ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return err + } + // FIXME change CS3 api to allow sending a space id + cssr, err := client.CreateStorageSpace(ctx, &provider.CreateStorageSpaceRequest{ + Opaque: &types.Opaque{ + Map: map[string]*types.OpaqueEntry{ + "spaceid": { + Decoder: "plain", + Value: []byte(spaceid), + }, + }, + }, + Owner: cs3.serviceUser, + 
Name: "Metadata", + Type: "metadata", + }) + switch { + case err != nil: + return err + case cssr.Status.Code == rpc.Code_CODE_OK: + cs3.SpaceRoot = cssr.StorageSpace.Root + case cssr.Status.Code == rpc.Code_CODE_ALREADY_EXISTS: + // TODO make CreateStorageSpace return existing space? + cs3.SpaceRoot = &provider.ResourceId{StorageId: spaceid, OpaqueId: spaceid} + default: + return errtypes.NewErrtypeFromStatus(cssr.Status) + } + return nil +} + +// SimpleUpload uploads a file to the metadata storage +func (cs3 *CS3) SimpleUpload(ctx context.Context, uploadpath string, content []byte) error { + client, err := cs3.providerClient() + if err != nil { + return err + } + ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return err + } + + ref := provider.InitiateFileUploadRequest{ + Ref: &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: utils.MakeRelativePath(uploadpath), + }, + } + + res, err := client.InitiateFileUpload(ctx, &ref) + if err != nil { + return err + } + if res.Status.Code != rpc.Code_CODE_OK { + return status.NewErrorFromCode(res.Status.Code, "cs3 metadata SimpleUpload") + } + + var endpoint string + + for _, proto := range res.GetProtocols() { + if proto.Protocol == "simple" { + endpoint = proto.GetUploadEndpoint() + break + } + } + if endpoint == "" { + return errors.New("metadata storage doesn't support the simple upload protocol") + } + + req, err := http.NewRequest(http.MethodPut, endpoint, bytes.NewReader(content)) + if err != nil { + return err + } + + md, _ := metadata.FromOutgoingContext(ctx) + req.Header.Add(ctxpkg.TokenHeader, md.Get(ctxpkg.TokenHeader)[0]) + resp, err := cs3.dataGatewayClient.Do(req) + if err != nil { + return err + } + return resp.Body.Close() +} + +// SimpleDownload reads a file from the metadata storage +func (cs3 *CS3) SimpleDownload(ctx context.Context, downloadpath string) (content []byte, err error) { + client, err := cs3.providerClient() + if err != nil { + return nil, err + } + ctx, err = 
cs3.getAuthContext(ctx) + if err != nil { + return nil, err + } + + dreq := provider.InitiateFileDownloadRequest{ + Ref: &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: utils.MakeRelativePath(downloadpath), + }, + } + + res, err := client.InitiateFileDownload(ctx, &dreq) + if err != nil { + return []byte{}, errtypes.NotFound(dreq.Ref.Path) + } + + var endpoint string + + for _, proto := range res.GetProtocols() { + if proto.Protocol == "spaces" { + endpoint = proto.GetDownloadEndpoint() + break + } + } + if endpoint == "" { + return []byte{}, errors.New("metadata storage doesn't support the spaces download protocol") + } + + req, err := http.NewRequest(http.MethodGet, endpoint, nil) + if err != nil { + return []byte{}, err + } + + md, _ := metadata.FromOutgoingContext(ctx) + req.Header.Add(ctxpkg.TokenHeader, md.Get(ctxpkg.TokenHeader)[0]) + resp, err := cs3.dataGatewayClient.Do(req) + if err != nil { + return []byte{}, err + } + + if resp.StatusCode != http.StatusOK { + return []byte{}, errtypes.NotFound(dreq.Ref.Path) + } + + b, err := ioutil.ReadAll(resp.Body) + if err != nil { + return []byte{}, err + } + + if err = resp.Body.Close(); err != nil { + return []byte{}, err + } + + return b, nil +} + +// Delete deletes a path +func (cs3 *CS3) Delete(ctx context.Context, path string) error { + client, err := cs3.providerClient() + if err != nil { + return err + } + ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return err + } + + res, err := client.Delete(ctx, &provider.DeleteRequest{ + Ref: &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: utils.MakeRelativePath(path), + }, + }) + if err != nil { + return err + } + if res.Status.Code != rpc.Code_CODE_OK { + return fmt.Errorf("error deleting path: %v", path) + } + + return nil +} + +// ReadDir returns the entries in a given directory +func (cs3 *CS3) ReadDir(ctx context.Context, path string) ([]string, error) { + client, err := cs3.providerClient() + if err != nil { + return nil, err + } + 
ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return nil, err + } + + relPath := utils.MakeRelativePath(path) + res, err := client.ListContainer(ctx, &provider.ListContainerRequest{ + Ref: &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: relPath, + }, + }) + + if err != nil { + return nil, err + } + if res.Status.Code != rpc.Code_CODE_OK { + return nil, fmt.Errorf("error listing directory: %v", path) + } + + entries := []string{} + for _, ri := range res.Infos { + entries = append(entries, ri.Path) + } + return entries, nil +} + +// MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. +func (cs3 *CS3) MakeDirIfNotExist(ctx context.Context, folder string) error { + client, err := cs3.providerClient() + if err != nil { + return err + } + ctx, err = cs3.getAuthContext(ctx) + if err != nil { + return err + } + + var rootPathRef = &provider.Reference{ + ResourceId: cs3.SpaceRoot, + Path: utils.MakeRelativePath(folder), + } + + resp, err := client.Stat(ctx, &provider.StatRequest{ + Ref: rootPathRef, + }) + + if err != nil { + return err + } + + if resp.Status.Code == rpc.Code_CODE_NOT_FOUND { + _, err := client.CreateContainer(ctx, &provider.CreateContainerRequest{ + Ref: rootPathRef, + }) + + if err != nil { + return err + } + } + + return nil +} + +// CreateSymlink creates a symlink +func (cs3 *CS3) CreateSymlink(ctx context.Context, oldname, newname string) error { + if _, err := cs3.ResolveSymlink(ctx, newname); err == nil { + return os.ErrExist + } + + return cs3.SimpleUpload(ctx, newname, []byte(oldname)) +} + +// ResolveSymlink resolves a symlink +func (cs3 *CS3) ResolveSymlink(ctx context.Context, name string) (string, error) { + b, err := cs3.SimpleDownload(ctx, name) + if err != nil { + if errors.Is(err, errtypes.NotFound("")) { + return "", os.ErrNotExist + } + return "", err + } + + return string(b), err +} + +func (cs3 *CS3) providerClient() (provider.ProviderAPIClient, error) { + return 
pool.GetStorageProviderServiceClient(cs3.providerAddr) +} + +func (cs3 *CS3) getAuthContext(ctx context.Context) (context.Context, error) { + client, err := pool.GetGatewayServiceClient(cs3.gatewayAddr) + if err != nil { + return nil, err + } + + authCtx := ctxpkg.ContextSetUser(context.Background(), cs3.serviceUser) + authRes, err := client.Authenticate(authCtx, &gateway.AuthenticateRequest{ + Type: "machine", + ClientId: "userid:" + cs3.serviceUser.Id.OpaqueId, + ClientSecret: cs3.machineAuthAPIKey, + }) + if err != nil { + return nil, err + } + if authRes.GetStatus().GetCode() != rpc.Code_CODE_OK { + return nil, errtypes.NewErrtypeFromStatus(authRes.GetStatus()) + } + authCtx = metadata.AppendToOutgoingContext(authCtx, ctxpkg.TokenHeader, authRes.Token) + return authCtx, nil +} diff --git a/pkg/storage/utils/metadata/disk.go b/pkg/storage/utils/metadata/disk.go new file mode 100644 index 0000000000..dfbc0500bb --- /dev/null +++ b/pkg/storage/utils/metadata/disk.go @@ -0,0 +1,96 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. 
+ +package metadata + +import ( + "context" + "io/ioutil" + "os" + "path" +) + +// Disk represents a disk metadata storage +type Disk struct { + dataDir string +} + +// NewDiskStorage returns a new disk storage instance +func NewDiskStorage(dataDir string) (s Storage, err error) { + return &Disk{ + dataDir: dataDir, + }, nil +} + +// Init creates the metadata space +func (disk *Disk) Init(_ context.Context, _ string) (err error) { + return os.MkdirAll(disk.dataDir, 0777) +} + +// Backend returns the backend name of the storage +func (disk *Disk) Backend() string { + return "disk" +} + +// SimpleUpload stores a file on disk +func (disk *Disk) SimpleUpload(_ context.Context, uploadpath string, content []byte) error { + return os.WriteFile(disk.targetPath(uploadpath), content, 0644) +} + +// SimpleDownload reads a file from disk +func (disk *Disk) SimpleDownload(_ context.Context, downloadpath string) ([]byte, error) { + return os.ReadFile(disk.targetPath(downloadpath)) +} + +// Delete deletes a path +func (disk *Disk) Delete(_ context.Context, path string) error { + return os.Remove(disk.targetPath(path)) +} + +// ReadDir returns the resource infos in a given directory +func (disk *Disk) ReadDir(_ context.Context, p string) ([]string, error) { + infos, err := ioutil.ReadDir(disk.targetPath(p)) + if err != nil { + return nil, err + } + + entries := make([]string, 0, len(infos)) + for _, entry := range infos { + entries = append(entries, path.Join(p, entry.Name())) + } + return entries, nil +} + +// MakeDirIfNotExist will create a root node in the metadata storage. Requires an authenticated context. 
func (disk *Disk) MakeDirIfNotExist(_ context.Context, path string) error {
	// NOTE(review): the path parameter shadows the path package; harmless
	// here (targetPath does the joining) but worth renaming to p.
	return os.MkdirAll(disk.targetPath(path), 0777)
}

// CreateSymlink creates a symlink newname (below the data directory) that
// points at oldname.
func (disk *Disk) CreateSymlink(_ context.Context, oldname, newname string) error {
	return os.Symlink(oldname, disk.targetPath(newname))
}

// ResolveSymlink returns the target the symlink at path points to.
func (disk *Disk) ResolveSymlink(_ context.Context, path string) (string, error) {
	return os.Readlink(disk.targetPath(path))
}

// targetPath maps a storage-relative path to an absolute path below the
// configured data directory.
func (disk *Disk) targetPath(p string) string {
	return path.Join(disk.dataDir, p)
}
// Storage is the interface to maintain metadata in a storage
type Storage interface {
	// Backend returns the name of the concrete backend, e.g. "disk" or "cs3".
	Backend() string

	// Init prepares the metadata space identified by name.
	Init(ctx context.Context, name string) (err error)
	// SimpleUpload writes content to uploadpath.
	SimpleUpload(ctx context.Context, uploadpath string, content []byte) error
	// SimpleDownload returns the content stored at path.
	SimpleDownload(ctx context.Context, path string) ([]byte, error)
	// Delete removes the entry at path.
	Delete(ctx context.Context, path string) error

	// ReadDir lists the entries of the directory at path.
	ReadDir(ctx context.Context, path string) ([]string, error)

	// CreateSymlink creates a symlink newname pointing at oldname.
	CreateSymlink(ctx context.Context, oldname, newname string) error
	// ResolveSymlink returns the target of the symlink name.
	ResolveSymlink(ctx context.Context, name string) (string, error)

	// MakeDirIfNotExist creates the directory name including missing parents.
	MakeDirIfNotExist(ctx context.Context, name string) error
}

// NamedRWMutex works the same as RWMutex, the only difference is that it stores mutexes in a map and reuses them.
// It's handy if you want to write-lock, write-unlock, read-lock and read-unlock for specific names only.
// NamedRWMutex provides read/write locking per name: calls that use the same
// name share a single RWMutex, while calls on different names never contend
// with each other. Mutexes are kept in a map and reused; a pool supplies
// fresh ones so that first-use races on a name stay cheap.
type NamedRWMutex struct {
	pool sync.Pool // spare *sync.RWMutex values for first-use races
	mus  sync.Map  // name -> *sync.RWMutex
}

// NewNamedRWMutex returns a ready-to-use NamedRWMutex.
func NewNamedRWMutex() NamedRWMutex {
	return NamedRWMutex{
		pool: sync.Pool{
			New: func() interface{} { return new(sync.RWMutex) },
		},
	}
}

// Lock locks the mutex registered under name for writing.
func (m *NamedRWMutex) Lock(name string) {
	m.mutexFor(name).Lock()
}

// Unlock unlocks the mutex registered under name for writing.
func (m *NamedRWMutex) Unlock(name string) {
	m.mutexFor(name).Unlock()
}

// RLock locks the mutex registered under name for reading.
func (m *NamedRWMutex) RLock(name string) {
	m.mutexFor(name).RLock()
}

// RUnlock undoes a single RLock call for name.
func (m *NamedRWMutex) RUnlock(name string) {
	m.mutexFor(name).RUnlock()
}

// mutexFor returns the RWMutex for name, registering a pooled one on first
// use. LoadOrStore makes the registration race-free without an extra lock.
func (m *NamedRWMutex) mutexFor(name string) *sync.RWMutex {
	candidate := m.pool.Get()
	actual, raced := m.mus.LoadOrStore(name, candidate)
	if raced {
		// Another goroutine registered a mutex for this name first;
		// recycle the unused candidate.
		m.pool.Put(candidate)
	}
	return actual.(*sync.RWMutex)
}
+ +package sync + +import ( + "fmt" + "runtime" + "testing" +) + +func HammerMutex(m *NamedRWMutex, loops int, c chan bool) { + for i := 0; i < loops; i++ { + id := fmt.Sprintf("%v", i) + m.Lock(id) + m.Unlock(id) + } + c <- true +} + +func TestNamedRWMutex(t *testing.T) { + if n := runtime.SetMutexProfileFraction(1); n != 0 { + t.Logf("got mutexrate %d expected 0", n) + } + defer runtime.SetMutexProfileFraction(0) + m := NewNamedRWMutex() + c := make(chan bool) + r := 10 + + for i := 0; i < r; i++ { + go HammerMutex(&m, 2000, c) + } + for i := 0; i < r; i++ { + <-c + } +} diff --git a/pkg/storage/utils/sync/sync.go b/pkg/storage/utils/sync/sync.go new file mode 100644 index 0000000000..83a87b9539 --- /dev/null +++ b/pkg/storage/utils/sync/sync.go @@ -0,0 +1,26 @@ +// Copyright 2018-2022 CERN +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. +// +// In applying this license, CERN does not waive the privileges and immunities +// granted to it by virtue of its status as an Intergovernmental Organization +// or submit itself to any jurisdiction. + +package sync + +import "sync" + +var ( + // ParsingViperConfig addresses the fact that config parsing using Viper is not thread safe. 
+ ParsingViperConfig sync.Mutex +) diff --git a/tools/check-license/check-license.go b/tools/check-license/check-license.go index 6e8807d5bc..9520bbc7f3 100644 --- a/tools/check-license/check-license.go +++ b/tools/check-license/check-license.go @@ -30,7 +30,7 @@ import ( var fix = flag.Bool("fix", false, "add header if not present") -var licenseText = `// Copyright 2018-2021 CERN +var licenseText = `// Copyright 2018-2022 CERN // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License.