Spark SQL - 数据帧


DataFrame 是分布式数据集合,被组织成命名列。从概念上讲,它相当于具有良好优化技术的关系表。

DataFrame 可以从一系列不同的源(例如 Hive 表、结构化数据文件、外部数据库或现有 RDD)构建。该 API 专为现代大数据和数据科学应用程序而设计,其灵感来自R 编程中的 DataFramePython 中的 Pandas

数据框的特点

这是 DataFrame 的一组特征 -

  • 能够在单节点集群到大型集群上处理千字节到拍字节大小的数据。

  • 支持不同的数据格式(Avro、csv、elasticsearch 和 Cassandra)和存储系统(HDFS、HIVE 表、mysql 等)。

  • 通过 Spark SQL Catalyst 优化器(树转换框架)进行最先进的优化和代码生成。

  • 可以通过 Spark-Core 轻松与所有大数据工具和框架集成。

  • 提供用于 Python、Java、Scala 和 R 编程的 API。

SQL上下文

SQLContext是一个类,用于初始化Spark SQL的功能。初始化 SQLContext 类对象需要 SparkContext 类对象 (sc)。

以下命令用于通过spark-shell初始化SparkContext。

$ spark-shell

默认情况下,SparkContext 对象在 Spark-Shell 启动时使用名称sc进行初始化。

使用以下命令创建 SQLContext。

scala> val sqlcontext = new org.apache.spark.sql.SQLContext(sc)

例子

让我们考虑一个名为employee.json的 JSON 文件中的员工记录示例。使用以下命令创建 DataFrame (df) 并读取名为employee.json的 JSON 文档,其中包含以下内容。

employee.json - 将此文件放置在当前scala>指针所在的目录中。

{
   {"id" : "1201", "name" : "satish", "age" : "25"}
   {"id" : "1202", "name" : "krishna", "age" : "28"}
   {"id" : "1203", "name" : "amith", "age" : "39"}
   {"id" : "1204", "name" : "javed", "age" : "23"}
   {"id" : "1205", "name" : "prudvi", "age" : "23"}
}

数据框操作

DataFrame 为结构化数据操作提供了特定于领域的语言。在这里,我们提供了一些使用 DataFrame 进行结构化数据处理的基本示例。

按照下面给出的步骤执行 DataFrame 操作 -

阅读 JSON 文档

首先,我们必须阅读 JSON 文档。基于此,生成一个名为(dfs)的DataFrame。

使用以下命令读取名为employee.json的JSON文档。数据显示为包含字段的表格 - id、name 和age。

scala> val dfs = sqlContext.read.json("employee.json")

输出- 字段名称自动取自employee.json

dfs: org.apache.spark.sql.DataFrame = [age: string, id: string, name: string]

显示数据

如果您想查看 DataFrame 中的数据,请使用以下命令。

scala> dfs.show()

输出- 您可以以表格格式查看员工数据。

<console>:22, took 0.052610 s
+----+------+--------+
|age | id   |  name  |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
| 23 | 1204 | javed  |
| 23 | 1205 | prudvi |
+----+------+--------+

使用 printSchema 方法

如果您想查看 DataFrame 的结构(架构),请使用以下命令。

scala> dfs.printSchema()

输出

root
   |-- age: string (nullable = true)
   |-- id: string (nullable = true)
   |-- name: string (nullable = true)

使用选择方法

使用以下命令从 DataFrame 中获取三列中的name -column。

scala> dfs.select("name").show()

输出- 您可以看到名称列的值。

<console>:22, took 0.044023 s
+--------+
|  name  |
+--------+
| satish |
| krishna|
| amith  |
| javed  |
| prudvi |
+--------+

使用年龄过滤器

使用以下命令查找年龄大于 23 岁(age > 23)的员工。

scala> dfs.filter(dfs("age") > 23).show()

输出

<console>:22, took 0.078670 s
+----+------+--------+
|age | id   | name   |
+----+------+--------+
| 25 | 1201 | satish |
| 28 | 1202 | krishna|
| 39 | 1203 | amith  |
+----+------+--------+

使用groupBy方法

使用以下命令来统计同龄员工的数量。

scala> dfs.groupBy("age").count().show()

输出- 两名员工的年龄为 23 岁。

<console>:22, took 5.196091 s
+----+-----+
|age |count|
+----+-----+
| 23 |  2  |
| 25 |  1  |
| 28 |  1  |
| 39 |  1  |
+----+-----+

以编程方式运行 SQL 查询

SQLContext 使应用程序能够在运行 SQL 函数时以编程方式运行 SQL 查询,并将结果作为 DataFrame 返回。

一般来说,在后台,SparkSQL 支持两种不同的方法将现有 RDD 转换为 DataFrames -

先生 否 方法与说明
1 使用反射推断模式

此方法使用反射来生成包含特定类型对象的 RDD 的架构。

2 以编程方式指定架构

创建 DataFrame 的第二种方法是通过编程接口,它允许您构建架构,然后将其应用到现有的 RDD。