Skip to content

Commit

Permalink
clone and restore api
Browse files Browse the repository at this point in the history
  • Loading branch information
yihuang committed Dec 29, 2022
1 parent b797716 commit 66b7908
Show file tree
Hide file tree
Showing 21 changed files with 186 additions and 163 deletions.
71 changes: 19 additions & 52 deletions baseapp/baseapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
dbm "github.com/cosmos/cosmos-db"
"github.com/cosmos/gogoproto/proto"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
tmproto "github.com/tendermint/tendermint/proto/tendermint/types"
"golang.org/x/exp/maps"
Expand Down Expand Up @@ -567,31 +566,12 @@ func (app *BaseApp) getContextForTx(mode runTxMode, txBytes []byte) sdk.Context
}

if mode == runTxModeSimulate {
ctx, _ = ctx.CacheContext()
ctx = ctx.CloneMultiStore()
}

return ctx
}

// cacheTxContext returns a new context based off of the provided context with
// a branched multi-store.
func (app *BaseApp) cacheTxContext(ctx sdk.Context, txBytes []byte) (sdk.Context, sdk.CacheMultiStore) {
ms := ctx.MultiStore()
// TODO: https://github.com/cosmos/cosmos-sdk/issues/2824
msCache := ms.CacheMultiStore()
if msCache.TracingEnabled() {
msCache = msCache.SetTracingContext(
sdk.TraceContext(
map[string]interface{}{
"txHash": fmt.Sprintf("%X", tmhash.Sum(txBytes)),
},
),
).(sdk.CacheMultiStore)
}

return ctx.WithMultiStore(msCache), msCache
}

// runTx processes a transaction within a given execution mode, encoded transaction
// bytes, and the decoded transaction itself. All state transitions occur through
// a cached Context depending on the mode provided. State only gets persisted
Expand Down Expand Up @@ -654,43 +634,30 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}

if app.anteHandler != nil {
var (
anteCtx sdk.Context
msCache sdk.CacheMultiStore
)

// Branch context before AnteHandler call in case it aborts.
// This is required for both CheckTx and DeliverTx.
// Ref: https://github.com/cosmos/cosmos-sdk/issues/2772
//
// NOTE: Alternatively, we could require that AnteHandler ensures that
// writes do not happen if aborted/failed. This may have some
// performance benefits, but it'll be more difficult to get right.
anteCtx, msCache = app.cacheTxContext(ctx, txBytes)
anteCtx = anteCtx.WithEventManager(sdk.NewEventManager())
anteCtx := ctx.CloneMultiStore().
WithEventManager(sdk.NewEventManager())
newCtx, err := app.anteHandler(anteCtx, tx, mode == runTxModeSimulate)

if !newCtx.IsZero() {
// At this point, newCtx.MultiStore() is a store branch, or something else
// replaced by the AnteHandler. We want the original multistore.
//
// Also, in the case of the tx aborting, we need to track gas consumed via
// the instantiated gas meter in the AnteHandler, so we update the context
// prior to returning.
ctx = newCtx.WithMultiStore(ms)
if err != nil {
return gInfo, nil, nil, 0, err
}

// commit the cloned multistore, and update the newCtx.
ms.Restore(newCtx.MultiStore())
ctx = newCtx.WithMultiStore(ctx.MultiStore())

events := ctx.EventManager().Events()

// GasMeter expected to be set in AnteHandler
gasWanted = ctx.GasMeter().Limit()

if err != nil {
return gInfo, nil, nil, 0, err
}

priority = ctx.Priority()
msCache.Write()
anteEvents = events.ToABCIEvents()
}

Expand All @@ -707,16 +674,14 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
}
}

// Create a new Context based off of the existing Context with a MultiStore branch
// in case message processing fails. At this point, the MultiStore
// is a branch of a branch.
runMsgCtx, msCache := app.cacheTxContext(ctx, txBytes)

