Skip to content

Commit

Permalink
[FAB-11334] Adds a function to purge a ledger's transient storage (#2769
Browse files Browse the repository at this point in the history
)

This introduces a transientstore.Delete(ledgerID) routine to assist with the bookkeeping
while unjoining a peer from a channel.  Before removing the transient storage, the store
is marked as UNDER_DELETION in a system leveldb, providing an opportunity to recover from
a failed deletion at the next peer initialization.

Signed-off-by: Josh Kneubuhl <jkneubuh@us.ibm.com>
  • Loading branch information
jkneubuh authored Jul 27, 2021
1 parent f662d98 commit bb8bada
Show file tree
Hide file tree
Showing 7 changed files with 898 additions and 26 deletions.
11 changes: 10 additions & 1 deletion common/ledger/util/leveldbhelper/leveldb_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,16 +205,25 @@ func (f *FileLock) Lock() error {
panic(fmt.Sprintf("Error creating dir if missing: %s", err))
}
dbOpts.ErrorIfMissing = !dirEmpty
f.db, err = leveldb.OpenFile(f.filePath, dbOpts)
db, err := leveldb.OpenFile(f.filePath, dbOpts)
if err != nil && err == syscall.EAGAIN {
return errors.Errorf("lock is already acquired on file %s", f.filePath)
}
if err != nil {
panic(fmt.Sprintf("Error acquiring lock on file %s: %s", f.filePath, err))
}

// only mutate the lock db reference AFTER validating that the lock was held.
f.db = db

return nil
}

// Determine if the lock is currently held open.
func (f *FileLock) IsLocked() bool {
return f.db != nil
}

// Unlock releases a previously acquired lock. We achieve this by closing
// the previously opened db. FileUnlock can be called multiple times.
func (f *FileLock) Unlock() {
Expand Down
27 changes: 27 additions & 0 deletions common/ledger/util/leveldbhelper/leveldb_helper_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,33 @@ func TestFileLock(t *testing.T) {
require.NoError(t, os.RemoveAll(fileLockPath))
}

func TestFileLockLockUnlockLock(t *testing.T) {
// create an open lock
lockPath := testDBPath + "/fileLock"
lock := NewFileLock(lockPath)
require.Nil(t, lock.db)
require.Equal(t, lock.filePath, lockPath)
require.False(t, lock.IsLocked())

defer lock.Unlock()
defer os.RemoveAll(lockPath)

// lock
require.NoError(t, lock.Lock())
require.True(t, lock.IsLocked())

// lock
require.ErrorContains(t, lock.Lock(), "lock is already acquired")

// unlock
lock.Unlock()
require.False(t, lock.IsLocked())

// lock - this should not error
require.NoError(t, lock.Lock())
require.True(t, lock.IsLocked())
}

func TestCreateDBInEmptyDir(t *testing.T) {
require.NoError(t, os.RemoveAll(testDBPath), "")
require.NoError(t, os.MkdirAll(testDBPath, 0o775), "")
Expand Down
Loading

0 comments on commit bb8bada

Please sign in to comment.