生产消费模型
消息队列的使用者主要有两个角色,一个是生产者,一个是消费者。
顾名思义,生产者是生产数据的,而消费者是使用数据的。
创建主题
一个kafka队列,可能有很多的生产者和消费者,为了能让不同的使用者能够区分出自己所需要的数据,因此有一个主题的概念。
生产者在生产数据时把数据推送到指定的主题中,而消费者就可以从该主题中拉取数据。
创建主题只需使用以下命令即可:
# 在kafka_2.13-3.2.0/bin目录下执行
./kafka-topics.sh --bootstrap-server=10.211.55.3:9092 --create --topic mytopic
--bootstrap-server
参数指定了kafka服务地址--create
表示要创建--topic
要创建的主题名
查看主题列表:
./kafka-topics.sh --bootstrap-server=10.211.55.3:9092 --list
查看主题详情:
./kafka-topics.sh --bootstrap-server=10.211.55.3:9092 --describe --topic mytopic
生产者
./kafka-console-producer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic
执行以上命令后,就可以输入数据:
消费者
消费数据
./kafka-console-consumer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic --from-beginning
hello world
hello kafka
其中--from-beginning
表示从头开始消费
偏移量
生产者生产的消息是有序的,我们可以把数据想象成是数组上的每一个元素,例如:
所谓偏移量,可以理解为这个数组的下标。
当消费者用--from-beginning
参数时,表示从头开始消费,也就是从数组下标为0开始消费。
如果没有指定这个参数,只会消费最新的消息,也就是说,只有有新数据产生才能消费到
看到这里,你很容易想到,我们也可以指定偏移量来进行消费:
# --offset指定偏移量 --partition指定分区
./kafka-console-consumer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic --offset 1 --partition 0
值得一提的是,指定偏移量时,必须指定分区。
不了解分区没关系,后面会专门介绍分区的概念,默认分区数量是1个,这里我们指定0分区进行消费
消费组
消费数据时可以通过--group
参数指定消费组,下面说两个场景:
场景1:消费者A和消费者B在消费数据时都指定了同一个消费组--group group1
:
./kafka-console-consumer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic --group group1
那么生产者在生产数据后,会被A或者B消费到,并且只会被A或B其中一个消费到,这种情况也叫做单播,如下图:
场景2:消费者A和消费者B在消费数据时指定了不同的消费组:
# 消费者A,group1
./kafka-console-consumer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic --group group1
# 消费者B,group2
./kafka-console-consumer.sh --bootstrap-server=10.211.55.3:9092 --topic mytopic --group group2
生产者生产数据后,消费者A和消费者B都会收到数据,这种情况叫做多播,如下图:
可以使用以下命令查看消费者组列表:
./kafka-consumer-groups.sh --bootstrap-server=10.211.55.3:9092 --list
查看消费者组详情:
./kafka-consumer-groups.sh --bootstrap-server=10.211.55.3:9092 --describe --group group1
其中有几个字段比较重要:
- current-offset: 已消费偏移量
- Log-end-offset: 最大偏移量
- lag:未消费数
如果我现在关掉所有消费者,然后生产数据,消费者组详情就会如下:
可以看到最大偏移量为29,已消费到偏移量27,lag=2表示还有2条消息未消费