本文共 3550 字,大约阅读时间需要 11 分钟。
1、介绍:
kafka是一个分布式的信息流式处理的工具。
Kafka的特性:
高吞吐量、低延迟每个topic可以分多个partition, consumer group 对partition进行consume操作。
可扩展性:kafka集群支持热扩展。
持久性、可靠性:消息被持久化到本地磁盘,并且支持数据备份防止数据丢失
容错性:允许集群中节点失败(若副本数量为n,则允许n-1个节点失败)。
高并发:支持数千个客户端同时读写。
Kafka流程:
Kafka中发布订阅的对象是topic。我们可以为每类数据创建一个topic,把向topic发布消息的客户端称作producer,从topic订阅消息的客户端称作consumer。Producers和consumers可以同时从多个topic读写数据。一个kafka集群由一个或多个broker服务器组成,它负责持久化和备份具体的kafka消息。
Producers往Brokers里面的指定Topic中写消息,Consumers从Brokers里面拉去指定Topic的消息,然后进行业务处理。
一个topic实际是由多个partition组成的,遇到瓶颈时,可以通过增加partition的数量来进行横向扩容。单个parition内是保证消息有序。
正常的topic相当于一个MQ的队列,发布者发送message必须指定topic,然后Kafka会根据接收到的message进行load balance,均匀的分布到topic的不同的partition上,一个消费者组要全部消费这个topic上的所有partition,所以一个消费者组如果多个消费者,那么这里面的消费者是不能消费到全部消息的。
订阅topic是以一个消费组来订阅的,一个消费组里面可以有多个消费者。同一个消费组中的两个消费者,不会同时消费一个partition。换句话来说,就是一个partition,只能被消费组里的一个消费者消费,但是可以同时被多个消费组消费。因此,如果消费组内的消费者如果比partition多的话,那么就会有个别消费者一直空闲。
zookeeper作用:
zookeeper是为了解决分布式一致性问题的工具。
kafka 很多说不需要安装zk的是因为他们都使用了kafka自带的zk,至于kafka为什么使用zk,你首先要知道zk的作用, 作为去中心化的集群模式。需要要消费者知道现在那些生产者(对于消费者而言,kafka就是生产者)是可用的。如果没了zk消费者如何知道呢?如果每次消费者在消费之前都去尝试连接生产者测试下是否连接成功,效率呢?所以kafka需要zk,在kafka的设计中就依赖了zk了。
安装kafka之前需要先安装zookeeper集群,虽然卡夫卡有自带的zk集群,但是建议还是使用单独的zk集群。
具体原因:
kafka使用zookeeper来实现动态的集群扩展,不需要更改客户端(producer和consumer)的配置。broker会在zookeeper注册并保持相关的元数据(topic,partition信息等)更新。而客户端会在zookeeper上注册相关的watcher。一旦zookeeper发生变化,客户端能及时感知并作出相应调整。这样就保证了添加或去除broker时,各broker间仍能自动实现负载均衡。这里的客户端指的是Kafka的消息生产端(Producer)和消息消费端(Consumer)·
Kafka使用zk的分布式协调服务,将生产者,消费者,消息储存(broker,用于存储信息,消息读写等)结合在一起。
同时借助zk,kafka能够将生产者,消费者和broker在内的所有组件在无状态的条件下建立起生产者和消费者的订阅关系,实现生产者的负载均衡。
1. broker在zk中注册
2. topic在zk中注册
在kafka中可以定义很多个topic,每个topic又被分为很多个分区。一般情况下,每个分区独立在存在一个broker上,所有的这些topic和broker的对应关系都有zk进行维护
3. consumer(消费者)在zk中注册
所以,Zookeeper作用:管理broker、consumer。
2、kafka集群搭建
环境:
JDK:1.8.0_221
ZK:3.4.14
Kafka:0.11.0.0
Scala:2.11.8由于ZK、Kakfa运行需要依赖JVM环境,需要先安装JDK(网上很多,不再描述)https://www.oracle.com/java/technologies/javase-downloads.htmlwww.oracle.com
2.ZK安装
zooKeeper是作为分布式协调服务,是不需要依赖于Hadoop的环境,也可以为其他的分布式环境提供服务。Index of /apache/zookeeper/zookeeper-3.4.14mirror.bit.edu.cn
解压:
gunzip zookeeper-3.4.14.tar.gz
tar -zxvf zookeeper-3.4.14.tar
配置:
由于是单点模式,所以配置server.1=本地IP,或者不配置server默认localhost
进入conf文件夹
mv zoo_sample.cfg zoo.cfg
修改zoo.cfg的配置
#tickTime: zookeeper中使用的基本时间单位, 毫秒值.
#dataDir: 数据目录. 可以是任意目录.
#initLimit: 配置leader节点和follower节点启动并且完成数据同步的时间.
#syncLimit:leader节点和follower节点心跳检测的最大延迟时间.
#clientPort: 监听client连接的端口号.
#server.x中的“x”表示ZooKeeper Server进程的标识
例如:
clientPort=2181
server.1=localhost:2888:3888
server.2=localhost:2889:3889
server.3=localhost:2890:3890
启动:
./zkServer.sh start
验证:
./zkServer.sh status
连接:
./zkCli.sh -server localhost
3、Kafka集群搭建Apache Kafkakafka.apache.org
修改配置:
log.dirs=/Users/gaowei/Package/kafka_2.11-0.11.0.0/log
zookeeper.connect=自己的IP(或者localhost):2181
listeners=PLAINTEXT://自己的IP(或者localhost):9092
advertised.listeners=PLAINTEXT://自己的IP(或者localhost):9092
启动
./kafka-server-start.sh ./../config/server.properties &
测试:
1.创建topic:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
2.查看topic列表:
bin/kafka-topics.sh --list --zookeeper localhost:2181
3.生成消息:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
4.消费消息(从头消费):
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic test --from-beginning
创建topic:
生产者:
消费者:
出现报错原因:
kafka_2.11-0.10.2.1升级了消费者命令,新版本采用bootstrap-server参数,而不是之前的zookeeper参数。其实报错里面已经很清楚了(新版本指的是kafka 0.8.0之后的版本)。
修改消费者命令:
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
转载地址:http://vihhv.baihongyu.com/