【Linux存储系列教程】kafka消息队列(中间件)

一、关于kafka

1.kafka是什么?

kafka是一个多分区、多副本且基于zookeeper协调的分布式消息系统。也是一个分布式流式处理平台,它以高吞吐、可持久化、可水平扩展、支持流数据处理等多种特性而被广泛使用。

2.kafka的作用

  • 消息系统
    • kafka具备系统解耦、冗余存储、流量削峰、缓冲、异步通信、可扩展性、可恢复性等功能。
    • 提供消息顺序性保障、回溯消费功能
  • 存储系统
    • 支持将消息持久化到磁盘
  • 流处理平台
    • 提供了一个完整的流式处理类库,比如窗口、连接、变换和聚合等各类操作

3.kafka基本结构

kafka01

  • producer生产者
    • 发送消息的一方
    • 生产者负责创建消息,然后将其投递 到kafka
  • consumer消费者
    • 消费者,也就是接收消息的一方。
    • 消费者连接到Kafka上并接收消息,进而进行相应的业务逻辑处理
  • consumer Group(CG)消费者组
    • 消费者组,由多个consumer组成。
    • 消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费
    • 消费者组之间互不影响
  • brokerkafka服务器节点或实例
    • 一个kafka集群由多个broker组成
    • 一个broker可以容纳多个topic
  • topic主题
    • Kafka中的消息以主题为单位进行归类,
    • 生产者负责将消息发送到特定的主题(发送到Kafka集群中的每一条消息都要指定一个主题),而消费者负责订阅主题并进行消费
  • partition分区
    • 主题是一个逻辑上的概念,它还可以细分为多个分区,一个分区只属于单个主题,很多时候也会把分区称为主题分区Topic-Partition。同一主题下的不同分区包含的消息是不同的,分区在存储层面可以看作一个可追加的日志Log文件,消息在被追加到分区日志文件的时候都会分配一个特定的偏移量offsetoffset是消息在分区中的唯一标识,Kafka通过它来保证消息在分区内的顺序性,不过offset并不跨越分区,也就是说,Kafka保证的是分区有序而不是主题有序。
  • replica副本
    • Kafka为分区引入了多副本Replica机制,通过增加副本数量可以提升容灾能力。同一分区的不同副本中保存的是相同的消息(在同一时刻,副本之间并非完全一样),副本之间是==一主多从==的关系,其中leader副本负责处理读写请求,follower副本只负责与leader副本的消息同步。副本处于不同的broker中,当leader副本出现故障时,从follower副本中重新选举新的leader副本对外提供服务。Kafka通过多副本机制实现了故障的自动转移,当Kafka集群中某个broker失效时仍然能保证服务可用。

二、kafka的复制机制

Kafka使用ISR的方式进行数据复制,则有效地权衡了数据可靠性和性能之间的关系。

AR、ISR、OSR、HW、LEO

分区中的所有副本统称为ARAssigned Replicas。所有与leader副本保持一定程度同步的副本(包括leader副本在内)组成ISRIn-Sync Replicas,ISR集合是AR集合中的一个子集。消息会先发送到leader副本,然后follower副本才能从leader副本中拉取消息进行同步,同步期间内follower副本相对于leader副本而言会有一定程度的滞后。

leader副本同步滞后过多的副本==不包括leader副本==组成OSROut-of-Sync Replicas,由此可见,AR=ISR+OSR。在正常情况下,所有的follower副本都应该与leader副本保持一定程度的同步,即AR=ISROSR集合为==空==。

leader副本负责维护和跟踪ISR集合中所有follower副本的滞后状态,当follower副本落后太多或失效时,leader副本会把它从ISR集合中剔除。如果OSR集合中有follower副本==追上==了leader副本,那么leader副本会把它从OSR集合转移至ISR集合。默认情况下,当leader副本发生故障时,只有在ISR集合中的副本才有资格被选举为新的leader,而在OSR集合中的副本则没有任何机会==不过这个原则也可以通过修改相应的参数配置来改变==。

ISR与HW和LEO也有紧密的关系。HW是High Watermark的缩写,俗称高水位,它标识了一个特定的消息偏移量offset,消费者只能拉取到这个offset之前的消息。

LEO是Log End Offset的缩写,它标识当前日志文件中下一条待写入消息的offset,分区ISR集合中的每个副本都会维护自身的LEO,而ISR集合中最小的LEO即为分区的HW,对消费者而言只能消费HW之前的消息。

三、kafka消息确认机制 ACK

producer发送消息后,leader将消息同步给follower,然后返回ACKproducer,表示消息已收到,此时才可以继续发送下一条消息。

  • kafka提供了以下3种ACK级别:
    • 0leader接收到消息马上返回ack,此时可能还没有写入磁盘,可能丢失数据
    • 1leader将消息写入磁盘后,马上返回ack,此时可能还没同步follower,同样可能丢失数据
    • -1(all)leaderfollower都将数据写入磁盘后,返回ack。但是如果在写入磁盘后,ack尚未发送,此时leader发生故障,会导致数据写入重复

四、kafka的partition分区方式

consumer采用pull方式主动从broker拉取数据,此时会传入timeout参数,如果当前没有数据可消费,consumer会等待一段时间,直到timeout超期才返回
1topic有多个partition1consumer-group有多个consumer,这其中就涉及到partition的分配问题。

  • kafka提供2种分配方式:RangeRoundRobin
    • range
      • 原理是将partition数/consumer数,来决定每个consumer分配几个partition。如果除不尽,则前面几个consumer会多1partition
    • RoundRobin
      • 轮询每个consumer,逐一分配

