# ──────────────────────────────────────────────────────────────────────────────
# Kafka cluster management — Strimzi / k3s
# Requires: kubectl + helm configured for the target cluster
# ──────────────────────────────────────────────────────────────────────────────

NAMESPACE  := kafka
BROKER     := kafka-dual-role-0
BOOTSTRAP  := kafka-kafka-bootstrap.$(NAMESPACE).svc.cluster.local:9093
# Directory containing this Makefile (with trailing slash); the manifests
# applied below (kafka.yaml, kafka-users.yaml, kafka-smoke-test.yaml) are
# resolved relative to it, so `make -f path/to/Makefile` works from anywhere.
SCRIPT_DIR := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
KAFKA_NAME := kafka

# Overridable per invocation
TOPIC        ?= smoke-test
MESSAGE      ?= hello
GROUP        ?= my-group
MAX          ?= 10
PARTITIONS   ?= 1
REPLICAS     ?= 1
KAFKA_VER    ?= 4.1.0
METADATA_VER ?= 4.1-IV0
# Upper bounds (seconds) for the polling loops in `deploy` and `smoke-test`,
# so a stuck cluster fails the target instead of hanging forever.
CRD_TIMEOUT   ?= 120
SMOKE_TIMEOUT ?= 300

KUBECTL   := kubectl -n $(NAMESPACE)
KEXEC     := $(KUBECTL) exec $(BROKER) --
KAFKA_BIN := /opt/kafka/bin

.DEFAULT_GOAL := help

.PHONY: help \
        deploy smoke-test upgrade teardown \
        _config \
        topic-list topic-create topic-delete \
        produce consume list-messages \
        group-list group-offsets

# ── Help ──────────────────────────────────────────────────────────────────────

# Print the available targets and the current values of the tunable variables.
help:
	@printf "Usage: make [VARIABLE=value ...]\n\n"
	@printf "Cluster lifecycle:\n"
	@printf " deploy Install Strimzi operator + Kafka cluster + users\n"
	@printf " smoke-test Run produce/consume end-to-end test\n"
	@printf " upgrade Upgrade Kafka (KAFKA_VER=) or Strimzi operator\n"
	@printf " teardown Delete the Kafka cluster and namespace\n"
	@printf "\nTopics:\n"
	@printf " topic-list List all topics\n"
	@printf " topic-create Create TOPIC (idempotent)\n"
	@printf " topic-delete Delete TOPIC\n"
	@printf "\nMessages:\n"
	@printf " produce Produce MESSAGE into TOPIC\n"
	@printf " consume Consume MAX messages from TOPIC (from latest)\n"
	@printf " list-messages Consume MAX messages from TOPIC (from beginning)\n"
	@printf "\nConsumer groups:\n"
	@printf " group-list List all consumer groups\n"
	@printf " group-offsets Show offsets and lag for GROUP\n"
	@printf "\nVariables (current values):\n"
	@printf " TOPIC=%-12s KAFKA_VER=%-10s PARTITIONS=%s\n" "$(TOPIC)" "$(KAFKA_VER)" "$(PARTITIONS)"
	@printf " MESSAGE=%-12s GROUP=%-10s REPLICAS=%s\n" "$(MESSAGE)" "$(GROUP)" "$(REPLICAS)"
	@printf " MAX=%-12s METADATA_VER=%s\n\n" "$(MAX)" "$(METADATA_VER)"

# ── Cluster lifecycle ─────────────────────────────────────────────────────────

