# Register the Bitnami chart repository under the local alias "bitnami".
helm repo add bitnami https://charts.bitnami.com/bitnami
# Refresh the chart index; list the configured repositories only if the refresh succeeded.
helm repo update \
  && helm repo list
# Show the contexts defined in the active kubeconfig.
kubectl config get-contexts
Config
# Point kubectl/helm at a kubeconfig for this shell session.
# Each line overrides the previous one; if all three are run in order, the last wins.
export KUBECONFIG="$HOME/.kube/dev-kubeconfig.yaml"
export KUBECONFIG="$HOME/.kube/gcp-kubeconfig.yaml"
export KUBECONFIG="$HOME/.kube/config"
Install
# Page through the default values of two chart versions.
helm show values bitnami/kafka --version=31.0.0 | less
helm show values bitnami/kafka --version=31.1.0 | less
# Recreate the "kafka" namespace from scratch, checking for it before and after the delete.
kubectl get namespaces | grep kafka
kubectl delete namespace kafka
kubectl get namespaces | grep kafka
kubectl create namespace kafka
|
# Render the "kafka-secret" Secret client-side (nothing is sent by the create step)
# and hand the resulting YAML to "kubectl apply".
kubectl --namespace=kafka create secret generic kafka-secret \
  --from-literal=inter-broker-client-secret='sadaqah!' \
  --from-literal=controller-client-secret='sadaqah!' \
  --from-literal=inter-broker-password='sadaqah!' \
  --from-literal=controller-password='sadaqah!' \
  --from-literal=zookeeper-password='sadaqah!' \
  --from-literal=client-passwords='sadaqah!' \
  --dry-run=client --output=yaml \
  | kubectl apply --filename=-
|
|
# Create the "kafka-secret" Secret from KEY=VALUE pairs supplied on stdin.
kubectl --namespace=kafka create secret generic kafka-secret --from-env-file=/dev/stdin << 'ENV'
inter-broker-client-secret=sadaqah!
controller-client-secret=sadaqah!
inter-broker-password=sadaqah!
controller-password=sadaqah!
zookeeper-password=sadaqah!
client-passwords=sadaqah!
ENV
|
|
# Apply a hostPath PersistentVolume ("pv-kafka") and a PersistentVolumeClaim
# ("pvc-kafka") that is pinned to it through spec.volumeName.
# The YAML nesting is significant: without indentation the manifest is invalid.
cat << 'YML' | kubectl -n=kafka apply -f=-
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: /tmp/hostpath-provisioner/kafka/pvc-kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  volumeName: pv-kafka
YML
|
# Apply only the PersistentVolumeClaim ("pvc-kafka") with no volumeName,
# leaving volume selection to the "standard" storage class.
# The YAML nesting is significant: without indentation the manifest is invalid.
cat << 'YML' | kubectl -n=kafka apply -f=-
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
YML
|
|
# Look up the volume bound to "pvc-kafka" and switch its reclaim policy to Retain.
PV_NAME="$(kubectl -n=kafka get pvc pvc-kafka -o=yaml | yq -r .spec.volumeName)"
# The patch key must be nested under "spec"; the PV name is quoted so an
# empty or odd lookup result cannot be word-split into extra arguments.
cat << 'YML' | kubectl -n=kafka patch pv "${PV_NAME}" --patch-file=/dev/stdin
---
spec:
  persistentVolumeReclaimPolicy: Retain
YML
|
|
# Install chart version 31.1.0 as release "kafka", passing the values on stdin.
# Nesting follows the Bitnami chart's values layout (externalAccess.controller.service.*,
# sasl.client.users, controller/broker replicaCount); flattened keys would not be
# read as these nested settings.
cat << 'YML' | helm -n=kafka install kafka bitnami/kafka --version=31.1.0 -f=-
---
externalAccess:
  enabled: true
  controller:
    forceExpose: false
    service:
      loadBalancerIPs:
        - 192.168.49.100
      domain: kafka.k8s.local
fullnameOverride: kafka
sasl:
  existingSecret: kafka-secret
  client:
    users:
      - chorke
controller:
  replicaCount: 1
broker:
  replicaCount: 0
