Skip to content

Commit

Permalink
e2e functionality for block reconcilation
Browse files Browse the repository at this point in the history
  • Loading branch information
tittuvarghese committed Sep 28, 2024
1 parent ad406a4 commit a81db93
Show file tree
Hide file tree
Showing 7 changed files with 223 additions and 11 deletions.
18 changes: 15 additions & 3 deletions common/p2pmessage/p2pmessage.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,13 @@ package p2pmessage

import (
"context"
"fmt"
"github.com/hyperledger/fabric/common/crypto"
"github.com/hyperledger/fabric/common/flogging"
"github.com/hyperledger/fabric/common/ledger/blockledger"
"github.com/hyperledger/fabric/common/policies"
"github.com/hyperledger/fabric/common/util"
gossipprivdata "github.com/hyperledger/fabric/gossip/privdata"
"github.com/hyperledger/fabric/internal/peer/protos"
"time"
)
Expand Down Expand Up @@ -113,11 +115,21 @@ func (h *Handler) Handle(ctx context.Context, request *protos.ReconcileRequest)
defer h.Metrics.StreamsClosed.Add(1)

reconcileResponse := &protos.ReconcileResponse{
Success: true,
Message: "I amd sending the response! It's me server",
Success: false,
}

return reconcileResponse, nil
// Calling Reconciler Service
// reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService()
reconciler := gossipprivdata.GetOnDemandReconcilerService(request.ChannelId)
fmt.Println(reconciler)

if reconciler == nil {
reconcileResponse.Message = "no reconciler found for channel " + request.ChannelId

return reconcileResponse, fmt.Errorf("no reconciler found for channel " + request.ChannelId)
}
response, err := reconciler.Reconcile(request.BlockNumber)
return &response, err
}

// ExtractChannelHeaderCertHash extracts the TLS cert hash from a channel header.
Expand Down
28 changes: 24 additions & 4 deletions core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -464,8 +464,8 @@ func (l *kvLedger) filterYetToCommitBlocks(blocksPvtData map[uint64][]*ledger.Tx
return nil
}

//recommitLostBlocks retrieves blocks in specified range and commit the write set to either
//state DB or history DB or both
// recommitLostBlocks retrieves blocks in specified range and commit the write set to either
// state DB or history DB or both
func (l *kvLedger) recommitLostBlocks(firstBlockNum uint64, lastBlockNum uint64, recoverables ...recoverable) error {
logger.Infof("Recommitting lost blocks - firstBlockNum=%d, lastBlockNum=%d, recoverables=%#v", firstBlockNum, lastBlockNum, recoverables)
var err error
Expand Down Expand Up @@ -762,6 +762,23 @@ func (l *kvLedger) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledge
return l.pvtdataStore.GetMissingPvtDataInfoForMostRecentBlocks(maxBlock)
}

// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the
// specified blocks which miss at least a private data of a eligible collection.
func (l *kvLedger) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
// the missing pvtData info in the pvtdataStore could belong to a block which is yet
// to be processed and committed to the blockStore and stateDB (such a scenario is possible
// after a peer rollback). In such cases, we cannot return missing pvtData info. Otherwise,
// we would end up in an inconsistent state database.
if l.isPvtstoreAheadOfBlkstore.Load().(bool) {
return nil, nil
}
// it is safe to not acquire a read lock on l.blockAPIsRWLock. Without a lock, the value of
// lastCommittedBlock can change due to a new block commit. As a result, we may not
// be able to fetch the missing data info of truly the most recent blocks. This
// decision was made to ensure that the regular block commit rate is not affected.
return l.pvtdataStore.GetMissingPvtDataInfoForSpecificBlock(blockNumber)
}

