开发手册 欢迎您!
软件开发者资料库

Yahoo!中的Apache Storm金融

Yahoo!中的Apache Storm财务 - 从简介,核心概念,集群架构,工作流,分布式消息系统,安装,工作示例,Trident,Twitter中的Apache Storm,Yahoo!简单而简单的步骤学习Apache Storm。财务,应用。

雅虎!金融是互联网领先的商业新闻和金融数据网站。它是雅虎的一部分!并提供有关财务新闻,市场统计数据,国际市场数据以及任何人都可以访问的财务资源的其他信息。

如果您是注册的Yahoo!用户,然后你可以自定义Yahoo!财务以利用其某些产品。雅虎Finance API用于查询来自Yahoo的财务数据!

此API显示的数据实时延迟15分钟,并每1分钟更新一次数据库,以访问当前库存 - 相关信息。现在让我们看一个公司的实时情景,看看当股票价值低于100时如何发出警报。

Spout Creation

鲸鱼喷水的目的是获取公司的详细信息并将价格发给螺栓。您可以使用以下程序代码创建一个spout。

编码:YahooFinanceSpout.java

 
import java.util中*。
import java.io. *;
import java.math.BigDecimal;
// import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
公共类YahooFinanceSpout实现IRichSpout {
私有SpoutOutputCollector收集器;
private boolean completed = false;
私有TopologyContext上下文;
@Override
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple(){
try {
Stock stock = YahooFinance.get("INTC");
BigDecimal price = stock.getQuote()。getPrice();
this.collector.emit(新值("INTC",price.doubleValue()));
stock = YahooFinance.get("GOOGL");
price = stock.getQuote()。getPrice();
this.collector.emit(new Values("GOOGL",price.doubleValue()));
stock = YahooFinance.get("AAPL");
price = stock.getQuote()。getPrice();
this.collector.emit(new Values("AAPL",price.doubleValue()));
} catch(例外e){}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields( "公司","价格"));
}
@Override
public void close(){}
public boolean isDistributed(){
return false;
}
@Override
public void activate(){}
@Override
public void deactivate(){}
@Override
public void ack(Object msgId){}
@Override
public void fail(Object msgId){}
@Override
公共地图< String,Object> getComponentConfiguration(){
返回null;
}
}

螺栓创建

这里螺栓的用途当价格低于100时,它将处理给定公司的价格。当股票价格低于100时,它使用Java Map对象将截止价格限制警报设置为 true ;否则是假的。完整的程序代码如下−

编码:PriceCutOffBolt.java

 
import java.util .HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
公共类PriceCutOffBolt实现IRichBolt {
Map< String,Integer> cutOffMap;
Map< String,Boolean>结果映射;
private OutputCollector collector;
@Override
public void prepare(Map conf,TopologyContext context,OutputCollector collector){
this.cutOffMap = new HashMap< String,Integer>();
this.cutOffMap.put("INTC",100);
this.cutOffMap.put("AAPL",100);
this.cutOffMap.put("GOOGL",100);
this.resultMap = new HashMap< String,Boolean>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple){
String company = tuple.getString(0);
Double price = tuple.getDouble(1);
if(this.cutOffMap.containsKey(company)){
Integer cutOffPrice = this.cutOffMap.get(company);
if(price< cutOffPrice){
this.resultMap.put(company,true);
} else {
this.resultMap.put(company,false);
}
}
collector.ack(tuple);
}
@Override
public void cleanup(){
for(Map.Entry< String,Boolean> entry:resultMap.entrySet()){
System.out.println(entry.getKey()+":"+ entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("cut_off_price"));
}
@Override
公共地图< String,Object> getComponentConfiguration(){
返回null;
}
}

提交拓扑

这是主要的应用程序,其中YahooFinanceSpout.java和PriceCutOffBolt.java连接在一起并生成拓扑。以下程序代码显示了如何提交拓扑。

编码:YahooFinanceStorm.java

 
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
公共类YahooFinanceStorm {
public static void main(String [] args)throws Exception {
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("yahoo-finance-spout",new YahooFinanceSpout());
builder.setBolt("price-cutoff-bolt",new PriceCutOffBolt())
.fieldsGrouping("yahoo-finance-spout",new Fields("company"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("YahooFinanceStorm",config,builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}

构建并运行应用程序

完整的应用程序有三个Java码。它们如下:<

  • YahooFinanceSpout.java
  • PriceCutOffBolt.java
  • YahooFinanceStorm.java

可以使用以下命令构建应用程序−

 
javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*\":\"/path/to/yahoofinance/lib/*"* .java

可以使用以下命令运行应用程序−

 
javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*\":\"/path/to/yahoofinance/lib/*":
YahooFinanceStorm

输出

输出类似于以下−

 
GOOGL:false
AAPL:false
INTC:true