Apache Storm - 三叉戟


Trident 是 Storm 的扩展。与 Storm 一样,Trident 也是由 Twitter 开发的。开发 Trident 的主要原因是在 Storm 之上提供高级抽象以及状态流处理和低延迟分布式查询。

Trident使用spout和bolt,但这些低级组件是Trident在执行前自动生成的。Trident 具有函数、过滤器、连接、分组和聚合。

Trident 将流作为一系列批次进行处理,这些批次称为事务。通常,这些小批量的大小约为数千或数百万个元组,具体取决于输入流。这样,Trident 与 Storm 不同,Storm 执行逐元组处理。

批处理概念与数据库事务非常相似。每笔交易都会分配一个交易 ID。一旦所有处理完成,交易就被认为是成功的。然而,处理事务的元组之一失败将导致整个事务重新传输。对于每个批次,Trident 将在事务开始时调用 beginCommit,并在事务结束时调用 commit。

三叉戟拓扑

Trident API 公开了一个使用“TridentTopology”类创建 Trident 拓扑的简单选项。基本上,Trident 拓扑从 spout 接收输入流,并对流执行有序的操作序列(过滤、聚合、分组等)。Storm Tuple 被 Trident Tuple 取代,Bolts 被操作取代。一个简单的 Trident 拓扑可以如下创建 -

TridentTopology topology = new TridentTopology();

三叉戟元组

Trident 元组是一个命名的值列表。TridentTuple 接口是 Trident 拓扑的数据模型。TridentTuple接口是Trident拓扑可以处理的数据的基本单元。

三叉戟喷口

Trident spout 与 Storm spout 类似,但具有使用 Trident 功能的附加选项。实际上,我们仍然可以使用我们在 Storm 拓扑中使用的 IRichSpout,但它本质上是非事务性的,我们将无法利用 Trident 提供的优势。

具有使用 Trident 特性的所有功能的基本 Spout 是“ITridentSpout”。它支持事务和不透明事务语义。其他 Spout 是 IBatchSpout、IPartitionedTridentSpout 和 IOpaquePartitionedTridentSpout。

除了这些通用 Spout 之外,Trident 还有许多 Trident Spout 的示例实现。其中之一是 FeederBatchSpout spout,我们可以使用它轻松发送三叉戟元组的命名列表,而无需担心批处理、并行性等。

FeederBatchSpout 创建和数据馈送可以如下所示完成 -

TridentTopology topology = new TridentTopology();
FeederBatchSpout testSpout = new FeederBatchSpout(
   ImmutableList.of("fromMobileNumber", "toMobileNumber", “duration”));
topology.newStream("fixed-batch-spout", testSpout)
testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)));

三叉戟行动

Trident 依靠“Trident 操作”来处理 Trident 元组的输入流。Trident API 有许多内置操作来处理简单到复杂的流处理。这些操作的范围从简单的验证到复杂的三叉戟元组分组和聚合。让我们回顾一下最重要和最常用的操作。

筛选

Filter 是一个用于执行输入验证任务的对象。Trident 过滤器获取 trident 元组字段的子集作为输入,并根据是否满足某些条件返回 true 或 false。如果返回 true,则该元组将保留在输出流中;否则,该元组将从流中删除。Filter 基本上会继承自BaseFilter类并实现isKeep方法。这是过滤器操作的示例实现 -

public class MyFilter extends BaseFilter {
   public boolean isKeep(TridentTuple tuple) {
      return tuple.getInteger(1) % 2 == 0;
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2]
[1, 4]

可以使用“each”方法在拓扑中调用过滤器函数。“Fields”类可用于指定输入(三叉戟元组的子集)。示例代码如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
.each(new Fields("a", "b"), new MyFilter())

功能

函数是一个用于对单个三叉戟元组执行简单操作的对象。它采用 trident 元组字段的子集并发出零个或多个新的 trident 元组字段。

Function基本上继承自BaseFunction类并实现了execute方法。下面给出了示例实现 -

public class MyFunction extends BaseFunction {
   public void execute(TridentTuple tuple, TridentCollector collector) {
      int a = tuple.getInteger(0);
      int b = tuple.getInteger(1);
      collector.emit(new Values(a + b));
   }
}

input

[1, 2]
[1, 3]
[1, 4]

output

[1, 2, 3]
[1, 3, 4]
[1, 4, 5]

就像 Filter 操作一样,Function 操作可以使用each方法在拓扑中调用。示例代码如下 -

TridentTopology topology = new TridentTopology();
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d")));

聚合

聚合是用于对输入批次、分区或流执行聚合操作的对象。Trident 具有三种类型的聚合。它们如下 -

