Mastering Testing and Debugging PySpark Applications
Drills to master testing and debugging of PySpark applications
Scenario 1: Unit Testing a Data Transformation Function in PySpark

Problem: You have developed a function that transforms a DataFrame by adding a new column which doubles the values of an existing column. You need to write a unit test to validate that this function performs as expected.

Solution:

import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

def add_doubled_column(df, column_name):
    return df.withColumn("doubled", col(column_name) * 2)

class PySparkTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        cls.spark = SparkSession.builder \
            .appName("PySparkTesting") \
            .master("local[2]") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_add_doubled_column(self):
        # Create a sample DataFrame
        df = self.spark.createDataFrame([(1,), (2,), (3,)], ["value"])

        # Apply the transformation
        result_df = add_doubled_column(df, "value")

        # Expected DataFrame
        expected_data = [(1, 2), (2, 4), (3, 6)]
        expected_df = self.spark.createDataFrame(expected_data, ["value", "doubled"])

        # Assert that the transformed DataFrame matches the expected DataFrame
        self.assertTrue(result_df.collect() == expected_df.collect(),
                        "The transformed DataFrame does not match the expected DataFrame.")

# Run the tests
if __name__ == "__main__":
    unittest.main()
Explanation:

Function Definition: add_doubled_column takes a DataFrame and a column name, then returns the DataFrame with an additional column where the values are doubled.

Unit Test Setup: Using the unittest framework to structure PySpark tests, with setUpClass and tearDownClass for initializing and stopping the Spark session.

Test Method: test_add_doubled_column creates a sample DataFrame, applies the transformation, and compares the result with an expected DataFrame to ensure the function works correctly.

Assertions: Uses assertTrue with a condition that checks if the collected data from the result DataFrame matches the expected DataFrame, providing clear feedback if the test fails.

This unit testing approach is a fundamental part of developing reliable PySpark applications, allowing you to validate transformations and logic independently of the complete ETL pipeline.
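One caveat worth noting about the assertion above: collect() does not guarantee row order, so comparing collected lists can fail spuriously once a DataFrame has more than one partition. Below is a minimal sketch of an order-insensitive comparison helper; the helper name and the sort column are illustrative additions, not part of the original drill.

def assert_same_rows(test_case, result_df, expected_df, sort_col):
    # Sort the collected rows on a known column so the comparison does not
    # depend on partitioning or task scheduling order.
    result_rows = sorted(result_df.collect(), key=lambda row: row[sort_col])
    expected_rows = sorted(expected_df.collect(), key=lambda row: row[sort_col])
    test_case.assertEqual(result_rows, expected_rows,
                          "The transformed DataFrame does not match the expected DataFrame.")

Inside test_add_doubled_column this would be called as assert_same_rows(self, result_df, expected_df, "value"). Recent PySpark releases (3.5 and later) also provide pyspark.testing.assertDataFrameEqual, which performs a similar order-insensitive comparison out of the box.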
Scenario 2: Identifying and Resolving Data Skew in a Spark Job
Debugging Common Issues in Spark Jobs

Problem: You notice that some tasks in your Spark job are taking significantly longer than others, suggesting a possible data skew.

Solution:

from pyspark.sql.functions import spark_partition_id, count

# Analyze the distribution of data across partitions
partition_sizes = df.withColumn("partition_id", spark_partition_id()) \
    .groupBy("partition_id") \
    .agg(count("*").alias("records_per_partition"))

# Check the output to identify skewed partitions
partition_sizes.show()
Explanation:

Identify Skew: Use spark_partition_id() to add a column indicating the partition ID for each row, then group by this ID and count the records in each partition. Large discrepancies in these counts suggest skew.

Resolution: Depending on the findings, consider redistributing the data either by repartitioning or by using techniques like salting keys that have a large number of records.
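To make the resolution step concrete, here is a minimal salting sketch. It assumes a hypothetical DataFrame df whose grouping key customer_id is heavily skewed; the bucket count and column names are illustrative and not part of the original drill.

from pyspark.sql.functions import col, concat_ws, floor, rand

num_buckets = 8  # tune to the observed skew

# Append a random salt to the hot key so its rows spread across more partitions.
salted = df.withColumn("salt", floor(rand() * num_buckets).cast("string")) \
    .withColumn("salted_key", concat_ws("_", col("customer_id").cast("string"), col("salt")))

# Aggregate on the salted key first, then combine the partial results per original key.
partial_counts = salted.groupBy("salted_key", "customer_id").count()
final_counts = partial_counts.groupBy("customer_id").sum("count")

The same two-step pattern applies to skewed joins: salt the keys on the large side, replicate the small side once per salt bucket, and join on the salted key.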
Scenario 4: (Advanced Topics) Applying a Custom Calculation to a DataFrame Column Using a UDF

Problem: You need to apply a complex mathematical formula to a column in a DataFrame, which is not directly supported by built-in Spark functions.

Solution:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

# Define the UDF
def complex_formula(x):
    return (x * 2.54) / (x ** 0.5 + 2)

# Register the UDF
complex_formula_udf = udf(complex_formula, DoubleType())

# Apply the UDF to a DataFrame
df = df.withColumn("result", complex_formula_udf(df["input_column"]))
Explanation:

UDF Definition: Create a Python function complex_formula that performs the calculation.

UDF Registration: Convert the Python function into a UDF that Spark can use, specifying DoubleType() as the return type to ensure correct data handling.

Application: Use withColumn to apply the UDF to the DataFrame, creating a new column with the results of the calculation.
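Row-at-a-time Python UDFs serialize every value between the JVM and the Python worker, which can dominate runtime on large DataFrames. The sketch below shows a vectorized alternative using a pandas UDF; it assumes pandas and pyarrow are installed, and the function name is an illustrative addition to the drill.

import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType

# The same formula, applied to whole pandas Series batches instead of single values.
@pandas_udf(DoubleType())
def complex_formula_vectorized(x: pd.Series) -> pd.Series:
    return (x * 2.54) / (x ** 0.5 + 2)

df = df.withColumn("result", complex_formula_vectorized(df["input_column"]))

For arithmetic this simple, built-in column expressions such as (col("input_column") * 2.54) / (col("input_column") ** 0.5 + 2) avoid UDF overhead entirely; UDFs are best reserved for logic Spark cannot express natively.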
Scenario 6: (Advanced Topics) Enforcing Data Masking on Sensitive Columns
Data Security and Governance in Spark

Problem: You need to ensure that sensitive data, such as personal identifiers, is masked or anonymized in the dataset before it is used for analysis, to comply with privacy regulations.

Solution:

from pyspark.sql.functions import col, sha2

# Function to mask sensitive data
def mask_data(df, column_name):
    return df.withColumn(column_name, sha2(col(column_name), 256))

# Applying data masking to the DataFrame
secure_df = mask_data(df, "sensitive_column")
Explanation:

Data Masking Function: sha2 is used to hash the sensitive column data, here with a 256-bit SHA-2 algorithm. Hashing transforms the sensitive data into a fixed-size string that appears random and cannot be easily reversed, so the data is anonymized while each record keeps a consistent identifier. Note that an unsalted hash of low-entropy values (such as phone numbers or national IDs) can still be recovered by dictionary lookup, which is why a salted variant, sketched below, is often preferred.

Application: The mask_data function applies this transformation to any specified column of the DataFrame. This method helps in protecting sensitive data and is part of data governance practices to comply with data security standards and regulations.

This solution provides a practical approach to enhancing data security in PySpark through data masking, essential for protecting sensitive information and adhering to compliance requirements in data processing workflows.
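A minimal sketch of the salted variant mentioned above. The salt value and column name are placeholders; in practice the salt would come from a secrets manager or job configuration rather than being hard-coded.

from pyspark.sql.functions import col, concat, lit, sha2

# Hypothetical secret salt; never hard-code real secrets in job code.
SALT = "replace-with-a-secret-value"

def mask_data_salted(df, column_name):
    # Prepending a secret salt blocks lookups of common values in precomputed
    # hash tables while keeping the masking deterministic across the dataset.
    return df.withColumn(column_name, sha2(concat(lit(SALT), col(column_name)), 256))

secure_df = mask_data_salted(df, "sensitive_column")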
Scenario 3: Using Accumulators to Track Execution Metrics
Monitoring and Optimizing Spark Job Performance

Problem: You want to monitor specific operations within your Spark job, such as counting certain events or tracking the occurrence of specific conditions.

Solution:

from pyspark import AccumulatorParam

class DictAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        return {}

    def addInPlace(self, v1, v2):
        for key in v2:
            if key in v1:
                v1[key] += v2[key]
            else:
                v1[key] = v2[key]
        return v1

# Create an accumulator
my_accumulator = spark.sparkContext.accumulator({}, DictAccumulatorParam())

# Example usage in a transformation
def update_accumulator(value):
    my_accumulator.add({value: 1})
    return value

df.rdd.map(update_accumulator).collect()

# Access the accumulated values
print(my_accumulator.value)
Explanation:

Custom Accumulator: Define a custom accumulator that operates on dictionaries, allowing you to count occurrences of various values throughout your job.

Monitoring: Use the accumulator in a map operation or similar to increment counts based on data values or conditions encountered during job execution. This provides a way to track detailed metrics about what's happening within your Spark tasks. Keep in mind that accumulator updates made inside transformations can be applied more than once if a task or stage is retried, so treat these counts as diagnostics rather than exact metrics; only updates made inside actions are guaranteed to be applied exactly once per task.

Insights: After the job, inspect the values of the accumulator to gain insights into the distribution or frequency of events, which can help in debugging and understanding job behavior.

These scenarios equip you with practical methods for debugging and monitoring PySpark jobs, ensuring you can identify performance bottlenecks and operational issues, and apply effective solutions to maintain and improve job efficiency.
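For simple counters, the built-in numeric accumulator is often sufficient and needs no custom AccumulatorParam. A sketch, assuming an existing SparkSession spark and a DataFrame df with a nullable amount column (both hypothetical); because foreach is an action, its accumulator updates are applied exactly once per task even when tasks are retried.

# Count problematic rows while the job runs, without collecting data to the driver.
null_amounts = spark.sparkContext.accumulator(0)

def check_row(row):
    if row["amount"] is None:
        null_amounts.add(1)

df.foreach(check_row)  # an action, so the traversal executes immediately

print("Rows with a null amount:", null_amounts.value)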
Scenario 5: (Advanced Topics) Consuming Streaming Data from Apache Kafka
Integrating with Apache Kafka for Streaming Data

Problem: You need to read streaming data from a Kafka topic for real-time processing in PySpark.

Solution:

from pyspark.sql import SparkSession

# Create a Spark session
spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .getOrCreate()

# Define Kafka parameters
kafka_topic_name = "stream_topic"
kafka_bootstrap_servers = "localhost:9092"

# Read from Kafka
df_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()

# Process the stream: for example, parse the Kafka key and value from bytes to strings
df_stream = df_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Start streaming
query = df_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()
Explanation:

Session Creation: Initialize a Spark session to handle streaming data.

Kafka Configuration: Specify the Kafka bootstrap servers and the topic you are subscribing to.

Streaming Read: Use readStream to connect to Kafka and continuously read the incoming stream.

Data Processing: Apply any necessary transformations, here converting Kafka byte messages to strings for easier handling.

Stream Output: Start the stream processing and output to the console for demonstration purposes. In production, this could be directed to databases, file systems, or other storage services.

These solutions demonstrate how to extend PySpark's capabilities to handle custom data transformations with UDFs and integrate with real-time data streams from Apache Kafka, essential skills for advanced data engineering tasks in dynamic and scalable environments.
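One practical prerequisite the drill leaves implicit: the Kafka source is not bundled with a plain pip-installed PySpark, so the spark-sql-kafka connector has to be supplied to the application (for example via --packages on spark-submit). The sketch below requests it when building the session; the version coordinates are illustrative and must match your Spark and Scala versions.

from pyspark.sql import SparkSession

# Ask Spark to resolve the Kafka connector at startup; adjust "3.5.0" and the
# Scala suffix "2.12" to the versions running on your cluster.
spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .config("spark.jars.packages",
            "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .getOrCreate()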
Real-World Scenario: Processing Streaming Data from Apache Kafka with PySpark

Scenario: Real-Time Data Aggregation from Multiple Sources

Problem: You need to aggregate real-time sales data coming from multiple store locations via Apache Kafka to monitor overall sales performance and generate timely reports.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import window, col, get_json_object, sum as spark_sum

# Initialize the Spark session
spark = SparkSession.builder \
    .appName("Real-Time Sales Aggregation") \
    .getOrCreate()

# Define Kafka parameters
kafka_topic_name = "sales_data"
kafka_bootstrap_servers = "localhost:9092"

# Read streaming data from Kafka
sales_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING) as raw_data")

# Parse the streaming data, casting the timestamp so it can drive the window
sales_data = sales_stream.select(
    get_json_object(col("raw_data"), "$.store_id").alias("store_id"),
    get_json_object(col("raw_data"), "$.amount").cast("double").alias("amount"),
    get_json_object(col("raw_data"), "$.timestamp").cast("timestamp").alias("timestamp")
)

# Aggregate data over a 1-minute window
aggregated_sales = sales_data \
    .groupBy(window(col("timestamp"), "1 minute"), col("store_id")) \
    .agg(spark_sum("amount").alias("total_sales"))

# Write the results to a sink, e.g., the console for testing
query = aggregated_sales \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()
Explanation:

Kafka Integration: The solution begins by connecting to a Kafka topic that streams sales data, ensuring all messages from the earliest offset are consumed.

Data Parsing: Using Spark's get_json_object to extract the relevant fields from the JSON-formatted messages ensures that each piece of data is correctly interpreted and usable for further processing.

Window-Based Aggregation: Grouping data by a time window and store ID, and then summing the sales amounts within each window, allows for real-time monitoring of sales performance across different store locations.

Output: The aggregated data is output to the console, which can be replaced with more robust storage or reporting solutions for production environments.

This solution is an example of how to handle real-time data streams efficiently, providing actionable insights that can support dynamic business operations and decision-making.
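As a follow-on to the Output point, here is a hedged sketch of replacing the console sink with a durable one via foreachBatch, which is compatible with the complete output mode used above. The output path and checkpoint location are placeholders.

# Write each micro-batch of aggregates to Parquet; the checkpoint location lets
# the query recover its Kafka offsets and aggregation state after a restart.
def write_batch(batch_df, batch_id):
    # In complete mode each batch holds the full aggregate table, so overwrite it.
    batch_df.write.mode("overwrite").parquet("/tmp/sales_aggregates")

query = aggregated_sales.writeStream \
    .outputMode("complete") \
    .option("checkpointLocation", "/tmp/checkpoints/sales_aggregation") \
    .foreachBatch(write_batch) \
    .start()

query.awaitTermination()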