Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

PubSub subsequent messages not properly ordered by key #753

Closed
rriolobos opened this issue Feb 8, 2022 · 3 comments
Closed

PubSub subsequent messages not properly ordered by key #753

rriolobos opened this issue Feb 8, 2022 · 3 comments

Comments

@rriolobos
Copy link

rriolobos commented Feb 8, 2022

type withOrderingKey struct{}

I tested pubsub ordering key with this library but I found an issue. When you send 3 or more messages, with same ordering key, receiver only waits for first one, second and third messages are consumed concurrently. I added a 5 second sleep in message handler in order to check that the total amount of time was 15 seconds as expected, but it was 10 second. It can also be checked with logs. When I try the same directly invoking pubsub receiver, it worked as expected (@skymeyer).

func receive(ctx context.Context, event event.Event) error {
	fmt.Printf("Event Context: %+v\n", event.Context)

	fmt.Printf("Protocol Context: %+v\n", pscontext.ProtocolContextFrom(ctx))

	data := &Example{}
	if err := event.DataAs(data); err != nil {
		fmt.Printf("Got Data Error: %s\n", err.Error())
	}

	time.Sleep(8 * time.Second)

	fmt.Printf("Data processed: %+v\n", data)

	fmt.Printf("----------------------------\n")

	return nil
}

func main() {
	ctx := context.Background()

	var env envConfig
	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	t, err := cepubsub.New(context.Background(),
		cepubsub.WithProjectID(env.ProjectID),
		cepubsub.WithTopicID(env.TopicID),
		cepubsub.WithReceiveSettings(&pubsub.ReceiveSettings{}),
		cepubsub.WithSubscriptionID(env.SubscriptionID))
	if err != nil {
		log.Fatalf("failed to create pubsub protocol, %s", err.Error())
	}

	t.ReceiveSettings.NumGoroutines = 1

	c, err := cloudevents.NewClient(t)

	if err != nil {
		log.Fatalf("failed to create client, %s", err.Error())
	}

	log.Println("Created client, listening...")

	if err := c.StartReceiver(ctx, receive); err != nil {
		log.Fatalf("failed to start pubsub receiver, %s", err.Error())
	}
}
@skymeyer
Copy link
Contributor

skymeyer commented Feb 8, 2022

@rriolobos I have updated your sample code a bit to try to reproduce and created two samples:

Ordering seems to work as expected on my end. Can you review a couple of things (ranked from most to less likely) as the message ordering seems to work for me:

Hope that helps shed some light. I have not tested letting cloudevents sdk create the topic/subscription dynamically. I had them preconfigured. Are you relying on the auto creation ?

@rriolobos
Copy link
Author

rriolobos commented Feb 8, 2022

Hi @skymeyer,

First of all thanks for your answer.

You are right, I mean, when I set MaxOutstandingMessages=1 messages are properly ordered. In fact, all messages are ordered, both for same key and different one. Let's try with an example:
Let's imagine every message is process in 5 seconds and prints 2 logs: start message key X 5s -> end message key X
Imagine we have the following sequence without setting MaxOutstandigMessages=1:

A,B,C,A,A

I expect the following output;
Start A
StartB
StartC
EndA
StartA
EndB
EndC
EndA
StartA
EndA

I mean, hope messages with different key are processed concurrently and messages with same key are processes sequentially.

The output I'm obtaining:
Start A
StartB
StartC
EndA
StartA
StartA
EndB
EndC
EndA
EndA

After first A ack, A messages order is not kept anymore. Even more, if I send B,B after previos sequence, they are also processed concurrently instead on sequential as expected.

Is it possible to get this behavior based on some specific settings?

When testing this scenario with a receiver using directly the PubSub Receive() method, the output obtained is the expected one. Next you can see the code:

func main() {

	ctx := context.Background()

	var env envConfig

	if err := envconfig.Process("", &env); err != nil {
		log.Printf("[ERROR] Failed to process env var: %s", err)
		os.Exit(1)
	}

	client, err := pubsub.NewClient(ctx, env.ProjectID)
	if err != nil {
		fmt.Errorf("pubsub.NewClient: %v", err)
	}
	defer client.Close()

	sub := client.Subscription(env.SubscriptionID)
	cctx, _ := context.WithCancel(ctx)
	err = sub.Receive(cctx, func(ctx context.Context, msg *pubsub.Message) {
		fmt.Printf("Got message: %q\n", string(msg.Data))
		time.Sleep(6 * time.Second)
		msg.Ack()
		fmt.Printf("Done: %q\n", string(msg.Data)))
	})
	if err != nil {
		fmt.Errorf("Receive: %v", err)
	}

}

Best regards.

@n3wscott
Copy link
Member

I believe the linked PR corrected this issue with the corresponding config set on the transport instance. Closing, please reopen if this was not solved.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants