Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pubsub subscription filtering support #737

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions protocol/pubsub/v2/internal/connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,12 @@ type Connection struct {
// MessageOrdering enables message ordering for all topics and subscriptions.
// This can only be set prior to first call of any function.
MessageOrdering bool
// Filter is an expression written in the Cloud Pub/Sub filter language. If
// non-empty, then only `PubsubMessage`s whose `attributes` field matches the
// filter are delivered on this subscription. If empty, then no messages are
// filtered out. Cannot be changed after the subscription is created.
// This can only be set prior to first call of any function.
Filter string
}

const (
Expand Down Expand Up @@ -234,6 +240,7 @@ func (c *Connection) getOrCreateSubscriptionInfo(ctx context.Context, getAlready
AckDeadline: *c.AckDeadline,
RetentionDuration: *c.RetentionDuration,
EnableMessageOrdering: c.MessageOrdering,
Filter: c.Filter,
})
if si.err != nil {
return
Expand Down
58 changes: 58 additions & 0 deletions protocol/pubsub/v2/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,37 @@ func WithSubscriptionAndTopicID(subscriptionID, topicID string) Option {
}
}

// WithSubscriptionIDAndFilter sets the subscription and topic IDs for pubsub transport.
// This option can be used multiple times.
func WithSubscriptionIDAndFilter(subscriptionID, filter string) Option {
return func(t *Protocol) error {
if t.subscriptions == nil {
t.subscriptions = make([]subscriptionWithTopic, 0)
}
t.subscriptions = append(t.subscriptions, subscriptionWithTopic{
subscriptionID: subscriptionID,
filter: filter,
})
return nil
}
}

// WithSubscriptionTopicIDAndFilter sets the subscription with filter option and topic IDs for pubsub transport.
// This option can be used multiple times.
func WithSubscriptionTopicIDAndFilter(subscriptionID, topicID, filter string) Option {
return func(t *Protocol) error {
if t.subscriptions == nil {
t.subscriptions = make([]subscriptionWithTopic, 0)
}
t.subscriptions = append(t.subscriptions, subscriptionWithTopic{
subscriptionID: subscriptionID,
topicID: topicID,
filter: filter,
})
return nil
}
}

// WithSubscriptionIDFromEnv sets the subscription ID for pubsub transport from
// a given environment variable name.
func WithSubscriptionIDFromEnv(key string) Option {
Expand All @@ -135,6 +166,33 @@ func WithSubscriptionIDFromDefaultEnv() Option {
return WithSubscriptionIDFromEnv(DefaultSubscriptionEnvKey)
}

// WithFilter sets the subscription filter for pubsub transport.
func WithFilter(filter string) Option {
return func(t *Protocol) error {
if t.subscriptions == nil {
t.subscriptions = make([]subscriptionWithTopic, 0)
}
t.subscriptions = append(t.subscriptions, subscriptionWithTopic{
filter: filter,
})
return nil
}
}

// WithFilterFromEnv sets the subscription filter for pubsub transport from
// a given environment variable name.
func WithFilterFromEnv(key string) Option {
return func(t *Protocol) error {
v := os.Getenv(key)
if v == "" {
return fmt.Errorf("unable to load subscription filter, %q environment variable not set", key)
}

opt := WithFilter(v)
return opt(t)
}
}

// AllowCreateTopic sets if the transport can create a topic if it does not
// exist.
func AllowCreateTopic(allow bool) Option {
Expand Down
1 change: 1 addition & 0 deletions protocol/pubsub/v2/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const (
type subscriptionWithTopic struct {
topicID string
subscriptionID string
filter string
}

// Protocol acts as both a pubsub topic and a pubsub subscription .
Expand Down