开发手册 欢迎您!
软件开发者资料库

Apache Storm - Trident

Apache Storm Trident - 从简介,核心概念,集群架构,工作流,分布式消息系统,安装,工作示例,Trident,Twitter中的Apache Storm,Yahoo!简单而简单的步骤学习Apache Storm。财务,应用。

Trident是Storm的延伸.像Storm一样,Trident也是由Twitter开发的.开发Trident背后的主要原因是在Storm之上提供高级抽象以及有状态流处理和低延迟分布式查询.

Trident使用spout和bolt,但这些低 - 级别组件由Trident在执行前自动生成. Trident具有函数,过滤器,连接,分组和聚合.

Trident处理流作为一系列批次,称为事务.通常,这些小批量的大小将在数千或数百万个元组的数量级,具体取决于输入流.这样,Trident就不同于Storm,后者执行元组逐元组处理.

批处理概念与数据库事务非常相似.为每个事务分配一个事务ID.一旦所有处理完成,该交易被认为是成功的.但是,处理其中一个事务元组的失败将导致重新传输整个事务.对于每个批次,Trident将在事务开始时调用beginCommit,并在其结束时提交.

Trident拓扑

Trident API公开使用"TridentTopology"类创建Trident拓扑的简单选项.基本上,Trident拓扑从spout接收输入流并在流上执行有序的操作序列(过滤,聚合,分组等). Storm Tuple被Trident Tuple取代,而Bolts被操作取代.可以创建一个简单的Trident拓扑,如下所示;

TridentTopology topology = new TridentTopology();


三叉戟元组

三叉戟元组是一个命名的值列表. TridentTuple接口是Trident拓扑的数据模型. TridentTuple接口是可由Trident拓扑处理的基本数据单元.

Trident Spout

三叉鲸喷口类似于Storm喷口,以及使用Trident功能的其他选项.实际上,我们仍然可以使用我们在Storm拓扑中使用的IRichSpout,但它本质上是非事务性的,我们将无法使用Trident提供的优势.

具有使用Trident功能的所有功能的基本喷口是"ITridentSpout".它支持事务和不透明的事务语义.其他的喷口是IBatchSpout,IPartitionedTridentSpout和IOpaquePartitionedTridentSpout.

除了这些通用喷口外,Trident还有许多三叉鲸喷口的样品实施.其中一个是FeederBatchSpout spout,我们可以使用它来轻松发送三叉戟元组的命名列表而不用担心批处理,并行等等.

FeederBatchSpout创建和数据馈送可以如图所示完成低于 :

