-
Notifications
You must be signed in to change notification settings - Fork 217
/
write_producer_message.go
163 lines (136 loc) · 4.17 KB
/
write_producer_message.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
/*
Copyright 2021 The CloudEvents Authors
SPDX-License-Identifier: Apache-2.0
*/
package kafka_sarama
import (
"bytes"
"context"
"io"
"github.com/IBM/sarama"
"github.com/cloudevents/sdk-go/v2/binding"
"github.com/cloudevents/sdk-go/v2/binding/format"
"github.com/cloudevents/sdk-go/v2/binding/spec"
"github.com/cloudevents/sdk-go/v2/types"
)
// partitionKey is the name of the event extension that WriteProducerMessage
// reads to derive the key of the produced Kafka message.
const partitionKey = "partitionkey"
// WriteProducerMessage fills the provided producerMessage with the message m.
// Using context you can tweak the encoding processing (more details on binding.Write documentation).
// By default, this function implements the key mapping, trying to set the key of the message based on
// the partitionKey extension. If you want to disable the key mapping, decorate the context with
// WithSkipKeyMapping.
//
// The transformers slice passed by the caller is never written to: when the key-extracting
// transformer has to be added, it is appended to a freshly allocated slice.
//
// The error reported by binding.Write is returned as is. If a non-empty key was extracted,
// it is stored in producerMessage.Key as a sarama.StringEncoder regardless of that error.
func WriteProducerMessage(ctx context.Context, m binding.Message, producerMessage *sarama.ProducerMessage, transformers ...binding.Transformer) error {
	writer := (*kafkaProducerMessageWriter)(producerMessage)

	skipKey := binding.GetOrDefaultFromCtx(ctx, skipKeyKey{}, false).(bool)

	var key string
	// If skipKey = false, then we add a transformer that extracts the key
	if !skipKey {
		extractKey := binding.TransformerFunc(func(r binding.MessageMetadataReader, w binding.MessageMetadataWriter) error {
			ext := r.GetExtension(partitionKey)
			if types.IsZero(ext) {
				return nil
			}
			extStr, err := types.Format(ext)
			if err != nil {
				return err
			}
			key = extStr
			return nil
		})
		// Cap the slice at its length so that append always allocates a new backing array.
		// A plain append could store extractKey (which captures this call's key variable)
		// in spare capacity of a slice the caller still owns and may share between calls.
		transformers = append(transformers[:len(transformers):len(transformers)], extractKey)
	}

	_, err := binding.Write(
		ctx,
		m,
		writer,
		writer,
		transformers...,
	)

	if key != "" {
		producerMessage.Key = sarama.StringEncoder(key)
	}

	return err
}
// kafkaProducerMessageWriter is sarama.ProducerMessage under a package-local type name,
// which lets this package attach the writer methods to it. WriteProducerMessage converts
// the caller's *sarama.ProducerMessage to this type and hands it to binding.Write, so the
// methods fill in the Headers and Value of that same message.
type kafkaProducerMessageWriter sarama.ProducerMessage
// SetStructuredEvent stores an already-serialized event in the producer message.
//
// The message headers are replaced by a single contentTypeHeader entry carrying
// format.MediaType(). The event reader is then drained into memory and the resulting
// bytes become the message value. If draining fails, that error is returned and the
// value is left untouched (the headers have already been replaced at that point).
func (b *kafkaProducerMessageWriter) SetStructuredEvent(ctx context.Context, format format.Format, event io.Reader) error {
	contentType := sarama.RecordHeader{
		Key:   []byte(contentTypeHeader),
		Value: []byte(format.MediaType()),
	}
	b.Headers = []sarama.RecordHeader{contentType}

	payload := new(bytes.Buffer)
	if _, err := io.Copy(payload, event); err != nil {
		return err
	}

	b.Value = sarama.ByteEncoder(payload.Bytes())
	return nil
}
// Start resets the message headers to an empty, non-nil slice, discarding any
// headers that were present before. It always returns nil.
func (b *kafkaProducerMessageWriter) Start(_ context.Context) error {
	b.Headers = make([]sarama.RecordHeader, 0)
	return nil
}
// End is a no-op for this writer: nothing needs to be finalized, so it always
// returns nil.
func (b *kafkaProducerMessageWriter) End(_ context.Context) error {
	return nil
}
// SetData drains reader into memory and stores the bytes as the message value.
// If reading fails, the error is returned and the value is left unchanged.
func (b *kafkaProducerMessageWriter) SetData(reader io.Reader) error {
	payload := new(bytes.Buffer)
	if _, err := io.Copy(payload, reader); err != nil {
		return err
	}

	b.Value = sarama.ByteEncoder(payload.Bytes())
	return nil
}
// SetAttribute writes a context attribute into the message headers.
//
// The data content type attribute is stored under contentTypeHeader; every other
// attribute is stored under prefix followed by the attribute name. A nil value
// removes the header instead of adding one. Non-nil values are rendered with
// types.Format and appended as a new header; a formatting error is returned as is.
func (b *kafkaProducerMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error {
	// Resolve the header name once; the rest of the handling is the same for all attributes.
	var headerName string
	if attribute.Kind() == spec.DataContentType {
		headerName = contentTypeHeader
	} else {
		headerName = prefix + attribute.Name()
	}

	if value == nil {
		b.removeHeader(headerName)
		return nil
	}

	// Everything is a string here
	formatted, err := types.Format(value)
	if err != nil {
		return err
	}

	b.Headers = append(b.Headers, sarama.RecordHeader{
		Key:   []byte(headerName),
		Value: []byte(formatted),
	})
	return nil
}
// SetExtension writes an extension attribute into the message headers under
// prefix followed by name. A nil value removes the header instead of adding one.
// Non-nil values are rendered with types.Format and appended as a new header;
// a formatting error is returned as is.
func (b *kafkaProducerMessageWriter) SetExtension(name string, value interface{}) error {
	headerName := prefix + name

	if value == nil {
		b.removeHeader(headerName)
		return nil
	}

	// Kafka headers, everything is a string!
	formatted, err := types.Format(value)
	if err != nil {
		return err
	}

	b.Headers = append(b.Headers, sarama.RecordHeader{
		Key:   []byte(headerName),
		Value: []byte(formatted),
	})
	return nil
}
// removeHeader deletes every header whose key equals name, keeping the remaining
// headers in their original order. It does nothing when no header matches.
//
// All matches are removed, not just the first: SetAttribute and SetExtension always
// append, so the same key can be present more than once, and removing a single
// occurrence would leave the header visible to consumers.
func (b *kafkaProducerMessageWriter) removeHeader(name string) {
	// Filter in place, reusing the existing backing array.
	kept := b.Headers[:0]
	for _, h := range b.Headers {
		if string(h.Key) != name {
			kept = append(kept, h)
		}
	}
	b.Headers = kept
}
// skipKeyKey is the context key under which the "skip key mapping" flag is stored.
type skipKeyKey struct{}

// WithSkipKeyMapping returns a context derived from ctx that carries the flag
// telling WriteProducerMessage not to derive the message key from the
// partitionkey extension.
func WithSkipKeyMapping(ctx context.Context) context.Context {
	var flagKey skipKeyKey
	return context.WithValue(ctx, flagKey, true)
}
// Compile-time checks that *kafkaProducerMessageWriter conforms to both writer interfaces.
var (
	_ binding.StructuredWriter = (*kafkaProducerMessageWriter)(nil)
	_ binding.BinaryWriter     = (*kafkaProducerMessageWriter)(nil)
)