Spark Core是整个项目的基础.它提供分布式任务调度,调度和基本I/O功能. Spark使用称为RDD(弹性分布式数据集)的专用基础数据结构,它是跨机器分区的数据的逻辑集合. RDD可以通过两种方式创建;一种是通过引用外部存储系统中的数据集,另一种是通过在现有RDD上应用转换(例如map,filter,reducer,join).
RDD抽象通过语言集成公开API.这简化了编程复杂性,因为应用程序操作RDD的方式类似于操作本地数据集合.
Spark Shell
Spark提供交互式shell : 交互式分析数据的强大工具.它以Scala或Python语言提供. Spark的主要抽象是一个名为Resilient Distributed Dataset(RDD)的分布式项目集合.可以从Hadoop输入格式(例如HDFS文件)或通过转换其他RDD来创建RDD.
打开Spark Shell
使用以下命令打开Spark shell.
$ spark-shell
创建简单RDD
让我们从文本文件中创建一个简单的RDD.使用以下命令创建一个简单的RDD.
scala> val inputfile = sc.textFile("input.txt")
上述命令的输出是
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MappedRDD[1] at textFile at:12
Spark RDD API引入了一些转换和少量动作来操纵RDD.
RDD转换
RDD转换返回指向新RDD的指针,并允许您在RDD之间创建依赖关系.依赖链中的每个RDD(依赖关系字符串)都有一个用于计算其数据的函数,并且具有指向其父RDD的指针(依赖关系).
Spark是惰性的,因此除非执行任何操作,否则不执行任何操作您调用一些将触发作业创建和执行的转换或操作.请看下面的单词计数示例片段.
因此,RDD转换不是一组数据,而是程序中的一个步骤(可能是唯一的一步)告诉Spark如何获取数据以及如何处理它.
以下是RDD转换列表.
S.No | 转换&含义 |
---|---|
1 | map(func) 返回一个新的分布式数据集,它是通过函数 func 传递源的每个元素而形成的. |
2 | filter(func) 返回通过选择 func 返回true的源元素形成的新数据集. |
3 | flatMap(func) 与map类似,但每个输入项可以映射到0个或更多输出项(因此 func 应该返回Seq而不是单个项目.) |
4 | mapPartitions(func) 与map类似,但在RDD的每个分区(块)上单独运行,因此 func 必须是类型迭代< T> &RARR;迭代< U>在类型为T的RDD上运行时. |
5 | mapPartitionsWithIndex(func) 与地图分区类似,但也提供带有整数值的 func 表示分区的索引,因此 func 必须是类型(Int,Iterator< T>)⇒迭代< U>当在类型T的RDD上运行时. |
6 | sample(withReplacement,fraction,seed) 对数据进行分数,使用或没有替换,使用给定的随机数生成器种子. |
7 | union(otherDataset) 返回一个新数据集,其中包含源数据集和参数中元素的并集. |
8 | intersection(otherDataset) 返回包含源数据集和参数中元素交集的新RDD. |
9 | distinct([numTasks]) 返回包含源数据集的不同元素的新数据集. |
10 | groupByKey([numTasks]) 在数据集上调用时(K,V)对,返回(K,Iterable< V>)对的数据集. 注意 : 如果要对每个键执行聚合(例如总和或平均值)进行分组,则使用reduceByKey或aggregateByKey将产生更好的性能. |
11 | reduceByKey(func,[numTasks]) 在(K,V)对的数据集上调用时,返回(K,V)对的数据集,其中使用给定的reduce函数 func ,必须是(V,V)和rArr类型; V.与groupByKey类似,reduce任务的数量可通过可选的第二个参数进行配置. |
12 | aggregateByKey(zeroValue)(seqOp,combOp,[numTasks]) 调用(K,V)对的数据集,返回(K,U)对的数据集,其中使用给定的组合函数和中性"零"值聚合每个键的值.允许与输入值类型不同的聚合值类型,同时避免不必要的分配.与groupByKey类似,reduce任务的数量可通过可选的第二个参数进行配置. |
13 | sortByKey([升序],[numTasks]) 在数据集上调用时K实现有序的(K,V)对,返回按键按升序或降序排序的(K,V)对数据集,如布尔升序参数中所指定. |
14 | join(otherDataset,[numTasks]) 当调用类型为(K,V)和(K,W)的数据集时,返回(K,(V,W))对的数据集以及所有元素对为每个键.通过leftOuterJoin,rightOuterJoin和fullOuterJoin支持外连接. |
15 | cogroup(otherDataset,[numTasks]) 调用类型为(K,V)的数据集时(和K,W),返回(K,(Iterable< V>,Iterable< W>))元组的数据集.此操作也称为组合. |
16 | cartesian(otherDataset) 在类型T和U的数据集上调用时,返回(T,U)对的数据集(所有元素对). |
17 | pipe(command,[envVars]) 通过shell命令管道RDD的每个分区,例如一个Perl或bash脚本. RDD元素被写入进程的stdin,并且输出到其stdout的行将作为字符串的RDD返回. |
18 | coalesce(numPartitions) 减少分区数量RDD到numPartitions.过滤大型数据集后,可以更有效地运行操作. |
19 | repartition(numPartitions) 随机重新调整RDD中的数据以创建更多或更少的分区和余额穿过他们.这总是通过网络洗牌所有数据. |
20 | repartitionAndSortWithinPartitions(partitioner) 根据给定的分区程序重新分区RDD,并在每个生成的分区中按键分类记录.这比调用重新分区然后在每个分区内排序更有效,因为它可以将排序推送到shuffle机器中. |
操作
下表给出了返回值的操作列表.
S.No | 行动&含义 |
---|---|
1 | reduce(func) 使用函数 func (它接受两个参数并返回一个)来聚合数据集的元素.该函数应该是可交换的和关联的,以便可以并行正确计算. |
2 | collect() 以驱动程序的数组形式返回数据集的所有元素程序.在过滤器或其他返回足够小的数据子集的操作之后,这通常很有用. |
3 | count() 返回数据集中元素的数量. |
4 | first() 返回数据集的第一个元素(类似于take(1)). |
5 | take(n) 返回包含数据集的第一个 n 元素的数组. |
6 | takeSample(withReplacement,num,[seed]) 返回一个数组,其中包含数据集的 num 个元素的随机样本,有或没有替换,可选择预先指定随机数生成器种子. |
7 | takeOrdered(n,[ordering]) 返回th的第一个 n 元素e RDD使用自然顺序或自定义比较器. |
8 | saveAsTextFile(path) 将数据集的元素作为文本文件(或文本文件集)写入在本地文件系统,HDFS或任何其他Hadoop支持的文件系统的给定目录中. Spark在每个元素上调用toString将其转换为文件中的一行文本. |
9 | saveAsSequenceFile(path)(Java和Scala) 写入数据集的元素作为本地文件系统,HDFS或任何其他Hadoop支持的文件系统中给定路径中的Hadoop SequenceFile.这可以在实现Hadoop的Writable接口的键值对的RDD上使用.在Scala中,它也可以在可隐式转换为Writable的类型上使用(Spark包括基本类型的转换,如Int,Double,String等). |
10 | saveAsObjectFile(path)(Java and Scala) 使用Java序列化以简单格式写入数据集的元素,然后可以使用SparkContext.objectFile()加载. |
11 | countByKey() 仅适用于类型(K,V)的RDD.返回(K,Int)对的hashmap以及每个键的计数. |
12 | foreach(func) 运行函数 func 数据集的每个元素.这通常用于副作用,例如更新累加器或与外部存储系统交互. 注意 : 修改foreach()之外的累加器以外的变量可能会导致未定义的行为.有关更多详细信息,请参阅了解闭包. |
使用RDD编程
让我们借助一个例子看一下RDD编程中几个RDD转换和动作的实现.
示例
考虑一个字数例子 : 它计算出现在文档中的每个单词.将以下文本视为输入,并将其保存为主目录中的 input.txt 文件.
input.txt 输入文件.
people are not as beautiful as they look, as they walk or as they talk.they are only as beautiful as they love, as they care as they share.
按照下面给出的步骤执行给定的示例.
打开Spark-Shell
以下命令用于打开spark shell.通常,使用Scala构建spark.因此,Spark程序在Scala环境中运行.
$ spark-shell
如果Spark shell成功打开,那么您将找到以下输出.查看输出的最后一行"可用作sc的Spark上下文"表示Spark容器自动创建名为 sc 的spark上下文对象.在开始程序的第一步之前,应该创建SparkContext对象.
Spark assembly has been built with Hive, including Datanucleus jars on classpath Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties 15/06/04 15:25:22 INFO SecurityManager: Changing view acls to: hadoop 15/06/04 15:25:22 INFO SecurityManager: Changing modify acls to: hadoop 15/06/04 15:25:22 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop) 15/06/04 15:25:22 INFO HttpServer: Starting HTTP Server 15/06/04 15:25:23 INFO Utils: Successfully started service 'HTTP class server' on port 43292. Welcome to ____ __ / __/__ ___ _____/ /__ _\ \/ _ \/ _ `/ __/ '_/ /___/ .__/\_,_/_/ /_/\_\ version 1.4.0 /_/ Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71) Type in expressions to have them evaluated. Spark context available as sc scala>
创建RDD
首先,我们必须使用Spark-Scala API读取输入文件并创建RDD .
以下命令用于从给定位置读取文件.这里,使用inputfile的名称创建新的RDD.在textFile("")方法中作为参数给出的String是输入文件名的绝对路径.但是,如果仅给出文件名,则表示输入文件位于当前位置.
scala> val inputfile = sc.textFile("input.txt")
执行字数转换
我们的目标是计算文件中的单词.创建一个平面地图,将每一行拆分为单词( flatMap(line⇒ line.split("")).
接下来,将每个单词作为一个单词读取值为'1'的键(< key,value> =< word,1>)使用map函数( map(word⇒(word,1) ).
最后,通过添加类似键的值来减少这些键( reduceByKey(_+ _)).
以下命令用于执行字数统计逻辑.执行此操作后,您将找不到任何输出,因为这不是动作,这是转换;指向新RDD或告诉spark如何处理给定数据)
scala> val counts = inputfile.flatMap(line => line.split(" ")).map(word => (word, 1)).reduceByKey(_+_);
当前RDD
使用RDD时,如果你想了解当前的RDD,然后使用以下命令.它将显示有关当前RDD及其依赖关系的说明ging.
scala> counts.toDebugString
缓存转换
您可以使用persist()或cache()方法标记要保留的RDD.第一次在动作中计算它,它将保留在节点的内存中.使用以下命令将中间转换存储在内存中.
scala> counts.cache()
应用动作
应用动作,如将所有变换存储到文本文件中. saveAsTextFile("")方法的String参数是输出文件夹的绝对路径.请尝试以下命令将输出保存在文本文件中.在以下示例中,'output'文件夹位于当前位置.
scala> counts.saveAsTextFile("output")
检查输出
打开另一个终端转到主目录(其中)火花在另一个终端执行).使用以下命令检查输出目录.
[hadoop@localhost ~]$ cd output/ [hadoop@localhost output]$ ls -1 part-00000 part-00001 _SUCCESS
使用以下命令查看 Part-00000 文件的输出.
[hadoop @ localhost输出] $ cat part-00000
输出
(people,1) (are,2) (not,1) (as,8) (beautiful,2) (they, 7) (look,1)
以下命令用于查看 Part-00001 文件的输出.
[hadoop @ localhost输出] $ cat part-00001
输出
(walk, 1) (or, 1) (talk, 1) (only, 1) (love, 1) (care, 1) (share, 1)
UN Persist t存储
在UN-persisting之前,如果要查看用于此应用程序的存储空间,请在浏览器中使用以下URL.
http://localhost:4040
您将看到以下屏幕,其中显示了存储空间用于应用程序,它在Spark shell上运行.
如果要UN-persist特定RDD的存储空间,请使用以下命令.
Scala> counts.unpersist()
您将看到输出如下 :
15/06/27 00:57:33 INFO ShuffledRDD: Removing RDD 9 from persistence list 15/06/27 00:57:33 INFO BlockManager: Removing RDD 9 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_1 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_1 of size 480 dropped from memory (free 280061810) 15/06/27 00:57:33 INFO BlockManager: Removing block rdd_9_0 15/06/27 00:57:33 INFO MemoryStore: Block rdd_9_0 of size 296 dropped from memory (free 280062106) res7: cou.type = ShuffledRDD[9] at reduceByKey at:14
要验证浏览器中的存储空间,请使用以下URL.
http://localhost:4040/
您将看到以下屏幕.它显示了用于应用程序的存储空间,它们运行在Spark shell上.