Helm/Kafka: Difference between revisions
Jump to navigation
Jump to search
Line 262: | Line 262: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kubectl -n kafka run -i --tty --rm kafka-consumer \ | kubectl -n kafka run -i --tty --rm kafka-consumer \ | ||
--image=python: | --image=python:3.11 --restart=Never -- bash | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Line 268: | Line 268: | ||
<syntaxhighlight lang="bash"> | <syntaxhighlight lang="bash"> | ||
kubectl -n kafka run -i --tty --rm kafka-producer \ | kubectl -n kafka run -i --tty --rm kafka-producer \ | ||
--image=python: | --image=python:3.11 --restart=Never -- bash | ||
</syntaxhighlight> | </syntaxhighlight> | ||
Line 282: | Line 282: | ||
source ~/.config/venv/kafka/bin/activate | source ~/.config/venv/kafka/bin/activate | ||
pip install confluent-kafka | pip install confluent-kafka | ||
pip install kafka-python | |||
telnet 192.168.49.100 9094 | telnet 192.168.49.100 9094 | ||
Line 291: | Line 292: | ||
|- | |- | ||
|valign='top'| | |valign='top'| | ||
<syntaxhighlight lang="python" | <syntaxhighlight lang="python"> | ||
cat << EXE | python | cat << EXE | python | ||
from confluent_kafka import Consumer | from confluent_kafka import Consumer | ||
Line 326: | Line 327: | ||
|valign='top'| | |valign='top'| | ||
<syntaxhighlight lang="python"> | <syntaxhighlight lang="python" line> | ||
cat << EXE | python | cat << EXE | python | ||
from confluent_kafka import Producer | from confluent_kafka import Producer | ||
Line 356: | Line 357: | ||
producer.produce('test', key='key', | producer.produce('test', key='key', | ||
value='Hello from Python!', callback=delivery_report) | value='Hello from Python!', callback=delivery_report) | ||
producer.flush() | |||
EXE | |||
</syntaxhighlight> | |||
|- | |||
|colspan="2"| | |||
---- | |||
|- | |||
|valign='top'| | |||
<syntaxhighlight lang="python"> | |||
cat << EXE | python | |||
from kafka import KafkaConsumer | |||
consumer = KafkaConsumer( | |||
'test', | |||
group_id='my-group', | |||
bootstrap_servers='192.168.49.100:9094', | |||
security_protocol='SASL_PLAINTEXT', | |||
sasl_mechanism='SCRAM-SHA-256', | |||
sasl_plain_password='sadaqah!', | |||
sasl_plain_username='chorke', | |||
auto_offset_reset='earliest' | |||
) | |||
for message in consumer: | |||
print(f"Received message: {message.value.decode('utf-8')}") | |||
EXE | |||
</syntaxhighlight> | |||
|valign='top'| | |||
<syntaxhighlight lang="python" line> | |||
cat << EXE | python | |||
from kafka import KafkaProducer | |||
producer = KafkaProducer( | |||
bootstrap_servers='192.168.49.100:9094', | |||
security_protocol='SASL_PLAINTEXT', | |||
sasl_mechanism='SCRAM-SHA-256', | |||
sasl_plain_password='sadaqah!', | |||
sasl_plain_username='chorke' | |||
) | |||
producer.send('test', b'Hello, Kafka!') | |||
producer.flush() | producer.flush() | ||
EXE | EXE |
Revision as of 10:34, 23 December 2024
helm repo add bitnami https://charts.bitnami.com/bitnami helm repo update && helm repo list kubectl config get-contexts
Config
export KUBECONFIG="${HOME}/.kube/dev-kubeconfig.yaml"
export KUBECONFIG="${HOME}/.kube/gcp-kubeconfig.yaml"
export KUBECONFIG="${HOME}/.kube/config"
Install
helm show values bitnami/kafka --version=31.0.0|less
helm show values bitnami/kafka --version=31.1.0|less
kubectl get ns|grep kafka
kubectl delete ns kafka
kubectl get ns|grep kafka
kubectl create ns kafka
|
kubectl -n=kafka create secret generic kafka-secret \
--from-literal=inter-broker-client-secret=sadaqah! \
--from-literal=controller-client-secret=sadaqah! \
--from-literal=inter-broker-password=sadaqah! \
--from-literal=controller-password=sadaqah! \
--from-literal=zookeeper-password=sadaqah! \
--from-literal=client-passwords=sadaqah! \
--dry-run=client -o=yaml \
| kubectl apply -f=-
|
| |
cat <<ENV | kubectl -n=kafka create secret generic kafka-secret --from-env-file=/dev/stdin
inter-broker-client-secret=sadaqah!
controller-client-secret=sadaqah!
inter-broker-password=sadaqah!
controller-password=sadaqah!
zookeeper-password=sadaqah!
client-passwords=sadaqah!
ENV
| |
| |
cat << YML | kubectl -n=kafka apply -f=-
---
apiVersion: v1
kind: PersistentVolume
metadata:
name: pv-kafka
spec:
persistentVolumeReclaimPolicy: Retain
storageClassName: standard
capacity:
storage: 10Gi
accessModes:
- ReadWriteOnce
hostPath:
path: /tmp/hostpath-provisioner/kafka/pvc-kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
app.kubernetes.io/name: kafka
name: pvc-kafka
spec:
storageClassName: standard
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
volumeName: pv-kafka
YML
|
cat << YML | kubectl -n=kafka apply -f=-
---
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
labels:
app.kubernetes.io/name: kafka
name: pvc-kafka
spec:
storageClassName: standard
accessModes:
- ReadWriteOnce
resources:
requests:
storage: 10Gi
YML
|
| |
PV_NAME="$(kubectl -n=kafka get pvc pvc-kafka -o=yaml|yq -r .spec.volumeName)"
cat <<YML |kubectl -n=kafka patch pv ${PV_NAME} --patch-file=/dev/stdin
---
spec:
persistentVolumeReclaimPolicy: Retain
YML
| |
| |
cat <<YML | helm -n=kafka install kafka bitnami/kafka --version=31.1.0 -f=-
---
externalAccess:
enabled: true
controller:
forceExpose: false
service:
loadBalancerIPs:
- 192.168.49.100
domain: kafka.k8s.local
fullnameOverride: kafka
sasl:
existingSecret: kafka-secret
client:
users:
- chorke
controller:
replicaCount: 1
broker:
replicaCount: 0
YML
| |
| |
xdg-open http://kafka.k8s.local &>/dev/null &
gnome-open http://kafka.k8s.local &>/dev/null &
|
x-www-browser http://kafka.k8s.local &>/dev/null &
sensible-browser http://kafka.k8s.local &>/dev/null &
|
Uninstall
helm uninstall -n=kafka kafka
kubectl delete namespace kafka
Swiss Knife
kubectl -n=kafka run -i --tty --rm kafka-producer \
--image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
--restart=Never -- bash
|
kubectl -n=kafka run -i --tty --rm kafka-consumer \
--image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
--restart=Never -- bash
|
| |
cat << PRO | tee /tmp/kafka.properties >/dev/null
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="chorke" password="sadaqah!";
PRO
cat /tmp/kafka.properties
| |
| |
kafka-topics.sh --list \
--command-config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094
| |
| |
kafka-console-producer.sh \
--producer.config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094 \
--topic=test
|
kafka-console-consumer.sh \
--consumer.config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094 \
--topic test --from-beginning
|
Swiss Knife » Python
kubectl -n kafka run -i --tty --rm kafka-consumer \
--image=python:3.11 --restart=Never -- bash
|
kubectl -n kafka run -i --tty --rm kafka-producer \
--image=python:3.11 --restart=Never -- bash
|
| |
apt-get update && apt-get upgrade -y && apt-get install -y inetutils-telnet && apt-get autoclean
python3 -m venv ~/.config/venv/kafka --prompt=Kafka
source ~/.config/venv/kafka/bin/activate
pip install confluent-kafka
pip install kafka-python
telnet 192.168.49.100 9094
| |
| |
cat << EXE | python
from confluent_kafka import Consumer
consumer_config = {
'bootstrap.servers': '192.168.49.100:9094',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256',
'sasl.username': 'chorke',
'sasl.password': 'sadaqah!',
'group.id': 'test-group',
'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_config)
consumer.subscribe(['test'])
try:
while True:
msg = consumer.poll(1.0)
if msg is None:
continue
if msg.error():
print(f"Consumer error: {msg.error()}")
continue
print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
pass
finally:
consumer.close()
EXE
|
cat << EXE | python
from confluent_kafka import Producer
producer_config = {
'bootstrap.servers': '192.168.49.100:9094',
'security.protocol': 'SASL_PLAINTEXT',
'sasl.mechanism': 'SCRAM-SHA-256',
'sasl.password': 'sadaqah!',
'sasl.username': 'chorke',
}
producer = Producer(producer_config)
def delivery_report(err, msg):
if err is not None:
print(f"Message delivery failed: {err}")
else:
print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
producer.produce('test', key='key',
value='Hello from Python!', callback=delivery_report)
producer.flush()
EXE
|
| |
cat << EXE | python
from kafka import KafkaConsumer
consumer = KafkaConsumer(
'test',
group_id='my-group',
bootstrap_servers='192.168.49.100:9094',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_password='sadaqah!',
sasl_plain_username='chorke',
auto_offset_reset='earliest'
)
for message in consumer:
print(f"Received message: {message.value.decode('utf-8')}")
EXE
|
cat << EXE | python
from kafka import KafkaProducer
producer = KafkaProducer(
bootstrap_servers='192.168.49.100:9094',
security_protocol='SASL_PLAINTEXT',
sasl_mechanism='SCRAM-SHA-256',
sasl_plain_password='sadaqah!',
sasl_plain_username='chorke'
)
producer.send('test', b'Hello, Kafka!')
producer.flush()
EXE
|
| |
deactivate
exit
|
Playground
helm -n=kafka install kafka bitnami/kafka --version=31.0.0
helm -n=kafka upgrade -i kafka bitnami/kafka --version=31.1.0
helm show values bitnami/kafka --version=31.1.0|less
| |
| |
kubectl -n=kafka get secret kafka-kraft-cluster-id -o=yaml|yq -r .data."kraft-cluster-id"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."inter-broker-client-secret"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."controller-client-secret"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."inter-broker-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."controller-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."zookeeper-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."client-passwords"|base64 -d && echo
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka exec -it svc/kafka -c=kafka -- sh
| |
| |
kubectl config --kubeconfig=${HOME}/.kube/aws-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/dev-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/gcp-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/config view --flatten
| |
| |
kubectl -n=kafka delete all --all
kubectl -n=kafka delete ing --all
kubectl -n=kafka delete sts --all
|
kubectl -n=kafka delete svc --all
kubectl -n=kafka delete pvc --all
kubectl -n=kafka delete pv --all
|
| |
kubectl -n=kafka rollout history sts kafka-master
kubectl -n=kafka rollout restart sts kafka-master
kubectl -n=kafka rollout status sts kafka-master
|
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka logs -f svc/kafka -c=kafka
kubectl -n=kafka logs -f svc/kafka
|
References
| ||