# Five steps: namespace, Strimzi operator (helm), wait for the CRDs to be
# Established, Kafka cluster from kafka.yaml, KafkaUsers from kafka-users.yaml.
# The namespace is created via `--dry-run=client | apply` so re-running does
# not fail when it already exists. The CRD poll gives up after CRD_TIMEOUT
# seconds per CRD.
deploy:
	@echo "==> [1/5] Creating namespace '$(NAMESPACE)'"
	@kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
	@echo "==> [2/5] Installing Strimzi operator"
	@helm repo add strimzi https://strimzi.io/charts/ 2>/dev/null || true
	@helm repo update strimzi 2>/dev/null
	@helm upgrade --install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
	    --namespace $(NAMESPACE) \
	    --set watchAnyNamespace=false \
	    --wait --timeout 5m
	@echo "==> [3/5] Waiting for Strimzi CRDs"
	@for crd in kafkas.kafka.strimzi.io kafkanodepools.kafka.strimzi.io kafkausers.kafka.strimzi.io; do \
	    elapsed=0; \
	    until kubectl get crd "$$crd" -o jsonpath='{.status.conditions[?(@.type=="Established")].status}' 2>/dev/null | grep -q True; do \
	        if [ "$$elapsed" -ge "$(CRD_TIMEOUT)" ]; then \
	            echo " timed out after $(CRD_TIMEOUT)s waiting for $$crd" >&2; \
	            exit 1; \
	        fi; \
	        sleep 2; \
	        elapsed=$$((elapsed + 2)); \
	    done; \
	    echo " $$crd ready"; \
	done
	@echo "==> [4/5] Deploying Kafka cluster (KRaft, SASL_SSL, SCRAM-SHA-512)"
	@kubectl apply -f $(SCRIPT_DIR)kafka.yaml
	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
	@echo "==> [5/5] Deploying KafkaUsers"
	@kubectl apply -f $(SCRIPT_DIR)kafka-users.yaml
	@for user in kafka-admin kafka-client; do \
	    kubectl wait kafkauser/$$user --for=condition=Ready --timeout=120s -n $(NAMESPACE); \
	done
	@echo ""
	@echo "Cluster ready. Bootstrap: $(BOOTSTRAP)"

# Re-create the kafka-smoke-test Job, poll its pod until the phase is
# Succeeded or Failed (at most SMOKE_TIMEOUT seconds), print the last 20 log
# lines, and exit non-zero unless the phase is Succeeded.
# The old Job is deleted with foreground cascading so its pods are gone before
# the new Job is applied; otherwise the label selector below could match a
# leftover pod from the previous run and report its stale phase.
smoke-test:
	@echo "==> Running smoke test"
	@$(KUBECTL) delete job kafka-smoke-test --ignore-not-found --cascade=foreground > /dev/null
	@kubectl apply -f $(SCRIPT_DIR)kafka-smoke-test.yaml
	@elapsed=0; \
	until $(KUBECTL) get pod -l job-name=kafka-smoke-test \
	        -o jsonpath='{.items[0].status.phase}' 2>/dev/null | grep -qE 'Succeeded|Failed'; do \
	    if [ "$$elapsed" -ge "$(SMOKE_TIMEOUT)" ]; then \
	        echo "==> Smoke test timed out after $(SMOKE_TIMEOUT)s" >&2; \
	        exit 1; \
	    fi; \
	    sleep 3; \
	    elapsed=$$((elapsed + 3)); \
	done
	@$(KUBECTL) logs -l job-name=kafka-smoke-test --tail=20
	@$(KUBECTL) get pod -l job-name=kafka-smoke-test \
	    -o jsonpath='{.items[0].status.phase}' | grep -q Succeeded \
	    && echo "==> Smoke test PASSED" \
	    || { echo "==> Smoke test FAILED"; exit 1; }

# Upgrade the Strimzi operator chart, then patch the Kafka resource with
# KAFKA_VER / METADATA_VER and wait for it to report Ready again.
upgrade:
	@echo "==> Upgrading Strimzi operator"
	@helm repo update strimzi 2>/dev/null
	@helm upgrade strimzi-kafka-operator strimzi/strimzi-kafka-operator \
	    --namespace $(NAMESPACE) \
	    --set watchAnyNamespace=false \
	    --wait --timeout 5m
	@echo "==> Upgrading Kafka to $(KAFKA_VER) (metadataVersion=$(METADATA_VER))"
	@$(KUBECTL) patch kafka $(KAFKA_NAME) --type=merge \
	    -p '{"spec":{"kafka":{"version":"$(KAFKA_VER)","metadataVersion":"$(METADATA_VER)"}}}'
	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
	@echo "==> Upgrade complete"
	@$(KUBECTL) get kafka $(KAFKA_NAME)

# Remove users, cluster, operator and finally the namespace. The pod wait and
# the helm uninstall are best-effort (`|| true`) so teardown can be re-run on
# a partially removed installation.
teardown:
	@echo "==> Deleting KafkaUsers and Kafka cluster"
	@kubectl delete -f $(SCRIPT_DIR)kafka-users.yaml --ignore-not-found
	@kubectl delete -f $(SCRIPT_DIR)kafka.yaml --ignore-not-found
	@echo "==> Waiting for pods to terminate"
	@$(KUBECTL) wait pod -l strimzi.io/cluster=$(KAFKA_NAME) \
	    --for=delete --timeout=120s 2>/dev/null || true
	@echo "==> Uninstalling Strimzi operator"
	@helm uninstall strimzi-kafka-operator -n $(NAMESPACE) 2>/dev/null || true
	@echo "==> Deleting namespace '$(NAMESPACE)'"
	@kubectl delete namespace $(NAMESPACE) --ignore-not-found
	@echo "==> Teardown complete"

