Skip to content

Commit

Permalink
Merge pull request hyperledger#1 from buddingleader/geea-fabric
Browse files Browse the repository at this point in the history
Geea-fabric optimized tps from 65 to 90
  • Loading branch information
buddingleader authored Jul 25, 2019
2 parents 469501a + c7d8f74 commit 4c792a2
Show file tree
Hide file tree
Showing 9 changed files with 83 additions and 31 deletions.
7 changes: 6 additions & 1 deletion common/flogging/logging.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,12 @@ func (s *Logging) ZapLogger(name string) *zap.Logger {
panic(fmt.Sprintf("invalid logger name: %s", name))
}

s.mutex.RLock()
// always return true here because the core's Check()
// method computes the level for the logger name based
// on the active logging spec
levelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool { return l >= s.defaultLevel })

s.mutex.RLock()
core := &Core{
LevelEnabler: s.LoggerLevels,
Levels: s.LoggerLevels,
Expand Down
13 changes: 5 additions & 8 deletions core/endorser/endorser.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,14 +389,11 @@ func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, e

// Here we handle uniqueness check and ACLs for proposals targeting a chain
// Notice that ValidateProposalMessage has already verified that TxID is computed properly
if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
// increment failure due to duplicate transactions. Useful for catching replay attacks in
// addition to benign retries
e.Metrics.DuplicateTxsFailure.With(meterLabels...).Add(1)
err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
return vr, err
}
// if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
// err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
// vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
// return vr, err
// }

// check ACL only for application chaincodes; ACLs
// for system chaincodes are checked elsewhere
Expand Down
26 changes: 25 additions & 1 deletion core/endorser/support.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ package endorser

import (
"fmt"
"sync"
"time"

"github.com/hyperledger/fabric/common/channelconfig"
"github.com/hyperledger/fabric/common/crypto"
Expand Down Expand Up @@ -147,9 +149,31 @@ func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name,
return s.ChaincodeSupport.Execute(txParams, cccid, input)
}

var (
rwMutex sync.RWMutex
expiredDuration = 5 * time.Second
cachedChaincodeDefinitionMap = make(map[string]cachedChaincodeDefinition)
)

type cachedChaincodeDefinition struct {
cachedAt time.Time
cachedMap ccprovider.ChaincodeDefinition
}

// GetChaincodeDefinition returns ccprovider.ChaincodeDefinition for the chaincode with the supplied name
func (s *SupportImpl) GetChaincodeDefinition(chaincodeName string, txsim ledger.QueryExecutor) (ccprovider.ChaincodeDefinition, error) {
return s.ChaincodeSupport.Lifecycle.ChaincodeDefinition(chaincodeName, txsim)
rwMutex.RLock()
cachedObj, exists := cachedChaincodeDefinitionMap[chaincodeName]
rwMutex.RUnlock()
if exists && time.Since(cachedObj.cachedAt) < expiredDuration {
return cachedObj.cachedMap, nil
}

chaincodeDefinition, err := s.ChaincodeSupport.Lifecycle.ChaincodeDefinition(chaincodeName, txsim)
rwMutex.Lock()
cachedChaincodeDefinitionMap[chaincodeName] = cachedChaincodeDefinition{time.Now(), chaincodeDefinition}
rwMutex.Unlock()
return chaincodeDefinition, err
}

