开发手册 欢迎您!
软件开发者资料库

Apache Storm - 工作示例

Apache Storm工作示例 - 从简介,核心概念,集群体系结构,工作流,分布式消息系统,安装,工作示例,Trident,Twitter中的Apache Storm,Yahoo!简单而简单的步骤学习Apache Storm。财务,应用。

我们已经了解了Apache Storm的核心技术细节,现在是时候编写一些简单的场景了.

场景 - 移动呼叫日志分析器

移动呼叫及其持续时间将作为Apache Storm的输入,Storm将在同一呼叫者和接收者之间处理和分组呼叫及其呼叫总数.

Spout Creation

Spout是一个用于生成数据的组件.基本上,spout将实现IRichSpout接口. "IRichSpout"界面有以下重要方法 :

  • 打开 : 为spout提供要执行的环境.执行者将运行此方法来初始化喷口.

  • nextTuple : 通过收集器发出生成的数据.

  • 关闭 : 当喷口即将关闭时,将调用此方法.

  • declareOutputFields : 声明元组的输出模式.

  • ack : 确认已处理特定元组

  • 失败 : 指定不处理特定元组而不进行重新处理.

打开

open 方法的签名如下 :

open(Map conf,TopologyContext context,SpoutOutputCollector collector)

  • conf : 为此喷口提供风暴配置.

  • context : 提供有关拓扑中的喷口位置,其任务ID,输入和输出信息的完整信息.

  • 收集器 : 使我们能够发出将由螺栓处理的元组.

nextTuple

nextTuple 方法的签名如下 :

nextTuple()

从与ack()和fail()方法相同的循环中定期调用nextTuple().当没有工作要做时,它必须释放对线程的控制,以便其他方法有机会被调用.所以nextTuple的第一行检查处理是否已经完成.如果是这样,它应该至少睡眠一毫秒以减少处理器上的负载.

关闭

的签名关闭方法如下 :

close()

declareOutputFields

declareOutputFields 方法的签名如下 :

declareOutputFields(OutputFieldsDeclarer declarer)

declarer : 它用于声明输出流ID,输出字段等.

此方法用于指定元组的输出模式.

ack

ack 方法的签名如下 :

ack(Object msgId)

此方法确认已处理特定元组.

失败

nextTuple 方法的签名如下 :

ack(Object msgId )

此方法通知特定元组尚未完全处理. Storm将重新处理特定的元组.

FakeCallLogReaderSpout

在我们的场景中,我们需要收集呼叫日志详细信息.通话记录的信息包含.

  • 来电号码

  • 接收方号码

  • 持续时间

由于我们没有通话记录的实时信息,我们将生成虚假的通话记录.假信息将使用Random类创建.完整的程序代码如下所示.

编码和减号; FakeCallLogReaderSpout.java

import java.util.*;//import storm tuple packagesimport backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import Spout interface packagesimport backtype.storm.topology.IRichSpout;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.spout.SpoutOutputCollector;import backtype.storm.task.TopologyContext;//Create a class FakeLogReaderSpout which implement IRichSpout interface    to access functionalitiespublic class FakeCallLogReaderSpout implements IRichSpout {   //Create instance for SpoutOutputCollector which passes tuples to bolt.   private SpoutOutputCollector collector;   private boolean completed = false;   //Create instance for TopologyContext which contains topology data.   private TopologyContext context;   //Create instance for Random class.   private Random randomGenerator = new Random();   private Integer idx = 0;   @Override   public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {      this.context = context;      this.collector = collector;   }   @Override   public void nextTuple() {      if(this.idx <= 1000) {         List mobileNumbers = new ArrayList();         mobileNumbers.add("1234123401");         mobileNumbers.add("1234123402");         mobileNumbers.add("1234123403");         mobileNumbers.add("1234123404");         Integer localIdx = 0;         while(localIdx++ < 100 && this.idx++ < 1000) {            String fromMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));            String toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));            while(fromMobileNumber == toMobileNumber) {               toMobileNumber = mobileNumbers.get(randomGenerator.nextInt(4));            }            Integer duration = randomGenerator.nextInt(60);            this.collector.emit(new Values(fromMobileNumber, toMobileNumber, duration));         }      }   }   @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("from", "to", "duration"));   }   //Override all the interface methods   @Override   public void close() {}   public boolean isDistributed() {      return false;   }   @Override   public void activate() {}   @Override    public void deactivate() {}   @Override   public void ack(Object msgId) {}   @Override   public void fail(Object msgId) {}   @Override   public Map getComponentConfiguration() {      return null;   }}

Bolt Creation