func (l *kvLedger) addBlockCommitHash(block *common.Block, updateBatchBytes []byte) {
var valueBytes []byte

Expand Down Expand Up @@ -812,9 +829,12 @@ func (l *kvLedger) GetPvtDataByNum(blockNum uint64, filter ledger.PvtNsCollFilte
// DoesPvtDataInfoExist returns true when
// (1) the ledger has pvtdata associated with the given block number (or)
// (2) a few or all pvtdata associated with the given block number is missing but the
// missing info is recorded in the ledger (or)
//
// missing info is recorded in the ledger (or)
//
// (3) the block is committed but it does not contain even a single
// transaction with pvtData.
//
// transaction with pvtData.
func (l *kvLedger) DoesPvtDataInfoExist(blockNum uint64) (bool, error) {
pvtStoreHt, err := l.pvtdataStore.LastCommittedBlockHeight()
if err != nil {
Expand Down
1 change: 1 addition & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ type ConfigHistoryRetriever interface {
// MissingPvtDataTracker allows getting information about the private data that is not missing on the peer
type MissingPvtDataTracker interface {
GetMissingPvtDataInfoForMostRecentBlocks(maxBlocks int) (MissingPvtDataInfo, error)
GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (MissingPvtDataInfo, error)
}

// MissingPvtDataInfo is a map of block number to MissingBlockPvtdataInfo
Expand Down
84 changes: 84 additions & 0 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ SPDX-License-Identifier: Apache-2.0
package pvtdatastorage

import (
"fmt"
"sync"
"sync/atomic"
"time"
Expand Down Expand Up @@ -476,6 +477,41 @@ func (s *Store) GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M
return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock)
}

// GetMissingPvtDataInfoForSpecificBlock returns the missing private data information for the
// specified block which miss at least a private data of a eligible collection.
func (s *Store) GetMissingPvtDataInfoForSpecificBlock(blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
// we assume that this function would be called by the gossip only after processing the
// last retrieved missing pvtdata info and committing the same.
if blockNumber < 1 {
return nil, fmt.Errorf("invalid block number [%d]", blockNumber)
}

lastCommittedBlock := atomic.LoadUint64(&s.lastCommittedBlock)
if blockNumber > lastCommittedBlock {
return nil, fmt.Errorf("block [%d] is not committed yet to the ledger", blockNumber)
}

logger.Debug("fetching missing pvtdata entries from the de-prioritized list")
dePrioratizedQue, err := s.getMissingDataFromSpecificBlock(elgDeprioritizedMissingDataGroup, blockNumber)
if err != nil {
logger.Debugf("Error in fetching missing pvtdata entries from the de-prioritized list: %s", err)
return nil, err
}
logger.Debug("fetching missing pvtdata entries from the prioritized list")
prioritizedQue, err := s.getMissingDataFromSpecificBlock(elgPrioritizedMissingDataGroup, blockNumber)
if err != nil {
logger.Debugf("Error in fetching missing pvtdata entries from the prioritized list: %s", err)
return nil, err
}

for k, v := range dePrioratizedQue {
prioritizedQue[k] = v // This will overwrite map1's entry if the key exists
}

return prioritizedQue, nil

}

func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDataInfo, error) {
missingPvtDataInfo := make(ledger.MissingPvtDataInfo)
numberOfBlockProcessed := 0
Expand Down Expand Up @@ -550,6 +586,54 @@ func (s *Store) getMissingData(group []byte, maxBlock int) (ledger.MissingPvtDat
return missingPvtDataInfo, nil
}

func (s *Store) getMissingDataFromSpecificBlock(group []byte, blockNumber uint64) (ledger.MissingPvtDataInfo, error) {
missingPvtDataInfo := make(ledger.MissingPvtDataInfo)

startKey, endKey := createRangeScanKeysForElgMissingData(blockNumber, group)
dbItr, err := s.db.GetIterator(startKey, endKey)
if err != nil {
return nil, err
}
defer dbItr.Release()

for dbItr.Next() {
missingDataKeyBytes := dbItr.Key()
missingDataKey := decodeElgMissingDataKey(missingDataKeyBytes)

// check whether the entry is expired. If so, move to the next item.
// As we may use the old lastCommittedBlock value, there is a possibility that
// this missing data is actually expired but we may get the stale information.
// Though it may leads to extra work of pulling the expired data, it will not
// affect the correctness. Further, as we try to fetch the most recent missing
// data (less possibility of expiring now), such scenario would be rare. In the
// best case, we can load the latest lastCommittedBlock value here atomically to
// make this scenario very rare.
expired, err := isExpired(missingDataKey.nsCollBlk, s.btlPolicy, blockNumber)
if err != nil {
return nil, err
}
if expired {
continue
}

valueBytes := dbItr.Value()
bitmap, err := decodeMissingDataValue(valueBytes)
if err != nil {
return nil, err
}

// for each transaction which misses private data, make an entry in missingBlockPvtDataInfo
for index, isSet := bitmap.NextSet(0); isSet; index, isSet = bitmap.NextSet(index + 1) {
txNum := uint64(index)
missingPvtDataInfo.Add(missingDataKey.blkNum, txNum, missingDataKey.ns, missingDataKey.coll)
}

break
}

return missingPvtDataInfo, nil
}

// FetchBootKVHashes returns the KVHashes from the data that was loaded from a snapshot at the time of
// bootstrapping. This funciton returns an error if the supplied blkNum is greater than the last block
// number in the booting snapshot
Expand Down
86 changes: 86 additions & 0 deletions gossip/privdata/reconcile.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ package privdata
import (
"encoding/hex"
"fmt"
"github.com/hyperledger/fabric/internal/peer/protos"
"math"
"sync"
"time"
Expand Down Expand Up @@ -53,6 +54,9 @@ type PvtDataReconciler interface {
Start()
// Stop function stops reconciler
Stop()

//Reconcile performs on demand reconcilation a block or transaction
Reconcile(uint65 uint64) (protos.ReconcileResponse, error)
}

type Reconciler struct {
Expand All @@ -68,6 +72,25 @@ type Reconciler struct {
committer.Committer
}

var ReconcilerServiceRegistry = make(map[string]PvtDataReconciler)

// SetOnDemandReconcilerService sets a reconciler service by name
func SetOnDemandReconcilerService(name string, reconciler PvtDataReconciler) {
ReconcilerServiceRegistry[name] = reconciler
}

func GetOnDemandReconcilerService(name string) PvtDataReconciler {

if len(ReconcilerServiceRegistry) == 0 {
return nil
}
if ReconcilerServiceRegistry[name] == nil {
return nil
}

return ReconcilerServiceRegistry[name]
}

// NoOpReconciler non functional reconciler to be used
// in case reconciliation has been disabled
type NoOpReconciler struct {
Expand All @@ -82,6 +105,11 @@ func (*NoOpReconciler) Stop() {
// do nothing
}

func (*NoOpReconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) {
logger.Debug("Private data reconciliation has been disabled")
return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil
}

// NewReconciler creates a new instance of reconciler
func NewReconciler(channel string, metrics *metrics.PrivdataMetrics, c committer.Committer,
fetcher ReconciliationFetcher, config *PrivdataConfig) *Reconciler {
Expand Down Expand Up @@ -111,6 +139,10 @@ func (r *Reconciler) Start() {
})
}

func (r *Reconciler) Reconcile(num uint64) (protos.ReconcileResponse, error) {
return r.reconcileSpecific(num)
}

func (r *Reconciler) run() {
for {
select {
Expand All @@ -126,6 +158,60 @@ func (r *Reconciler) run() {
}
}

// ReconcileSpecific initiates a reconcilation for specific block and returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error
func (r *Reconciler) reconcileSpecific(num uint64) (protos.ReconcileResponse, error) {
missingPvtDataTracker, err := r.GetMissingPvtDataTracker()
if err != nil {
r.logger.Error("reconciliation error when trying to get missingPvtDataTracker:", err)
return protos.ReconcileResponse{Success: false, Message: err.Error()}, err
}
if missingPvtDataTracker == nil {
r.logger.Error("got nil as MissingPvtDataTracker, exiting...")
return protos.ReconcileResponse{Success: false, Message: "got nil as MissingPvtDataTracker, exiting..."}, nil
}
totalReconciled := 0

defer r.reportReconciliationDuration(time.Now())

// for {
missingPvtDataInfo, err := missingPvtDataTracker.GetMissingPvtDataInfoForSpecificBlock(num)
if err != nil {
r.logger.Errorf("reconciliation error when trying to get missing pvt data info for the block [%d] blocks: error %v", num, err.Error())
return protos.ReconcileResponse{Success: false, Message: err.Error()}, err
}
// if missingPvtDataInfo is nil, len will return 0
if len(missingPvtDataInfo) == 0 {
r.logger.Debug("Reconciliation cycle finished successfully. no items to reconcile")
return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. nothing to reconcile for blocks range [%d]", num)}, nil
}

r.logger.Debug("got from ledger", len(missingPvtDataInfo), "blocks with missing private data, trying to reconcile...")

dig2collectionCfg, _, _ := r.getDig2CollectionConfig(missingPvtDataInfo)
fetchedData, err := r.FetchReconciledItems(dig2collectionCfg)
if err != nil {
r.logger.Error("reconciliation error when trying to fetch missing items from different peers:", err)
return protos.ReconcileResponse{Success: false, Message: err.Error()}, err
}

pvtDataToCommit := r.preparePvtDataToCommit(fetchedData.AvailableElements)
unreconciled := constructUnreconciledMissingData(dig2collectionCfg, fetchedData.AvailableElements)
pvtdataHashMismatch, err := r.CommitPvtDataOfOldBlocks(pvtDataToCommit, unreconciled)
if err != nil {
return protos.ReconcileResponse{Success: false, Message: "failed to commit private data"}, err
}
r.logMismatched(pvtdataHashMismatch)
//if minB < minBlock {
// minBlock = minB
//}
//if maxB > maxBlock {
// maxBlock = maxB
//}
totalReconciled += len(fetchedData.AvailableElements)
// }
return protos.ReconcileResponse{Success: true, Message: fmt.Sprintf("Reconciliation cycle finished successfully. reconciled %d private data keys from blocks range [%d]", totalReconciled, num)}, nil
}

// returns the number of items that were reconciled , minBlock, maxBlock (blocks range) and an error
func (r *Reconciler) reconcile() error {
missingPvtDataTracker, err := r.GetMissingPvtDataTracker()
Expand Down
4 changes: 4 additions & 0 deletions gossip/service/gossip_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,10 @@ func (g *GossipService) InitializeChannel(channelID string, ordererSource *order
reconciler = &gossipprivdata.NoOpReconciler{}
}

// reconcilerServiceRegistry := gossipprivdata.NewOnDemandReconcilerService()
// reconcilerServiceRegistry.SetOnDemandReconcilerService(channelID, reconciler)
gossipprivdata.SetOnDemandReconcilerService(channelID, reconciler)

pushAckTimeout := g.serviceConfig.PvtDataPushAckTimeout
g.privateHandlers[channelID] = privateHandler{
support: support,
Expand Down
13 changes: 9 additions & 4 deletions internal/peer/reconciler/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ func reconcileBlockCmd(cf *ReconcileCmdFactory) *cobra.Command {

func reconcileBlock(cmd *cobra.Command, args []string, rf *ReconcileCmdFactory) error {

logger.Debugf("Received the arguments for reconcilation %d", args)

if len(args) == 0 {
return fmt.Errorf("reconcile target required, block number")
}
Expand Down Expand Up @@ -59,12 +61,15 @@ func reconcileBlock(cmd *cobra.Command, args []string, rf *ReconcileCmdFactory)
}
}

response, _ := rf.DeliverClient.ReconcileSpecifiedBlock(uint64(blockNumber))
response, err := rf.DeliverClient.ReconcileSpecifiedBlock(uint64(blockNumber))

fmt.Println(response)
if err != nil {
fmt.Printf("Failed to reconcile block %d: %v\n", blockNumber, err.Error())
} else {
fmt.Println(response)
}

logger.Debugf("Received the arguments for reconcilation %d", blockNumber)
fmt.Printf("Received the arguments for reconcilation %v on channel %s\n", args, channelID)
// fmt.Printf("Received the arguments for reconcilation %v on channel %s\n", args, channelID)

err = rf.DeliverClient.Close()
if err != nil {
Expand Down

0 comments on commit a81db93

Please sign in to comment.