// Attempt to execute all messages and only update state if all messages pass
// and we're in DeliverTx. Note, runMsgs will never return a reference to a
// Result if any single message fails or does not have a registered Handler.
result, err = app.runMsgs(runMsgCtx, msgs, mode)
if err == nil {
if err = ctx.RunAtomic(func(runMsgCtx sdk.Context) error {
result, err = app.runMsgs(runMsgCtx, msgs, mode)
if err != nil {
return err
}

// Run optional postHandlers.
//
Expand All @@ -729,7 +694,7 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re

newCtx, err := app.postHandler(postCtx, tx, mode == runTxModeSimulate)
if err != nil {
return gInfo, nil, nil, priority, err
return err
}

result.Events = append(result.Events, newCtx.EventManager().ABCIEvents()...)
Expand All @@ -738,14 +703,16 @@ func (app *BaseApp) runTx(mode runTxMode, txBytes []byte) (gInfo sdk.GasInfo, re
if mode == runTxModeDeliver {
// When block gas exceeds, it'll panic and won't commit the cached store.
consumeBlockGas()

msCache.Write()
}

if len(anteEvents) > 0 && (mode == runTxModeDeliver || mode == runTxModeSimulate) {
// append the events in the order of occurrence
result.Events = append(anteEvents, result.Events...)
}

return nil
}); err != nil {
return gInfo, nil, nil, priority, err
}

return gInfo, result, anteEvents, priority, err
Expand Down
4 changes: 0 additions & 4 deletions baseapp/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,6 @@ func (app *BaseApp) NewContext(isCheckTx bool, header tmproto.Header) sdk.Contex
return sdk.NewContext(app.deliverState.ms, header, false, app.logger)
}

func (app *BaseApp) NewUncachedContext(isCheckTx bool, header tmproto.Header) sdk.Context {
return sdk.NewContext(app.cms, header, isCheckTx, app.logger)
}

func (app *BaseApp) GetContextForDeliverTx(txBytes []byte) sdk.Context {
return app.getContextForTx(runTxModeDeliver, txBytes)
}
74 changes: 18 additions & 56 deletions store/cachekv/benchmark_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,19 +28,20 @@ func DoBenchmarkDeepContextStack(b *testing.B, depth int) {
}
cms.Commit()

ctx := sdk.NewContext(cms, tmproto.Header{}, false, log.NewNopLogger())
ms := cms.CacheMultiStore()
ctx := sdk.NewContext(ms, tmproto.Header{}, false, log.NewNopLogger())

var stack ContextStack
stack.Reset(ctx)

for i := 0; i < depth; i++ {
stack.Snapshot()

store := stack.CurrentContext().KVStore(key)
store := stack.Context().KVStore(key)
store.Set([]byte(fmt.Sprintf("hello%03d", i)), []byte("modified"))
}

store = stack.CurrentContext().KVStore(key)
store = stack.Context().KVStore(key)

b.ResetTimer()
for i := 0; i < b.N; i++ {
Expand Down Expand Up @@ -85,87 +86,48 @@ type cachedContext struct {
type ContextStack struct {
// Context of the initial state before transaction execution.
// It's the context used by `StateDB.CommitedState`.
initialCtx sdk.Context
cachedContexts []cachedContext
ctx sdk.Context
snapshots []interface{}
}

// CurrentContext returns the top context of cached stack,
// if the stack is empty, returns the initial context.
func (cs *ContextStack) CurrentContext() sdk.Context {
l := len(cs.cachedContexts)
if l == 0 {
return cs.initialCtx
}
return cs.cachedContexts[l-1].ctx
func (cs *ContextStack) Context() sdk.Context {
return cs.ctx
}

// Reset sets the initial context and clear the cache context stack.
func (cs *ContextStack) Reset(ctx sdk.Context) {
cs.initialCtx = ctx
if len(cs.cachedContexts) > 0 {
cs.cachedContexts = []cachedContext{}
}
cs.ctx = ctx
cs.snapshots = []interface{}{}
}

// IsEmpty returns true if the cache context stack is empty.
func (cs *ContextStack) IsEmpty() bool {
return len(cs.cachedContexts) == 0
}

// Commit commits all the cached contexts from top to bottom in order and clears the stack by setting an empty slice of cache contexts.
func (cs *ContextStack) Commit() {
// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i >= 0; i-- {
if cs.cachedContexts[i].commit == nil {
panic(fmt.Sprintf("commit function at index %d should not be nil", i))
} else {
cs.cachedContexts[i].commit()
}
}
cs.cachedContexts = []cachedContext{}
}

// CommitToRevision commit the cache after the target revision,
// to improve efficiency of db operations.
func (cs *ContextStack) CommitToRevision(target int) error {
if target < 0 || target >= len(cs.cachedContexts) {
return fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts))
}

// commit in order from top to bottom
for i := len(cs.cachedContexts) - 1; i > target; i-- {
if cs.cachedContexts[i].commit == nil {
return fmt.Errorf("commit function at index %d should not be nil", i)
}
cs.cachedContexts[i].commit()
}
cs.cachedContexts = cs.cachedContexts[0 : target+1]

return nil
return len(cs.snapshots) == 0
}

// Snapshot pushes a new cached context to the stack,
// and returns the index of it.
func (cs *ContextStack) Snapshot() int {
i := len(cs.cachedContexts)
ctx, commit := cs.CurrentContext().CacheContext()
cs.cachedContexts = append(cs.cachedContexts, cachedContext{ctx: ctx, commit: commit})
return i
cs.snapshots = append(cs.snapshots, cs.ctx.MultiStore().Snapshot())
return len(cs.snapshots) - 1
}

// RevertToSnapshot pops all the cached contexts after the target index (inclusive).
// the target should be snapshot index returned by `Snapshot`.
// This function panics if the index is out of bounds.
func (cs *ContextStack) RevertToSnapshot(target int) {
if target < 0 || target >= len(cs.cachedContexts) {
panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.cachedContexts)))
if target < 0 || target >= len(cs.snapshots) {
panic(fmt.Errorf("snapshot index %d out of bound [%d..%d)", target, 0, len(cs.snapshots)))
}
cs.cachedContexts = cs.cachedContexts[:target]
cs.ctx.MultiStore().Restore(cs.snapshots[target])
cs.snapshots = cs.snapshots[:target]
}