Bolt是一个以元组为输入的组件,处理元组,并产生新的元组作为输出.螺栓将实现 IRichBolt 接口.在这个程序中,两个螺栓类 CallLogCreatorBolt CallLogCounterBolt 用于执行操作.

IRichBolt接口有以下方法 :

  • 准备 : 为螺栓提供要执行的环境.执行者将运行此方法来初始化喷口.

  • 执行 : 处理单个输入元组.

  • 清理 : 当螺栓关闭时调用.

  • declareOutputFields : 声明元组的输出模式.

准备

的签名准备方法如下 :

prepare(Map conf,TopologyContext context,OutputCollector collector)

  • conf : 为此螺栓提供Storm配置.

  • context : 提供有关拓扑中螺栓位置,其任务ID,输入和输出信息等的完整信息.

  • 收集器 : 允许我们发出已处理的元组.

执行

签名执行方法如下 :

execute(Tuple tuple)

这里元组是要处理的输入元组.

执行方法一次处理一个元组.元组数据可以通过Tuple类的getValue方法访问.没有必要立即处理输入元组.可以处理多个元组并将其输出为单个输出元组.可以使用OutputCollector类发出已处理的元组.

cleanup

清理方法的签名是跟随&减去;

cleanup()

declareOutputFields

declareOutputFields 方法的签名如下 :

declareOutputFields(OutputFieldsDeclarer declarer) )

这里参数 declarer 用于声明输出流ID,输出字段等.

此方法用于指定元组的输出模式

调用日志创建者螺栓

调用日志创建者螺栓接收调用日志元组.呼叫日志元组具有呼叫者号码,接收者号码和呼叫持续时间.此螺栓只是通过组合呼叫者号码和接收者号码来创建新值.新值的格式为"来电号码 - 接收号码",并将其命名为新字段"呼叫".完整的代码如下:

Coding :  CallLogCreatorBolt.java

 //import util packages  import java.util.HashMap;  import java.util.Map;  import backtype.storm.tuple.Fields;  import backtype.storm.tuple.Values;  import backtype.storm.task.OutputCollector;  import backtype.storm.task.TopologyContext; //导入Storm IRichBolt包 import backtype.storm.topology.IRichBolt;  import backtype.storm.topology.OutputFieldsDeclarer;  import backtype.storm.tuple.Tuple; //创建一个实现IRichBolt接口的CallLogCreatorBolt类公共类CallLogCreatorBolt实现IRichBolt {//为OutputCollector创建实例,它收集并发出元组以产生输出私有OutputCollector收集器;  @Override  public void prepare(Map conf,TopologyContext context,OutputCollector collector){ this.collector = collector; }  @Override  public void execute(Tuple tuple){ String from = tuple.getString(0);  String to = tuple.getString(1); 整数持续时间= tuple.getInteger(2);  collector.emit(new values(from +" - "+ to,duration)); }  @Override  public void cleanup(){}  @Override  public void declareOutputFields(OutputFieldsDeclarer declarer){ declarer.declare(new Fields("call","duration")); }  @Override 公共地图< String,Object> getComponentConfiguration(){返回null; } }

通话记录计数器螺栓

通话记录计数器螺栓接听电话和它作为一个元组的持续时间.此bolt在prepare方法中初始化一个字典(Map)对象.在执行方法中,它检查元组并在字典对象中为元组中的每个新"调用"值创建一个新条目,并在字典对象中设置值1.对于字典中已有的条目,它只是递增其值.简单来说,这个螺栓将调用及其计数保存在字典对象中.我们也可以将其保存到数据源,而不是将调用及其计数保存在字典中.完整的程序代码如下 :

Coding :  CallLogCounterBolt.java

//import util packagesimport java.util.HashMap;import java.util.Map;import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;import backtype.storm.task.OutputCollector;import backtype.storm.task.TopologyContext;//import Storm IRichBolt packageimport backtype.storm.topology.IRichBolt;import backtype.storm.topology.OutputFieldsDeclarer;import backtype.storm.tuple.Tuple;//Create a class CallLogCreatorBolt which implement IRichBolt interfacepublic class CallLogCreatorBolt implements IRichBolt {   //Create instance for OutputCollector which collects and emits tuples to produce output   private OutputCollector collector;   @Override   public void prepare(Map conf, TopologyContext context, OutputCollector collector) {      this.collector = collector;   }   @Override   public void execute(Tuple tuple) {      String from = tuple.getString(0);      String to = tuple.getString(1);      Integer duration = tuple.getInteger(2);      collector.emit(new Values(from + " - " + to, duration));   }   @Override   public void cleanup() {}   @Override   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("call", "duration"));   }   @Override   public Map getComponentConfiguration() {      return null;   }}

创建拓扑