  • aggregate - 单独聚合每批三叉戟元组。在聚合过程中,元组最初使用全局分组进行重新分区,以将同一批次的所有分区合并为单个分区。

  • partitionAggregate - 聚合每个分区而不是整批三叉戟元组。分区聚合的输出完全替换了输入元组。分区聚合的输出包含单个字段元组。

  • persistaggregate - 聚合所有批次中的所有 trident 元组,并将结果存储在内存或数据库中。

TridentTopology topology = new TridentTopology();

// aggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .aggregate(new Count(), new Fields(“count”))
	
// partitionAggregate operation
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .partitionAggregate(new Count(), new Fields(“count"))
	
// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

可以使用CombinerAggregator、ReducerAggregator 或通用Aggregator 接口创建聚合操作。上面示例中使用的“count”聚合器是内置聚合器之一。它是使用“CombinerAggregator”实现的。实现如下 -

public class Count implements CombinerAggregator<Long> {
   @Override
   public Long init(TridentTuple tuple) {
      return 1L;
   }
	
   @Override
   public Long combine(Long val1, Long val2) {
      return val1 + val2;
   }
	
   @Override
   public Long zero() {
      return 0L;
   }
}

分组

分组操作是一个内置操作,可以通过groupBy方法调用。groupBy 方法通过对指定字段执行 partitionBy 来重新分区流,然后在每个分区中,将组字段相等的元组分组在一起。通常,我们使用“groupBy”和“persistentAggregate”来获得分组聚合。示例代码如下 -

TridentTopology topology = new TridentTopology();

// persistentAggregate - saving the count to memory
topology.newStream("spout", spout)
   .each(new Fields(“a, b"), new MyFunction(), new Fields(“d”))
   .groupBy(new Fields(“d”)
   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));

合并和加入

合并和连接可以分别使用“merge”和“join”方法来完成。合并合并一个或多个流。连接与合并类似,不同之处在于连接使用双方的 trident 元组字段来检查和连接两个流。此外,加入只能在批次级别下进行。示例代码如下 -

TridentTopology topology = new TridentTopology();
topology.merge(stream1, stream2, stream3);
topology.join(stream1, new Fields("key"), stream2, new Fields("x"), 
   new Fields("key", "a", "b", "c"));

状态维护

Trident 提供了一种状态维护机制。状态信息可以存储在拓扑本身中,否则您也可以将其存储在单独的数据库中。原因是为了维持这样的状态:如果任何元组在处理过程中失败,则重试失败的元组。这会在更新状态时产生问题,因为您不确定该元组的状态之前是否已更新。如果元组在更新状态之前失败,则重试元组将使状态稳定。但是,如果元组在更新状态后失败,则重试同一元组将再次增加数据库中的计数并使状态不稳定。需要执行以下步骤以确保消息仅被处理一次 -

  • 小批量处理元组。

  • 为每个批次分配一个唯一的 ID。如果重试该批次,则会为其提供相同的唯一 ID。

  • 状态更新是按批次排序的。例如,在第一批的状态更新完成之前,无法进行第二批的状态更新。

分布式远程过程调用

分布式RPC用于从Trident拓扑中查询和检索结果。Storm有一个内置的分布式RPC服务器。分布式RPC服务器接收来自客户端的RPC请求并将其传递给拓扑。拓扑处理请求并将结果发送到分布式RPC服务器,分布式RPC服务器将结果重定向到客户端。Trident 的分布式 RPC 查询的执行方式与普通 RPC 查询类似,只是这些查询是并行运行的。

何时使用三叉戟?

与许多用例一样,如果要求只处理一次查询,我们可以通过在 Trident 中编写拓扑来实现。另一方面,对于Storm来说,很难实现一次处理。因此,Trident 对于需要一次性处理的用例非常有用。Trident 并不适合所有用例,尤其是高性能用例,因为它增加了 Storm 和管理状态的复杂性。

三叉戟的工作示例

我们将把上一节中制定的呼叫日志分析器应用程序转换为 Trident 框架。与普通风暴相比,由于其高级 API,Trident 的应用程序将相对容易。Storm 基本上需要在 Trident 中执行 Function、Filter、Aggregate、GroupBy、Join 和 Merge 操作中的任何一项。最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索一些关键字。

格式化通话信息

FormatCall类的目的是格式化包括“主叫号码”和“接收号码”的呼叫信息。完整的程序代码如下 -

编码:FormatCall.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class FormatCall extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      String fromMobileNumber = tuple.getString(0);
      String toMobileNumber = tuple.getString(1);
      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));
   }
}

CSV分割

