Skip to content

Commit

Permalink
pass unreconciled missing pvtdata to pvtdata store (backport to relea…
Browse files Browse the repository at this point in the history
…se-2.2) (#1886)

* add config for deprioritized missing data access interval

Signed-off-by: Senthil Nathan N <cendhu@gmail.com>

* minor refactoring of missing pvtdata retrieval test

Signed-off-by: Senthil Nathan N <cendhu@gmail.com>
Signed-off-by: David Enyeart <enyeart@us.ibm.com>

* pass unreconciled data to the pvtstore commit

Signed-off-by: Senthil Nathan N <cendhu@gmail.com>

Co-authored-by: Senthil Nathan N <cendhu@gmail.com>
  • Loading branch information
denyeart and cendhu authored Sep 17, 2020
1 parent ee2bc1b commit a5a6acd
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 155 deletions.
2 changes: 1 addition & 1 deletion core/ledger/kvledger/kv_ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -685,7 +685,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil

logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata))

err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil)
err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, unreconciled)
if err != nil {
return nil, err
}
Expand Down
22 changes: 13 additions & 9 deletions core/ledger/kvledger/kv_ledger_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/golang/protobuf/proto"
"github.com/hyperledger/fabric-protos-go/common"
Expand Down Expand Up @@ -447,9 +448,10 @@ func TestLedgerBackup(t *testing.T) {
RootFSPath: originalPath,
StateDBConfig: &lgr.StateDBConfig{},
PrivateDataConfig: &lgr.PrivateDataConfig{
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
DeprioritizedDataReconcilerInterval: 120 * time.Minute,
},
HistoryDBConfig: &lgr.HistoryDBConfig{
Enabled: true,
Expand Down Expand Up @@ -500,9 +502,10 @@ func TestLedgerBackup(t *testing.T) {
RootFSPath: restorePath,
StateDBConfig: &lgr.StateDBConfig{},
PrivateDataConfig: &lgr.PrivateDataConfig{
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
DeprioritizedDataReconcilerInterval: 120 * time.Minute,
},
HistoryDBConfig: &lgr.HistoryDBConfig{
Enabled: true,
Expand Down Expand Up @@ -589,9 +592,10 @@ func testConfig(t *testing.T) (conf *lgr.Config, cleanup func()) {
RootFSPath: path,
StateDBConfig: &lgr.StateDBConfig{},
PrivateDataConfig: &lgr.PrivateDataConfig{
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
DeprioritizedDataReconcilerInterval: 120 * time.Minute,
},
HistoryDBConfig: &lgr.HistoryDBConfig{
Enabled: true,
Expand Down
10 changes: 6 additions & 4 deletions core/ledger/kvledger/tests/env.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"os"
"path/filepath"
"testing"
"time"

"github.com/hyperledger/fabric-protos-go/common"
"github.com/hyperledger/fabric-protos-go/peer"
Expand Down Expand Up @@ -224,7 +225,7 @@ func populateMissingsWithTestDefaults(t *testing.T, initializer *ledgermgmt.Init
initializer.MetricsProvider = &disabled.Provider{}
}

if initializer.Config == nil {
if initializer.Config == nil || initializer.Config.RootFSPath == "" {
rootPath, err := ioutil.TempDir("/tmp", "ledgersData")
if err != nil {
t.Fatalf("Failed to create root directory: %s", err)
Expand All @@ -249,9 +250,10 @@ func populateMissingsWithTestDefaults(t *testing.T, initializer *ledgermgmt.Init

if initializer.Config.PrivateDataConfig == nil {
initializer.Config.PrivateDataConfig = &ledger.PrivateDataConfig{
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
DeprioritizedDataReconcilerInterval: 120 * time.Minute,
}
}
if initializer.Config.SnapshotsConfig == nil {
Expand Down
234 changes: 159 additions & 75 deletions core/ledger/kvledger/tests/missing_pvtdata_retrieval_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,87 +8,171 @@ package tests

import (
"testing"
"time"

"github.com/hyperledger/fabric/bccsp/sw"
"github.com/hyperledger/fabric/core/container/externalbuilder"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/kvledger"
"github.com/stretchr/testify/assert"
"github.com/hyperledger/fabric/core/ledger/ledgermgmt"
"github.com/stretchr/testify/require"
)

func TestGetMissingPvtDataAfterRollback(t *testing.T) {
env := newEnv(t)
defer env.cleanup()
env.initLedgerMgmt()
h := env.newTestHelperCreateLgr("ledger1", t)

collConf := []*collConf{{name: "coll1", btl: 5}}

// deploy cc1 with 'collConf'
h.simulateDeployTx("cc1", collConf)
h.cutBlockAndCommitLegacy()

// pvtdata simulation
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key1", "value1")
})
// another pvtdata simulation
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key2", "value2")
})

h.causeMissingPvtData(0)
blk2 := h.cutBlockAndCommitLegacy()

h.verifyPvtState("cc1", "coll1", "key2", "value2") // key2 should have been committed
h.simulateDataTx("", func(s *simulator) {
h.assertError(s.GetPrivateData("cc1", "coll1", "key1")) // key1 would be stale with respect to hashed version
})

// verify missing pvtdata info
h.verifyBlockAndPvtDataSameAs(2, blk2)
expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo)
expectedMissingPvtDataInfo.Add(2, 0, "cc1", "coll1")
h.verifyMissingPvtDataSameAs(2, expectedMissingPvtDataInfo)

// commit block 3
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key3", "value2")
func TestGetMissingPvtData(t *testing.T) {
setup := func(h *testhelper) (*ledger.BlockAndPvtData, ledger.MissingPvtDataInfo) {
collConf := []*collConf{{
name: "coll1",
btl: 5,
}}

// deploy cc1 with 'collConf'
h.simulateDeployTx("cc1", collConf)
h.cutBlockAndCommitLegacy()

// pvtdata simulation
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key1", "value1")
})
// another pvtdata simulation
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key2", "value2")
})
// another pvtdata simulation
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key3", "value3")
})

// two transactions are missing some pvtdata
h.causeMissingPvtData(0)
h.causeMissingPvtData(2)
blk2 := h.cutBlockAndCommitLegacy()

h.verifyPvtState("cc1", "coll1", "key2", "value2") // key2 should have been committed
h.simulateDataTx("", func(s *simulator) {
h.assertError(s.GetPrivateData("cc1", "coll1", "key1")) // key1 would be stale with respect to hashed version
})
h.simulateDataTx("", func(s *simulator) {
h.assertError(s.GetPrivateData("cc1", "coll1", "key3")) // key3 would be stale with respect to hashed version
})

// verify missing pvtdata info
h.verifyBlockAndPvtDataSameAs(2, blk2)
expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo)
expectedMissingPvtDataInfo.Add(2, 0, "cc1", "coll1")
expectedMissingPvtDataInfo.Add(2, 2, "cc1", "coll1")
h.verifyMissingPvtDataSameAs(2, expectedMissingPvtDataInfo)

return blk2, expectedMissingPvtDataInfo
}

t.Run("get missing data after rollback", func(t *testing.T) {
env := newEnv(t)
defer env.cleanup()
env.initLedgerMgmt()
h := env.newTestHelperCreateLgr("ledger1", t)

blk, expectedMissingPvtDataInfo := setup(h)

// verify missing pvtdata info
require.Equal(t, uint64(2), blk.Block.Header.Number)
h.verifyBlockAndPvtDataSameAs(2, blk)
h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo)

// commit block 3
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key4", "value4")
})
blk3 := h.cutBlockAndCommitLegacy()