Storm拓扑基本上是节俭结构. TopologyBuilder类提供了简单易用的方法来创建复杂的拓扑. TopologyBuilder类具有设置spout (setSpout)和设置bolt (setBolt)的方法.最后,TopologyBuilder具有createTopology来创建拓扑.使用以下代码片段创建拓扑 :

TopologyBuilder builder = new TopologyBuilder();builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())   .shuffleGrouping("call-log-reader-spout");builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())   .fieldsGrouping("call-log-creator-bolt", new Fields("call"));

shuffleGrouping fieldsGrouping 方法有助于为spout和bolt设置流分组.

本地群集

出于开发目的,我们可以使用"LocalCluster"对象创建本地群集,然后使用"localCluster"类的"submitTopology"方法提交拓扑. "submitTopology"的一个参数是"Config"类的实例. "Config"类用于在提交拓扑之前设置配置选项.此配置选项将在运行时与群集配置合并,并使用prepare方法发送到所有任务(spout和bolt).将拓扑提交到集群后,我们将等待10秒钟,以便集群计算提交的拓扑,然后使用"LocalCluster"的"shutdown"方法关闭集群.完整的程序代码如下 :

Coding :  LogAnalyserStorm.java

import backtype.storm.tuple.Fields;import backtype.storm.tuple.Values;//import storm configuration packagesimport backtype.storm.Config;import backtype.storm.LocalCluster;import backtype.storm.topology.TopologyBuilder;//Create main class LogAnalyserStorm submit topology.public class LogAnalyserStorm {   public static void main(String[] args) throws Exception{      //Create Config instance for cluster configuration      Config config = new Config();      config.setDebug(true);      //      TopologyBuilder builder = new TopologyBuilder();      builder.setSpout("call-log-reader-spout", new FakeCallLogReaderSpout());      builder.setBolt("call-log-creator-bolt", new CallLogCreatorBolt())         .shuffleGrouping("call-log-reader-spout");      builder.setBolt("call-log-counter-bolt", new CallLogCounterBolt())         .fieldsGrouping("call-log-creator-bolt", new Fields("call"));      LocalCluster cluster = new LocalCluster();      cluster.submitTopology("LogAnalyserStorm", config, builder.createTopology());      Thread.sleep(10000);      //Stop the topology      cluster.shutdown();   }}

构建并运行应用程序

完整的应用程序有四个Java码.它们是 :

  • FakeCallLogReaderSpout.java

  • CallLogCreaterBolt.java

  • CallLogCounterBolt.java

  • LogAnalyerStorm.java

可以使用以下命令构建应用程序 :

javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*"* .java

可以使用以下命令运行应用程序 :

java -cp"/path/to/storm/apache-storm-0.9.5/lib/*": LogAnalyserStorm

输出

启动应用程序后,它将输出有关集群启动过程,spout的完整详细信息和螺栓处理,最后是集群关闭过程.在"CallLogCounterBolt"中,我们打印了呼叫及其计数详细信息.此信息将在控制台上显示如下 :

1234123402 - 1234123401 : 781234123402 - 1234123404 : 881234123402 - 1234123403 : 1051234123401 - 1234123404 : 741234123401 - 1234123403 : 811234123401 - 1234123402 : 811234123403 - 1234123404 : 861234123404 - 1234123401 : 631234123404 - 1234123402 : 821234123403 - 1234123402 : 831234123404 - 1234123403 : 861234123403 - 1234123401 : 93

非JVM语言

风暴拓扑由Thrift接口实现,可以轻松地以任何语言提交拓扑. Storm支持Ruby,Python和许多其他语言.让我们来看看python绑定.

Python Binding

Python是一个通用的解释,交互式,面向对象的,高级的编程语言. Storm支持Python实现其拓扑. Python支持发出,锚定,执行和记录操作.

如您所知,可以使用任何语言定义螺栓.用另一种语言编写的螺栓作为子流程执行,Storm通过stdin/stdout与那些带有JSON消息的子流程进行通信.首先拿一个支持python绑定的示例螺栓WordCount.

public static class WordCount implements IRichBolt {   public WordSplit() {      super("python", "splitword.py");   }   public void declareOutputFields(OutputFieldsDeclarer declarer) {      declarer.declare(new Fields("word"));   }}

此处 WordCount 类实现 IRichBolt 界面并运行python实现指定的超级方法参数"splitword.py".现在创建一个名为"splitword.py"的python实现.

import storm   class WordCountBolt(storm.BasicBolt):      def process(self, tup):         words = tup.values[0].split(" ")         for word in words:         storm.emit([word])WordCountBolt().run()

这是Python的示例实现,用于计算给定句子中的单词.同样,您也可以与其他支持语言绑定.