Skip to content

Commit

Permalink
Fix trigger timer issues
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklas-dohrn committed Sep 12, 2024
1 parent b1fd1a7 commit de826d4
Showing 1 changed file with 21 additions and 24 deletions.
45 changes: 21 additions & 24 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type HTTPSBatchWriter struct {
msgBatch bytes.Buffer
batchSize int
sendInterval time.Duration
sendTimer TriggerTimer
sendTimer *TriggerTimer
egrMsgCount float64
}

Expand Down Expand Up @@ -72,8 +72,8 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {

// TODO: Error back propagation. Errors are not looked at further down the call chain
func (w *HTTPSBatchWriter) startAndTriggerSend() {
if !w.sendTimer.Running() {
w.sendTimer.Start(w.sendInterval, func() {
if w.sendTimer == nil || !w.sendTimer.Running() {
w.sendTimer = NewTriggerTimer(w.sendInterval, func() {
w.sendMsgBatch()
})
}
Expand All @@ -83,37 +83,34 @@ func (w *HTTPSBatchWriter) startAndTriggerSend() {
}

type TriggerTimer struct {
trigger chan int
running bool
triggered bool
execFunc func()
}

type Timer interface {
Start(d time.Duration, f func())
}

func NewTriggerTimer() Timer {
return &TriggerTimer{
running: false,
func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer {
timer := &TriggerTimer{
triggered: false,
execFunc: f,
}
timer.initWait(d)

return timer
}

func (t *TriggerTimer) Start(d time.Duration, f func()) {
t.running = true
for {
timer := time.NewTimer(d)
select {
case <-timer.C:
case <-t.trigger:
f()
t.running = false
}
func (t *TriggerTimer) initWait(duration time.Duration) {
timer := time.NewTimer(duration)
<-timer.C
if !t.triggered {
t.execFunc()
}

}

func (t *TriggerTimer) Trigger() {
t.trigger <- 1
t.triggered = true
t.execFunc()
}

func (t *TriggerTimer) Running() bool {
return t.running
return !t.triggered
}

0 comments on commit de826d4

Please sign in to comment.