Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

range queries with pagination #420

Merged
merged 1 commit into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type ServerConf struct {
Database DatabaseConf
// The lengths of various queues that buffer between internal components.
QueueLength QueueLengthConf
// QueryProcessing holds limits associated with query responses
QueryProcessing QueryProcessingConf
// Server logging level.
LogLevel string
// Server TLS configuration, for secure communication with clients.
Expand Down Expand Up @@ -118,6 +120,11 @@ type QueueLengthConf struct {
Block uint32
}

// QueueProcessingConf holds the configuration associated with rich and range query processing
type QueryProcessingConf struct {
ResponseSizeLimitInBytes uint64
}

// BlockCreationConf holds the block creation parameters.
// TODO consider moving this to shared-config if we want to have it consistent across nodes
type BlockCreationConf struct {
Expand Down Expand Up @@ -197,6 +204,7 @@ func readLocalConfig(localConfigFile string) (*LocalConfiguration, error) {

v.SetDefault("server.database.name", "leveldb")
v.SetDefault("server.database.ledgerDirectory", "./tmp/")
v.SetDefault("server.queryProcessing.responseSizeLimitInBytes", 1048576)

if err := v.ReadInConfig(); err != nil {
return nil, errors.Wrap(err, "error reading local config file")
Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ var expectedLocalConfig = &LocalConfiguration{
ReorderedTransactionBatch: 100,
Block: 100,
},
QueryProcessing: QueryProcessingConf{
ResponseSizeLimitInBytes: 1048576,
},
LogLevel: "info",
TLS: TLSConf{
Enabled: false,
Expand Down
6 changes: 5 additions & 1 deletion config/testdata/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ server:
# queueLength.block denotes the maximum queue length
# of waiting blocks
block: 100
queryProcessing:
# queryProcessing.responseSizeLimitInBytes denotes the maximum
# memory size of the query response
responseSizeLimitInBytes: 1048576
# logLevel can be debug, info, warn, err, and panic
logLevel: info
tls:
Expand Down Expand Up @@ -132,4 +136,4 @@ bootstrap:
# and on-board by fetching the ledger from them, rebuilding the database in the process (not supported yet).
method: genesis
# file contains the initial configuration that will be used to bootstrap the node, as specified by the method, above.
file: ./testdata/3node-shared-config-bootstrap.yml
file: ./testdata/3node-shared-config-bootstrap.yml
33 changes: 28 additions & 5 deletions internal/bcdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type DB interface {
// GetData retrieves values for given key
GetData(dbName, querierUserID, key string) (*types.GetDataResponseEnvelope, error)

// GetDataRange retrieves a range of values
GetDataRange(dbName, querierUserID, startKey, endKey string, limit uint64) (*types.GetDataRangeResponseEnvelope, error)

// DataQuery executes a given JSON query and return key-value pairs which are matching
// the criteria provided in the query. The query is a json marshled bytes which needs
// to contain a top level combinational operator followed by a list of attributes and
Expand Down Expand Up @@ -255,11 +258,12 @@ func NewDB(conf *config.Configurations, logger *logger.SugarLogger) (DB, error)

worldstateQueryProcessor := newWorldstateQueryProcessor(
&worldstateQueryProcessorConfig{
nodeID: localConf.Server.Identity.ID,
db: levelDB,
blockStore: blockStore,
identityQuerier: querier,
logger: logger,
nodeID: localConf.Server.Identity.ID,
db: levelDB,
queryProcessingConf: &localConf.Server.QueryProcessing,
blockStore: blockStore,
identityQuerier: querier,
logger: logger,
},
)

Expand Down Expand Up @@ -541,6 +545,25 @@ func (d *db) GetData(dbName, querierUserID, key string) (*types.GetDataResponseE
}, nil
}

// GetDataRange returns a range of values starting from the start key and till before the end key
func (d *db) GetDataRange(dbName, querierUserID, startKey, endKey string, limit uint64) (*types.GetDataRangeResponseEnvelope, error) {
dataResponse, err := d.worldstateQueryProcessor.getDataRange(dbName, querierUserID, startKey, endKey, limit)
if err != nil {
return nil, err
}

dataResponse.Header = d.responseHeader()
sign, err := d.signature(dataResponse)
if err != nil {
return nil, err
}

return &types.GetDataRangeResponseEnvelope{
Response: dataResponse,
Signature: sign,
}, nil
}

// DataQuery executes a given JSON query and return key-value pairs which are matching
// the criteria provided in the query
func (d *db) DataQuery(ctx context.Context, dbName, querierUserID string, query []byte) (*types.DataQueryResponseEnvelope, error) {
Expand Down
23 changes: 23 additions & 0 deletions internal/bcdb/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/bcdb/mocks/tx_processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

115 changes: 100 additions & 15 deletions internal/bcdb/worldstate_query_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"

"github.com/golang/protobuf/proto"
"github.com/hyperledger-labs/orion-server/config"
"github.com/hyperledger-labs/orion-server/internal/blockstore"
"github.com/hyperledger-labs/orion-server/internal/errors"
ierrors "github.com/hyperledger-labs/orion-server/internal/errors"
Expand All @@ -20,28 +21,31 @@ import (
)

type worldstateQueryProcessor struct {
nodeID string
db worldstate.DB
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
nodeID string
db worldstate.DB
queryProcessingConf *config.QueryProcessingConf
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
}

type worldstateQueryProcessorConfig struct {
nodeID string
db worldstate.DB
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
nodeID string
db worldstate.DB
queryProcessingConf *config.QueryProcessingConf
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
}

func newWorldstateQueryProcessor(conf *worldstateQueryProcessorConfig) *worldstateQueryProcessor {
return &worldstateQueryProcessor{
nodeID: conf.nodeID,
db: conf.db,
blockStore: conf.blockStore,
identityQuerier: conf.identityQuerier,
logger: conf.logger,
nodeID: conf.nodeID,
db: conf.db,
queryProcessingConf: conf.queryProcessingConf,
blockStore: conf.blockStore,
identityQuerier: conf.identityQuerier,
logger: conf.logger,
}
}

Expand Down Expand Up @@ -126,6 +130,87 @@ func (q *worldstateQueryProcessor) getData(dbName, querierUserID, key string) (*
}, nil
}

// getDataRange return the state associated with a given key
func (q *worldstateQueryProcessor) getDataRange(dbName, querierUserID, startKey, endKey string, limit uint64) (*types.GetDataRangeResponse, error) {
if worldstate.IsSystemDB(dbName) {
return nil, &errors.PermissionErr{
ErrMsg: "no user can directly read from a system database [" + dbName + "]. " +
"To read from a system database, use /config, /user, /db rest endpoints instead of /data",
}
}

hasPerm, err := q.identityQuerier.HasReadAccessOnDataDB(querierUserID, dbName)
if err != nil {
return nil, err
}
if !hasPerm {
return nil, &errors.PermissionErr{
ErrMsg: "the user [" + querierUserID + "] has no permission to read from database [" + dbName + "]",
}
}

var kvs []*types.KVWithMetadata
var resultCount uint64
var size uint64
var pendingResult bool
var nextStartKey string

itr, err := q.db.GetIterator(dbName, startKey, endKey)
defer itr.Release()
if err != nil {
return nil, err
}

for itr.Next() {
k := string(itr.Key())
v := &types.ValueWithMetadata{}
if err := proto.Unmarshal(itr.Value(), v); err != nil {
return nil, err
}

acl := v.GetMetadata().GetAccessControl()
if acl != nil {
if !acl.ReadUsers[querierUserID] && !acl.ReadWriteUsers[querierUserID] {
continue
}
}

if limit > 0 {
resultCount++
if resultCount > limit {
pendingResult = true
nextStartKey = k
break
}
}

size += uint64(len(k) + len(itr.Value()))
if size > q.queryProcessingConf.ResponseSizeLimitInBytes {
pendingResult = true
nextStartKey = k
if len(kvs) != 0 {
break
}

return nil, &errors.ServerRestrictionError{
ErrMsg: fmt.Sprintf("response size limit for queries is configured as %d bytes but a single record size itself is %d bytes. Increase the query response size limit at the server", q.queryProcessingConf.ResponseSizeLimitInBytes, size),
}
}

kvs = append(kvs, &types.KVWithMetadata{
Key: k,
Value: v.GetValue(),
Metadata: v.GetMetadata(),
})
}

return &types.GetDataRangeResponse{
KVs: kvs,
PendingResult: pendingResult,
NextStartKey: nextStartKey,
}, nil
}

func (q *worldstateQueryProcessor) getUser(querierUserID, targetUserID string) (*types.GetUserResponse, error) {
user, metadata, err := q.identityQuerier.GetUser(targetUserID)
if err != nil {
Expand Down
Loading