diff --git a/protocol/mqtt_paho/v2/message.go b/protocol/mqtt_paho/v2/message.go index 8dd938545..f1202f234 100644 --- a/protocol/mqtt_paho/v2/message.go +++ b/protocol/mqtt_paho/v2/message.go @@ -17,8 +17,7 @@ import ( ) const ( - prefix = "ce-" - contentType = "Content-Type" + prefix = "ce-" ) var specs = spec.WithPrefix(prefix) @@ -42,7 +41,7 @@ func NewMessage(msg *paho.Publish) *Message { var v spec.Version if msg.Properties != nil { // Use properties.User["Content-type"] to determine if message is structured - if s := msg.Properties.User.Get(contentType); format.IsFormat(s) { + if s := msg.Properties.ContentType; format.IsFormat(s) { f = format.Lookup(s) } else if s := msg.Properties.User.Get(specs.PrefixedSpecVersionName()); s != "" { v = specs.Version(s) @@ -88,8 +87,6 @@ func (m *Message) ReadBinary(ctx context.Context, encoder binding.BinaryWriter) } else { err = encoder.SetExtension(strings.TrimPrefix(userProperty.Key, prefix), userProperty.Value) } - } else if userProperty.Key == contentType { - err = encoder.SetAttribute(m.version.AttributeFromKind(spec.DataContentType), string(userProperty.Value)) } if err != nil { return diff --git a/protocol/mqtt_paho/v2/message_test.go b/protocol/mqtt_paho/v2/message_test.go index 757f81f5c..ff51a0a0c 100644 --- a/protocol/mqtt_paho/v2/message_test.go +++ b/protocol/mqtt_paho/v2/message_test.go @@ -32,7 +32,7 @@ func TestReadStructured(t *testing.T) { msg: &paho.Publish{ Payload: []byte(""), Properties: &paho.PublishProperties{ - User: []paho.UserProperty{{Key: contentType, Value: event.ApplicationCloudEventsJSON}}, + ContentType: event.ApplicationCloudEventsJSON, }, }, }, diff --git a/protocol/mqtt_paho/v2/write_message.go b/protocol/mqtt_paho/v2/write_message.go index a4b87f4aa..9db47e918 100644 --- a/protocol/mqtt_paho/v2/write_message.go +++ b/protocol/mqtt_paho/v2/write_message.go @@ -42,11 +42,9 @@ var ( func (b *pubMessageWriter) SetStructuredEvent(ctx context.Context, f format.Format, event io.Reader) error { if b.Properties == nil { - b.Properties = &paho.PublishProperties{ - User: make([]paho.UserProperty, 0), - } + b.Properties = &paho.PublishProperties{} } - b.Properties.User.Add(contentType, f.MediaType()) + b.Properties.ContentType = f.MediaType() var buf bytes.Buffer _, err := io.Copy(&buf, event) if err != nil { @@ -85,15 +83,13 @@ func (b *pubMessageWriter) SetData(reader io.Reader) error { func (b *pubMessageWriter) SetAttribute(attribute spec.Attribute, value interface{}) error { if attribute.Kind() == spec.DataContentType { if value == nil { - b.removeProperty(contentType) + b.Properties.ContentType = "" } s, err := types.Format(value) if err != nil { return err } - if err := b.addProperty(contentType, s); err != nil { - return err - } + b.Properties.ContentType = s } else { if value == nil { b.removeProperty(prefix + attribute.Name()) diff --git a/samples/mqtt/sender/main.go b/samples/mqtt/sender/main.go index 10c2a03e7..f48643f17 100644 --- a/samples/mqtt/sender/main.go +++ b/samples/mqtt/sender/main.go @@ -52,6 +52,7 @@ func main() { for i := 0; i < count; i++ { e := cloudevents.NewEvent() + e.SetExtension("eventkey", "eventvalue") e.SetID(uuid.New().String()) e.SetType("com.cloudevents.sample.sent") e.SetSource("https://github.com/cloudevents/sdk-go/samples/mqtt/sender") @@ -62,6 +63,7 @@ func main() { if err != nil { log.Printf("failed to set data: %v", err) } + ctx = cloudevents.WithEncodingStructured(ctx) if result := c.Send( cecontext.WithTopic(ctx, "test-topic"), e,