Skip to content

Commit

Permalink
Add kafka client pool (#3493)
Browse files Browse the repository at this point in the history
* Add kafka client pool

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Use clientpool for consumergroups

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Handle nil secret case

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: kafka connections aren't closed while in use, max connections is properly enforced

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Fix clientpool concurrency issues

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Switch remaining sarama.NewClient and sarama.NewClusterAdmin references to use the clientpool

Signed-off-by: Calum Murray <cmurray@redhat.com>

* updated codegen

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: added replicating lru cache

Signed-off-by: Calum Murray <cmurray@redhat.com>

* test: added unit tests for replicating lru cache

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: use new lru cache for the client pool

Signed-off-by: Calum Murray <cmurray@redhat.com>

* updated deps

Signed-off-by: Calum Murray <cmurray@redhat.com>

* feat: connections are re-created when secrets are rotated

Signed-off-by: Calum Murray <cmurray@redhat.com>

* refactor to use channels

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Set better defaults for max clients, switched to use generics for better typing

Signed-off-by: Calum Murray <cmurray@redhat.com>

* small fixes

Signed-off-by: Calum Murray <cmurray@redhat.com>

* take simpler approach, fix go.mod

Signed-off-by: Calum Murray <cmurray@redhat.com>

* lots of changes, need to test

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix deadlock

Signed-off-by: Calum Murray <cmurray@redhat.com>

* always return the value to the cache if something went wrong

Signed-off-by: Calum Murray <cmurray@redhat.com>

* more fixes

Signed-off-by: Calum Murray <cmurray@redhat.com>

* add a lot of logging for debugging

Signed-off-by: Calum Murray <cmurray@redhat.com>

* removed extra logs

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix linter warnings

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix build issues

Signed-off-by: Calum Murray <cmurray@redhat.com>

* added more unit tests, refactored a bit to improve testability

Signed-off-by: Calum Murray <cmurray@redhat.com>

* Add more unit tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix unit tests

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix import styling

Signed-off-by: Calum Murray <cmurray@redhat.com>

* increase base client capacity

Signed-off-by: Calum Murray <cmurray@redhat.com>

* use zero sized chan for capacity

Signed-off-by: Calum Murray <cmurray@redhat.com>

* small fixes for client cleanup overloading

Signed-off-by: Calum Murray <cmurray@redhat.com>

* made the expiring cache more generic

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fixes to the prober cache to make it more generic

Signed-off-by: Calum Murray <cmurray@redhat.com>

* small clientpool fixes, as well as refactoring the reconcilers for better error checking of broken pipes

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix double secret handler mistake

Signed-off-by: Calum Murray <cmurray@redhat.com>

* goimports

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix double imports error

Signed-off-by: Calum Murray <cmurray@redhat.com>

* refactor to attach clientpool to context rather than use global variable

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: unit tests now have clientpool attached to ctx

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: namespaced trigger controller tests also have clientpool on ctx

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fixed double import after merge

Signed-off-by: Calum Murray <cmurray@redhat.com>

* refactor: use proxy sarama client instead of return client funcs

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: cache test now expects updated behaviour

Signed-off-by: Calum Murray <cmurray@redhat.com>

* fix: fixed nil pointer exceptions and topic offset fetch

Signed-off-by: Calum Murray <cmurray@redhat.com>

* cleanup: removed unnecessary log

Signed-off-by: Calum Murray <cmurray@redhat.com>

* address review comments

Signed-off-by: Calum Murray <cmurray@redhat.com>

---------

Signed-off-by: Calum Murray <cmurray@redhat.com>
  • Loading branch information
Cali0707 authored Feb 27, 2024
1 parent 7d1982c commit bd4ecfa
Show file tree
Hide file tree
Showing 44 changed files with 855 additions and 325 deletions.
2 changes: 2 additions & 0 deletions control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"knative.dev/eventing/pkg/eventingtls"

"knative.dev/eventing-kafka-broker/control-plane/pkg/config"
"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka/clientpool"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/broker"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/channel"
"knative.dev/eventing-kafka-broker/control-plane/pkg/reconciler/consumer"
Expand Down Expand Up @@ -64,6 +65,7 @@ func main() {
ctx = filteredFactory.WithSelectors(ctx,
eventingtls.TrustBundleLabelSelector,
)
ctx = clientpool.WithKafkaClientPool(ctx)

sharedmain.MainNamed(ctx, component,

Expand Down
10 changes: 5 additions & 5 deletions control-plane/pkg/counter/counter.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,20 +24,20 @@ import (
)

type Counter struct {
cache prober.Cache
cache prober.Cache[string, int, struct{}]
}

func NewExpiringCounter(ctx context.Context) *Counter {
cache := prober.NewLocalExpiringCache(ctx, 10*time.Minute)
cache := prober.NewLocalExpiringCache[string, int, struct{}](ctx, 10*time.Minute)
return &Counter{
cache: cache,
}
}

func (c *Counter) Inc(uuid string) int {
value := int(c.cache.GetStatus(uuid))
c.cache.UpsertStatus(uuid, prober.Status(value+1), nil, func(key string, arg interface{}) {})
return value
value, _ := c.cache.Get(uuid)
c.cache.UpsertStatus(uuid, value+1, struct{}{}, func(key string, value int, arg struct{}) {})
return value + 1
}

func (c *Counter) Del(uuid string) {
Expand Down
3 changes: 3 additions & 0 deletions control-plane/pkg/kafka/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type NewClusterAdminClientFunc func(addrs []string, config *sarama.Config) (sara
// NewClientFunc is the signature of a constructor that creates a new
// sarama.Client from a list of broker addresses and a sarama configuration.
type NewClientFunc func(addrs []string, config *sarama.Config) (sarama.Client, error)

// NewClusterAdminFromClientFunc is the signature of a constructor that creates
// a new sarama.ClusterAdmin on top of an existing sarama.Client.
type NewClusterAdminFromClientFunc func(sarama.Client) (sarama.ClusterAdmin, error)

// GetSaramaConfig returns Kafka Client configuration with the given options applied.
func GetSaramaConfig(configOptions ...ConfigOption) (*sarama.Config, error) {
config := sarama.NewConfig()
Expand Down
233 changes: 233 additions & 0 deletions control-plane/pkg/kafka/clientpool/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,233 @@
/*
* Copyright 2023 The Knative Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package clientpool

import (
"sync"

"github.com/IBM/sarama"
corev1 "k8s.io/api/core/v1"
)

// client is a proxy for sarama.Client.
//
// Calls are delegated to the wrapped client. For the proxy methods that return
// an error, that error is passed to isFatalError and, when it reports true,
// onFatalError is invoked with it.
//
// It keeps track of callers that are actively using the client: incrementCallers
// adds one to callersWg and Close() marks one caller as done.
type client struct {
// client is the wrapped sarama.Client that every proxy method delegates to.
client sarama.Client

// isFatalError is called with the error returned by a proxied call (the
// proxy methods call it unconditionally, so it also receives nil errors).
isFatalError func(err error) bool
// onFatalError is invoked with the error whenever isFatalError returned true.
onFatalError func(err error)

// callersWg counts active callers: +1 in incrementCallers, Done() in Close.
// NOTE(review): the code that waits on this group is not in this file.
callersWg sync.WaitGroup

// secret is compared by ResourceVersion in hasCorrectSecretVersion; it may be nil.
secret *corev1.Secret
}

// Compile-time check that *client satisfies the sarama.Client interface.
var _ sarama.Client = (*client)(nil)

// Config returns the configuration reported by the wrapped client.
func (c *client) Config() *sarama.Config {
	cfg := c.client.Config()
	return cfg
}

// Controller forwards to the wrapped client's Controller. The resulting error
// (nil included) is handed to isFatalError; when that reports true,
// onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Controller() (controller *sarama.Broker, err error) {
	if controller, err = c.client.Controller(); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return controller, err
}

// RefreshController forwards to the wrapped client's RefreshController. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) RefreshController() (controller *sarama.Broker, err error) {
	if controller, err = c.client.RefreshController(); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return controller, err
}

// Brokers returns the broker list reported by the wrapped client.
func (c *client) Brokers() []*sarama.Broker {
	brokers := c.client.Brokers()
	return brokers
}

// Broker forwards brokerID to the wrapped client's Broker. The resulting error
// (nil included) is handed to isFatalError; when that reports true,
// onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Broker(brokerID int32) (broker *sarama.Broker, err error) {
	if broker, err = c.client.Broker(brokerID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return broker, err
}

// Topics forwards to the wrapped client's Topics. The resulting error (nil
// included) is handed to isFatalError; when that reports true, onFatalError is
// invoked with it. Both results are returned unchanged.
func (c *client) Topics() (topics []string, err error) {
	if topics, err = c.client.Topics(); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return topics, err
}

// Partitions forwards topic to the wrapped client's Partitions. The resulting
// error (nil included) is handed to isFatalError; when that reports true,
// onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Partitions(topic string) (partitions []int32, err error) {
	if partitions, err = c.client.Partitions(topic); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return partitions, err
}

// WritablePartitions forwards topic to the wrapped client's WritablePartitions.
// The resulting error (nil included) is handed to isFatalError; when that
// reports true, onFatalError is invoked with it. Both results are returned
// unchanged.
func (c *client) WritablePartitions(topic string) (partitions []int32, err error) {
	if partitions, err = c.client.WritablePartitions(topic); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return partitions, err
}

// Leader forwards topic and partitionID to the wrapped client's Leader. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Leader(topic string, partitionID int32) (leader *sarama.Broker, err error) {
	if leader, err = c.client.Leader(topic, partitionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return leader, err
}

// LeaderAndEpoch forwards topic and partitionID to the wrapped client's
// LeaderAndEpoch. The resulting error (nil included) is handed to isFatalError;
// when that reports true, onFatalError is invoked with it. All three results
// are returned unchanged.
func (c *client) LeaderAndEpoch(topic string, partitionID int32) (leader *sarama.Broker, epoch int32, err error) {
	if leader, epoch, err = c.client.LeaderAndEpoch(topic, partitionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return leader, epoch, err
}

// Replicas forwards topic and partitionID to the wrapped client's Replicas. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Replicas(topic string, partitionID int32) (replicas []int32, err error) {
	if replicas, err = c.client.Replicas(topic, partitionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return replicas, err
}

// InSyncReplicas forwards topic and partitionID to the wrapped client's
// InSyncReplicas. The resulting error (nil included) is handed to isFatalError;
// when that reports true, onFatalError is invoked with it. Both results are
// returned unchanged.
func (c *client) InSyncReplicas(topic string, partitionID int32) (replicas []int32, err error) {
	if replicas, err = c.client.InSyncReplicas(topic, partitionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return replicas, err
}

// OfflineReplicas forwards topic and partitionID to the wrapped client's
// OfflineReplicas. The resulting error (nil included) is handed to
// isFatalError; when that reports true, onFatalError is invoked with it. Both
// results are returned unchanged.
func (c *client) OfflineReplicas(topic string, partitionID int32) (replicas []int32, err error) {
	if replicas, err = c.client.OfflineReplicas(topic, partitionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return replicas, err
}

// RefreshBrokers forwards addrs to the wrapped client's RefreshBrokers. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. The error is returned unchanged.
func (c *client) RefreshBrokers(addrs []string) error {
	err := c.client.RefreshBrokers(addrs)
	if !c.isFatalError(err) {
		return err
	}
	c.onFatalError(err)
	return err
}

// RefreshMetadata forwards topics to the wrapped client's RefreshMetadata. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. The error is returned unchanged.
func (c *client) RefreshMetadata(topics ...string) error {
	err := c.client.RefreshMetadata(topics...)
	if !c.isFatalError(err) {
		return err
	}
	c.onFatalError(err)
	return err
}

// GetOffset forwards topic, partitionID and time to the wrapped client's
// GetOffset. The resulting error (nil included) is handed to isFatalError; when
// that reports true, onFatalError is invoked with it. Both results are returned
// unchanged.
func (c *client) GetOffset(topic string, partitionID int32, time int64) (offset int64, err error) {
	if offset, err = c.client.GetOffset(topic, partitionID, time); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return offset, err
}

// Coordinator forwards consumerGroup to the wrapped client's Coordinator. The
// resulting error (nil included) is handed to isFatalError; when that reports
// true, onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) Coordinator(consumerGroup string) (coordinator *sarama.Broker, err error) {
	if coordinator, err = c.client.Coordinator(consumerGroup); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return coordinator, err
}

// RefreshCoordinator forwards consumerGroup to the wrapped client's
// RefreshCoordinator. The resulting error (nil included) is handed to
// isFatalError; when that reports true, onFatalError is invoked with it. The
// error is returned unchanged.
func (c *client) RefreshCoordinator(consumerGroup string) error {
	err := c.client.RefreshCoordinator(consumerGroup)
	if !c.isFatalError(err) {
		return err
	}
	c.onFatalError(err)
	return err
}

// TransactionCoordinator forwards transactionID to the wrapped client's
// TransactionCoordinator. The resulting error (nil included) is handed to
// isFatalError; when that reports true, onFatalError is invoked with it. Both
// results are returned unchanged.
func (c *client) TransactionCoordinator(transactionID string) (coordinator *sarama.Broker, err error) {
	if coordinator, err = c.client.TransactionCoordinator(transactionID); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return coordinator, err
}

// RefreshTransactionCoordinator forwards the transaction ID to the wrapped
// client's RefreshTransactionCoordinator. The resulting error (nil included) is
// handed to isFatalError; when that reports true, onFatalError is invoked with
// it. The error is returned unchanged.
func (c *client) RefreshTransactionCoordinator(transationID string) error {
	err := c.client.RefreshTransactionCoordinator(transationID)
	if !c.isFatalError(err) {
		return err
	}
	c.onFatalError(err)
	return err
}

// InitProducerID forwards to the wrapped client's InitProducerID. The resulting
// error (nil included) is handed to isFatalError; when that reports true,
// onFatalError is invoked with it. Both results are returned unchanged.
func (c *client) InitProducerID() (resp *sarama.InitProducerIDResponse, err error) {
	if resp, err = c.client.InitProducerID(); c.isFatalError(err) {
		c.onFatalError(err)
	}
	return resp, err
}

// LeastLoadedBroker returns whatever the wrapped client's LeastLoadedBroker
// reports.
func (c *client) LeastLoadedBroker() *sarama.Broker {
	broker := c.client.LeastLoadedBroker()
	return broker
}

// Close does not close the wrapped client. It marks one caller as done on
// callersWg and returns nil.
//
// If the wrapped client already reports Closed, the wait group is left
// untouched and sarama.ErrClosedClient is returned instead.
func (c *client) Close() error {
	if !c.client.Closed() {
		c.callersWg.Done()
		return nil
	}

	return sarama.ErrClosedClient
}

// Closed reports whether the wrapped client is closed.
func (c *client) Closed() bool {
	closed := c.client.Closed()
	return closed
}

// incrementCallers registers one more active caller on callersWg. Close is the
// method that marks a caller as done again.
func (c *client) incrementCallers() {
	const oneCaller = 1
	c.callersWg.Add(oneCaller)
}

// hasCorrectSecretVersion reports whether the given secret matches the one
// stored on this client.
//
// Two nil secrets match; a nil and a non-nil secret never match; two non-nil
// secrets match when their ResourceVersion fields are equal.
func (c *client) hasCorrectSecretVersion(secret *corev1.Secret) bool {
	switch {
	case c.secret == nil && secret == nil:
		return true
	case c.secret == nil || secret == nil:
		return false
	default:
		return c.secret.ResourceVersion == secret.ResourceVersion
	}
}
Loading

0 comments on commit bd4ecfa

Please sign in to comment.