Skip to content

Commit

Permalink
Cherry pick [FAB-18290] Add channel name to pvtdata reconciler log ms…
Browse files Browse the repository at this point in the history
…gs (#2091)

Add channel name to gossip.privdata logger context
so that log msgs will contain channel name.

Signed-off-by: Wenjian Qiao <wenjianq@gmail.com>
  • Loading branch information
wenjianqiao authored Nov 9, 2020
1 parent 99c2d12 commit d1730da
Show file tree
Hide file tree
Showing 9 changed files with 102 additions and 87 deletions.
20 changes: 11 additions & 9 deletions gossip/privdata/coordinator.go
Original file line number Diff line number Diff line change
Expand Up @@ -124,6 +124,7 @@ type coordinator struct {
Support
store *transientstore.Store
transientBlockRetention uint64
logger util.Logger
metrics *metrics.PrivdataMetrics
pullRetryThreshold time.Duration
skipPullingInvalidTransactions bool
Expand All @@ -138,6 +139,7 @@ func NewCoordinator(mspID string, support Support, store *transientstore.Store,
store: store,
selfSignedData: selfSignedData,
transientBlockRetention: config.TransientBlockRetention,
logger: logger.With("channel", support.ChainID),
metrics: metrics,
pullRetryThreshold: config.PullRetryThreshold,
skipPullingInvalidTransactions: config.SkipPullingInvalidTransactions,
Expand All @@ -154,15 +156,15 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
return errors.New("Block header is nil")
}

logger.Infof("[%s] Received block [%d] from buffer", c.ChainID, block.Header.Number)
c.logger.Infof("Received block [%d] from buffer", block.Header.Number)

logger.Debugf("[%s] Validating block [%d]", c.ChainID, block.Header.Number)
c.logger.Debugf("Validating block [%d]", block.Header.Number)

validationStart := time.Now()
err := c.Validator.Validate(block)
c.reportValidationDuration(time.Since(validationStart))
if err != nil {
logger.Errorf("Validation failed: %+v", err)
c.logger.Errorf("Validation failed: %+v", err)
return err
}

Expand Down Expand Up @@ -204,15 +206,15 @@ func (c *coordinator) StoreBlock(block *common.Block, privateDataSets util.PvtDa
}
pvtdataToRetrieve, err := c.getTxPvtdataInfoFromBlock(block)
if err != nil {
logger.Warningf("Failed to get private data info from block: %s", err)
c.logger.Warningf("Failed to get private data info from block: %s", err)
return err
}

// Retrieve the private data.
// RetrievePvtdata checks this peer's eligibility and then retreives from cache, transient store, or from a remote peer.
retrievedPvtdata, err := pdp.RetrievePvtdata(pvtdataToRetrieve)
if err != nil {
logger.Warningf("Failed to retrieve pvtdata: %s", err)
c.logger.Warningf("Failed to retrieve pvtdata: %s", err)
return err
}

Expand Down Expand Up @@ -266,16 +268,16 @@ func (c *coordinator) GetPvtDataAndBlockByNum(seqNum uint64, peerAuthInfo protou
}
sp, err := c.CollectionStore.RetrieveCollectionAccessPolicy(cc)
if err != nil {
logger.Warningf("Failed obtaining policy for collection criteria [%#v]: %s", cc, err)
c.logger.Warningf("Failed obtaining policy for collection criteria [%#v]: %s", cc, err)
continue
}
isAuthorized := sp.AccessFilter()
if isAuthorized == nil {
logger.Warningf("Failed obtaining filter for collection criteria [%#v]", cc)
c.logger.Warningf("Failed obtaining filter for collection criteria [%#v]", cc)
continue
}
if !isAuthorized(peerAuthInfo) {
logger.Debugf("Skipping collection criteria [%#v] because peer isn't authorized", cc)
c.logger.Debugf("Skipping collection criteria [%#v] because peer isn't authorized", cc)
continue
}
seqs2Namespaces.addCollection(uint64(seqInBlock), txPvtDataItem.WriteSet.DataModel, ns.Namespace, col)
Expand Down Expand Up @@ -322,7 +324,7 @@ func (c *coordinator) getTxPvtdataInfoFromBlock(block *common.Block) ([]*ledger.

colConfig, err := c.CollectionStore.RetrieveCollectionConfig(cc)
if err != nil {
logger.Warningf("Failed to retrieve collection config for collection criteria [%#v]: %s", cc, err)
c.logger.Warningf("Failed to retrieve collection config for collection criteria [%#v]: %s", cc, err)
return nil, err
}
col := &ledger.CollectionPvtdataInfo{
Expand Down
22 changes: 12 additions & 10 deletions gossip/privdata/dataretriever.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,14 +32,16 @@ type StorageDataRetriever interface {
}

type dataRetriever struct {
logger util.Logger
store *transientstore.Store
committer committer.Committer
}

// NewDataRetriever constructing function for implementation of the
// StorageDataRetriever interface
func NewDataRetriever(store *transientstore.Store, committer committer.Committer) StorageDataRetriever {
func NewDataRetriever(channelID string, store *transientstore.Store, committer committer.Committer) StorageDataRetriever {
return &dataRetriever{
logger: logger.With("channel", channelID),
store: store,
committer: committer,
}
Expand All @@ -54,7 +56,7 @@ func (dr *dataRetriever) CollectionRWSet(digests []*protosgossip.PvtDataDigest,
return nil, false, errors.Wrap(err, "wasn't able to read ledger height")
}
if height <= blockNum {
logger.Debug("Current ledger height ", height, "is below requested block sequence number",
dr.logger.Debug("Current ledger height ", height, "is below requested block sequence number",
blockNum, "retrieving private data from transient store")
}

Expand All @@ -68,7 +70,7 @@ func (dr *dataRetriever) CollectionRWSet(digests []*protosgossip.PvtDataDigest,
}
pvtRWSet, err := dr.fromTransientStore(dig, filter)
if err != nil {
logger.Errorf("couldn't read from transient store private read-write set, "+
dr.logger.Errorf("couldn't read from transient store private read-write set, "+
"digest %+v, because of %s", dig, err)
continue
}
Expand Down Expand Up @@ -108,7 +110,7 @@ func (dr *dataRetriever) fromLedger(digests []*protosgossip.PvtDataDigest, block
pvtRWSetWithConfig := &util.PrivateRWSetWithConfig{}
for _, data := range pvtData {
if data.WriteSet == nil {
logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum)
dr.logger.Warning("Received nil write set for collection tx in block", data.SeqInBlock, "block number", blockNum)
continue
}

Expand Down Expand Up @@ -177,25 +179,25 @@ func (dr *dataRetriever) fromTransientStore(dig *protosgossip.PvtDataDigest, fil
}
rws := res.PvtSimulationResultsWithConfig
if rws == nil {
logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
dr.logger.Debug("Skipping nil PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
continue
}
txPvtRWSet := rws.PvtRwset
if txPvtRWSet == nil {
logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
dr.logger.Debug("Skipping empty PvtRwset of PvtSimulationResultsWithConfig received at block height", res.ReceivedAtBlockHeight)
continue
}

colConfigs, found := rws.CollectionConfigs[dig.Namespace]
if !found {
logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name",
dr.logger.Error("No collection config was found for chaincode", dig.Namespace, "collection name",
dig.Namespace, "txID", dig.TxId)
continue
}

configs := extractCollectionConfig(colConfigs, dig.Collection)
if configs == nil {
logger.Error("No collection config was found for collection", dig.Collection,
dr.logger.Error("No collection config was found for collection", dig.Collection,
"namespace", dig.Namespace, "txID", dig.TxId)
continue
}
Expand All @@ -216,13 +218,13 @@ func (dr *dataRetriever) extractPvtRWsets(pvtRWSets []*rwset.NsPvtReadWriteSet,
for _, nsws := range pvtRWSets {
// and in each namespace - iterate over all collections
if nsws.Namespace != namespace {
logger.Debug("Received private data namespace ", nsws.Namespace, " instead of ", namespace, " skipping...")
dr.logger.Debug("Received private data namespace ", nsws.Namespace, " instead of ", namespace, " skipping...")
continue
}
for _, col := range nsws.CollectionPvtRwset {
// This isn't the collection we're looking for
if col.CollectionName != collectionName {
logger.Debug("Received private data collection ", col.CollectionName, " instead of ", collectionName, " skipping...")
dr.logger.Debug("Received private data collection ", col.CollectionName, " instead of ", collectionName, " skipping...")
continue
}
// Add the collection pRWset to the accumulated set
Expand Down
21 changes: 10 additions & 11 deletions gossip/privdata/dataretriever_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func TestNewDataRetriever_GetDataFromTransientStore(t *testing.T) {

committer.On("LedgerHeight").Return(uint64(1), nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

store.Persist(txID, 2, &transientstore.TxPvtReadWriteSetWithConfigInfo{
PvtRwset: &rwset.TxPvtReadWriteSet{
Expand Down Expand Up @@ -127,7 +127,7 @@ func TestNewDataRetriever_GetDataFromLedger(t *testing.T) {
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil)
committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

// Request digest for private data which is greater than current ledger height
// to make it query ledger for missed private data
Expand Down Expand Up @@ -173,7 +173,7 @@ func TestNewDataRetriever_FailGetPvtDataFromLedger(t *testing.T) {
committer.On("GetPvtDataByNum", uint64(5), mock.Anything).
Return(nil, errors.New("failing retrieving private data"))

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestNewDataRetriever_GetOnlyRelevantPvtData(t *testing.T) {
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, namespace).Return(newCollectionConfig(collectionName), nil)
committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
Expand Down Expand Up @@ -306,7 +306,7 @@ func TestNewDataRetriever_GetMultipleDigests(t *testing.T) {
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, ns2).Return(newCollectionConfig(col2), nil)
committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

// Request digest for private data which is greater than current ledger height
// to make it query transient store for missed private data
Expand Down Expand Up @@ -381,7 +381,7 @@ func TestNewDataRetriever_EmptyWriteSet(t *testing.T) {
historyRetreiver.On("MostRecentCollectionConfigBelow", mock.Anything, ns1).Return(newCollectionConfig(col1), nil)
committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Expand Down Expand Up @@ -433,7 +433,7 @@ func TestNewDataRetriever_FailedObtainConfigHistoryRetriever(t *testing.T) {
committer.On("GetPvtDataByNum", uint64(5), mock.Anything).Return(result, nil)
committer.On("GetConfigHistoryRetriever").Return(nil, errors.New("failed to obtain ConfigHistoryRetriever"))

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Expand Down Expand Up @@ -488,9 +488,8 @@ func TestNewDataRetriever_NoCollectionConfig(t *testing.T) {
Return(nil, nil)
committer.On("GetConfigHistoryRetriever").Return(historyRetreiver, nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)
assertion := assert.New(t)

_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Collection: col1,
Expand Down Expand Up @@ -522,7 +521,7 @@ func TestNewDataRetriever_FailedGetLedgerHeight(t *testing.T) {
col1 := "testCollectionName1"

committer.On("LedgerHeight").Return(uint64(0), errors.New("failed to read ledger height"))
retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

_, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: ns1,
Expand All @@ -548,7 +547,7 @@ func TestNewDataRetriever_EmptyPvtRWSetInTransientStore(t *testing.T) {

committer.On("LedgerHeight").Return(uint64(1), nil)

retriever := NewDataRetriever(store.store, committer)
retriever := NewDataRetriever("testchannel", store.store, committer)

rwSets, _, err := retriever.CollectionRWSet([]*gossip2.PvtDataDigest{{
Namespace: namespace,
Expand Down
24 changes: 13 additions & 11 deletions gossip/privdata/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,7 @@ type distributorImpl struct {
gossipAdapter
CollectionAccessFactory
pushAckTimeout time.Duration
logger util.Logger
metrics *metrics.PrivdataMetrics
}

Expand Down Expand Up @@ -130,6 +131,7 @@ func NewDistributor(chainID string, gossip gossipAdapter, factory CollectionAcce
gossipAdapter: gossip,
CollectionAccessFactory: factory,
pushAckTimeout: pushAckTimeout,
logger: logger.With("channel", chainID),
metrics: metrics,
}
}
Expand Down Expand Up @@ -157,27 +159,27 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
namespace := pvtRwset.Namespace
configPackage, found := privDataWithConfig.CollectionConfigs[namespace]
if !found {
logger.Error("Collection config package for", namespace, "chaincode is not provided")
d.logger.Error("Collection config package for", namespace, "chaincode is not provided")
return nil, errors.New(fmt.Sprint("collection config package for", namespace, "chaincode is not provided"))
}

for _, collection := range pvtRwset.CollectionPvtRwset {
colCP, err := d.getCollectionConfig(configPackage, collection)
collectionName := collection.CollectionName
if err != nil {
logger.Error("Could not find collection access policy for", namespace, " and collection", collectionName, "error", err)
d.logger.Error("Could not find collection access policy for", namespace, " and collection", collectionName, "error", err)
return nil, errors.WithMessage(err, fmt.Sprint("could not find collection access policy for", namespace, " and collection", collectionName, "error", err))
}

colAP, err := d.AccessPolicy(colCP, d.chainID)
if err != nil {
logger.Error("Could not obtain collection access policy, collection name", collectionName, "due to", err)
d.logger.Error("Could not obtain collection access policy, collection name", collectionName, "due to", err)
return nil, errors.Wrap(err, fmt.Sprint("Could not obtain collection access policy, collection name", collectionName, "due to", err))
}

colFilter := colAP.AccessFilter()
if colFilter == nil {
logger.Error("Collection access policy for", collectionName, "has no filter")
d.logger.Error("Collection access policy for", collectionName, "has no filter")
return nil, errors.Errorf("No collection access policy filter computed for %v", collectionName)
}

Expand All @@ -186,7 +188,7 @@ func (d *distributorImpl) computeDisseminationPlan(txID string,
return nil, errors.WithStack(err)
}

logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
d.logger.Debugf("Computing dissemination plan for collection [%s]", collectionName)
dPlan, err := d.disseminationPlanForMsg(colAP, colFilter, pvtDataMsg)
if err != nil {
return nil, errors.WithMessagef(err, "could not build private data dissemination plan for chaincode %s and collection %s", namespace, collectionName)
Expand Down Expand Up @@ -220,7 +222,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
})

if err != nil {
logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
d.logger.Error("Failed to retrieve peer routing filter for channel", d.chainID, ":", err)
return nil, err
}

Expand Down Expand Up @@ -301,8 +303,8 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces

maximumPeerRemainingCount--
if maximumPeerRemainingCount == 0 {
logger.Debug("MaximumPeerCount satisfied")
logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
d.logger.Debug("MaximumPeerCount satisfied")
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
return disseminationPlan, nil
}
}
Expand All @@ -314,7 +316,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
numRemainingPeersToSelect = len(remainingPeersAcrossOrgs)
}
if numRemainingPeersToSelect > 0 {
logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
d.logger.Debugf("MaximumPeerCount not yet satisfied after picking one peer per org, selecting %d more peer(s) for dissemination", numRemainingPeersToSelect)
}
for maximumPeerRemainingCount > 0 && len(remainingPeersAcrossOrgs) > 0 {
required := 1
Expand Down Expand Up @@ -350,7 +352,7 @@ func (d *distributorImpl) disseminationPlanForMsg(colAP privdata.CollectionAcces
remainingPeersAcrossOrgs = append(remainingPeersAcrossOrgs[:selectedPeerIndex], remainingPeersAcrossOrgs[selectedPeerIndex+1:]...)
}

logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
d.logger.Debugf("Disseminating private RWSet for TxID [%s] namespace [%s] collection [%s] to peers: %v", m.TxId, m.Namespace, m.CollectionName, selectedPeerEndpointsForDebug)
return disseminationPlan, nil
}

Expand Down Expand Up @@ -397,7 +399,7 @@ func (d *distributorImpl) disseminate(disseminationPlan []*dissemination) error
if err != nil {
atomic.AddUint32(&failures, 1)
m := dis.msg.GetPrivateData().Payload
logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
d.logger.Error("Failed disseminating private RWSet for TxID", m.TxId, ", namespace", m.Namespace, "collection", m.CollectionName, ":", err)
}
}(dis)
}
Expand Down
Loading

0 comments on commit d1730da

Please sign in to comment.