Skip to content

Commit

Permalink
Fix for missing key in reconsumed message #268
Browse files Browse the repository at this point in the history
  • Loading branch information
Lanayx committed Jul 15, 2024
1 parent 36e5661 commit aebde9c
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 11 deletions.
16 changes: 8 additions & 8 deletions src/Pulsar.Client/Internal/DeadLetters.fs
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ type internal DeadLetterProcessor<'T>

let getOptionalKey (message: Message<'T>) =
if String.IsNullOrEmpty(%message.Key) then
Some { PartitionKey = message.Key; IsBase64Encoded = message.HasBase64EncodedKey }
else
None
else
Some { PartitionKey = message.Key; IsBase64Encoded = message.HasBase64EncodedKey }

interface IDeadLetterProcessor<'T> with
member this.ClearMessages() =
store.Clear()

member this.AddMessage (messageId, message) =
store.[messageId] <- message
store[messageId] <- message

member this.RemoveMessage messageId =
store.Remove(messageId) |> ignore
Expand All @@ -69,16 +69,16 @@ type internal DeadLetterProcessor<'T>
member this.ReconsumeLater (message, deliverAt, acknowledge) =
let propertiesMap = Dictionary<string, string>()
for KeyValue(k, v) in message.Properties do
propertiesMap.[k] <- v
propertiesMap[k] <- v
let mutable reconsumetimes = 1
match propertiesMap.TryGetValue(RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES) with
| true, v ->
reconsumetimes <- v |> int |> (+) 1
| _ ->
propertiesMap.[RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC] <- topicName
propertiesMap.[RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID] <- message.MessageId.ToString()
propertiesMap.[RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES] <- string reconsumetimes
propertiesMap.[RetryMessageUtil.SYSTEM_PROPERTY_DELIVER_AT] <- deliverAt |> string
propertiesMap[RetryMessageUtil.SYSTEM_PROPERTY_REAL_TOPIC] <- topicName
propertiesMap[RetryMessageUtil.SYSTEM_PROPERTY_ORIGIN_MESSAGE_ID] <- message.MessageId.ToString()
propertiesMap[RetryMessageUtil.SYSTEM_PROPERTY_RECONSUMETIMES] <- string reconsumetimes
propertiesMap[RetryMessageUtil.SYSTEM_PROPERTY_DELIVER_AT] <- deliverAt |> string
backgroundTask {
if reconsumetimes > policy.MaxRedeliveryCount then
let dlp = this :> IDeadLetterProcessor<'T>
Expand Down
6 changes: 3 additions & 3 deletions src/Pulsar.Client/Pulsar.Client.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,17 +7,17 @@
<Title>Pulsar.Client</Title>
<RootNamespace>Pulsar.Client</RootNamespace>
<AssemblyName>Pulsar.Client</AssemblyName>
<Version>3.5.1</Version>
<Version>3.5.2</Version>
<Company>F# community</Company>
<Description>.NET client library for Apache Pulsar</Description>
<RepositoryUrl>https://github.com/fsprojects/pulsar-client-dotnet</RepositoryUrl>
<PackageReleaseNotes>Changed Snappy compression package</PackageReleaseNotes>
<PackageReleaseNotes>Fix for missing key in reconsumed message</PackageReleaseNotes>
<PackageLicenseExpression>MIT</PackageLicenseExpression>
<PackageProjectUrl>https://github.com/fsprojects/pulsar-client-dotnet</PackageProjectUrl>
<RepositoryType>git</RepositoryType>
<PackageTags>pulsar</PackageTags>
<Authors>F# community</Authors>
<PackageVersion>3.5.1</PackageVersion>
<PackageVersion>3.5.2</PackageVersion>
<DebugType>portable</DebugType>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
<PackageReadmeFile>README.md</PackageReadmeFile>
Expand Down

0 comments on commit aebde9c

Please sign in to comment.