Helm/Kafka
helm repo add bitnami https://charts.bitnami.com/bitnami
helm repo update && helm repo list
kubectl config get-contexts
Config
export KUBECONFIG="${HOME}/.kube/dev-kubeconfig.yaml"
export KUBECONFIG="${HOME}/.kube/gcp-kubeconfig.yaml"
export KUBECONFIG="${HOME}/.kube/config"
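A quick sanity check of which cluster the exported kubeconfig points at, using stock kubectl subcommands:

kubectl config current-context
kubectl cluster-info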
Install
helm show values bitnami/kafka --version=31.0.0|less
helm show values bitnami/kafka --version=31.1.0|less
kubectl get ns|grep kafka
kubectl delete ns kafka
kubectl get ns|grep kafka
kubectl create ns kafka
kubectl -n=kafka create secret generic kafka-secret \
--from-literal=inter-broker-client-secret=sadaqah! \
--from-literal=controller-client-secret=sadaqah! \
--from-literal=inter-broker-password=sadaqah! \
--from-literal=controller-password=sadaqah! \
--from-literal=zookeeper-password=sadaqah! \
--from-literal=client-passwords=sadaqah! \
--dry-run=client -o=yaml \
| kubectl apply -f=-
cat <<ENV | kubectl -n=kafka create secret generic kafka-secret --from-env-file=/dev/stdin
inter-broker-client-secret=sadaqah!
controller-client-secret=sadaqah!
inter-broker-password=sadaqah!
controller-password=sadaqah!
zookeeper-password=sadaqah!
client-passwords=sadaqah!
ENV
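Either variant should yield the same six keys; kubectl describe lists them without printing the values:

kubectl -n=kafka describe secret kafka-secret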
cat << YML | kubectl -n=kafka apply -f=-
---
apiVersion: v1
kind: PersistentVolume
metadata:
  name: pv-kafka
spec:
  persistentVolumeReclaimPolicy: Retain
  storageClassName: standard
  capacity:
    storage: 10Gi
  accessModes:
    - ReadWriteOnce
  hostPath:
    path: /tmp/hostpath-provisioner/kafka/pvc-kafka
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
  volumeName: pv-kafka
YML
cat << YML | kubectl -n=kafka apply -f=-
---
apiVersion: v1
kind: PersistentVolumeClaim
metadata:
  labels:
    app.kubernetes.io/name: kafka
  name: pvc-kafka
spec:
  storageClassName: standard
  accessModes:
    - ReadWriteOnce
  resources:
    requests:
      storage: 10Gi
YML
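Whichever variant is used, the claim should end up Bound; a quick check:

kubectl -n=kafka get pv,pvc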
PV_NAME="$(kubectl -n=kafka get pvc pvc-kafka -o=yaml|yq -r .spec.volumeName)"
cat <<YML |kubectl -n=kafka patch pv ${PV_NAME} --patch-file=/dev/stdin
---
spec:
  persistentVolumeReclaimPolicy: Retain
YML
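The same patch can be applied inline with kubectl's -p flag, skipping the heredoc:

kubectl -n=kafka patch pv ${PV_NAME} -p '{"spec":{"persistentVolumeReclaimPolicy":"Retain"}}'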
cat <<YML | helm -n=kafka install kafka bitnami/kafka --version=31.1.0 -f=-
---
externalAccess:
  enabled: true
  controller:
    forceExpose: false
    service:
      loadBalancerIPs:
        - 192.168.49.100
      domain: kafka.k8s.local
fullnameOverride: kafka
sasl:
  existingSecret: kafka-secret
  client:
    users:
      - chorke
controller:
  replicaCount: 1
broker:
  replicaCount: 0
YML
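Before testing connectivity, wait for the controller pod to turn Ready; the selector below assumes the usual Bitnami app.kubernetes.io/name label convention:

# label assumed from the Bitnami chart; adjust if your chart version differs
kubectl -n=kafka wait --for=condition=ready pod -l app.kubernetes.io/name=kafka --timeout=300s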
xdg-open http://kafka.k8s.local &>/dev/null &
gnome-open http://kafka.k8s.local &>/dev/null &
x-www-browser http://kafka.k8s.local &>/dev/null &
sensible-browser http://kafka.k8s.local &>/dev/null &
Uninstall
helm uninstall -n=kafka kafka
kubectl delete namespace kafka
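helm uninstall leaves StatefulSet-provisioned PVCs in place; if the namespace is kept rather than deleted, they can be removed explicitly (label assumed per the Bitnami convention):

kubectl -n=kafka delete pvc -l app.kubernetes.io/name=kafka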
Swiss Knife
kubectl -n=kafka run -i --tty --rm kafka-producer \
--image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
--restart=Never -- bash
kubectl -n=kafka run -i --tty --rm kafka-consumer \
--image=docker.io/bitnami/kafka:3.9.0-debian-12-r1 \
--restart=Never -- bash
cat << PRO | tee /tmp/kafka.properties >/dev/null
security.protocol=SASL_PLAINTEXT
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required username="chorke" password="sadaqah!";
PRO
cat /tmp/kafka.properties
kafka-topics.sh --list \
--command-config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094
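If the test topic used below does not exist yet (broker-side auto-creation may be disabled), it can be created with the same client properties:

kafka-topics.sh --create --topic test \
  --partitions 1 --replication-factor 1 \
  --command-config /tmp/kafka.properties \
  --bootstrap-server 192.168.49.100:9094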
kafka-console-producer.sh \
--producer.config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094 \
--topic=test
kafka-console-consumer.sh \
--consumer.config /tmp/kafka.properties \
--bootstrap-server 192.168.49.100:9094 \
--topic test --from-beginning
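For keyed records, the console tools accept standard --property options (parse.key/key.separator on the producer, print.key on the consumer):

kafka-console-producer.sh \
  --producer.config /tmp/kafka.properties \
  --bootstrap-server 192.168.49.100:9094 \
  --topic test \
  --property parse.key=true --property key.separator=:
kafka-console-consumer.sh \
  --consumer.config /tmp/kafka.properties \
  --bootstrap-server 192.168.49.100:9094 \
  --topic test --from-beginning \
  --property print.key=true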
Swiss Knife » Python
kubectl -n kafka run -i --tty --rm kafka-consumer \
--image=python:bookworm --restart=Never -- bash
kubectl -n kafka run -i --tty --rm kafka-producer \
--image=python:bookworm --restart=Never -- bash
apt-get update && apt-get upgrade -y && apt-get install -y inetutils-telnet && apt-get autoclean
python3 -m venv ~/.config/venv/kafka --prompt=Kafka
source ~/.config/venv/kafka/bin/activate
pip install confluent-kafka
telnet 192.168.49.100 9094
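Where telnet is unavailable, bash's built-in /dev/tcp redirection gives an equivalent reachability probe:

# succeeds only if the port accepts a TCP connection within 3 seconds
timeout 3 bash -c '</dev/tcp/192.168.49.100/9094' && echo reachable || echo unreachable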
cat << EXE | python
from confluent_kafka import Consumer
consumer_config = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.username': 'chorke',
    'sasl.password': 'sadaqah!',
    'group.id': 'test-group',
    'auto.offset.reset': 'earliest',
}
consumer = Consumer(consumer_config)
consumer.subscribe(['test'])
try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"Received message: {msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()
EXE
cat << EXE | python
from confluent_kafka import Producer
producer_config = {
    'bootstrap.servers': '192.168.49.100:9094',
    'security.protocol': 'SASL_PLAINTEXT',
    'sasl.mechanism': 'SCRAM-SHA-256',
    'sasl.password': 'sadaqah!',
    'sasl.username': 'chorke',
}
producer = Producer(producer_config)
def delivery_report(err, msg):
    if err is not None:
        print(f"Message delivery failed: {err}")
    else:
        print(f"Message delivered to {msg.topic()} [{msg.partition()}]")
producer.produce('test', key='key',
                 value='Hello from Python!', callback=delivery_report)
producer.flush()
EXE
deactivate
exit
Playground
helm -n=kafka install kafka bitnami/kafka --version=31.0.0
helm -n=kafka upgrade -i kafka bitnami/kafka --version=31.1.0
helm show values bitnami/kafka --version=31.1.0|less
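Release state and upgrade revisions for these playground installs can be inspected with:

helm -n=kafka status kafka
helm -n=kafka history kafka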
kubectl -n=kafka get secret kafka-kraft-cluster-id -o=yaml|yq -r .data."kraft-cluster-id"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."inter-broker-client-secret"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."controller-client-secret"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."inter-broker-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."controller-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."zookeeper-password"|base64 -d && echo
kubectl -n=kafka get secret kafka-secret -o=yaml|yq -r .data."client-passwords"|base64 -d && echo
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka exec -it svc/kafka -c=kafka -- sh
kubectl config --kubeconfig=${HOME}/.kube/aws-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/dev-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/gcp-kubeconfig.yaml view --flatten
kubectl config --kubeconfig=${HOME}/.kube/config view --flatten
kubectl -n=kafka delete all --all
kubectl -n=kafka delete ing --all
kubectl -n=kafka delete sts --all
kubectl -n=kafka delete svc --all
kubectl -n=kafka delete pvc --all
kubectl -n=kafka delete pv --all
kubectl -n=kafka rollout history sts kafka-controller
kubectl -n=kafka rollout restart sts kafka-controller
kubectl -n=kafka rollout status sts kafka-controller
kubectl -n=kafka exec -it svc/kafka -c=kafka -- bash
kubectl -n=kafka logs -f svc/kafka -c=kafka
kubectl -n=kafka logs -f svc/kafka