雅虎!金融是互联网领先的商业新闻和金融数据网站。它是雅虎的一部分!并提供有关财务新闻,市场统计数据,国际市场数据以及任何人都可以访问的财务资源的其他信息。
如果您是注册的Yahoo!用户,然后你可以自定义Yahoo!财务以利用其某些产品。雅虎Finance API用于查询来自Yahoo的财务数据!
此API显示的数据实时延迟15分钟,并每1分钟更新一次数据库,以访问当前库存 - 相关信息。现在让我们看一个公司的实时情景,看看当股票价值低于100时如何发出警报。
Spout Creation
鲸鱼喷水的目的是获取公司的详细信息并将价格发给螺栓。您可以使用以下程序代码创建一个spout。
编码:YahooFinanceSpout.java
import java.util中*。
import java.io. *;
import java.math.BigDecimal;
// import yahoofinace packages
import yahoofinance.YahooFinance;
import yahoofinance.Stock;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.topology.IRichSpout;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.spout.SpoutOutputCollector;
import backtype.storm.task.TopologyContext;
公共类YahooFinanceSpout实现IRichSpout {
私有SpoutOutputCollector收集器;
private boolean completed = false;
私有TopologyContext上下文;
@Override
public void open(Map conf,TopologyContext context,SpoutOutputCollector collector){
this.context = context;
this.collector = collector;
}
@Override
public void nextTuple(){
try {
Stock stock = YahooFinance.get("INTC");
BigDecimal price = stock.getQuote()。getPrice();
this.collector.emit(新值("INTC",price.doubleValue()));
stock = YahooFinance.get("GOOGL");
price = stock.getQuote()。getPrice();
this.collector.emit(new Values("GOOGL",price.doubleValue()));
stock = YahooFinance.get("AAPL");
price = stock.getQuote()。getPrice();
this.collector.emit(new Values("AAPL",price.doubleValue()));
} catch(例外e){}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields( "公司","价格"));
}
@Override
public void close(){}
public boolean isDistributed(){
return false;
}
@Override
public void activate(){}
@Override
public void deactivate(){}
@Override
public void ack(Object msgId){}
@Override
public void fail(Object msgId){}
@Override
公共地图< String,Object> getComponentConfiguration(){
返回null;
}
}
螺栓创建
这里螺栓的用途当价格低于100时,它将处理给定公司的价格。当股票价格低于100时,它使用Java Map对象将截止价格限制警报设置为 true ;否则是假的。完整的程序代码如下−
编码:PriceCutOffBolt.java
import java.util .HashMap;
import java.util.Map;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.IRichBolt;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.tuple.Tuple;
公共类PriceCutOffBolt实现IRichBolt {
Map< String,Integer> cutOffMap;
Map< String,Boolean>结果映射;
private OutputCollector collector;
@Override
public void prepare(Map conf,TopologyContext context,OutputCollector collector){
this.cutOffMap = new HashMap< String,Integer>();
this.cutOffMap.put("INTC",100);
this.cutOffMap.put("AAPL",100);
this.cutOffMap.put("GOOGL",100);
this.resultMap = new HashMap< String,Boolean>();
this.collector = collector;
}
@Override
public void execute(Tuple tuple){
String company = tuple.getString(0);
Double price = tuple.getDouble(1);
if(this.cutOffMap.containsKey(company)){
Integer cutOffPrice = this.cutOffMap.get(company);
if(price< cutOffPrice){
this.resultMap.put(company,true);
} else {
this.resultMap.put(company,false);
}
}
collector.ack(tuple);
}
@Override
public void cleanup(){
for(Map.Entry< String,Boolean> entry:resultMap.entrySet()){
System.out.println(entry.getKey()+":"+ entry.getValue());
}
}
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer){
declarer.declare(new Fields("cut_off_price"));
}
@Override
公共地图< String,Object> getComponentConfiguration(){
返回null;
}
}
提交拓扑
这是主要的应用程序,其中YahooFinanceSpout.java和PriceCutOffBolt.java连接在一起并生成拓扑。以下程序代码显示了如何提交拓扑。
编码:YahooFinanceStorm.java
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;
import backtype.storm.Config;
import backtype.storm.LocalCluster;
import backtype.storm.topology.TopologyBuilder;
公共类YahooFinanceStorm {
public static void main(String [] args)throws Exception {
Config config = new Config();
config.setDebug(true);
TopologyBuilder builder = new TopologyBuilder();
builder.setSpout("yahoo-finance-spout",new YahooFinanceSpout());
builder.setBolt("price-cutoff-bolt",new PriceCutOffBolt())
.fieldsGrouping("yahoo-finance-spout",new Fields("company"));
LocalCluster cluster = new LocalCluster();
cluster.submitTopology("YahooFinanceStorm",config,builder.createTopology());
Thread.sleep(10000);
cluster.shutdown();
}
}
构建并运行应用程序
完整的应用程序有三个Java码。它们如下:<
- YahooFinanceSpout.java
- PriceCutOffBolt.java
- YahooFinanceStorm.java
可以使用以下命令构建应用程序−
javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*\":\"/path/to/yahoofinance/lib/*"* .java
可以使用以下命令运行应用程序−
javac -cp"/path/to/storm/apache-storm-0.9.5/lib/*\":\"/path/to/yahoofinance/lib/*":
YahooFinanceStorm
输出
输出类似于以下−
GOOGL:false
AAPL:false
INTC:true