Skip to content

Commit

Permalink
Add private write-set for purge operation
Browse files Browse the repository at this point in the history
This commit adds a private write-set for a purge operation.
The private write-set contains the delete operation on the key getting purged.

Signed-off-by: manish <manish.sethi@gmail.com>
  • Loading branch information
manish-sethi authored and denyeart committed Nov 4, 2022
1 parent d4dc075 commit a6f52f3
Show file tree
Hide file tree
Showing 10 changed files with 174 additions and 62 deletions.
10 changes: 6 additions & 4 deletions core/ledger/kvledger/txmgmt/rwsetutil/rwset_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,12 @@ func (b *RWSetBuilder) AddToPvtAndHashedWriteSet(ns string, coll string, key str
b.getOrCreateCollHashedRwBuilder(ns, coll).writeMap[key] = kvWriteHash
}

// AddToHashedWriteSetPurge adds a purge key to the hashed write-set
func (b *RWSetBuilder) AddToHashedWriteSetPurge(ns string, coll string, key string) {
kvWriteHashPurge := newKVWriteHashPurge(key)
b.getOrCreateCollHashedRwBuilder(ns, coll).writeMap[key] = kvWriteHashPurge
// AddToPvtAndHashedWriteSetForPurge adds a purge key to the hashed write-set
func (b *RWSetBuilder) AddToPvtAndHashedWriteSetForPurge(ns string, coll string, key string) {
kvWrite, kvWriteHash := newPvtKVWriteAndHash(key, nil)
kvWriteHash.IsPurge = true
b.getOrCreateCollPvtRwBuilder(ns, coll).writeMap[key] = kvWrite
b.getOrCreateCollHashedRwBuilder(ns, coll).writeMap[key] = kvWriteHash
}

// AddToHashedMetadataWriteSet adds a metadata to a key in the hashed write-set
Expand Down
31 changes: 25 additions & 6 deletions core/ledger/kvledger/txmgmt/rwsetutil/rwset_builder_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func TestTxSimulationResultWithPvtData(t *testing.T) {
rwSetBuilder.AddToPvtAndHashedWriteSet("ns2", "coll2", "key1", []byte("pvt-ns2-coll2-key1-value"))

// pvt rwset ns3
rwSetBuilder.AddToHashedWriteSetPurge("ns3", "coll1", "key1")
rwSetBuilder.AddToPvtAndHashedWriteSetForPurge("ns3", "coll1", "key1")

actualSimRes, err := rwSetBuilder.GetTxSimulationResults()
require.NoError(t, err)
Expand All @@ -116,6 +116,15 @@ func TestTxSimulationResultWithPvtData(t *testing.T) {
Writes: []*kvrwset.KVWrite{newKVWrite("key1", []byte("pvt-ns2-coll2-key1-value"))},
}

pvtNs3Coll1 := &kvrwset.KVRWSet{
Writes: []*kvrwset.KVWrite{
{
Key: "key1",
IsDelete: true,
},
},
}

expectedPvtRWSet := &rwset.TxPvtReadWriteSet{
DataModel: rwset.TxReadWriteSet_KV,
NsPvtRwset: []*rwset.NsPvtReadWriteSet{
Expand All @@ -138,6 +147,15 @@ func TestTxSimulationResultWithPvtData(t *testing.T) {
},
},
},
{
Namespace: "ns3",
CollectionPvtRwset: []*rwset.CollectionPvtReadWriteSet{
{
CollectionName: "coll1",
Rwset: serializeTestProtoMsg(t, pvtNs3Coll1),
},
},
},
},
}
require.Equal(t, expectedPvtRWSet, actualSimRes.PvtSimulationResults)
Expand Down Expand Up @@ -186,7 +204,11 @@ func TestTxSimulationResultWithPvtData(t *testing.T) {

hashedNs3Coll1 := &kvrwset.HashedRWSet{
HashedWrites: []*kvrwset.KVWriteHash{
constructTestKVWriteHashPurge(t, "key1"),
{
KeyHash: util.ComputeStringHash("key1"),
IsDelete: true,
IsPurge: true,
},
},
}

Expand Down Expand Up @@ -231,6 +253,7 @@ func TestTxSimulationResultWithPvtData(t *testing.T) {
{
CollectionName: "coll1",
HashedRwset: serializeTestProtoMsg(t, hashedNs3Coll1),
PvtRwsetHash: util.ComputeHash(serializeTestProtoMsg(t, pvtNs3Coll1)),
},
},
}
Expand Down Expand Up @@ -380,10 +403,6 @@ func constructTestPvtKVWriteHash(t *testing.T, key string, value []byte) *kvrwse
return kvWriteHash
}

