DataFrame是一个分布式数据集合,它被组织到命名列中.从概念上讲,它等同于具有良好优化技术的关系表.
可以从不同来源的数组构建DataFrame,例如Hive表,结构化数据文件,外部数据库或现有RDD .此API专为现代大数据和数据科学应用程序而设计,灵感来自R编程中的 DataFrame 和 Python中的Pandas .
功能of DataFrame
以下是DataFrame的一些特征功能 :
能够在单个节点群集上处理大小为Kilobytes到PB的数据到大型群集.
支持不同的数据格式(Avro,csv,elastic搜索,和Cassandra)和存储系统(HDFS,HIVE表,mysql等).
通过Spark SQL Catalyst优化器进行最先进的优化和代码生成(树转换框架).
可以通过Spark-Core轻松地与所有大数据工具和框架集成.
为Python,Java,Scala和R编程提供API.
SQLContext
SQLContext是一个类,用于初始化fu Spark SQL的功能.初始化SQLContext类对象需要SparkContext类对象(sc).
以下命令用于通过spark-shell初始化SparkContext.
$ spark-shell
默认情况下,SparkContext对象初始化为名称 sc 当spark-shell启动时.
使用以下命令创建SQLContext.
scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)
示例
让我们考虑一个名为 employee.json 的JSON文件中的员工记录示例.使用以下命令创建DataFrame(df)并使用以下内容读取名为 employee.json 的JSON文档.
employee.json : 将此文件放在当前 scala> 指针所在的目录中.
{ {"id" : "1201", "name" : "satish", "age" : "25"} {"id" : "1202", "name" : "krishna", "age" : "28"} {"id" : "1203", "name" : "amith", "age" : "39"} {"id" : "1204", "name" : "javed", "age" : "23"} {"id" : "1205", "name" : "prudvi", "age" : "23"}}
DataFrame Operations
DataFrame为结构化数据操作提供特定于域的语言.在这里,我们提供了一些使用DataFrames进行结构化数据处理的基本示例.
按照下面给出的步骤执行DataFrame操作 :
读取JSON文档
首先,我们必须阅读JSON文档.基于此,生成名为(dfs)的DataFrame.
使用以下命令读取名为 employee.json 的JSON文档.数据显示为带有字段和减号的表格; id,name和age.
scala> val dfs = sqlContext.read.json("employee.json")
输出 : 字段名称是从 employee.json 自动获取的.
dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]
显示数据
如果你想要查看DataFrame中的数据,然后使用以下命令.
scala> dfs.show()
输出 : 您可以以表格格式查看员工数据.
:22, took 0.052610 s+----+------+--------+|age | id | name |+----+------+--------+| 25 | 1201 | satish || 28 | 1202 | krishna|| 39 | 1203 | amith || 23 | 1204 | javed || 23 | 1205 | prudvi |+----+------+--------+
使用printSchema方法
如果要查看DataFrame的结构(架构),请使用以下命令.
scala> dfs.printSchema()
输出
root |-- age: string (nullable = true) |-- id: string (nullable = true) |-- name: string (nullable = true)
使用选择方法
使用以下命令从以下三列中获取名称 - 列DataFrame.
scala> dfs.select("name").show()
输出 : 您可以看到名称列的值.
:22, took 0.044023 s+--------+| name |+--------+| satish || krishna|| amith || javed || prudvi |+--------+
使用年龄过滤器
使用以下命令找到年龄大于23岁(年龄> 23岁)的员工.
scala> dfs.filter(dfs("age") > 23).show()
输出
:22, took 0.078670 s+----+------+--------+|age | id | name |+----+------+--------+| 25 | 1201 | satish || 28 | 1202 | krishna|| 39 | 1203 | amith |+----+------+--------+
使用groupBy方法
使用以下命令计算年龄相同的员工人数.
scala> dfs.groupBy("age").count().show()
输出 : 两名员工的年龄为23岁.
:22, took 5.196091 s+----+-----+|age |count|+----+-----+| 23 | 2 || 25 | 1 || 28 | 1 || 39 | 1 |+----+-----+
以编程方式运行SQL查询
SQLContext使应用程序能够以编程方式运行SQL查询,同时运行SQL函数并将结果作为DataFrame返回.
通常,在后台,SparkSQL支持两种不同的方法将现有的RDD转换为DataFrames和minus;
Sr.否 | 方法&描述 |
---|---|
1 | 使用反射推断模式 此方法使用反射生成包含特定类型对象的RDD模式. |
2 | 以编程方式指定模式 创建DataFrame的第二种方法是通过编程接口,允许您构建模式,然后将其应用于现有的RDD. |