本文共 4417 字,大约阅读时间需要 14 分钟。
(一)、简介
1、Kafka简介:Kafka is a distributed,partitioned,replicated commit logservice。它提供了类似于JMS的特性,但是在设计实现上完全不同,此外它并不是JMS规范的实现。kafka对消息保存时根据Topic进行归类,发送消息者成为Producer,消息接受者成为Consumer,此外kafka集群有多个kafka实例组成,每个实例(server)成为broker。无论是kafka集群,还是producer和consumer都依赖于zookeeper来保证系统可用性集群保存一些meta信息。2、主要功能:
1:It lets you publish and subscribe to streams of records.发布和订阅消息流,这个功能类似于消息队列,这也是kafka归类为消息队列框架的原因 2:It lets you store streams of records in a fault-tolerant way.以容错的方式记录消息流,kafka以文件的方式来存储消息流 3:It lets you process streams of records as they occur.可以再消息发布的时候进行处理3、主要场景:
1、Messaging : 对于一些常规的消息系统,kafka是个不错的选择;partitons/replication和容错,可以使kafka具有良好的扩展性和性能优势.不过到目前为止,我们应该很清楚认识到,kafka并没有提供JMS中的"事务性""消息传输担保(消息确认机制)""消息分组"等企业级特性;kafka只能使用作为"常规"的消息系统,在一定程度上,尚未确保消息的发送与接收绝对可靠(比如,消息重发,消息发送丢失等)2、Websit activity tracking: kafka可以作为"网站活性跟踪"的最佳工具;可以将网页/用户操作等信息发送到kafka中.并实时监控,或者离线统计分析等3、Log Aggregation : kafka的特性决定它非常适合作为"日志收集中心";application可以将操作日志"批量""异步"的发送到kafka集群中,而不是保存在本地或者DB中;kafka可以批量提交消息/压缩消息等,这对producer端而言,几乎感觉不到性能的开支.此时consumer端可以使hadoop等其他系统化的存储和分析系统.4、工作机制
4.1、消息传输流程:Producer即生产者,向Kafka集群发送消息,在发送消息之前,会对消息进行分类,即Topic,上图展示了两个producer发送了分类为topic1的消息,另外一个发送了topic2的消息.
Topic即主题,通过对消息指定主题可以将消息分类,消费者可以只关注自己需要的Topic中的消息Consumer即消费者,消费者通过与kafka集群建立长连接的方式,不断地从集群中拉取消息,然后可以对这些消息进行处理。从上图中就可以看出同一个Topic下的消费者和生产者的数量并不是对应的。4.2、Kafka服务器存储策略
谈到kafka的存储,就不得不提到分区,即partitions,创建一个topic时,同时可以指定分区数目,分区数越多,其吞吐量也越大,但是需要的资源也越多,同时也会导致更高的不可用性,kafka在接收到生产者发送的消息之后,会根据均衡策略将消息存储到不同的分区中。在每个分区中,消息以顺序存储,最晚接收的的消息会最后被消费4.3、与生产者的交互
生产者在向kafka集群发送消息的时候,可以通过指定分区来发送到指定的分区中 也可以通过指定均衡策略来将消息发送到不同的分区中 如果不指定,就会采用默认的随机均衡策略,将消息随机的存储到不同的分区中4.4、与消费者的交互
在消费者消费消息时,kafka使用offset来记录当前消费的位置 在kafka的设计中,可以有多个不同的group来同时消费同一个topic下的消息,如图,我们有两个不同的group同时消费,他们的的消费的记录位置offset各不项目,不互相干扰。 对于一个group而言,消费者的数量不应该多余分区的数量,因为在一个group中,每个分区至多只能绑定到一个消费者上,即一个消费者可以消费多个分区,一个分区只能给一个消费者消费 因此,若一个group中的消费者数量大于分区数量的话,多余的消费者将不会收到任何消息。(二)、安装和使用
1、下载。可通过官网下载相应的版本( )。2、安装。由于kafka运行需要Java环境,因此需要安装jdk。以前的kafka还需要zookeeper,新版的kafka已经内置了一个zookeeper环境,所以我们可以直接使用
说是安装,如果只需要进行最简单的尝试的话我们只需要解压到任意目录即可,这里我们将kafka压缩包解压到所需的目录文件夹中。3、具体的安装配置步骤如下:
[appuser@Kafka2 ~]#wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.0.0/kafka_2.12-2.0.0.tgz[root@Kafka2 opt]# tar xf kafka_2.12-2.0.0.tgz [root@Kafka2 opt]# mv kafka_2.12-2.0.0 /usr/local/kafka [root@Kafka2 kafka]# grep '^[a-Z]' config/server.properties broker.id=0listeners=PLAINTEXT://172.20.67.57:9092num.network.threads=3num.io.threads=8socket.send.buffer.bytes=102400socket.receive.buffer.bytes=102400socket.request.max.bytes=104857600log.dirs=/tmp/kafka-logsnum.partitions=1num.recovery.threads.per.data.dir=1offsets.topic.replication.factor=1transaction.state.log.replication.factor=1transaction.state.log.min.isr=1log.retention.hours=168log.segment.bytes=1073741824log.retention.check.interval.ms=300000zookeeper.connect=localhost:2181zookeeper.connection.timeout.ms=6000group.initial.rebalance.delay.ms=0重要参数说明:1、broker.id 申明当前kafka服务器在集群中的唯一ID,需配置为integer,并且集群中的每一个kafka服务器的id都应是唯一的,我们这里采用默认配置即可2、listeners 申明此kafka服务器需要监听的端口号,如果是在本机上跑虚拟机运行可以不用配置本项,默认会使用localhost的地址,如果是在远程服务器上运行则必须配置,例如listeners=PLAINTEXT:// 192.168.180.128:9092。并确保服务器的9092端口能够访问3、zookeeper.connect 申明kafka所连接的zookeeper的地址 ,需配置为zookeeper的地址,由于本次使用的是kafka高版本中自带zookeeper,使用默认配置即可zookeeper.connect=localhost:2181
4、运行。
4.1、由于启动kafka需要先启动zookeeper才可以,所有先启动zookeeper服务。[root@Kafka2 kafka]# /usr/local/kafka/bin/zookeeper-server-start.sh /usr/local/kafka/config/zookeeper.properties看到以上输出基本正常了。
4.2、启动kafka。接下来可以启动kafka了,重新打开一个bash.
[root@Kafka2 ~]# /usr/local/kafka/bin/kafka-server-start.sh /usr/local/kafka/config/server.properties上图看出启动成功了。
5、创建测试消息。
5.1、创建一个topic。 Kafka通过topic对同一类的数据进行管理,同一类的数据使用同一个topic可以在处理数据时更加的便捷。在kafka解压目录打开终端,创建topic并查看。[root@Kafka2 ~]# /usr/local/kafka/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test Created topic "test".[root@Kafka2 ~]# /usr/local/kafka/bin/kafka-topics.sh --list --zookeeper localhost:2181test
5.2、创建一个消费信息的消费者。消费者创建完成之后,因为还没有发送任何数据,因此这里在执行后没有打印出任何数据
[root@Kafka2 ~]# /usr/local/kafka/bin/kafka-console-consumer.sh --bootstrap-server 172.20.67.57:9092 --topic test --from-beginning`
5.3、创建一个消息的生产者。
[root@Kafka2 ~]# /usr/local/kafka/bin/kafka-console-producer.sh --broker-list 172.20.67.57:9092 --topic test>this is my sif first message
按回车键,发送消息。消费终端就可以看到了。
转载于:https://blog.51cto.com/liqingbiao/2313419