From e741ce5b63a715c10a5e96839c8d94e1bf6da6e9 Mon Sep 17 00:00:00 2001
From: WangFeifan <314130948@qq.com>
Date: Thu, 25 Jul 2019 14:35:04 +0800
Subject: [PATCH] Geea-fabric: optimize TPS from 65 to 90

Cache verified signatures, chaincode definitions and the pvtData pull
retry threshold; commit the history DB in a background goroutine and
skip the fsync on private data batch writes; filter log records below
the default level before they reach the core; disable the duplicate-TxID
lookup, the proposal expiration check and the unary gRPC interceptors;
expose pprof on :8061.

Note: several of these changes trade validation coverage and durability
for throughput.
---
 common/flogging/logging.go               |  8 ++++----
 core/endorser/endorser.go                | 10 +++++-----
 core/endorser/support.go                 | 29 +++++++++++++++++++++++-
 core/handlers/auth/filter/expiration.go  |  6 +++---
 core/ledger/kvledger/kv_ledger.go        | 10 ++++++----
 core/ledger/pvtdatastorage/store_impl.go |  3 ++-
 gossip/privdata/coordinator.go           |  8 +++++++-
 msp/identities.go                        | 17 ++++++++++++++++
 peer/node/start.go                       | 33 ++++++++++++++++---------
 9 files changed, 93 insertions(+), 31 deletions(-)

diff --git a/common/flogging/logging.go b/common/flogging/logging.go
index a2047408567..3ddddb50241 100644
--- a/common/flogging/logging.go
+++ b/common/flogging/logging.go
@@ -196,7 +196,7 @@ func (s *Logging) ZapLogger(name string) *zap.Logger {
-	// always return true here because the core's Check()
-	// method computes the level for the logger name based
-	// on the active logging spec
-	levelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool { return true })
+	// filter against the default level up front so records below it are
+	// dropped before they reach the core; note this bypasses per-logger
+	// levels in the active spec that are lower than the default
+	levelEnabler := zap.LevelEnablerFunc(func(l zapcore.Level) bool { return l >= s.defaultLevel })
 
 	s.mutex.RLock()
 	core := &Core{
diff --git a/core/endorser/endorser.go b/core/endorser/endorser.go
index 208a0562daa..d7b07d66298 100644
--- a/core/endorser/endorser.go
+++ b/core/endorser/endorser.go
@@ -372,11 +372,11 @@ func (e *Endorser) preProcess(signedProp *pb.SignedProposal) (*validateResult, e
 	if chainID != "" {
 		// Here we handle uniqueness check and ACLs for proposals targeting a chain
 		// Notice that ValidateProposalMessage has already verified that TxID is computed properly
-		if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
-			err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
-			vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
-			return vr, err
-		}
+		// if _, err = e.s.GetTransactionByID(chainID, txid); err == nil {
+		// 	err = errors.Errorf("duplicate transaction found [%s]. Creator [%x]", txid, shdr.Creator)
+		// 	vr.resp = &pb.ProposalResponse{Response: &pb.Response{Status: 500, Message: err.Error()}}
+		// 	return vr, err
+		// }
 
 		// check ACL only for application chaincodes; ACLs
 		// for system chaincodes are checked elsewhere
diff --git a/core/endorser/support.go b/core/endorser/support.go
index 78f160913d3..ca87dcd4ce2 100644
--- a/core/endorser/support.go
+++ b/core/endorser/support.go
@@ -8,6 +8,8 @@ package endorser
 
 import (
 	"fmt"
+	"sync"
+	"time"
 
 	"github.com/hyperledger/fabric/common/channelconfig"
 	"github.com/hyperledger/fabric/common/crypto"
@@ -147,9 +149,34 @@ func (s *SupportImpl) Execute(txParams *ccprovider.TransactionParams, cid, name,
 	return s.ChaincodeSupport.Execute(txParams, cccid, input)
 }
 
+var (
+	rwMutex                      sync.RWMutex
+	expiredDuration              = 5 * time.Second
+	cachedChaincodeDefinitionMap = make(map[string]cachedChaincodeDefinition)
+)
+
+type cachedChaincodeDefinition struct {
+	cachedAt  time.Time
+	cachedMap ccprovider.ChaincodeDefinition
+}
+
 // GetChaincodeDefinition returns ccprovider.ChaincodeDefinition for the chaincode with the supplied name
 func (s *SupportImpl) GetChaincodeDefinition(chaincodeName string, txsim ledger.QueryExecutor) (ccprovider.ChaincodeDefinition, error) {
-	return s.ChaincodeSupport.Lifecycle.ChaincodeDefinition(chaincodeName, txsim)
+	rwMutex.RLock()
+	cachedObj, exists := cachedChaincodeDefinitionMap[chaincodeName]
+	rwMutex.RUnlock()
+	if exists && time.Since(cachedObj.cachedAt) < expiredDuration {
+		return cachedObj.cachedMap, nil
+	}
+
+	chaincodeDefinition, err := s.ChaincodeSupport.Lifecycle.ChaincodeDefinition(chaincodeName, txsim)
+	if err != nil {
+		return nil, err
+	}
+	rwMutex.Lock()
+	cachedChaincodeDefinitionMap[chaincodeName] = cachedChaincodeDefinition{time.Now(), chaincodeDefinition}
+	rwMutex.Unlock()
+	return chaincodeDefinition, nil
 }
 
 // CheckACL checks the ACL for the resource for the Channel using the
diff --git a/core/handlers/auth/filter/expiration.go b/core/handlers/auth/filter/expiration.go
index 6677d47ab27..3e3979d6aa9 100644
--- a/core/handlers/auth/filter/expiration.go
+++ b/core/handlers/auth/filter/expiration.go
@@ -55,8 +55,8 @@ func validateProposal(signedProp *peer.SignedProposal) error {
 
 // ProcessProposal processes a signed proposal
 func (f *expirationCheckFilter) ProcessProposal(ctx context.Context, signedProp *peer.SignedProposal) (*peer.ProposalResponse, error) {
-	if err := validateProposal(signedProp); err != nil {
-		return nil, err
-	}
+	// if err := validateProposal(signedProp); err != nil {
+	// 	return nil, err
+	// }
 	return f.next.ProcessProposal(ctx, signedProp)
 }
diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go
index 271655b23e4..ae8dfec4ba5 100644
--- a/core/ledger/kvledger/kv_ledger.go
+++ b/core/ledger/kvledger/kv_ledger.go
@@ -324,10 +324,12 @@ func (l *kvLedger) CommitWithPvtData(pvtdataAndBlock *ledger.BlockAndPvtData) er
 	// History database could be written in parallel with state and/or async as a future optimization,
 	// although it has not been a bottleneck...no need to clutter the log with elapsed duration.
 	if ledgerconfig.IsHistoryDBEnabled() {
-		logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
-		if err := l.historyDB.Commit(block); err != nil {
-			panic(errors.WithMessage(err, "Error during commit to history db"))
-		}
+		go func() {
+			logger.Debugf("[%s] Committing block [%d] transactions to history database", l.ledgerID, blockNo)
+			if err := l.historyDB.Commit(block); err != nil {
+				panic(errors.WithMessage(err, "Error during commit to history db"))
+			}
+		}()
 	}
 
 	elapsedCommitWithPvtData := time.Since(startBlockProcessing)
diff --git a/core/ledger/pvtdatastorage/store_impl.go b/core/ledger/pvtdatastorage/store_impl.go
index 6f2560f2258..bf88d4270fa 100644
--- a/core/ledger/pvtdatastorage/store_impl.go
+++ b/core/ledger/pvtdatastorage/store_impl.go
@@ -206,7 +206,8 @@ func (s *store) Prepare(blockNum uint64, pvtData []*ledger.TxPvtData, missingPvt
 	}
 	batch.Put(pendingCommitKey, emptyValue)
 
-	if err := s.db.WriteBatch(batch, true); err != nil {
+	// write the private data batch without an fsync (durability traded for speed)
+	if err := s.db.WriteBatch(batch, false); err != nil {
 		return err
 	}
 	s.batchPending = true
diff --git a/gossip/privdata/coordinator.go b/gossip/privdata/coordinator.go
index 720af7275b1..bc1475254e4 100644
--- a/gossip/privdata/coordinator.go
+++ b/gossip/privdata/coordinator.go
@@ -10,6 +10,7 @@ import (
 	"bytes"
 	"encoding/hex"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -141,6 +142,11 @@ func (c *coordinator) StorePvtData(txID string, privData *transientstore2.TxPvtR
 	return c.TransientStore.PersistWithConfig(txID, blkHeight, privData)
 }
 
+var (
+	retryThresh         time.Duration
+	retryThreshInitOnce sync.Once
+)
+
 // StoreBlock stores block with private data into the ledger
 func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDataCollections) error {
 	if block.Data == nil {
@@ -177,7 +183,7 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
 		return err
 	}
 
-	retryThresh := viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold")
+	retryThreshInitOnce.Do(func() { retryThresh = viper.GetDuration("peer.gossip.pvtData.pullRetryThreshold") })
 	var bFetchFromPeers bool // defaults to false
 	if len(privateInfo.missingKeys) == 0 {
 		logger.Debugf("[%s] No missing collection private write sets to fetch from remote peers", c.ChainID)
diff --git a/msp/identities.go b/msp/identities.go
index c15219566d3..0d1b5133fcf 100644
--- a/msp/identities.go
+++ b/msp/identities.go
@@ -9,9 +9,11 @@ package msp
 import (
 	"crypto"
 	"crypto/rand"
+	"crypto/sha256"
 	"crypto/x509"
 	"encoding/hex"
 	"encoding/pem"
+	"sync"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -138,11 +140,23 @@ func NewSerializedIdentity(mspID string, certPEM []byte) ([]byte, error) {
 	return raw, nil
 }
 
+var (
+	cachedVerifiedMap = make(map[[32]byte]struct{})
+	rwMutex           sync.RWMutex
+)
+
 // Verify checks against a signature and a message
 // to determine whether this identity produced the
 // signature; it returns nil if so or an error otherwise
 func (id *identity) Verify(msg []byte, sig []byte) error {
 	// mspIdentityLogger.Infof("Verifying signature")
+	cached := sha256.Sum256(append(append([]byte{}, msg...), sig...))
+	rwMutex.RLock()
+	_, exists := cachedVerifiedMap[cached]
+	rwMutex.RUnlock()
+	if exists {
+		return nil
+	}
 
 	// Compute Hash
 	hashOpt, err := id.getHashOpt(id.msp.cryptoConfig.SignatureHashFamily)
@@ -166,6 +180,9 @@ func (id *identity) Verify(msg []byte, sig []byte) error {
 	} else if !valid {
 		return errors.New("The signature is invalid")
 	}
+	rwMutex.Lock()
+	cachedVerifiedMap[cached] = struct{}{}
+	rwMutex.Unlock()
 	return nil
 }
 
diff --git a/peer/node/start.go b/peer/node/start.go
index 5d4ba394a1b..79237361f0e 100644
--- a/peer/node/start.go
+++ b/peer/node/start.go
@@ -8,6 +8,7 @@ package node
 
 import (
 	"fmt"
+	"log"
 	"net"
 	"net/http"
 	"os"
@@ -15,6 +16,9 @@ import (
 	"syscall"
 	"time"
 
+	// blank import registers the /debug/pprof handlers on http.DefaultServeMux
+	_ "net/http/pprof"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/hyperledger/fabric/common/cauthdsl"
 	ccdef "github.com/hyperledger/fabric/common/chaincode"
@@ -31,7 +35,7 @@ import (
 	"github.com/hyperledger/fabric/core/aclmgmt"
 	"github.com/hyperledger/fabric/core/aclmgmt/resources"
 	"github.com/hyperledger/fabric/core/admin"
-	"github.com/hyperledger/fabric/core/cclifecycle"
+	cc "github.com/hyperledger/fabric/core/cclifecycle"
 	"github.com/hyperledger/fabric/core/chaincode"
 	"github.com/hyperledger/fabric/core/chaincode/accesscontrol"
 	"github.com/hyperledger/fabric/core/chaincode/lifecycle"
@@ -53,7 +57,7 @@ import (
 	endorsement2 "github.com/hyperledger/fabric/core/handlers/endorsement/api"
 	endorsement3 "github.com/hyperledger/fabric/core/handlers/endorsement/api/identities"
 	"github.com/hyperledger/fabric/core/handlers/library"
-	"github.com/hyperledger/fabric/core/handlers/validation/api"
+	validation "github.com/hyperledger/fabric/core/handlers/validation/api"
 	"github.com/hyperledger/fabric/core/ledger/cceventmgmt"
 	"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
 	"github.com/hyperledger/fabric/core/operations"
@@ -114,6 +118,11 @@ var nodeStartCmd = &cobra.Command{
 		if len(args) != 0 {
 			return fmt.Errorf("trailing args detected")
 		}
+
+		// serve pprof on a hard-coded port for performance debugging
+		go func() {
+			log.Println(http.ListenAndServe(":8061", nil))
+		}()
 		// Parsing of the command line is done so silence cmd usage
 		cmd.SilenceUsage = true
 		return serve(args)
@@ -211,11 +220,11 @@ func serve(args []string) error {
 	}
 	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "PeerServer")
 	serverConfig.MetricsProvider = metricsProvider
-	serverConfig.UnaryInterceptors = append(
-		serverConfig.UnaryInterceptors,
-		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
-		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
-	)
+	// serverConfig.UnaryInterceptors = append(
+	// 	serverConfig.UnaryInterceptors,
+	// 	grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
+	// 	grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+	// )
 	serverConfig.StreamInterceptors = append(
 		serverConfig.StreamInterceptors,
 		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
@@ -779,11 +788,11 @@ func startAdminServer(peerListenAddr string, peerServer *grpc.Server, metricsPro
 	}
 	serverConfig.Logger = flogging.MustGetLogger("core.comm").With("server", "AdminServer")
 	serverConfig.MetricsProvider = metricsProvider
-	serverConfig.UnaryInterceptors = append(
-		serverConfig.UnaryInterceptors,
-		grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
-		grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
-	)
+	// serverConfig.UnaryInterceptors = append(
+	// 	serverConfig.UnaryInterceptors,
+	// 	grpcmetrics.UnaryServerInterceptor(grpcmetrics.NewUnaryMetrics(metricsProvider)),
+	// 	grpclogging.UnaryServerInterceptor(flogging.MustGetLogger("comm.grpc.server").Zap()),
+	// )
 	serverConfig.StreamInterceptors = append(
 		serverConfig.StreamInterceptors,
 		grpcmetrics.StreamServerInterceptor(grpcmetrics.NewStreamMetrics(metricsProvider)),
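
-- 
Review notes:

1) Both caches are unbounded. cachedChaincodeDefinitionMap holds at most
one entry per chaincode name, but cachedVerifiedMap in msp/identities.go
gains an entry for every distinct (msg, sig) pair it ever verifies and
is never pruned, so a long-running peer leaks memory at roughly the rate
it endorses. A bounded, TTL-based shape like the sketch below could cap
that; ttlCache and its methods are illustrative names under stated
assumptions, not Fabric APIs:

    package cache

    import (
        "sync"
        "time"
    )

    type entry struct {
        value    interface{}
        cachedAt time.Time
    }

    // ttlCache bounds both the lifetime and the number of entries.
    type ttlCache struct {
        mu         sync.RWMutex
        ttl        time.Duration
        maxEntries int
        entries    map[string]entry
    }

    func newTTLCache(ttl time.Duration, maxEntries int) *ttlCache {
        return &ttlCache{ttl: ttl, maxEntries: maxEntries, entries: make(map[string]entry)}
    }

    // Get returns a value only while its TTL has not elapsed.
    func (c *ttlCache) Get(key string) (interface{}, bool) {
        c.mu.RLock()
        e, ok := c.entries[key]
        c.mu.RUnlock()
        if !ok || time.Since(e.cachedAt) >= c.ttl {
            return nil, false
        }
        return e.value, true
    }

    // Put evicts expired entries once the cap is reached and, as a last
    // resort, resets the map so it can never grow without bound.
    func (c *ttlCache) Put(key string, value interface{}) {
        c.mu.Lock()
        defer c.mu.Unlock()
        if len(c.entries) >= c.maxEntries {
            for k, e := range c.entries {
                if time.Since(e.cachedAt) >= c.ttl {
                    delete(c.entries, k)
                }
            }
            if len(c.entries) >= c.maxEntries {
                c.entries = make(map[string]entry)
            }
        }
        c.entries[key] = entry{value: value, cachedAt: time.Now()}
    }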
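2) Verify builds its cache key with
sha256.Sum256(append(append([]byte{}, msg...), sig...)) rather than the
shorter append(msg, sig...): when msg has spare capacity, append writes
sig straight into msg's backing array and silently mutates the caller's
buffer. A self-contained demonstration of that slice behaviour:

    package main

    import "fmt"

    func main() {
        // A caller's buffer with spare capacity: len 3, cap 8.
        msg := make([]byte, 3, 8)
        copy(msg, "msg")
        sig := []byte("sig")

        // Safe form, as used in the patch: copy msg first.
        key := append(append([]byte{}, msg...), sig...)
        fmt.Printf("%q\n", key)     // "msgsig"
        fmt.Printf("%q\n", msg[:6]) // "msg\x00\x00\x00" -- msg untouched

        // Unsafe form: writes "sig" into msg's backing array.
        _ = append(msg, sig...)
        fmt.Printf("%q\n", msg[:6]) // "msgsig" -- caller's buffer mutated
    }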
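3) CommitWithPvtData now commits the history DB from a per-block
goroutine, so history commits can land out of order across blocks, and a
panic inside the goroutine cannot be recovered by the caller. If the
asynchrony is kept, a single ordered worker with a bounded queue would
preserve block order and add backpressure. A minimal sketch under those
assumptions; historyCommitter, commitCh and queueDepth are hypothetical
names, not Fabric APIs:

    package history

    type block struct {
        num uint64
    }

    type committer interface {
        Commit(*block) error
    }

    type historyCommitter struct {
        commitCh chan *block
    }

    // newHistoryCommitter starts a single worker goroutine, so blocks
    // are committed strictly in the order they are enqueued.
    func newHistoryCommitter(db committer, queueDepth int) *historyCommitter {
        hc := &historyCommitter{commitCh: make(chan *block, queueDepth)}
        go func() {
            for b := range hc.commitCh {
                if err := db.Commit(b); err != nil {
                    // same failure semantics as the synchronous path
                    panic(err)
                }
            }
        }()
        return hc
    }

    // enqueue blocks once queueDepth commits are pending, instead of
    // spawning an unbounded number of goroutines.
    func (hc *historyCommitter) enqueue(b *block) {
        hc.commitCh <- b
    }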