Skip to content

Commit

Permalink
range queries with pagination
Browse files Browse the repository at this point in the history
This commit adds range query on a given database.
`GET database?startkey=k1&endkey=k2` would be used
by the client to range over a set of keys. If the
query record limit is reached, the response would
include the next start key.

Signed-off-by: senthil <cendhu@gmail.com>
  • Loading branch information
cendhu committed Jun 21, 2022
1 parent 0ccfa19 commit 0d2419b
Show file tree
Hide file tree
Showing 19 changed files with 1,495 additions and 248 deletions.
8 changes: 8 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type ServerConf struct {
Database DatabaseConf
// The lengths of various queues that buffer between internal components.
QueueLength QueueLengthConf
// QueryProcessing holds limits associated with query responses
QueryProcessing QueryProcessingConf
// Server logging level.
LogLevel string
// Server TLS configuration, for secure communication with clients.
Expand Down Expand Up @@ -118,6 +120,11 @@ type QueueLengthConf struct {
Block uint32
}

// QueueProcessingConf holds the configuration associated with rich and range query processing
type QueryProcessingConf struct {
ResponseSizeLimit uint32
}

// BlockCreationConf holds the block creation parameters.
// TODO consider moving this to shared-config if we want to have it consistent across nodes
type BlockCreationConf struct {
Expand Down Expand Up @@ -197,6 +204,7 @@ func readLocalConfig(localConfigFile string) (*LocalConfiguration, error) {

v.SetDefault("server.database.name", "leveldb")
v.SetDefault("server.database.ledgerDirectory", "./tmp/")
v.SetDefault("server.queryProcessing.responseSizeLimit", 10)

if err := v.ReadInConfig(); err != nil {
return nil, errors.Wrap(err, "error reading local config file")
Expand Down
3 changes: 3 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,9 @@ var expectedLocalConfig = &LocalConfiguration{
ReorderedTransactionBatch: 100,
Block: 100,
},
QueryProcessing: QueryProcessingConf{
ResponseSizeLimit: 10,
},
LogLevel: "info",
TLS: TLSConf{
Enabled: false,
Expand Down
6 changes: 5 additions & 1 deletion config/testdata/config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,10 @@ server:
# queueLength.block denotes the maximum queue length
# of waiting blocks
block: 100
queryProcessing:
# queryProcessing.responseSizeLimit denotes the maximum
# memory size of the query response in MB
responseSizeLimit: 10
# logLevel can be debug, info, warn, err, and panic
logLevel: info
tls:
Expand Down Expand Up @@ -132,4 +136,4 @@ bootstrap:
# and on-board by fetching the ledger from them, rebuilding the database in the process (not supported yet).
method: genesis
# file contains the initial configuration that will be used to bootstrap the node, as specified by the method, above.
file: ./testdata/3node-shared-config-bootstrap.yml
file: ./testdata/3node-shared-config-bootstrap.yml
33 changes: 28 additions & 5 deletions internal/bcdb/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,9 @@ type DB interface {
// GetData retrieves values for given key
GetData(dbName, querierUserID, key string) (*types.GetDataResponseEnvelope, error)

// GetDataRange retrieves a range of values
GetDataRange(dbName, querierUserID, startKey, endKey string, limit uint32) (*types.GetDataRangeResponseEnvelope, error)

// DataQuery executes a given JSON query and return key-value pairs which are matching
// the criteria provided in the query. The query is a json marshled bytes which needs
// to contain a top level combinational operator followed by a list of attributes and
Expand Down Expand Up @@ -255,11 +258,12 @@ func NewDB(conf *config.Configurations, logger *logger.SugarLogger) (DB, error)

worldstateQueryProcessor := newWorldstateQueryProcessor(
&worldstateQueryProcessorConfig{
nodeID: localConf.Server.Identity.ID,
db: levelDB,
blockStore: blockStore,
identityQuerier: querier,
logger: logger,
nodeID: localConf.Server.Identity.ID,
db: levelDB,
responseSizeLimit: localConf.Server.QueryProcessing.ResponseSizeLimit,
blockStore: blockStore,
identityQuerier: querier,
logger: logger,
},
)

Expand Down Expand Up @@ -541,6 +545,25 @@ func (d *db) GetData(dbName, querierUserID, key string) (*types.GetDataResponseE
}, nil
}

// GetDataRange returns a range of values starting from the start key and till before the end key
func (d *db) GetDataRange(dbName, querierUserID, startKey, endKey string, limit uint32) (*types.GetDataRangeResponseEnvelope, error) {
dataResponse, err := d.worldstateQueryProcessor.getDataRange(dbName, querierUserID, startKey, endKey, limit)
if err != nil {
return nil, err
}

dataResponse.Header = d.responseHeader()
sign, err := d.signature(dataResponse)
if err != nil {
return nil, err
}

return &types.GetDataRangeResponseEnvelope{
Response: dataResponse,
Signature: sign,
}, nil
}

// DataQuery executes a given JSON query and return key-value pairs which are matching
// the criteria provided in the query
func (d *db) DataQuery(ctx context.Context, dbName, querierUserID string, query []byte) (*types.DataQueryResponseEnvelope, error) {
Expand Down
23 changes: 23 additions & 0 deletions internal/bcdb/mocks/db.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion internal/bcdb/mocks/tx_processor.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

114 changes: 99 additions & 15 deletions internal/bcdb/worldstate_query_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,28 +20,31 @@ import (
)

type worldstateQueryProcessor struct {
nodeID string
db worldstate.DB
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
nodeID string
db worldstate.DB
responseSizeLimit uint32
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
}

type worldstateQueryProcessorConfig struct {
nodeID string
db worldstate.DB
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
nodeID string
db worldstate.DB
responseSizeLimit uint32
blockStore *blockstore.Store
identityQuerier *identity.Querier
logger *logger.SugarLogger
}

func newWorldstateQueryProcessor(conf *worldstateQueryProcessorConfig) *worldstateQueryProcessor {
return &worldstateQueryProcessor{
nodeID: conf.nodeID,
db: conf.db,
blockStore: conf.blockStore,
identityQuerier: conf.identityQuerier,
logger: conf.logger,
nodeID: conf.nodeID,
db: conf.db,
responseSizeLimit: conf.responseSizeLimit * 1024, // MB to bytes
blockStore: conf.blockStore,
identityQuerier: conf.identityQuerier,
logger: conf.logger,
}
}

Expand Down Expand Up @@ -126,6 +129,87 @@ func (q *worldstateQueryProcessor) getData(dbName, querierUserID, key string) (*
}, nil
}

// getDataRange return the state associated with a given key
func (q *worldstateQueryProcessor) getDataRange(dbName, querierUserID, startKey, endKey string, limit uint32) (*types.GetDataRangeResponse, error) {
if worldstate.IsSystemDB(dbName) {
return nil, &errors.PermissionErr{
ErrMsg: "no user can directly read from a system database [" + dbName + "]. " +
"To read from a system database, use /config, /user, /db rest endpoints instead of /data",
}
}

hasPerm, err := q.identityQuerier.HasReadAccessOnDataDB(querierUserID, dbName)
if err != nil {
return nil, err
}
if !hasPerm {
return nil, &errors.PermissionErr{
ErrMsg: "the user [" + querierUserID + "] has no permission to read from database [" + dbName + "]",
}
}

var kvs []*types.KVWithMetadata
var resultCount uint32
var size uint32
var pendingResult bool
var nextStartKey string

itr, err := q.db.GetIterator(dbName, startKey, endKey)
defer itr.Release()
if err != nil {
return nil, err
}

for itr.Next() {
k := string(itr.Key())
v := &types.ValueWithMetadata{}
if err := proto.Unmarshal(itr.Value(), v); err != nil {
return nil, err
}

acl := v.GetMetadata().GetAccessControl()
if acl != nil {
if !acl.ReadUsers[querierUserID] && !acl.ReadWriteUsers[querierUserID] {
continue
}
}

if limit > 0 {
resultCount++
if resultCount > limit {
pendingResult = true
nextStartKey = k
break
}
}

size += uint32(len(k) + len(itr.Value()))
if size > q.responseSizeLimit {
pendingResult = true
nextStartKey = k
if len(kvs) != 0 {
break
}

return nil, &errors.ServerRestrictionError{
ErrMsg: fmt.Sprintf("response size limit for queries is configured as %d bytes but a single record size itself is %d bytes. Increase the query response size limit at the server", q.responseSizeLimit, size),
}
}

kvs = append(kvs, &types.KVWithMetadata{
Key: k,
Value: v.GetValue(),
Metadata: v.GetMetadata(),
})
}

return &types.GetDataRangeResponse{
KVs: kvs,
PendingResult: pendingResult,
NextStartKey: nextStartKey,
}, nil
}

func (q *worldstateQueryProcessor) getUser(querierUserID, targetUserID string) (*types.GetUserResponse, error) {
user, metadata, err := q.identityQuerier.GetUser(targetUserID)
if err != nil {
Expand Down
Loading

0 comments on commit 0d2419b

Please sign in to comment.