目录

kafka集群安装

# 搭建zookeeper集群

# 主机规划:

192.168.100.7 node1

192.168.100.8 node2

192.168.100.9 node3

# 软件下载地址:

cd /home/app
wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.6.2/apache-zookeeper-3.6.2-bin.tar.gz
1
2

# 三台主机hosts文件一致:

# cat /etc/hosts
192.168.100.7 node1
192.168.100.8  node2
192.168.100.9  node3
1
2
3
4

# 在node1节点上

[root@node1 app]# tar zookeeper-3.6.2.tar.gz -C /home/app/
[root@node1 app]# mv zookeeper-3.6.2 zookeeper
[root@node1 app]# cd zookeeper/conf/
[root@node1 conf]# cp zoo_sample.cfg zoo.cfg
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/tmp/zookeeper
clientPort=2181
server.1=node1:2888:3888
server.2=node2:2888:3888
server.3=node3:2888:3888
1
2
3
4
5
6
7
8
9
10
11
12

# 创建dataDir目录创建/tmp/zookeeper

[root@node1 conf]# mkdir /tmp/zookeeper
[root@node1 conf]# touch /tmp/zookeeper/myid
[root@node1 conf]# echo 1 > /tmp/zookeeper/myid
1
2
3

# 将zookeeper文件复制到另外两个节点:

[root@node1 conf]# scp -r zookeeper-3.6.2/ node2:/home/app/
[root@node1 conf]# scp -r zookeeper-3.6.2/ node3:/home/app/
1
2

# node2创建dataDir目录创建/tmp/zookeeper

[root@node2 conf]# mkdir /tmp/zookeeper
[root@node2 conf]# touch /tmp/zookeeper/myid
[root@node2 conf]# echo 2 > /tmp/zookeeper/myid
1
2
3

# node3创建dataDir目录创建/tmp/zookeeper

[root@node3 conf]# mkdir /tmp/zookeeper
[root@node3 conf]# touch /tmp/zookeeper/myid
[root@node3 conf]# echo 3 > /tmp/zookeeper/myid
1
2
3

# 分别在每个节点上启动 zookeeper测试:

[root@node1 zookeeper]# ./bin/zkServer.sh start
[root@node2 zookeeper]# ./bin/zkServer.sh start
[root@node3 zookeeper]# ./bin/zkServer.sh start
1
2
3

# 查看状态

