From e864ebdc2a91266f8fa87fd5f7e0f1b2a3697f2f Mon Sep 17 00:00:00 2001 From: Michael Gasch <15986659+embano1@users.noreply.github.com> Date: Tue, 21 May 2024 21:19:35 +0200 Subject: [PATCH] fix: invalid ce prefix in confluent binding Kafka binding spec requires prefix to be ce_ Closes: #1058 Signed-off-by: Michael Gasch <15986659+embano1@users.noreply.github.com> --- protocol/kafka_confluent/v2/message.go | 5 ++-- protocol/kafka_confluent/v2/message_test.go | 32 ++++++++++----------- 2 files changed, 19 insertions(+), 18 deletions(-) diff --git a/protocol/kafka_confluent/v2/message.go b/protocol/kafka_confluent/v2/message.go index 164879a11..43df8d4ff 100644 --- a/protocol/kafka_confluent/v2/message.go +++ b/protocol/kafka_confluent/v2/message.go @@ -11,14 +11,15 @@ import ( "strconv" "strings" + "github.com/confluentinc/confluent-kafka-go/v2/kafka" + "github.com/cloudevents/sdk-go/v2/binding" "github.com/cloudevents/sdk-go/v2/binding/format" "github.com/cloudevents/sdk-go/v2/binding/spec" - "github.com/confluentinc/confluent-kafka-go/v2/kafka" ) const ( - prefix = "ce-" + prefix = "ce_" contentTypeKey = "content-type" ) diff --git a/protocol/kafka_confluent/v2/message_test.go b/protocol/kafka_confluent/v2/message_test.go index e7f599b63..9676fe7ac 100644 --- a/protocol/kafka_confluent/v2/message_test.go +++ b/protocol/kafka_confluent/v2/message_test.go @@ -42,14 +42,14 @@ var ( TopicPartition: topicPartition, Value: []byte("hello world!"), Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", "exta": "someext", }), } @@ -89,14 +89,14 @@ func TestNewMessage(t *testing.T) { TopicPartition: topicPartition, Value: nil, Headers: mapToKafkaHeaders(map[string]string{ - "ce-type": testEvent.Type(), - "ce-source": testEvent.Source(), - "ce-id": testEvent.ID(), - "ce-time": test.Timestamp.String(), - "ce-specversion": "1.0", - "ce-dataschema": test.Schema.String(), - "ce-datacontenttype": "text/json", - "ce-subject": "receiverTopic", + "ce_type": testEvent.Type(), + "ce_source": testEvent.Source(), + "ce_id": testEvent.ID(), + "ce_time": test.Timestamp.String(), + "ce_specversion": "1.0", + "ce_dataschema": test.Schema.String(), + "ce_datacontenttype": "text/json", + "ce_subject": "receiverTopic", }), }, expectedEncoding: binding.EncodingBinary,