Update of unit tests to use KRaft - part 6 (#10436)
Signed-off-by: Jakub Scholz <www@scholzj.com>
scholzj committed Aug 12, 2024
1 parent f3893c8 commit 7a1db8a
Showing 6 changed files with 1,314 additions and 691 deletions.
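The recurring change across these tests is the switch from a ZooKeeper-based Kafka fixture to a KRaft one: the .spec.zookeeper section is removed and the Kafka custom resource is annotated for node pools and KRaft. A minimal, self-contained sketch of the resulting fixture is shown below; the builder calls and annotation constants are taken from the diff, while the import paths, the wrapper class, and the example cluster/namespace names are assumptions.

// A hedged sketch only: builder and constant names come from the diff, while the import
// paths, the wrapper class, and the cluster/namespace values are assumptions.
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.operator.common.Annotations;

import java.util.Map;

public class KRaftFixtureSketch {
    // Both annotations are needed: node pools carry the replica and storage configuration,
    // and the KRaft annotation means no .spec.zookeeper section is expected.
    static final Kafka KAFKA = new KafkaBuilder()
            .withNewMetadata()
                .withName("my-cluster")
                .withNamespace("my-namespace")
                .withAnnotations(Map.of(
                        Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled",
                        Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled"))
            .endMetadata()
            .withNewSpec()
                .withNewKafka()
                    // Replicas and storage are intentionally absent here; in KRaft mode they
                    // are defined on the KafkaNodePool resources instead.
                    .withListeners(new GenericKafkaListenerBuilder()
                            .withName("plain")
                            .withPort(9092)
                            .withType(KafkaListenerType.INTERNAL)
                            .withTls(false)
                            .build())
                .endKafka()
            .endSpec()
            .build();
}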

Large diffs are not rendered by default for 3 of the 6 changed files.

@@ -41,28 +41,26 @@ public class KafkaAssemblyOperatorNodePoolWatcherTest {
private static final String NAMESPACE = "my-namespace";
private static final String CLUSTER_NAME = "my-cluster";
private static final Kafka KAFKA = new KafkaBuilder()
.withNewMetadata()
.withName(CLUSTER_NAME)
.withNamespace(NAMESPACE)
.withLabels(Map.of("selector", "matching"))
.withAnnotations(Map.of(Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled"))
.endMetadata()
.withNewSpec()
.withNewKafka()
.withListeners(new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build())
.endKafka()
.withNewZookeeper()
.withReplicas(3)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.endSpec()
.build();
.withNewMetadata()
.withName(CLUSTER_NAME)
.withNamespace(NAMESPACE)
.withLabels(Map.of("selector", "matching"))
.withAnnotations(Map.of(
Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled",
Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled"
))
.endMetadata()
.withNewSpec()
.withNewKafka()
.withListeners(new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build())
.endKafka()
.endSpec()
.build();

private final static KafkaNodePool POOL = new KafkaNodePoolBuilder()
.withNewMetadata()
@@ -76,7 +74,7 @@ public class KafkaAssemblyOperatorNodePoolWatcherTest {
.withNewJbodStorage()
.withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build())
.endJbodStorage()
.withRoles(ProcessRoles.BROKER)
.withRoles(ProcessRoles.CONTROLLER, ProcessRoles.BROKER)
.withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("4"))).build())
.endSpec()
.build();
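With ZooKeeper gone, the node pool used by the watcher test now carries the controller role as well, hence the switch from ProcessRoles.BROKER to CONTROLLER plus BROKER above. A sketch of the complete pool fixture under that change follows; the spec section mirrors the diff, while the import paths, pool name, labels, and replica count are assumptions, since the hunk elides the metadata lines.

// A hedged sketch: the spec section mirrors the diff, but the import paths, pool name,
// labels, and replica count are assumptions (the hunk elides the metadata lines).
import io.fabric8.kubernetes.api.model.Quantity;
import io.fabric8.kubernetes.api.model.ResourceRequirementsBuilder;
import io.strimzi.api.kafka.model.kafka.PersistentClaimStorageBuilder;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePool;
import io.strimzi.api.kafka.model.nodepool.KafkaNodePoolBuilder;
import io.strimzi.api.kafka.model.nodepool.ProcessRoles;

import java.util.Map;

public class DualRoleNodePoolSketch {
    static final KafkaNodePool POOL = new KafkaNodePoolBuilder()
            .withNewMetadata()
                .withName("mixed")                                      // assumed pool name
                .withNamespace("my-namespace")                          // assumed namespace
                .withLabels(Map.of("strimzi.io/cluster", "my-cluster")) // pools are linked to the Kafka CR by this label
            .endMetadata()
            .withNewSpec()
                .withReplicas(3)                                        // assumed replica count
                .withNewJbodStorage()
                    .withVolumes(new PersistentClaimStorageBuilder().withId(0).withSize("100Gi").build())
                .endJbodStorage()
                // With no ZooKeeper and no separate controller pool, this pool must take
                // both roles so the cluster still has a controller quorum.
                .withRoles(ProcessRoles.CONTROLLER, ProcessRoles.BROKER)
                .withResources(new ResourceRequirementsBuilder().withRequests(Map.of("cpu", new Quantity("4"))).build())
            .endSpec()
            .build();
}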
@@ -20,8 +20,8 @@
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.cluster.operator.resource.kubernetes.ClusterRoleBindingOperator;
import io.strimzi.operator.cluster.operator.resource.kubernetes.CrdOperator;
import io.strimzi.operator.common.Annotations;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.PasswordGenerator;
import io.strimzi.platform.KubernetesVersion;
import io.vertx.core.Future;
@@ -40,11 +40,9 @@
import java.util.Map;
import java.util.stream.Collectors;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.times;
@@ -53,19 +51,20 @@

@ExtendWith(VertxExtension.class)
public class KafkaAssemblyOperatorNonParametrizedTest {

public static final String NAMESPACE = "test";
public static final String NAME = "my-kafka";
private static Vertx vertx;
private static WorkerExecutor sharedWorkerExecutor;
private final OpenSslCertManager certManager = new OpenSslCertManager();
private final PasswordGenerator passwordGenerator = new PasswordGenerator(12,
private static final String NAMESPACE = "test";
private static final String NAME = "my-kafka";
private static final OpenSslCertManager CERT_MANAGER = new OpenSslCertManager();
@SuppressWarnings("SpellCheckingInspection")
private static final PasswordGenerator PASSWORD_GENERATOR = new PasswordGenerator(12,
"abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"0123456789");

private static Vertx vertx;
private static WorkerExecutor sharedWorkerExecutor;

@BeforeAll
public static void before() {
vertx = Vertx.vertx();
@@ -85,7 +84,7 @@ public void testDeleteClusterRoleBindings(VertxTestContext context) {
ArgumentCaptor<ClusterRoleBinding> desiredCrb = ArgumentCaptor.forClass(ClusterRoleBinding.class);
when(mockCrbOps.reconcile(any(), eq(KafkaResources.initContainerClusterRoleBindingName(NAME, NAMESPACE)), desiredCrb.capture())).thenReturn(Future.succeededFuture());

KafkaAssemblyOperator op = new KafkaAssemblyOperator(vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), certManager, passwordGenerator,
KafkaAssemblyOperator op = new KafkaAssemblyOperator(vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), CERT_MANAGER, PASSWORD_GENERATOR,
supplier, new ClusterOperatorConfig.ClusterOperatorConfigBuilder(ResourceUtils.dummyClusterOperatorConfig(), KafkaVersionTestUtils.getKafkaVersionLookup()).with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "1").build());
Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME);

@@ -104,20 +103,22 @@ public void testDeleteClusterRoleBindings(VertxTestContext context) {
public void testSelectorLabels(VertxTestContext context) {
Kafka kafka = new KafkaBuilder()
.withNewMetadata()
.withName(NAME)
.withNamespace(NAMESPACE)
.withName(NAME)
.withNamespace(NAMESPACE)
.withAnnotations(Map.of(
Annotations.ANNO_STRIMZI_IO_NODE_POOLS, "enabled",
Annotations.ANNO_STRIMZI_IO_KRAFT, "enabled"
))
.endMetadata()
.withNewSpec()
.withNewKafka()
.withReplicas(3)
.withNewEphemeralStorage()
.endEphemeralStorage()
.withListeners(new GenericKafkaListenerBuilder()
.withName("plain")
.withPort(9092)
.withType(KafkaListenerType.INTERNAL)
.withTls(false)
.build())
.endKafka()
.withNewZookeeper()
.withReplicas(3)
.withNewEphemeralStorage()
.endEphemeralStorage()
.endZookeeper()
.endSpec()
.build();

@@ -133,7 +134,7 @@ public void testSelectorLabels(VertxTestContext context) {
.with(ClusterOperatorConfig.OPERATION_TIMEOUT_MS.key(), "120000")
.with(ClusterOperatorConfig.CUSTOM_RESOURCE_SELECTOR.key(), Map.of("selectorLabel", "value").entrySet().stream().map(e -> e.getKey() + "=" + e.getValue()).collect(Collectors.joining(","))).build();

KafkaAssemblyOperator op = new KafkaAssemblyOperator(vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), certManager, passwordGenerator,
KafkaAssemblyOperator op = new KafkaAssemblyOperator(vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION), CERT_MANAGER, PASSWORD_GENERATOR,
supplier, config);
Reconciliation reconciliation = new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME);

@@ -156,57 +157,4 @@ public void testSelectorLabels(VertxTestContext context) {
async.flag();
})));
}

/**
* Tests that KRaft cluster cannot be deployed without using NodePools
*
* @param context Test context
*/
@Test
public void testOptionalCustomResourceFieldsValidation(VertxTestContext context) {
Kafka kafka = new KafkaBuilder()
.withNewMetadata()
.withName(NAME)
.withNamespace(NAMESPACE)
.endMetadata()
.withNewSpec()
.withNewKafka()
.withListeners(new GenericKafkaListenerBuilder()
.withName("listener")
.withPort(9092)
.withTls(true)
.withType(KafkaListenerType.INTERNAL)
.withNewKafkaListenerAuthenticationTlsAuth()
.endKafkaListenerAuthenticationTlsAuth()
.build())
.endKafka()
.endSpec()
.build();

ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);

CrdOperator<KubernetesClient, Kafka, KafkaList> mockKafkaOps = supplier.kafkaOperator;
when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(NAME))).thenReturn(Future.succeededFuture(kafka));

ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(KafkaVersionTestUtils.getKafkaVersionLookup());

KafkaAssemblyOperator kao = new KafkaAssemblyOperator(
vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION),
certManager,
passwordGenerator,
supplier,
config);

Checkpoint async = context.checkpoint();
kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME))
.onComplete(context.failing(v -> context.verify(() -> {
assertThat(v, instanceOf(InvalidResourceException.class));

assertThat(v.getMessage(), containsString("The .spec.zookeeper section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based cluster."));
assertThat(v.getMessage(), containsString("The .spec.kafka.replicas property of the Kafka custom resource is missing. This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));
assertThat(v.getMessage(), containsString("The .spec.kafka.storage section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));

async.flag();
})));
}
}
@@ -0,0 +1,121 @@
/*
* Copyright Strimzi authors.
* License: Apache License 2.0 (see the file LICENSE or http://apache.org/licenses/LICENSE-2.0.html).
*/
package io.strimzi.operator.cluster.operator.assembly;

import io.fabric8.kubernetes.client.KubernetesClient;
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.KafkaList;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;
import io.strimzi.certs.OpenSslCertManager;
import io.strimzi.operator.cluster.ClusterOperatorConfig;
import io.strimzi.operator.cluster.KafkaVersionTestUtils;
import io.strimzi.operator.cluster.PlatformFeaturesAvailability;
import io.strimzi.operator.cluster.ResourceUtils;
import io.strimzi.operator.cluster.operator.resource.ResourceOperatorSupplier;
import io.strimzi.operator.cluster.operator.resource.kubernetes.CrdOperator;
import io.strimzi.operator.common.Reconciliation;
import io.strimzi.operator.common.model.InvalidResourceException;
import io.strimzi.operator.common.model.PasswordGenerator;
import io.strimzi.platform.KubernetesVersion;
import io.vertx.core.Future;
import io.vertx.core.Vertx;
import io.vertx.core.WorkerExecutor;
import io.vertx.junit5.Checkpoint;
import io.vertx.junit5.VertxExtension;
import io.vertx.junit5.VertxTestContext;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;

import static org.hamcrest.CoreMatchers.instanceOf;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.when;

@ExtendWith(VertxExtension.class)
public class KafkaAssemblyOperatorNonParametrizedZooBasedTest {
private static final String NAMESPACE = "test";
private static final String NAME = "my-kafka";
private static final OpenSslCertManager CERT_MANAGER = new OpenSslCertManager();
@SuppressWarnings("SpellCheckingInspection")
private static final PasswordGenerator PASSWORD_GENERATOR = new PasswordGenerator(12,
"abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ",
"abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ" +
"0123456789");

private static Vertx vertx;
private static WorkerExecutor sharedWorkerExecutor;

@BeforeAll
public static void before() {
vertx = Vertx.vertx();
sharedWorkerExecutor = vertx.createSharedWorkerExecutor("kubernetes-ops-pool");
}

@AfterAll
public static void after() {
sharedWorkerExecutor.close();
vertx.close();
}

/**
* Tests that KRaft cluster cannot be deployed without using NodePools
*
* @param context Test context
*/
@Test
public void testOptionalCustomResourceFieldsValidation(VertxTestContext context) {
Kafka kafka = new KafkaBuilder()
.withNewMetadata()
.withName(NAME)
.withNamespace(NAMESPACE)
.endMetadata()
.withNewSpec()
.withNewKafka()
.withListeners(new GenericKafkaListenerBuilder()
.withName("listener")
.withPort(9092)
.withTls(true)
.withType(KafkaListenerType.INTERNAL)
.withNewKafkaListenerAuthenticationTlsAuth()
.endKafkaListenerAuthenticationTlsAuth()
.build())
.endKafka()
.endSpec()
.build();

ResourceOperatorSupplier supplier = ResourceUtils.supplierWithMocks(false);

CrdOperator<KubernetesClient, Kafka, KafkaList> mockKafkaOps = supplier.kafkaOperator;
when(mockKafkaOps.getAsync(eq(NAMESPACE), eq(NAME))).thenReturn(Future.succeededFuture(kafka));

ClusterOperatorConfig config = ResourceUtils.dummyClusterOperatorConfig(KafkaVersionTestUtils.getKafkaVersionLookup());

KafkaAssemblyOperator kao = new KafkaAssemblyOperator(
vertx, new PlatformFeaturesAvailability(false, KubernetesVersion.MINIMAL_SUPPORTED_VERSION),
CERT_MANAGER,
PASSWORD_GENERATOR,
supplier,
config);

Checkpoint async = context.checkpoint();
kao.reconcile(new Reconciliation("test-trigger", Kafka.RESOURCE_KIND, NAMESPACE, NAME))
.onComplete(context.failing(v -> context.verify(() -> {
assertThat(v, instanceOf(InvalidResourceException.class));

assertThat(v.getMessage(), containsString("The .spec.zookeeper section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based cluster."));
assertThat(v.getMessage(), containsString("The .spec.kafka.replicas property of the Kafka custom resource is missing. This property is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));
assertThat(v.getMessage(), containsString("The .spec.kafka.storage section of the Kafka custom resource is missing. This section is required for a ZooKeeper-based Kafka cluster that is not using Node Pools."));

async.flag();
})));
}
}
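The test above builds a Kafka resource that is neither a valid KRaft deployment (no node pools) nor a valid ZooKeeper deployment, and asserts on the three resulting validation messages. For contrast, a sketch of a ZooKeeper-based fixture that would satisfy those checks, mirroring the fixtures removed elsewhere in this commit; the import paths, the wrapper class, and the name/namespace values are assumptions.

// A hedged sketch of a ZooKeeper-based fixture that keeps the fields the validation
// messages name: .spec.kafka.replicas, .spec.kafka.storage and .spec.zookeeper.
// Builder calls mirror the fixtures removed elsewhere in this commit; import paths,
// the wrapper class, and the name/namespace values are assumptions.
import io.strimzi.api.kafka.model.kafka.Kafka;
import io.strimzi.api.kafka.model.kafka.KafkaBuilder;
import io.strimzi.api.kafka.model.kafka.listener.GenericKafkaListenerBuilder;
import io.strimzi.api.kafka.model.kafka.listener.KafkaListenerType;

public class ZooKeeperFixtureSketch {
    static final Kafka VALID_ZOO_KAFKA = new KafkaBuilder()
            .withNewMetadata()
                .withName("my-kafka")
                .withNamespace("test")
            .endMetadata()
            .withNewSpec()
                .withNewKafka()
                    .withReplicas(3)                 // required when Node Pools are not used
                    .withNewEphemeralStorage()       // required when Node Pools are not used
                    .endEphemeralStorage()
                    .withListeners(new GenericKafkaListenerBuilder()
                            .withName("plain")
                            .withPort(9092)
                            .withType(KafkaListenerType.INTERNAL)
                            .withTls(false)
                            .build())
                .endKafka()
                .withNewZookeeper()                  // required for a ZooKeeper-based cluster
                    .withReplicas(3)
                    .withNewEphemeralStorage()
                    .endEphemeralStorage()
                .endZookeeper()
            .endSpec()
            .build();
}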