YML
|
|
# Add a static ARP entry mapping the load-balancer IP to a fixed MAC address.
sudo arp -s 192.168.49.100 02:42:c0:a8:31:02
# Remove that entry again (run only when the mapping should be deleted).
sudo arp -d 192.168.49.100
# Inspect the ARP table: numeric addresses (-n), then all entries (-a).
arp -n
arp -a
|
|
# Open the external hostname with a desktop opener, in the background with all output discarded.
xdg-open http://kafka.k8s.local >/dev/null 2>&1 &
gnome-open http://kafka.k8s.local >/dev/null 2>&1 &
|
# Same as above via the alternative browser launchers, in the background with all output discarded.
x-www-browser http://kafka.k8s.local >/dev/null 2>&1 &
sensible-browser http://kafka.k8s.local >/dev/null 2>&1 &
|
Uninstall
# Remove the "kafka" release, then delete its namespace.
helm --namespace=kafka uninstall kafka
kubectl delete ns kafka
Swiss Knife
# Start a throwaway interactive pod named "kafka-producer" from the Kafka image;
# --rm deletes the pod when the bash session ends.
kubectl --namespace=kafka run kafka-producer \
  --stdin --tty --rm --restart=Never \
  --image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
  -- bash
|
# Start a throwaway interactive pod named "kafka-consumer" from the Kafka image;
# --rm deletes the pod when the bash session ends.
kubectl --namespace=kafka run kafka-consumer \
  --stdin --tty --rm --restart=Never \
  --image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
  -- bash
|
|
# Write the SASL/SCRAM client settings for the Kafka CLI tools to
# /tmp/kafka.properties (overwriting any existing file), then print them back.
cat > /tmp/kafka.properties << 'PRO'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="chorke" password="sadaqah!";
PRO
cat /tmp/kafka.properties
|
|
# List topics through the external listener, authenticating with the properties file.
kafka-topics.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --command-config /tmp/kafka.properties \
  --list
|
|
# Produce messages typed on stdin to topic "test".
kafka-console-producer.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --producer.config /tmp/kafka.properties \
  --topic=test
|
# Consume topic "test" starting from the earliest available message.
kafka-console-consumer.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --consumer.config /tmp/kafka.properties \
  --from-beginning --topic test
|
Swiss Knife » Python
# Start a throwaway interactive Python 3.11 pod named "kafka-consumer";
# --rm deletes the pod when the bash session ends.
kubectl --namespace kafka run kafka-consumer \
  --stdin --tty --rm --restart=Never \
  --image=python:3.11 -- bash
|
# Start a throwaway interactive Python 3.11 pod named "kafka-producer";
# --rm deletes the pod when the bash session ends.
kubectl --namespace kafka run kafka-producer \
  --stdin --tty --rm --restart=Never \
  --image=python:3.11 -- bash
|
|
# Refresh packages and install telnet; each step runs only if the previous one succeeded.
apt-get update \
  && apt-get upgrade -y \
  && apt-get install -y inetutils-telnet \
  && apt-get autoclean
# Create and activate a virtualenv, then install both Kafka client libraries.
python3 -m venv ~/.config/venv/kafka --prompt=Kafka
. ~/.config/venv/kafka/bin/activate
pip install confluent-kafka
pip install kafka-python
# Connectivity check against the external listener.
telnet 192.168.49.100 9094
|
|
# confluent-kafka consumer: subscribes to topic "test" and prints every message
# until interrupted with Ctrl-C. The Python indentation is required for the
# script to parse; the quoted delimiter keeps the shell from expanding the body.
cat << 'EXE' | python
from confluent_kafka import Consumer

consumer_config = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'chorke',
    'sasl.password': 'sadaqah!',
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(consumer_config)
consumer.subscribe(['test'])

try:
    while True:
        # Wait up to one second for the next message.
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer, including on Ctrl-C.
    consumer.close()
EXE
|
# confluent-kafka producer: sends one keyed message to topic "test" and reports
# the delivery result. The Python indentation is required for the script to
# parse; the quoted delimiter keeps the shell from expanding the body.
cat << 'EXE' | python
from confluent_kafka import Producer

producer_config = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.password': 'sadaqah!',
    'sasl.username': 'chorke',
}