// commit block 4
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key5", "value5")
})
blk4 := h.cutBlockAndCommitLegacy()

// verify missing pvtdata info
h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo)

// rollback ledger to block 2
h.verifyLedgerHeight(5)
env.closeLedgerMgmt()
err := kvledger.RollbackKVLedger(env.initializer.Config.RootFSPath, "ledger1", 2)
require.NoError(t, err)
env.initLedgerMgmt()

h = env.newTestHelperOpenLgr("ledger1", t)
h.verifyLedgerHeight(3)

// verify block & pvtdata
h.verifyBlockAndPvtDataSameAs(2, blk)
// when the pvtdata store is ahead of blockstore,
// missing pvtdata info for block 2 would not be returned.
h.verifyMissingPvtDataSameAs(5, nil)

// recommit block 3
require.NoError(t, h.lgr.CommitLegacy(blk3, &ledger.CommitOptions{}))
// when the pvtdata store is ahead of blockstore,
// missing pvtdata info for block 2 would not be returned.
h.verifyMissingPvtDataSameAs(5, nil)

// recommit block 4
require.NoError(t, h.lgr.CommitLegacy(blk4, &ledger.CommitOptions{}))
// once the pvtdata store and blockstore becomes equal,
// missing pvtdata info for block 2 would be returned.
h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo)
})
blk3 := h.cutBlockAndCommitLegacy()

