pillar/agentlog: Add mutex locks for safe PSI data handling and test synchronization.

This commit introduces two mutex locks to ensure thread-safe handling of
PSI data and synchronization during testing.

The first lock (PsiMutex) is added to protect access to the PSI files,
preventing race conditions between the PSI data producer in the tests
and the consumer in memprofile.

The second lock (psiProducerMutex) ensures that only one PSI data
producer runs at a time during tests, avoiding conflicts between
producers and keeping test results consistent.
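
In outline, the pattern pairs a data mutex around each file access with a producer mutex held for the producer's whole lifetime. Below is a minimal, runnable sketch of that pattern, assuming nothing beyond the standard library; all names (psiMutex, producerMutex, psiData, producePSIData, consumePSIData) are illustrative stand-ins, not identifiers from this commit:

package main

import (
	"fmt"
	"sync"
	"time"
)

// psiMutex plays the role of agentlog.PsiMutex: it guards the shared PSI
// data against concurrent reads and writes.
var psiMutex sync.Mutex

// producerMutex plays the role of psiProducerMutex: it lets at most one
// fake-stats producer exist at a time.
var producerMutex sync.Mutex

// psiData is a stand-in for the contents of the PSI pressure file.
var psiData string

// producePSIData locks producerMutex for the producer's whole lifetime and
// rewrites psiData on a ticker, taking psiMutex around each write.
func producePSIData(done <-chan struct{}) {
	producerMutex.Lock() // blocks until any previous producer has cleaned up
	ticker := time.NewTicker(10 * time.Millisecond)
	go func() {
		for {
			select {
			case <-ticker.C:
				psiMutex.Lock()
				psiData = "some avg10=0.42 avg60=0.10 avg300=0.01 total=100"
				psiMutex.Unlock()
			case <-done:
				ticker.Stop()
				producerMutex.Unlock() // producer is gone; the next one may start
				return
			}
		}
	}()
}

// consumePSIData is the consumer side, reading under the same data mutex
// the way collectMemoryPSI does after this commit.
func consumePSIData() string {
	psiMutex.Lock()
	defer psiMutex.Unlock()
	return psiData
}

func main() {
	done := make(chan struct{})
	producePSIData(done)
	time.Sleep(50 * time.Millisecond) // let a few writes land
	fmt.Println(consumePSIData())
	close(done)
}

The same ordering shows up in the commit itself: both the error path and the ctx.Done() path release psiProducerMutex, so a failed or cancelled producer never blocks the next one.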

Signed-off-by: Nikolay Martyanov <nikolay@zededa.com>
OhmSpectator committed Aug 23, 2024
1 parent 304ccd4 commit 3bc2b2d
Showing 2 changed files with 50 additions and 35 deletions.
78 changes: 43 additions & 35 deletions pkg/pillar/agentlog/agentlog_test.go
@@ -285,10 +285,16 @@ func matchPsiStats(line string) bool {
 	return re.MatchString(line)
 }
 
+// Mutex for PSI stats producer - let's avoid running multiple producers at the same time
+var psiProducerMutex sync.Mutex
+
 func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) {
+	// Take the mutex on the producer creation and release it when the producer is done
+	psiProducerMutex.Lock()
 	// Create a new file for memory pressure stats
 	fakePSIFileHandler, err := os.CreateTemp("", "memory-pressure")
 	if err != nil {
+		psiProducerMutex.Unlock()
 		return nil, err
 	}
 
@@ -302,12 +308,6 @@ func emulateMemoryPressureStats() (cancel context.CancelFunc, err error) {
 	ctx, cancel := context.WithCancel(context.Background())
 
 	go func() {
-		defer ticker.Stop()
-		defer fakePSIFileHandler.Close()
-		defer os.Remove(fakePSIFileName)
-		defer func() {
-			agentlog.PressureMemoryFile = originalPressureMemoryFile
-		}()
 		PsiStats := agentlog.PressureStallInfo{
 			SomeAvg10: 0.00,
 			SomeAvg60: 0.00,
@@ -321,6 +321,7 @@
 		for {
 			select {
 			case <-ticker.C:
+				agentlog.PsiMutex.Lock()
 				PsiStats.SomeAvg10 = generateRandomAvgValue()
 				PsiStats.SomeAvg60 = generateRandomAvgValue()
 				PsiStats.SomeAvg300 = generateRandomAvgValue()
@@ -337,7 +338,16 @@
 				if err := os.WriteFile(fakePSIFileName, []byte(content), 0644); err != nil {
 					panic(err)
 				}
+				agentlog.PsiMutex.Unlock()
 			case <-ctx.Done():
+				ticker.Stop()
+				agentlog.PsiMutex.Lock()
+				fakePSIFileHandler.Close()
+				os.Remove(fakePSIFileName)
+				agentlog.PressureMemoryFile = originalPressureMemoryFile
+				agentlog.PsiMutex.Unlock()
+				// We destroy this producer, so release the mutex
+				psiProducerMutex.Unlock()
 				return
 			}
 		}
@@ -354,6 +364,8 @@ full avg10=2.00 avg60=0.20 avg300=0.02 total=2000`
 )
 
 func createFakePSIStatsFile() (cleanupFunc context.CancelFunc, err error) {
+	// Take the mutex on the producer creation and release it when the producer is done
+	psiProducerMutex.Lock()
 	// Create a new file for memory pressure stats
 	fakePSIFileHandler, err := os.CreateTemp("", "memory-pressure")
 	if err != nil {
@@ -365,23 +377,24 @@ func createFakePSIStatsFile() (cleanupFunc context.CancelFunc, err error) {
 	agentlog.PressureMemoryFile = fakePSIFileName
 
 	// Write some content to the file
+	agentlog.PsiMutex.Lock()
 	if err := os.WriteFile(fakePSIFileName, []byte(staticPSIStatsContent), 0644); err != nil {
+		agentlog.PsiMutex.Unlock()
 		return nil, err
 	}
+	agentlog.PsiMutex.Unlock()
 
 	ctx, cancel := context.WithCancel(context.Background())
 
 	go func() {
-		defer func() {
-			fakePSIFileHandler.Close()
-			os.Remove(fakePSIFileName)
-			agentlog.PressureMemoryFile = originalPressureMemoryFile
-
-		}()
-		select {
-		case <-ctx.Done():
-			return
-		}
+		<-ctx.Done()
+		agentlog.PsiMutex.Lock()
+		fakePSIFileHandler.Close()
+		os.Remove(fakePSIFileName)
+		agentlog.PressureMemoryFile = originalPressureMemoryFile
+		agentlog.PsiMutex.Unlock()
+		// We destroy this producer, so release the mutex
+		psiProducerMutex.Unlock()
 	}()
 
 	return cancel, nil
@@ -413,27 +426,22 @@ func startIntegratedPSICollectorAPI() (cancel context.CancelFunc, err error) {
 	}
 	ctx, cancel := context.WithCancel(context.Background())
 	go func() {
-		defer func() {
-			if started {
-				http.Post("http://127.0.0.1:6543/stop", "", nil)
-				// Wait for the server to stop, check if it is still running
-				for i := 0; i < 100; i++ {
-					_, err := http.Get("http://127.0.0.1:6543")
-					if err != nil && strings.Contains(err.Error(), "connection refused") {
-						started = false
-						break
-					}
-					time.Sleep(100 * time.Millisecond)
-				}
-				if started {
-					panic("could not stop the server in 10 seconds")
-				}
-				psiServerMutex.Unlock()
-			}
-		}()
-		select {
-		case <-ctx.Done():
-			return
-		}
+		<-ctx.Done()
+		if started {
+			http.Post("http://127.0.0.1:6543/stop", "", nil)
+			// Wait for the server to stop, check if it is still running
+			for i := 0; i < 100; i++ {
+				_, err := http.Get("http://127.0.0.1:6543")
+				if err != nil && strings.Contains(err.Error(), "connection refused") {
+					started = false
+					break
+				}
+				time.Sleep(100 * time.Millisecond)
+			}
+			if started {
+				panic("could not stop the server in 10 seconds")
+			}
+		}
+		psiServerMutex.Unlock()
 	}()
 	return cancel, nil
7 changes: 7 additions & 0 deletions pkg/pillar/agentlog/memprofile.go
@@ -12,6 +12,7 @@ import (
 	"os"
 	"runtime"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/lf-edge/eve/pkg/pillar/types"
@@ -128,6 +129,10 @@ func GetMemAllocationSites(reportZeroInUse bool) (int, []MemAllocationSite) {
 	return n, sites
 }
 
+// PsiMutex is the mutex to protect the access to the PSI files.
+// We need it to avoid a race condition with the PSI data emulator in tests.
+var PsiMutex sync.Mutex
+
 func isPSISupported() bool {
 	_, err := os.Stat(PressureMemoryFile)
 	if err != nil {
@@ -138,6 +143,8 @@
 }
 
 func collectMemoryPSI() (*PressureStallInfo, error) {
+	PsiMutex.Lock()
+	defer PsiMutex.Unlock()
 	if !isPSISupported() {
 		return nil, fmt.Errorf("PSI is not supported")
 	}
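
For context, a test that needs fake PSI stats would start one of the producers above and cancel it when done. A hypothetical usage shape (TestPSIStatsEmulation is illustrative, not a test from this commit):

func TestPSIStatsEmulation(t *testing.T) {
	cancel, err := emulateMemoryPressureStats()
	if err != nil {
		t.Fatalf("failed to start the fake PSI producer: %v", err)
	}
	// cancel signals the producer goroutine; on its way out it removes the
	// fake file, restores PressureMemoryFile, and releases psiProducerMutex,
	// so the next producer can start without racing this one.
	defer cancel()

	// ... exercise code that reads agentlog.PressureMemoryFile ...
}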