Apache Kafka - 简单的生产者示例


让我们创建一个使用 Java 客户端发布和使用消息的应用程序。Kafka 生产者客户端由以下 API 组成。

Kafka生产者API

让我们了解本节中最重要的 Kafka 生产者 API 集。KafkaProducer API 的核心部分是KafkaProducer类。KafkaProducer 类提供了一个选项,可以通过以下方法在其构造函数中连接 Kafka 代理。

  • KafkaProducer 类提供 send 方法将消息异步发送到主题。send()的签名如下

producer.send(new ProducerRecord<byte[],byte[]>(topic, 
partition, key1, value1) , callback);
  • ProducerRecord - 生产者管理等待发送的记录缓冲区。

  • 回调- 当服务器确认记录时执行的用户提供的回调(null 表示没有回调)。

  • KafkaProducer类提供了一个flush方法来确保所有先前发送的消息已经实际完成。刷新方法的语法如下 -

public void flush()
  • KafkaProducer 类提供了partitionFor 方法,该方法有助于获取给定主题的分区元数据。这可用于自定义分区。该方法的签名如下 -

public Map metrics()

它返回由生产者维护的内部指标的映射。

  • public void close() - KafkaProducer 类提供 close 方法块,直到所有先前发送的请求完成。

生产者API

Producer API 的核心部分是Producer类。Producer 类在其构造函数中提供了通过以下方法连接 Kafka Broker 的选项。

制作人阶层

生产者类提供 send 方法,使用以下签名将消息发送到单个或多个主题。


public void send(KeyedMessaget<k,v> message) 
- sends the data to a single topic,par-titioned by key using either sync or async producer.
public void send(List<KeyedMessage<k,v>>messages)
- sends data to multiple topics.
Properties prop = new Properties();
prop.put(producer.type,”async”)
ProducerConfig config = new ProducerConfig(prop);

有两种类型的生产者——同步异步

相同的 API 配置也适用于同步生产者。它们之间的区别是同步生产者直接发送消息,但在后台发送消息。当您想要更高的吞吐量时,异步生产者是首选。在以前的版本(如 0.8)中,异步生产者没有 send() 的回调来注册错误处理程序。这仅在当前版本 0.9 中可用。

公共无效关闭()

Producer 类提供close方法来关闭生产者池与所有 Kafka 代理的连接。

配置设置

下表列出了 Producer API 的主要配置设置,以便更好地理解 -

序列号 配置设置和说明
1

客户端ID

识别生产者应用程序

2

生产者类型

同步或异步

3

确认

acks 配置控制生产者请求被视为完整的标准。

4

重试

如果生产者请求失败,则自动使用特定值重试。

5

引导服务器

经纪人的引导列表。

6

徘徊者

如果您想减少请求数量,可以将 linger.ms 设置为大于某个值。

7

键序列化器

串行器接口的键。

8

值序列化器

串行器接口的值。

9

批量大小

缓冲区大小。

10

缓冲存储器

控制生产者可用于缓冲的内存总量。

生产者记录 API

ProducerRecord 是发送到 Kafka 集群的键/值对。ProducerRecord 类构造函数用于使用以下签名创建带有分区、键和值对的记录。

public ProducerRecord (string topic, int partition, k key, v value)
  • 主题- 用户定义的主题名称将附加到记录中。

  • 分区- 分区计数

  • Key - 将包含在记录中的密钥。

  • - 记录内容
public ProducerRecord (string topic, k key, v value)

ProducerRecord 类构造函数用于创建带有键、值对且不带分区的记录。

  • 主题- 创建一个主题来分配记录。

  • 密钥- 记录密钥。

  • - 记录内容。

public ProducerRecord (string topic, v value)

ProducerRecord 类创建一条没有分区和键的记录。

  • 主题- 创建一个主题。

  • - 记录内容。

下表列出了 ProducerRecord 类方法 -

序列号 类方法和说明
1

公共字符串主题()

主题将附加到记录中。

2

公共K密钥()

