# ──────────────────────────────────────────────────────────────────────────────
# Kafka cluster management — Strimzi / k3s
# Requires: kubectl + helm configured for the target cluster
# ──────────────────────────────────────────────────────────────────────────────

NAMESPACE    := kafka
BROKER       := kafka-dual-role-0
BOOTSTRAP    := kafka-kafka-bootstrap.$(NAMESPACE).svc.cluster.local:9093
SCRIPT_DIR   := $(dir $(abspath $(lastword $(MAKEFILE_LIST))))
KAFKA_NAME   := kafka

# Overridable per invocation
TOPIC        ?= smoke-test
MESSAGE      ?= hello
GROUP        ?= my-group
MAX          ?= 10
PARTITIONS   ?= 1
REPLICAS     ?= 1
KAFKA_VER    ?= 4.1.0
METADATA_VER ?= 4.1-IV0

KUBECTL   := kubectl -n $(NAMESPACE)
KEXEC     := $(KUBECTL) exec $(BROKER) --
KAFKA_BIN := /opt/kafka/bin

.DEFAULT_GOAL := help
.PHONY: help \
        deploy smoke-test upgrade teardown \
        _config \
        topic-list topic-create topic-delete \
        produce consume list-messages \
        group-list group-offsets

# ── Help ──────────────────────────────────────────────────────────────────────

help:
	@printf "Usage: make <target> [VARIABLE=value ...]\n\n"
	@printf "Cluster lifecycle:\n"
	@printf "  deploy            Install Strimzi operator + Kafka cluster + users\n"
	@printf "  smoke-test        Run produce/consume end-to-end test\n"
	@printf "  upgrade           Upgrade Kafka (KAFKA_VER=) or Strimzi operator\n"
	@printf "  teardown          Delete the Kafka cluster and namespace\n"
	@printf "\nTopics:\n"
	@printf "  topic-list        List all topics\n"
	@printf "  topic-create      Create TOPIC (idempotent)\n"
	@printf "  topic-delete      Delete TOPIC\n"
	@printf "\nMessages:\n"
	@printf "  produce           Produce MESSAGE into TOPIC\n"
	@printf "  consume           Consume MAX messages from TOPIC (from latest)\n"
	@printf "  list-messages     Consume MAX messages from TOPIC (from beginning)\n"
	@printf "\nConsumer groups:\n"
	@printf "  group-list        List all consumer groups\n"
	@printf "  group-offsets     Show offsets and lag for GROUP\n"
	@printf "\nVariables (current values):\n"
	@printf "  TOPIC=%-12s  KAFKA_VER=%-10s  PARTITIONS=%s\n" "$(TOPIC)" "$(KAFKA_VER)" "$(PARTITIONS)"
	@printf "  MESSAGE=%-12s  GROUP=%-10s      REPLICAS=%s\n"  "$(MESSAGE)" "$(GROUP)" "$(REPLICAS)"
	@printf "  MAX=%-12s    METADATA_VER=%s\n\n"               "$(MAX)" "$(METADATA_VER)"

# ── Cluster lifecycle ─────────────────────────────────────────────────────────

deploy:
	@echo "==> [1/5] Creating namespace '$(NAMESPACE)'"
	@kubectl create namespace $(NAMESPACE) --dry-run=client -o yaml | kubectl apply -f -
	@echo "==> [2/5] Installing Strimzi operator"
	@helm repo add strimzi https://strimzi.io/charts/ 2>/dev/null || true
	@helm repo update strimzi 2>/dev/null
	@helm upgrade --install strimzi-kafka-operator strimzi/strimzi-kafka-operator \
	  --namespace $(NAMESPACE) \
	  --set watchAnyNamespace=false \
	  --wait --timeout 5m
	@echo "==> [3/5] Waiting for Strimzi CRDs"
	@for crd in kafkas.kafka.strimzi.io kafkanodepools.kafka.strimzi.io kafkausers.kafka.strimzi.io; do \
	  until kubectl get crd "$$crd" -o jsonpath='{.status.conditions[?(@.type=="Established")].status}' 2>/dev/null | grep -q True; do sleep 2; done; \
	  echo "    $$crd ready"; \
	done
	@echo "==> [4/5] Deploying Kafka cluster (KRaft, SASL_SSL, SCRAM-SHA-512)"
	@kubectl apply -f $(SCRIPT_DIR)kafka.yaml
	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
	@echo "==> [5/5] Deploying KafkaUsers"
	@kubectl apply -f $(SCRIPT_DIR)kafka-users.yaml
	@for user in kafka-admin kafka-client; do \
	  kubectl wait kafkauser/$$user --for=condition=Ready --timeout=120s -n $(NAMESPACE); \
	done
	@echo ""
	@echo "Cluster ready. Bootstrap: $(BOOTSTRAP)"

smoke-test:
	@echo "==> Running smoke test"
	@$(KUBECTL) delete job kafka-smoke-test --ignore-not-found > /dev/null
	@kubectl apply -f $(SCRIPT_DIR)kafka-smoke-test.yaml
	@until $(KUBECTL) get pod -l job-name=kafka-smoke-test \
	  -o jsonpath='{.items[0].status.phase}' 2>/dev/null | grep -qE 'Succeeded|Failed'; do sleep 3; done
	@$(KUBECTL) logs -l job-name=kafka-smoke-test --tail=20
	@$(KUBECTL) get pod -l job-name=kafka-smoke-test \
	  -o jsonpath='{.items[0].status.phase}' | grep -q Succeeded \
	  && echo "==> Smoke test PASSED" \
	  || { echo "==> Smoke test FAILED"; exit 1; }

