Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: concurrent checkTx #49

Merged
merged 18 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 12 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
45 changes: 19 additions & 26 deletions baseapp/abci.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,33 +167,33 @@ func (app *BaseApp) CheckTx(req abci.RequestCheckTx) abci.ResponseCheckTx {
return sdkerrors.ResponseCheckTx(err, 0, 0, app.trace)
}

var mode runTxMode

switch {
case req.Type == abci.CheckTxType_New:
mode = runTxModeCheck

case req.Type == abci.CheckTxType_Recheck:
mode = runTxModeReCheck

default:
if req.Type != abci.CheckTxType_New && req.Type != abci.CheckTxType_Recheck {
panic(fmt.Sprintf("unknown RequestCheckTx type: %s", req.Type))
}

gInfo, result, err := app.runTx(mode, req.Tx, tx)
gInfo, err := app.checkTx(req.Tx, tx, req.Type == abci.CheckTxType_Recheck)
if err != nil {
return sdkerrors.ResponseCheckTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
}

return abci.ResponseCheckTx{
GasWanted: int64(gInfo.GasWanted), // TODO: Should type accept unsigned ints?
GasUsed: int64(gInfo.GasUsed), // TODO: Should type accept unsigned ints?
Log: result.Log,
Data: result.Data,
Events: result.Events.ToABCIEvents(),
}
}

// BeginRecheckTx implements the ABCI interface and set the check state based on the given header
func (app *BaseApp) BeginRecheckTx(req abci.RequestBeginRecheckTx) abci.ResponseBeginRecheckTx {
// NOTE: This is safe because Tendermint holds a lock on the mempool for Rechecking.
app.setCheckState(req.Header)
return abci.ResponseBeginRecheckTx{Code: abci.CodeTypeOK}
}

// EndRecheckTx implements the ABCI interface.
func (app *BaseApp) EndRecheckTx(req abci.RequestEndRecheckTx) abci.ResponseEndRecheckTx {
return abci.ResponseEndRecheckTx{Code: abci.CodeTypeOK}
}

// DeliverTx implements the ABCI interface and executes a tx in DeliverTx mode.
// State only gets persisted if all messages are valid and get executed successfully.
// Otherwise, the ResponseDeliverTx will contain releveant error information.
Expand All @@ -205,7 +205,7 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx
return sdkerrors.ResponseDeliverTx(err, 0, 0, app.trace)
}

gInfo, result, err := app.runTx(runTxModeDeliver, req.Tx, tx)
gInfo, result, err := app.runTx(req.Tx, tx, false)
if err != nil {
return sdkerrors.ResponseDeliverTx(err, gInfo.GasWanted, gInfo.GasUsed, app.trace)
}
Expand All @@ -221,11 +221,10 @@ func (app *BaseApp) DeliverTx(req abci.RequestDeliverTx) abci.ResponseDeliverTx

