diff --git a/core/ledger/kvledger/kv_ledger.go b/core/ledger/kvledger/kv_ledger.go index bd327a66844..e10ac3bdc3e 100644 --- a/core/ledger/kvledger/kv_ledger.go +++ b/core/ledger/kvledger/kv_ledger.go @@ -685,7 +685,7 @@ func (l *kvLedger) CommitPvtDataOfOldBlocks(reconciledPvtdata []*ledger.Reconcil logger.Debugf("[%s:] Committing pvtData of [%d] old blocks to the pvtdatastore", l.ledgerID, len(reconciledPvtdata)) - err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, nil) + err = l.pvtdataStore.CommitPvtDataOfOldBlocks(hashVerifiedPvtData, unreconciled) if err != nil { return nil, err } diff --git a/core/ledger/kvledger/kv_ledger_provider_test.go b/core/ledger/kvledger/kv_ledger_provider_test.go index f254a413122..1c4b039d0df 100644 --- a/core/ledger/kvledger/kv_ledger_provider_test.go +++ b/core/ledger/kvledger/kv_ledger_provider_test.go @@ -12,6 +12,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/golang/protobuf/proto" "github.com/hyperledger/fabric-protos-go/common" @@ -447,9 +448,10 @@ func TestLedgerBackup(t *testing.T) { RootFSPath: originalPath, StateDBConfig: &lgr.StateDBConfig{}, PrivateDataConfig: &lgr.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, }, HistoryDBConfig: &lgr.HistoryDBConfig{ Enabled: true, @@ -500,9 +502,10 @@ func TestLedgerBackup(t *testing.T) { RootFSPath: restorePath, StateDBConfig: &lgr.StateDBConfig{}, PrivateDataConfig: &lgr.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, }, HistoryDBConfig: &lgr.HistoryDBConfig{ Enabled: true, @@ -589,9 +592,10 @@ func testConfig(t *testing.T) (conf *lgr.Config, cleanup func()) { RootFSPath: path, StateDBConfig: &lgr.StateDBConfig{}, 
PrivateDataConfig: &lgr.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, }, HistoryDBConfig: &lgr.HistoryDBConfig{ Enabled: true, diff --git a/core/ledger/kvledger/tests/env.go b/core/ledger/kvledger/tests/env.go index 199519de6ce..2f29e7c7821 100644 --- a/core/ledger/kvledger/tests/env.go +++ b/core/ledger/kvledger/tests/env.go @@ -11,6 +11,7 @@ import ( "os" "path/filepath" "testing" + "time" "github.com/hyperledger/fabric-protos-go/common" "github.com/hyperledger/fabric-protos-go/peer" @@ -224,7 +225,7 @@ func populateMissingsWithTestDefaults(t *testing.T, initializer *ledgermgmt.Init initializer.MetricsProvider = &disabled.Provider{} } - if initializer.Config == nil { + if initializer.Config == nil || initializer.Config.RootFSPath == "" { rootPath, err := ioutil.TempDir("/tmp", "ledgersData") if err != nil { t.Fatalf("Failed to create root directory: %s", err) @@ -249,9 +250,10 @@ func populateMissingsWithTestDefaults(t *testing.T, initializer *ledgermgmt.Init if initializer.Config.PrivateDataConfig == nil { initializer.Config.PrivateDataConfig = &ledger.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, } } if initializer.Config.SnapshotsConfig == nil { diff --git a/core/ledger/kvledger/tests/missing_pvtdata_retrieval_test.go b/core/ledger/kvledger/tests/missing_pvtdata_retrieval_test.go index ca932e183db..3425535e741 100644 --- a/core/ledger/kvledger/tests/missing_pvtdata_retrieval_test.go +++ b/core/ledger/kvledger/tests/missing_pvtdata_retrieval_test.go @@ -8,87 +8,171 @@ package tests import ( "testing" + "time" + "github.com/hyperledger/fabric/bccsp/sw" + "github.com/hyperledger/fabric/core/container/externalbuilder" 
"github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/kvledger" - "github.com/stretchr/testify/assert" + "github.com/hyperledger/fabric/core/ledger/ledgermgmt" + "github.com/stretchr/testify/require" ) -func TestGetMissingPvtDataAfterRollback(t *testing.T) { - env := newEnv(t) - defer env.cleanup() - env.initLedgerMgmt() - h := env.newTestHelperCreateLgr("ledger1", t) - - collConf := []*collConf{{name: "coll1", btl: 5}} - - // deploy cc1 with 'collConf' - h.simulateDeployTx("cc1", collConf) - h.cutBlockAndCommitLegacy() - - // pvtdata simulation - h.simulateDataTx("", func(s *simulator) { - s.setPvtdata("cc1", "coll1", "key1", "value1") - }) - // another pvtdata simulation - h.simulateDataTx("", func(s *simulator) { - s.setPvtdata("cc1", "coll1", "key2", "value2") - }) - - h.causeMissingPvtData(0) - blk2 := h.cutBlockAndCommitLegacy() - - h.verifyPvtState("cc1", "coll1", "key2", "value2") // key2 should have been committed - h.simulateDataTx("", func(s *simulator) { - h.assertError(s.GetPrivateData("cc1", "coll1", "key1")) // key1 would be stale with respect to hashed version - }) - - // verify missing pvtdata info - h.verifyBlockAndPvtDataSameAs(2, blk2) - expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo) - expectedMissingPvtDataInfo.Add(2, 0, "cc1", "coll1") - h.verifyMissingPvtDataSameAs(2, expectedMissingPvtDataInfo) - - // commit block 3 - h.simulateDataTx("", func(s *simulator) { - s.setPvtdata("cc1", "coll1", "key3", "value2") +func TestGetMissingPvtData(t *testing.T) { + setup := func(h *testhelper) (*ledger.BlockAndPvtData, ledger.MissingPvtDataInfo) { + collConf := []*collConf{{ + name: "coll1", + btl: 5, + }} + + // deploy cc1 with 'collConf' + h.simulateDeployTx("cc1", collConf) + h.cutBlockAndCommitLegacy() + + // pvtdata simulation + h.simulateDataTx("", func(s *simulator) { + s.setPvtdata("cc1", "coll1", "key1", "value1") + }) + // another pvtdata simulation + h.simulateDataTx("", func(s *simulator) { + 
s.setPvtdata("cc1", "coll1", "key2", "value2") + }) + // another pvtdata simulation + h.simulateDataTx("", func(s *simulator) { + s.setPvtdata("cc1", "coll1", "key3", "value3") + }) + + // two transactions are missing some pvtdata + h.causeMissingPvtData(0) + h.causeMissingPvtData(2) + blk2 := h.cutBlockAndCommitLegacy() + + h.verifyPvtState("cc1", "coll1", "key2", "value2") // key2 should have been committed + h.simulateDataTx("", func(s *simulator) { + h.assertError(s.GetPrivateData("cc1", "coll1", "key1")) // key1 would be stale with respect to hashed version + }) + h.simulateDataTx("", func(s *simulator) { + h.assertError(s.GetPrivateData("cc1", "coll1", "key3")) // key3 would be stale with respect to hashed version + }) + + // verify missing pvtdata info + h.verifyBlockAndPvtDataSameAs(2, blk2) + expectedMissingPvtDataInfo := make(ledger.MissingPvtDataInfo) + expectedMissingPvtDataInfo.Add(2, 0, "cc1", "coll1") + expectedMissingPvtDataInfo.Add(2, 2, "cc1", "coll1") + h.verifyMissingPvtDataSameAs(2, expectedMissingPvtDataInfo) + + return blk2, expectedMissingPvtDataInfo + } + + t.Run("get missing data after rollback", func(t *testing.T) { + env := newEnv(t) + defer env.cleanup() + env.initLedgerMgmt() + h := env.newTestHelperCreateLgr("ledger1", t) + + blk, expectedMissingPvtDataInfo := setup(h) + + // verify missing pvtdata info + require.Equal(t, uint64(2), blk.Block.Header.Number) + h.verifyBlockAndPvtDataSameAs(2, blk) + h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo) + + // commit block 3 + h.simulateDataTx("", func(s *simulator) { + s.setPvtdata("cc1", "coll1", "key4", "value4") + }) + blk3 := h.cutBlockAndCommitLegacy() + + // commit block 4 + h.simulateDataTx("", func(s *simulator) { + s.setPvtdata("cc1", "coll1", "key5", "value5") + }) + blk4 := h.cutBlockAndCommitLegacy() + + // verify missing pvtdata info + h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo) + + // rollback ledger to block 2 + h.verifyLedgerHeight(5) + 
env.closeLedgerMgmt() + err := kvledger.RollbackKVLedger(env.initializer.Config.RootFSPath, "ledger1", 2) + require.NoError(t, err) + env.initLedgerMgmt() + + h = env.newTestHelperOpenLgr("ledger1", t) + h.verifyLedgerHeight(3) + + // verify block & pvtdata + h.verifyBlockAndPvtDataSameAs(2, blk) + // when the pvtdata store is ahead of blockstore, + // missing pvtdata info for block 2 would not be returned. + h.verifyMissingPvtDataSameAs(5, nil) + + // recommit block 3 + require.NoError(t, h.lgr.CommitLegacy(blk3, &ledger.CommitOptions{})) + // when the pvtdata store is ahead of blockstore, + // missing pvtdata info for block 2 would not be returned. + h.verifyMissingPvtDataSameAs(5, nil) + + // recommit block 4 + require.NoError(t, h.lgr.CommitLegacy(blk4, &ledger.CommitOptions{})) + // once the pvtdata store and blockstore becomes equal, + // missing pvtdata info for block 2 would be returned. + h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo) }) - blk3 := h.cutBlockAndCommitLegacy() - // commit block 4 - h.simulateDataTx("", func(s *simulator) { - s.setPvtdata("cc1", "coll1", "key3", "value2") + t.Run("get deprioritized missing data", func(t *testing.T) { + cryptoProvider, err := sw.NewDefaultSecurityLevelWithKeystore(sw.NewDummyKeyStore()) + require.NoError(t, err) + + initializer := &ledgermgmt.Initializer{ + Config: &ledger.Config{ + PrivateDataConfig: &ledger.PrivateDataConfig{ + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, + }, + }, + HashProvider: cryptoProvider, + EbMetadataProvider: &externalbuilder.MetadataProvider{ + DurablePath: "testdata", + }, + } + env := newEnvWithInitializer(t, initializer) + defer env.cleanup() + env.initLedgerMgmt() + h := env.newTestHelperCreateLgr("ledger1", t) + + blk, expectedMissingPvtDataInfo := setup(h) + + // verify missing pvtdata info + require.Equal(t, uint64(2), blk.Block.Header.Number) + h.verifyBlockAndPvtDataSameAs(2, blk) 
+ h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo) + + h.commitPvtDataOfOldBlocks(nil, expectedMissingPvtDataInfo) + for i := 0; i < 5; i++ { + h.verifyMissingPvtDataSameAs(int(2), ledger.MissingPvtDataInfo{}) + } + + env.closeLedgerMgmt() + env.initializer.Config.PrivateDataConfig.DeprioritizedDataReconcilerInterval = 0 * time.Second + env.initLedgerMgmt() + + h = env.newTestHelperOpenLgr("ledger1", t) + for i := 0; i < 5; i++ { + h.verifyMissingPvtDataSameAs(int(2), expectedMissingPvtDataInfo) + } + + env.closeLedgerMgmt() + env.initializer.Config.PrivateDataConfig.DeprioritizedDataReconcilerInterval = 120 * time.Minute + env.initLedgerMgmt() + + h = env.newTestHelperOpenLgr("ledger1", t) + for i := 0; i < 5; i++ { + h.verifyMissingPvtDataSameAs(int(2), ledger.MissingPvtDataInfo{}) + } }) - blk4 := h.cutBlockAndCommitLegacy() - - // verify missing pvtdata info - h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo) - - // rollback ledger to block 2 - h.verifyLedgerHeight(5) - env.closeLedgerMgmt() - err := kvledger.RollbackKVLedger(env.initializer.Config.RootFSPath, "ledger1", 2) - assert.NoError(t, err) - env.initLedgerMgmt() - - h = env.newTestHelperOpenLgr("ledger1", t) - h.verifyLedgerHeight(3) - - // verify block & pvtdata - h.verifyBlockAndPvtDataSameAs(2, blk2) - // when the pvtdata store is ahead of blockstore, - // missing pvtdata info for block 2 would not be returned. - h.verifyMissingPvtDataSameAs(5, nil) - - // recommit block 3 - assert.NoError(t, h.lgr.CommitLegacy(blk3, &ledger.CommitOptions{})) - // when the pvtdata store is ahead of blockstore, - // missing pvtdata info for block 2 would not be returned. - h.verifyMissingPvtDataSameAs(5, nil) - - // recommit block 4 - assert.NoError(t, h.lgr.CommitLegacy(blk4, &ledger.CommitOptions{})) - // once the pvtdata store and blockstore becomes equal, - // missing pvtdata info for block 2 would be returned. 
- h.verifyMissingPvtDataSameAs(5, expectedMissingPvtDataInfo) } diff --git a/core/ledger/kvledger/tests/test_helper.go b/core/ledger/kvledger/tests/test_helper.go index 57d41c87428..2bff50573b5 100644 --- a/core/ledger/kvledger/tests/test_helper.go +++ b/core/ledger/kvledger/tests/test_helper.go @@ -64,6 +64,10 @@ func (h *testhelper) cutBlockAndCommitExpectError() (*ledger.BlockAndPvtData, er return h.committer.cutBlockAndCommitExpectError(h.simulatedTrans, h.missingPvtData) } +func (h *testhelper) commitPvtDataOfOldBlocks(blocksPvtData []*ledger.ReconciledPvtdata, unreconciled ledger.MissingPvtDataInfo) ([]*ledger.PvtdataHashMismatch, error) { + return h.lgr.CommitPvtDataOfOldBlocks(blocksPvtData, unreconciled) +} + // assertError is a helper function that can be called as assertError(f()) where 'f' is some other function // this function assumes that the last return type of function 'f' is of type 'error' func (h *testhelper) assertError(output ...interface{}) { diff --git a/core/ledger/ledger_interface.go b/core/ledger/ledger_interface.go index dc1f69b2392..cadb485b92b 100644 --- a/core/ledger/ledger_interface.go +++ b/core/ledger/ledger_interface.go @@ -108,6 +108,14 @@ type PrivateDataConfig struct { // PurgeInterval is the number of blocks to wait until purging expired // private data entries. PurgeInterval int + // The missing data entries are classified into three categories: + // (1) eligible prioritized + // (2) eligible deprioritized + // (3) ineligible + // The reconciler would fetch the eligible prioritized missing data + // from other peers. A chance for eligible deprioritized missing data + // would be given after every DeprioritizedDataReconcilerInterval + DeprioritizedDataReconcilerInterval time.Duration } // HistoryDBConfig is a structure used to configure the transaction history database. 
diff --git a/core/ledger/pvtdatastorage/store.go b/core/ledger/pvtdatastorage/store.go index 3b57b66de4b..a4c2a9ce7c1 100644 --- a/core/ledger/pvtdatastorage/store.go +++ b/core/ledger/pvtdatastorage/store.go @@ -23,15 +23,6 @@ import ( var ( logger = flogging.MustGetLogger("pvtdatastorage") - // The missing data entries are classified into three categories: - // (1) eligible prioritized - // (2) eligible deprioritized - // (3) ineligible - // The reconciler would fetch the eligible prioritized missing data - // from other peers. A chance for eligible deprioritized missing data - // would be given after giving deprioritizedMissingDataPeriodicity number - // of chances to the eligible prioritized missing data - deprioritizedMissingDataPeriodicity = 100 ) // Provider provides handle to specific 'Store' that in turn manages @@ -75,7 +66,8 @@ type Store struct { // recovery operation. isLastUpdatedOldBlocksSet bool - iterSinceDeprioMissingDataAccess int + deprioritizedDataReconcilerInterval time.Duration + accessDeprioMissingDataAfter time.Time } type blkTranNumKey []byte @@ -139,11 +131,13 @@ func NewProvider(conf *PrivateDataConfig) (*Provider, error) { func (p *Provider) OpenStore(ledgerid string) (*Store, error) { dbHandle := p.dbProvider.GetDBHandle(ledgerid) s := &Store{ - db: dbHandle, - ledgerid: ledgerid, - batchesInterval: p.pvtData.BatchesInterval, - maxBatchSize: p.pvtData.MaxBatchSize, - purgeInterval: uint64(p.pvtData.PurgeInterval), + db: dbHandle, + ledgerid: ledgerid, + batchesInterval: p.pvtData.BatchesInterval, + maxBatchSize: p.pvtData.MaxBatchSize, + purgeInterval: uint64(p.pvtData.PurgeInterval), + deprioritizedDataReconcilerInterval: p.pvtData.DeprioritizedDataReconcilerInterval, + accessDeprioMissingDataAfter: time.Now().Add(p.pvtData.DeprioritizedDataReconcilerInterval), collElgProcSync: &collElgProcSync{ notification: make(chan bool, 1), procComplete: make(chan bool, 1), @@ -421,13 +415,12 @@ func (s *Store) 
GetMissingPvtDataInfoForMostRecentBlocks(maxBlock int) (ledger.M return nil, nil } - if s.iterSinceDeprioMissingDataAccess == deprioritizedMissingDataPeriodicity { - s.iterSinceDeprioMissingDataAccess = 0 + if time.Now().After(s.accessDeprioMissingDataAfter) { + s.accessDeprioMissingDataAfter = time.Now().Add(s.deprioritizedDataReconcilerInterval) logger.Debug("fetching missing pvtdata entries from the deprioritized list") return s.getMissingData(elgDeprioritizedMissingDataGroup, maxBlock) } - s.iterSinceDeprioMissingDataAccess++ logger.Debug("fetching missing pvtdata entries from the prioritized list") return s.getMissingData(elgPrioritizedMissingDataGroup, maxBlock) } diff --git a/core/ledger/pvtdatastorage/store_test.go b/core/ledger/pvtdatastorage/store_test.go index de3658c7791..b5beea98d4a 100644 --- a/core/ledger/pvtdatastorage/store_test.go +++ b/core/ledger/pvtdatastorage/store_test.go @@ -245,21 +245,14 @@ func TestGetMissingDataInfo(t *testing.T) { }, } - defaultVal := deprioritizedMissingDataPeriodicity - deprioritizedMissingDataPeriodicity = 10 - defer func() { - deprioritizedMissingDataPeriodicity = defaultVal - }() - - for i := 1; i <= 55; i++ { - if i%11 == 0 { - // after ever 10 iterations of accessing the prioritized list, the - // deprioritized list would be accessed - assertMissingDataInfo(t, store, expectedDeprioMissingDataInfo, 2) - continue - } - assertMissingDataInfo(t, store, expectedPrioMissingDataInfo, 2) - } + assertMissingDataInfo(t, store, expectedPrioMissingDataInfo, 2) + + store.accessDeprioMissingDataAfter = time.Now() + expectedNextAccessDeprioMissingDataTime := time.Now().Add(store.deprioritizedDataReconcilerInterval) + assertMissingDataInfo(t, store, expectedDeprioMissingDataInfo, 2) + + require.True(t, store.accessDeprioMissingDataAfter.After(expectedNextAccessDeprioMissingDataTime)) + assertMissingDataInfo(t, store, expectedPrioMissingDataInfo, 2) } func TestExpiryDataNotIncluded(t *testing.T) { diff --git 
a/core/ledger/pvtdatastorage/test_exports.go b/core/ledger/pvtdatastorage/test_exports.go index 8a4e4da6fef..6e468f96cec 100644 --- a/core/ledger/pvtdatastorage/test_exports.go +++ b/core/ledger/pvtdatastorage/test_exports.go @@ -10,6 +10,7 @@ import ( "io/ioutil" "os" "testing" + "time" "github.com/hyperledger/fabric/core/ledger" "github.com/hyperledger/fabric/core/ledger/pvtdatapolicy" @@ -20,9+21,10 @@ import ( func pvtDataConf() *PrivateDataConfig { return &PrivateDataConfig{ PrivateDataConfig: &ledger.PrivateDataConfig{ - BatchesInterval: 1000, - MaxBatchSize: 5000, - PurgeInterval: 2, + BatchesInterval: 1000, + MaxBatchSize: 5000, + PurgeInterval: 2, + DeprioritizedDataReconcilerInterval: 120 * time.Minute, }, StorePath: "", } diff --git a/internal/peer/node/config.go b/internal/peer/node/config.go index 6f44fc808b9..4cf3e3fa0e6 100644 --- a/internal/peer/node/config.go +++ b/internal/peer/node/config.go @@ -7,7 +7,8 @@ SPDX-License-Identifier: Apache-2.0 package node import ( "path/filepath" + "time" coreconfig "github.com/hyperledger/fabric/core/config" "github.com/hyperledger/fabric/core/ledger" @@ -40,6 +41,10 @@ func ledgerConfig() *ledger.Config { if viper.IsSet("ledger.pvtdataStore.purgeInterval") { purgeInterval = viper.GetInt("ledger.pvtdataStore.purgeInterval") } + deprioritizedDataReconcilerInterval := 60 * time.Minute + if viper.IsSet("ledger.pvtdataStore.deprioritizedDataReconcilerInterval") { + deprioritizedDataReconcilerInterval = viper.GetDuration("ledger.pvtdataStore.deprioritizedDataReconcilerInterval") + } rootFSPath := filepath.Join(coreconfig.GetPath("peer.fileSystemPath"), "ledgersData") snapshotsRootDir := viper.GetString("ledger.snapshots.rootDir") @@ -53,9 +58,10 @@ func ledgerConfig() *ledger.Config { CouchDB: &ledger.CouchDBConfig{}, }, PrivateDataConfig: &ledger.PrivateDataConfig{ - MaxBatchSize: collElgProcMaxDbBatchSize, - BatchesInterval: collElgProcDbBatchesInterval, - 
PurgeInterval: purgeInterval, + MaxBatchSize: collElgProcMaxDbBatchSize, + BatchesInterval: collElgProcDbBatchesInterval, + PurgeInterval: purgeInterval, + DeprioritizedDataReconcilerInterval: deprioritizedDataReconcilerInterval, }, HistoryDBConfig: &ledger.HistoryDBConfig{ Enabled: viper.GetBool("ledger.history.enableHistoryDatabase"), diff --git a/internal/peer/node/config_test.go b/internal/peer/node/config_test.go index 756c1034ff4..b0b8a6c4127 100644 --- a/internal/peer/node/config_test.go +++ b/internal/peer/node/config_test.go @@ -35,9 +35,10 @@ func TestLedgerConfig(t *testing.T) { CouchDB: &ledger.CouchDBConfig{}, }, PrivateDataConfig: &ledger.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 60 * time.Minute, }, HistoryDBConfig: &ledger.HistoryDBConfig{ Enabled: false, @@ -81,9 +82,10 @@ func TestLedgerConfig(t *testing.T) { }, }, PrivateDataConfig: &ledger.PrivateDataConfig{ - MaxBatchSize: 5000, - BatchesInterval: 1000, - PurgeInterval: 100, + MaxBatchSize: 5000, + BatchesInterval: 1000, + PurgeInterval: 100, + DeprioritizedDataReconcilerInterval: 60 * time.Minute, }, HistoryDBConfig: &ledger.HistoryDBConfig{ Enabled: false, @@ -96,24 +98,25 @@ func TestLedgerConfig(t *testing.T) { { name: "CouchDB Explicit", config: map[string]interface{}{ - "peer.fileSystemPath": "/peerfs", - "ledger.state.stateDatabase": "CouchDB", - "ledger.state.couchDBConfig.couchDBAddress": "localhost:5984", - "ledger.state.couchDBConfig.username": "username", - "ledger.state.couchDBConfig.password": "password", - "ledger.state.couchDBConfig.maxRetries": 3, - "ledger.state.couchDBConfig.maxRetriesOnStartup": 10, - "ledger.state.couchDBConfig.requestTimeout": "30s", - "ledger.state.couchDBConfig.internalQueryLimit": 500, - "ledger.state.couchDBConfig.maxBatchUpdateSize": 600, - "ledger.state.couchDBConfig.warmIndexesAfterNBlocks": 5, - 
"ledger.state.couchDBConfig.createGlobalChangesDB": true, - "ledger.state.couchDBConfig.cacheSize": 64, - "ledger.pvtdataStore.collElgProcMaxDbBatchSize": 50000, - "ledger.pvtdataStore.collElgProcDbBatchesInterval": 10000, - "ledger.pvtdataStore.purgeInterval": 1000, - "ledger.history.enableHistoryDatabase": true, - "ledger.snapshots.rootDir": "/peerfs/snapshots", + "peer.fileSystemPath": "/peerfs", + "ledger.state.stateDatabase": "CouchDB", + "ledger.state.couchDBConfig.couchDBAddress": "localhost:5984", + "ledger.state.couchDBConfig.username": "username", + "ledger.state.couchDBConfig.password": "password", + "ledger.state.couchDBConfig.maxRetries": 3, + "ledger.state.couchDBConfig.maxRetriesOnStartup": 10, + "ledger.state.couchDBConfig.requestTimeout": "30s", + "ledger.state.couchDBConfig.internalQueryLimit": 500, + "ledger.state.couchDBConfig.maxBatchUpdateSize": 600, + "ledger.state.couchDBConfig.warmIndexesAfterNBlocks": 5, + "ledger.state.couchDBConfig.createGlobalChangesDB": true, + "ledger.state.couchDBConfig.cacheSize": 64, + "ledger.pvtdataStore.collElgProcMaxDbBatchSize": 50000, + "ledger.pvtdataStore.collElgProcDbBatchesInterval": 10000, + "ledger.pvtdataStore.purgeInterval": 1000, + "ledger.pvtdataStore.deprioritizedDataReconcilerInterval": "60m", + "ledger.history.enableHistoryDatabase": true, + "ledger.snapshots.rootDir": "/peerfs/snapshots", }, expected: &ledger.Config{ RootFSPath: "/peerfs/ledgersData", @@ -135,9 +138,10 @@ func TestLedgerConfig(t *testing.T) { }, }, PrivateDataConfig: &ledger.PrivateDataConfig{ - MaxBatchSize: 50000, - BatchesInterval: 10000, - PurgeInterval: 1000, + MaxBatchSize: 50000, + BatchesInterval: 10000, + PurgeInterval: 1000, + DeprioritizedDataReconcilerInterval: 60 * time.Minute, }, HistoryDBConfig: &ledger.HistoryDBConfig{ Enabled: true, diff --git a/sampleconfig/core.yaml b/sampleconfig/core.yaml index f0844468d2a..21e91819c80 100644 --- a/sampleconfig/core.yaml +++ b/sampleconfig/core.yaml @@ -678,6 +678,16 @@ 
ledger: # the minimum duration (in milliseconds) between writing # two consecutive db batches for converting the ineligible missing data entries to eligible missing data entries collElgProcDbBatchesInterval: 1000 + # The missing data entries are classified into two categories: + # (1) prioritized + # (2) deprioritized + # Initially, all missing data are in the prioritized list. When the + # reconciler is unable to fetch the missing data from other peers, + # the unreconciled missing data would be moved to the deprioritized list. + # The reconciler would retry deprioritized missing data after every + # deprioritizedDataReconcilerInterval (unit: minutes). Note that the + # interval needs to be greater than the reconcileSleepInterval + deprioritizedDataReconcilerInterval: 60m ############################################################################### #