TridentTopology topology = new TridentTopology();FeederBatchSpout testSpout = new FeederBatchSpout(   ImmutableList.of("fromMobileNumber", "toMobileNumber", "duration"));topology.newStream("fixed-batch-spout", testSpout)testSpout.feed(ImmutableList.of(new Values("1234123401", "1234123402", 20)


三叉戟操作

Trident依靠"三叉戟操作"来处理三叉戟元组的输入流. Trident API具有许多内置操作来处理简单到复杂的流处理.这些操作包括简单验证,复杂分组和三叉戟元组的聚合.让我们来看看最重要和最常用的操作.

过滤器

过滤器是一个用于执行输入验证任务的对象. Trident过滤器获取三叉戟元组字段的子集作为输入,并根据是否满足某些条件返回true或false.如果返回true,则元组保留在输出流中;否则,从流中删除元组. Filter基本上将继承自 BaseFilter 类并实现 isKeep 方法.以下是过滤器操作的示例实现 :

public class MyFilter extends BaseFilter {   public boolean isKeep(TridentTuple tuple) {      return tuple.getInteger(1) % 2 == 0;   }}input[1, 2][1, 3][1, 4]output[1, 2][1, 4]


过滤可以使用"each"方法在拓扑中调用函数. "Fields"类可用于指定输入(三叉戟元组的子集).示例代码如下 :

TridentTopology topology = new TridentTopology();topology.newStream("spout", spout).each(new Fields("a", "b"), new MyFilter())


函数

函数是一个用于对单个三叉戟元组执行简单操作的对象.它需要三叉戟元组字段的子集,并发出零个或多个新的三叉戟元组字段.

函数基本上继承自 BaseFunction 类并实现执行方法.下面给出了一个示例实现 :

public class MyFunction extends BaseFunction {   public void execute(TridentTuple tuple, TridentCollector collector) {      int a = tuple.getInteger(0);      int b = tuple.getInteger(1);      collector.emit(new Values(a + b));   }}input[1, 2][1, 3][1, 4]output[1, 2, 3][1, 3, 4][1, 4, 5]


就像过滤操作一样,可以使用每个方法在拓扑中调用函数操作.示例代码如下 :

TridentTopology topology = new TridentTopology();topology.newStream("spout", spout)   .each(new Fields("a, b"), new MyFunction(), new Fields("d")));


聚合

聚合是用于对输入批处理或分区或流执行聚合操作的对象. Trident有三种类型的聚合.它们如下&&;

  • 聚合 : 单独聚合每批三叉戟元组.在聚合过程中,最初使用全局分组对元组进行重新分区,以将同一批次的所有分区组合到一个分区中.

  • partitionAggregate : 聚合每个分区而不是整批三叉戟元组.分区聚合的输出完全替换输入元组.分区聚合的输出包含单个字段元组.

  • persistentaggregate : 在所有批处理中聚合所有三叉戟元组并将结果存储在内存或数据库中.

TridentTopology topology = new TridentTopology();// aggregate operationtopology.newStream("spout", spout)   .each(new Fields("a, b"), new MyFunction(), new Fields("d"))   .aggregate(new Count(), new Fields("count"))// partitionAggregate operationtopology.newStream("spout", spout)   .each(new Fields("a, b"), new MyFunction(), new Fields("d"))   .partitionAggregate(new Count(), new Fields("count"))// persistentAggregate - saving the count to memorytopology.newStream("spout", spout)   .each(new Fields("a, b"), new MyFunction(), new Fields("d"))   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));


可以使用CombinerAggregator,ReducerAggregator或通用聚合器接口创建聚合操作.上面示例中使用的"count"聚合器是内置聚合器之一.它使用"CombinerAggregator"实现.实现如下 :

public class Count implements CombinerAggregator {   @Override   public Long init(TridentTuple tuple) {      return 1L;   }   @Override   public Long combine(Long val1, Long val2) {      return val1 + val2;   }   @Override   public Long zero() {      return 0L;   }}


分组

分组操作是一个内置操作,可以通过 groupBy 方法调用.groupBy方法通过在指定字段上执行partitionBy来重新分区流,然后在每个分区内,它将元组组合在一起,其组字段通常情况下,我们使用"groupBy"和"persistentAggregate"来获得组合聚合.示例代码如下 :

TridentTopology topology = new TridentTopology();// persistentAggregate - saving the count to memorytopology.newStream("spout", spout)   .each(new Fields("a, b"), new MyFunction(), new Fields("d"))   .groupBy(new Fields("d")   .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));


合并和加入

合并和加入可以通过使用"合并"和"加入"来完成合并组合一个或多个流.合并类似于合并,除了加入使用来自双方的三叉元组字段来检查和连接两个流的事实.此外,加入仅在批处理级别下工作.示例代码为跟随 :

TridentTopology topology = new TridentTopology();topology.merge(stream1, stream2, stream3);topology.join(stream1, new Fields("key"), stream2, new Fields("x"),    new Fields("key", "a", "b", "c"));

国家维护

Trident提供了一种状态维护机制。 状态信息可以存储在拓扑本身中,否则您也可以将其存储在单独的数据库中。 原因是维持一个状态,如果任何元组在处理期间失败,则重试失败的元组。 这会在更新状态时产生问题,因为您不确定此元组的状态是否先前已更新。 如果在更新状态之前元组失败,则重试元组将使状态稳定。 但是,如果在更新状态后元组失败,则重试相同的元组将再次增加数据库中的计数并使状态不稳定。 需要执行以下步骤以确保仅处理一次消息:

  • 小批量处理元组。

  • 为每个批次分配唯一的ID。 如果重试批次,则会为其指定相同的唯一ID。

  • 状态更新按批次排序。 例如,在第一批的状态更新完成之前,将无法进行第二批的状态更新。

分布式RPC

分布式RPC用于从Trident拓扑中查询和检索结果。 Storm有一个内置的分布式RPC服务器。 分布式RPC服务器从客户端接收RPC请求并将其传递给拓扑。 拓扑处理请求并将结果发送到分布式RPC服务器,该服务器由分布式RPC服务器重定向到客户端。 Trident的分布式RPC查询执行方式与普通RPC查询类似,但这些查询并行运行的事实除外。

何时使用Trident?

与许多用例一样,如果要求只处理一次查询,我们可以通过在Trident中编写拓扑来实现。 另一方面,在Storm的情况下很难实现一次处理。 因此,Trident对于您只需要处理一次的用例非常有用。 Trident不适用于所有用例,尤其是高性能用例,因为它增加了Storm的复杂性并管理状态。

Trident的工作实例

我们将把上一节中讨论的调用日志分析器应用程序转换为Trident框架。 由于其高级API,与普通风暴相比,Trident应用程序相对容易。 Storm基本上需要在Trident中执行Function,Filter,Aggregate,GroupBy,Join和Merge操作中的任何一个。 最后,我们将使用LocalDRPC类启动DRPC服务器,并使用LocalDRPC类的execute方法搜索某个关键字。

格式化信息

FormatCall类的目的是格式化包括"呼叫者号码"和"接收者号码"的呼叫信息。 完整的程序代码如下 :

Coding: FormatCall.java

import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class FormatCall extends BaseFunction {   @Override   public void execute(TridentTuple tuple, TridentCollector collector) {      String fromMobileNumber = tuple.getString(0);      String toMobileNumber = tuple.getString(1);      collector.emit(new Values(fromMobileNumber + " - " + toMobileNumber));   }}

CSVSplit

CSVSplit类的目的是基于"逗号(,)"拆分输入字符串并发出字符串中的每个单词。 此函数用于解析分布式查询的输入参数。 完整的代码如下:

Coding: CSVSplit.java

import backtype.storm.tuple.Values;import storm.trident.operation.BaseFunction;import storm.trident.operation.TridentCollector;import storm.trident.tuple.TridentTuple;public class CSVSplit extends BaseFunction {   @Override   public void execute(TridentTuple tuple, TridentCollector collector) {      for(String word: tuple.getString(0).split(",")) {         if(word.length() > 0) {            collector.emit(new Values(word));         }      }   }}

日志分析器

T他是主要的应用。 最初,应用程序将使用FeederBatchSpout初始化TridentTopology并提供呼叫者信息。 可以使用TridentTopology类的newStream方法创建Trident拓扑流。 类似地,可以使用TridentTopology类的newDRCPStream方法创建Trident拓扑DRPC流。 可以使用LocalDRPC类创建简单的DRCP服务器。 LocalDRPC具有搜索某些关键字的执行方法。 完整的代码如下。

Coding: LogAnalyserTrident.java

import java.util.*;import backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.LocalDRPC;import backtype.storm.utils.DRPCClient;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import storm.trident.TridentState;import storm.trident.TridentTopology;import storm.trident.tuple.TridentTuple;import storm.trident.operation.builtin.FilterNull;import storm.trident.operation.builtin.Count;import storm.trident.operation.builtin.Sum;import storm.trident.operation.builtin.MapGet;import storm.trident.operation.builtin.Debug;import storm.trident.operation.BaseFilter;import storm.trident.testing.FixedBatchSpout;import storm.trident.testing.FeederBatchSpout;import storm.trident.testing.Split;import storm.trident.testing.MemoryMapState;import com.google.common.collect.ImmutableList;public class LogAnalyserTrident {   public static void main(String[] args) throws Exception {      System.out.println("Log Analyser Trident");      TridentTopology topology = new TridentTopology();      FeederBatchSpout testSpout = new FeederBatchSpout(ImmutableList.of("fromMobileNumber",         "toMobileNumber", "duration"));      TridentState callCounts = topology         .newStream("fixed-batch-spout", testSpout)         .each(new Fields("fromMobileNumber", "toMobileNumber"),          new FormatCall(), new Fields("call"))         .groupBy(new Fields("call"))         .persistentAggregate(new MemoryMapState.Factory(), new Count(),          new Fields("count"));      LocalDRPC drpc = new LocalDRPC();      topology.newDRPCStream("call_count", drpc)         .stateQuery(callCounts, new Fields("args"), new MapGet(), new Fields("count"));      topology.newDRPCStream("multiple_call_count", drpc)         .each(new Fields("args"), new CSVSplit(), new Fields("call"))         .groupBy(new Fields("call"))         .stateQuery(callCounts, new Fields("call"), new MapGet(),          new Fields("count"))         .each(new Fields("call", "count"), new Debug())         .each(new Fields("count"), new FilterNull())         .aggregate(new Fields("count"), new Sum(), new Fields("sum"));      Config conf = new Config();      LocalCluster cluster = new LocalCluster();      cluster.submitTopology("trident", conf, topology.build());      Random randomGenerator = new Random();      int idx = 0;      while(idx < 10) {         testSpout.feed(ImmutableList.of(new Values("1234123401",             "1234123402", randomGenerator.nextInt(60))));         testSpout.feed(ImmutableList.of(new Values("1234123401",             "1234123403", randomGenerator.nextInt(60))));         testSpout.feed(ImmutableList.of(new Values("1234123401",             "1234123404", randomGenerator.nextInt(60))));         testSpout.feed(ImmutableList.of(new Values("1234123402",             "1234123403", randomGenerator.nextInt(60))));         idx = idx + 1;      }      System.out.println("DRPC : Query starts");      System.out.println(drpc.execute("call_count","1234123401 - 1234123402"));      System.out.println(drpc.execute("multiple_call_count", "1234123401 -         1234123402,1234123401 - 1234123403"));      System.out.println("DRPC : Query ends");      cluster.shutdown();      drpc.shutdown();      // DRPCClient client = new DRPCClient("drpc.server.location", 3772);   }}

构建和运行应用程序

完整的应用程序有三个Java代码。 它们如下:

  • FormatCall.java

  • CSVSplit.java

  • LogAnalyerTrident.java

可以使用以下命令构建应用程序:

javac -cp "/path/to/storm/apache-storm-0.9.5/lib/*" *.java

可以使用以下命令运行该应用程序:

java -cp "/path/to/storm/apache-storm-0.9.5/lib/*":. LogAnalyserTrident

输出

应用程序启动后,应用程序将输出有关群集启动过程,操作处理,DRPC服务器和客户端信息以及最终群集关闭过程的完整详细信息。 此输出将显示在控制台上,如下所示。

DRPC : Query starts[["1234123401 - 1234123402",10]]DEBUG: [1234123401 - 1234123402, 10]DEBUG: [1234123401 - 1234123403, 10][[20]]DRPC : Query ends