使用 Pandera 的 PySpark 应用程序的数据验证
如果您是数据从业者,您会意识到数据验证对于确保准确性和一致性至关重要。在处理大型数据集或来自不同来源的数据时,这一点变得尤为重要。但是,Pandera Python 库可以帮助简化和自动化数据验证过程。Pandera 是一个精心制作的开源库,旨在简化模式和数据验证的任务。它建立在熊猫的健壮性和多功能性之上,并引入了专为数据验证目的而设计的直观且富有表现力的 API。
推荐:使用NSDT场景编辑器快速搭建3D应用场景
本文简要介绍了 Pandera 的主要功能,然后继续解释 Pandera 数据验证如何与自最新版本 (Pandera 0.16.0) 以来使用本机 PySpark SQL 的数据处理工作流集成。
Pandera 旨在与其他流行的 Python 库配合使用,如 pandas、pyspark.pandas、Dask 等。这样可以轻松地将数据验证合并到现有数据处理工作流中。直到最近,Pandera 还缺乏对 PySpark SQL 的原生支持,但为了弥合这一差距,QuantumBlack 的一个团队,麦肯锡的 AI 由 Ismail Negm-PARI、Neeraj Malhotra、Jaskaran Singh Sidana、Kasper Janehag、Oleksandr Lazarchuk 以及 Pandera 创始人 Niels Bantilan 组成。,开发了原生的 PySpark SQL 支持并将其贡献给了 Pandera。本文的文字也是团队准备的,下面用他们的话写。
Pandera的主要特点
如果您不熟悉使用Pandera来验证数据,我们建议您查看Khuyen Tran的“使用Pandera验证您的pandas DataFrame”,其中描述了基础知识。总之,我们简要解释了简单直观的 API、内置验证功能和自定义的主要功能和优势。
简单直观的接口
Pandera 的突出特点之一是其简单直观的 API。您可以使用易于阅读和理解的声明性语法来定义数据架构。这使得编写既高效又有效的数据验证代码变得容易。
下面是 Pandera 中的架构定义示例:
class InputSchema(pa.DataFrameModel):
year: Series[int] = pa.Field()
month: Series[int] = pa.Field()
day: Series[int] = pa.Field()
内置验证函数
Pandera 提供了一组内置函数(通常称为检查)来执行数据验证。当我们调用 Pandera 模式时,它将执行模式和数据验证。数据验证将在后台调用函数。validate()
check
下面是如何使用 Pandera 在数据帧对象上运行数据的简单示例。check
class InputSchema(pa.DataFrameModel):
year: Series[int] = pa.Field(gt=2000, coerce=True)
month: Series[int] = pa.Field(ge=1, le=12, coerce=True)
day: Series[int] = pa.Field(ge=0, le=365, coerce=True)
InputSchema.validate(df)
如上所示,对于字段,我们定义了一个检查,强制此字段中的所有值必须大于 2000,否则 Pandera 将引发验证失败。year
gt=2000
以下是 Pandera 默认提供的所有内置检查的列表:
eq: checks if value is equal to a given literal
ne: checks if value is not equal to a given literal
gt: checks if value is greater than a given literal
ge: checks if value is greater than & equal to a given literal
lt: checks if value is less than a given literal
le: checks if value is less than & equal to a given literal
in_range: checks if value is given range
isin: checks if value is given list of literals
notin: checks if value is not in given list of literals
str_contains: checks if value contains string literal
str_endswith: checks if value ends with string literal
str_length: checks if value length matches
str_matches: checks if value matches string literal
str_startswith: checks if value starts with a string literal
自定义验证函数
除了内置的验证检查之外,Pandera 还允许您定义自己的自定义验证函数。这使您能够根据用例灵活地定义自己的验证规则。
例如,您可以定义一个用于数据验证的 lambda 函数,如下所示:
schema = pa.DataFrameSchema({
"column2": pa.Column(str, [
pa.Check(lambda s: s.str.startswith("value")),
pa.Check(lambda s: s.str.split("_", expand=True).shape[1] == 2)
]),
})
向 Pandera 添加对 PySpark SQL DataFrame 的支持
在添加对 PySpark SQL 的支持的过程中,我们坚持了两个基本原则:
- 界面和用户体验的一致性
- 针对 PySpark 的性能优化。
首先,让我们深入研究一致性的主题,因为从用户的角度来看,无论选择的框架如何,他们都有一组一致的 API 和一个接口,这一点很重要。由于Pandera提供了多种框架可供选择,因此在PySpark SQL API中拥有一致的用户体验更为重要。
考虑到这一点,我们可以使用 PySpark SQL 定义 Pandera 模式,如下所示:
from pyspark.sql import DataFrame, SparkSession
import pyspark.sql.types as T
import pandera.pyspark as pa
spark = SparkSession.builder.getOrCreate()
class PanderaSchema(DataFrameModel):
"""Test schema"""
id: T.IntegerType() = Field(gt=5)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
description: T.ArrayType(T.StringType()) = Field()
meta: T.MapType(T.StringType(), T.StringType()) = Field()
data_fail = [
(5, "Bread", 44.4, ["description of product"], {"product_category": "dairy"}),
(15, "Butter", 99.0, ["more details here"], {"product_category": "bakery"}),
]
spark_schema = T.StructType(
[
T.StructField("id", T.IntegerType(), False),
T.StructField("product", T.StringType(), False),
T.StructField("price", T.DecimalType(20, 5), False),
T.StructField("description", T.ArrayType(T.StringType(), False), False),
T.StructField(
"meta", T.MapType(T.StringType(), T.StringType(), False), False
),
],
)
df_fail = spark_df(spark, data_fail, spark_schema)
在上面的代码中,定义了传入 pyspark 数据帧的架构。它有 5 个字段,对 and 字段进行数据检查和强制执行。 PanderaSchema
dtypes
id
product_name
class PanderaSchema(DataFrameModel):
"""Test schema"""
id: T.IntegerType() = Field(gt=5)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
description: T.ArrayType(T.StringType()) = Field()
meta: T.MapType(T.StringType(), T.StringType()) = Field()
接下来,我们构建了一个虚拟数据,并强制实施了 中定义的本机 PySpark SQL 架构。spark_schema
spark_schema = T.StructType(
[
T.StructField("id", T.IntegerType(), False),
T.StructField("product", T.StringType(), False),
T.StructField("price", T.DecimalType(20, 5), False),
T.StructField("description", T.ArrayType(T.StringType(), False), False),
T.StructField(
"meta", T.MapType(T.StringType(), T.StringType(), False), False
),
],
)
df_fail = spark_df(spark, data_fail, spark_schema)
这样做是为了模拟架构和数据验证失败。
以下是数据帧的内容:df_fail
df_fail.show()
+---+-------+--------+--------------------+--------------------+
| id|product| price| description| meta|
+---+-------+--------+--------------------+--------------------+
| 5| Bread|44.40000|[description of p...|{product_category...|
| 15| Butter|99.00000| [more details here]|{product_category...|
+---+-------+--------+--------------------+--------------------+
接下来,我们可以调用 Pandera 的验证函数来执行模式和数据级验证,如下所示:
df_out = PanderaSchema.validate(check_obj=df)
我们将很快探讨的内容。df_out
PySpark 的性能优化
我们的贡献是专门为使用 PySpark 数据帧时的最佳性能而设计的,这在处理大型数据集时至关重要,以便处理 PySpark 分布式计算环境的独特挑战。
Pandera 使用 PySpark 的分布式计算架构来高效处理大型数据集,同时保持数据的一致性和准确性。我们针对 PySpark 性能重写了 Pandera 的自定义验证函数,以便更快、更高效地验证大型数据集,同时降低数据错误和大容量不一致的风险。
全面的错误报告
我们对Pandera进行了另一项添加,以便能够以Python字典对象的形式生成详细的错误报告。这些报告可通过从验证函数返回的数据帧进行访问。它们根据用户的配置提供所有架构和数据级别验证的全面摘要。
事实证明,此功能对于开发人员快速识别和解决任何与数据相关的问题很有价值。通过使用生成的错误报告,团队可以编译其应用程序中架构和数据问题的完整列表。这使他们能够高效、精确地确定问题的优先级和解决方案。
需要注意的是,此功能目前仅适用于 PySpark SQL,为用户提供了在 Pandera 中使用错误报告时增强的体验。
在上面的代码示例中,请记住我们在 Spark 数据帧上调用过:validate()
df_out = PanderaSchema.validate(check_obj=df)
它返回了一个数据帧对象。使用访问器,我们可以从中提取错误报告,如下所示:
print(df_out.pandera.errors)
{
"SCHEMA":{
"COLUMN_NOT_IN_DATAFRAME":[
{
"schema":"PanderaSchema",
"column":"PanderaSchema",
"check":"column_in_dataframe",
"error":"column 'product_name' not in dataframe Row(id=5, product='Bread', price=None, description=['description of product'], meta={'product_category': 'dairy'})"
}
],
"WRONG_DATATYPE":[
{
"schema":"PanderaSchema",
"column":"description",
"check":"dtype('ArrayType(StringType(), True)')",
"error":"expected column 'description' to have type ArrayType(StringType(), True), got ArrayType(StringType(), False)"
},
{
"schema":"PanderaSchema",
"column":"meta",
"check":"dtype('MapType(StringType(), StringType(), True)')",
"error":"expected column 'meta' to have type MapType(StringType(), StringType(), True), got MapType(StringType(), StringType(), False)"
}
]
},
"DATA":{
"DATAFRAME_CHECK":[
{
"schema":"PanderaSchema",
"column":"id",
"check":"greater_than(5)",
"error":"column 'id' with type IntegerType() failed validation greater_than(5)"
}
]
}
}
如上所示,错误报告在 python 字典对象中的 2 个级别上聚合,以便下游应用程序轻松使用,例如使用 Grafana 等工具随时间推移的时间序列可视化错误:
- 验证类型 = 或
SCHEMA
DATA
- 错误类别 = 或 等。
DATAFRAME_CHECK
WRONG_DATATYPE
这种重构错误报告的新格式是在 0.16.0 中引入的,作为我们贡献的一部分。
开/关开关
对于依赖 PySpark 的应用程序,具有开/关开关是一项重要功能,可以在灵活性和风险管理方面产生重大影响。具体而言,开/关开关允许团队在生产中禁用数据验证,而无需更改代码。
这对于性能至关重要的大数据管道尤其重要。在许多情况下,数据验证可能会占用大量处理时间,这可能会影响管道的整体性能。使用开/关开关,团队可以在必要时快速轻松地禁用数据验证,而无需经历耗时的修改代码过程。
我们的团队在 Pandera 中引入了开/关开关,因此用户只需更改配置设置即可轻松关闭生产中的数据验证。这提供了在必要时确定性能优先级所需的灵活性,而不会牺牲开发中的数据质量或准确性。
要启用验证,请在环境变量中设置以下内容:
export PANDERA_VALIDATION_ENABLED=False
Pandera将选取此选项以禁用应用程序中的所有验证。默认情况下,验证处于启用状态。
目前,此功能仅适用于 0.16.0 版本的 PySpark SQL,因为它是我们的贡献引入的新概念。
对Pandera执行的精细控制
除了开/关开关功能外,我们还引入了对 Pandera 验证流程执行的更精细的控制。这是通过引入可配置的设置来实现的,这些设置允许用户在三个不同的级别控制执行:
SCHEMA_ONLY
:此设置仅执行架构验证。它检查数据是否符合架构定义,但不执行任何其他数据级验证。DATA_ONLY
:此设置仅执行数据级验证。它根据定义的约束和规则检查数据,但不验证架构。SCHEMA_AND_DATA
:此设置同时执行架构和数据级验证。它根据架构定义以及定义的约束和规则检查数据。
通过提供这种精细控制,用户可以选择最适合其特定用例的验证级别。例如,如果主要关注点是确保数据符合定义的架构,则可以使用该设置来减少总体处理时间。或者,如果已知数据符合架构,并且重点是确保数据质量,则可以使用该设置来确定数据级验证的优先级。SCHEMA_ONLY
DATA_ONLY
对 Pandera 执行的增强控制使用户能够在精度和效率之间取得微调的平衡,从而实现更有针对性和优化的验证体验。
export PANDERA_VALIDATION_DEPTH=SCHEMA_ONLY
默认情况下,将启用验证,并设置深度,可以根据用例将其更改为或根据需要更改。SCHEMA_AND_DATA
SCHEMA_ONLY
DATA_ONLY
目前,此功能仅适用于 0.16.0 版本的 PySpark SQL,因为它是我们的贡献引入的新概念。
列和数据帧级别的元数据
我们的团队为 Pandera 添加了一项新功能,允许用户在和级别存储额外的元数据。此功能旨在允许用户在其架构定义中嵌入上下文信息,以供其他应用程序利用。Field
Schema / Model
例如,通过存储有关特定列的详细信息(如数据类型、格式或单位),开发人员可以确保下游应用程序能够正确解释和使用数据。同样,通过存储有关特定用例需要架构的哪些列的信息,开发人员可以优化数据处理管道、降低存储成本并提高查询性能。
在架构级别,用户可以存储信息以帮助对整个应用程序的不同架构进行分类。此元数据可以包括架构用途、数据源或数据的日期范围等详细信息。这对于管理复杂的数据处理工作流特别有用,其中多个架构用于不同的目的,需要有效地跟踪和管理。
class PanderaSchema(DataFrameModel):
"""Pandera Schema Class"""
id: T.IntegerType() = Field(
gt=5,
metadata={"usecase": ["RetailPricing", "ConsumerBehavior"],
"category": "product_pricing"},
)
product_name: T.StringType() = Field(str_startswith="B")
price: T.DecimalType(20, 5) = Field()
class Config:
"""Config of pandera class"""
name = "product_info"
strict = True
coerce = True
metadata = {"category": "product-details"}
在上面的示例中,我们引入了有关架构对象本身的其他信息。这在 2 个级别是允许的:字段和架构。
To extract the metadata on schema level (including all fields in it), we provide helper functions as:
PanderaSchema.get_metadata()
The output will be dictionary object as follows:
{
"product_info": {
"columns": {
"id": {"usecase": ["RetailPricing", "ConsumerBehavior"],
"category": "product_pricing"},
"product_name": None,
"price": None,
},
"dataframe": {"category": "product-details"},
}
}
目前,此功能是 0.16.0 中的一个新概念,已针对 PySpark SQL 和 Pandas 添加。
总结
我们引入了几个新功能和概念,包括允许团队在不更改代码的情况下禁用生产中的验证的开/关开关、对 Pandera 验证流程的精细控制,以及在列和数据帧级别存储其他元数据的能力。
由3D建模学习工作室 整理翻译,转载请注明出处!