// Commit implements the ABCI interface. It will commit all state that exists in
// the deliver state's multi-store and includes the resulting commit ID in the
// returned abci.ResponseCommit. Commit will set the check state based on the
// latest header and reset the deliver state. Also, if a non-zero halt height is
// defined in config, Commit will execute a deferred function call to check
// against that height and gracefully halt if it matches the latest committed
// height.
// returned abci.ResponseCommit. Commit will reset the deliver state.
// Also, if a non-zero halt height is defined in config, Commit will execute
// a deferred function call to check against that height and gracefully halt if
// it matches the latest committed height.
func (app *BaseApp) Commit() (res abci.ResponseCommit) {
header := app.deliverState.ctx.BlockHeader()

Expand All @@ -236,12 +235,6 @@ func (app *BaseApp) Commit() (res abci.ResponseCommit) {
commitID := app.cms.Commit()
app.logger.Debug("Commit synced", "commit", fmt.Sprintf("%X", commitID))

// Reset the Check state to the latest committed.
//
// NOTE: This is safe because Tendermint holds a lock on the mempool for
// Commit. Use the header from this latest block.
app.setCheckState(header)

// empty/reset the deliver state
app.deliverState = nil

Expand Down
82 changes: 82 additions & 0 deletions baseapp/accountlock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
package baseapp

import (
"encoding/binary"
"sort"
"sync"

sdk "github.com/cosmos/cosmos-sdk/types"
)

const sampleBytes = 2

type AccountLock struct {
accMtx [1 << (sampleBytes * 8)]sync.Mutex
}

func (al *AccountLock) Lock(ctx sdk.Context, tx sdk.Tx) []uint32 {
if !ctx.IsCheckTx() || ctx.IsReCheckTx() {
return nil
}

signers := getSigners(tx)
accKeys := getUniqSortedAddressKey(signers)

for _, key := range accKeys {
al.accMtx[key].Lock()
}

return accKeys
}

func (al *AccountLock) Unlock(accKeys []uint32) {
// NOTE reverse order
for i, length := 0, len(accKeys); i < length; i++ {
key := accKeys[length-1-i]
al.accMtx[key].Unlock()
}
}

func getSigners(tx sdk.Tx) []sdk.AccAddress {
seen := map[string]bool{}
var signers []sdk.AccAddress
for _, msg := range tx.GetMsgs() {
for _, addr := range msg.GetSigners() {
if !seen[addr.String()] {
signers = append(signers, addr)
seen[addr.String()] = true
}
}
}
return signers
}

func getUniqSortedAddressKey(addrs []sdk.AccAddress) []uint32 {
accKeys := make([]uint32, 0, len(addrs))
for _, addr := range addrs {
tail3 := addr[:sampleBytes]
tail := append([]byte{0}, tail3...)

accKey := binary.BigEndian.Uint32(tail)
accKeys = append(accKeys, accKey)
}

accKeys = uniq(accKeys)
sort.Slice(accKeys, func(i, j int) bool {
return i < j
})

return accKeys
}

func uniq(u []uint32) []uint32 {
seen := map[uint32]bool{}
var ret []uint32
for _, v := range u {
if !seen[v] {
ret = append(ret, v)
seen[v] = true
}
}
return ret
}
113 changes: 113 additions & 0 deletions baseapp/accountlock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
package baseapp

import (
"reflect"
"sync"
"testing"

"github.com/stretchr/testify/require"

abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto"
"github.com/tendermint/tendermint/crypto/secp256k1"

sdk "github.com/cosmos/cosmos-sdk/types"
)

func TestAccountLock(t *testing.T) {
app := setupBaseApp(t)
ctx := app.NewContext(true, abci.Header{})

privs := newTestPrivKeys(3)
tx := newTestTx(privs)

accKeys := app.accountLock.Lock(ctx, tx)

for _, accKey := range accKeys {
require.True(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}

app.accountLock.Unlock(accKeys)

for _, accKey := range accKeys {
require.False(t, isMutexLock(&app.accountLock.accMtx[accKey]))
}
}

func TestUnlockDoNothingWithNil(t *testing.T) {
app := setupBaseApp(t)
require.NotPanics(t, func() { app.accountLock.Unlock(nil) })
}

func TestGetSigner(t *testing.T) {
privs := newTestPrivKeys(3)
tx := newTestTx(privs)
signers := getSigners(tx)

require.Equal(t, getAddrs(privs), signers)
}

func TestGetUniqSortedAddressKey(t *testing.T) {
privs := newTestPrivKeys(3)

addrs := getAddrs(privs)
addrs = append(addrs, addrs[1], addrs[0])
require.Equal(t, 5, len(addrs))

accKeys := getUniqSortedAddressKey(addrs)
require.Less(t, len(accKeys), len(addrs))

// check uniqueness
for i, iv := range accKeys {
for j, jv := range accKeys {
if i != j {
require.True(t, iv != jv)
}
}
}
}

type AccountLockTestTx struct {
Msgs []sdk.Msg
}

var _ sdk.Tx = AccountLockTestTx{}

func (tx AccountLockTestTx) GetMsgs() []sdk.Msg {
return tx.Msgs
}

func (tx AccountLockTestTx) ValidateBasic() error {
return nil
}

func newTestPrivKeys(num int) []crypto.PrivKey {
privs := make([]crypto.PrivKey, 0, num)
for i := 0; i < num; i++ {
privs = append(privs, secp256k1.GenPrivKey())
}
return privs
}

func getAddrs(privs []crypto.PrivKey) []sdk.AccAddress {
addrs := make([]sdk.AccAddress, 0, len(privs))
for _, priv := range privs {
addrs = append(addrs, sdk.AccAddress(priv.PubKey().Address()))
}
return addrs
}

func newTestTx(privs []crypto.PrivKey) sdk.Tx {
addrs := getAddrs(privs)
msgs := make([]sdk.Msg, len(addrs))
for i, addr := range addrs {
msgs[i] = sdk.NewTestMsg(addr)
}
return AccountLockTestTx{Msgs: msgs}
}

// Hack (too slow)
func isMutexLock(mtx *sync.Mutex) bool {
state := reflect.ValueOf(mtx).Elem().FieldByName("state")
return state.Int() == 1
}
Loading