将包含在记录中的密钥。如果没有这样的键,则此处将返回 null。

3

公共V值()

记录内容。

4

分割()

记录的分区计数

简单生产者应用程序

在创建应用程序之前,首先启动 ZooKeeper 和 Kafka 代理,然后使用 create topic 命令在 Kafka 代理中创建您自己的主题。之后创建一个名为Sim-pleProducer.java的 java 类并输入以下代码。

//import util.properties packages
import java.util.Properties;

//import simple producer packages
import org.apache.kafka.clients.producer.Producer;

//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;

//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named “SimpleProducer”
public class SimpleProducer {
   
   public static void main(String[] args) throws Exception{
      
      // Check arguments length value
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      
      //Assign topicName to string variable
      String topicName = args[0].toString();
      
      // create instance for properties to access producer configs   
      Properties props = new Properties();
      
      //Assign localhost id
      props.put("bootstrap.servers", “localhost:9092");
      
      //Set acknowledgements for producer requests.      
      props.put("acks", “all");
      
      //If the request fails, the producer can automatically retry,
      props.put("retries", 0);
      
      //Specify buffer size in config
      props.put("batch.size", 16384);
      
      //Reduce the no of requests less than 0   
      props.put("linger.ms", 1);
      
      //The buffer.memory controls the total amount of memory available to the producer for buffering.   
      props.put("buffer.memory", 33554432);
      
      props.put("key.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
         
      props.put("value.serializer", 
         "org.apache.kafka.common.serialization.StringSerializer");
      
      Producer<String, String> producer = new KafkaProducer
         <String, String>(props);
            
      for(int i = 0; i < 10; i++)
         producer.send(new ProducerRecord<String, String>(topicName, 
            Integer.toString(i), Integer.toString(i)));
               System.out.println(“Message sent successfully”);
               producer.close();
   }
}

编译- 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行- 可以使用以下命令执行应用程序。

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleProducer <topic-name>

输出

Message sent successfully
To check the above output open new terminal and type Consumer CLI command to receive messages.
>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 —topic <topic-name> —from-beginning
1
2
3
4
5
6
7
8
9
10

简单的消费者示例

到目前为止,我们已经创建了一个生产者来将消息发送到 Kafka 集群。现在让我们创建一个消费者来消费来自 Kafka 集群的消息。KafkaConsumer API用于消费来自Kafka集群的消息。KafkaConsumer 类构造函数定义如下。

public KafkaConsumer(java.util.Map<java.lang.String,java.lang.Object> configs)

configs - 返回消费者配置的映射。

KafkaConsumer 类具有下表列出的以下重要方法。

序列号 方法及说明
1

public java.util.Set<TopicPart-tition> 赋值()

获取消费者当前分配的一组分区。

2

公共字符串订阅()

订阅给定的主题列表以获取动态分配的分区。

3

public void sub-scribe(java.util.List<java.lang.String>主题,ConsumerRe-balanceListener监听器)

订阅给定的主题列表以获取动态分配的分区。

4

公共无效取消订阅()

从给定的分区列表中取消订阅主题。

5

公共无效订阅(java.util.List<java.lang.String>主题)

订阅给定的主题列表以获取动态分配的分区。如果给定的主题列表为空,则其处理方式与 unsubscribe() 相同。

6

public void sub-scribe(java.util.regex.Pattern模式,ConsumerRebalanceLis-tener监听器)

参数模式是指正则表达式格式的订阅模式,监听器参数从订阅模式获取通知。

7

public void as-sign(java.util.List<TopicParti-tion> 分区)

手动向客户分配分区列表。

8

轮询()

使用订阅/分配 API 之一获取指定主题或分区的数据。如果在轮询数据之前未订阅主题,这将返回错误。

9

公共无效commitSync()

提交在最后一次 poll() 上返回的所有订阅主题和分区列表的偏移量。相同的操作也适用于 commitAsyn()。

10

公共无效寻求(TopicPartition分区,长偏移量)

获取消费者将在下一个 poll() 方法中使用的当前偏移值。

11

公共无效简历()

恢复暂停的分区。

12

公共无效唤醒()

唤醒消费者。

消费者记录API

ConsumerRecord API 用于从 Kafka 集群接收记录。该 API 由主题名称、从中接收记录的分区号以及指向 Kafka 分区中的记录的偏移量组成。ConsumerRecord 类用于创建具有特定主题名称、分区计数和 <key, value> 对的消费者记录。它具有以下签名。

public ConsumerRecord(string topic,int partition, long offset,K key, V value)
  • 主题- 从 Kafka 集群接收的消费者记录的主题名称。

  • 分区- 主题的分区。

  • Key - 记录的键,如果不存在键,将返回 null。

  • - 记录内容。

消费者记录 API

ConsumerRecords API 充当 ConsumerRecord 的容器。此 API 用于保存特定主题的每个分区的 ConsumerRecord 列表。其构造函数定义如下。

public ConsumerRecords(java.util.Map<TopicPartition,java.util.List
<Consumer-Record>K,V>>> records)
  • TopicPartition - 返回特定主题的分区图。

  • Records - 返回 ConsumerRecord 列表。

ConsumerRecords 类定义了以下方法。

序列号 方法和说明
1

公共 int 计数()

所有主题的记录数。

2

公共设置分区()

包含此记录集中数据的分区集(如果没有返回数据,则该集为空)。

3

公共迭代器 iterator()

迭代器使您能够循环访问集合、获取或删除元素。

4

公共列表记录()

获取给定分区的记录列表。

配置设置

下面列出了消费者客户端 API 主要配置设置的配置设置 -

序列号 设置和说明
1

引导服务器

经纪人的引导列表。

2

组号

将单个消费者分配给一个组。

3

启用自动提交

如果值为 true,则启用偏移量的自动提交,否则不提交。

4

自动提交间隔.ms

返回更新的消耗偏移量写入 ZooKeeper 的频率。

5

会话超时毫秒

指示Kafka在放弃并继续消费消息之前将等待ZooKeeper响应请求(读或写)的毫秒数。

简单消费者应用程序

生产者申请步骤在这里保持不变。首先,启动您的 ZooKeeper 和 Kafka 代理。然后使用名为SimpleCon-sumer.java的 java 类创建SimpleConsumer应用程序,并键入以下代码。

import java.util.Properties;
import java.util.Arrays;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class SimpleConsumer {
   public static void main(String[] args) throws Exception {
      if(args.length == 0){
         System.out.println("Enter topic name");
         return;
      }
      //Kafka consumer configuration settings
      String topicName = args[0].toString();
      Properties props = new Properties();
      
      props.put("bootstrap.servers", "localhost:9092");
      props.put("group.id", "test");
      props.put("enable.auto.commit", "true");
      props.put("auto.commit.interval.ms", "1000");
      props.put("session.timeout.ms", "30000");
      props.put("key.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer", 
         "org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer<String, String> consumer = new KafkaConsumer
         <String, String>(props);
      
      //Kafka Consumer subscribes list of topics here.
      consumer.subscribe(Arrays.asList(topicName))
      
      //print the topic name
      System.out.println("Subscribed to topic " + topicName);
      int i = 0;
      
      while (true) {
         ConsumerRecords<String, String> records = con-sumer.poll(100);
         for (ConsumerRecord<String, String> record : records)
         
         // print the offset,key and value for the consumer records.
         System.out.printf("offset = %d, key = %s, value = %s\n", 
            record.offset(), record.key(), record.value());
      }
   }
}

编译- 可以使用以下命令编译应用程序。

javac -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*” *.java

执行 -可以使用以下命令执行应用程序

java -cp “/path/to/kafka/kafka_2.11-0.9.0.0/lib/*”:. SimpleConsumer <topic-name>

输入- 打开生产者 CLI 并向主题发送一些消息。您可以将简单的输入设置为“Hello Consumer”。

输出- 以下是输出。

Subscribed to topic Hello-Kafka
offset = 3, key = null, value = Hello Consumer