upgrade:
	@echo "==> Upgrading Strimzi operator"
	@helm repo update strimzi 2>/dev/null
	@helm upgrade strimzi-kafka-operator strimzi/strimzi-kafka-operator \
	  --namespace $(NAMESPACE) \
	  --set watchAnyNamespace=false \
	  --wait --timeout 5m
	@echo "==> Upgrading Kafka to $(KAFKA_VER) (metadataVersion=$(METADATA_VER))"
	@$(KUBECTL) patch kafka $(KAFKA_NAME) --type=merge \
	  -p '{"spec":{"kafka":{"version":"$(KAFKA_VER)","metadataVersion":"$(METADATA_VER)"}}}'
	@kubectl wait kafka/$(KAFKA_NAME) --for=condition=Ready --timeout=10m -n $(NAMESPACE)
	@echo "==> Upgrade complete"
	@$(KUBECTL) get kafka $(KAFKA_NAME)

teardown:
	@echo "==> Deleting KafkaUsers and Kafka cluster"
	@kubectl delete -f $(SCRIPT_DIR)kafka-users.yaml --ignore-not-found
	@kubectl delete -f $(SCRIPT_DIR)kafka.yaml --ignore-not-found
	@echo "==> Waiting for pods to terminate"
	@$(KUBECTL) wait pod -l strimzi.io/cluster=$(KAFKA_NAME) \
	  --for=delete --timeout=120s 2>/dev/null || true
	@echo "==> Uninstalling Strimzi operator"
	@helm uninstall strimzi-kafka-operator -n $(NAMESPACE) 2>/dev/null || true
	@echo "==> Deleting namespace '$(NAMESPACE)'"
	@kubectl delete namespace $(NAMESPACE) --ignore-not-found
	@echo "==> Teardown complete"

# ── Internal: write client config into the broker pod ─────────────────────────

_config:
	@$(KUBECTL) get secret kafka-cluster-ca-cert \
	  -o jsonpath='{.data.ca\.crt}' | base64 -d \
	  | $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-ca.crt > /dev/null
	@$(KUBECTL) get secret kafka-client \
	  -o jsonpath='{.data.sasl\.jaas\.config}' | base64 -d \
	  | $(KUBECTL) exec -i $(BROKER) -- tee /tmp/mk-jaas.conf > /dev/null
	@$(KEXEC) /bin/bash -c \
	  'printf "security.protocol=SASL_SSL\nssl.truststore.type=PEM\nssl.truststore.location=/tmp/mk-ca.crt\nsasl.mechanism=SCRAM-SHA-512\nsasl.jaas.config=$$(cat /tmp/mk-jaas.conf)\n" \
	   > /tmp/mk-client.properties'

# ── Topics ────────────────────────────────────────────────────────────────────

topic-list: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --command-config /tmp/mk-client.properties \
	  --list

topic-create: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --command-config /tmp/mk-client.properties \
	  --create \
	  --topic $(TOPIC) \
	  --partitions $(PARTITIONS) \
	  --replication-factor $(REPLICAS) \
	  --if-not-exists
	@echo "Topic '$(TOPIC)' ready (partitions=$(PARTITIONS) replicas=$(REPLICAS))"

topic-delete: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-topics.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --command-config /tmp/mk-client.properties \
	  --delete \
	  --topic $(TOPIC)
	@echo "Topic '$(TOPIC)' deleted"

# ── Messages ──────────────────────────────────────────────────────────────────

produce: _config
	@echo "$(MESSAGE)" \
	  | $(KUBECTL) exec -i $(BROKER) -- \
	    $(KAFKA_BIN)/kafka-console-producer.sh \
	      --bootstrap-server $(BOOTSTRAP) \
	      --producer.config /tmp/mk-client.properties \
	      --topic $(TOPIC)
	@echo "Produced to '$(TOPIC)': $(MESSAGE)"

consume: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --consumer.config /tmp/mk-client.properties \
	  --topic $(TOPIC) \
	  --max-messages $(MAX) \
	  --timeout-ms 10000 2>/dev/null

list-messages: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-console-consumer.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --consumer.config /tmp/mk-client.properties \
	  --topic $(TOPIC) \
	  --from-beginning \
	  --max-messages $(MAX) \
	  --timeout-ms 15000 2>/dev/null

# ── Consumer groups ───────────────────────────────────────────────────────────

group-list: _config
	@$(KEXEC) $(KAFKA_BIN)/kafka-consumer-groups.sh \
	  --bootstrap-server $(BOOTSTRAP) \
	  --command-config /tmp/mk-client.properties \
	  --list 2>/dev/null

group-offsets: _config
	@$(KEXEC) /bin/bash -c \
	  'out=$$( $(KAFKA_BIN)/kafka-consumer-groups.sh \
	    --bootstrap-server $(BOOTSTRAP) \
	    --command-config /tmp/mk-client.properties \
	    --group $(GROUP) --describe 2>/dev/null); \
	  echo "$$out" | grep -q GroupIdNotFoundException \
	  && echo "Group $(GROUP) not found (no consumer has connected yet)" \
	  || echo "$$out"'
