【Linux存储系列教程】kafka消息队列(中间件)
【Linux存储系列教程】kafka消息队列(中间件)
一、关于kafka
1.kafka是什么?
kafka
是一个多分区、多副本且基于zookeeper
协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。
2.kafka的作用
- 消息系统
kafka
具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、可扩展性、可恢复性等功能。- 提供消息顺序性保障、回溯消费功能
- 存储系统
- 支持将消息持久化到磁盘
- 流处理平台
- 提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作
3.kafka基本结构
producer
生产者- 发送消息的一方
- 生产者负责创建消息,然后将其投递 到
kafka
中
consumer
消费者- 消费者,也就是接收消息的一方。
- 消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
consumer Group(CG)
消费者组- 消费者组,由多个
consumer
组成。 - 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
- 消费者组之间互不影响
- 消费者组,由多个
broker
kafka服务器节点或实例- 一个
kafka
集群由多个broker
组成 - 一个
broker
可以容纳多个topic
- 一个
topic
主题- Kafka中的消息以主题为单位进行归类,
- 生产者负责将消息发送到特定的主题(发送到
Kafka
集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费
partition
分区- 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区
Topic-Partition
。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志Log
文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量offset
。offset
是消息在分区中的唯一标识,Kafka
通过它来保证消息在分区内的顺序性,不过offset
并不跨越分区,也就是说,Kafka
保证的是分区有序而不是主题有序。
- 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区
replica
副本Kafka
为分区引入了多副本Replica
机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是==一主多从==的关系,其中leader
副本负责处理读写请求,follower
副本只负责与leader副本的消息同步。副本处于不同的broker
中,当leader
副本出现故障时,从follower
副本中重新选举新的leader副本对外提供服务。Kafka
通过多副本机制实现了故障的自动转移,当Kafka
集群中某个broker
失效时仍然能保证服务可用。
二、kafka的复制机制
Kafka
使用ISR
的方式进行数据复制,则有效地权衡了数据可靠性和性能之间的关系。
AR、ISR、OSR、HW、LEO
分区中的所有副本统称为AR
Assigned Replicas
。所有与leader副本保持一定程度同步的副本(包括leader
副本在内)组成ISRIn-Sync Replicas
,ISR集合是AR集合中的一个子集。消息会先发送到leader
副本,然后follower
副本才能从leader
副本中拉取消息进行同步,同步期间内follower
副本相对于leader
副本而言会有一定程度的滞后。与
leader
副本同步滞后过多的副本==不包括leader副本==组成OSROut-of-Sync Replicas
,由此可见,AR=ISR+OSR
。在正常情况下,所有的follower
副本都应该与leader
副本保持一定程度的同步,即AR=ISR
,OSR
集合为==空==。
leader
副本负责维护和跟踪ISR集合中所有follower
副本的滞后状态,当follower
副本落后太多或失效时,leader
副本会把它从ISR集合中剔除。如果OSR集合中有follower
副本==追上==了leader
副本,那么leader
副本会把它从OSR集合转移至ISR集合。默认情况下,当leader
副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader
,而在OSR集合中的副本则没有任何机会==不过这个原则也可以通过修改相应的参数配置来改变==。ISR与HW和LEO也有紧密的关系。HW是
High Watermark
的缩写,俗称高水位,它标识了一个特定的消息偏移量offset
,消费者只能拉取到这个offset
之前的消息。LEO是
Log End Offset
的缩写,它标识当前日志文件中下一条待写入消息的offset
,分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。
三、kafka消息确认机制 ACK
producer
发送消息后,leader
将消息同步给follower
,然后返回ACK
给producer
,表示消息已收到,此时才可以继续发送下一条消息。
kafka
提供了以下3种ACK
级别:0
:leader
接收到消息马上返回ack
,此时可能还没有写入磁盘,可能丢失数据1
:leader
将消息写入磁盘后,马上返回ack
,此时可能还没同步follower
,同样可能丢失数据-1(all)
:leader
和follower
都将数据写入磁盘后,返回ack
。但是如果在写入磁盘后,ack
尚未发送,此时leader
发生故障,会导致数据写入重复
四、kafka的partition分区方式
consumer
采用pull
方式主动从broker
拉取数据,此时会传入timeout
参数,如果当前没有数据可消费,consumer
会等待一段时间,直到timeout
超期才返回1
个topic
有多个partition
,1
个consumer-group
有多个consumer
,这其中就涉及到partition
的分配问题。
kafka
提供2
种分配方式:Range
和RoundRobin
range
- 原理是将
partition数
/consumer数
,来决定每个consumer
分配几个partition
。如果除不尽,则前面几个consumer
会多1
个partition
。
- 原理是将
RoundRobin
- 轮询每个
consumer
,逐一分配
- 轮询每个
五、kafka集群部署
在部署
kafka
之前,需要提前部署zookeeper
!点我跳转到zookeeper
部署教程。
1.部署jdk环境
过程省略,请查看
zookeeper
部署教程:https://www.wsjj.top/archives/107
2.安装kafka
A.下载kafka
如果提示没有
wget
命令,请安装yum install -y wget
[root@zk1 ~]# wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz
B.安装kafka
[root@zk1 ~]# tar xf kafka_2.13-3.3.1.tgz -C /usr/local/
[root@zk1 ~]# ls /usr/local/
bin games jdk1.8.0_191 lib libexec share zookeeper
etc include kafka_2.13-3.3.1 lib64 sbin src zookeeper37
[root@zk1 ~]# mv /usr/local/kafka_2.13-3.3.1 /usr/local/kafka33
C.配置环境变量
[root@zk1 ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export KAFKA_HOME=/usr/local/kafka33
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin #自行修改上一期教程添加的字段
[root@zk1 ~]# source /etc/profile
D.创建kafka
日志目录
[root@zk1 ~]# mkdir /usr/local/kafka33/kafka-logs
E.修改kafka
配置文件
[root@zk1 ~]# vim /usr/local/kafka33/config/server.properties
#配置文件并不完整,以下仅展示修改的地方。
broker.id=0 #默认保持0即可
listeners=PLAINTEXT://192.168.140.10:9092 #去掉注释,填写本机IP
log.dirs=/usr/local/kafka33/kafka-logs #修改日志存放路径
zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181 #指定zookeeper地址,我们有三台。
F.把kafka
目录拷贝给另外2台机器
[root@zk1 ~]# for i in 11 12; do scp -r /usr/local/kafka33/ root@192.168.140.$i:/usr/local/; done
G.拷贝环境文件给另外2台机器
[root@zk1 ~]# for i in 11 12
> do
> scp -r /etc/profile root@192.168.140.$i:/etc/
> done
H.修改另外2台服务器上的配置文件
[root@zk2 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=1 #修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.11:9092 #修改监听IP为本机
[root@zk3 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=2 #修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.12:9092 #修改监听IP为本机
重新加载环境配置文件
[root@zk2 ~]# source /etc/profile
[root@zk3 ~]# source /etc/profile
I.启动服务
[root@zk1 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk2 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk3 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
J.查看是否成功运行
[root@zk1 ~]# jps
3170 Jps
2182 QuorumPeerMain
3035 Kafka
[root@zk2 ~]# jps
2770 Kafka
2853 Jps
1966 QuorumPeerMain
[root@zk3 ~]# jps
2632 Kafka
2715 Jps
1838 QuorumPeerMain
3.在zookeeper
中查看kafka
的注册信息
[root@zk1 ~]# /usr/local/zookeeper37/bin/zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
#可以看到多了很多目录
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.10","version":5,"timestamp":"1683542941484"}
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.11:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.11","version":5,"timestamp":"1683543144766"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.12:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.12","version":5,"timestamp":"1683543145833"}
4.测试topic
操作
[root@zk1 ~]# kafka-topics.sh --create --topic test1 --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
Created topic test1.
[root@zk1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092
test1 #查看当前系统所有topic
[root@zk1 ~]# kafka-topics.sh --describe --topic test1 --bootstrap-server 192.168.140.10:9092
Topic: test1 TopicId: kJ1ilOuQTMOJO8RQasCRrA PartitionCount: 1 ReplicationFactor: 1 Configs:
Topic: test1 Partition: 0 Leader: 1 Replicas: 1 Isr: 1
#查看详细信息
[root@zk1 ~]# kafka-topics.sh --delete --topic test1 --bootstrap-server 192.168.140.10:9092
#删除topic
5.测试生产者、消费者
如果您这里报错,请检查刚刚是否已经删除了
topic
[root@zk1 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test1 #生产者
>1
>2
>3
>4
>5
>6
>
[root@zk1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test1 --from-beginning #消费者
1
2
3
4
5
6