func constructTestKVWriteHashPurge(t *testing.T, key string) *kvrwset.KVWriteHash {
return newKVWriteHashPurge(key)
}

func serializeTestProtoMsg(t *testing.T, protoMsg proto.Message) []byte {
msgBytes, err := proto.Marshal(protoMsg)
require.NoError(t, err)
Expand Down
5 changes: 0 additions & 5 deletions core/ledger/kvledger/txmgmt/rwsetutil/rwset_proto_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,11 +362,6 @@ func newPvtKVWriteAndHash(key string, value []byte) (*kvrwset.KVWrite, *kvrwset.
return kvWrite, &kvrwset.KVWriteHash{KeyHash: keyHash, IsDelete: kvWrite.IsDelete, ValueHash: valueHash}
}

func newKVWriteHashPurge(key string) *kvrwset.KVWriteHash {
keyHash := util.ComputeStringHash(key)
return &kvrwset.KVWriteHash{KeyHash: keyHash, IsDelete: true, IsPurge: true}
}

// IsKVWriteDelete returns true if the kvWrite indicates a delete operation. See FAB-18386 for details.
func IsKVWriteDelete(kvWrite *kvrwset.KVWrite) bool {
return kvWrite.IsDelete || len(kvWrite.Value) == 0
Expand Down
10 changes: 0 additions & 10 deletions core/ledger/kvledger/txmgmt/rwsetutil/rwset_proto_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,16 +245,6 @@ func TestVersionConversion(t *testing.T) {
require.Equal(t, protoVer, newProtoVersion(internalVer))
}

func TestNewKVWriteHashPurge(t *testing.T) {
sampleKey := "purge-key1"
sampleKVWriteHash := &kvrwset.KVWriteHash{
KeyHash: util.ComputeStringHash(sampleKey),
IsDelete: true,
IsPurge: true,
}
require.Equal(t, sampleKVWriteHash, newKVWriteHashPurge(sampleKey))
}

func TestIsDelete(t *testing.T) {
t.Run("kvWrite", func(t *testing.T) {
kvWritesToBeInterpretedAsDelete := []*kvrwset.KVWrite{
Expand Down
2 changes: 1 addition & 1 deletion core/ledger/kvledger/txmgmt/txmgr/tx_simulator.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ func (s *txSimulator) PurgePrivateData(ns, coll, key string) error {
return err
}
s.writePerformed = true
s.rwsetBuilder.AddToHashedWriteSetPurge(ns, coll, key)
s.rwsetBuilder.AddToPvtAndHashedWriteSetForPurge(ns, coll, key)
return nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/ledger/kvledger/txmgmt/validation/validator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -376,8 +376,8 @@ func TestPrvtdataPurgeUpdates(t *testing.T) {

rwsetBuilder1 := rwsetutil.NewRWSetBuilder()
// key1 and key2 are purged, and key3 is written
rwsetBuilder1.AddToHashedWriteSetPurge("ns1", "coll1", "key1")
rwsetBuilder1.AddToHashedWriteSetPurge("ns1", "coll1", "key2")
rwsetBuilder1.AddToPvtAndHashedWriteSetForPurge("ns1", "coll1", "key1")
rwsetBuilder1.AddToPvtAndHashedWriteSetForPurge("ns1", "coll1", "key2")
rwsetBuilder1.AddToPvtAndHashedWriteSet("ns1", "coll1", "key3", []byte("value3"))
txRWset1 := rwsetBuilder1.GetTxReadWriteSet()

Expand Down
10 changes: 10 additions & 0 deletions core/ledger/pvtdatastorage/helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/bits-and-blooms/bitset"
"github.com/hyperledger/fabric-protos-go/ledger/rwset"
"github.com/hyperledger/fabric/core/ledger"
"github.com/hyperledger/fabric/core/ledger/internal/version"
"github.com/hyperledger/fabric/core/ledger/kvledger/txmgmt/rwsetutil"
"github.com/hyperledger/fabric/core/ledger/pvtdatapolicy"
"github.com/hyperledger/fabric/core/ledger/util"
Expand Down Expand Up @@ -192,6 +193,7 @@ func prepareHashedIndexEntries(dataEntires []*dataEntry) ([]*hashedIndexEntry, e
func preparePurgerMarkerEntries(blkNum uint64, purgeMarkers []*PurgeMarker) ([]*purgeMarkerEntry, []*purgeMarkerCollEntry) {
purgeMarkersEntries := []*purgeMarkerEntry{}
purgeMarkersCollEntries := []*purgeMarkerCollEntry{}
nsCollVisitedMap := map[nsColl]*version.Height{}

for _, m := range purgeMarkers {
purgeMarkersEntries = append(purgeMarkersEntries,
Expand All @@ -208,6 +210,13 @@ func preparePurgerMarkerEntries(blkNum uint64, purgeMarkers []*PurgeMarker) ([]*
},
)

nsColl := nsColl{ns: m.Ns, coll: m.Coll}
version := version.NewHeight(blkNum, m.TxNum)
visitedVersion, ok := nsCollVisitedMap[nsColl]
if ok && visitedVersion.Compare(version) > 0 {
// a key in the same collection with higher version already caused adding of collection level purge mearker entry
continue
}
purgeMarkersCollEntries = append(purgeMarkersCollEntries,
&purgeMarkerCollEntry{
key: &purgeMarkerCollKey{
Expand All @@ -220,6 +229,7 @@ func preparePurgerMarkerEntries(blkNum uint64, purgeMarkers []*PurgeMarker) ([]*
},
},
)
nsCollVisitedMap[nsColl] = version
}
return purgeMarkersEntries, purgeMarkersCollEntries
}
Expand Down
11 changes: 8 additions & 3 deletions core/ledger/pvtdatastorage/kv_encoding.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,10 +308,15 @@ func rangeScanKeysForPurgeMarkers() ([]byte, []byte) {

// driveHashedIndexKeyRangeFromPurgeMarker returns the scan range for hashedIndexKeys for a key specified by the `purgeMarkerKey`.
// The range covers all the hashedIndexKeys between block 0 and the height specified in the `purgeMarkerVal`
func driveHashedIndexKeyRangeFromPurgeMarker(purgeMarkerKey, purgeMarkerVal []byte) ([]byte, []byte) {
func driveHashedIndexKeyRangeFromPurgeMarker(purgeMarkerKey, purgeMarkerVal []byte) ([]byte, []byte, error) {
startKey := append(hashedIndexKeyPrefix, purgeMarkerKey[1:]...)
endKey := append(startKey, purgeMarkerVal...)
return startKey, endKey
version, err := decodePurgeMarkerVal(purgeMarkerVal)
if err != nil {
return nil, nil, err
}
version.TxNum += 1 // increase transaction by one so that the private key for the purge operation itself is also included
endKey := append(startKey, version.ToBytes()...)
return startKey, endKey, nil
}

func rangeScanKeysForHashedIndexKey(ns, coll string, keyHash []byte) ([]byte, []byte) {
Expand Down
7 changes: 5 additions & 2 deletions core/ledger/pvtdatastorage/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,7 +599,7 @@ func (s *Store) removePurgedDataFromCollPvtRWset(k *dataKey, v *rwset.Collection
return err
}

if keyHt.Compare(purgeMarkerHt) >= 0 {
if keyHt.Compare(purgeMarkerHt) > 0 {
filterInKVWrites = append(filterInKVWrites, w)
continue
}
Expand Down Expand Up @@ -840,7 +840,10 @@ func (s *Store) deleteDataMarkedForPurge() error {
if err := purgeMarkerIter.Error(); err != nil {
return err
}
hStart, hEnd := driveHashedIndexKeyRangeFromPurgeMarker(purgeMarkerIter.Key(), purgeMarkerIter.Value())
hStart, hEnd, err := driveHashedIndexKeyRangeFromPurgeMarker(purgeMarkerIter.Key(), purgeMarkerIter.Value())
if err != nil {
return err
}
hashedIndexIter, err := s.db.GetIterator(hStart, hEnd)
if err != nil {
return err
Expand Down
Loading

0 comments on commit a6f52f3

Please sign in to comment.