Skip to content

Commit

Permalink
clone and restore api
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Dec 30, 2022
1 parent 726b1d4 commit 4eaafd1
Show file tree
Hide file tree
Showing 24 changed files with 191 additions and 167 deletions.
1 change: 1 addition & 0 deletions baseapp/abci_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -857,6 +857,7 @@ func TestABCI_InvalidTransaction(t *testing.T) {
require.EqualValues(t, sdkerrors.ErrInvalidSequence.Codespace(), space, err)
require.EqualValues(t, sdkerrors.ErrInvalidSequence.ABCICode(), code, err)
} else {
require.NoError(t, err)
require.NotNil(t, result)
}
}
Expand Down
56 changes: 16 additions & 40 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -567,31 +566,12 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context
}

if mode == runTxModeSimulate {
ctx, _ = ctx.CacheContext()
ctx = ctx.CloneMultiStore()
}

return ctx
}

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
sdk.TraceContext(
map[string]interface{}{
"txHash": fmt.Sprintf("%X", tmhash.Sum(txBytes)),
},
),
).(sdk.CacheMultiStore)
}

return ctx.WithMultiStore(msCache), msCache
}

// runTx processes a transaction within a given execution mode, encoded transaction
// bytes, and the decoded transaction itself. All state transitions occur through
// a cached Context depending on the mode provided. State only gets persisted
Expand Down Expand Up @@ -654,20 +634,15 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}

if app.anteHandler != nil {
var (
anteCtx sdk.Context
msCache sdk.CacheMultiStore
)

// Branch context before AnteHandler call in case it aborts.
// This is required for both CheckTx and DeliverTx.
// Ref: https://github.com/cosmos/cosmos-sdk/issues/2772
//
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
anteCtx := ctx.CloneMultiStore().
WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

if !newCtx.IsZero() {
Expand All @@ -689,8 +664,10 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
return gInfo, nil, nil, 0, err
}

// commit the cloned multistore.
ms.Restore(anteCtx.MultiStore())

priority = ctx.Priority()
msCache.Write()
anteEvents = events.ToABCIEvents()
}

Expand All @@ -707,16 +684,14 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}
}

// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
// Result if any single message fails or does not have a registered Handler.
result, err = app.runMsgs(runMsgCtx, msgs, mode)
if err == nil {
_ = ctx.RunAtomic(func(runMsgCtx sdk.Context) error {
result, err = app.runMsgs(runMsgCtx, msgs, mode)
if err != nil {
return err
}

// Run optional postHandlers.
//
Expand All @@ -729,7 +704,8 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re

newCtx, err := app.postHandler(postCtx, tx, mode == runTxModeSimulate)
if err != nil {
return gInfo, nil, nil, priority, err
result = nil
return err
}

result.Events = append(result.Events, newCtx.EventManager().ABCIEvents()...)
Expand All @@ -738,15 +714,15 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
if mode == runTxModeDeliver {
// When block gas exceeds, it'll panic and won't commit the cached store.
consumeBlockGas()

msCache.Write()
}

if len(anteEvents) > 0 && (mode == runTxModeDeliver || mode == runTxModeSimulate) {
// append the events in the order of occurrence
result.Events = append(anteEvents, result.Events...)
}
}

return nil
})

return gInfo, result, anteEvents, priority, err
}
Expand Down
3 changes: 2 additions & 1 deletion baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ func (app *BaseApp) NewContext(isCheckTx bool, header tmproto.Header) sdk.Contex
return sdk.NewContext(app.deliverState.ms, header, false, app.logger)
}

// NewUncachedContext is used in unit test to bypass the `deliverState.ms`.
func (app *BaseApp) NewUncachedContext(isCheckTx bool, header tmproto.Header) sdk.Context {
return sdk.NewContext(app.cms, header, isCheckTx, app.logger)
return sdk.NewContext(app.cms.CacheMultiStore(), header, isCheckTx, app.logger)
}

