开发手册 欢迎您!
软件开发者资料库

实时应用程序(Twitter)

实时应用程序(Twitter) - 从简介,基础知识,集群体系结构,工作流程,安装步骤,基本操作,简单生产者示例,消费者组示例,与Storm集成,与Spark集成,实时应用程序(Twitter)开始学习Apache kafka ,工具,应用程序。

让我们分析实时应用程序以获取最新的Twitter提要及其主题标签.早些时候,我们已经看到Storm和Spark与Kafka的整合.在这两个场景中,我们创建了一个Kafka Producer(使用cli)向Kafka生态系统发送消息.然后,风暴和火花集成通过使用Kafka消费者读取消息并分别将其注入风暴和火花生态系统.所以,实际上我们需要创建一个Kafka Producer,它应该&减去;

  • 使用"Twitter Streaming API"阅读twitter feed,

  • 处理Feed,

  • 提取HashTags并

  • 将其发送给Kafka.

一旦Kafka收到,Storm/Spark集成就会收到信息并将其发送到Storm/Spark生态系统.

Twitter Streaming API

可以使用任何编程语言访问"Twitter Streaming API". "twitter4j"是一个开源的非官方Java库,它提供了一个基于Java的模块,可以轻松访问"Twitter Streaming API". "twitter4j"提供了一个基于监听器的框架来访问推文.要访问"Twitter Streaming API",我们需要登录Twitter开发者帐户,并且应该获得以下 OAuth 身份验证详细信息.

  • Customerkey

  • CustomerSecret

  • AccessToken

  • AccessTookenSecret

创建开发者帐户后,下载"twitter4j"jar文件并将其放在java类路径中.

完整Twitter Kafka生产者编码(KafkaTwitterProducer.java)列在下面 :

import java.util.Arrays;import java.util.Properties;import java.util.concurrent.LinkedBlockingQueue;import twitter4j.*;import twitter4j.conf.*;import org.apache.kafka.clients.producer.Producer;import org.apache.kafka.clients.producer.KafkaProducer;import org.apache.kafka.clients.producer.ProducerRecord;public class KafkaTwitterProducer {   public static void main(String[] args) throws Exception {      LinkedBlockingQueue queue = new LinkedBlockingQueue(1000);            if(args.length < 5){         System.out.println(            "Usage: KafkaTwitterProducer                                       ");         return;      }            String consumerKey = args[0].toString();      String consumerSecret = args[1].toString();      String accessToken = args[2].toString();      String accessTokenSecret = args[3].toString();      String topicName = args[4].toString();      String[] arguments = args.clone();      String[] keyWords = Arrays.copyOfRange(arguments, 5, arguments.length);      ConfigurationBuilder cb = new ConfigurationBuilder();      cb.setDebugEnabled(true)         .setOAuthConsumerKey(consumerKey)         .setOAuthConsumerSecret(consumerSecret)         .setOAuthAccessToken(accessToken)         .setOAuthAccessTokenSecret(accessTokenSecret);      TwitterStream twitterStream = new TwitterStreamFactory(cb.build()).get-Instance();      StatusListener listener = new StatusListener() {                 @Override         public void onStatus(Status status) {                  queue.offer(status);            // System.out.println("@" + status.getUser().getScreenName()                + " - " + status.getText());            // System.out.println("@" + status.getUser().getScreen-Name());            /*for(URLEntity urle : status.getURLEntities()) {               System.out.println(urle.getDisplayURL());            }*/            /*for(HashtagEntity hashtage : status.getHashtagEntities()) {               System.out.println(hashtage.getText());            }*/         }                  @Override         public void onDeletionNotice(StatusDeletionNotice statusDeletion-Notice) {            // System.out.println("Got a status deletion notice id:"                + statusDeletionNotice.getStatusId());         }                  @Override         public void onTrackLimitationNotice(int numberOfLimitedStatuses) {            // System.out.println("Got track limitation notice:" +                num-berOfLimitedStatuses);         }         @Override         public void onScrubGeo(long userId, long upToStatusId) {            // System.out.println("Got scrub_geo event userId:" + userId +             "upToStatusId:" + upToStatusId);         }                        @Override         public void onStallWarning(StallWarning warning) {            // System.out.println("Got stall warning:" + warning);         }                  @Override         public void onException(Exception ex) {            ex.printStackTrace();         }      };      twitterStream.addListener(listener);            FilterQuery query = new FilterQuery().track(keyWords);      twitterStream.filter(query);      Thread.sleep(5000);            //Add Kafka producer config settings      Properties props = new Properties();      props.put("bootstrap.servers", "localhost:9092");      props.put("acks", "all");      props.put("retries", 0);      props.put("batch.size", 16384);      props.put("linger.ms", 1);      props.put("buffer.memory", 33554432);            props.put("key.serializer",          "org.apache.kafka.common.serializa-tion.StringSerializer");      props.put("value.serializer",          "org.apache.kafka.common.serializa-tion.StringSerializer");            Producer producer = new KafkaProducer(props);      int i = 0;      int j = 0;            while(i < 10) {         Status ret = queue.poll();                  if (ret == null) {            Thread.sleep(100);            i++;         }else {            for(HashtagEntity hashtage : ret.getHashtagEntities()) {               System.out.println("Hashtag: " + hashtage.getText());               producer.send(new ProducerRecord(                  top-icName, Integer.toString(j++), hashtage.getText()));            }         }      }      producer.close();      Thread.sleep(5000);      twitterStream.shutdown();   }}

编译

使用以下命令编译应用程序 :

javac -cp"/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*": KafkaTwitterProducer.java

执行

打开两个控制台.在一个控制台中运行上面编译的应用程序,如下所示.

java -cp "/path/to/kafka/libs/*":"/path/to/twitter4j/lib/*":. KafkaTwitterProducer my-first-topic food

在另一个窗口中运行上一章中解释的任何一个Spark/Storm应用程序.需要注意的要点是,在两种情况下使用的主题应该相同.在这里,我们使用"my-first-topic"作为主题名称.

输出

此应用程序的输出将取决于关键字和推特的当前馈送.下面指定了一个示例输出(风暴积分).

. . .food : 1foodie : 2burger : 1. . .