Scenario 1: Unit Testing a Data Transformation Function in PySpark
Problem: You have developed a function that transforms a DataFrame by adding a new column that doubles the values of an existing column. You need to write a unit test to validate that this function performs as expected.

Solution:

import unittest
from pyspark.sql import SparkSession
from pyspark.sql.functions import col


def add_doubled_column(df, column_name):
    # Add a "doubled" column holding twice the value of column_name
    return df.withColumn("doubled", col(column_name) * 2)


class PySparkTest(unittest.TestCase):
    @classmethod
    def setUpClass(cls):
        # One local Spark session shared by every test in the class
        cls.spark = SparkSession.builder \
            .appName("PySparkTesting") \
            .master("local[2]") \
            .getOrCreate()

    @classmethod
    def tearDownClass(cls):
        cls.spark.stop()

    def test_add_doubled_column(self):
        # Create a sample DataFrame
        df = self.spark.createDataFrame(
            [(1,), (2,), (3,)],
            ["value"]
        )

        # Apply the transformation
        result_df = add_doubled_column(df, "value")

        # Expected DataFrame
        expected_data = [(1, 2), (2, 4), (3, 6)]
        expected_df = self.spark.createDataFrame(
            expected_data,
            ["value", "doubled"]
        )

        # Assert that the transformed DataFrame matches the expected DataFrame
        self.assertTrue(
            result_df.collect() == expected_df.collect(),
            "The transformed DataFrame does not match the expected DataFrame."
        )


# Run the tests
if __name__ == "__main__":
    unittest.main()

Explanation:

Function Definition: add_doubled_column takes a DataFrame and a column name, then returns the DataFrame with an additional column, "doubled", holding twice the value of the named column.
Unit Test Setup: The unittest framework structures the PySpark tests. setUpClass and tearDownClass start and stop a single Spark session shared by all tests in the class.
Test Method: test_add_doubled_column creates a sample DataFrame, applies the transformation, and compares the result with an expected DataFrame to ensure the function works correctly.
Assertions: assertTrue checks that the rows collected from the result DataFrame equal the rows collected from the expected DataFrame, and reports the given message if the test fails.
This unit testing approach is a fundamental part of developing reliable PySpark applications, allowing you to validate transformations and logic independently of the complete ETL pipeline.
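
One caveat about comparing collected rows with ==: the comparison also checks row order, and Spark does not guarantee row order in general once a transformation shuffles data. The following is a minimal sketch of an order-insensitive comparison, shown on plain tuples so it runs without a Spark session. The helper name rows_match is hypothetical and not part of unittest or PySpark.

```python
from collections import Counter


def rows_match(actual_rows, expected_rows):
    """Return True when both row lists hold the same rows, ignoring order.

    Counter treats each list as a multiset, so duplicate rows are counted
    rather than collapsed. Rows must be hashable (tuples are, and PySpark
    Row objects are tuple subclasses).
    """
    return Counter(actual_rows) == Counter(expected_rows)


# Same rows in a different order still match
print(rows_match([(2, 4), (1, 2), (3, 6)], [(1, 2), (2, 4), (3, 6)]))  # True
# A missing duplicate row is detected
print(rows_match([(1, 2), (1, 2)], [(1, 2)]))  # False
```

Inside the test method, this could be used as self.assertTrue(rows_match(result_df.collect(), expected_df.collect())).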

Scenario 2: Identifying and Resolving Data Skew in a Spark Job

Debugging Common Issues in Spark Jobs

Problem: You notice that some tasks in your Spark job are taking significantly longer than others, suggesting possible data skew.

Solution:

from pyspark.sql.functions import spark_partition_id, count

# Analyze the distribution of data across partitions
partition_sizes = (
    df.withColumn("partition_id", spark_partition_id())
    .groupBy("partition_id")
    .agg(count("*").alias("records_per_partition"))
)

# Check the output to identify skewed partitions
partition_sizes.show()


Explanation:

Identify Skew: Use spark_partition_id() to add a column indicating the partition ID for each row, then group by this ID and count the records in each partition. Large discrepancies in these counts suggest skew.
Resolution: Depending on the findings, consider redistributing the data, either by repartitioning or by salting the keys that carry a disproportionate number of records.
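
To make the salting idea concrete, here is a minimal plain-Python sketch of why a salt spreads a hot key across partitions. It does not use Spark: toy_partition is a deterministic stand-in for a hash partitioner (it is not Spark's real hash function), and the key names and counts are hypothetical.

```python
from collections import Counter


def toy_partition(key, num_partitions):
    # Deterministic stand-in for a hash partitioner (NOT Spark's real hash):
    # sum of the key's UTF-8 bytes modulo the partition count.
    return sum(key.encode("utf-8")) % num_partitions


def partition_sizes(keys, num_partitions):
    # Count how many records land in each partition
    return Counter(toy_partition(k, num_partitions) for k in keys)


def salt_keys(keys, num_salts):
    # Append a rotating salt suffix so one logical key becomes num_salts keys
    return [f"{key}_{i % num_salts}" for i, key in enumerate(keys)]


# Hypothetical skewed input: one hot key plus 100 ordinary keys
keys = ["hot"] * 900 + [f"k{i}" for i in range(100)]

before = partition_sizes(keys, num_partitions=8)
after = partition_sizes(salt_keys(keys, num_salts=8), num_partitions=8)

print("largest partition before salting:", max(before.values()))
print("largest partition after salting:", max(after.values()))
```

In a real job the salted key would typically be used only for the expensive stage (for example a first-pass aggregation), after which the salt is stripped and the partial results are combined per original key.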

Scenario 4: (Advanced Topics) Applying a Custom Calculation to a DataFrame Column Using a UDF

Problem: You need to apply a complex mathematical formula to a column in a DataFrame, and the formula is not directly supported by built-in Spark functions.

Solution:

from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType


# Define the UDF
def complex_formula(x):
    return (x * 2.54) / (x ** 0.5 + 2)


# Register the UDF
complex_formula_udf = udf(complex_formula, DoubleType())

# Apply the UDF to a DataFrame
df = df.withColumn("result", complex_formula_udf(df["input_column"]))

Explanation:

UDF Definition: Create a Python function, complex_formula, that performs the calculation on a single value.
UDF Registration: Wrap the Python function with udf so that Spark can call it on column values, specifying DoubleType() as the return type so Spark knows the type of the resulting column.
Application: Use withColumn to apply the UDF to the DataFrame, creating a new column with the results of the calculation.
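
A practical detail when writing the Python function behind a UDF: Spark passes SQL NULL values to it as None, and in Python 3 a negative number raised to the power 0.5 produces a complex number, which does not fit a DoubleType column. The following is a minimal sketch of a guarded variant of the formula, runnable without Spark. The name complex_formula_safe and the choice to return None for negative input are assumptions for illustration.

```python
import math


def complex_formula_safe(x):
    """Null-safe variant of (x * 2.54) / (sqrt(x) + 2)."""
    # Spark hands SQL NULL to a Python UDF as None
    if x is None:
        return None
    # The square root of a negative number is not a real value;
    # returning None here is an assumed policy, not the only option
    if x < 0:
        return None
    return (x * 2.54) / (math.sqrt(x) + 2)


print(complex_formula_safe(4.0))
print(complex_formula_safe(None))  # None
print(complex_formula_safe(-1.0))  # None
```

The guarded function could be wrapped with udf and DoubleType() in the same way as the original; a None result becomes NULL in the output column.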

Scenario 6: (Advanced Topics) Enforcing Data Masking on Sensitive Columns

Data Security and Governance in Spark

Problem: You need to ensure that sensitive data, such as personal identifiers, is masked or anonymized in the dataset before it is used for analysis, in order to comply with privacy regulations.

Solution:

from pyspark.sql.functions import col, sha2


# Function to mask sensitive data
def mask_data(df, column_name):
    # Replace the column with the hex-encoded SHA-256 hash of its values
    return df.withColumn(column_name, sha2(col(column_name), 256))


# Applying data masking to the DataFrame
secure_df = mask_data(df, "sensitive_column")

Explanation:

Data Masking Function: sha2 hashes the sensitive column data, here with the 256-bit variant of SHA-2. Hashing transforms each value into a fixed-size hexadecimal string that appears random and cannot be easily reversed. Because equal inputs always produce equal hashes, the masked column can still serve as a consistent identifier for joining or grouping records.
Application: The mask_data function applies this transformation to any specified column of the DataFrame. This method helps protect sensitive data and is part of data governance practices for complying with data security standards and regulations.
This solution provides a practical approach to enhancing data security in PySpark through data masking, which is essential for protecting sensitive information and adhering to compliance requirements in data processing workflows.
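
One limit of plain hashing is worth illustrating: when the identifier has few possible values (a phone number or national ID, for instance), anyone can hash every candidate and match the results. A common mitigation is a keyed hash. The sketch below uses Python's standard hashlib and hmac modules rather than Spark, purely to show the idea. The identifier, the key, and the function names are hypothetical, and sha256_hex should correspond to what sha2(..., 256) returns for a UTF-8 string.

```python
import hashlib
import hmac


def sha256_hex(value):
    # Hex digest of the UTF-8 bytes: a 64-character string
    return hashlib.sha256(value.encode("utf-8")).hexdigest()


def keyed_mask(value, secret_key):
    # HMAC-SHA256 mixes a secret key into the hash, so the digest cannot be
    # reproduced by someone who only knows the candidate input values
    return hmac.new(secret_key, value.encode("utf-8"), hashlib.sha256).hexdigest()


# Hypothetical identifier and key, for illustration only
plain = sha256_hex("123-45-6789")
keyed = keyed_mask("123-45-6789", secret_key=b"example-key-from-a-secret-store")

print(len(plain), len(keyed))              # 64 64
print(plain == sha256_hex("123-45-6789"))  # True: hashing is deterministic
print(plain != keyed)                      # True: the key changes the digest
```

Whether a keyed hash is required depends on the regulation and threat model in question; the sketch only shows how the two digests differ.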

Scenario 3: Using Accumulators to Track Execution Metrics

Monitoring and Optimizing Spark Job Performance

Problem: You want to monitor specific operations within your Spark job, such as counting certain events or tracking the occurrence of specific conditions.

Solution:

from pyspark import AccumulatorParam


class DictAccumulatorParam(AccumulatorParam):
    def zero(self, initialValue):
        # Every task starts from an empty dictionary
        return {}

    def addInPlace(self, v1, v2):
        # Merge v2's counts into v1, key by key
        for key in v2:
            if key in v1:
                v1[key] += v2[key]
            else:
                v1[key] = v2[key]
        return v1


# Create an accumulator (assumes an existing SparkSession named spark)
my_accumulator = spark.sparkContext.accumulator({}, DictAccumulatorParam())


# Example usage in a transformation
def update_accumulator(value):
    my_accumulator.add({value: 1})
    return value


df.rdd.map(update_accumulator).collect()

# Access the accumulated values
print(my_accumulator.value)

Explanation:

Custom Accumulator: Define a custom accumulator that operates on dictionaries, allowing you to count occurrences of various values throughout your job.
Monitoring: Use the accumulator in a map operation or similar to increment counts based on data values or conditions encountered during job execution. This provides a way to track detailed metrics about what is happening within your Spark tasks.
Insights: After the job, inspect the value of the accumulator on the driver to see the distribution or frequency of events, which can help in debugging and understanding job behavior.
These scenarios equip you with practical methods for debugging and monitoring PySpark jobs, so that you can identify performance bottlenecks and operational issues and apply effective solutions to maintain and improve job efficiency.
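
The merge rule inside addInPlace can be exercised without a cluster. The sketch below is plain Python: it simulates three tasks that each produced a partial dictionary of counts and folds them together the way the accumulator would. The event names and the merge_counts helper are hypothetical. Note also that Spark may apply accumulator updates made inside transformations such as map more than once if a task is re-executed, so counts gathered this way are best treated as diagnostics.

```python
from functools import reduce


def merge_counts(v1, v2):
    # Same merge rule as addInPlace: add v2's counts into v1, key by key
    for key, count in v2.items():
        v1[key] = v1.get(key, 0) + count
    return v1


# Hypothetical partial results, as if three tasks had each counted their rows
task_results = [
    {"click": 2, "view": 5},
    {"click": 1},
    {"view": 3, "purchase": 1},
]

# Start from an empty dict, mirroring what zero() returns
totals = reduce(merge_counts, task_results, {})
print(totals)  # {'click': 3, 'view': 8, 'purchase': 1}
```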

Scenario 5: (Advanced Topics) Consuming Streaming Data from Apache Kafka

Integrating with Apache Kafka for Streaming Data

Problem: You need to read streaming data from a Kafka topic for real-time processing in PySpark.

Solution:

from pyspark.sql import SparkSession

# Create a Spark session
# (the Kafka source requires the spark-sql-kafka connector on the classpath)
spark = SparkSession.builder \
    .appName("KafkaStreaming") \
    .getOrCreate()

# Define Kafka parameters
kafka_topic_name = "stream_topic"
kafka_bootstrap_servers = 'localhost:9092'

# Read from Kafka
df_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .load()

# Processing the stream
# Here you can define transformations, for example, parsing the Kafka message.
# Keep the result: DataFrames are immutable, so selectExpr returns a new one.
parsed_stream = df_stream.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

# Start streaming
query = parsed_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .start()

query.awaitTermination()

Explanation:

Session Creation: Initialize a Spark session to handle streaming data.
Kafka Configuration: Specify the Kafka bootstrap servers and the topic you are subscribing to.
Streaming Read: Use readStream to connect to Kafka and continuously read the incoming stream.
Data Processing: Apply any necessary transformations, here casting the binary Kafka key and value to strings for easier handling.
Stream Output: Start the stream processing and output to the console for demonstration purposes. In production, this could be directed to databases, file systems, or other storage services.
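
For intuition about the Data Processing step, the sketch below shows in plain Python roughly what casting a Kafka key or value to a string amounts to for valid UTF-8 text: the raw bytes are decoded, and a missing key stays empty. The record contents and the helper name decode_kafka_field are hypothetical, and no Kafka broker or Spark session is involved.

```python
def decode_kafka_field(raw):
    # Kafka keys and values arrive as bytes; a record may have no key at all
    if raw is None:
        return None
    # Assumes UTF-8 text; errors="replace" keeps malformed bytes from raising
    return raw.decode("utf-8", errors="replace")


# Hypothetical records shaped like (key, value) pairs from a topic
records = [
    (b"store-1", b'{"amount": 12.5}'),
    (None, b"plain text message"),
]

for key, value in records:
    print(decode_kafka_field(key), decode_kafka_field(value))
```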

These solutions demonstrate how to extend PySpark's capabilities to handle custom data transformations with UDFs and integrate with real-time data streams from Apache Kafka, essential skills for advanced data engineering tasks in dynamic and scalable environments.

Real-World Scenario: Processing Streaming Data from Apache Kafka with PySpark
Scenario: Real-Time Data Aggregation from Multiple Sources
Problem: You need to aggregate real-time sales data coming from multiple store locations via Apache Kafka to monitor overall sales performance and generate timely reports.

Solution:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, get_json_object, window
from pyspark.sql.functions import sum as sum_

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Real-Time Sales Aggregation") \
    .getOrCreate()

# Define Kafka parameters
kafka_topic_name = "sales_data"
kafka_bootstrap_servers = 'localhost:9092'

# Read streaming data from Kafka
sales_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", kafka_bootstrap_servers) \
    .option("subscribe", kafka_topic_name) \
    .option("startingOffsets", "earliest") \
    .load() \
    .selectExpr("CAST(value AS STRING) as raw_data")

# Parse the streaming data
# The timestamp is cast so that window() receives a timestamp column;
# this assumes the JSON field holds a format Spark can cast directly.
sales_data = sales_stream.select(
    get_json_object(col("raw_data"), "$.store_id").alias("store_id"),
    get_json_object(col("raw_data"), "$.amount").cast("double").alias("amount"),
    get_json_object(col("raw_data"), "$.timestamp").cast("timestamp").alias("timestamp")
)

# Aggregate data over a 1-minute window
# agg(...) with a column alias names the result column total_sales
aggregated_sales = sales_data \
    .groupBy(
        window(col("timestamp"), "1 minute"),
        col("store_id")
    ) \
    .agg(sum_("amount").alias("total_sales"))

# Write the results to a sink, e.g., console for testing
query = aggregated_sales \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()

query.awaitTermination()

Explanation:

Kafka Integration: The solution begins by connecting to a Kafka topic that streams sales data. Setting startingOffsets to earliest makes a newly started query read from the earliest available offset rather than only from new messages.
Data Parsing: Spark's get_json_object extracts the store_id, amount, and timestamp fields from the JSON-formatted messages, so each field can be typed and used in further processing.
Window-Based Aggregation: Grouping data by a time window and store ID, then summing the sales amounts within each window, allows for real-time monitoring of sales performance across different store locations.
Output: The aggregated data is written to the console, which can be replaced with more robust storage or reporting solutions in production environments.
This solution is an example of how to handle real-time data streams efficiently, providing actionable insights that can support dynamic business operations and decision-making.
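
To see what the window-based aggregation computes, the following plain-Python sketch reproduces the same logic on a handful of messages: parse the JSON, floor each timestamp to the start of its one-minute window (aligned to the Unix epoch, as Spark's tumbling windows are by default), and sum amounts per window and store. The sample messages and the timestamp format are hypothetical, and nothing here touches Spark or Kafka.

```python
import json
from collections import defaultdict
from datetime import datetime, timedelta


def window_start(ts, width=timedelta(minutes=1)):
    # Floor the timestamp to the start of its tumbling window
    epoch = datetime(1970, 1, 1)
    offset = (ts - epoch) % width
    return ts - offset


def aggregate_sales(raw_messages):
    # Sum amounts per (window start, store_id), like groupBy(window, store_id)
    totals = defaultdict(float)
    for raw in raw_messages:
        record = json.loads(raw)
        ts = datetime.strptime(record["timestamp"], "%Y-%m-%d %H:%M:%S")
        totals[(window_start(ts), record["store_id"])] += float(record["amount"])
    return dict(totals)


# Hypothetical messages; the field names follow the JSON paths used above
messages = [
    '{"store_id": "A", "amount": 10.0, "timestamp": "2024-01-01 09:00:05"}',
    '{"store_id": "A", "amount": 2.5, "timestamp": "2024-01-01 09:00:55"}',
    '{"store_id": "B", "amount": 4.0, "timestamp": "2024-01-01 09:00:30"}',
    '{"store_id": "A", "amount": 1.0, "timestamp": "2024-01-01 09:01:10"}',
]

for (start, store_id), total in sorted(aggregate_sales(messages).items()):
    print(start, store_id, total)
```

The two store A sales at 09:00 fall into the same window and are summed, while the sale at 09:01:10 opens a new window.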