五、kafka集群部署

在部署kafka之前,需要提前部署zookeeper点我跳转到zookeeper部署教程。

1.部署jdk环境

过程省略,请查看zookeeper部署教程:https://www.wsjj.top/archives/107

2.安装kafka

A.下载kafka

如果提示没有wget命令,请安装yum install -y wget

[root@zk1 ~]# wget https://archive.apache.org/dist/kafka/3.3.1/kafka_2.13-3.3.1.tgz

B.安装kafka

[root@zk1 ~]# tar xf kafka_2.13-3.3.1.tgz -C /usr/local/
[root@zk1 ~]# ls /usr/local/
bin  games    jdk1.8.0_191      lib    libexec  share  zookeeper
etc  include  kafka_2.13-3.3.1  lib64  sbin     src    zookeeper37
[root@zk1 ~]# mv /usr/local/kafka_2.13-3.3.1 /usr/local/kafka33

C.配置环境变量

[root@zk1 ~]# vim /etc/profile
export JAVA_HOME=/usr/local/jdk1.8.0_191
export KAFKA_HOME=/usr/local/kafka33
export PATH=$PATH:$JAVA_HOME/bin:$KAFKA_HOME/bin	#自行修改上一期教程添加的字段

[root@zk1 ~]# source /etc/profile

D.创建kafka日志目录

[root@zk1 ~]# mkdir /usr/local/kafka33/kafka-logs

E.修改kafka配置文件

[root@zk1 ~]# vim /usr/local/kafka33/config/server.properties
#配置文件并不完整,以下仅展示修改的地方。
broker.id=0		#默认保持0即可
listeners=PLAINTEXT://192.168.140.10:9092	#去掉注释,填写本机IP
log.dirs=/usr/local/kafka33/kafka-logs	#修改日志存放路径
zookeeper.connect=192.168.140.10:2181,192.168.140.11:2181,192.168.140.12:2181	#指定zookeeper地址,我们有三台。

F.把kafka目录拷贝给另外2台机器

[root@zk1 ~]# for i in 11 12; do scp -r /usr/local/kafka33/ root@192.168.140.$i:/usr/local/; done

G.拷贝环境文件给另外2台机器

[root@zk1 ~]# for i in 11 12
> do
> scp -r /etc/profile root@192.168.140.$i:/etc/
> done

H.修改另外2台服务器上的配置文件

[root@zk2 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=1	#修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.11:9092	#修改监听IP为本机
[root@zk3 ~]# vim /usr/local/kafka33/config/server.properties
broker.id=2	#修改ID,不和之前的重复即可
listeners=PLAINTEXT://192.168.140.12:9092	#修改监听IP为本机
重新加载环境配置文件
[root@zk2 ~]# source /etc/profile
[root@zk3 ~]# source /etc/profile

I.启动服务

[root@zk1 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk2 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties
[root@zk3 ~]# kafka-server-start.sh -daemon /usr/local/kafka33/config/server.properties

J.查看是否成功运行

[root@zk1 ~]# jps
3170 Jps
2182 QuorumPeerMain
3035 Kafka
[root@zk2 ~]# jps
2770 Kafka
2853 Jps
1966 QuorumPeerMain
[root@zk3 ~]# jps
2632 Kafka
2715 Jps
1838 QuorumPeerMain

3.在zookeeper中查看kafka的注册信息

[root@zk1 ~]# /usr/local/zookeeper37/bin/zkCli.sh
Connecting to localhost:2181
[zk: localhost:2181(CONNECTED) 1] ls /
[admin, brokers, cluster, config, consumers, controller, controller_epoch, feature, isr_change_notification, latest_producer_id_block, log_dir_event_notification, zookeeper]
#可以看到多了很多目录
[zk: localhost:2181(CONNECTED) 2] get /brokers/ids/0
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.10:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.10","version":5,"timestamp":"1683542941484"}
[zk: localhost:2181(CONNECTED) 3] get /brokers/ids/1
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.11:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.11","version":5,"timestamp":"1683543144766"}
[zk: localhost:2181(CONNECTED) 4] get /brokers/ids/2
{"features":{},"listener_security_protocol_map":{"PLAINTEXT":"PLAINTEXT"},"endpoints":["PLAINTEXT://192.168.140.12:9092"],"jmx_port":-1,"port":9092,"host":"192.168.140.12","version":5,"timestamp":"1683543145833"}

4.测试topic操作

[root@zk1 ~]# kafka-topics.sh --create --topic test1 --replication-factor 1 --partitions 1 --bootstrap-server 192.168.140.10:9092
Created topic test1.
[root@zk1 ~]# kafka-topics.sh --list --bootstrap-server 192.168.140.10:9092 
test1	#查看当前系统所有topic
[root@zk1 ~]# kafka-topics.sh --describe --topic test1 --bootstrap-server 192.168.140.10:9092
Topic: test1	TopicId: kJ1ilOuQTMOJO8RQasCRrA	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: test1	Partition: 0	Leader: 1	Replicas: 1	Isr: 1
#查看详细信息
[root@zk1 ~]# kafka-topics.sh --delete --topic test1 --bootstrap-server 192.168.140.10:9092
#删除topic

5.测试生产者、消费者

如果您这里报错,请检查刚刚是否已经删除了topic

[root@zk1 ~]# kafka-console-producer.sh --broker-list 192.168.140.10:9092 --topic test1	#生产者
>1
>2
>3
>4
>5
>6
>
[root@zk1 ~]# kafka-console-consumer.sh --bootstrap-server 192.168.140.10:9092 --topic test1 --from-beginning	#消费者
1
2
3
4
5
6