diff --git a/src/pkg/egress/syslog/https_batch.go b/src/pkg/egress/syslog/https_batch.go index 06a394b57..874fc9a1f 100644 --- a/src/pkg/egress/syslog/https_batch.go +++ b/src/pkg/egress/syslog/https_batch.go @@ -14,10 +14,9 @@ const BATCHSIZE = 256 * 1024 type HTTPSBatchWriter struct { HTTPSWriter - msgBatch bytes.Buffer + msgs chan []byte batchSize int sendInterval time.Duration - sendTimer *TriggerTimer egrMsgCount float64 } @@ -30,7 +29,7 @@ func NewHTTPSBatchWriter( ) egress.WriteCloser { client := httpClient(netConf, tlsConf) binding.URL.Scheme = "https" // reset the scheme for usage to a valid http scheme - return &HTTPSBatchWriter{ + BatchWriter := &HTTPSBatchWriter{ HTTPSWriter: HTTPSWriter{ url: binding.URL, appID: binding.AppID, @@ -42,17 +41,10 @@ func NewHTTPSBatchWriter( batchSize: BATCHSIZE, sendInterval: 1 * time.Second, egrMsgCount: 0, + msgs: make(chan []byte), } -} - -func (w *HTTPSBatchWriter) sendMsgBatch() error { - currentEgrCount := w.egrMsgCount - currentMsg := w.msgBatch.Bytes() - - w.egrMsgCount = 0 - w.msgBatch.Reset() - - return w.sendHttpRequest(currentMsg, currentEgrCount) + go BatchWriter.startSender() + return BatchWriter } // Modified Write function @@ -63,55 +55,34 @@ func (w *HTTPSBatchWriter) Write(env *loggregator_v2.Envelope) error { } for _, msg := range msgs { - w.msgBatch.Write(msg) - w.egrMsgCount += 1 - w.startAndTriggerSend() + w.msgs <- msg } return nil } -// TODO: Error back propagation. Errors are not looked at further down the call chain -func (w *HTTPSBatchWriter) startAndTriggerSend() { - if w.sendTimer == nil || !w.sendTimer.Running() { - w.sendTimer = NewTriggerTimer(w.sendInterval, func() { - w.sendMsgBatch() - }) - } - if w.msgBatch.Len() >= w.batchSize { - w.sendTimer.Trigger() - } -} - -type TriggerTimer struct { - triggered bool - execFunc func() -} - -func NewTriggerTimer(d time.Duration, f func()) *TriggerTimer { - timer := &TriggerTimer{ - triggered: false, - execFunc: f, +func (w *HTTPSBatchWriter) startSender() { + t := time.NewTimer(w.sendInterval) + + var msgBatch bytes.Buffer + var msgCount float64 + for { + select { + case msg := <-w.msgs: + msgBatch.Write(msg) + msgCount++ + if msgBatch.Len() >= w.batchSize { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + t.Reset(w.sendInterval) + } + case <-t.C: + if msgBatch.Len() > 0 { + w.sendHttpRequest(msgBatch.Bytes(), msgCount) + msgBatch.Reset() + msgCount = 0 + } + t.Reset(w.sendInterval) + } } - timer.initWait(d) - - return timer -} - -func (t *TriggerTimer) initWait(duration time.Duration) { - timer := time.NewTimer(duration) - go func() { - <-timer.C - t.Trigger() - }() -} - -func (t *TriggerTimer) Trigger() { - if !t.triggered { - t.triggered = true - t.execFunc() - } -} - -func (t *TriggerTimer) Running() bool { - return !t.triggered } diff --git a/src/pkg/egress/syslog/https_batch_test.go b/src/pkg/egress/syslog/https_batch_test.go index 389283574..2a328f9b9 100644 --- a/src/pkg/egress/syslog/https_batch_test.go +++ b/src/pkg/egress/syslog/https_batch_test.go @@ -17,46 +17,8 @@ import ( . "github.com/onsi/gomega" ) -var triggered = 0 var string_to_1024_chars = "saljdflajsdssdfsdfljkfkajafjajlköflkjöjaklgljksdjlakljkflkjweljklkwjejlkfekljwlkjefjklwjklsdajkljklwerlkaskldgjksakjekjwrjkljasdjkgfkljwejklrkjlklasdkjlsadjlfjlkadfljkajklsdfjklslkdfjkllkjasdjkflsdlakfjklasldfkjlasdjfkjlsadlfjklaljsafjlslkjawjklerkjljklasjkdfjklwerjljalsdjkflwerjlkwejlkarjklalkklfsdjlfhkjsdfkhsewhkjjasdjfkhwkejrkjahjefkhkasdjhfkashfkjwehfkksadfjaskfkhjdshjfhewkjhasdfjdajskfjwehkfajkankaskjdfasdjhfkkjhjjkasdfjhkjahksdf" -var _ = Describe("TriggerTimer testing", func() { - BeforeEach(func() { - triggered = 0 - }) - - It("Timer triggered by call", func() { - timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) - timer.Trigger() - //expect timer to be triggered and therefore stopped - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - time.Sleep(12 * time.Millisecond) - //expect timer to stay stopped and not trigger func again - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - }) - - It("Timer triggered by time elapsed", func() { - timer := syslog.NewTriggerTimer(10*time.Millisecond, trigger) - //expect timer to be running and untriggered - Expect(timer.Running()).To(BeTrue()) - Expect(triggered).To(Equal(0)) - time.Sleep(12 * time.Millisecond) - //expect timer to be triggered and stopped - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - //expect timer to not be able to be retriggered - timer.Trigger() - Expect(timer.Running()).To(BeFalse()) - Expect(triggered).To(Equal(1)) - }) -}) - -func trigger() { - triggered += 1 -} - var _ = Describe("HTTPS_batch_testing", func() { var ( netConf syslog.NetworkTimeoutConfig @@ -125,6 +87,7 @@ var _ = Describe("HTTPS_batch_testing", func() { for i := 0; i < 300; i++ { writer.Write(env1) } + time.Sleep(100 * time.Millisecond) Expect(drain.messages).To(HaveLen(256)) }) @@ -134,6 +97,7 @@ var _ = Describe("HTTPS_batch_testing", func() { writer.Write(env1) time.Sleep(99 * time.Millisecond) } + Expect(drain.messages).To(HaveLen(0)) time.Sleep(100 * time.Millisecond) Expect(drain.messages).To(HaveLen(10)) }) diff --git a/src/pkg/egress/syslog/https_test.go b/src/pkg/egress/syslog/https_test.go index 21bf138c5..ebf999067 100644 --- a/src/pkg/egress/syslog/https_test.go +++ b/src/pkg/egress/syslog/https_test.go @@ -78,7 +78,6 @@ var _ = Describe("HTTPWriter", func() { &metricsHelpers.SpyMetric{}, c, ) - env := buildLogEnvelope("APP", "1", "just a test", loggregator_v2.Log_OUT) Expect(writer.Write(env)).To(HaveOccurred()) }) diff --git a/src/pkg/egress/syslog/syslog_connector.go b/src/pkg/egress/syslog/syslog_connector.go index e487082da..eaaa56207 100644 --- a/src/pkg/egress/syslog/syslog_connector.go +++ b/src/pkg/egress/syslog/syslog_connector.go @@ -166,4 +166,12 @@ func (w *SyslogConnector) emitLoggregatorErrorLog(appID, message string) { w.logClient.EmitLog(message, option) } func (w *SyslogConnector) emitStandardOutErrorLog(appID, scheme, url string, missed int) { + errorAppOrAggregate := fmt.Sprintf("for %s's app drain", appID) + if appID == "" { + errorAppOrAggregate = "for aggregate drain" + } + log.Printf( + "Dropped %d %s logs %s with url %s", + missed, scheme, errorAppOrAggregate, url, + ) }