Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support jetStream #695

Merged
merged 7 commits into from
Jul 15, 2021
Merged

support jetStream #695

merged 7 commits into from
Jul 15, 2021

Conversation

zhaojizhuang
Copy link
Contributor

Fix #694

add JetStream support in protocol

Signed-off-by: zhaojizhuang <571130360@qq.com>
@zhaojizhuang
Copy link
Contributor Author

@n3wscott @ripienaar

Signed-off-by: zhaojizhuang <571130360@qq.com>
@zhaojizhuang
Copy link
Contributor Author

zhaojizhuang commented Jul 12, 2021

@ripienaar jsm module has been dropped

@ripienaar
Copy link
Contributor

@wallyqs do you think you might have time to look this over? thanks!

protocol/jsm/v2/options.go Outdated Show resolved Hide resolved
protocol/jsm/v2/sender.go Outdated Show resolved Hide resolved
Signed-off-by: zhaojizhuang <571130360@qq.com>
Signed-off-by: zhaojizhuang <571130360@qq.com>
@zhaojizhuang
Copy link
Contributor Author

@wallyqs PR updated

Signed-off-by: zhaojizhuang <571130360@qq.com>
Signed-off-by: zhaojizhuang <571130360@qq.com>
Copy link
Member

@n3wscott n3wscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would like to see more unit tests, we have been attempting to hit the 80% coverage mark in the repo.

We also should enable some e2e tests to know this is being tested and working, there are several examples in the testing folder, as well as current nats/stan tests.

Does jetstream replace nats/stan?

/*
Package jsm implements the CloudEvent transport implementation using NATS JetStream.
*/
package jsm
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would rather call this package nats_jetstream, as we did for kafka_sarama: https://github.com/cloudevents/sdk-go/tree/main/protocol/kafka_sarama/v2

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yeah,we can do that


var _ binding.Message = (*Message)(nil)

func (m *Message) ReadEncoding() binding.Encoding {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please document each exported method, even if it is just ReadEncoding implements bindings.interface or whatever the real interface is.

var ErrInvalidQueueName = errors.New("invalid queue name for QueueSubscriber")

// NatsOptions is a helper function to group a variadic nats.ProtocolOption into
// []stan.Option that can be used by either Sender, Consumer or Protocol
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

comment says stan, but the import is nats.


// NewMessage wraps an *nats.Msg in a binding.Message.
// The returned message *can* be read several times safely
func NewMessage(msg *nats.Msg) *Message {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it would be nice to have unit tests on this file.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

done

Conn *nats.Conn

Consumer *Consumer
consumerOptions []ConsumerOption
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is strange to store the connection options in the struct if you do not intend to use them again. It would be better to not need to store the options for consumerOptions and senderOptions as they give you no value past the constructor.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

discarded

Sender *Sender
senderOptions []SenderOption

connOwned bool // whether this protocol created the stan connection
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo? stan->nats?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, typo...

return nil, err
}

c.connOwned = true
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this should be moved to line 70? if the NewConsumerFromConn fails, we leak a connection.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe we can't, for c.connOwned = true must be called after c,err:=NewConsumerFromConn

c.Conn.Close()
}

close(c.internalClose)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is strange to close a channel the *Consumer resource did not create, that should be the responsibility of the observer, and doing this will case a panic if Close is called again on c.

incoming chan msgErr
}

func NewReceiver() *Receiver {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what is this used for?

Copy link
Contributor Author

@zhaojizhuang zhaojizhuang Jul 14, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewReceiver creates a new protocol.Receiver responsible for receiving messages, an implements of protocol.Receiver, used by Consumer

connOwned bool
}

func NewConsumer(url, stream, subject string, natsOpts []nats.Option, jsmOpts []nats.JSOpt, subOpts []nats.SubOpt, opts ...ConsumerOption) (*Consumer, error) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

when should I make a NewConsumer vs NewReceiver?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NewConsumer is used by Nats subscriber,while NewReceiver is used by NewConsumer, like kafka_sarama nats``stan and so on

Signed-off-by: zhaojizhuang <571130360@qq.com>
@zhaojizhuang
Copy link
Contributor Author

@n3wscott done

Copy link
Member

@n3wscott n3wscott left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you very much for following up on so many review comments! nice work!!!

LGTM/APPROVE.

@zhaojizhuang
Copy link
Contributor Author

/retest

@n3wscott n3wscott merged commit 49fda7a into cloudevents:main Jul 15, 2021
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Feature request: support jetstream in protocol
4 participants