Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

perf: improve performance of nested cache store #14444

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,6 +225,9 @@ extension interfaces. `module.Manager.Modules` is now of type `map[string]interf
* (snapshots) [#14048](https://github.com/cosmos/cosmos-sdk/pull/14048) Move the Snapshot package to the store package. This is done in an effort to group all storage-related logic under one package.
* (baseapp) [#14050](https://github.com/cosmos/cosmos-sdk/pull/14050) refactor `ABCIListener` interface to accept go contexts
* (store/streaming)[#14603](https://github.com/cosmos/cosmos-sdk/pull/14603) `StoreDecoderRegistry` moved from store to `types/simulations` this breaks the `AppModuleSimulation` interface.
* (store) [#14444](https://github.com/cosmos/cosmos-sdk/pull/14444) Refactor cache store to improve performance in iteration and nesting cases.
- Add new method `CacheMultiStore()` to `Context`, `NewContext` accept `CacheMultiStore` instead of `MultiStore`, change `WithMultiStore` to `WithCacheMultiStore`.
- Add new methods `Clone` and `Restore` on `CacheMultiStore` and `CacheKVStore` interfaces.

### CLI Breaking Changes

Expand Down
1 change: 1 addition & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -866,6 +866,7 @@ func TestABCI_InvalidTransaction(t *testing.T) {
require.EqualValues(t, sdkerrors.ErrInvalidSequence.Codespace(), space, err)
require.EqualValues(t, sdkerrors.ErrInvalidSequence.ABCICode(), code, err)
} else {
require.NoError(t, err)
require.NotNil(t, result)
}
}
Expand Down
60 changes: 18 additions & 42 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -574,25 +573,6 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context
return ctx
}

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
//
// The returned CacheMultiStore is the branch itself: callers persist the
// branch's writes to the parent store by calling Write() on it, or discard
// them by simply dropping it.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, storetypes.CacheMultiStore) {
	ms := ctx.MultiStore()
	// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
	msCache := ms.CacheMultiStore()
	if msCache.TracingEnabled() {
		// When tracing is enabled, tag the branch's trace context with the
		// tx hash so traced store operations can be correlated back to this
		// transaction.
		msCache = msCache.SetTracingContext(
			storetypes.TraceContext(
				map[string]interface{}{
					"txHash": fmt.Sprintf("%X", tmhash.Sum(txBytes)),
				},
			),
		).(storetypes.CacheMultiStore) // SetTracingContext returns the MultiStore interface; re-assert the branch type
	}

	return ctx.WithMultiStore(msCache), msCache
}

// runTx processes a transaction within a given execution mode, encoded transaction
// bytes, and the decoded transaction itself. All state transitions occur through
// a cached Context depending on the mode provided. State only gets persisted
Expand All @@ -607,7 +587,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
var gasWanted uint64

ctx := app.getContextForTx(mode, txBytes)
ms := ctx.MultiStore()
cms := ctx.CacheMultiStore()

// only run the tx if there is block gas remaining
if mode == runTxModeDeliver && ctx.BlockGasMeter().IsOutOfGas() {
Expand Down Expand Up @@ -655,20 +635,15 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}

if app.anteHandler != nil {
var (
anteCtx sdk.Context
msCache storetypes.CacheMultiStore
)

// Branch context before AnteHandler call in case it aborts.
// This is required for both CheckTx and DeliverTx.
// Ref: https://github.com/cosmos/cosmos-sdk/issues/2772
//
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
anteCtx := ctx.WithCacheMultiStore(ctx.CacheMultiStore().Clone()).
WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

if !newCtx.IsZero() {
Expand All @@ -678,7 +653,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
// Also, in the case of the tx aborting, we need to track gas consumed via
// the instantiated gas meter in the AnteHandler, so we update the context
// prior to returning.
ctx = newCtx.WithMultiStore(ms)
ctx = newCtx.WithCacheMultiStore(cms)
}

events := ctx.EventManager().Events()
Expand All @@ -690,8 +665,10 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
return gInfo, nil, nil, 0, err
}

// commit the cloned multistore.
cms.Restore(anteCtx.CacheMultiStore())

priority = ctx.Priority()
msCache.Write()
anteEvents = events.ToABCIEvents()
}

Expand All @@ -708,17 +685,15 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}
}

// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
// Result if any single message fails or does not have a registered Handler.
result, err = app.runMsgs(runMsgCtx, msgs, mode)

if err == nil {
err = ctx.RunAtomic(func(runMsgCtx sdk.Context) error {
var err error
result, err = app.runMsgs(runMsgCtx, msgs, mode)
if err != nil {
return err
}
Comment on lines +693 to +696
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This returned err is actually ignored, the only reason it's observed by callers is because it happens to be assigned to the err variable defined by the outer function signature's return value parameter. That's fragile and unreliable and basically accidental. The error value returned by RunAtomic should be captured and inspected and returned (if non-nil) explicitly.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's not "happens to", but on purpose, because it needs to write to the outer result, so I make it write to the err as well.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, but this behavior is surprising, subtle, and fragile. The atomic func should report errors as returned error values, and its caller should inspect those returned errors rather than ignoring them as it does now.

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

make sense, did a small change, should make it clearer.


// Run optional postHandlers.
//
Expand All @@ -731,7 +706,8 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re

newCtx, err := app.postHandler(postCtx, tx, mode == runTxModeSimulate, err == nil)
if err != nil {
return gInfo, nil, anteEvents, priority, err
result = nil
return err
}

result.Events = append(result.Events, newCtx.EventManager().ABCIEvents()...)
Expand All @@ -740,15 +716,15 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
if mode == runTxModeDeliver {
// When block gas exceeds, it'll panic and won't commit the cached store.
consumeBlockGas()

msCache.Write()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When is this write called now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Part of RunAtomic, RunAtomic will commit when success, discard when fail.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunAtomic itself doesn't call Write, as far as I can see?

Copy link
Collaborator Author

@yihuang yihuang Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

RunAtomic itself doesn't call Write, as far as I can see?

The semantic of commit is implemented in RunAtomic directly, there's no write method with this API.

Copy link
Contributor

@08d2 08d2 Jan 20, 2023

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean. runTx must call msCache.Write, at some point, if mode == runTxModeDeliver. I see that the Write call here is deleted, but I don't see where another Write call is made. Where does that Write happen, now?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't understand what you mean. runTx must call msCache.Write, at some point, if mode == runTxModeDeliver. I see that the Write call here is deleted, but I don't see where another Write call is made. Where does that Write happen, now?

msCache is gone as well :)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, that's fine, but it doesn't address my point. My point is that runTx operates on a fork of app state, and (assuming mode == runTxModeDeliver) that state must be committed if the runTx operation succeeds, and must be discarded if the runTx operation fails. Where is that logic now?

}

if len(anteEvents) > 0 && (mode == runTxModeDeliver || mode == runTxModeSimulate) {
// append the events in the order of occurrence
result.Events = append(anteEvents, result.Events...)
}
}

return nil
})

return gInfo, result, anteEvents, priority, err
}
Expand Down
3 changes: 2 additions & 1 deletion baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (app *BaseApp) NewContext(isCheckTx bool, header tmproto.Header) sdk.Contex
return sdk.NewContext(app.deliverState.ms, header, false, app.logger)
}

// NewUncachedContext is used in unit tests to bypass the `deliverState.ms`.
func (app *BaseApp) NewUncachedContext(isCheckTx bool, header tmproto.Header) sdk.Context {
return sdk.NewContext(app.cms, header, isCheckTx, app.logger)
return sdk.NewContext(app.cms.CacheMultiStore(), header, isCheckTx, app.logger)
}

func (app *BaseApp) GetContextForDeliverTx(txBytes []byte) sdk.Context {
Expand Down
2 changes: 1 addition & 1 deletion store/cachekv/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ func (cs *CacheStack) CommitToRevision(target int) error {
// Snapshot pushes a new cached context to the stack,
// and returns the index of it.
func (cs *CacheStack) Snapshot() int {
cs.cacheStores = append(cs.cacheStores, cachekv.NewStore(cs.CurrentStore()))
cs.cacheStores = append(cs.cacheStores, cs.CurrentStore().Clone())
return len(cs.cacheStores) - 1
}

Expand Down
21 changes: 16 additions & 5 deletions store/cachekv/internal/btree.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,16 +35,16 @@ func NewBTree() BTree {
}
}

func (bt BTree) Set(key, value []byte) {
bt.tree.Set(newItem(key, value))
func (bt BTree) Set(key, value []byte, dirty bool) {
bt.tree.Set(item{key: key, value: value, dirty: dirty})
}

func (bt BTree) Get(key []byte) []byte {
func (bt BTree) Get(key []byte) ([]byte, bool) {
i, found := bt.tree.Get(newItem(key, nil))
if !found {
return nil
return nil, false
}
return i.value
return i.value, true
}

func (bt BTree) Delete(key []byte) {
Expand All @@ -65,6 +65,16 @@ func (bt BTree) ReverseIterator(start, end []byte) (types.Iterator, error) {
return newMemIterator(start, end, bt, false), nil
}

// ScanDirtyItems invokes fn for every entry whose dirty flag is set,
// walking the tree in key order; clean entries are skipped.
func (bt BTree) ScanDirtyItems(fn func(key, value []byte)) {
	bt.tree.Scan(func(it item) bool {
		if !it.dirty {
			return true // clean entry: skip it and keep scanning
		}
		fn(it.key, it.value)
		return true
	})
}

// Copy the tree. This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (bt BTree) Copy() BTree {
Expand All @@ -77,6 +87,7 @@ func (bt BTree) Copy() BTree {
// item is a single key/value entry stored in the BTree. The dirty flag
// distinguishes entries written through this cache layer from entries that
// were presumably only read through from the parent store — TODO confirm
// against the cache store's Set/Get call sites.
type item struct {
	key   []byte
	value []byte
	dirty bool // set via BTree.Set's dirty argument; consumed by ScanDirtyItems
}

// byKeys compares the items by key
Expand Down
21 changes: 13 additions & 8 deletions store/cachekv/internal/btree_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,39 +11,44 @@ func TestGetSetDelete(t *testing.T) {
db := NewBTree()

// A nonexistent key should return nil.
value := db.Get([]byte("a"))
value, found := db.Get([]byte("a"))
require.Nil(t, value)
require.False(t, found)

// Set and get a value.
db.Set([]byte("a"), []byte{0x01})
db.Set([]byte("b"), []byte{0x02})
value = db.Get([]byte("a"))
db.Set([]byte("a"), []byte{0x01}, true)
db.Set([]byte("b"), []byte{0x02}, true)
value, found = db.Get([]byte("a"))
require.Equal(t, []byte{0x01}, value)
require.True(t, found)

value = db.Get([]byte("b"))
value, found = db.Get([]byte("b"))
require.Equal(t, []byte{0x02}, value)
require.True(t, found)

// Deleting a non-existent value is fine.
db.Delete([]byte("x"))

// Delete a value.
db.Delete([]byte("a"))

value = db.Get([]byte("a"))
value, found = db.Get([]byte("a"))
require.Nil(t, value)
require.False(t, found)

db.Delete([]byte("b"))

value = db.Get([]byte("b"))
value, found = db.Get([]byte("b"))
require.Nil(t, value)
require.False(t, found)
}

func TestDBIterator(t *testing.T) {
db := NewBTree()

for i := 0; i < 10; i++ {
if i != 6 { // but skip 6.
db.Set(int642Bytes(int64(i)), []byte{})
db.Set(int642Bytes(int64(i)), []byte{}, true)
}
}

Expand Down
44 changes: 0 additions & 44 deletions store/cachekv/search_benchmark_test.go

This file was deleted.

Loading