K8s 中运行Kafka

1、kafka 的yaml资源文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
#部署 Service Headless,用于Kafka间相互通信
apiVersion: v1
kind: Service
metadata:
name: kafka-headless
namespace: default
labels:
app: kafka
spec:
type: ClusterIP
clusterIP: None
ports:
- name: kafka
port: 9092
targetPort: kafka
selector:
app: kafka
---
#部署 Service,用于外部访问 Kafka
apiVersion: v1
kind: Service
metadata:
name: kafka
namespace: default
labels:
app: kafka
spec:
type: ClusterIP
ports:
- name: kafka
port: 9092
targetPort: kafka
selector:
app: kafka
---
apiVersion: policy/v1
kind: PodDisruptionBudget
metadata:
name: kafka-pdb
namespace: default
spec:
selector:
matchLabels:
app: kafka
minAvailable: 2
---
apiVersion: apps/v1
kind: StatefulSet
metadata:
name: "kafka"
namespace: default
labels:
app: kafka
spec:
selector:
matchLabels:
app: kafka
serviceName: kafka-headless
podManagementPolicy: "Parallel"
replicas: 3
updateStrategy:
type: "RollingUpdate"
template:
metadata:
name: "kafka"
labels:
app: kafka
spec:
securityContext:
fsGroup: 1001
runAsUser: 1001
containers:
- name: kafka
image: "bitnami/kafka:3.2.1-debian-11-r4"
imagePullPolicy: "IfNotPresent"
resources:
limits:
cpu: 2
memory: 2Gi
requests:
cpu: 250m
memory: 256Mi
env:
- name: MY_POD_IP
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: MY_POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
- name: TZ
value: Asia/Shanghai
- name: KAFKA_CFG_ZOOKEEPER_CONNECT
value: "zk" #Zookeeper Service 名称
- name: KAFKA_PORT_NUMBER
value: "9092"
- name: KAFKA_CFG_LISTENERS
value: "PLAINTEXT://:$(KAFKA_PORT_NUMBER)"
- name: KAFKA_CFG_ADVERTISED_LISTENERS
value: 'PLAINTEXT://$(MY_POD_NAME).kafka-headless:$(KAFKA_PORT_NUMBER)'
- name: ALLOW_PLAINTEXT_LISTENER
value: "yes"
- name: KAFKA_HEAP_OPTS
value: "-Xmx512m -Xms512m"
- name: KAFKA_CFG_LOGS_DIRS
value: /opt/bitnami/kafka/data
- name: JMX_PORT
value: "9988"
ports:
- name: kafka
containerPort: 9092
livenessProbe:
tcpSocket:
port: kafka
initialDelaySeconds: 10
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 2
readinessProbe:
tcpSocket:
port: kafka
initialDelaySeconds: 5
periodSeconds: 10
timeoutSeconds: 5
successThreshold: 1
failureThreshold: 6
volumeMounts:
- name: kafka-data
mountPath: /bitnami/kafka
volumeClaimTemplates:
- metadata:
name: kafka-data
spec:
storageClassName: nfs #指定 storageclass
accessModes:
- "ReadWriteOnce"
resources:
requests:
storage: 5Gi

2、部署kafka

1
[root@001-new ~]$ kubectl apply -f kafka.yaml

3、Kafka功能验证

查看zk节点状态

1
2
3
4
5
[root@001-new ~]$ kubectl get po |grep kafka
NAME READY STATUS RESTARTS AGE
kafka-0 1/1 Running 0 49s
kafka-1 1/1 Running 0 62s
kafka-2 1/1 Running 0 75s

生产消息

进入 kafka-2 ,创建topic test,进入生产者窗口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
# 进入pod
[root@001-new ~]$ kubectl exec -it kafka-2 -- bash
# 进入脚本目录
I have no name!@kafka-2:~ $ cd /opt/bitnami/kafka/bin/

# 创建topic
I have no name!@kafka-2:/opt/bitnami/kafka/bin$ ./kafka-topics.sh --create --bootstrap-server kafka:9092 --replication-factor 1 --partitions 1 --topic test
Created topic test.

# 查看topic列表
I have no name!@kafka-2:/opt/bitnami/kafka/bin$ ./kafka-topics.sh --list --bootstrap-server kafka:9092
test

# 进入topic test生产消息
I have no name!@kafka-2:/opt/bitnami/kafka/bin$ ./kafka-console-producer.sh --topic test --broker-list kafka:9092
>testing

如果报错:Error: JMX connector server communication error: service:jmx:rmi://kafka-2:9988 是jmx配置的端口已经监听了,只需要重新声明环境变量 export JMX_PORT=9989 更换端口即可

消费消息

进入 kafka-0 ,进入消费者窗口

1
2
3
4
5
6
7
8
9
# 进入pod
[root@001-new ~]$ kubectl exec -it kfaka-0 bash

# 进入脚本目录
I have no name!@kafka-0:/$ cd /opt/bitnami/kafka/bin/

# 能消费到数据,说明Kafka状态ok
I have no name!@kafka-0:/opt/bitnami/kafka/bin$ ./kafka-console-consumer.sh --topic test --bootstrap-server kafka:9092
testing

kafka消息的生产和消费正常,kafka集群正常,k8s kafka集群部署完成。

-------------本文结束感谢您的阅读-------------