[root@node1 zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
[root@node2 zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: leader
[root@node3 zookeeper]# ./bin/zkServer.sh status
ZooKeeper JMX enabled by default
Using config: /usr/local/zookeeper/bin/../conf/zoo.cfg
Mode: follower
1
2
3
4
5
6
7
8
9
10
11
12

# 搭建Kafka集群

# 将Kafka安装包上传到虚拟机,并解压

[root@node1 ~]# cd /home/app/
[root@node1 app]# wget http://mirrors.hust.edu.cn/apache/kafka/2.8.0/kafka_2.13-2.8.0.tgz
[root@node1 app]# tar -xvzf kafka_2.13-2.8.0.tgz
[root@node1 app]# cd /home/app/kafka_2.13-2.8.0/
1
2
3
4

# 修改server.properties

[root@node1 app]# cd config
[root@node1 app]# vim server.properties
#指定broker的id
broker.id=0
listeners=PLAINTEXT://node1:9092
advertised.listeners=PLAINTEXT://node1:9092
#指定kafka数据的位置
log.dirs=/home/app/kafka_2.13-2.8.0/data
zookeeper.connect=node1:2181,node2:2181,node3:2181
1
2
3
4
5
6
7
8
9

# 将安装好的Kafka的复制到另外两台服务器

[root@node1 app]# cd /home/app
[root@node1 app]# scp -r kafka_2.13-2.8.0/ node2:$PWD
[root@node1 app]# scp -r kafka_2.13-2.8.0/ node3:$PWD
修改另外两个节点的broker.id分别为1和2
-----node2-----
[root@node2 ~]# cd /home/app/kafka_2.13-2.8.0/config
[root@node2 app]# vim server.properties
#指定broker的id
broker.id=1
listeners=PLAINTEXT://node2:9092
advertised.listeners=PLAINTEXT://node2:9092
-----node3-----
[root@node3 ~]# cd /home/app/kafka_2.13-2.8.0/config
[root@node3 config]# vim server.properties
#指定broker的id
broker.id=2
listeners=PLAINTEXT://node3:9092
advertised.listeners=PLAINTEXT://node3:9092
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18

# 配置KAFKA_HOME环境变量

vim /etc/profile
export KAFKA_HOME=/export/server/kafka_2.13-2.8.0
export PATH=:$PATH:${KAFKA_HOME}
分发到各个节点
scp /etc/profile node2.ip.cn:$PWD
scp /etc/profile node3.ip.cn:$PWD
每个节点加载环境变量
source /etc/profile
1
2
3
4
5
6
7
8

# 启动服务器

#启动ZooKeeper
nohup bin/zookeeper-server-start.sh config/zookeeper.properties &
#启动kafka
cd /export/server/kafka_2.12-2.4.1
nohup bin/kafka-server-start.sh config/server.properties &
#测试kafka集群是否启动成功
bin/kafka-topics.sh --bootstrap-server node1.ip.cn:9092 --list
1
2
3
4
5
6
7

# Kafka一键启动/关闭

# 在几点中创建/export/onekey 目录

cd /home/app/onekey
1

# 准备slave配置文件,用于保存要启动哪几个节点上的kafka

node1
node2
node3
1
2
3

# 编写start-kafka.sh脚本

vim start-kafka.sh
cat ./slave | while read line
do
{
echo $line
ssh $line "source /etc/profile; export JMX_PORT=9988;
nohup ${KAFKA_HOME}/bin/kafka-server-start.sh ${KAFKA_HOME}/config/server.properties > /dev/nul* 2>&1"&
}&
wait
done
1
2
3
4
5
6
7
8
9
10

# 编写stop-kafka.sh脚本

vim stop-kafka.sh
cat ./salve | while read line
do
{
echo $line
ssh $line "source /etc/profile; jps |grep Kafka |cut -d ' ' -f1 |xargs kill -9"&
}&
wait
done
1
2
3
4
5
6
7
8
9

# 基础操作

# 创建topic(主题)

#创建名为test的主题
bin/kafka-topics.sh --create --bootstrap-server node1.ip.cn:9092 --topic test
#查看目前kafka中的主题
bin/kafka-topics.sh --list --bootstrap-server node1.ip.cn:9092
1
2
3
4

# 生产消息到kafka

使用kafka内置的测试程序,生产一些消息到kafka的test主题中

bin/kafka-console-producer.sh --broker-list node1.ip.cn:9092 --topic test
1

# 从kafka消费消息

使用下面的命令来消费test主题中的消息

bin/kafka-console-consumer.sh --bootstrap-server node1.ip.cn:9092 --topic test --from-beginning
1

# 使用Kafka Tools操作Kafka

# 连接Kafka集群

安装kafka Tools后启动kafka

# Kafka基准测试

# 基于一个分区一个副本的基准测试

# 创建topic

bin/kafka-topics.sh --zookeeper node1:2181 --create --topic benchmark --partitions 1 --replication-factor 1
1

# 生成消息基本测试

在生产环境中,推荐使用生产5000W消息,这样性能数据会更准确些。为了方便测试,课程上演示500W的消息作为基准测试。

bin/kafka-producer-perf-test.sh --topic benchmark --num-records 5000000 --throughput -1 --record-size 1000 --producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1
1

bin/kafka-producer-perf-test.sh

--topoc topic的名字

--num-records 总共指定生产数据量(默认5000W)

--throughput 指定吞吐量—限流(-1不指定)

--record-size record数据大小(字节)

--producer-props bootstrap.servers=node1:9092,node2:9092,node3:9092 acks=1 指定kafka集群地址,ACK模式;

测试结果

吞吐量 93092.533979 records/sec
每秒9.3W条记录
吞吐速率 (88.78/sec)
每秒约89MB数据
平均延迟时间 346.62 ms avg latency
最大延迟时间 1003.00 ms max latency

# 消费消息基准测试

bin/kafka-consumer-perf-test.sh --broker-list node1:9092,node2:9092,node3:9092 --topic benchmark --fetch-size 1048576 --messages 5000000
1

bin/kafka-consumer-perf-test.sh

--broker-list 指定kafka集群地址

--topic 指定topic的名称

--fetch--size 每次拉取的数据大小

--messages 总共要消费的消息个数

测试结果

data.consumed.in.MB
共计消费的数据
4768.3434MB
MB.sec
每秒消费的数量
445.6006
每秒445MB
data.consumed.in.nMsg
共计消费的数量
5000000
nMsg.sec
每秒的数量
467234.0518
每秒46.7W

# Kafka中得重要概念

  • broker

    • kafka服务器,生产者、消费者都要连接broker
    • 一个集群由多个broker组成,功能实现kafka集群得负载均衡、容错
  • producer:生产者

  • consumer:消费者

  • topic:主题,一个Kafka集群中,可以包含多个topic。一个topic可以包含多个分区

    • 是一个逻辑结构,生产、消费消息都需要指定topic
  • partiton:kafka集群得分布式就是由分区来实现的。一个topic中得消息可以分布在topic中得不同partition中

  • replica:副本,实现kafka集群的容错,实现partition的容错。一个topic至少应该包含大于一个的副本

  • consumer group:消费者组,一个消费者组中的消费者可以共同消费topic中的分区数据。每一个消费者组都是一个唯一的名字。配置group.id一样的消费者是属于同一个组中

  • offset:偏移量。相对消费者、partition来说。可以通过offset来拉取数据

# 消费者组

一个消费者组中可以包含多个消费者,共同来消费topic中的数据

一个topic中如果只有一个分区,那么这个分区智能被某个组中的一个消费者消费

有多少分区,那么就可以被同一个组内的多少个消费者消费

设置test topic为2个分区

bin/kafka-topic.sh --zookeeper node1:2181 -alter --partitons 2 --topic test
1

# 幂等性

  • 生产者消息重复的问题

    • Kafka生产消息到partition,如果直接发送消息,Kafka会将消息保存到分区中,但Kafka会返回一个ack给生产者,标识当前操作是否成功,是否已经保存了这条消息。如果ack响应的过程失败了,此时生产者会重试,继续发送没有发送成功的消息,Kafka又会保存一条一摸一样的消息
  • 在Kafka中可以开启幂等性

    • 当kafka的生产消息时,会增加一个pid(生产者的唯一编号)和sequence number(针对消息的一个递增序列)
    • 发送消息,会连着pid和sequence number一起放松
    • Kafka接收到消息,会将消息和pid、sequence number一并保存下来
    • 如果ack响应失败,生产者重试,再次发送消息时,Kafka会根据pid、sequence number是否需要再保存一条消息
    • 判断条件:生产者发送过来的sequence number是否小于等于parition中消息对应的sequence

# kafka中的副本机制

# 生产者的分区写入策略

  • 轮询(按照消息尽量保证每一个分区的负载)策略,消息会均地分布到每个partition

    写入消息地时候,key为null的时候,默认使用的是轮询策略

  • 随机策略(不使用)

  • 按key写入策略,key.hash()%

  • 自定义分区策略

乱序问题

  • 在Kafka中生产者是有写入策略,如果topic有多个分区,就会将数据分分散在不同的partition中存储
  • 当partition数量大于1的时候,数据(消息)会打散分布在不同的partion中
  • 如果只有一个分区,消息是有序的

# 消费组Consumer Group Rebaleance机制

  • 再均衡:在某些情况下,消费者组中的消费的分区会产生变化,会导致消费者分配不均匀(例如:有两个消费者消费3个,因为某个partition崩溃了,还有一个消费者当前没有分区要削峰),当Kafka Consumer Group就会启动rebalance机制,重新平衡这个 Consumer Group 内的消费者消费的分区分配。

  • 触发时机

    • 消费者数量发生变化

      • 某个消费者crash
      • 新增消费者
    • topic的数量发生变化

      • 某个topic被删除
    • partition的数量发生变化

      • 删除partition

      • 新增partition

  • 不良影响

    • 发生rebalance,所有的consumer将不再工作,共同来参与再均衡,直到每个消费者都已经被成功分配所需要消费的分区为止(rebalance结束)

# 消费者的分配策略

分区分配策略:保障每个消费者尽量能够均衡地消费分区数据据,不能出现某个消费者分区的数量特别多,某个消费者消费的分区特别少

  • Rangele分配策略(范围分配策略)
    • n:分区的数量/消费者数量
    • m:分区的数量%消费者数量
  • 前m个消费者消费n+1个分区
  • 剩余的消费者消费nge
  • RoundRobin分配策略(轮询分配策略)
  • 消费者挨个分配消费的分区
  • Striky粘性分配策略
    • 在没有发生rebalance跟轮询分配策略是一致的
    • 发生了rebalance,轮询分配测率。重新走一遍轮询分配的过程。而粘性会保证跟上一次的尽量一致,只是将新的需要分配的分区,均匀的分配到现有可用的消费者中即可
    • 减少上下文的切换

# 副本的ACK机制

producer是不断地往Kafka中写入数据,写入数据会有一个返回结果,表示是否写入成功。这里对应有一个ACKs的配置。

  • acks = 0:生产者只管写入、不管是否写入成功,可能会数据丢失。性能是最好的
  • acks=1:生产者会等到leader分区写入成功后,返回成功,接着发送吓一跳
  • acks=-1/all:确保消息写入到leader分区、还确保消息写入到对应副本都成功后,接着发送下一条性能是最差的

根据业务情况来选择ack机制,是要求性能最高,一部分数据丢失影响不大,可以选择0/1。如果要求数据一定不能丢失,就得配置为-1/all。

分区中是有leader和follower的概念,为了确保消费者消费的数据是一致的,只能从分区leafer去读写消息follower做的事情就是同步数据,Backup

# 高级API(High-Level API)、低级API(Low-level API)

  • 高级APi就是直接让Kafka帮助管理、处理分配、数据
    • offset存储在ZK中
    • 由Kafka的rebalance来控制消费者分配的分区
    • 开发起来比较简单,无需开发者关注底层细节
    • 无法做到细粒度的控制
  • 低级API:由编写的程序来控制逻辑
    • 自己来管理offset,可以将offset存储在ZK、MySQL、Redis、HBase、Flink的转台存储
    • 指定消费者拉取某个分区的数据
    • 可以做到细粒度的控制
    • 原有的Kafka的策略会失效,需要我们自己来实现消费机制

# Kafka原理

# leader和follower

  • Kafka中的leader和follower 是相对分区有意义,不是相对broker
  • Kafka在创建topic的时候,会尽量分配分区的leader在不同的broker中,其实就是负载均衡
  • leader职责:读写数据
  • follower职责:同步数据、参与选举(leader crash 之后,会选举一个follower重新称为分区的leader)
  • 注意和ZooKeeper区分
  • ZK的leader负责读、写、follower可以读取
    • ZK的leader负责读、写、followe可以读取
    • Kafka的leader负责读写、follower不能读取数据(确保每个消费者消费的数据是一致的),Kafka一个topic有多个分区leader,一样可以实现数据操作的负载均衡

# AR\ISR\OR

  • AR表示一个topic下的所有副本
  • ISR:In Sync Replicas,正在同步的副本(可以理解为当前几个follower是存活的)
  • OSR:Out of Sync Replicas,不在同步的副本
  • AR = ISR+OSR

# leader选举

Controller:controller是Kafka集群的老大,是针对Broker的一个角色

  • controller是高可用的,是通过ZK来进行选举

Leader:是针对partition的一个角色

  • leader是通过ISR来进行快速选举

如果Kafka是基于ZK来进行选举,ZK的压力可能会比较大。例如:某个节点崩溃,这个节点上不仅仅有一个leader,是由不少的leader需要选举。通过ISR快速进行选举

leader的负载均衡

  • 如果某个broker crash之后,就可能回导致partition的leader分布不均匀,就是一个broker上存在一个topic下不通partition的leader
  • 通过以下指令,可以将leader分配到优先的leader对用的broker,确保leader是均匀分配的
bin/kafka-leader-election.sh --bootstrap-server node1:9092 --topic test --partition=2 --election-type preferred
1

# Kafka读写流程

写流程

  • 通过ZooKeeper找partition对应的leader,leader是负责读写

    • producer开始写入数据
    • ISR里面的follower开始同步数据,并返回给leader ACK
    • 返回给producer ACK
  • 读流程

    • 通过Zookeeper找partition对应的leader,leader是负责读的
    • 通过Zookeeper找到消费者对应的offset
    • 然后开始从offset往后顺序拉取数据
    • 提交offset(自动提交-每隔多少秒提交一次offset、手动提交--放入到事务中提交)

# Kafka的物理存储

  • Kafka的数据组织结构
    • topic
    • partition
    • segment
      • .log数据文件
      • .index(稀疏索引)
      • .timeindex(根据时间做的索引)
  • 深入了解数据的流程
    • 消费者的offset是一个针对partition全局offset
    • 可以根据这个offset找到segment段
    • 接着需要将全局的offset转换成segment的局部offset
    • 根据局部的offset,就可以从(.index稀疏索引)找到对应的数据位置
    • 开始顺序读取

# 消息传递语义性

Flink里面又对应的每种不同机制的保证,提供Exactly-once保障(二阶段事务提交方式)

  • At-most once:最多一次(只管把数据消费到,不管有没有成功,可能有数据丢失)
  • At-least once:最少一次(有可能会出现重复消费)
  • Exactly once:仅又一次(事务性的保障,保证消息有且呗处理一次)

# Kafka消息不丢失

  • broker消息不丢失:又副本relicas的存在,会不断的从leader中同步副本,所以,一个broker crash,不会导致数据丢失,除非只有一个副本。

  • 生产者消息不丢失:ACK机制(配置成ALL/-1)、配置成0,或者1可能存在丢失

  • 消费者消费不丢失,重点控制offset

    At-least once:一种数据可能会重复消费

    Exactly once:仅被一次消费

# 数据积压

  • 数据挤压是指消费者因为一些外部IO、一些比较耗时的操作(Full GC —— Stop the world),就会造成消息在partition中一直存在得不到消费,就会产生数据积压
  • 在企业中,我们要有监控系统,如果出现这种情况,需要尽快处理。虽然后续的Spark Streaming/Flink可以实现背压机制,但是数据库积累太多一定对实时系统的实时性是有影响的

# Kafka Eagle 安装

更新时间: 2024/01/17, 05:48:13
最近更新
01
JVM调优
12-10
02
jenkins
12-10
03
Arthas
12-10
更多文章>