PySpark 安装指南 PySpark DataFrame 、PySpark Pandas Api快速入门用法示例权威指南点击这里看全文
文章目录
- PySpark 安装指南
-
- 支持的Python版本
- 使用PyPI安装
- 使用Conda安装
- 手动下载安装(最常用)
- 从源代码构建安装
- 依赖项
- PySpark DataFrame 快速入门指南
-
- 创建DataFrame
- 选择和访问数据
- 应用函数
- 分组数据
- 数据输入/输出
- 使用SQL
- 快速入门:Spark Connect
-
- 启动带有Spark Connect的Spark服务器
- 连接到Spark Connect服务器
- 创建DataFrame
- 快速入门:Spark上的Pandas API
-
- 对象创建
-
- 具有特定数据类型
- 显示数据的前几行
- 显示索引、列和底层numpy数据
- 显示数据的快速统计摘要
- 转置数据
- 按索引排序
- 按值排序
- 缺失数据
-
- 删除任何具有缺失数据的行。
- **填充缺失值**
- 操作
-
- 统计
- Spark配置
- 分组
- 绘图
- 输入/输出数据
-
- CSV
- Parquet
- Spark IO
- 往期高质量文档
PySpark 安装指南
PySpark是Apache Spark官方发布的一部分,可以在Apache Spark网站上获取。对于Python用户,PySpark还提供了从PyPI进行pip安装的方式。这通常适用于本地使用或作为连接到集群的客户端,而不是设置一个集群本身。
本页面提供了使用pip、Conda、手动下载和从源代码构建的PySpark安装说明。
支持的Python版本
Python 3.8及以上版本。
使用PyPI安装
使用PyPI进行PySpark安装的方法如下:
pip install pyspark
如果您想为特定组件安装额外的依赖项,可以按照以下方式安装:
# Spark SQL pip install pyspark[sql] # 在Spark上使用pandas API pip install pyspark[pandas_on_spark] plotly # 如果需要绘制数据,还可以安装plotly。 # Spark Connect pip install pyspark[connect]
对于带有/不带有特定Hadoop版本的PySpark,可以使用PYSPARK_HADOOP_VERSION环境变量进行安装:
PYSPARK_HADOOP_VERSION=3 pip install pyspark
默认发行版使用Hadoop 3.3和Hive 2.3。如果用户指定不同版本的Hadoop,pip安装将自动下载并使用PySpark所需的不同版本。根据网络和镜像选择的情况,下载可能需要一些时间。可以设置PYSPARK_RELEASE_MIRROR环境变量手动选择镜像以加快下载速度。
PYSPARK_RELEASE_MIRROR=http://mirror.apache-kr.org PYSPARK_HADOOP_VERSION=3 pip install
建议在pip命令中使用-v选项以跟踪安装和下载的状态:
PYSPARK_HADOOP_VERSION=3 pip install pyspark -v
PYSPARK_HADOOP_VERSION支持以下值:
- without:使用用户提供的Apache Hadoop构建的Spark
- 3:为Apache Hadoop 3.3及更高版本预构建的Spark(默认)
请注意,带有/不带有特定Hadoop版本的PySpark安装是实验性的。它可能会在次要版本之间发生更改或删除。
使用Conda安装
Conda是一个开源的包管理和环境管理系统(由Anaconda开发),最好通过Miniconda或Miniforge进行安装。该工具既跨平台又与语言无关,在实际应用中,Conda可以取代pip和virtualenv。
Conda使用所谓的"channels"来分发软件包。除了Anaconda自身的默认channel之外,最重要的channel是conda-forge。conda-forge是社区驱动的打包项目,提供了最广泛和最新的软件包(通常也是Anaconda channel的上游)。
从终端创建一个新的Conda环境并激活它,步骤如下:
conda create -n pyspark_env conda activate pyspark_env
激活环境后,使用以下命令在同一会话中安装pyspark、所选Python版本以及其他您想要与pyspark一起使用的包(也可以分步安装):
conda install -c conda-forge pyspark # 在这里还可以添加"python=3.8 some_package [etc.]"来指定Python版本和其他包
请注意,Conda下的PySpark由社区单独维护;虽然新版本通常会很快地打包发布,但其在conda(-forge)中的可用性不直接与PySpark的发布周期同步。
虽然在Conda环境中使用pip是可行的(使用与上述相同的命令),但不建议这样做,因为pip与Conda不兼容。
有关有用的Conda命令的简要摘要,请参阅其Cheat Sheet。
手动下载安装(最常用)
PySpark包含在Apache Spark网站提供的发行版中。您可以从该网站下载所需的发行版,并将tar文件解压到您希望安装Spark的目录中,例如:
tar xzvf spark-3.5.0-bin-hadoop3.tgz
确保SPARK_HOME环境变量指向解压后的目录,并更新PYTHONPATH环境变量以便找到SPARK_HOME/python/lib下的PySpark和Py4J库。以下是一个示例:
cd spark-3.5.0-bin-hadoop3 export SPARK_HOME=`pwd` export PYTHONPATH=$(ZIPS=("$SPARK_HOME"/python/lib/*.zip); IFS=:; echo "${ZIPS[*]}"):$PYTHONPATH
从源代码构建安装
要从源代码安装PySpark,请参考构建Spark的相关文档。
依赖项
下表列出了PySpark所需的一些依赖项及其支持的版本:
包名 | 支持的版本 | 备注 |
---|---|---|
py4j | >=0.10.9.7 | 必需 |
pandas | >=1.0.5 | Spark SQL和Spark Connect所需;Spark SQL可选 |
pyarrow | >=4.0.0 | Spark SQL和Spark Connect所需;Spark SQL可选 |
numpy | >=1.15 | Spark SQL和MLLib DataFrame API所需;Spark SQL可选 |
grpcio | >=1.48,<1.57 | Spark Connect所需 |
grpcio-status | >=1.48,<1.57 | Spark Connect所需 |
googleapis-common-protos | ==1.56.4 | Spark Connect所需 |
请注意,PySpark要求Java 8或更高版本,并正确设置JAVA_HOME环境变量。如果使用JDK 11,请设置-Dio.netty.tryReflectionSetAccessible=true以启用与Arrow相关的功能,并参考下载指南。
这是PySpark的安装指南,您可以根据自己的环境和需求选择适合您的安装方法。如果您需要更详细的安装说明和指导,请参考官方文档或相关资源。
PySpark DataFrame 快速入门指南
本文是PySpark DataFrame API的简短介绍和快速入门。PySpark DataFrames是惰性求值的,它们是建立在RDD之上的。当Spark对数据进行转换时,并不立即计算转换结果,而是计划如何在以后进行计算。只有在显式调用collect()等操作时,计算才会开始。本文展示了DataFrame的基本用法,主要面向新用户。您可以在快速入门页面上的“在线笔记本:DataFrame”中自己运行最新版本的这些示例。
Apache Spark文档网站还提供了其他有用的信息,包括最新版本的Spark SQL和DataFrames、RDD编程指南、结构化流处理编程指南、Spark流处理编程指南和机器学习库(MLlib)指南。
PySpark应用程序从初始化SparkSession开始,SparkSession是PySpark的入口点,如下所示。如果通过pyspark可执行文件在PySpark shell中运行它,则shell会自动将会话创建在变量spark中。
from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate()
创建DataFrame
可以使用pyspark.sql.SparkSession.createDataFrame方法创建一个PySpark DataFrame,通常通过传递一个列表、元组、字典和pyspark.sql.Rows的列表,一个pandas DataFrame或一个由此类列表组成的RDD来实现。pyspark.sql.SparkSession.createDataFrame方法可以通过scheme参数指定DataFrame的模式。当省略该参数时,PySpark会通过从数据中取样来推断相应的模式。
首先,可以从一组行创建一个PySpark DataFrame:
from datetime import datetime, date import pandas as pd from pyspark.sql import Row df = spark.createDataFrame([ Row(a=1, b=2., c='string1', d=date(2000, 1, 1), e=datetime(2000, 1, 1, 12, 0)), Row(a=2, b=3., c='string2', d=date(2000, 2, 1), e=datetime(2000, 1, 2, 12, 0)), Row(a=4, b=5., c='string3', d=date(2000, 3, 1), e=datetime(2000, 1, 3, 12, 0)) ]) df
使用显式模式创建一个带有模式的PySpark DataFrame:
df = spark.createDataFrame([ (1, 2., 'string1', date(2000, 1, 1), datetime(2000, 1, 1, 12, 0)), (2, 3., 'string2', date(2000, 2, 1), datetime(2000, 1, 2, 12, 0)
猜你喜欢
网友评论
- 搜索
- 最新文章
- 热门文章