# ── Internal: write client config into the broker pod ─────────────────────────

# Copies the cluster CA certificate and the kafka-client JAAS config from
# their Secrets into /tmp on the broker pod, then assembles
# /tmp/mk-client.properties from them. Every CLI target below depends on this.
# The JAAS line is passed to printf as a %s argument rather than spliced into
# the format string, so a '%' or '\' in the password is written verbatim.
_config:
	@$(KUBECTL) get secret kafka-cluster-ca-cert \
	    -o jsonpath='{.data.ca\.crt}' | base64 -d \
	    | $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-ca.crt > /dev/null
	@$(KUBECTL) get secret kafka-client \
	    -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d \
	    | $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-jaas.conf > /dev/null
	@$(KEXEC) /bin/bash -c \
	    'printf "security.protocol=SASL_SSL\nssl.truststore.type=PEM\nssl.truststore.location=/tmp/mk-ca.crt\nsasl.mechanism=SCRAM-SHA-512\nsasl.jaas.config=%s\n" "$$(cat /tmp/mk-jaas.conf)" \
	    > /tmp/mk-client.properties'

# ── Topics ────────────────────────────────────────────────────────────────────

topic-list: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --command-config /tmp/mk-client.properties \
	    --list

# --if-not-exists makes re-running with an existing TOPIC a no-op.
topic-create: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --command-config /tmp/mk-client.properties \
	    --create \
	    --topic $(TOPIC) \
	    --partitions $(PARTITIONS) \
	    --replication-factor $(REPLICAS) \
	    --if-not-exists
	@echo "Topic '$(TOPIC)' ready (partitions=$(PARTITIONS) replicas=$(REPLICAS))"

topic-delete: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --command-config /tmp/mk-client.properties \
	    --delete \
	    --topic $(TOPIC)
	@echo "Topic '$(TOPIC)' deleted"

# ── Messages ──────────────────────────────────────────────────────────────────

# Sends MESSAGE as a single line on the producer's stdin. printf '%s\n' is
# used instead of echo because some /bin/sh implementations expand backslash
# escapes in echo arguments, which would alter the payload.
produce: _config
	@printf '%s\n' "$(MESSAGE)" \
	    | $(KUBECTL) exec -i $(BROKER) -- \
	        $(KAFKA_BIN)/kafka-console-producer.sh \
	        --bootstrap-server $(BOOTSTRAP) \
	        --producer.config /tmp/mk-client.properties \
	        --topic $(TOPIC)
	@echo "Produced to '$(TOPIC)': $(MESSAGE)"

# Reads up to MAX messages; stderr from the consumer is discarded.
consume: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --consumer.config /tmp/mk-client.properties \
	    --topic $(TOPIC) \
	    --max-messages $(MAX) \
	    --timeout-ms 10000 2>/dev/null

# Same as `consume` but with --from-beginning and a 15 s timeout.
list-messages: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --consumer.config /tmp/mk-client.properties \
	    --topic $(TOPIC) \
	    --from-beginning \
	    --max-messages $(MAX) \
	    --timeout-ms 15000 2>/dev/null

# ── Consumer groups ───────────────────────────────────────────────────────────

group-list: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-consumer-groups.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --command-config /tmp/mk-client.properties \
	    --list 2>/dev/null

# Runs --describe inside the pod and captures stdout; if the captured text
# mentions GroupIdNotFoundException a short explanation is printed instead of
# the raw tool output.
group-offsets: _config
	@$(KEXEC) /bin/bash -c \
	    'out=$$( $(KAFKA_BIN)/kafka-consumer-groups.sh \
	        --bootstrap-server $(BOOTSTRAP) \
	        --command-config /tmp/mk-client.properties \
	        --group $(GROUP) --describe 2>/dev/null); \
	    echo "$$out" | grep -q GroupIdNotFoundException \
	        && echo "Group $(GROUP) not found (no consumer has connected yet)" \
	        || echo "$$out"'