Skip to content

Commit

Permalink
Added Error messages and cleaned up some code
Browse files Browse the repository at this point in the history
  • Loading branch information
nicklas-dohrn committed Sep 26, 2024
1 parent dd4169c commit a0f80f0
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 13 deletions.
4 changes: 3 additions & 1 deletion src/pkg/egress/syslog/https.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"crypto/tls"
"errors"
"fmt"
"log"
"net/url"
"strings"
"time"
Expand Down Expand Up @@ -70,7 +71,8 @@ func (w *HTTPSWriter) sendHttpRequest(msg []byte, msgCount float64) error {
func (w *HTTPSWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
log.Printf("failed to parse syslog, dropping faulty message, err: %s", err)
return nil
}

for _, msg := range msgs {
Expand Down
33 changes: 21 additions & 12 deletions src/pkg/egress/syslog/https_batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package syslog
import (
"bytes"
"crypto/tls"
"log"
"time"

"code.cloudfoundry.org/go-loggregator/v10/rpc/loggregator_v2"
Expand Down Expand Up @@ -51,7 +52,8 @@ func NewHTTPSBatchWriter(
func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
msgs, err := w.syslogConverter.ToRFC5424(env, w.hostname)
if err != nil {
return err
log.Printf("Failed to parse syslog, dropping faulty message, err: %s", err)
return nil
}

for _, msg := range msgs {
Expand All @@ -61,28 +63,35 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error {
}

func (w *HTTPSBatchWriter) startSender() {

t := time.NewTimer(w.sendInterval)

var msgBatch bytes.Buffer
var msgCount float64
reset := func() {
msgBatch.Reset()
msgCount = 0
t.Reset(w.sendInterval)
}
for {
select {
case msg := <-w.msgs:
msgBatch.Write(msg)
msgCount++
if msgBatch.Len() >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
t.Reset(w.sendInterval)
length, buffer_err := msgBatch.Write(msg)
if buffer_err != nil {
log.Printf("Failed to write to buffer, dropping buffer of size %d , err: %s", length, buffer_err)
reset()
} else {
msgCount++
if length >= w.batchSize {
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
}
}
case <-t.C:
if msgBatch.Len() > 0 {
w.sendHttpRequest(msgBatch.Bytes(), msgCount)
msgBatch.Reset()
msgCount = 0
w.sendHttpRequest(msgBatch.Bytes(), msgCount) //nolint:errcheck
reset()
}
t.Reset(w.sendInterval)
}
}
}

0 comments on commit a0f80f0

Please sign in to comment.