消息队列
消息队列是一种应用程序间通信技术,有助于构建通用集成机制,以支持云原生、基于微服务、无服务器和混合云架构。
kafka
Kafka和Mysql相比,都是需要
- 先下载安装对应的包
- 接着可以直接命令行调用
- 也可以通过 python、java 这样的编程语言来调用
- 都存在地址接口的概念,localhost:9092 指的是本机地址的 9092 端口
- 如果你想调用 Docker 里的端口,要提前通过--expose 暴露出这个端口
- 如果你想调用服务器的 kafka 或者 mysql,得填服务器的 ip+端口
Kafka 的应用场景
-
异步处理
- 可以将一些比较耗时的操作放在其他系统中,通过消息队列将需要进行处理的消息进行存储,其他系统可以消费消息队列中的数据
- 比较常见的:发送短信验证码、发送邮件
-
系统解耦
- 原先一个微服务是通过接口(HTTP)调用另一个微服务,这时候耦合很严重,只要接口发生变化就会导致系统不可用
- 使用消息队列可以将系统进行解耦合,现在第一个微服务可以将消息放入到消息队列中,另一个微服务可以从消息队列中把消息取出来进行处理。进行系统解耦
-
流量削峰
- 因为消息队列是低延迟、高可靠、高吞吐的,可以应对大量并发
-
日志处理
- 可以使用消息队列作为临时存储,或者一种通信管道
Kafka 消息队列的两种模型
- 生产者、消费者模型
- 生产者负责将消息生产到 MQ 中
- 消费者负责从 MQ 中获取消息
- 生产者和消费者是解耦的,可能是生产者一个程序、消费者是另外一个程序
- 消息队列的模式
- 点对点:一个消费者消费一个消息
- 发布订阅:多个消费者可以消费一个消息
Kafka 中的重要概念
- broker
- Kafka 服务器进程,生产者、消费者都要连接 broker
- 一个集群由多个 broker 组成,功能实现 Kafka 集群的负载均衡、容错
- producer:生产者,用于生产数据,可将生产后的消息送入指定的 Topic;
- consumer:消费者,获取数据,可消费指定的 Topic;
- topic:一组消息数据的标记符,一个 Kafka 集群中,可以包含多个 topic。一个 topic 可以包含多个分区
- 是一个逻辑结构,生产、消费消息都需要指定 topic
- partition:分区,Kafka 集群的分布式就是由分区来实现的。一个 topic 中的消息可以分布在 topic 中的不同 partition 中,为了保证 kafka 的吞吐量,一个 Topic 可以设置多个分区。同一分区只能被一个消费者订阅。
- replica:副本,实现 Kafkaf 集群的容错,实现 partition 的容错。一个 topic 至少应该包含大于 1 个的副本
- consumer group:消费者组,一个消费者组中的消费者可以共同消费 topic 中的分区数据。每一个消费者组都一个唯一的名字。配置 group.id 一样的消费者是属于同一个组中,同一个 group 可以有多个消费者,一条消息在一个 group 中,只会被一个消费者获取;
- offset:偏移量。相对消费者、partition 来说,可以通过 offset 来拉取数据
消费者组
- 一个消费者组中可以包含多个消费者,共同来消费 topic 中的数据
- 一个 topic 中如果只有一个分区,那么这个分区只能被 某个组中的一个消费者消费
- 有多少个分区,那么就可以被同一个组内的多少个消费者消费
Kafka 集群搭建
- Kafka 集群是必须要有 ZooKeeper 的
注意:
- 每一个 Kafka 的节点都需要修改broker. id(每个节点的标识,不能重复)
- log.dir 数据存储目录需要配置
Kafka 的生产者/消费者/工具
- 安装 Kafka 集群,可以测试以下
- 创建一个 topic 主题(消息都是存放在 topic 中,类似 mysql 建表的过程)
- 基于 kafka 的内置测试生产者脚本来读取标准输入(键盘输入)的数据,并放入到 topic 中
- 基于 kafka 的内置测试消费者脚本来消费 topic 中的数据
- 推荐大家开发的使用 Kafka Tool
- 浏览 Kafka 集群节点、多少个 topic、多少个分区
- 创建 topic/删除 topic
- 浏览 ZooKeeper 中的数据
本地安装与启动(基于 Docker)
1.下载 zookeeper 镜像与 kafka 镜像
docker pull wurstmeister/zookeeper
docker pull wurstmeister/kafka
2.本地启动 zookeeper
docker run -d --name zookeeper -p 2181:2181 -t wurstmeister/zookeeper
3. 本地启动 kafka
docker run -d --name kafka --publish 9092:9092 --link zookeeper --env KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 --env KAFKA_ADVERTISED_HOST_NAME=localhost --env KAFKA_ADVERTISED_PORT=9092 wurstmeister/kafka:latest
注意:上述代码,将 kafka 启动在 9092 端口
4. 进入 kafka bash
docker exec -it kafka bash
cd /opt/kafka/bin
5. 创建 Topic,分区为 2,Topic name 为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic kafka_demo
6. 查看当前所有 topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
7. 本地安装 kafka-python
pip install kafka-python
Docker 中 kafka 的 topic 增删改查命令汇总
- 增
创建Topic:
分区为2,Topic name为'kafka_demo'
kafka-topics.sh --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 2 --topic kafka_demo
增加分区:
通过--alter修改主题的分区数,增加分区。通过命令行工具操作,主题的分区只能增加,不能减少。否则报错。
kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic myTop1 -- partitions 2
- 删
删除:
名为kafka_demo的主题
kafka-topics.sh --delete --zookeeper zookeeper:2181 --topic kafka_demo
- 改
kafka-topics.sh --zookeeper zookeeper:2181 --create --topic topic_test_01 --partitions 2 --replication-factor 1
kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic topic_test_01 --config max.message.bytes=1048576
kafka-topics.sh --zookeeper zookeeper:2181 --describe --topic topic_test_01
kafka-topics.sh --zookeeper zookeeper:2181 --alter --topic topic_test_01 --config segment.bytes=10485760
kafka-topics.sh --zookeeper zookeeper:2181 --alter --delete-config max.message.bytes --topic topic_test_01
- 查
查看
当前所有topic
kafka-topics.sh --zookeeper zookeeper:2181 --list
查看
当前所有topic详情
kafka-topics.sh --zookeeper zookeeper:2181 --describe
Kafka 幂等性
-
生产者消息重复问题
- Kafka 生产者生产消息到 partition,如果直接发送消息,kafka 会将消息保存到分区中,但 Kafka 会返回一个 ack 给生产者,表示当前操作是否成功,是否已经保存了这条消息。如果 ack 响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka 又会保存一条一模一样的消息
-
在 Kafka 中可以开启幂等性
- 当 Kafka 的生产者生产消息时,会增加一个 pid(生产者的唯一编号)和 sequence number(针对消息的一个递增序列)
- 发送消息,会连着 pid 和 sequence number 一块发送
- kafka 接收到消息,会将消息和 pid、sequence number 一并保存下来
- 如果 ack 响应失败,生产者重试,再次发送消息时,Kafka 会根据 pid、sequence number 是否需要再保存一条消息
- 判断条件:生产者发送过来的 sequence number 是否小于等于 partition 中消息对应的 sequence