diff --git a/Makefile b/Makefile
new file mode 100644
index 0000000..a8e4797
--- /dev/null
+++ b/Makefile
@@ -0,0 +1,275 @@
+# ──────────────────────────────────────────────────────────────────────────────
+# Kafka cluster management — Strimzi / k3s
+# Requires: kubectl + helm configured for the target cluster
+# ──────────────────────────────────────────────────────────────────────────────
+
+NAMESPACE   := kafka
+BROKER      := kafka-dual-role-0
+BOOTSTRAP   := kafka-kafka-bootstrap.$(NAMESPACE).svc.cluster.local:9093
+SCRIPT_DIR  := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
+KAFKA_NAME  := kafka
+
+# Overridable per invocation
+TOPIC         ?= smoke-test
+MESSAGE       ?= hello
+GROUP         ?= my-group
+MAX           ?= 10
+PARTITIONS    ?= 1
+REPLICAS      ?= 1
+KAFKA_VER     ?= 4.1.0
+METADATA_VER  ?= 4.1-IV0
+# Seconds smoke-test waits for the Job to finish before giving up
+SMOKE_TIMEOUT ?= 300
+
+KUBECTL   := kubectl -n $(NAMESPACE)
+KEXEC     := $(KUBECTL) exec $(BROKER) --
+KAFKA_BIN := /opt/kafka/bin
+
+# $(call shq,text): wrap text in single quotes for the shell, escaping embedded
+# single quotes, so a user-supplied value reaches the command as one literal word.
+shq = '$(subst ','\'',$(1))'
+
+# JSON fields for the merge patch applied by `upgrade`. A field is emitted only
+# when its variable was overridden (command line or environment), so the ?=
+# defaults above are never written to the Kafka CR and a plain `make upgrade`
+# cannot roll an already-upgraded cluster back to them.
+empty :=
+space := $(empty) $(empty)
+comma := ,
+upgrade_fields := $(strip \
+  $(if $(filter-out file,$(origin KAFKA_VER)),"version":"$(KAFKA_VER)") \
+  $(if $(filter-out file,$(origin METADATA_VER)),"metadataVersion":"$(METADATA_VER)"))
+UPGRADE_PATCH := $(subst $(space),$(comma),$(upgrade_fields))
+
+.DEFAULT_GOAL := help
+.PHONY: help \
+        deploy smoke-test upgrade teardown \
+        _config \
+        topic-list topic-create topic-delete \
+        produce consume list-messages \
+        group-list group-offsets
+
+# ── Help ──────────────────────────────────────────────────────────────────────
+
+# Print the target reference and the current value of each tunable variable.
+help:
+	@printf "Usage: make <target> [VARIABLE=value ...]\n\n"
+	@printf "Cluster lifecycle:\n"
+	@printf " deploy Install Strimzi operator + Kafka cluster + users\n"
+	@printf " smoke-test Run produce/consume end-to-end test\n"
+	@printf " upgrade Upgrade Kafka (KAFKA_VER=) or Strimzi operator\n"
+	@printf " teardown Delete the Kafka cluster and namespace\n"
+	@printf "\nTopics:\n"
+	@printf " topic-list List all topics\n"
+	@printf " topic-create Create TOPIC (idempotent)\n"
+	@printf " topic-delete Delete TOPIC\n"
+	@printf "\nMessages:\n"
+	@printf " produce Produce MESSAGE into TOPIC\n"
+	@printf " consume Consume MAX messages from TOPIC (from latest)\n"
+	@printf " list-messages Consume MAX messages from TOPIC (from beginning)\n"
+	@printf "\nConsumer groups:\n"
+	@printf " group-list List all consumer groups\n"
+	@printf " group-offsets Show offsets and lag for GROUP\n"
+	@printf "\nVariables (current values):\n"
+	@printf " TOPIC=%-12s KAFKA_VER=%-10s PARTITIONS=%s\n" "$(TOPIC)" "$(KAFKA_VER)" "$(PARTITIONS)"
+	@printf " MESSAGE=%-12s GROUP=%-10s REPLICAS=%s\n" $(call shq,$(MESSAGE)) "$(GROUP)" "$(REPLICAS)"
+	@printf " MAX=%-12s METADATA_VER=%-10s SMOKE_TIMEOUT=%s\n\n" "$(MAX)" "$(METADATA_VER)" "$(SMOKE_TIMEOUT)"
+
+# ── Cluster lifecycle ─────────────────────────────────────────────────────────
+
+# Create the namespace, install the Strimzi operator with Helm, wait for its
+# CRDs, then apply kafka.yaml and kafka-users.yaml and wait for each to be Ready.
+deploy:
+	@echo "==> [1/5] Creating namespace '$(NAMESPACE)'"
+	@kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
+	@echo "==> [2/5] Installing Strimzi operator"
+	@helm repo add strimzi https://strimzi.io/charts/ 2>/dev/null || true
+	@helm repo update strimzi 2>/dev/null
+	@helm upgrade --install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
+		--namespace $(NAMESPACE) \
+		--set watchAnyNamespace=false \
+		--wait --timeout 5m
+	@echo "==> [3/5] Waiting for Strimzi CRDs"
+	@for crd in kafkas.kafka.strimzi.io kafkanodepools.kafka.strimzi.io kafkausers.kafka.strimzi.io; do \
+		until kubectl get crd "$$crd" -o jsonpath='{.status.conditions[?(@.type=="Established")].status}' 2>/dev/null | grep -q True; do sleep 2; done; \
+		echo " $$crd ready"; \
+	done
+	@echo "==> [4/5] Deploying Kafka cluster (KRaft, SASL_SSL, SCRAM-SHA-512)"
+	@kubectl apply -f $(SCRIPT_DIR)kafka.yaml
+	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
+	@echo "==> [5/5] Deploying KafkaUsers"
+	@kubectl apply -f $(SCRIPT_DIR)kafka-users.yaml
+	@for user in kafka-admin kafka-client; do \
+		kubectl wait kafkauser/$$user --for=condition=Ready --timeout=120s -n $(NAMESPACE); \
+	done
+	@echo ""
+	@echo "Cluster ready. Bootstrap: $(BOOTSTRAP)"
+
+# Re-create the kafka-smoke-test Job and poll its conditions until it reports
+# Complete or Failed, or until SMOKE_TIMEOUT seconds have passed. The Job status
+# is used rather than a single pod's phase because the Job can retry pods.
+smoke-test:
+	@echo "==> Running smoke test"
+	@$(KUBECTL) delete job kafka-smoke-test --ignore-not-found > /dev/null
+	@kubectl apply -f $(SCRIPT_DIR)kafka-smoke-test.yaml
+	@deadline=$$(( $$(date +%s) + $(SMOKE_TIMEOUT) )); \
+	result=timeout; \
+	while [ "$$(date +%s)" -lt "$$deadline" ]; do \
+		job_state=$$($(KUBECTL) get job kafka-smoke-test \
+			-o jsonpath='{.status.conditions[?(@.status=="True")].type}' 2>/dev/null); \
+		case "$$job_state" in \
+			*Complete*) result=passed; break ;; \
+			*Failed*) result=failed; break ;; \
+		esac; \
+		sleep 3; \
+	done; \
+	$(KUBECTL) logs -l job-name=kafka-smoke-test --tail=20; \
+	case "$$result" in \
+		passed) echo "==> Smoke test PASSED" ;; \
+		timeout) echo "==> Job did not finish within $(SMOKE_TIMEOUT)s"; echo "==> Smoke test FAILED"; exit 1 ;; \
+		*) echo "==> Smoke test FAILED"; exit 1 ;; \
+	esac
+
+# Upgrade the Strimzi operator with Helm, then patch the Kafka CR with whichever
+# of KAFKA_VER / METADATA_VER were overridden. With neither overridden only the
+# operator is upgraded.
+upgrade:
+	@echo "==> Upgrading Strimzi operator"
+	@helm repo update strimzi 2>/dev/null
+	@helm upgrade strimzi-kafka-operator strimzi/strimzi-kafka-operator \
+		--namespace $(NAMESPACE) \
+		--set watchAnyNamespace=false \
+		--wait --timeout 5m
+ifneq ($(UPGRADE_PATCH),)
+	@echo '==> Patching Kafka CR: $(UPGRADE_PATCH)'
+	@$(KUBECTL) patch kafka $(KAFKA_NAME) --type=merge \
+		-p '{"spec":{"kafka":{$(UPGRADE_PATCH)}}}'
+	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
+else
+	@echo "==> KAFKA_VER / METADATA_VER not overridden; Kafka CR left unchanged"
+endif
+	@echo "==> Upgrade complete"
+	@$(KUBECTL) get kafka $(KAFKA_NAME)
+
+# Delete the KafkaUsers and the Kafka cluster, wait for the cluster pods to go,
+# uninstall the operator and finally delete the namespace.
+teardown:
+	@echo "==> Deleting KafkaUsers and Kafka cluster"
+	@kubectl delete -f $(SCRIPT_DIR)kafka-users.yaml --ignore-not-found
+	@kubectl delete -f $(SCRIPT_DIR)kafka.yaml --ignore-not-found
+	@echo "==> Waiting for pods to terminate"
+	@$(KUBECTL) wait pod -l strimzi.io/cluster=$(KAFKA_NAME) \
+		--for=delete --timeout=120s 2>/dev/null || true
+	@echo "==> Uninstalling Strimzi operator"
+	@helm uninstall strimzi-kafka-operator -n $(NAMESPACE) 2>/dev/null || true
+	@echo "==> Deleting namespace '$(NAMESPACE)'"
+	@kubectl delete namespace $(NAMESPACE) --ignore-not-found
+	@echo "==> Teardown complete"
+
+# ── Internal: write client config into the broker pod ─────────────────────────
+
+# Copy the cluster CA certificate and the kafka-client JAAS config from their
+# secrets into the broker pod and assemble /tmp/mk-client.properties there.
+# Fails early when a secret value cannot be read instead of writing an empty file.
+_config:
+	@ca=$$($(KUBECTL) get secret kafka-cluster-ca-cert -o jsonpath='{.data.ca\.crt}') \
+		&& [ -n "$$ca" ] \
+		|| { echo "ERROR: cannot read ca.crt from secret kafka-cluster-ca-cert" >&2; exit 1; }; \
+	printf '%s' "$$ca" | base64 -d \
+		| $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-ca.crt > /dev/null
+	@jaas=$$($(KUBECTL) get secret kafka-client -o jsonpath='{.data.sasl\.jaas\.config}') \
+		&& [ -n "$$jaas" ] \
+		|| { echo "ERROR: cannot read sasl.jaas.config from secret kafka-client" >&2; exit 1; }; \
+	printf '%s' "$$jaas" | base64 -d \
+		| $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-jaas.conf > /dev/null
+	@$(KEXEC) /bin/bash -c \
+		'printf "%s\n" \
+			"security.protocol=SASL_SSL" \
+			"ssl.truststore.type=PEM" \
+			"ssl.truststore.location=/tmp/mk-ca.crt" \
+			"sasl.mechanism=SCRAM-SHA-512" \
+			"sasl.jaas.config=$$(cat /tmp/mk-jaas.conf)" \
+			> /tmp/mk-client.properties'
+
+# ── Topics ────────────────────────────────────────────────────────────────────
+
+# List every topic in the cluster.
+topic-list: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--command-config /tmp/mk-client.properties \
+		--list
+
+# Create TOPIC with PARTITIONS partitions and replication factor REPLICAS.
+topic-create: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--command-config /tmp/mk-client.properties \
+		--create \
+		--topic $(TOPIC) \
+		--partitions $(PARTITIONS) \
+		--replication-factor $(REPLICAS) \
+		--if-not-exists
+	@echo "Topic '$(TOPIC)' ready (partitions=$(PARTITIONS) replicas=$(REPLICAS))"
+
+# Delete TOPIC.
+topic-delete: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--command-config /tmp/mk-client.properties \
+		--delete \
+		--topic $(TOPIC)
+	@echo "Topic '$(TOPIC)' deleted"
+
+# ── Messages ──────────────────────────────────────────────────────────────────
+
+# Send MESSAGE as a single record to TOPIC. The text is passed through printf
+# with shell quoting so quotes and backslashes in MESSAGE arrive unchanged.
+produce: _config
+	@printf '%s\n' $(call shq,$(MESSAGE)) \
+		| $(KUBECTL) exec -i $(BROKER) -- \
+			$(KAFKA_BIN)/kafka-console-producer.sh \
+			--bootstrap-server $(BOOTSTRAP) \
+			--producer.config /tmp/mk-client.properties \
+			--topic $(TOPIC)
+	@printf "Produced to '%s': %s\n" $(call shq,$(TOPIC)) $(call shq,$(MESSAGE))
+
+# Read up to MAX messages from TOPIC with --timeout-ms 10000; stderr is discarded.
+consume: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--consumer.config /tmp/mk-client.properties \
+		--topic $(TOPIC) \
+		--max-messages $(MAX) \
+		--timeout-ms 10000 2>/dev/null
+
+# Same as consume but with --from-beginning and --timeout-ms 15000.
+list-messages: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--consumer.config /tmp/mk-client.properties \
+		--topic $(TOPIC) \
+		--from-beginning \
+		--max-messages $(MAX) \
+		--timeout-ms 15000 2>/dev/null
+
+# ── Consumer groups ───────────────────────────────────────────────────────────
+
+# List all consumer groups; stderr is discarded.
+group-list: _config
+	@$(KEXEC) $(KAFKA_BIN)/kafka-consumer-groups.sh \
+		--bootstrap-server $(BOOTSTRAP) \
+		--command-config /tmp/mk-client.properties \
+		--list 2>/dev/null
+
+# Describe GROUP; prints a short hint instead of the raw output when that
+# output mentions GroupIdNotFoundException.
+group-offsets: _config
+	@$(KEXEC) /bin/bash -c \
+		'out=$$( $(KAFKA_BIN)/kafka-consumer-groups.sh \
+			--bootstrap-server $(BOOTSTRAP) \
+			--command-config /tmp/mk-client.properties \
+			--group $(GROUP) --describe 2>/dev/null); \
+		echo "$$out" | grep -q GroupIdNotFoundException \
+			&& echo "Group $(GROUP) not found (no consumer has connected yet)" \
+			|| echo "$$out"'
diff --git a/README.md b/README.md
index 50ceb4a..e511e72 100644
--- a/README.md
+++ b/README.md
@@ -60,17 +60,59 @@ This certificate must be trusted by any Kafka client connecting over TLS.
 
 - `kubectl` configured against the target cluster
 - `helm` v3
