DataFrame
DataFrame是最常见的结构化 API,它是包含行和列的数据表。说明这些列和列类型的一些规则被称为模式(schema)
创建一个 Dataframe 示例
spark = SparkSession.builder.getOrCreate() # 创建一个SparkSession实例
df = spark.read.json("file:///usr/local/spark/examples/src/main/resources/people.json")
df.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
Schema
Schema 定义了 DataFrame 的列名以及列的类型,可以手动定义或从数据源读取模式,由数据源来定义模式又被称为读时模式(schema-on-read)。
查询 DataFrame 的模式信息
df.printSchema()
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
利用行分隔的 JSON 半结构化性质来定义这个结构
- 一个模式是由许多字段构成的 StructType,这些字段即为StructField,具有名称、类型、布尔标志,且用户可以指定与该列关联的元数据(metadata)
spark.read.format("json").load("file:///usr/local/spark/examples/src/main/resources/people.json").schema
StructType(List(StructField(age,LongType,true),StructField(name,StringType,true)))
为一个DataFrame 创建并指定模式
myManualSchema = StructType([
StructField("age", LongType(), True),
StructField("name", StringType(), True)
])
df1 = spark.read.format("json").schema(myManualSchema)\
.load("file:///usr/local/spark/examples/src/main/resources/people.json")
df1.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
列
可以对 DataFrame 中的列进行选择、转换操作和删除,并将这些操作表示为表达式。
对于 Spark 而言,列是逻辑结构,它只是表达根据表达式为妹个记录计算出的值。你不能在 DataFrame的范围外操作一个列,必须使用 Spark的转换操作来对 DataFrame 的列的内容进行修改
构造列
以下二条等效col("someColumnName")
column("someColumnName")
Column<b'someColumnName'>
访问 DataFrame 的列
spark.read.format("json")\
.load("file:///usr/local/spark/examples/src/main/resources/people.json").columns
行
查看第一行
df.first()
Row(age=None, name='Michael')
创建行
myRow = Row("Hello", None, 1, False)
myRow[0], myRow[1]
('Hello', None)
DataFrame 转换操作
处理 DataFrame 对象时的基本核心操作:
- 添加行或列
- 删除行或列
- 将一行转换操作为一列(或相反)
- 根据列中的值更改行的顺序
从原始数据源种创建 DataFrame
df3 = spark.read.format("json")\
.load("file:///usr/local/spark/examples/src/main/resources/people.json")
df3.createOrReplaceTempView("dfTable")
df3.show()
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
通过获取一组行并将其转换为 DataFrame 以创建一个临时的 DataFrame
myManualSchema = StructType([
StructField("some", StringType(), True),
StructField("col", StringType(), True),
StructField("names", LongType(), False)
])
myRow = Row("Hello", None, 1)
myDf = spark.createDataFrame([myRow], myManualSchema)
myDf.show()
+-----+----+-----+
| some| col|names|
+-----+----+-----+
|Hello|null| 1|
+-----+----+-----+
select 函数和 selectExpr 函数
select
select 函数和 selectExpr 函数 支持在 DataFrame 上执行类似数据表的 SQL 查询:df.select("age").show()
df.select(df.age).show()
以上两条输出的结果是一样的
+----+
| age|
+----+
|null|
| 30|
| 19|
+----+
你还可以查询多个列:df.select(df.name, df.age).show()
也可以写成表达式:df.select(df.age + 1).show()
+---------+
|(age + 1)|
+---------+
| null|
| 31|
| 20|
+---------+
expr
expr 提供了一种灵活的引用方式,它可以引用一列,也可以引用对列进行操纵的字符串表达式
你可以使用 expr 更改列名:df.select(expr("name as NAMEtest")).show()
+--------+
|NAMEtest|
+--------+
| Michael|
| Andy|
| Justin|
+--------+
ps:你也可以这样修改列名df.select(df.name.alias("username"),df.age).show()
也可以通过 as 关键字和 alias 方法改回去:df.select(expr("name as NAMEtest").alias("name")).show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
selectExpr
因为 select 后跟着一系列 expr 非常常见,所以 Spark有一个非常常用的接口来描述此操作序列:selectExprdf.selectExpr("name as NAMEtest").show()
这条等效于df.select(expr("name as NAMEtest")).show()
+--------+
|NAMEtest|
+--------+
| Michael|
| Andy|
| Justin|
+--------+
你可以这样把列名改回去:df.select(expr("name as NAMEtest").alias("name")).show()
使用聚合函数
通过 select 我们还可以利用系统预定义好的聚合函数来指定在整个 DataFrame 上的聚合操作:df.selectExpr("avg(age)", "count(distinct(name))").show()
+--------+--------------------+
|avg(age)|count(DISTINCT name)|
+--------+--------------------+
| 24.5| 3|
+--------+--------------------+
转换操作成 Spark 类型(字面量)
有时候我们需要给 Spark 传递显式的值,它们只是一个值而非新的列,我们可以通过字面量(literal)传递,即讲给定的编程语言的字面上的值转换作为 Spark 可以理解的值,字面量就是表达式
当你需要比较一个值是否大于一个常量或者程序创建的变量时,可以使用下面的方法:df.select(expr("*"), lit(1).alias("One")).show()
+----+-------+---+
| age| name|One|
+----+-------+---+
|null|Michael| 1|
| 30| Andy| 1|
| 19| Justin| 1|
+----+-------+---+
添加列
这里是使用 withColumn 函数为 DataFrame 添加新列,这种方式更为规范
添加一个仅包含数字1的列df.withColumn("numberOne", lit(1)).show()
+----+-------+---------+
| age| name|numberOne|
+----+-------+---------+
|null|Michael| 1|
| 30| Andy| 1|
| 19| Justin| 1|
+----+-------+---------+
withColumn 函数里有两个参数:列名和为给定行赋值的表达式。我们也可以使用 withColumn 来对某一列重命名df.withColumn("names", expr("name")).columns
['age', 'name', 'names']
重命名列
除了上面介绍的方法和函数外,还可以使用 withColumnRenamed 方法,在这个方法中,第一个参数是要被修改的列名,第二个参数是新的列名df.withColumnRenamed("name", "newname").columns
['age', 'newname']
删除列
df.drop("name").columns
更改列的类型(强制类型转换)
df.withColumn("age", col("age").cast("string"))
DataFrame[age: string, name: string]
过滤行
为了过滤行,需要创建一个表达式来判断该表达式时 True 还是 False,然后过滤使表达式为 False 的行。有两种常见的实现过滤的方式:where 和 filter ,这二者可以执行相同的操作,接受相同的参数类型
过滤掉年龄大于20岁的:df.filter(df.age > 20 ).show()
这条与上一条时等效的df.where(df.age > 20 ).show()
可以通过链式把过滤条件串联从而实现 AND 过滤操作df.filter(df.age > 15 ).filter(df.age<20).show()
去重
通过调用 DataFrame 中的 distinct 方法,可以对行进行去重操作,这一个转换操作
下面这条会返回去重后的 DataFrame 的行数:df.select("name", "age").distinct().count()
随机抽样
seed = 5
withReplacement = False # 有无放回抽样
fraction = 0.5
df.sample(withReplacement, fraction, seed).show()
随机分割
dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count()
行排序
通过相互等价的 sort 和 orderBy 方法可以对 DataFrame 中的值排序,它们均接受表达式和字符串,以及多个列,默认设置是按升序排列
升序:df.sort(df.age.asc()).show()
降序:df.orderBy(df.age.desc()).show()
多列排序:df.sort(df.age.desc(), df.name.asc()).show()
limit 方法
通过这个方法,可以限制从 DataFrame 中提取的内容
只提取前两条记录df.limit(2).show()
分组聚合
可以通过 groupBy 方法对 DataFrame 进行分组聚合
按 age 列进行分组df.groupBy("age").count().show()