// CheckACL checks the ACL for the resource for the Channel using the
Expand Down
6 changes: 3 additions & 3 deletions core/handlers/auth/filter/expiration.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ func validateProposal(signedProp *peer.SignedProposal) error {

// ProcessProposal processes a signed proposal
func (f *expirationCheckFilter) ProcessProposal(ctx context.Context, signedProp *peer.SignedProposal) (*peer.ProposalResponse, error) {
if err := validateProposal(signedProp); err != nil {
return nil, err
}
// if err := validateProposal(signedProp); err != nil {
// return nil, err
// }
return f.next.ProcessProposal(ctx, signedProp)
}
10 changes: 6 additions & 4 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,10 +406,12 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData, co
// History database could be written in parallel with state and/or async as a future optimization,
// although it has not been a bottleneck...no need to clutter the log with elapsed duration.
if ledgerconfig.IsHistoryDBEnabled() {
logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
go func() {
logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
if err := l.historyDB.Commit(block); err != nil {
panic(errors.WithMessage(err, "Error during commit to history db"))
}
}()
}

logger.Infof("[%s] Committed block [%d] with %d transaction(s) in %dms (state_validation=%dms block_and_pvtdata_commit=%dms state_commit=%dms)"+
Expand Down
3 changes: 2 additions & 1 deletion core/ledger/pvtdatastorage/store_impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,8 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvt
}

batch.Put(pendingCommitKey, emptyValue)
if err := s.db.WriteBatch(batch, true); err != nil {
// Make private data batch writes asynchronous
if err := s.db.WriteBatch(batch, false); err != nil {
return err
}
s.batchPending = true
Expand Down
1 change: 1 addition & 0 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"bytes"
"encoding/hex"
"fmt"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down
17 changes: 17 additions & 0 deletions msp/identities.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,11 @@ package msp
import (
"crypto"
"crypto/rand"
"crypto/sha256"
"crypto/x509"
"encoding/hex"
"encoding/pem"
"sync"
"time"

"github.com/golang/protobuf/proto"
Expand Down Expand Up @@ -138,11 +140,23 @@ func NewSerializedIdentity(mspID string, certPEM []byte) ([]byte, error) {
return raw, nil
}

var (
cachedVerifiedMap = make(map[[32]byte]struct{})
rwMutex sync.RWMutex
)

// Verify checks against a signature and a message
// to determine whether this identity produced the
// signature; it returns nil if so or an error otherwise
func (id *identity) Verify(msg []byte, sig []byte) error {
// mspIdentityLogger.Infof("Verifying signature")
cached := sha256.Sum256(append(msg, sig...))
rwMutex.RLock()
_, exists := cachedVerifiedMap[cached]
rwMutex.RUnlock()
if exists {
return nil
}

// Compute Hash
hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily)
Expand All @@ -166,6 +180,9 @@ func (id *identity) Verify(msg []byte, sig []byte) error {
} else if !valid {
return errors.New("The signature is invalid")
}
rwMutex.Lock()
cachedVerifiedMap[cached] = struct{}{}
rwMutex.Unlock()

return nil
}
Expand Down
31 changes: 18 additions & 13 deletions peer/node/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package node
import (
"context"
"fmt"
"log"
"net"
"net/http"
"os"
Expand All @@ -17,6 +18,9 @@ import (
"syscall"
"time"

// to open debug pprof
_ "net/http/pprof"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric/common/cauthdsl"
ccdef "github.com/hyperledger/fabric/common/chaincode"
Expand Down Expand Up @@ -57,7 +61,6 @@ import (
endorsement3 "github.com/hyperledger/fabric/core/handlers/endorsement/api/identities"
"github.com/hyperledger/fabric/core/handlers/library"
validation "github.com/hyperledger/fabric/core/handlers/validation/api"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
Expand Down Expand Up @@ -120,6 +123,10 @@ var nodeStartCmd = &cobra.Command{
if len(args) != 0 {
return fmt.Errorf("trailing args detected")
}

go func() {
log.Println(http.ListenAndServe(":8061", nil))
}()
// Parsing of the command line is done so silence cmd usage
cmd.SilenceUsage = true
return serve(args)
Expand Down Expand Up @@ -221,12 +228,11 @@ func serve(args []string) error {
throttle := comm.NewThrottle(grpcMaxConcurrency)
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
serverConfig.MetricsProvider = metricsProvider
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.UnaryServerIntercptor,
)
// serverConfig.UnaryInterceptors = append(
// serverConfig.UnaryInterceptors,
// grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
// grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
// )
serverConfig.StreamInterceptors = append(
serverConfig.StreamInterceptors,
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
Expand Down Expand Up @@ -813,12 +819,11 @@ func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsPro
throttle := comm.NewThrottle(grpcMaxConcurrency)
serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "AdminServer")
serverConfig.MetricsProvider = metricsProvider
serverConfig.UnaryInterceptors = append(
serverConfig.UnaryInterceptors,
grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
throttle.UnaryServerIntercptor,
)
// serverConfig.UnaryInterceptors = append(
// serverConfig.UnaryInterceptors,
// grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
// grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
// )
serverConfig.StreamInterceptors = append(
serverConfig.StreamInterceptors,
grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
Expand Down

0 comments on commit 4c792a2

Please sign in to comment.