diff --git a/common/p2pmessage/p2pmessage.go b/common/p2pmessage/p2pmessage.go index 435a7e00831..d20c50ae920 100644 --- a/common/p2pmessage/p2pmessage.go +++ b/common/p2pmessage/p2pmessage.go @@ -2,11 +2,13 @@ package p2pmessage import ( "context" + "fmt" "github.com/hyperledger/fabric/common/crypto" "github.com/hyperledger/fabric/common/flogging" "github.com/hyperledger/fabric/common/ledger/blockledger" "github.com/hyperledger/fabric/common/policies" "github.com/hyperledger/fabric/common/util" + gossipprivdata "github.com/hyperledger/fabric/gossip/privdata" "github.com/hyperledger/fabric/internal/peer/protos" "time" ) @@ -113,11 +115,21 @@ func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest) defer h.Metrics.StreamsClosed.Add(1) reconcileResponse := &protos.ReconcileResponse{ - Success: true, - Message: "I amd sending the response! It's me server", + Success: false, } - return reconcileResponse, nil + // Calling Reconciler Service + // reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService() + reconciler := gossipprivdata.GetOnDemandReconcilerService(request.ChannelId) + fmt.Println(reconciler) + + if reconciler == nil { + reconcileResponse.Message = "no reconciler found for channel " + request.ChannelId + + return reconcileResponse, fmt.Errorf("no reconciler found for channel " + request.ChannelId) + } + response, err := reconciler.Reconcile(request.BlockNumber) + return &response, err } // ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header. diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index f354a449bd7..3ebe8b92505 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -464,8 +464,8 @@ func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.Tx return nil } -//recommitLostBlocks retrieves blocks in specified range and commit the write set to either -//state DB or history DB or both +// recommitLostBlocks retrieves blocks in specified range and commit the write set to either +// state DB or history DB or both func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error { logger.Infof("Recommitting lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables) var err error @@ -762,6 +762,23 @@ func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledge return l.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock) } +// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the +// specified blocks which miss at least a private data of a eligible collection. +func (l *kvLedger) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + // the missing pvtData info in the pvtdataStore could belong to a block which is yet + // to be processed and committed to the blockStore and stateDB (such a scenario is possible + // after a peer rollback). In such cases, we cannot return missing pvtData info. Otherwise, + // we would end up in an inconsistent state database. + if l.isPvtstoreAheadOfBlkstore.Load().(bool) { + return nil, nil + } + // it is safe to not acquire a read lock on l.blockAPIsRWLock. Without a lock, the value of + // lastCommittedBlock can change due to a new block commit. As a result, we may not + // be able to fetch the missing data info of truly the most recent blocks. This + // decision was made to ensure that the regular block commit rate is not affected. + return l.pvtdataStore.GetMissingPvtDataInfoForSpecificBlock(blockNumber) +} + func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []byte) { var valueBytes []byte @@ -812,9 +829,12 @@ func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilte // DoesPvtDataInfoExist returns true when // (1) the ledger has pvtdata associated with the given block number (or) // (2) a few or all pvtdata associated with the given block number is missing but the -// missing info is recorded in the ledger (or) +// +// missing info is recorded in the ledger (or) +// // (3) the block is committed but it does not contain even a single -// transaction with pvtData. +// +// transaction with pvtData. func (l *kvLedger) DoesPvtDataInfoExist(blockNum uint64) (bool, error) { pvtStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight() if err != nil { diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index 38ad285d5d5..9abde93c197 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -536,6 +536,7 @@ type ConfigHistoryRetriever interface { // MissingPvtDataTracker allows getting information about the private data that is not missing on the peer type MissingPvtDataTracker interface { GetMissingPvtDataInfoForMostRecentBlocks(maxBlocks int) (MissingPvtDataInfo, error) + GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (MissingPvtDataInfo, error) } // MissingPvtDataInfo is a map of block number to MissingBlockPvtdataInfo diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 6a2975dd6d5..fe370c0b730 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0 package pvtdatastorage import ( + "fmt" "sync" "sync/atomic" "time" @@ -476,6 +477,41 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock) } +// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the +// specified block which miss at least a private data of a eligible collection. +func (s *Store) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + // we assume that this function would be called by the gossip only after processing the + // last retrieved missing pvtdata info and committing the same. + if blockNumber < 1 { + return nil, fmt.Errorf("invalid block number [%d]", blockNumber) + } + + lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock) + if blockNumber > lastCommittedBlock { + return nil, fmt.Errorf("block [%d] is not committed yet to the ledger", blockNumber) + } + + logger.Debug("fetching missing pvtdata entries from the de-prioritized list") + dePrioratizedQue, err := s.getMissingDataFromSpecificBlock(elgDeprioritizedMissingDataGroup, blockNumber) + if err != nil { + logger.Debugf("Error in fetching missing pvtdata entries from the de-prioritized list: %s", err) + return nil, err + } + logger.Debug("fetching missing pvtdata entries from the prioritized list") + prioritizedQue, err := s.getMissingDataFromSpecificBlock(elgPrioritizedMissingDataGroup, blockNumber) + if err != nil { + logger.Debugf("Error in fetching missing pvtdata entries from the prioritized list: %s", err) + return nil, err + } + + for k, v := range dePrioratizedQue { + prioritizedQue[k] = v // This will overwrite map1's entry if the key exists + } + + return prioritizedQue, nil + +} + func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) { missingPvtDataInfo := make(ledger.MissingPvtDataInfo) numberOfBlockProcessed := 0 @@ -550,6 +586,54 @@ func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDat return missingPvtDataInfo, nil } +func (s *Store) getMissingDataFromSpecificBlock(group []byte, blockNumber uint64) (ledger.MissingPvtDataInfo, error) { + missingPvtDataInfo := make(ledger.MissingPvtDataInfo) + + startKey, endKey := createRangeScanKeysForElgMissingData(blockNumber, group) + dbItr, err := s.db.GetIterator(startKey, endKey) + if err != nil { + return nil, err + } + defer dbItr.Release() + + for dbItr.Next() { + missingDataKeyBytes := dbItr.Key() + missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes) + + // check whether the entry is expired. If so, move to the next item. + // As we may use the old lastCommittedBlock value, there is a possibility that + // this missing data is actually expired but we may get the stale information. + // Though it may leads to extra work of pulling the expired data, it will not + // affect the correctness. Further, as we try to fetch the most recent missing + // data (less possibility of expiring now), such scenario would be rare. In the + // best case, we can load the latest lastCommittedBlock value here atomically to + // make this scenario very rare. + expired, err := isExpired(missingDataKey.nsCollBlk, s.btlPolicy, blockNumber) + if err != nil { + return nil, err + } + if expired { + continue + } + + valueBytes := dbItr.Value() + bitmap, err := decodeMissingDataValue(valueBytes) + if err != nil { + return nil, err + } + + // for each transaction which misses private data, make an entry in missingBlockPvtDataInfo + for index, isSet := bitmap.NextSet(0); isSet; index, isSet = bitmap.NextSet(index + 1) { + txNum := uint64(index) + missingPvtDataInfo.Add(missingDataKey.blkNum, txNum, missingDataKey.ns, missingDataKey.coll) + } + + break + } + + return missingPvtDataInfo, nil +} + // FetchBootKVHashes returns the KVHashes from the data that was loaded from a snapshot at the time of // bootstrapping. This funciton returns an error if the supplied blkNum is greater than the last block // number in the booting snapshot diff --git a/gossip/privdata/reconcile.go b/gossip/privdata/reconcile.go index 145e88c245d..bcda187bdae 100644 --- a/gossip/privdata/reconcile.go +++ b/gossip/privdata/reconcile.go @@ -9,6 +9,7 @@ package privdata import ( "encoding/hex" "fmt" + "github.com/hyperledger/fabric/internal/peer/protos" "math" "sync" "time" @@ -53,6 +54,9 @@ type PvtDataReconciler interface { Start() // Stop function stops reconciler Stop() + + //Reconcile performs on demand reconcilation a block or transaction + Reconcile(uint65 uint64) (protos.ReconcileResponse, error) } type Reconciler struct { @@ -68,6 +72,25 @@ type Reconciler struct { committer.Committer } +var ReconcilerServiceRegistry = make(map[string]PvtDataReconciler) + +// SetOnDemandReconcilerService sets a reconciler service by name +func SetOnDemandReconcilerService(name string, reconciler PvtDataReconciler) { + ReconcilerServiceRegistry[name] = reconciler +} + +func GetOnDemandReconcilerService(name string) PvtDataReconciler { + + if len(ReconcilerServiceRegistry) == 0 { + return nil + } + if ReconcilerServiceRegistry[name] == nil { + return nil + } + + return ReconcilerServiceRegistry[name] +} + // NoOpReconciler non functional reconciler to be used // in case reconciliation has been disabled type NoOpReconciler struct { @@ -82,6 +105,11 @@ func (*NoOpReconciler) Stop() { // do nothing } +func (*NoOpReconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) { + logger.Debug("Private data reconciliation has been disabled") + return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil +} + // NewReconciler creates a new instance of reconciler func NewReconciler(channel string, metrics *metrics.PrivdataMetrics, c committer.Committer, fetcher ReconciliationFetcher, config *PrivdataConfig) *Reconciler { @@ -111,6 +139,10 @@ func (r *Reconciler) Start() { }) } +func (r *Reconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) { + return r.reconcileSpecific(num) +} + func (r *Reconciler) run() { for { select { @@ -126,6 +158,60 @@ func (r *Reconciler) run() { } } +// ReconcileSpecific initiates a reconcilation for specific block and returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error +func (r *Reconciler) reconcileSpecific(num uint64) (protos.ReconcileResponse, error) { + missingPvtDataTracker, err := r.GetMissingPvtDataTracker() + if err != nil { + r.logger.Error("reconciliation error when trying to get missingPvtDataTracker:", err) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + if missingPvtDataTracker == nil { + r.logger.Error("got nil as MissingPvtDataTracker, exiting...") + return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil + } + totalReconciled := 0 + + defer r.reportReconciliationDuration(time.Now()) + + // for { + missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForSpecificBlock(num) + if err != nil { + r.logger.Errorf("reconciliation error when trying to get missing pvt data info for the block [%d] blocks: error %v", num, err.Error()) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + // if missingPvtDataInfo is nil, len will return 0 + if len(missingPvtDataInfo) == 0 { + r.logger.Debug("Reconciliation cycle finished successfully. no items to reconcile") + return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. nothing to reconcile for blocks range [%d]", num)}, nil + } + + r.logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...") + + dig2collectionCfg, _, _ := r.getDig2CollectionConfig(missingPvtDataInfo) + fetchedData, err := r.FetchReconciledItems(dig2collectionCfg) + if err != nil { + r.logger.Error("reconciliation error when trying to fetch missing items from different peers:", err) + return protos.ReconcileResponse{Success: false, Message: err.Error()}, err + } + + pvtDataToCommit := r.preparePvtDataToCommit(fetchedData.AvailableElements) + unreconciled := constructUnreconciledMissingData(dig2collectionCfg, fetchedData.AvailableElements) + pvtdataHashMismatch, err := r.CommitPvtDataOfOldBlocks(pvtDataToCommit, unreconciled) + if err != nil { + return protos.ReconcileResponse{Success: false, Message: "failed to commit private data"}, err + } + r.logMismatched(pvtdataHashMismatch) + //if minB < minBlock { + // minBlock = minB + //} + //if maxB > maxBlock { + // maxBlock = maxB + //} + totalReconciled += len(fetchedData.AvailableElements) + // } + return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d]", totalReconciled, num)}, nil +} + // returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error func (r *Reconciler) reconcile() error { missingPvtDataTracker, err := r.GetMissingPvtDataTracker() diff --git a/gossip/service/gossip_service.go b/gossip/service/gossip_service.go index d900d79e2a4..bb375690518 100644 --- a/gossip/service/gossip_service.go +++ b/gossip/service/gossip_service.go @@ -366,6 +366,10 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order reconciler = &gossipprivdata.NoOpReconciler{} } + // reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService() + // reconcilerServiceRegistry.SetOnDemandReconcilerService(channelID, reconciler) + gossipprivdata.SetOnDemandReconcilerService(channelID, reconciler) + pushAckTimeout := g.serviceConfig.PvtDataPushAckTimeout g.privateHandlers[channelID] = privateHandler{ support: support, diff --git a/internal/peer/reconciler/block.go b/internal/peer/reconciler/block.go index 65d5ad7f921..419a6201d42 100644 --- a/internal/peer/reconciler/block.go +++ b/internal/peer/reconciler/block.go @@ -26,6 +26,8 @@ func reconcileBlockCmd(cf *ReconcileCmdFactory) *cobra.Command { func reconcileBlock(cmd *cobra.Command, args []string, rf *ReconcileCmdFactory) error { + logger.Debugf("Received the arguments for reconcilation %d", args) + if len(args) == 0 { return fmt.Errorf("reconcile target required, block number") } @@ -59,12 +61,15 @@ func reconcileBlock(cmd *cobra.Command, args []string, rf *ReconcileCmdFactory) } } - response, _ := rf.DeliverClient.ReconcileSpecifiedBlock(uint64(blockNumber)) + response, err := rf.DeliverClient.ReconcileSpecifiedBlock(uint64(blockNumber)) - fmt.Println(response) + if err != nil { + fmt.Printf("Failed to reconcile block %d: %v\n", blockNumber, err.Error()) + } else { + fmt.Println(response) + } - logger.Debugf("Received the arguments for reconcilation %d", blockNumber) - fmt.Printf("Received the arguments for reconcilation %v on channel %s\n", args, channelID) + // fmt.Printf("Received the arguments for reconcilation %v on channel %s\n", args, channelID) err = rf.DeliverClient.Close() if err != nil {