// commit block 4
h.simulateDataTx("", func(s *simulator) {
s.setPvtdata("cc1", "coll1", "key3", "value2")
t.Run("get deprioritized missing data", func(t *testing.T) {
cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore())
require.NoError(t, err)

initializer := &ledgermgmt.Initializer{
Config: &ledger.Config{
PrivateDataConfig: &ledger.PrivateDataConfig{
MaxBatchSize: 5000,
BatchesInterval: 1000,
PurgeInterval: 100,
DeprioritizedDataReconcilerInterval: 120 * time.Minute,
},
},
HashProvider: cryptoProvider,
EbMetadataProvider: &externalbuilder.MetadataProvider{
DurablePath: "testdata",
},
}
env := newEnvWithInitializer(t, initializer)
defer env.cleanup()
env.initLedgerMgmt()
h := env.newTestHelperCreateLgr("ledger1", t)

blk, expectedMissingPvtDataInfo := setup(h)

// verify missing pvtdata info
require.Equal(t, uint64(2), blk.Block.Header.Number)
h.verifyBlockAndPvtDataSameAs(2, blk)
h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo)

h.commitPvtDataOfOldBlocks(nil, expectedMissingPvtDataInfo)
for i := 0; i < 5; i++ {
h.verifyMissingPvtDataSameAs(int(2), ledger.MissingPvtDataInfo{})
}

env.closeLedgerMgmt()
env.initializer.Config.PrivateDataConfig.DeprioritizedDataReconcilerInterval = 0 * time.Second
env.initLedgerMgmt()

h = env.newTestHelperOpenLgr("ledger1", t)
for i := 0; i < 5; i++ {
h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo)
}

env.closeLedgerMgmt()
env.initializer.Config.PrivateDataConfig.DeprioritizedDataReconcilerInterval = 120 * time.Minute
env.initLedgerMgmt()

h = env.newTestHelperOpenLgr("ledger1", t)
for i := 0; i < 5; i++ {
h.verifyMissingPvtDataSameAs(int(2), ledger.MissingPvtDataInfo{})
}
})
blk4 := h.cutBlockAndCommitLegacy()

// verify missing pvtdata info
h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo)

// rollback ledger to block 2
h.verifyLedgerHeight(5)
env.closeLedgerMgmt()
err := kvledger.RollbackKVLedger(env.initializer.Config.RootFSPath, "ledger1", 2)
assert.NoError(t, err)
env.initLedgerMgmt()

h = env.newTestHelperOpenLgr("ledger1", t)
h.verifyLedgerHeight(3)

// verify block & pvtdata
h.verifyBlockAndPvtDataSameAs(2, blk2)
// when the pvtdata store is ahead of blockstore,
// missing pvtdata info for block 2 would not be returned.
h.verifyMissingPvtDataSameAs(5, nil)

// recommit block 3
assert.NoError(t, h.lgr.CommitLegacy(blk3, &ledger.CommitOptions{}))
// when the pvtdata store is ahead of blockstore,
// missing pvtdata info for block 2 would not be returned.
h.verifyMissingPvtDataSameAs(5, nil)

// recommit block 4
assert.NoError(t, h.lgr.CommitLegacy(blk4, &ledger.CommitOptions{}))
// once the pvtdata store and blockstore becomes equal,
// missing pvtdata info for block 2 would be returned.
h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo)
}
4 changes: 4 additions & 0 deletions core/ledger/kvledger/tests/test_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,10 @@ func (h *testhelper) cutBlockAndCommitExpectError() (*ledger.BlockAndPvtData, er
return h.committer.cutBlockAndCommitExpectError(h.simulatedTrans, h.missingPvtData)
}

func (h *testhelper) commitPvtDataOfOldBlocks(blocksPvtData []*ledger.ReconciledPvtdata, unreconciled ledger.MissingPvtDataInfo) ([]*ledger.PvtdataHashMismatch, error) {
return h.lgr.CommitPvtDataOfOldBlocks(blocksPvtData, unreconciled)
}

// assertError is a helper function that can be called as assertError(f()) where 'f' is some other function
// this function assumes that the last return type of function 'f' is of type 'error'
func (h *testhelper) assertError(output ...interface{}) {
Expand Down
8 changes: 8 additions & 0 deletions core/ledger/ledger_interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,14 @@ type PrivateDataConfig struct {
// PurgeInterval is the number of blocks to wait until purging expired
// private data entries.
PurgeInterval int
// The missing data entries are classified into three categories:
// (1) eligible prioritized
// (2) eligible deprioritized
// (3) ineligible
// The reconciler would fetch the eligible prioritized missing data
// from other peers. A chance for eligible deprioritized missing data
// would be given after every DeprioritizedDataReconcilerInterval
DeprioritizedDataReconcilerInterval time.Duration
}

// HistoryDBConfig is a structure used to configure the transaction history database.
Expand Down
Loading

0 comments on commit a5a6acd

Please sign in to comment.