Skip to content

Commit

Permalink
[release-v1.15] Allow enabling sarama logging and disabling client po…
Browse files Browse the repository at this point in the history
…ol and Upgrade knative.dev/eventing to latest 1.15 (#1278)

* Allow enabling sarama logging and disabling client pool (knative-extensions#4103) (knative-extensions#4108)

Add 3 new environment variables:
```
ENABLE_SARAMA_LOGGER       (default: false)
ENABLE_SARAMA_DEBUG_LOGGER (default: false)
ENABLE_SARAMA_CLIENT_POOL  (default: true)
```

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Upgrade knative.dev/eventing to latest 1.15 (knative-extensions#4112)

* Upgrade knative.dev/eventing to latest 1.15

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Pass PodLister as expected

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

* Migrate to library SchedulerFunc to use the new signature

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>

---------

Signed-off-by: Pierangelo Di Pilato <pierdipi@redhat.com>
  • Loading branch information
pierDipi authored Sep 26, 2024
1 parent 4ed8dce commit 1f54e37
Show file tree
Hide file tree
Showing 20 changed files with 338 additions and 240 deletions.
14 changes: 13 additions & 1 deletion control-plane/cmd/kafka-controller/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ package main
import (
"context"
"log"
"os"
"strings"

"github.com/IBM/sarama"
filteredFactory "knative.dev/pkg/client/injection/kube/informers/factory/filtered"
"knative.dev/pkg/configmap"
"knative.dev/pkg/controller"
Expand Down Expand Up @@ -70,7 +73,16 @@ func main() {
auth.OIDCLabelSelector,
eventing.DispatcherLabelSelectorStr,
)
ctx = clientpool.WithKafkaClientPool(ctx)

if v := os.Getenv("ENABLE_SARAMA_LOGGER"); strings.EqualFold(v, "true") {
sarama.Logger = log.New(os.Stdout, "[sarama] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_DEBUG_LOGGER"); strings.EqualFold(v, "true") {
sarama.DebugLogger = log.New(os.Stdout, "[sarama][debug] ", log.LstdFlags|log.Llongfile)
}
if v := os.Getenv("ENABLE_SARAMA_CLIENT_POOL"); v == "" || strings.EqualFold(v, "true") {
ctx = clientpool.WithKafkaClientPool(ctx)
}

sharedmain.MainNamed(ctx, component,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,12 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: ENABLE_SARAMA_LOGGER
value: "false"
- name: ENABLE_SARAMA_DEBUG_LOGGER
value: "false"
- name: ENABLE_SARAMA_CLIENT_POOL
value: "true"

ports:
- containerPort: 9090
Expand Down
28 changes: 25 additions & 3 deletions control-plane/pkg/kafka/clientpool/clientpool.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,11 @@ import (
"go.uber.org/zap"

corev1 "k8s.io/api/core/v1"
"knative.dev/pkg/logging"

"knative.dev/eventing-kafka-broker/control-plane/pkg/kafka"
"knative.dev/eventing-kafka-broker/control-plane/pkg/prober"
"knative.dev/eventing-kafka-broker/control-plane/pkg/security"
"knative.dev/pkg/logging"
)

type KafkaClientKey struct{}
Expand Down Expand Up @@ -63,8 +64,21 @@ type ClientPool struct {
}

type GetKafkaClientFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error)

type GetKafkaClusterAdminFunc func(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error)

func DisabledGetKafkaClusterAdminFunc(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.ClusterAdmin, error) {
c, err := makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
if err != nil {
return nil, err
}
return sarama.NewClusterAdminFromClient(c)
}

func DisabledGetClient(_ context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, sarama.NewClient)
}

func (cp *ClientPool) GetClient(ctx context.Context, bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
client, err := cp.getClient(ctx, bootstrapServers, secret)
if err != nil {
Expand Down Expand Up @@ -141,7 +155,11 @@ func (cp *ClientPool) GetClusterAdmin(ctx context.Context, bootstrapServers []st
}

func Get(ctx context.Context) *ClientPool {
return ctx.Value(ctxKey).(*ClientPool)
v := ctx.Value(ctxKey)
if v == nil {
return nil
}
return v.(*ClientPool)
}

func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clientKey {
Expand All @@ -162,6 +180,10 @@ func makeClusterAdminKey(bootstrapServers []string, secret *corev1.Secret) clien
}

func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1.Secret) (sarama.Client, error) {
return makeSaramaClient(bootstrapServers, secret, cp.newSaramaClient)
}

func makeSaramaClient(bootstrapServers []string, secret *corev1.Secret, newSaramaClient kafka.NewClientFunc) (sarama.Client, error) {
secretOpt, err := security.NewSaramaSecurityOptionFromSecret(secret)
if err != nil {
return nil, err
Expand All @@ -172,7 +194,7 @@ func (cp *ClientPool) makeSaramaClient(bootstrapServers []string, secret *corev1
return nil, err
}

saramaClient, err := cp.newSaramaClient(bootstrapServers, config)
saramaClient, err := newSaramaClient(bootstrapServers, config)
if err != nil {
return nil, err
}
Expand Down
18 changes: 11 additions & 7 deletions control-plane/pkg/reconciler/broker/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,8 +65,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
configmapInformer := configmapinformer.Get(ctx)
featureFlags := apisconfig.DefaultFeaturesConfig()

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -79,11 +77,17 @@ func NewController(ctx context.Context, watcher configmap.Watcher, env *config.E
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
ConfigMapLister: configmapInformer.Lister(),
Env: env,
Counter: counter.NewExpiringCounter(ctx),
KafkaFeatureFlags: featureFlags,
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
10 changes: 7 additions & 3 deletions control-plane/pkg/reconciler/broker/namespaced_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
logger.Fatal("unable to create Manifestival client-go client", zap.Error(err))
}

clientPool := clientpool.Get(ctx)

reconciler := &NamespacedReconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -103,7 +101,6 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
DispatcherLabel: base.BrokerDispatcherLabel,
ReceiverLabel: base.BrokerReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
NamespaceLister: namespaceinformer.Get(ctx).Lister(),
ConfigMapLister: configmapInformer.Lister(),
ServiceAccountLister: serviceaccountinformer.Get(ctx).Lister(),
Expand All @@ -119,6 +116,13 @@ func NewNamespacedController(ctx context.Context, watcher configmap.Watcher, env
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

impl := brokerreconciler.NewImpl(ctx, reconciler, kafka.NamespacedBrokerClass, func(impl *controller.Impl) controller.Options {
return controller.Options{PromoteFilterFunc: kafka.NamespacedBrokerClassFilter()}
})
Expand Down
24 changes: 14 additions & 10 deletions control-plane/pkg/reconciler/channel/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,8 +67,6 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf

messagingv1beta.RegisterAlternateKafkaChannelConditionSet(conditionSet)

clientPool := clientpool.Get(ctx)

reconciler := &Reconciler{
Reconciler: &base.Reconciler{
KubeClient: kubeclient.Get(ctx),
Expand All @@ -80,14 +78,20 @@ func NewController(ctx context.Context, watcher configmap.Watcher, configs *conf
DataPlaneNamespace: configs.SystemNamespace,
ReceiverLabel: base.ChannelReceiverLabel,
},
GetKafkaClusterAdmin: clientPool.GetClusterAdmin,
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
ConsumerGroupLister: consumerGroupInformer.Lister(),
InternalsClient: consumergroupclient.Get(ctx),
Env: configs,
ConfigMapLister: configmapInformer.Lister(),
ServiceLister: serviceinformer.Get(ctx).Lister(),
SubscriptionLister: subscriptioninformer.Get(ctx).Lister(),
KafkaFeatureFlags: apisconfig.DefaultFeaturesConfig(),
}

clientPool := clientpool.Get(ctx)
if clientPool == nil {
reconciler.GetKafkaClusterAdmin = clientpool.DisabledGetKafkaClusterAdminFunc
} else {
reconciler.GetKafkaClusterAdmin = clientPool.GetClusterAdmin
}

logger := logging.FromContext(ctx)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -449,7 +449,7 @@ func (r *Reconciler) schedule(ctx context.Context, cg *kafkainternals.ConsumerGr
return cg.MarkScheduleConsumerFailed("Schedule", err)
}

placements, err := statefulSetScheduler.Schedule(cg)
placements, err := statefulSetScheduler.Schedule(ctx, cg)
if err != nil {
return cg.MarkScheduleConsumerFailed("Schedule", err)
}
Expand Down
Loading

0 comments on commit 1f54e37

Please sign in to comment.