producer = Producer(producer_config)


def delivery_report(err, msg):
    """Print whether the message was delivered or failed."""
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")


producer.produce('test', key='key',
                 value='Hello from Python!', callback=delivery_report)
# Block until outstanding messages are delivered so the callback runs before exit.
producer.flush()
EXE
|
|
# kafka-python consumer: iterates over topic "test" and prints each message.
# The Python indentation is required for the script to parse; the quoted
# delimiter keeps the shell from expanding the body.
cat << 'EXE' | python
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'test',
    group_id='my-group',
    bootstrap_servers='192.168.49.100:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_password='sadaqah!',
    sasl_plain_username='chorke',
    auto_offset_reset='earliest'
)

for message in consumer:
    print(f"Received message: {message.value.decode('utf-8')}")
EXE
|
# kafka-python producer: sends a single message to topic "test" and flushes.
# The Python indentation is required for the script to parse; the quoted
# delimiter keeps the shell from expanding the body.
cat << 'EXE' | python
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='192.168.49.100:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_password='sadaqah!',
    sasl_plain_username='chorke'
)

producer.send('test', b'Hello, Kafka!')
producer.flush()
EXE
|
|
|
Playground
# Install with chart defaults at 31.0.0, then upgrade (installing if absent) to 31.1.0.
helm --namespace=kafka install kafka bitnami/kafka --version=31.0.0
helm --namespace=kafka upgrade --install kafka bitnami/kafka --version=31.1.0
# Page through the default values of chart version 31.1.0.
helm show values bitnami/kafka --version=31.1.0 | less
|
|
# Decode individual Secret values. The whole yq expression is single-quoted so
# the inner double quotes reach yq; unquoted, the shell strips them and a
# jq-compatible yq reads the hyphens in the key as subtraction.
# The trailing "echo" adds the newline that the decoded value lacks.
kubectl -n=kafka get secret kafka-kraft-cluster-id -o=yaml | yq -r '.data."kraft-cluster-id"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."inter-broker-client-secret"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."controller-client-secret"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."inter-broker-password"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."controller-password"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."zookeeper-password"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."client-passwords"' | base64 -d && echo
# Open an interactive shell in the "kafka" container behind the service (bash, or sh as a fallback).
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka exec -it svc/kafka -c=kafka -- sh
|
|
# Print each kubeconfig as a single self-contained document (--flatten).
# Paths are quoted so a home directory containing spaces is not word-split.
kubectl config --kubeconfig="${HOME}/.kube/aws-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/dev-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/gcp-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/config" view --flatten
|
|
# Delete the resources in the "all" category, then ingresses and statefulsets, within the "kafka" namespace.
kubectl --namespace=kafka delete all --all
kubectl --namespace=kafka delete ingress --all
kubectl --namespace=kafka delete statefulset --all
|
# Delete services and volume claims within the "kafka" namespace.
kubectl -n=kafka delete svc --all
kubectl -n=kafka delete pvc --all
# PersistentVolumes are cluster-scoped, so "-n=kafka delete pv --all" would
# delete EVERY PV in the cluster. Remove only volumes whose claimRef points at
# the "kafka" namespace; "xargs -r" skips the delete when nothing matches.
kubectl get pv -o=jsonpath='{.items[?(@.spec.claimRef.namespace=="kafka")].metadata.name}' \
  | xargs -r kubectl delete pv
|
|
# Inspect, restart, and watch the rollout of the Kafka controller StatefulSet.
# With release/fullname "kafka" the Bitnami chart names it "kafka-controller";
# "kafka-master" is not a StatefulSet this chart creates.
kubectl -n=kafka rollout history sts kafka-controller
kubectl -n=kafka rollout restart sts kafka-controller
kubectl -n=kafka rollout status sts kafka-controller
|
# Open an interactive bash shell in the "kafka" container behind the service.
kubectl --namespace=kafka exec --stdin --tty svc/kafka --container=kafka -- bash
# Stream logs: first from the "kafka" container explicitly, then from the default container.
kubectl --namespace=kafka logs --follow svc/kafka --container=kafka
kubectl --namespace=kafka logs --follow svc/kafka
|
References