spark 中的dataFrame和python中的DataFrame几乎拥有相同的数据处理方法,鉴于python在数据分析处理领域越来越重要的地位,这或许是spark的数据集从RDD转向Dataframe的一个因素吧。
1,数据查看
show(n=20, truncate=True)---将前n行打印到控制台。n -要显示的行数,默认20条数据。truncate-如果设置为True,默认情况下截断长度超过20个字符的字符串。如果设置为大于1的数,则截断长字符串以长度截断并将单元格对齐。
collect() --- 将所有记录作为行的列表返回
first()---以Row的形式返回第一行。
take(n) ; head(n=None) ; limit(n)
注:如果n>1,返回的是Row的列表,如果n=1,返回的是单个Row
count()---返回这个DataFrame中的行数。
>>> df.show()
|age| name|
| 2|Alice|
| 5| Bob|
>>> df.first()
Row(age=2, name=u'Alice')
>>> df.limit(2).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.limit(0).collect()
[]
>>> df.count()
2
2,数据筛选
select(*cols) 投射一组表达式并返回一个新的DataFrame。
>>> df.selec;5).collect()
[Row(age=2, name=u'Alice')]
>>> df.select('name', 'age').collect()
[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
>>> df.selec, + 10).alias('age')).collect()
[Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
columns --- 将所有列名作为一个列表返回
types --- 将以元组列表的形式返回数据的类型
printSchema()---以树格式打印模式。
>>> df.columns
['age', 'name']
>>>df.types
[('age','int'),('name','string')]
>>> df.printSchema()
|-- age: integer (nullable = true)
|-- name: string (nullable = true)
3,数据排序
sort(*cols,**kwargs)---返回一个按指定列排序的新DataFrame
orderBy(*cols, **kwargs)---返回一个按指定列排序的新DataFrame。
cols—要排序的列或列名列表。
ascending-布尔或布尔列表(默认为真)。排序升序和降序。为多个排序顺序指定列表。如果指定了列表,则列表的长度必须等于cols的长度。
>>> df.sort.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.sort("age", ascending=False).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy.desc()).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> from import *
>>> df.sort(asc("age")).collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.orderBy(desc("age"), "name").collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
>>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
[Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
数据关联
crossJoin(other) --- 用另一个DataFrame返回笛卡尔积。
join(other, on=None, how=None) --- 连接
与另一个DataFrame连接,使用给定的联结表达式。default inner
必须是他们其中的一个: inner, cross, outer, full, full_outer, left, left_outer, right, right_outer, left_semi, and left_anti.
>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> d("name", "height").collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85)]
>>> df.crossJoin(d("height")).select("age", "name", "height").collect()
[Row(age=2, name=u'Alice', height=80), Row(age=2, name=u'Alice', height=85),
Row(age=5, name=u'Bob', height=80), Row(age=5, name=u'Bob', height=85)]
>>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
[Row(name=u'Bob')]
>>> df.join(df2, df.name == d, 'outer').selec, d).collect()
[Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
>>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
[Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
数据探索
describe(*cols)
计算数字和字符串列的统计信息。这包括计数、平均值、stddev、min和max。如果没有给出列,这个函数计算所有数值或字符串列的统计信息。
>>> df.describe(['age']).show()
+-------+------------------+
|summary| age|
+-------+------------------+
| count| 2|
| mean| 3.5|
| stddev|
| min| 2|
| max| 5|
+-------+------------------+
>>> df.describe().show()
+-------+------------------+-----+
|summary| age| name|
| count| 2| 2|
| mean| 3.5| null|
| stddev| null|
| min| 2|Alice|
| max| 5| Bob|
数据清洗及处理
distinct() ---去重,返回一个新的DataFrame,其中包含这个DataFrame中的不同行。
drop(*cols) --- 删除指定的列并返回一个新的DataFrame。
dropna(how='any', thresh=None, subset=None)---删除空值行返回一个新的DataFrame
Da()和Da()是彼此的别名
df.dropna()和df.na.drop()方式效果相等
how -“any”或“all”。如果“any”,则删除包含任何null的行。如果“all”,只在其所有值为null时才删除一行。
thresh - int,如果指定默认值为None,则删除小于thresh非空值的行。这会覆盖how参数。
subset子集-可选的列名列表。
fillna(value, subset=None)---替换null值返回新的DataFrame
Da()和Da()是彼此的别名
df.na.fill(value) 和 df.fillna(value)方法效果相等
value - int, long, float, string或dict.值来替换空值。如果值是一个dict,那么子集将被忽略,值必须是从列名(字符串)到替换值的映射。替换值必须是int、long、float、boolean或string。
subset子集-可选的列名列表。在没有匹配数据类型的子集中指定的列将被忽略。如果value是一个字符串,子集包含一个非字符串列,那么非字符串列就会被忽略。
>>> d({'age': 50, 'name': 'unknown'}).show()
|age|height| name|
| 10| 80| Alice|
| 5| null| Bob|
| 50| null| Tom|
| 50| null|unknown|
>>> df.distinct().count()
2
>>> df.select("age", "name").collect()
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
>>> df.drop('age').collect()
[Row(name=u'Alice'), Row(name=u'Bob')]
>>> df.join(df2, df.name == d, 'inner').dro).collect()
[Row(age=5, height=85, name=u'Bob')]
replace(to_replace, value=None, subset=None)---用另一个值替换一个值,返回一个新的DataFrame
Da()和Da() 是彼此的别名
to_replace - bool,int,long,float,string,list或dict.要替换的值。如果值是一个dict,那么值将被忽略,to_replace必须是值和替换之间的映射。
value- int,long,float,string或list。替换值必须是int、long、float或string。如果value是一个列表,value的长度和类型应该与to_replace相同。如果value是标量,to_replace是序列,则将value用作to_replace中的每个项的替换。
subset-可选的列名列表。在不具有hav的子集中指定的列
>>> d(['Alice', 'Bob'], ['A', 'B'], 'name').show()
| age|height|name|
| 10| 80| A|
| 5| null| B|
|null| null| Tom|
|null| null|null|
filter(condition)---使用给定条件过滤行。where()是filter()的别名。
foreach(f)---将 f 函数应用于该DataFrame的所有行。
>>> df.filter > 3).collect()
[Row(age=5, name=u'Bob')]
>>> df.where == 2).collect()
[Row(age=2, name=u'Alice')]
>>> df.filter("age > 3").collect()
[Row(age=5, name=u'Bob')]
>>> df.where("age = 2").collect()
[Row(age=2, name=u'Alice')]
>>> def f(person):
... prin)
>>> df.foreach(f)
groupBy(*cols) 聚合函数
使用指定的列对DataFrame进行分组,这样我们就可以在它们上运行聚合
groupby()是groupBy()的别名。
*cols--要分组的列的列表。每个元素应该是列名(字符串)或表达式(列)。
DataFrame上的聚合的一组方法,由DataFrame. groupby()创建。
agg(* exprs)---计算聚合并以DataFrame的形式返回结果
可用的聚合函数是avg、max、min、sum、count。
如果exprs是一个从字符串到字符串的唯一字典映射,那么键是要在其上执行聚合的列,值是聚合函数。
或者,exprs也可以是聚合列表达式的列表。
参数:exprs——从列名(字符串)到聚合函数(字符串)的字典映射,或者列的列表。
avg(*cols)---为每个组计算每个数值列的平均值,mean()是avg()的别名。
count()---计算每组记录的数量。
max(*cols)--min(*cols)--sum(*cols)---(最大,最小,合计)为每组计算每列的最大值
pivot(pivot_col,value=None) -类似数据透视表
pivot_col——要pivot的列的名称。
value——将被转换为输出DataFrame中的列的值的列表。
以当前[[DataFrame]]的一个列为轴,并执行指定的聚合。pivot函数有两种版本:一种要求调用者指定要旋转的不同值的列表,另一种则没有。后者更简洁,但效率较低,因为Spark首先需要在内部计算不同值的列表。
>>> gdf = df.groupBy)
>>> sorted({"*": "count"}).collect())
[Row(name=u'Alice', count(1)=1), Row(name=u'Bob', count(1)=1)]
>>> df.groupBy().avg('age').collect()
[Row(avg(age)=3.5)]
>>> d().avg('age', 'height').collect()
[Row(avg(age)=3.5, avg(height)=82.5)]
>>> sorted).count().collect())
[Row(age=2, count=1), Row(age=5, count=1)]
>>> df.groupBy().max('age').collect()
[Row(max(age)=5)]
>>> d().max('age', 'height').collect()
[Row(max(age)=5, max(height)=85)]
>>> d("year").pivot("course", ["dotNET", "Java"]).sum("earnings").collect()
[Row(year=2012, dotNET=15000, Java=20000), Row(year=2013, dotNET=48000, Java=30000)]
>>> d("year").pivot("course").sum("earnings").collect()
[Row(year=2012, Java=20000, dotNET=15000), Row(year=2013, Java=30000, dotNET=48000)]
相关性
corr(col1, col2, method=None)
计算DataFrame两列的相关性作为一个双值。目前只支持皮尔逊相关系数。
cov(col1, col2)---计算给定列的样本协方差,它们的名称指定为双值
转换数据格式
toJSON(use_unicode=True)---每一行都作为返回RDD中的一个元素转换为JSON文档。
>>> df.toJSON().first()
u'{"age":2,"name":"Alice"}'
toLocalIterator()---返回一个迭代器
该迭代器包含此DataFrame中的所有行。迭代器将消耗与此DataFrame中最大分区相同的内存。
>>> li())
[Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
toPandas()
将此DataFrame的内容返回为Pandas 。只有在安装了pandas并可以使用的情况下才能使用。