Spring Cloud - 使用 Apache Kafka 进行流处理
介绍
在分布式环境中,服务需要相互通信。通信可以同步或异步发生。在本节中,我们将了解服务如何使用消息代理进行异步通信。
执行异步通信的两个主要好处 -
生产者和消费者的速度可能不同- 如果数据的消费者慢或快,它不会影响生产者处理,反之亦然。两者都可以以各自的速度工作,而不会互相影响。
生产者不需要处理来自不同消费者的请求- 可能有多个消费者想要从生产者读取同一组数据。通过中间的消息代理,生产者无需处理这些消费者生成的负载。另外,生产者级别的任何中断都不会阻止消费者读取旧的生产者数据,因为这些数据在消息代理中可用。
Apache Kafka和RabbitMQ是两个著名的用于进行异步通信的消息代理。在本教程中,我们将使用 Apache Kafka。
Kafka——依赖设置
让我们以之前使用过的 Restaurant 为例。因此,假设我们的客户服务和餐厅服务通过异步通信进行通信。为此,我们将使用 Apache Kafka。我们需要在两种服务中使用它,即客户服务和餐厅服务。
要使用 Apache Kafka,我们将更新这两个服务的 POM 并添加以下依赖项。
<dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-stream-kafka</artifactId> </dependency>
我们还需要运行 Kafka 实例。有多种方法可以完成,但我们更喜欢使用 Docker 容器启动 Kafka。以下是我们可以考虑使用的一些图像 -
无论我们使用哪个镜像,这里需要注意的重要一点是,一旦镜像启动并运行,请确保 Kafka 集群可通过localhost:9092访问
现在我们已经在我们的镜像上运行了 Kafka 集群,让我们转向核心示例。
装订和活页夹
谈到 Spring Cloud 流,有三个重要的概念 -
外部消息系统- 这是外部管理的组件,负责存储应用程序生成的事件/消息,可供订阅者/消费者读取。请注意,这不在应用程序/Spring 内管理。Apache Kafka、RabbitMQ 是几个例子
Binders - 这是提供与消息传递系统集成的组件,例如,由消息传递系统的 IP 地址、身份验证等组成。
绑定- 该组件使用绑定程序向消息传递系统生成消息或使用来自特定主题/队列的消息。
所有上述属性均在应用程序属性文件中定义。
例子
让我们以之前使用过的 Restaurant 为例。因此,让我们假设每当客户服务中添加新服务时,我们希望将客户信息通知到附近的餐厅关于他/她的信息。
为此,我们首先更新我们的客户服务以包含并使用 Kafka。请注意,我们将使用客户服务作为数据的生产者。也就是说,每当我们通过 API 添加 Customer 时,它也会被添加到 Kafka 中。
spring: application: name: customer-service cloud: stream: source: customerBinding-out-0 kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-out-0: destination: customer producer: partitionCount: 3 server: port: ${app_port} eureka: client: serviceURL: defaultZone: http://localhost:8900/eureka
注意事项-
我们使用本地 Kafka 实例的地址定义了一个绑定器。
我们还定义了绑定“customerBinding-out-0”,它使用“customer”主题来输出消息。
我们还提到了在stream.source中的绑定,以便我们可以在代码中强制使用它。
完成此操作后,现在让我们通过添加新方法“addCustomer”来更新控制器,该方法负责服务 POST 请求。然后,我们从post请求将数据发送到 Kafka Broker。
package com.tutorialspoint; import java.util.HashMap; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RequestMethod; import org.springframework.web.bind.annotation.RestController; @RestController class RestaurantCustomerInstancesController { @Autowired private StreamBridge streamBridge; static HashMap<Long, Customer> mockCustomerData = new HashMap(); static{ mockCustomerData.put(1L, new Customer(1, "Jane", "DC")); mockCustomerData.put(2L, new Customer(2, "John", "SFO")); mockCustomerData.put(3L, new Customer(3, "Kate", "NY")); } @RequestMapping("/customer/{id}") public Customer getCustomerInfo(@PathVariable("id") Long id) { System.out.println("Querying customer for id with: " + id); return mockCustomerData.get(id); } @RequestMapping(path = "/customer/{id}", method = RequestMethod.POST) public Customer addCustomer(@PathVariable("id") Long id) { // add default name Customer defaultCustomer = new Customer(id, "Dwayne", "NY"); streamBridge.send("customerBinding-out-0", defaultCustomer); return defaultCustomer; } }
注意事项
我们使用 Autowiring StreamBridge 来发送消息。
我们在“发送”方法中使用的参数还指定了我们想要用来发送数据的绑定。
现在让我们更新我们的餐厅服务以包含并订阅“客户”主题。请注意,我们将使用餐厅服务作为数据的消费者。也就是说,每当我们通过 API 添加客户时,餐厅服务都会通过 Kafka 来了解它。
首先,让我们更新应用程序属性文件。
spring: application: name: restaurant-service cloud: function: definition: customerBinding stream: kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-in-0: destination: customer server: port: ${app_port} eureka: client: serviceURL: defaultZone: http://localhost:8900/eureka
完成此操作后,现在让我们通过添加新方法“customerBinding”来更新控制器,该方法负责获取请求并提供一个打印请求及其元数据详细信息的函数。
package com.tutorialspoint; import java.util.HashMap; import java.util.List; import java.util.function.Consumer; import java.util.function.Supplier; import java.util.stream.Collectors; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.cloud.stream.annotation.StreamListener; import org.springframework.cloud.stream.function.StreamBridge; import org.springframework.context.annotation.Bean; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.support.MessageBuilder; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.RequestMapping; import org.springframework.web.bind.annotation.RestController; @RestController class RestaurantController { @Autowired CustomerService customerService; @Autowired private StreamBridge streamBridge; static HashMap<Long, Restaurant> mockRestaurantData = new HashMap(); static{ mockRestaurantData.put(1L, new Restaurant(1, "Pandas", "DC")); mockRestaurantData.put(2L, new Restaurant(2, "Indies", "SFO")); mockRestaurantData.put(3L, new Restaurant(3, "Little Italy", "DC")); mockRestaurantData.put(4L, new Restaurant(4, "Pizeeria", "NY")); } @RequestMapping("/restaurant/customer/{id}") public List<Restaurant> getRestaurantForCustomer(@PathVariable("id") Long id) { System.out.println("Got request for customer with id: " + id); String customerCity = customerService.getCustomerById(id).getCity(); return mockRestaurantData.entrySet().stream().filter( entry -> entry.getValue().getCity().equals(customerCity)) .map(entry -> entry.getValue()) .collect(Collectors.toList()); } @RequestMapping("/restaurant/cust/{id}") public void getRestaurantForCust(@PathVariable("id") Long id) { streamBridge.send("ordersBinding-out-0", id); } @Bean public Consumer<Message<Customer>> customerBinding() { return msg -> { System.out.println(msg); }; } }
注意事项-
我们正在使用“customerBinding”,它应该传递当消息到达此绑定时将调用的函数。
创建捆绑并指定主题时,我们为此函数/bean 使用的名称也需要在 YAML 文件中使用。
现在,让我们像往常一样执行上面的代码,启动Eureka Server。请注意,这不是硬性要求,在这里是为了完整性。
然后,让我们使用以下命令编译并开始更新客户服务 -
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然后,让我们使用以下命令编译并开始更新 Restaurant Service -
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我们已经准备好了,现在让我们通过 API 来测试我们的代码片段 -
curl -X POST http://localhost:8083/customer/1
以下是我们将从该 API 获得的输出 -
{ "id": 1, "name": "Dwayne", "city": "NY" }
现在,让我们检查餐厅服务的日志 -
GenericMessage [payload=Customer [id=1, name=Dwayne, city=NY], headers={kafka_offset=1,...
因此,实际上,您会看到使用 Kafka Broker,餐厅服务收到了有关新添加客户的通知。
分区和消费者组
分区和消费者组是使用 Spring Cloud 流时应该注意的两个重要概念。
分区- 它们用于对数据进行分区,以便我们可以在多个消费者之间分配工作。
让我们看看如何在 Spring Cloud 中对数据进行分区。比如说,我们要根据客户 ID 对数据进行分区。因此,让我们更新我们的客户服务。为此,我们需要告诉
让我们更新客户服务应用程序属性以指定数据的键。
spring: application: name: customer-service cloud: function: definition: ordersBinding stream: source: customerBinding-out-0 kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-out-0: destination: customer producer: partitionKeyExpression: 'getPayload().getId()' partitionCount: 3 server: port: ${app_port} eureka: client: serviceURL: defaultZone: http://localhost:8900/eureka
为了指定键,即“partitionKeyExpression”,我们提供了 Spring 表达式语言。由于我们在消息中发送客户数据,因此表达式假定类型为 GenericMessage<Customer>。请注意,GenericMessage 是 Spring 框架类,用于将有效负载和标头包装在单个对象中。因此,我们从此消息中获取 Customer 类型的有效负载,然后调用客户的getId()方法。
现在,让我们也更新我们的消费者,即餐厅服务,以在使用请求时记录更多信息。
现在,让我们像往常一样执行上面的代码,启动Eureka Server。请注意,这不是一个硬性要求,在这里是为了完整起见。
然后,让我们使用以下命令编译并开始更新客户服务 -
mvn clean install ; java -Dapp_port=8083 -jar .\target\spring-cloud-eurekaclient- 1.0.jar --spring.config.location=classpath:application-kafka.yml
然后,让我们使用以下命令编译并开始更新 Restaurant Service -
mvn clean install; java -Dapp_port=8082 -jar .\target\spring-cloud-feign-client- 1.0.jar --spring.config.location=classpath:application-kafka.yml
我们已经准备好了,现在让我们测试我们的代码片段。作为测试的一部分,我们将执行以下操作 -
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/1
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/1
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/5
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/3
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/1
我们不太关心 API 的输出。相反,我们更关心数据发送到的分区。由于我们使用客户 ID 作为键,因此我们希望具有相同 ID 的客户最终会位于同一分区中。
现在,让我们检查餐厅服务的日志 -
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 2 Customer: Customer [id=5, name=Dwayne, city=NY] Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 0 Customer: Customer [id=3, name=Dwayne, city=NY] Consumer Group: anonymous.9108d02a-b1ee-4a7a-8707-7760581fa323 Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我们所看到的,ID 为 1 的客户每次都位于同一分区,即分区 1。
消费者组- 消费者组是出于相同目的阅读相同主题的消费者的逻辑分组。主题中的数据在消费者组中的消费者之间进行分区,以便给定消费者组中只有一个消费者可以读取主题的分区。
要定义消费者组,我们需要做的就是在使用 Kafka 主题名称的绑定中定义一个组。例如,让我们在控制器的应用程序文件中定义消费者组名称。
spring: application: name: restaurant-service cloud: function: definition: customerBinding stream: kafka: binder: brokers: localhost:9092 replicationFactor: 1 bindings: customerBinding-in-0: destination: customer group: restController server: port: ${app_port} eureka: client: serviceURL: defaultZone: http://localhost:8900/eureka
让我们重新编译并启动餐厅服务。现在,让我们通过点击客户服务上的 POST API 来生成事件 -
插入 ID 为 1 的客户:curl -X POST http://localhost:8083/customer/1
现在,如果我们检查餐厅服务的日志,我们将看到以下内容 -
Consumer: org.apache.kafka.clients.consumer.KafkaConsumer@7d6d8400 Consumer Group: restContoller Partition Id: 1 Customer: Customer [id=1, name=Dwayne, city=NY]
因此,正如我们从输出中看到的,我们创建了一个名为“rest-contoller”的消费者组,其消费者负责读取主题。在上面的例子中,我们只运行了一个服务实例,因此“客户”主题的所有分区都分配给同一个实例。但是,如果我们有多个分区,我们将在工作人员之间分配分区。