// RevertAll discards all the cache contexts.
func (cs *ContextStack) RevertAll() {
if len(cs.cachedContexts) > 0 {
if len(cs.snapshots) > 0 {
cs.RevertToSnapshot(0)
}
}
15 changes: 15 additions & 0 deletions store/cachekv/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,21 @@ func (store *Store) GetStoreType() types.StoreType {
return store.parent.GetStoreType()
}

// Clone creates a snapshot of the cache store.
// This is a copy-on-write operation and is very fast because
// it only performs a shadowed copy.
func (store *Store) Clone() types.CacheKVStore {
return &Store{
cache: store.cache.Copy(),
parent: store.parent,
}
}

func (store *Store) Restore(s types.CacheKVStore) {
ss := s.(*Store)
store.cache = ss.cache
}

// Get implements types.KVStore.
func (store *Store) Get(key []byte) (value []byte) {
store.mtx.Lock()
Expand Down
37 changes: 32 additions & 5 deletions store/cachemulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const storeNameCtxKey = "store_name"
// keys for the substores.
type Store struct {
db types.CacheKVStore
stores map[types.StoreKey]types.CacheWrap
stores map[types.StoreKey]types.CacheKVStore
keys map[string]types.StoreKey

traceWriter io.Writer
Expand All @@ -38,12 +38,12 @@ var _ types.CacheMultiStore = Store{}
// CacheWrapper objects and a KVStore as the database. Each CacheWrapper store
// is a branched store.
func NewFromKVStore(
store types.KVStore, stores map[types.StoreKey]types.CacheWrapper,
store types.KVStore, stores map[types.StoreKey]types.KVStore,
keys map[string]types.StoreKey, traceWriter io.Writer, traceContext types.TraceContext,
) Store {
cms := Store{
db: cachekv.NewStore(store),
stores: make(map[types.StoreKey]types.CacheWrap, len(stores)),
stores: make(map[types.StoreKey]types.CacheKVStore, len(stores)),
keys: keys,
traceWriter: traceWriter,
traceContext: traceContext,
Expand All @@ -66,14 +66,14 @@ func NewFromKVStore(
// NewStore creates a new Store object from a mapping of store keys to
// CacheWrapper objects. Each CacheWrapper store is a branched store.
func NewStore(
db dbm.DB, stores map[types.StoreKey]types.CacheWrapper, keys map[string]types.StoreKey,
db dbm.DB, stores map[types.StoreKey]types.KVStore, keys map[string]types.StoreKey,
traceWriter io.Writer, traceContext types.TraceContext,
) Store {
return NewFromKVStore(dbadapter.Store{DB: db}, stores, keys, traceWriter, traceContext)
}

func newCacheMultiStoreFromCMS(cms Store) Store {
stores := make(map[types.StoreKey]types.CacheWrapper)
stores := make(map[types.StoreKey]types.KVStore)
for k, v := range cms.stores {
stores[k] = v
}
Expand Down Expand Up @@ -127,6 +127,33 @@ func (cms Store) Write() {
}
}

func (cms Store) Clone() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheKVStore, len(cms.stores))
for key, store := range cms.stores {
stores[key] = store.Clone()
}
return Store{
db: cms.db.Clone(),
stores: stores,

keys: cms.keys,
traceWriter: cms.traceWriter,
traceContext: cms.traceContext,
}
}

func (cms Store) Restore(s types.CacheMultiStore) {
ms := s.(Store)
cms.db.Restore(ms.db)
for key, store := range cms.stores {
store.Restore(ms.stores[key])
}

// this multi-store must not be used anymore
ms.db = nil
ms.stores = nil
}

// Implements CacheWrapper.
func (cms Store) CacheWrap() types.CacheWrap {
return cms.CacheMultiStore().(types.CacheWrap)
Expand Down
4 changes: 2 additions & 2 deletions store/rootmulti/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ func (rs *Store) CacheWrapWithTrace(_ io.Writer, _ types.TraceContext) types.Cac
// CacheMultiStore creates ephemeral branch of the multi-store and returns a CacheMultiStore.
// It implements the MultiStore interface.
func (rs *Store) CacheMultiStore() types.CacheMultiStore {
stores := make(map[types.StoreKey]types.CacheWrapper)
stores := make(map[types.StoreKey]types.KVStore)
for k, v := range rs.stores {
store := types.KVStore(v)
// Wire the listenkv.Store to allow listeners to observe the writes from the cache store,
Expand All @@ -497,7 +497,7 @@ func (rs *Store) CacheMultiStore() types.CacheMultiStore {
// any store cannot be loaded. This should only be used for querying and
// iterating at past heights.
func (rs *Store) CacheMultiStoreWithVersion(version int64) (types.CacheMultiStore, error) {
cachedStores := make(map[types.StoreKey]types.CacheWrapper)
cachedStores := make(map[types.StoreKey]types.KVStore)
for key, store := range rs.stores {
var cacheStore types.KVStore
switch store.GetStoreType() {
Expand Down
Loading

0 comments on commit 66b7908

Please sign in to comment.