开发手册 欢迎您!
软件开发者资料库

Spark SQL - DataFrames

Spark SQL DataFrames - 从Spark简介,Spark RDD,Spark安装,Spark SQL简介,Spark SQL DataFrames,Spark SQL数据源开始学习Spark SQL。

DataFrame是一个分布式数据集合,它被组织到命名列中.从概念上讲,它等同于具有良好优化技术的关系表.

可以从不同来源的数组构建DataFrame,例如Hive表,结构化数据文件,外部数据库或现有RDD .此API专为现代大数据和数据科学应用程序而设计,灵感来自R编程中的 DataFrame Python中的Pandas .

功能of DataFrame

以下是DataFrame的一些特征功能 :

  • 能够在单个节点群集上处理大小为Kilobytes到PB的数据到大型群集.

  • 支持不同的数据格式(Avro,csv,elastic搜索,和Cassandra)和存储系统(HDFS,HIVE表,mysql等).

  • 通过Spark SQL Catalyst优化器进行最先进的优化和代码生成(树转换框架).

  • 可以通过Spark-Core轻松地与所有大数据工具和框架集成.

  • 为Python,Java,Scala和R编程提供API.

SQLContext

SQLContext是一个类,用于初始化fu Spark SQL的功能.初始化SQLContext类对象需要SparkContext类对象(sc).

以下命令用于通过spark-shell初始化SparkContext.

$ spark-shell

默认情况下,SparkContext对象初始化为名称 sc 当spark-shell启动时.

使用以下命令创建SQLContext.

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

示例

让我们考虑一个名为 employee.json 的JSON文件中的员工记录示例.使用以下命令创建DataFrame(df)并使用以下内容读取名为 employee.json 的JSON文档.

employee.json : 将此文件放在当前 scala> 指针所在的目录中.

{   {"id" : "1201", "name" : "satish", "age" : "25"}   {"id" : "1202", "name" : "krishna", "age" : "28"}   {"id" : "1203", "name" : "amith", "age" : "39"}   {"id" : "1204", "name" : "javed", "age" : "23"}   {"id" : "1205", "name" : "prudvi", "age" : "23"}}

DataFrame Operations

DataFrame为结构化数据操作提供特定于域的语言.在这里,我们提供了一些使用DataFrames进行结构化数据处理的基本示例.

按照下面给出的步骤执行DataFrame操作 :

读取JSON文档

首先,我们必须阅读JSON文档.基于此,生成名为(dfs)的DataFrame.

使用以下命令读取名为 employee.json 的JSON文档.数据显示为带有字段和减号的表格; id,name和age.

scala> val dfs = sqlContext.read.json("employee.json")

输出 : 字段名称是从 employee.json 自动获取的.

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

显示数据

如果你想要查看DataFrame中的数据,然后使用以下命令.

scala> dfs.show()

输出 : 您可以以表格格式查看员工数据.

:22, took 0.052610 s+----+------+--------+|age | id   |  name  |+----+------+--------+| 25 | 1201 | satish || 28 | 1202 | krishna|| 39 | 1203 | amith  || 23 | 1204 | javed  || 23 | 1205 | prudvi |+----+------+--------+

使用printSchema方法

如果要查看DataFrame的结构(架构),请使用以下命令.

scala> dfs.printSchema()

输出

root   |-- age: string (nullable = true)   |-- id: string (nullable = true)   |-- name: string (nullable = true)

使用选择方法

使用以下命令从以下三列中获取名称  - 列DataFrame.

scala> dfs.select("name").show()

输出 : 您可以看到名称列的值.

:22, took 0.044023 s+--------+|  name  |+--------+| satish || krishna|| amith  || javed  || prudvi |+--------+

使用年龄过滤器

使用以下命令找到年龄大于23岁(年龄> 23岁)的员工.

scala> dfs.filter(dfs("age") > 23).show()

输出

:22, took 0.078670 s+----+------+--------+|age | id   | name   |+----+------+--------+| 25 | 1201 | satish || 28 | 1202 | krishna|| 39 | 1203 | amith  |+----+------+--------+

使用groupBy方法

使用以下命令计算年龄相同的员工人数.

scala> dfs.groupBy("age").count().show()

输出 : 两名员工的年龄为23岁.

:22, took 5.196091 s+----+-----+|age |count|+----+-----+| 23 |  2  || 25 |  1  || 28 |  1  || 39 |  1  |+----+-----+

以编程方式运行SQL查询

SQLContext使应用程序能够以编程方式运行SQL查询,同时运行SQL函数并将结果作为DataFrame返回.

通常,在后台,SparkSQL支持两种不同的方法将现有的RDD转换为DataFrames和minus;

Sr.否方法&描述
1使用反射推断模式

此方法使用反射生成包含特定类型对象的RDD模式.

2以编程方式指定模式

创建DataFrame的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有的RDD.