func (app *BaseApp) GetContextForDeliverTx(txBytes []byte) sdk.Context {
Expand Down
81 changes: 18 additions & 63 deletions store/cachekv/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ func DoBenchmarkDeepContextStack(b *testing.B, depth int) {
}
cms.Commit()

ctx := sdk.NewContext(cms, tmproto.Header{}, false, log.NewNopLogger())
ms := cms.CacheMultiStore()
ctx := sdk.NewContext(ms, tmproto.Header{}, false, log.NewNopLogger())

var stack ContextStack
stack.Reset(ctx)

for i := 0; i < depth; i++ {
stack.Snapshot()

store := stack.CurrentContext().KVStore(key)
store := stack.Context().KVStore(key)
store.Set([]byte(fmt.Sprintf("hello%03d", i)), []byte("modified"))
}

store = stack.CurrentContext().KVStore(key)
store = stack.Context().KVStore(key)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -71,101 +72,55 @@ func BenchmarkDeepContextStack13(b *testing.B) {
DoBenchmarkDeepContextStack(b, 13)
}

// cachedContext is a pair of cache context and its corresponding commit method.
// They are obtained from the return value of `context.CacheContext()`.
type cachedContext struct {
ctx sdk.Context
commit func()
}

// ContextStack manages the initial context and a stack of cached contexts,
// to support the `StateDB.Snapshot` and `StateDB.RevertToSnapshot` methods.
//
// Copied from an old version of ethermint
type ContextStack struct {
// Context of the initial state before transaction execution.
// It's the context used by `StateDB.CommitedState`.
initialCtx sdk.Context
cachedContexts []cachedContext
ctx sdk.Context
snapshots []storetypes.CacheMultiStore
}

// CurrentContext returns the top context of cached stack,
// if the stack is empty, returns the initial context.
func (cs *ContextStack) CurrentContext() sdk.Context {
l := len(cs.cachedContexts)
if l == 0 {
return cs.initialCtx
}
return cs.cachedContexts[l-1].ctx
func (cs *ContextStack) Context() sdk.Context {
return cs.ctx
}

// Reset sets the initial context and clear the cache context stack.
func (cs *ContextStack) Reset(ctx sdk.Context) {
cs.initialCtx = ctx
if len(cs.cachedContexts) > 0 {
cs.cachedContexts = []cachedContext{}
}
cs.ctx = ctx
cs.snapshots = []storetypes.CacheMultiStore{}
}

// IsEmpty returns true if the cache context stack is empty.
func (cs *ContextStack) IsEmpty() bool {
return len(cs.cachedContexts) == 0
}

// Commit commits all the cached contexts from top to bottom in order and clears the stack by setting an empty slice of cache contexts.
func (cs *ContextStack) Commit() {
// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i >= 0; i-- {
if cs.cachedContexts[i].commit == nil {
panic(fmt.Sprintf("commit function at index %d should not be nil", i))
} else {
cs.cachedContexts[i].commit()
}
}
cs.cachedContexts = []cachedContext{}
}

// CommitToRevision commit the cache after the target revision,
// to improve efficiency of db operations.
func (cs *ContextStack) CommitToRevision(target int) error {
if target < 0 || target >= len(cs.cachedContexts) {
return fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts))
}

// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i > target; i-- {
if cs.cachedContexts[i].commit == nil {
return fmt.Errorf("commit function at index %d should not be nil", i)
}
cs.cachedContexts[i].commit()
}
cs.cachedContexts = cs.cachedContexts[0 : target+1]

return nil
return len(cs.snapshots) == 0
}

// Snapshot pushes a new cached context to the stack,
// and returns the index of it.
func (cs *ContextStack) Snapshot() int {
i := len(cs.cachedContexts)
ctx, commit := cs.CurrentContext().CacheContext()
cs.cachedContexts = append(cs.cachedContexts, cachedContext{ctx: ctx, commit: commit})
return i
cs.snapshots = append(cs.snapshots, cs.ctx.MultiStore().Clone())
return len(cs.snapshots) - 1
}

// RevertToSnapshot pops all the cached contexts after the target index (inclusive).
// the target should be snapshot index returned by `Snapshot`.
// This function panics if the index is out of bounds.
func (cs *ContextStack) RevertToSnapshot(target int) {
if target < 0 || target >= len(cs.cachedContexts) {
panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts)))
if target < 0 || target >= len(cs.snapshots) {
panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.snapshots)))
}
cs.cachedContexts = cs.cachedContexts[:target]
cs.ctx.MultiStore().Restore(cs.snapshots[target])
cs.snapshots = cs.snapshots[:target]
}

// RevertAll discards all the cache contexts.
func (cs *ContextStack) RevertAll() {
if len(cs.cachedContexts) > 0 {
if len(cs.snapshots) > 0 {
cs.RevertToSnapshot(0)
}
}
22 changes: 15 additions & 7 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,13 +9,6 @@ import (
"github.com/cosmos/cosmos-sdk/store/types"
)

// cValue represents a cached value.
// If dirty is true, it indicates the cached value is different from the underlying value.
type cValue struct {
value []byte
dirty bool
}

// Store wraps an in-memory cache around an underlying types.KVStore.
type Store struct {
mtx sync.Mutex
Expand All @@ -38,6 +31,21 @@ func (store *Store) GetStoreType() types.StoreType {
return store.parent.GetStoreType()
}

// Clone creates a snapshot of the cache store.
// This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (store *Store) Clone() types.CacheKVStore {
return &Store{
cache: store.cache.Copy(),
parent: store.parent,
}
}

func (store *Store) Restore(s types.CacheKVStore) {
ss := s.(*Store)
store.cache = ss.cache
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
store.mtx.Lock()
Expand Down
6 changes: 3 additions & 3 deletions store/cachekv/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ func newCacheKVStore() types.CacheKVStore {
// create two layer of cache store to better emulate the real world.
mem := dbadapter.Store{DB: dbm.NewMemDB()}
deliverState := cachekv.NewStore(mem)
return cachekv.NewStore(deliverState)
return deliverState.Clone()
}

func keyFmt(i int) []byte { return bz(fmt.Sprintf("key%0.8d", i)) }
Expand Down Expand Up @@ -697,11 +697,11 @@ func BenchmarkCacheKVStoreSetAndCommit(b *testing.B) {
b.ResetTimer()
for i := 0; i < b.N; i++ {
store := cachekv.NewStore(mem)
store1 := cachekv.NewStore(store)
store1 := store.Clone()
for j := 0; j < 10; j++ {
store1.Set(sdk.Uint64ToBigEndian(uint64(i+j)), []byte{byte(i)})
}
store1.Write()
store.Restore(store1)
store.Write()
}
}
Loading

0 comments on commit 4eaafd1

Please sign in to comment.