+- `make`
 - `cert-manager` is **not** required — Strimzi manages its own CA
 
 ---
 
+## Makefile
+
+All operations are available via `make`. Run `make help` for the full reference.
+ +``` +Cluster lifecycle: + deploy Install Strimzi operator + Kafka cluster + users + smoke-test Run produce/consume end-to-end test + upgrade Upgrade Kafka (KAFKA_VER=) or Strimzi operator + teardown Delete the Kafka cluster and namespace + +Topics: + topic-list List all topics + topic-create Create TOPIC (idempotent) + topic-delete Delete TOPIC + +Messages: + produce Produce MESSAGE into TOPIC + consume Consume MAX messages from TOPIC (from latest) + list-messages Consume MAX messages from TOPIC (from beginning) + +Consumer groups: + group-list List all consumer groups + group-offsets Show offsets and lag for GROUP +``` + +Key variables (all overridable on the command line): + +| Variable | Default | Description | +|---|---|---| +| `TOPIC` | `smoke-test` | Topic name | +| `MESSAGE` | `hello` | Message to produce | +| `GROUP` | `my-group` | Consumer group | +| `MAX` | `10` | Max messages to consume | +| `PARTITIONS` | `1` | Partitions for topic-create | +| `REPLICAS` | `1` | Replication factor for topic-create | +| `KAFKA_VER` | `4.1.0` | Kafka version for upgrade | +| `METADATA_VER` | `4.1-IV0` | KRaft metadata version for upgrade | + +--- + ## Installation ```bash -./deploy.sh +make deploy ``` -The script performs the following steps: +The target performs the following steps: 1. Creates the `kafka` namespace 2. Installs the Strimzi operator via Helm (scoped to the `kafka` namespace) @@ -143,24 +185,11 @@ sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule require ## Testing -A smoke test job is provided that validates the full produce/consume cycle over SASL_SSL. 
- -### Run the smoke test - ```bash -kubectl apply -f kafka-smoke-test.yaml -kubectl logs -n kafka -l job-name=kafka-smoke-test -f +make smoke-test ``` -### Re-run - -```bash -kubectl delete job kafka-smoke-test -n kafka -kubectl apply -f kafka-smoke-test.yaml -kubectl logs -n kafka -l job-name=kafka-smoke-test -f -``` - -### What the test does +Runs a Kubernetes Job that validates the full produce/consume cycle over SASL_SSL: 1. **Topic creation** — creates topic `smoke-test` with `--if-not-exists` (idempotent) 2. **Produce** — sends 5 messages (`message-1` … `message-5`) via `kafka-console-producer` @@ -179,9 +208,10 @@ message-3 message-4 message-5 ==> SMOKE TEST PASSED (5/5 messages) +==> Smoke test PASSED ``` -The job is cleaned up automatically after 10 minutes (`ttlSecondsAfterFinished: 600`). +The job is cleaned up automatically after 1 minute (`ttlSecondsAfterFinished: 60`). --- @@ -199,59 +229,41 @@ Strimzi will create the corresponding secret in the `kafka` namespace within sec ## Upgrade -### Upgrade the Kafka version +> **Rule:** always upgrade the Strimzi operator before upgrading the Kafka version. +> Check the [Strimzi upgrade guide](https://strimzi.io/docs/operators/latest/deploying.html#assembly-upgrade-str) for supported upgrade paths. -Supported versions are dictated by the installed Strimzi operator. Check available versions: +### Upgrade Kafka version + +`KAFKA_VER` and `METADATA_VER` are independent because Strimzi recommends upgrading +the broker binary first, then migrating the metadata format — this allows rollback of +the binary without being blocked by an already-migrated metadata version. 
```bash -kubectl get kafka kafka -n kafka \ - -o jsonpath='{.status.kafkaVersion}' +# Step 1: upgrade broker binary only +make upgrade KAFKA_VER=4.1.1 METADATA_VER=4.1-IV0 -helm show chart strimzi/strimzi-kafka-operator | grep appVersion +# Step 2: once confirmed stable, migrate metadata format +make upgrade KAFKA_VER=4.1.1 METADATA_VER=4.1-IV1 ``` -To upgrade Kafka from e.g. `4.1.0` to `4.1.1`: - -1. Update `spec.kafka.version` in `kafka.yaml` -2. Update `spec.kafka.metadataVersion` if the new version introduces a new metadata version -3. Apply the change: - -```bash -kubectl apply -f kafka.yaml -kubectl wait kafka/kafka --for=condition=Ready --timeout=10m -n kafka -``` - -Strimzi performs a rolling restart of the broker pod automatically. - ### Upgrade the Strimzi operator ```bash -helm repo update strimzi -helm upgrade strimzi-kafka-operator strimzi/strimzi-kafka-operator \ - --namespace kafka \ - --set watchAnyNamespace=false \ - --wait --timeout 5m +make upgrade ``` -> **Note:** always upgrade the operator before upgrading the Kafka version. Check the [Strimzi upgrade guide](https://strimzi.io/docs/operators/latest/deploying.html#assembly-upgrade-str) for supported upgrade paths. +Running `upgrade` without overriding `KAFKA_VER` upgrades only the Strimzi operator +via `helm upgrade` and leaves the Kafka CR unchanged. --- ## Uninstallation ```bash -# Delete Kafka cluster and users (PVC is deleted because deleteClaim: true) -kubectl delete -f kafka-users.yaml -kubectl delete -f kafka.yaml - -# Wait for pods to terminate -kubectl wait --for=delete pod -l strimzi.io/cluster=kafka -n kafka --timeout=120s - -# Uninstall Strimzi operator -helm uninstall strimzi-kafka-operator -n kafka - -# Delete namespace (removes all remaining secrets and CRDs bindings) -kubectl delete namespace kafka +make teardown ``` -> The `deleteClaim: true` flag in `kafka.yaml` ensures the 10 Gi PVC is deleted together with the KafkaNodePool, leaving no orphaned volumes. 
+Deletes in order: KafkaUsers → Kafka cluster → Strimzi operator → namespace. + +> The `deleteClaim: true` flag in `kafka.yaml` ensures the 10 Gi PVC is deleted +> together with the KafkaNodePool, leaving no orphaned volumes. diff --git a/kafka-smoke-test.yaml b/kafka-smoke-test.yaml index 67e69bd..1eff1dd 100644 --- a/kafka-smoke-test.yaml +++ b/kafka-smoke-test.yaml @@ -4,7 +4,7 @@ metadata: name: kafka-smoke-test namespace: kafka spec: - ttlSecondsAfterFinished: 600 + ttlSecondsAfterFinished: 60 backoffLimit: 3 template: spec: diff --git a/kafka-users.yaml b/kafka-users.yaml index 1cccee5..8fd2c9a 100644 --- a/kafka-users.yaml +++ b/kafka-users.yaml @@ -35,6 +35,7 @@ spec: - Read - Write - Create + - Delete - Describe - DescribeConfigs - resource: