Helm/Kafka

From Chorke Wiki
Jump to navigation Jump to search
# Register the Bitnami chart repository under the alias "bitnami".
helm repo add bitnami https://charts.bitnami.com/bitnami
# Refresh the local chart index; list the configured repos only if that worked.
helm repo update \
  && helm repo list
# Print the contexts defined in the active kubeconfig.
kubectl config get-contexts

Config

# Each line points KUBECONFIG at a different kubeconfig file; run only the one
# you need. Executed top to bottom, the last export wins.
export KUBECONFIG="$HOME/.kube/dev-kubeconfig.yaml"
export KUBECONFIG="$HOME/.kube/gcp-kubeconfig.yaml"
export KUBECONFIG="$HOME/.kube/config"

Install

# Page through the default values of chart versions 31.0.0 and 31.1.0.
helm show values bitnami/kafka --version 31.0.0 | less
helm show values bitnami/kafka --version 31.1.0 | less


# Show whether a "kafka" namespace exists, then delete it.
kubectl get namespaces | grep kafka
kubectl delete namespace kafka

# Check again, then create the "kafka" namespace.
kubectl get namespaces | grep kafka
kubectl create namespace kafka
# Render the "kafka-secret" Secret client-side (--dry-run=client prints the
# manifest instead of creating it) and hand the YAML to `kubectl apply`.
kubectl --namespace=kafka create secret generic kafka-secret \
  --from-literal=inter-broker-client-secret='sadaqah!' \
  --from-literal=controller-client-secret='sadaqah!' \
  --from-literal=inter-broker-password='sadaqah!' \
  --from-literal=controller-password='sadaqah!' \
  --from-literal=zookeeper-password='sadaqah!' \
  --from-literal=client-passwords='sadaqah!' \
  --dry-run=client --output=yaml \
  | kubectl apply --filename=-

# Alternative: create the same Secret from KEY=VALUE lines supplied on stdin.
# The quoted delimiter makes the shell pass the body through verbatim.
kubectl --namespace=kafka create secret generic kafka-secret \
  --from-env-file=/dev/stdin <<'ENV'
inter-broker-client-secret=sadaqah!
controller-client-secret=sadaqah!
inter-broker-password=sadaqah!
controller-password=sadaqah!
zookeeper-password=sadaqah!
client-passwords=sadaqah!
ENV

# Apply an inline manifest holding a hostPath PersistentVolume ("pv-kafka")
# and a PersistentVolumeClaim ("pvc-kafka") pinned to it via spec.volumeName.
kubectl --namespace=kafka apply --filename=- <<'YML'
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  capacity:
    storage: 10Gi
  accessModes:
  - ReadWriteOnce
  hostPath:
    path: /tmp/hostpath-provisioner/kafka/pvc-kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  volumeName: pv-kafka
YML
# Variant: apply only the PersistentVolumeClaim, without spec.volumeName.
# The first YAML document in the manifest is empty.
kubectl --namespace=kafka apply --filename=- <<'YML'
---













---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
  - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi

YML

# Switch the PersistentVolume bound to pvc-kafka to the Retain reclaim policy.
# The volume name is read from the claim's spec.volumeName; an unbound claim
# has none, so skip the patch rather than target an empty or bogus PV name.
PV_NAME="$(kubectl -n=kafka get pvc pvc-kafka -o=jsonpath='{.spec.volumeName}')"
if [ -n "${PV_NAME}" ]; then
  kubectl -n=kafka patch pv "${PV_NAME}" --patch-file=/dev/stdin <<'YML'
---
spec:
  persistentVolumeReclaimPolicy: Retain
YML
else
  echo "pvc-kafka is not bound to a PersistentVolume yet" >&2
fi

# Install release "kafka" from chart bitnami/kafka 31.1.0, reading the value
# overrides from stdin (-f=-). The quoted delimiter keeps the YAML verbatim.
helm --namespace=kafka install kafka bitnami/kafka --version=31.1.0 -f=- <<'YML'
---
externalAccess:
  enabled: true
  controller:
    forceExpose: false
    service:
      loadBalancerIPs:
       - 192.168.49.100
      domain: kafka.k8s.local
fullnameOverride: kafka
sasl:
  existingSecret: kafka-secret
  client:
    users:
    - chorke
controller:
  replicaCount: 1
broker:
  replicaCount: 0
YML

# Add a static ARP entry mapping the load-balancer IP to a MAC address.
sudo arp -s 192.168.49.100 02:42:c0:a8:31:02
# Delete that entry again.
sudo arp -d 192.168.49.100
# Inspect the ARP table in both listing formats.
arp -n
arp -a

# Open the Kafka host name with each available launcher; every command runs
# in the background with stdout and stderr discarded.
xdg-open http://kafka.k8s.local >/dev/null 2>&1 &
gnome-open http://kafka.k8s.local >/dev/null 2>&1 &
x-www-browser http://kafka.k8s.local >/dev/null 2>&1 &
sensible-browser http://kafka.k8s.local >/dev/null 2>&1 &

Uninstall

# Remove the "kafka" release first, then the namespace it was installed in.
helm --namespace=kafka uninstall kafka
kubectl delete ns kafka

Swiss Knife

# Start an interactive bash in a one-off pod (deleted on exit via --rm) using
# the Bitnami Kafka image: one pod named for producing, one for consuming.
kubectl --namespace=kafka run kafka-producer \
  --stdin --tty --rm --restart=Never \
  --image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
  -- bash
kubectl --namespace=kafka run kafka-consumer \
  --stdin --tty --rm --restart=Never \
  --image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
  -- bash

# Write the SASL/SCRAM client settings to /tmp/kafka.properties (overwriting
# any previous file), then print the file for a visual check.
cat >/tmp/kafka.properties <<'PRO'
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="chorke" password="sadaqah!";
PRO
cat /tmp/kafka.properties

# List the topics visible through the external listener, authenticating with
# the client properties file.
kafka-topics.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --command-config /tmp/kafka.properties \
  --list

# Produce to topic "test" from the console.
kafka-console-producer.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --producer.config /tmp/kafka.properties \
  --topic=test
# Consume topic "test" starting from the earliest offset.
kafka-console-consumer.sh \
  --bootstrap-server 192.168.49.100:9094 \
  --consumer.config /tmp/kafka.properties \
  --topic test --from-beginning

Swiss Knife » Python

# Start an interactive bash in one-off python:3.11 pods (deleted on exit via
# --rm): one named for consuming, one for producing.
kubectl --namespace kafka run kafka-consumer \
  --stdin --tty --rm --restart=Never \
  --image=python:3.11 -- bash
kubectl --namespace kafka run kafka-producer \
  --stdin --tty --rm --restart=Never \
  --image=python:3.11 -- bash

# Refresh the package index, upgrade installed packages, install a telnet
# client, then clean the apt cache; each step runs only if the previous one
# succeeded.
apt-get update \
  && apt-get upgrade -y \
  && apt-get install -y inetutils-telnet \
  && apt-get autoclean

# Create a virtualenv with the prompt label "Kafka", activate it, and install
# both Kafka client libraries used in the snippets that follow.
python3 -m venv ~/.config/venv/kafka --prompt=Kafka
. ~/.config/venv/kafka/bin/activate
pip install confluent-kafka
pip install kafka-python

# Check that the external listener accepts TCP connections.
telnet 192.168.49.100 9094

# confluent-kafka consumer: subscribe to topic "test" over
# SASL_PLAINTEXT/SCRAM-SHA-256 and print each message until Ctrl-C.
# The program is fed to python on stdin; the quoted delimiter keeps it verbatim.
python <<'EXE'
from confluent_kafka import Consumer

settings = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'chorke',
    'sasl.password': 'sadaqah!',
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
}

consumer = Consumer(settings)
consumer.subscribe(['test'])

try:
    while True:
        # Wait up to one second for the next message.
        record = consumer.poll(1.0)
        if record is None:
            continue
        if record.error():
            print(f"Consumer error: {record.error()}")
            continue
        print(f"Received message: {record.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    # Always close the consumer, even when interrupted.
    consumer.close()
EXE
# confluent-kafka producer: send one keyed message to topic "test" and report
# the delivery result through a callback.
python <<'EXE'
from confluent_kafka import Producer


def delivery_report(err, msg):
    """Print whether the message was delivered, and where."""
    if err is None:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
    else:
        print(f"Message delivery failed: {err}")


settings = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.password': 'sadaqah!',
    'sasl.username': 'chorke',
}

producer = Producer(settings)
producer.produce(
    'test',
    key='key',
    value='Hello from Python!',
    callback=delivery_report,
)
# Block until the queued message has been delivered (or has failed).
producer.flush()
EXE

# kafka-python consumer: read topic "test" from the earliest offset as group
# "my-group" and print every message value.
python <<'EXE'
from kafka import KafkaConsumer

options = dict(
    bootstrap_servers='192.168.49.100:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='chorke',
    sasl_plain_password='sadaqah!',
    group_id='my-group',
    auto_offset_reset='earliest',
)
consumer = KafkaConsumer('test', **options)

for record in consumer:
    print(f"Received message: {record.value.decode('utf-8')}")
EXE
# kafka-python producer: send a single message to topic "test", then flush.
python <<'EXE'
from kafka import KafkaProducer

options = dict(
    bootstrap_servers='192.168.49.100:9094',
    security_protocol='SASL_PLAINTEXT',
    sasl_mechanism='SCRAM-SHA-256',
    sasl_plain_username='chorke',
    sasl_plain_password='sadaqah!',
)
producer = KafkaProducer(**options)

producer.send('test', b'Hello, Kafka!')
# Block until the buffered message has been sent.
producer.flush()
EXE

# Leave the Python virtualenv, then end the current shell session
# (presumably the bash started inside the pod above).
deactivate
exit

Playground

# Install chart 31.0.0 with default values, then upgrade the release to 31.1.0
# (-i installs it if it does not exist yet).
helm --namespace=kafka install kafka bitnami/kafka --version 31.0.0
helm --namespace=kafka upgrade -i kafka bitnami/kafka --version 31.1.0
# Page through the default values of chart 31.1.0.
helm show values bitnami/kafka --version 31.1.0 | less

# Print the decoded value of individual keys stored in the kafka namespace's
# Secrets. The yq filter is single-quoted so its inner double quotes reach yq:
# a key containing "-" must be quoted, otherwise the shell strips the quotes
# and a jq-style parser reads ".data.a-b" as a subtraction.
# base64 output has no trailing newline, hence the final `echo`.
kubectl -n=kafka get secret kafka-kraft-cluster-id -o=yaml | yq -r '.data."kraft-cluster-id"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."inter-broker-client-secret"' | base64 -d && echo

kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."controller-client-secret"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."inter-broker-password"' | base64 -d && echo

kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."controller-password"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."zookeeper-password"' | base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml | yq -r '.data."client-passwords"' | base64 -d && echo

# Open an interactive shell in the "kafka" container behind service "kafka":
# bash first, with plain sh as the alternative.
kubectl --namespace=kafka exec --stdin --tty svc/kafka --container=kafka -- bash
kubectl --namespace=kafka exec --stdin --tty svc/kafka --container=kafka -- sh

# Print each kubeconfig as a single self-contained document (--flatten).
# The paths are quoted so a home directory containing whitespace is passed to
# kubectl as one argument.
kubectl config --kubeconfig="${HOME}/.kube/aws-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/dev-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/gcp-kubeconfig.yaml" view --flatten
kubectl config --kubeconfig="${HOME}/.kube/config" view --flatten

# Remove the workloads, ingresses, services and claims in the "kafka"
# namespace.
kubectl -n=kafka delete all --all
kubectl -n=kafka delete ing --all
kubectl -n=kafka delete sts --all
kubectl -n=kafka delete svc --all
kubectl -n=kafka delete pvc --all
# PersistentVolumes are cluster-scoped: "-n" does not restrict them, so
# "delete pv --all" would remove every PV in the cluster. Delete only the
# volumes whose claim reference points at the kafka namespace; xargs -r skips
# the delete when nothing matches.
kubectl get pv \
  -o=jsonpath='{range .items[?(@.spec.claimRef.namespace=="kafka")]}{.metadata.name}{"\n"}{end}' \
  | xargs -r kubectl delete pv --ignore-not-found

# Inspect, restart and watch the rollout of the Kafka controller StatefulSet.
# With fullnameOverride "kafka" the Bitnami chart names it "kafka-controller";
# there is no "kafka-master" StatefulSet in this chart.
kubectl -n=kafka rollout history sts kafka-controller
kubectl -n=kafka rollout restart sts kafka-controller
kubectl -n=kafka rollout status  sts kafka-controller
# Open a shell in the "kafka" container, then follow its logs.
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka logs -f  svc/kafka -c=kafka
kubectl -n=kafka logs -f  svc/kafka

References