Skip to content

Commit

Permalink
Merge pull request #906 from mattdowdell/ack-malformed-events
Browse files Browse the repository at this point in the history
Support ACK when receiving malformed events
  • Loading branch information
embano1 authored Aug 3, 2023
2 parents 85db5b9 + f84be73 commit 4cc8c4a
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 5 deletions.
9 changes: 8 additions & 1 deletion v2/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ type ceClient struct {
eventDefaulterFns []EventDefaulter
pollGoroutines int
blockingCallback bool
ackMalformedEvent bool
}

func (c *ceClient) applyOptions(opts ...Option) error {
Expand Down Expand Up @@ -202,7 +203,13 @@ func (c *ceClient) StartReceiver(ctx context.Context, fn interface{}) error {
return fmt.Errorf("client already has a receiver")
}

invoker, err := newReceiveInvoker(fn, c.observabilityService, c.inboundContextDecorators, c.eventDefaulterFns...)
invoker, err := newReceiveInvoker(
fn,
c.observabilityService,
c.inboundContextDecorators,
c.eventDefaulterFns,
c.ackMalformedEvent,
)
if err != nil {
return err
}
Expand Down
87 changes: 87 additions & 0 deletions v2/client/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (

"github.com/google/go-cmp/cmp"

"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/test"
"github.com/cloudevents/sdk-go/v2/client"
"github.com/cloudevents/sdk-go/v2/event"
"github.com/cloudevents/sdk-go/v2/protocol"
Expand Down Expand Up @@ -399,6 +401,68 @@ func TestClientContext(t *testing.T) {
wg.Wait()
}

func TestClientStartReceiverWithAckMalformedEvent(t *testing.T) {
testCases := []struct {
name string
opts []client.Option
expectedAck bool
}{
{
name: "without ack",
},
{
name: "with ack",
opts: []client.Option{client.WithAckMalformedEvent()},
expectedAck: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// make sure the receiver goroutine is closed
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

receiver := &mockReceiver{
finished: make(chan struct{}),
}

// only need 1 goroutine to exercise this
tc.opts = append(tc.opts, client.WithPollGoroutines(1))

c, err := client.New(receiver, tc.opts...)
if err != nil {
t.Fatalf("failed to construct client: %v", err)
}

go c.StartReceiver(ctx, func(ctx context.Context, e event.Event) protocol.Result {
t.Error("receiver callback called unexpectedly")
return nil
})

ctx, cancelTimeout := context.WithTimeout(ctx, time.Second)
defer cancelTimeout()

select {
case <-receiver.finished:
// continue to rest of the test
case <-ctx.Done():
t.Fatalf("timed out waiting for receiver to complete")
}

if tc.expectedAck {
if protocol.IsNACK(receiver.result) {
t.Errorf("receiver did not receive ACK: %v", receiver.result)
}
} else {
if protocol.IsACK(receiver.result) {
t.Errorf("receiver did not receive NACK: %v", receiver.result)
}
}
})
}
}

type requestValidation struct {
Host string
Headers http.Header
Expand Down Expand Up @@ -488,3 +552,26 @@ func isImportantHeader(h string) bool {
}
return true
}

type mockReceiver struct {
mu sync.Mutex
count int
result error
finished chan struct{}
}

func (m *mockReceiver) Receive(ctx context.Context) (binding.Message, error) {
m.mu.Lock()
defer m.mu.Unlock()

if m.count > 0 {
return nil, io.EOF
}

m.count++

return binding.WithFinish(test.UnknownMessage, func(err error) {
m.result = err
close(m.finished)
}), nil
}
2 changes: 1 addition & 1 deletion v2/client/http_receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (
)

func NewHTTPReceiveHandler(ctx context.Context, p *thttp.Protocol, fn interface{}) (*EventReceiver, error) {
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil) //TODO(slinkydeveloper) maybe not nil?
invoker, err := newReceiveInvoker(fn, noopObservabilityService{}, nil, nil, false) //TODO(slinkydeveloper) maybe not nil?
if err != nil {
return nil, err
}
Expand Down
14 changes: 11 additions & 3 deletions v2/client/invoker.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,18 @@ type Invoker interface {

var _ Invoker = (*receiveInvoker)(nil)

func newReceiveInvoker(fn interface{}, observabilityService ObservabilityService, inboundContextDecorators []func(context.Context, binding.Message) context.Context, fns ...EventDefaulter) (Invoker, error) {
func newReceiveInvoker(
fn interface{},
observabilityService ObservabilityService,
inboundContextDecorators []func(context.Context, binding.Message) context.Context,
fns []EventDefaulter,
ackMalformedEvent bool,
) (Invoker, error) {
r := &receiveInvoker{
eventDefaulterFns: fns,
observabilityService: observabilityService,
inboundContextDecorators: inboundContextDecorators,
ackMalformedEvent: ackMalformedEvent,
}

if fn, err := receiver(fn); err != nil {
Expand All @@ -44,6 +51,7 @@ type receiveInvoker struct {
observabilityService ObservabilityService
eventDefaulterFns []EventDefaulter
inboundContextDecorators []func(context.Context, binding.Message) context.Context
ackMalformedEvent bool
}

func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn protocol.ResponseFn) (err error) {
Expand All @@ -58,13 +66,13 @@ func (r *receiveInvoker) Invoke(ctx context.Context, m binding.Message, respFn p
switch {
case eventErr != nil && r.fn.hasEventIn:
r.observabilityService.RecordReceivedMalformedEvent(ctx, eventErr)
return respFn(ctx, nil, protocol.NewReceipt(false, "failed to convert Message to Event: %w", eventErr))
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "failed to convert Message to Event: %w", eventErr))
case r.fn != nil:
// Check if event is valid before invoking the receiver function
if e != nil {
if validationErr := e.Validate(); validationErr != nil {
r.observabilityService.RecordReceivedMalformedEvent(ctx, validationErr)
return respFn(ctx, nil, protocol.NewReceipt(false, "validation error in incoming event: %w", validationErr))
return respFn(ctx, nil, protocol.NewReceipt(r.ackMalformedEvent, "validation error in incoming event: %w", validationErr))
}
}

Expand Down
13 changes: 13 additions & 0 deletions v2/client/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,3 +126,16 @@ func WithBlockingCallback() Option {
return nil
}
}

// WithAckMalformedevents causes malformed events received within StartReceiver to be acknowledged
// rather than being permanently not-acknowledged. This can be useful when a protocol does not
// provide a responder implementation and would otherwise cause the receiver to be partially or
// fully stuck.
func WithAckMalformedEvent() Option {
return func(i interface{}) error {
if c, ok := i.(*ceClient); ok {
c.ackMalformedEvent = true
}
return nil
}
}
28 changes: 28 additions & 0 deletions v2/client/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -136,3 +136,31 @@ func TestWith_Defaulters(t *testing.T) {
})
}
}

func TestWithAckMalformedEvent(t *testing.T) {
testCases := []struct {
name string
opts []Option
expected bool
}{
{
name: "unset",
},
{
name: "set",
opts: []Option{WithAckMalformedEvent()},
expected: true,
},
}

for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
client := &ceClient{}
client.applyOptions(tc.opts...)

if client.ackMalformedEvent != tc.expected {
t.Errorf("unexpected ackMalformedEvent; want: %t; got: %t", tc.expected, client.ackMalformedEvent)
}
})
}
}

0 comments on commit 4cc8c4a

Please sign in to comment.