CSVSplit 类的目的是基于“逗号 (,)”拆分输入字符串并发出字符串中的每个单词。该函数用于解析分布式查询的输入参数。完整代码如下 -

编码:CSVSplit.java

import backtype.storm.tuple.Values;

import storm.trident.operation.BaseFunction;
import storm.trident.operation.TridentCollector;
import storm.trident.tuple.TridentTuple;

public class CSVSplit extends BaseFunction {
   @Override
   public void execute(TridentTuple tuple, TridentCollector collector) {
      for(String word: tuple.getString(0).split(",")) {
         if(word.length() > 0) {
            collector.emit(new Values(word));
         }
      }
   }
}

日志分析器

这是主要的应用程序。最初,应用程序将初始化 TridentTopology 并使用FeederBatchSpout提供调用者信息。Trident拓扑流可以使用TridentTopology类的newStream方法创建。类似地,可以使用TridentTopology类的newDRCPStream方法创建Trident拓扑DRPC流。可以使用 LocalDRPC 类创建一个简单的 DRCP 服务器。LocalDRPC有执行方法来搜索某些关键字。完整的代码如下。

编码:LogAnalyserTrident.java

import java.util.*;

import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.LocalDRPC;
import backtype.storm.utils.DRPCClient;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import storm.trident.TridentState;
import storm.trident.TridentTopology;
import storm.trident.tuple.TridentTuple;

import storm.trident.operation.builtin.FilterNull;
import storm.trident.operation.builtin.Count;
import storm.trident.operation.builtin.Sum;
import storm.trident.operation.builtin.MapGet;
import storm.trident.operation.builtin.Debug;
import storm.trident.operation.BaseFilter;

import storm.trident.testing.FixedBatchSpout;
import storm.trident.testing.FeederBatchSpout;
import storm.trident.testing.Split;
import storm.trident.testing.MemoryMapState;

import com.google.common.collect.ImmutableList;

public class LogAnalyserTrident {
   public static void main(String[] args) throws Exception {
      System.out.println("Log Analyser Trident");
      TridentTopology topology = new TridentTopology();
		
      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",
         "toMobileNumber", "duration"));

      TridentState callCounts = topology
         .newStream("fixed-batch-spout", testSpout)
         .each(new Fields("fromMobileNumber", "toMobileNumber"), 
         new FormatCall(), new Fields("call"))
         .groupBy(new Fields("call"))
         .persistentAggregate(new MemoryMapState.Factory(), new Count(), 
         new Fields("count"));

      LocalDRPC drpc = new LocalDRPC();

      topology.newDRPCStream("call_count", drpc)
         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));

      topology.newDRPCStream("multiple_call_count", drpc)
         .each(new Fields("args"), new CSVSplit(), new Fields("call"))
         .groupBy(new Fields("call"))
         .stateQuery(callCounts, new Fields("call"), new MapGet(), 
         new Fields("count"))
         .each(new Fields("call", "count"), new Debug())
         .each(new Fields("count"), new FilterNull())
         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));

      Config conf = new Config();
      LocalCluster cluster = new LocalCluster();
      cluster.submitTopology("trident", conf, topology.build());
      Random randomGenerator = new Random();
      int idx = 0;
		
      while(idx < 10) {
         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123402", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123403", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123401", 
            "1234123404", randomGenerator.nextInt(60))));

         testSpout.feed(ImmutableList.of(new Values("1234123402", 
            "1234123403", randomGenerator.nextInt(60))));

         idx = idx + 1;
      }

      System.out.println("DRPC : Query starts");
      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));
      System.out.println(drpc.execute("multiple_call_count", "1234123401 -
         1234123402,1234123401 - 1234123403"));
      System.out.println("DRPC : Query ends");

      cluster.shutdown();
      drpc.shutdown();

      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);
   }
}

构建并运行应用程序

完整的应用程序包含三个 Java 代码。它们如下 -

  • 格式调用.java
  • CSVSplit.java
  • LogAnalyerTrident.java

可以使用以下命令构建应用程序 -

javac -cp “/path/to/storm/apache-storm-0.9.5/lib/*” *.java

该应用程序可以使用以下命令运行 -

java -cp “/path/to/storm/apache-storm-0.9.5/lib/*”:. LogAnalyserTrident

输出

一旦应用程序启动,应用程序将输出有关集群启动过程、操作处理、DRPC Server 和客户端信息以及最后集群关闭过程的完整详细信息。此输出将显示在控制台上,如下所示。

DRPC : Query starts
[["1234123401 - 1234123402",10]]
DEBUG: [1234123401 - 1234123402, 10]
DEBUG: [1234123401 - 1234123403, 10]
[[20]]
DRPC : Query ends