- Apache Kafka Tutorial
- Apache Kafka - Home
- Apache Kafka - Introduction
- Apache Kafka - Fundamentals
- Apache Kafka - Cluster Architecture
- Apache Kafka - Work Flow
- Apache Kafka - Installation Steps
- Apache Kafka - Basic Operations
- Simple Producer Example
- Consumer Group Example
- Integration With Storm
- Integration With Spark
- Real Time Application(Twitter)
- Apache Kafka - Tools
- Apache Kafka - Applications
- Apache Kafka Useful Resources
- Apache Kafka - Quick Guide
- Apache Kafka - Useful Resources
- Apache Kafka - Discussion
Apache Kafka - 快速指南
Apache Kafka - 简介
在大数据中,使用了大量的数据。关于数据,我们主要面临两个挑战。第一个挑战是如何收集大量数据,第二个挑战是分析收集到的数据。为了克服这些挑战,您必须需要一个消息传递系统。
Kafka 专为分布式高吞吐量系统而设计。Kafka 往往能够很好地替代传统的消息代理。与其他消息系统相比,Kafka 具有更好的吞吐量、内置分区、复制和固有的容错能力,这使其非常适合大规模消息处理应用。
什么是消息系统?
消息系统负责将数据从一个应用程序传输到另一个应用程序,因此应用程序可以专注于数据,而不用担心如何共享数据。分布式消息传递基于可靠消息队列的概念。消息在客户端应用程序和消息传递系统之间异步排队。有两种类型的消息传递模式可用 - 一种是点对点,另一种是发布-订阅 (pub-sub) 消息传递系统。大多数消息传递模式都遵循pub-sub。
点对点消息系统
在点对点系统中,消息保存在队列中。一个或多个消费者可以消费队列中的消息,但一条特定消息最多只能由一个消费者消费。一旦消费者读取队列中的消息,该消息就会从该队列中消失。该系统的典型示例是订单处理系统,其中每个订单将由一个订单处理器处理,但多个订单处理器也可以同时工作。下图描述了该结构。
发布订阅消息系统
在发布-订阅系统中,消息保存在主题中。与点对点系统不同,消费者可以订阅一个或多个主题并消费该主题中的所有消息。在发布-订阅系统中,消息生产者称为发布者,消息消费者称为订阅者。现实生活中的一个例子是 Dish TV,它发布了不同的频道,如体育、电影、音乐等,任何人都可以订阅自己的一组频道,并在订阅的频道可用时获取它们。
卡夫卡是什么?
Apache Kafka 是一种分布式发布-订阅消息传递系统和强大的队列,可以处理大量数据,并使您能够将消息从一个端点传递到另一个端点。Kafka既适合离线消息消费,也适合在线消息消费。Kafka 消息持久保存在磁盘上并在集群内进行复制,以防止数据丢失。Kafka 构建在 ZooKeeper 同步服务之上。它与 Apache Storm 和 Spark 很好地集成,用于实时流数据分析。
好处
以下是 Kafka 的一些好处 -
可靠性- Kafka 是分布式、分区、复制和容错的。
可扩展性- Kafka 消息系统可以轻松扩展,无需停机。
持久性- Kafka 使用
分布式提交日志
,这意味着消息尽可能快地保留在磁盘上,因此它是持久的。性能- Kafka 在发布和订阅消息方面具有高吞吐量。即使存储大量 TB 的消息,也能保持稳定的性能。
Kafka 速度非常快,并且保证零停机和零数据丢失。
用例
Kafka 可用于许多用例。其中一些列于下面 -
指标- Kafka 通常用于操作监控数据。这涉及聚合来自分布式应用程序的统计数据以生成集中的操作数据源。
日志聚合解决方案- Kafka 可以在整个组织中使用,从多个服务收集日志,并以标准格式提供给多个消费者。
流处理- Storm 和 Spark Streaming 等流行框架从主题读取数据,对其进行处理,然后将处理后的数据写入新主题,供用户和应用程序使用。Kafka强大的耐用性在流处理的场景中也非常有用。
需要卡夫卡
Kafka 是一个用于处理所有实时数据源的统一平台。Kafka支持低延迟消息传递,并在机器故障时保证容错。它有能力处理大量不同的消费者。Kafka 速度非常快,每秒执行 200 万次写入。Kafka 将所有数据持久保存到磁盘,这本质上意味着所有写入都进入操作系统(RAM)的页面缓存。这使得将数据从页面缓存传输到网络套接字变得非常高效。
Apache Kafka - 基础知识
在深入了解 Kafka 之前,您必须了解主题、经纪人、生产者和消费者等主要术语。下图说明了主要术语,表格详细描述了图表组件。
在上图中,一个主题被配置为三个分区。分区 1 有两个偏移因子 0 和 1。分区 2 有 4 个偏移因子 0、1、2 和 3。分区 3 有 1 个偏移因子 0。副本的 id 与托管它的服务器的 id 相同。
假设,如果主题的复制因子设置为 3,那么 Kafka 将为每个分区创建 3 个相同的副本,并将它们放置在集群中以供其所有操作使用。为了平衡集群中的负载,每个代理存储一个或多个这些分区。多个生产者和消费者可以同时发布和检索消息。
序列号 | 组件和描述 |
---|---|
1 | 主题 属于特定类别的消息流称为主题。数据存储在主题中。 主题被分成多个分区。对于每个主题,Kafka 至少保留一个分区。每个这样的分区都包含不可变有序序列中的消息。分区被实现为一组大小相等的段文件。 |
2 | 分割 主题可能有许多分区,因此它可以处理任意数量的数据。 |
3 |
分区偏移量 每个分区消息都有一个唯一的序列 ID,称为 |
4 |
分区的副本 副本只不过是分区的 |
5 |
经纪人
|
6 |
卡夫卡集群 Kafka 有多个 Broker,称为 Kafka 集群。Kafka集群可以在不停机的情况下进行扩展。这些集群用于管理消息数据的持久性和复制。 |
7 |
制片人 生产者是一个或多个 Kafka 主题的消息发布者。生产者将数据发送给 Kafka 经纪人。每次生产者向代理发布消息时,代理只需将消息附加到最后一个段文件。实际上,消息将被附加到一个分区中。生产者还可以将消息发送到他们选择的分区。 |
8 |
消费者 消费者从经纪人那里读取数据。消费者订阅一个或多个主题,并通过从代理中提取数据来消费已发布的消息。 |
9 |
领导者
|
10 |
追随者 遵循领导者指令的节点称为跟随者。如果领导者失败,其中一名追随者将自动成为新的领导者。追随者充当普通消费者,提取消息并更新自己的数据存储。 |
Apache Kafka - 集群架构
看看下面的插图。它显示了Kafka的集群图。
下表描述了上图中显示的每个组件。
序列号 | 组件和描述 |
---|---|
1 |
经纪人 Kafka集群通常由多个broker组成以维持负载平衡。Kafka 代理是无状态的,因此它们使用 ZooKeeper 来维护集群状态。一个 Kafka 代理实例每秒可以处理数十万次读取和写入,每个代理可以处理 TB 级消息而不影响性能。Kafka Broker Leader 选举可以由 ZooKeeper 来完成。 |
2 |
动物园管理员 ZooKeeper用于管理和协调Kafka代理。ZooKeeper服务主要用于通知生产者和消费者Kafka系统中是否存在任何新的Broker或Kafka系统中的Broker发生故障。根据 Zookeeper 收到的有关代理存在或失败的通知,生产者和消费者会做出决定并开始与其他代理协调他们的任务。 |
3 |
制片人 生产者将数据推送给经纪人。当新的代理启动时,所有生产者都会搜索它并自动向该新代理发送一条消息。Kafka 生产者不会等待代理的确认,而是以代理可以处理的速度发送消息。 |
4 |
消费者 由于 Kafka Broker 是无状态的,这意味着消费者必须使用分区偏移量来维护已消费了多少条消息。如果消费者确认特定的消息偏移量,则意味着消费者已经消费了所有先前的消息。消费者向代理发出异步拉取请求,以获得可供使用的字节缓冲区。消费者只需提供偏移值即可倒带或跳到分区中的任何点。消费者偏移值由ZooKeeper通知。 |
Apache Kafka - 工作流程
到目前为止,我们讨论了Kafka的核心概念。现在让我们来了解一下 Kafka 的工作流程。
Kafka 只是划分为一个或多个分区的主题的集合。Kafka 分区是一个线性排序的消息序列,其中每条消息都由其索引(称为偏移量)标识。Kafka 集群中的所有数据都是分区的不相交联合。传入消息写入分区的末尾,消息由消费者顺序读取。通过将消息复制到不同的代理来提供持久性。
Kafka 以快速、可靠、持久、容错和零停机的方式提供基于发布-订阅和队列的消息系统。在这两种情况下,生产者只需将消息发送到主题,而消费者可以根据需要选择任何一种类型的消息系统。让我们按照下一节中的步骤来了解消费者如何选择他们所选择的消息系统。
Pub-Sub 消息传递的工作流程
以下是发布-订阅消息传递的逐步工作流程 -
生产者定期向主题发送消息。
Kafka 代理将所有消息存储在为该特定主题配置的分区中。它确保消息在分区之间平等共享。如果生产者发送两条消息并且有两个分区,Kafka 将在第一个分区中存储一条消息,在第二个分区中存储第二条消息。
消费者订阅特定主题。
一旦消费者订阅了某个主题,Kafka 就会向消费者提供该主题的当前偏移量,并将该偏移量保存在 Zookeeper 集合中。
消费者会定期(例如100毫秒)向Kafka请求新消息。
一旦 Kafka 收到来自生产者的消息,它就会将这些消息转发给消费者。
消费者将接收消息并处理它。
处理消息后,消费者将向 Kafka 代理发送确认。
一旦 Kafka 收到确认,它就会将偏移量更改为新值并在 Zookeeper 中更新它。由于偏移量是在 Zookeeper 中维护的,因此即使在服务器中断期间,消费者也可以正确读取下一条消息。
上述流程将重复,直到消费者停止请求。
消费者可以选择随时倒回/跳到主题的所需偏移量并阅读所有后续消息。
队列消息传递/消费者组的工作流程
在队列消息系统中,具有相同Group ID 的
一组消费者将订阅一个主题,而不是单个消费者。简单来说,订阅具有相同Group ID 的
主题的消费者被视为一个组,并且消息在它们之间共享。让我们来看看这个系统的实际工作流程。
生产者定期向主题发送消息。
Kafka 将所有消息存储在为该特定主题配置的分区中,类似于之前的场景。
单个消费者订阅特定主题,假设
Topic-01
的Group ID
为Group-1
。Kafka 以与 Pub-Sub 消息传递相同的方式与消费者进行交互,直到新消费者订阅相同的主题
Topic-01
,其组 ID
与Group-1
相同。一旦新的消费者到达,Kafka 将其操作切换到共享模式并在两个消费者之间共享数据。这种共享将持续下去,直到消费者数量达到为该特定主题配置的分区数量。
一旦消费者数量超过分区数量,新消费者将不会收到任何进一步的消息,直到任何一个现有消费者取消订阅。出现这种情况是因为 Kafka 中的每个消费者将被分配至少一个分区,一旦所有分区都分配给现有消费者,新的消费者将不得不等待。
此功能也称为
消费者组
。同样,Kafka 将以非常简单且高效的方式提供这两个系统的优点。
ZooKeeper 的角色
Apache Kafka 的一个关键依赖项是 Apache Zookeeper,它是一种分布式配置和同步服务。Zookeeper 充当 Kafka 代理和消费者之间的协调接口。Kafka 服务器通过 Zookeeper 集群共享信息。Kafka 在 Zookeeper 中存储基本元数据,例如有关主题、代理、消费者偏移量(队列读取器)等的信息。
由于所有关键信息都存储在 Zookeeper 中,并且它通常会在其整体中复制这些数据,因此 Kafka 代理/Zookeeper 的故障不会影响 Kafka 集群的状态。一旦 Zookeeper 重新启动,Kafka 将恢复状态。这使得 Kafka 的停机时间为零。当领导者失败时,Kafka Broker 之间的领导者选举也是通过 Zookeeper 完成的。
了解更多关于Zookeeper的信息,请参考zookeeper
让我们在下一章继续进一步介绍如何在您的计算机上安装 Java、ZooKeeper 和 Kafka。
Apache Kafka - 安装步骤
以下是在您的计算机上安装 Java 的步骤。
第 1 步 - 验证 Java 安装
希望您现在已经在计算机上安装了 java,因此您只需使用以下命令进行验证即可。
$ java -version
如果您的机器上成功安装了java,您可以看到已安装的Java的版本。
步骤 1.1 - 下载 JDK
如果未下载Java,请访问以下链接下载最新版本的JDK并下载最新版本。
http://www.oracle.com/technetwork/java/javase/downloads/index.html现在最新版本是JDK 8u 60,文件是“jdk-8u60-linux-x64.tar.gz”。请将文件下载到您的计算机上。
步骤 1.2 - 提取文件
通常,正在下载的文件存储在下载文件夹中,验证它并使用以下命令提取 tar 安装程序。
$ cd /go/to/download/path $ tar -zxf jdk-8u60-linux-x64.gz
步骤 1.3 - 移至 Opt 目录
要使 java 对所有用户可用,请将提取的 java 内容移动到usr/local/java
/ 文件夹。
$ su password: (type password of root user) $ mkdir /opt/jdk $ mv jdk-1.8.0_60 /opt/jdk/
步骤 1.4 - 设置路径
要设置路径和 JAVA_HOME 变量,请将以下命令添加到 ~/.bashrc 文件中。
export JAVA_HOME =/usr/jdk/jdk-1.8.0_60 export PATH=$PATH:$JAVA_HOME/bin
现在将所有更改应用到当前运行的系统中。
$ source ~/.bashrc
步骤 1.5 - Java 替代方案
使用以下命令更改 Java 替代方案。
update-alternatives --install /usr/bin/java java /opt/jdk/jdk1.8.0_60/bin/java 100
步骤 1.6 - 现在使用步骤 1 中解释的验证命令(java -version)验证 java。
步骤 2 - ZooKeeper 框架安装
步骤 2.1 - 下载 ZooKeeper
要在您的计算机上安装 ZooKeeper 框架,请访问以下链接并下载最新版本的 ZooKeeper。
http://zookeeper.apache.org/releases.html截至目前,ZooKeeper 的最新版本是 3.4.6 (ZooKeeper-3.4.6.tar.gz)。
步骤 2.2 - 提取 tar 文件
使用以下命令提取 tar 文件
$ cd opt/ $ tar -zxf zookeeper-3.4.6.tar.gz $ cd zookeeper-3.4.6 $ mkdir data
步骤 2.3 - 创建配置文件
使用命令 vi “ conf/zoo.cfg” 打开名为 conf/
zoo.cfg的配置文件,并将以下所有参数设置为起点。
$ vi conf/zoo.cfg tickTime=2000 dataDir=/path/to/zookeeper/data clientPort=2181 initLimit=5 syncLimit=2
配置文件保存成功并再次返回终端后,就可以启动zookeeper服务器了。
步骤 2.4 - 启动 ZooKeeper 服务器
$ bin/zkServer.sh start
执行此命令后,您将得到如下所示的响应 -
$ JMX enabled by default $ Using config: /Users/../zookeeper-3.4.6/bin/../conf/zoo.cfg $ Starting zookeeper ... STARTED
步骤 2.5 - 启动 CLI
$ bin/zkCli.sh
输入上述命令后,您将连接到zookeeper服务器并得到以下响应。
Connecting to localhost:2181 ................ ................ ................ Welcome to ZooKeeper! ................ ................ WATCHER:: WatchedEvent state:SyncConnected type: None path:null [zk: localhost:2181(CONNECTED) 0]
步骤 2.6 - 停止 Zookeeper 服务器
连接服务器并执行所有操作后,您可以使用以下命令停止 Zookeeper 服务器 -
$ bin/zkServer.sh stop
现在您已经在计算机上成功安装了 Java 和 ZooKeeper。让我们看看安装 Apache Kafka 的步骤。
第 3 步 - Apache Kafka 安装
让我们继续执行以下步骤,在您的计算机上安装 Kafka。
步骤 3.1 - 下载 Kafka
要在您的计算机上安装 Kafka,请单击以下链接 -
https://www.apache.org/dyn/closer.cgi?path=/kafka/0.9.0.0/kafka_2.11-0.9.0.0.tgz现在最新版本即kafka_2.11_0.9.0.0.tgz将下载到您的计算机上。
步骤 3.2 - 提取 tar 文件
使用以下命令提取 tar 文件 -
$ cd opt/ $ tar -zxf kafka_2.11.0.9.0.0 tar.gz $ cd kafka_2.11.0.9.0.0
现在您已经在计算机上下载了最新版本的 Kafka。
步骤 3.3 - 启动服务器
您可以通过发出以下命令来启动服务器 -
$ bin/kafka-server-start.sh config/server.properties
服务器启动后,您将在屏幕上看到以下响应 -
$ bin/kafka-server-start.sh config/server.properties [2016-01-02 15:37:30,410] INFO KafkaConfig values: request.timeout.ms = 30000 log.roll.hours = 168 inter.broker.protocol.version = 0.9.0.X log.preallocate = false security.inter.broker.protocol = PLAINTEXT ……………………………………………. …………………………………………….
第 4 步 - 停止服务器
执行完所有操作后,您可以使用以下命令停止服务器 -
$ bin/kafka-server-stop.sh config/server.properties
现在我们已经讨论了 Kafka 的安装,我们可以在下一章中学习如何对 Kafka 进行基本操作。
Apache Kafka - 基本操作
首先让我们开始实现单节点单代理
配置,然后将我们的设置迁移到单节点多代理配置。
希望您现在已经在计算机上安装了 Java、ZooKeeper 和 Kafka。在进行 Kafka 集群设置之前,首先您需要启动 ZooKeeper,因为 Kafka 集群使用 ZooKeeper。
启动动物园管理员
打开一个新终端并输入以下命令 -
bin/zookeeper-server-start.sh config/zookeeper.properties
要启动 Kafka Broker,请输入以下命令 -
bin/kafka-server-start.sh config/server.properties
启动 Kafka Broker 后,在 ZooKeeper 终端上输入命令jps
,您将看到以下响应 -
821 QuorumPeerMain 928 Kafka 931 Jps
现在您可以看到终端上运行着两个守护进程,其中 QuorumPeerMain 是 ZooKeeper 守护进程,另一个是 Kafka 守护进程。
单节点单代理配置
在此配置中,您有一个 ZooKeeper 和代理 ID 实例。以下是配置它的步骤 -
创建 Kafka 主题- Kafka 提供了一个名为kafka-topics.sh
的命令行实用程序来在服务器上创建主题。打开新终端并输入以下示例。
句法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic topic-name
例子
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic Hello-Kafka
我们刚刚创建了一个名为Hello-Kafka
的主题,该主题具有单个分区和一个副本因子。上面创建的输出将类似于以下输出 -
输出- 创建主题Hello-Kafka
创建主题后,您可以在 Kafka 代理终端窗口中获取通知以及 config/server.properties 文件中的“/tmp/kafka-logs/”中指定的已创建主题的日志。
主题列表
要获取 Kafka 服务器中的主题列表,您可以使用以下命令 -
句法
bin/kafka-topics.sh --list --zookeeper localhost:2181
输出
Hello-Kafka
由于我们已经创建了一个主题,因此它只会列出Hello-Kafka
。假设,如果您创建多个主题,您将在输出中获得主题名称。
启动Producer发送消息
句法
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic topic-name
从上面的语法来看,生产者命令行客户端需要两个主要参数 -
Broker-list - 我们要将消息发送到的代理列表。在这种情况下,我们只有一名经纪人。Config/server.properties 文件包含代理端口 ID,因为我们知道我们的代理正在侦听端口 9092,因此您可以直接指定它。
主题名称- 这是主题名称的示例。
例子
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka
生产者将等待来自标准输入的输入并发布到 Kafka 集群。默认情况下,每个新行都会作为新消息发布,然后在config/ Producer.properties
文件中指定默认生产者属性。现在您可以在终端中输入几行消息,如下所示。
输出
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Hello-Kafka[2016-01-16 13:50:45,931] WARN property topic is not valid (kafka.utils.Verifia-bleProperties) Hello My first message
My second message
启动Consumer接收消息
与生产者类似,默认的消费者属性在config/consumer.proper-ties
文件中指定。打开一个新终端并输入以下语法来使用消息。
句法
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic topic-name --from-beginning
例子
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Hello-Kafka --from-beginning
输出
Hello My first message My second message
最后,您可以从生产者的终端输入消息并看到它们出现在消费者的终端中。到目前为止,您对具有单个代理的单节点集群有了很好的了解。现在让我们继续讨论多个代理配置。
单节点多代理配置
在继续进行多代理集群设置之前,首先启动 ZooKeeper 服务器。
创建多个 Kafka 代理- 我们在 config/server.properties 中已有一个 Kafka 代理实例。现在我们需要多个代理实例,因此将现有的 server.prop-erties 文件复制到两个新的配置文件中,并将其重命名为 server-one.properties 和 server-two.prop-erties。然后编辑这两个新文件并分配以下更改 -
配置/服务器-one.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=1 # The port the socket server listens on port=9093 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-1
配置/服务器两个.properties
# The id of the broker. This must be set to a unique integer for each broker. broker.id=2 # The port the socket server listens on port=9094 # A comma seperated list of directories under which to store log files log.dirs=/tmp/kafka-logs-2
启动多个代理- 在三台服务器上进行所有更改后,然后打开三个新终端来一一启动每个代理。
Broker1 bin/kafka-server-start.sh config/server.properties Broker2 bin/kafka-server-start.sh config/server-one.properties Broker3 bin/kafka-server-start.sh config/server-two.properties
现在我们的机器上运行着三个不同的代理。自己尝试一下,通过在 ZooKeeper 终端上输入jps来检查所有守护进程,然后您将看到响应。
创建主题
让我们将此主题的复制因子值指定为 3,因为我们正在运行三个不同的代理。如果您有两个代理,则分配的副本值将为两个。
句法
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic topic-name
例子
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 -partitions 1 --topic Multibrokerapplication
输出
created topic “Multibrokerapplication”
描述
命令用于检查哪个代理正在侦听当前创建的主题,如下所示 -
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation
输出
bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic Multibrokerappli-cation Topic:Multibrokerapplication PartitionCount:1 ReplicationFactor:3 Configs: Topic:Multibrokerapplication Partition:0 Leader:0 Replicas:0,2,1 Isr:0,2,1
从上面的输出中,我们可以得出结论,第一行给出了所有分区的摘要,显示主题名称、分区计数和我们已经选择的复制因子。在第二行中,每个节点将成为随机选择的分区部分的领导者。
在我们的例子中,我们看到我们的第一个经纪人(broker.id 0)是领导者。那么 Replicas:0,2,1 意味着所有代理都复制该主题,最后Isr是
同步
副本的集合。嗯,这是当前存活并被领导者追上的副本的子集。
启动Producer发送消息
此过程与单一代理设置中的过程相同。
例子
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication
输出
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic Multibrokerapplication [2016-01-20 19:27:21,045] WARN Property topic is not valid (kafka.utils.Verifia-bleProperties) This is single node-multi broker demo This is the second message
启动Consumer接收消息
此过程与单一代理设置中所示的过程相同。
例子
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion --from-beginning
输出
bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic Multibrokerapplica-tion —from-beginning This is single node-multi broker demo This is the second message
主题基本操作
本章我们将讨论各种基本主题操作。
修改主题
正如您已经了解如何在 Kafka 集群中创建主题。现在让我们使用以下命令修改创建的主题
句法
bin/kafka-topics.sh —zookeeper localhost:2181 --alter --topic topic_name --parti-tions count
例子
We have already created a topic “Hello-Kafka” with single partition count and one replica factor. Now using “alter” command we have changed the partition count. bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic Hello-kafka --parti-tions 2
输出
WARNING: If partitions are increased for a topic that has a key, the partition logic or ordering of the messages will be affected Adding partitions succeeded!
删除主题
要删除主题,您可以使用以下语法。
句法
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic topic_name
例子
bin/kafka-topics.sh --zookeeper localhost:2181 --delete --topic Hello-kafka
输出
> Topic Hello-kafka marked for deletion
注意 -如果delete.topic.enable未设置为 true,这不会产生任何影响
Apache Kafka - 简单的生产者示例
让我们创建一个使用 Java 客户端发布和使用消息的应用程序。Kafka 生产者客户端由以下 API 组成。
Kafka生产者API
让我们了解本节中最重要的 Kafka 生产者 API 集。KafkaProducer API 的核心部分是KafkaProducer
类。KafkaProducer 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。
KafkaProducer 类提供 send 方法将消息异步发送到主题。send()的签名如下
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1) , callback);
ProducerRecord - 生产者管理等待发送的记录缓冲区。
回调- 当服务器确认记录时执行的用户提供的回调(null 表示没有回调)。
KafkaProducer类提供了一个flush方法来确保所有先前发送的消息已经实际完成。刷新方法的语法如下 -
public void flush()
KafkaProducer 类提供了partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。该方法的签名如下 -
public Map metrics()
它返回由生产者维护的内部指标的映射。
public void close() - KafkaProducer 类提供 close 方法块,直到所有先前发送的请求完成。
生产者API
Producer API 的核心部分是Producer
类。Producer 类在其构造函数中提供了通过以下方法连接 Kafka Broker 的选项。
制作人阶层
生产者类提供 send 方法,使用以下签名将消息发送到单个或多个主题。
public void send(KeyedMessaget<k,v> message) - sends the data to a single topic,par-titioned by key using either sync or async producer. public void send(List<KeyedMessage<k,v>>messages) - sends data to multiple topics. Properties prop = new Properties(); prop.put(producer.type,”async”) ProducerConfig config = new ProducerConfig(prop);
有两种类型的生产者——同步和异步。
相同的 API 配置也适用于同步生产者。
它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,异步生产者是首选。在以前的版本(如 0.8)中,异步生产者没有 send() 的回调来注册错误处理程序。这仅在当前版本 0.9 中可用。
公共无效关闭()
Producer 类提供close方法来关闭生产者池与所有 Kafka 代理的连接。
配置设置
下表列出了 Producer API 的主要配置设置,以便更好地理解 -
序列号 | 配置设置和说明 |
---|---|
1 |
客户端ID 识别生产者应用程序 |
2 |
生产者类型 同步或异步 |
3 |
确认 acks 配置控制生产者请求被视为完整的标准。 |
4 |
重试 如果生产者请求失败,则自动使用特定值重试。 |
5 |
引导服务器 经纪人的引导列表。 |
6 |
徘徊者 如果您想减少请求数量,可以将 linger.ms 设置为大于某个值。 |
7 |
键序列化器 串行器接口的键。 |
8 |
值序列化器 串行器接口的值。 |
9 |
批量大小 缓冲区大小。 |
10 |
缓冲存储器 控制生产者可用于缓冲的内存总量。 |
生产者记录 API
ProducerRecord 是发送到 Kafka 集群的键/值对。ProducerRecord 类构造函数用于使用以下签名创建带有分区、键和值对的记录。
public ProducerRecord (string topic, int partition, k key, v value)
主题- 用户定义的主题名称将附加到记录中。
分区- 分区计数
Key - 将包含在记录中的密钥。
- 值- 记录内容
public ProducerRecord (string topic, k key, v value)
ProducerRecord 类构造函数用于创建带有键、值对且不带分区的记录。
主题- 创建一个主题来分配记录。
密钥- 记录密钥。
值- 记录内容。
public ProducerRecord (string topic, v value)
ProducerRecord 类创建一条没有分区和键的记录。
主题- 创建一个主题。
值- 记录内容。
下表列出了 ProducerRecord 类方法 -
序列号 | 类方法和说明 |
---|---|
1 |
公共字符串主题() 主题将附加到记录中。 |
2 |
公共K密钥() 将包含在记录中的密钥。如果没有这样的键,则此处将返回 null。 |
3 |
公共V值() 记录内容。 |
4 |
分割() 记录的分区计数 |
简单生产者应用程序
在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建您自己的主题。之后创建一个名为Sim-pleProducer.java
的 java 类并输入以下代码。
//import util.properties packages import java.util.Properties; //import simple producer packages import org.apache.kafka.clients.producer.Producer; //import KafkaProducer packages import org.apache.kafka.clients.producer.KafkaProducer; //import ProducerRecord packages import org.apache.kafka.clients.producer.ProducerRecord; //Create java class named “SimpleProducer” public class SimpleProducer { public static void main(String[] args) throws Exception{ // Check arguments length value if(args.length == 0){ System.out.println("Enter topic name"); return; } //Assign topicName to string variable String topicName = args[0].toString(); // create instance for properties to access producer configs Properties props = new Properties(); //Assign localhost id props.put("bootstrap.servers", “localhost:9092"); //Set acknowledgements for producer requests. props.put("acks", “all"); //If the request fails, the producer can automatically retry, props.put("retries", 0); //Specify buffer size in config props.put("batch.size", 16384); //Reduce the no of requests less than 0 props.put("linger.ms", 1); //The buffer.memory controls the total amount of memory available to the producer for buffering. props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer <String, String>(props); for(int i = 0; i < 10; i++) producer.send(new ProducerRecord<String, String>(topicName, Integer.toString(i), Integer.toString(i))); System.out.println(“Message sent successfully”); producer.close(); } }
编译- 可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
执行- 可以使用以下命令执行应用程序。
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>
输出
Message sent successfully To check the above output open new terminal and type Consumer CLI command to receive messages. >> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning 1 2 3 4 5 6 7 8 9 10
简单的消费者示例
到目前为止,我们已经创建了一个生产者来将消息发送到 Kafka 集群。现在让我们创建一个消费者来消费来自 Kafka 集群的消息。KafkaConsumer API用于消费来自Kafka集群的消息。KafkaConsumer 类构造函数定义如下。
public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)
configs - 返回消费者配置的映射。
KafkaConsumer 类具有下表列出的以下重要方法。
序列号 | 方法及说明 |
---|---|
1 |
public java.util.Set<TopicPart-tition> 赋值() 获取消费者当前分配的一组分区。 |
2 |
公共字符串订阅() 订阅给定的主题列表以获取动态分配的分区。 |
3 |
public void sub-scribe(java.util.List<java.lang.String>主题,ConsumerRe-balanceListener监听器) 订阅给定的主题列表以获取动态分配的分区。 |
4 |
公共无效取消订阅() 从给定的分区列表中取消订阅主题。 |
5 |
公共无效订阅(java.util.List<java.lang.String>主题) 订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则其处理方式与 unsubscribe() 相同。 |
6 |
public void sub-scribe(java.util.regex.Pattern模式,ConsumerRebalanceLis-tener监听器) 参数模式是指正则表达式格式的订阅模式,监听器参数从订阅模式获取通知。 |
7 |
public void as-sign(java.util.List<TopicParti-tion> 分区) 手动向客户分配分区列表。 |
8 |
轮询() 使用订阅/分配 API 之一获取指定主题或分区的数据。如果在轮询数据之前未订阅主题,这将返回错误。 |
9 |
公共无效commitSync() 提交在最后一次 poll() 上返回的所有订阅主题和分区列表的偏移量。相同的操作也适用于 commitAsyn()。 |
10 |
公共无效寻求(TopicPartition分区,长偏移量) 获取消费者将在下一个 poll() 方法中使用的当前偏移值。 |
11 |
公共无效简历() 恢复暂停的分区。 |
12 |
公共无效唤醒() 唤醒消费者。 |
消费者记录API
ConsumerRecord API 用于从 Kafka 集群接收记录。该 API 由主题名称、从中接收记录的分区号以及指向 Kafka 分区中的记录的偏移量组成。ConsumerRecord 类用于创建具有特定主题名称、分区计数和 <key, value> 对的消费者记录。它具有以下签名。
public ConsumerRecord(string topic,int partition, long offset,K key, V value)
主题- 从 Kafka 集群接收的消费者记录的主题名称。
分区- 主题的分区。
Key - 记录的键,如果不存在键,将返回 null。
值- 记录内容。
消费者记录 API
ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于保存特定主题的每个分区的 ConsumerRecord 列表。其构造函数定义如下。
public ConsumerRecords(java.util.Map<TopicPartition,java.util.List <Consumer-Record>K,V>>> records)
TopicPartition - 返回特定主题的分区图。
Records - 返回 ConsumerRecord 列表。
ConsumerRecords 类定义了以下方法。
序列号 | 方法和说明 |
---|---|
1 |
公共 int 计数() 所有主题的记录数。 |
2 |
公共设置分区() 包含此记录集中数据的分区集(如果没有返回数据,则该集为空)。 |
3 |
公共迭代器 iterator() 迭代器使您能够循环访问集合、获取或删除元素。 |
4 |
公共列表记录() 获取给定分区的记录列表。 |
配置设置
下面列出了消费者客户端 API 主要配置设置的配置设置 -
序列号 | 设置和说明 |
---|---|
1 |
引导服务器 经纪人的引导列表。 |
2 |
组号 将单个消费者分配给一个组。 |
3 |
启用自动提交 如果值为 true,则启用偏移量的自动提交,否则不提交。 |
4 |
自动提交间隔.ms 返回更新的消耗偏移量写入 ZooKeeper 的频率。 |
5 |
会话超时毫秒 指示Kafka在放弃并继续消费消息之前将等待ZooKeeper响应请求(读或写)的毫秒数。 |
简单消费者应用程序
生产者申请步骤在这里保持不变。首先,启动您的 ZooKeeper 和 Kafka 代理。然后使用名为SimpleCon-sumer.java
的 java 类创建SimpleConsumer
应用程序,并键入以下代码。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class SimpleConsumer { public static void main(String[] args) throws Exception { if(args.length == 0){ System.out.println("Enter topic name"); return; } //Kafka consumer configuration settings String topicName = args[0].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", "test"); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer <String, String>(props); //Kafka Consumer subscribes list of topics here. consumer.subscribe(Arrays.asList(topicName)) //print the topic name System.out.println("Subscribed to topic " + topicName); int i = 0; while (true) { ConsumerRecords<String, String> records = con-sumer.poll(100); for (ConsumerRecord<String, String> record : records) // print the offset,key and value for the consumer records. System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
编译- 可以使用以下命令编译应用程序。
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java
执行 -可以使用以下命令执行应用程序
java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>
输入- 打开生产者 CLI 并向主题发送一些消息。您可以将简单的输入设置为“Hello Consumer”。
输出- 以下是输出。
Subscribed to topic Hello-Kafka offset = 3, key = null, value = Hello Consumer
Apache Kafka - 消费者组示例
Consumer Group是来自Kafka主题的多线程或多机消费。
消费者组
消费者可以使用相同的group.id加入群组
。
组的最大并行度是组中消费者的数量←分区数。
Kafka将主题的分区分配给一组中的消费者,以便每个分区恰好由该组中的一个消费者消费。
Kafka 保证消息只能被组中的单个消费者读取。
消费者可以按照消息在日志中存储的顺序查看消息。
消费者的重新平衡
添加更多进程/线程将导致 Kafka 重新平衡。如果任何消费者或代理无法向 ZooKeeper 发送心跳,则可以通过 Kafka 集群重新配置。在此重新平衡期间,Kafka 会将可用分区分配给可用线程,可能会将分区移动到另一个进程。
import java.util.Properties; import java.util.Arrays; import org.apache.kafka.clients.consumer.KafkaConsumer; import org.apache.kafka.clients.consumer.ConsumerRecords; import org.apache.kafka.clients.consumer.ConsumerRecord; public class ConsumerGroup { public static void main(String[] args) throws Exception { if(args.length < 2){ System.out.println("Usage: consumer <topic> <groupname>"); return; } String topic = args[0].toString(); String group = args[1].toString(); Properties props = new Properties(); props.put("bootstrap.servers", "localhost:9092"); props.put("group.id", group); props.put("enable.auto.commit", "true"); props.put("auto.commit.interval.ms", "1000"); props.put("session.timeout.ms", "30000"); props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArraySerializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(props); consumer.subscribe(Arrays.asList(topic)); System.out.println("Subscribed to topic " + topic); int i = 0; while (true) { ConsumerRecords<String, String> records = consumer.poll(100); for (ConsumerRecord<String, String> record : records) System.out.printf("offset = %d, key = %s, value = %s\n", record.offset(), record.key(), record.value()); } } }
汇编
javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*" ConsumerGroup.java
执行
>>java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group >>java -cp "/home/bala/Workspace/kafka/kafka_2.11-0.9.0.0/libs/*":. ConsumerGroup <topic-name> my-group
在这里,我们创建了一个名为my-group 的
示例组,有两个消费者。同样,您可以创建您的组以及组中的消费者数量。
输入
打开生产者 CLI 并发送一些消息,例如 -
Test consumer group 01 Test consumer group 02
第一个过程的输出
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 01
第二过程的输出
Subscribed to topic Hello-kafka offset = 3, key = null, value = Test consumer group 02
现在希望您通过使用 Java 客户端演示已经了解了 SimpleConsumer 和 ConsumeGroup。现在您已经了解如何使用 Java 客户端发送和接收消息。让我们在下一章继续 Kafka 与大数据技术的集成。
Apache Kafka - 与 Storm 集成
在本章中,我们将学习如何将 Kafka 与 Apache Storm 集成。
关于暴风雨
Storm 最初由 Nathan Marz 和 BackType 团队创建。在很短的时间内,Apache Storm 成为了分布式实时处理系统的标准,允许您处理大量数据。Storm 非常快,基准测试显示每个节点每秒处理超过一百万个元组。Apache Storm 持续运行,使用来自配置源 (Spouts) 的数据并将数据沿着处理管道 (Bolts) 传递。喷口和螺栓组合起来形成拓扑。
与 Storm 集成
Kafka 和 Storm 天然相辅相成,它们的强大合作可以实现对快速移动的大数据的实时流分析。Kafka 和 Storm 的集成是为了让开发人员更轻松地从 Storm 拓扑中获取和发布数据流。
概念流程
Spout 是流的来源。例如,spout 可以从 Kafka Topic 读取元组并将它们作为流发出。Bolt 消耗输入流、处理并可能发出新流。Bolt 可以执行任何操作,从运行函数、过滤元组、进行流式聚合、流式连接、与数据库对话等等。Storm 拓扑中的每个节点并行执行。拓扑会无限期地运行,直到您将其终止。Storm 将自动重新分配任何失败的任务。此外,Storm 保证即使机器宕机、消息丢失也不会丢失数据。
让我们详细了解一下 Kafka-Storm 集成 API。Kafka 与 Storm 的集成主要分为三个类。它们如下 -
BrokerHosts - ZkHosts 和 StaticHosts
BrokerHosts 是一个接口,ZkHosts 和 StaticHosts 是它的两个主要实现。ZkHosts 用于通过在 ZooKeeper 中维护详细信息来动态跟踪 Kafka 代理,而 StaticHosts 用于手动/静态设置 Kafka 代理及其详细信息。ZkHosts 是访问 Kafka 代理的简单快速的方法。
ZkHosts 的签名如下 -
public ZkHosts(String brokerZkStr, String brokerZkPath) public ZkHosts(String brokerZkStr)
其中brokerZkStr是ZooKeeper主机,brokerZkPath是用于维护Kafka代理详细信息的ZooKeeper路径。
卡夫卡配置API
该API用于定义Kafka集群的配置设置。Kafka Config的签名定义如下
public KafkaConfig(BrokerHosts hosts, string topic)
主机- BrokerHosts 可以是 ZkHosts / StaticHosts。
主题- 主题名称。
Spout配置接口
Spoutconfig 是 KafkaConfig 的扩展,支持额外的 ZooKeeper 信息。
public SpoutConfig(BrokerHosts hosts, string topic, string zkRoot, string id)
Hosts - BrokerHosts 可以是 BrokerHosts 接口的任何实现
主题- 主题名称。
zkRoot - ZooKeeper 根路径。
id - spout 存储其在 Zookeeper 中消耗的偏移量的状态。该 id 应该唯一地标识您的 spout。
方案为多方案
SchemeAsMultiScheme 是一个接口,它规定从 Kafka 消耗的 ByteBuffer 如何转换为风暴元组。它派生自 MultiScheme 并接受 Scheme 类的实现。Scheme 类有很多实现,其中一种实现是 StringScheme,它将字节解析为简单的字符串。它还控制输出字段的命名。签名定义如下。
public SchemeAsMultiScheme(Scheme scheme)
方案- 从 kafka 消耗的字节缓冲区。
KafkaSpout API
KafkaSpout 是我们的 Spout 实现,它将与 Storm 集成。它从 kafka 主题中获取消息并将其作为元组发送到 Storm 生态系统中。KafkaSpout 从 SpoutConfig 获取其配置详细信息。
下面是创建简单 Kafka spout 的示例代码。
// ZooKeeper connection string BrokerHosts hosts = new ZkHosts(zkConnString); //Creating SpoutConfig Object SpoutConfig spoutConfig = new SpoutConfig(hosts, topicName, "/" + topicName UUID.randomUUID().toString()); //convert the ByteBuffer to String. spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme()); //Assign SpoutConfig to KafkaSpout. KafkaSpout kafkaSpout = new KafkaSpout(spoutConfig);
螺栓创建
Bolt 是一个将元组作为输入、处理元组并生成新元组作为输出的组件。Bolts 将实现 IRichBolt 接口。在该程序中,使用两个bolt类WordSplitter-Bolt和WordCounterBolt来执行操作。
IRichBolt 接口有以下方法 -
准备- 为 Bolt 提供要执行的环境。执行器将运行此方法来初始化 spout。
执行- 处理单个输入元组。
Cleanup - 当螺栓将要关闭时调用。
declareOutputFields - 声明元组的输出模式。
让我们创建 SplitBolt.java,它实现将句子拆分为单词的逻辑;以及 CountBolt.java,它实现分离唯一单词并计算其出现次数的逻辑。
SplitBolt.java
import java.util.Map; import backtype.storm.tuple.Tuple; import backtype.storm.tuple.Fields; import backtype.storm.tuple.Values; import backtype.storm.task.OutputCollector; import backtype.storm.topology.OutputFieldsDeclarer; import backtype.storm.topology.IRichBolt; import backtype.storm.task.Top