Skip to content

Commit

Permalink
Merge pull request #37 from rewardStyle/Less-greedy-Send-and-Sync-sends
Browse files Browse the repository at this point in the history
Make .Close() less greedy, and make sync close too
  • Loading branch information
fjordan authored Jan 16, 2017
2 parents d06da20 + 104a79f commit 0642f2a
Show file tree
Hide file tree
Showing 4 changed files with 95 additions and 1 deletion.
30 changes: 29 additions & 1 deletion listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
Expand Down Expand Up @@ -302,7 +303,6 @@ func (l *Listener) Close() error {
if conf.Debug.Verbose {
log.Println("Listener is waiting for all tasks to finish...")
}

// Stop consuming
go func() {
l.interrupts <- syscall.SIGINT
Expand All @@ -313,7 +313,35 @@ func (l *Listener) Close() error {
if conf.Debug.Verbose {
log.Println("Listener is shutting down.")
}
runtime.Gosched()
return nil
}

// CloseSync closes the Listener in a syncronous manner.
func (l *Listener) CloseSync() error {
if conf.Debug.Verbose {
log.Println("Listener is waiting for all tasks to finish...")
}
var err error
// Stop consuming
select {
case l.interrupts <- syscall.SIGINT:
break
default:
if conf.Debug.Verbose {
log.Println("Already closing listener.")
}
runtime.Gosched()
return err
}
l.wg.Wait()
for l.IsConsuming() {
runtime.Gosched()
}
if conf.Debug.Verbose {
log.Println("Listener is shutting down.")
}
runtime.Gosched()
return nil
}

Expand Down
19 changes: 19 additions & 0 deletions listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,25 @@ func TestListenerStop(t *testing.T) {
listener.Close()
}

func TestListenerSyncStop(t *testing.T) {
listener, _ := new(Listener).Init()
listener.NewEndpoint(testEndpoint, "stream-name")

Convey("Given a running listener", t, func() {
go listener.Listen(func(msg []byte, wg *sync.WaitGroup) {
wg.Done()
})

Convey("It should stop listening if sent an interrupt signal", func() {
err := listener.CloseSync()
So(err, ShouldBeNil)
So(listener.IsListening(), ShouldEqual, false)
})
})

listener.Close()
}

func TestListenerError(t *testing.T) {
listener, _ := new(Listener).Init()
listener.NewEndpoint(testEndpoint, "stream-name")
Expand Down
29 changes: 29 additions & 0 deletions producer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"log"
"os"
"os/signal"
"runtime"
"strconv"
"sync"
"syscall"
Expand Down Expand Up @@ -370,7 +371,35 @@ func (p *Producer) Close() error {
if conf.Debug.Verbose {
log.Println("Producer is shutting down.")
}
runtime.Gosched()
return nil
}

// CloseSync closes the Producer in a syncronous manner.
func (p *Producer) CloseSync() error {
if conf.Debug.Verbose {
log.Println("Listener is waiting for all tasks to finish...")
}
var err error
// Stop consuming
select {
case p.interrupts <- syscall.SIGINT:
break
default:
if conf.Debug.Verbose {
log.Println("Already closing listener.")
}
runtime.Gosched()
return err
}
p.wg.Wait()
for p.IsProducing() {
runtime.Gosched()
}
if conf.Debug.Verbose {
log.Println("Listener is shutting down.")
}
runtime.Gosched()
return nil
}

Expand Down
18 changes: 18 additions & 0 deletions producer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,24 @@ func TestProducerStop(t *testing.T) {
producer.Close()
}

func TestSyncStop(t *testing.T) {
producer, _ := new(Producer).Init()
producer.NewEndpoint(testEndpoint, "stream-name")

Convey("Given a running producer", t, func() {
go producer.produce()
runtime.Gosched()
Convey("It should stop producing if sent an interrupt signal", func() {
err := producer.CloseSync()
So(err, ShouldBeNil)
// Wait for it to stop
So(producer.IsProducing(), ShouldEqual, false)
})
})

producer.Close()
}

func TestProducerError(t *testing.T) {
producer, _ := new(Producer).Init()
producer.NewEndpoint(testEndpoint, "stream-name")
Expand Down

0 comments on commit 0642f2a

Please sign in to comment.