Skip to content

Commit

Permalink
Change batch dispatch implementation
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklas-dohrn committed Sep 12, 2024
1 parent 21162e3 commit 419f450
Show file tree
Hide file tree
Showing 4 changed files with 40 additions and 98 deletions.
89 changes: 30 additions & 59 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,10 +14,9 @@ const BATCHSIZE = 256 * 1024

type HTTPSBatchWriter struct {
HTTPSWriter
msgBatch bytes.Buffer
msgs chan []byte
batchSize int
sendInterval time.Duration
sendTimer *TriggerTimer
egrMsgCount float64
}

Expand All @@ -30,7 +29,7 @@ func NewHTTPSBatchWriter(
) egress.WriteCloser {
client := httpClient(netConf, tlsConf)
binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme
return &HTTPSBatchWriter{
BatchWriter := &HTTPSBatchWriter{
HTTPSWriter: HTTPSWriter{
url: binding.URL,
appID: binding.AppID,
Expand All @@ -42,17 +41,10 @@ func NewHTTPSBatchWriter(
batchSize: BATCHSIZE,
sendInterval: 1 * time.Second,
egrMsgCount: 0,
msgs: make(chan []byte),
}
}

func (w *HTTPSBatchWriter) sendMsgBatch() error {
currentEgrCount := w.egrMsgCount
currentMsg := w.msgBatch.Bytes()

w.egrMsgCount = 0
w.msgBatch.Reset()

return w.sendHttpRequest(currentMsg, currentEgrCount)
go BatchWriter.startSender()
return BatchWriter
}

// Modified Write function
Expand All @@ -63,55 +55,34 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
}

for _, msg := range msgs {
w.msgBatch.Write(msg)
w.egrMsgCount += 1
w.startAndTriggerSend()
w.msgs <- msg
}
return nil
}

// TODO: Error back propagation. Errors are not looked at further down the call chain
func (w *HTTPSBatchWriter) startAndTriggerSend() {
if w.sendTimer == nil || !w.sendTimer.Running() {
w.sendTimer = NewTriggerTimer(w.sendInterval, func() {
w.sendMsgBatch()
})
}
if w.msgBatch.Len() >= w.batchSize {
w.sendTimer.Trigger()
}
}

type TriggerTimer struct {
triggered bool
execFunc func()
}

func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer {
timer := &TriggerTimer{
triggered: false,
execFunc: f,
func (w *HTTPSBatchWriter) startSender() {
t := time.NewTimer(w.sendInterval)

var msgBatch bytes.Buffer
var msgCount float64
for {
select {
case msg := <-w.msgs:
msgBatch.Write(msg)
msgCount++
if msgBatch.Len() >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
t.Reset(w.sendInterval)
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
}
t.Reset(w.sendInterval)
}
}
timer.initWait(d)

return timer
}

func (t *TriggerTimer) initWait(duration time.Duration) {
timer := time.NewTimer(duration)
go func() {
<-timer.C
t.Trigger()
}()
}

func (t *TriggerTimer) Trigger() {
if !t.triggered {
t.triggered = true
t.execFunc()
}
}

func (t *TriggerTimer) Running() bool {
return !t.triggered
}
40 changes: 2 additions & 38 deletions src/pkg/egress/syslog/https_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,46 +17,8 @@ import (
. "github.com/onsi/gomega"
)

var triggered = 0
var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf"

var _ = Describe("TriggerTimer testing", func() {
BeforeEach(func() {
triggered = 0
})

It("Timer triggered by call", func() {
timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger)
timer.Trigger()
//expect timer to be triggered and therefore stopped
Expect(timer.Running()).To(BeFalse())
Expect(triggered).To(Equal(1))
time.Sleep(12 * time.Millisecond)
//expect timer to stay stopped and not trigger func again
Expect(timer.Running()).To(BeFalse())
Expect(triggered).To(Equal(1))
})

It("Timer triggered by time elapsed", func() {
timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger)
//expect timer to be running and untriggered
Expect(timer.Running()).To(BeTrue())
Expect(triggered).To(Equal(0))
time.Sleep(12 * time.Millisecond)
//expect timer to be triggered and stopped
Expect(timer.Running()).To(BeFalse())
Expect(triggered).To(Equal(1))
//expect timer to not be able to be retriggered
timer.Trigger()
Expect(timer.Running()).To(BeFalse())
Expect(triggered).To(Equal(1))
})
})

func trigger() {
triggered += 1
}

var _ = Describe("HTTPS_batch_testing", func() {
var (
netConf syslog.NetworkTimeoutConfig
Expand Down Expand Up @@ -125,6 +87,7 @@ var _ = Describe("HTTPS_batch_testing", func() {
for i := 0; i < 300; i++ {
writer.Write(env1)
}
time.Sleep(100 * time.Millisecond)
Expect(drain.messages).To(HaveLen(256))
})

Expand All @@ -134,6 +97,7 @@ var _ = Describe("HTTPS_batch_testing", func() {
writer.Write(env1)
time.Sleep(99 * time.Millisecond)
}
Expect(drain.messages).To(HaveLen(0))
time.Sleep(100 * time.Millisecond)
Expect(drain.messages).To(HaveLen(10))
})
Expand Down
1 change: 0 additions & 1 deletion src/pkg/egress/syslog/https_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ var _ = Describe("HTTPWriter", func() {
&metricsHelpers.SpyMetric{},
c,
)

env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT)
Expect(writer.Write(env)).To(HaveOccurred())
})
Expand Down
8 changes: 8 additions & 0 deletions src/pkg/egress/syslog/syslog_connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,4 +166,12 @@ func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) {
w.logClient.EmitLog(message, option)
}
func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) {
errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID)
if appID == "" {
errorAppOrAggregate = "for aggregate drain"
}
log.Printf(
"Dropped %d %s logs %s with url %s",
missed, scheme, errorAppOrAggregate, url,
)
}

0 comments on commit 419f450

Please sign in to comment.