Dr. PySpark: How I Learned to Stop Worrying and Love Data Pipeline Testing

One major challenge in building a data pipeline is testing the pipeline code reliably, because the outcome of the logic is tightly coupled to the data.
One way to overcome this is to test the pipeline against immutable data. This requires a good knowledge of the application and of how well the data matches the business requirements.
It also requires some setup so that developers can focus on building the application logic. In this blog post, we discuss a pluggable approach that provides a testing environment based on locally stored files. It lets developers follow test-driven development, identify bugs early, and release the code via CICD based on the test outcome.

Testing breaks

Introduction

We will use PySpark, the Python API for Apache Spark, to develop our pipeline. Pytest will be used to prepare the test environment and to write the tests. Follow-along code is available at https://github.com/soyelherein/pyspark-cicd-template

Consider a pipeline that consumes the “pageviews” file and merges it into the final target table.

Input Table
email                 page_view  created_date  last_active
james@example.com     10         2020-01-01    2020-07-04
mary@example.com      100        2020-02-04    2020-02-04
john@example.com      1          2020-03-04    2020-06-04
Incremental File
email,pages
james@example.com,home
james@example.com,about
patricia@example.com,home
Output Table
email                 page_view  created_date  last_active
james@example.com     12         2020-01-01    2020-07-21
mary@example.com      100        2020-02-04    2020-02-04
john@example.com      1          2020-03-04    2020-06-04
patricia@example.com  1          2020-07-21    2020-07-01
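
To make the expected page-view counts concrete, the merge can be restated in plain Python. This is only a sketch of the rule implied by the three tables, not the PySpark job itself: the function name merge_pageviews and the run date 2020-07-21 passed as process_date are assumptions made for illustration.

```python
from collections import Counter


def merge_pageviews(historical, incremental, process_date):
    """Plain-Python restatement of the merge rule (hypothetical helper)."""
    # Count how many page-view events each email has in the incremental file.
    new_views = Counter(row["email"] for row in incremental)
    merged = []
    for row in historical:
        updated = dict(row)  # copy, so the historical input stays untouched
        if row["email"] in new_views:
            # Known user seen again: add the new views and refresh last_active.
            updated["page_view"] += new_views.pop(row["email"])
            updated["last_active"] = process_date
        merged.append(updated)
    # Whatever is left in the counter belongs to first-time users.
    for email, views in new_views.items():
        merged.append({"email": email, "page_view": views,
                       "created_date": process_date, "last_active": process_date})
    return merged


historical = [
    {"email": "james@example.com", "page_view": 10,
     "created_date": "2020-01-01", "last_active": "2020-07-04"},
    {"email": "mary@example.com", "page_view": 100,
     "created_date": "2020-02-04", "last_active": "2020-02-04"},
    {"email": "john@example.com", "page_view": 1,
     "created_date": "2020-03-04", "last_active": "2020-06-04"},
]
incremental = [
    {"email": "james@example.com", "pages": "home"},
    {"email": "james@example.com", "pages": "about"},
    {"email": "patricia@example.com", "pages": "home"},
]
result = merge_pageviews(historical, incremental, "2020-07-21")
```

James goes from 10 to 12 page views, Mary and John are carried over unchanged, and Patricia is appended as a new row with one page view. Stamping the run date on both date columns of a new row is an assumption of this sketch.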

Development needs tables and files. A Pytest fixture makes them available at runtime by reading locally stored CSV files. In Pytest, fixture functions that are shared across tests can be defined in conftest.py. We have defined a method, setup_testbed, that produces the DataFrames and tables based on a JSON configuration file, test_bed.json.

Now that we have code that provides a runtime test environment, we can place a few sample files and start testing and developing our pipeline incrementally. We discuss this further in the Testbed section. First, let's modularize our application into testable units so that it fits into this frame.

Decouple Application

The project is organised into the parts described in the rest of this post: jobs, configs and ddl, dependencies, the tests, and the CICD files.

There are typically five major sections in the pipeline code.

  1. Spark session management
  2. Declarations for static configuration
  3. Extract
  4. Transform
  5. Load
jobs: We design separate Extract and Load methods to handle the I/O operations of reading files and writing into the table. We will test their behavior using mocks.
The Transform method deals with the business logic. We modify it to take DataFrames as input and return a DataFrame as output, so that its result can be compared against the locally stored gold standard data. Additionally, our pipeline has an entry point method named run that coordinates the other methods.

Extract: Reads the incremental file and the historical data from the table, and returns two DataFrames.

Transform: Calculates the metrics based on the incremental and historical DataFrames and returns a final DataFrame.

Load: Writes the data into the final output path.

Run: Integrates the Extract, Transform, and Load steps. It is exposed to the job submitter module and accepts the Spark session, the job configuration, and a logger object to execute the pipeline.
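
The four methods could be laid out roughly as follows. This is a minimal sketch rather than the code from the repo: the config keys (page_views, user_pageviews_tab, output_path) are hypothetical names, and transform is deliberately reduced to a count and a join so that the structure stays in focus.

```python
def extract(spark, config):
    """Read the incremental file and the historical table; return two DataFrames."""
    # Generic reader call, so a test can pass in a mocked session.
    incremental_df = spark.read.load(config["page_views"], format="csv", header=True)
    historical_df = spark.table(config["user_pageviews_tab"])
    return incremental_df, historical_df


def transform(incremental_df, historical_df, config):
    """Business logic only: DataFrames in, DataFrame out, no I/O."""
    new_views = (incremental_df.groupBy("email").count()
                 .withColumnRenamed("count", "new_views"))
    # The remaining column arithmetic (adding counts, stamping dates)
    # is left out of this sketch on purpose.
    return historical_df.join(new_views, on="email", how="full_outer")


def load(df, config):
    """Write the final DataFrame to the configured output path."""
    df.write.save(path=config["output_path"], format="csv",
                  mode="overwrite", header=True)


def run(spark, config, logger):
    """Entry point called by the job submitter; wires the three steps together."""
    logger.info("pipeline started")
    incremental_df, historical_df = extract(spark, config)
    result_df = transform(incremental_df, historical_df, config)
    load(result_df, config)
    logger.info("pipeline finished")
    return True
```

Because only extract and load touch storage, transform can be exercised with small in-memory DataFrames, and run can be checked without any real I/O by passing in stand-in objects.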

configs and ddl: We take the static configuration out of the code and place it in a JSON file (configs/config.json) so that it can be overridden by the test config.
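
One way to read such a file and let a test override individual values is sketched below. The helper name load_config and its overrides parameter are assumptions for illustration, not part of the repo.

```python
import json


def load_config(path, overrides=None):
    """Read the static job configuration and apply optional overrides on top."""
    with open(path, encoding="utf-8") as handle:
        config = json.load(handle)
    # Keys supplied by a test (or by command-line arguments) win over file values.
    config.update(overrides or {})
    return config
```

A test could then call load_config("configs/config.json", {"output_path": "/tmp/test_out"}) so that the job writes to a scratch location, while production runs use the file unchanged.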

Now that our ETL job is structured into testable modules, the last step before jumping into tests is to take Spark environment management out of the job.

Decouple Spark Environment

Testing and debugging Spark jobs by sending them to a cluster (spark-submit) is tedious and impractical, and it can turn teams into Sherlock Holmes, investigating clues in stack traces to work out what went wrong.
pipenv: To avoid such scenarios, we can create an isolated environment (thanks to pipenv) in which to start a PySpark session, where:

  • all development and production dependencies are described in the Pipfile

  • pipenv helps us manage project dependencies and Python environments (i.e. virtual environments)

  • dependency management is convenient on an ad-hoc basis: pip install pipenv installs the tool, and pipenv install --dev then installs every dependency, development ones included

dependencies.job_submitter: Since a data application can have numerous upstream and downstream pipelines, it makes sense to move Spark environment management and other common tasks into a shared entry point so that the applications can focus only on their business logic.
This submitter module takes the job name as an argument and executes the functionality defined in it. The pipeline itself has to expose a run method (discussed in the Decouple Application section) that is the entry point for the ETL.
For every pipeline, this standalone module is entrusted with starting and stopping the Spark session, parsing the configuration file that contains the static variables (configs/config.json unless specified otherwise) together with any dynamic command-line arguments, and then executing the requested pipeline. Please head back to the GitHub repo for the details.
With this submitter module, the command to submit the pipeline becomes:

$SPARK_HOME/bin/spark-submit \
    --py-files dependencies/job_submitter.py,jobs/pipeline_wo_modules.py \
    dependencies/job_submitter.py --job pipeline_wo_modules
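
The core of such a submitter can be sketched with the standard library alone. This is an illustration under stated assumptions, not the repo's implementation: the --job-args flag, the create_session callable, and the package parameter are hypothetical, while --job and --conf-file are the flags used in the commands in this post.

```python
import argparse
import importlib
import json


def parse_args(argv=None):
    """Parse the submitter's command line."""
    parser = argparse.ArgumentParser(description="Submit a pipeline by name")
    parser.add_argument("--job", required=True, help="module name under jobs/")
    parser.add_argument("--conf-file", default="configs/config.json")
    # Hypothetical flag: dynamic key=value pairs such as process_date=2020-07-21.
    parser.add_argument("--job-args", nargs="*", default=[])
    return parser.parse_args(argv)


def submit(argv, create_session, logger, package="jobs"):
    """Load the config, import the requested job, and call its run method."""
    args = parse_args(argv)
    with open(args.conf_file, encoding="utf-8") as handle:
        config = json.load(handle)
    # Dynamic arguments override the static values read from the file.
    config.update(dict(item.split("=", 1) for item in args.job_args))
    job = importlib.import_module(f"{package}.{args.job}")
    spark = create_session(args.job)  # session creation is injected by the caller
    try:
        return job.run(spark, config, logger)
    finally:
        spark.stop()  # always release the session, even if the job fails
```

In a real run, create_session could be something like lambda name: SparkSession.builder.appName(name).getOrCreate(). Passing it in as an argument keeps the submitter itself importable and testable on a machine without Spark.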

Testbed

conftest: We have used Pytest-style tests for our pipeline, along with a few features (i.e. mock, patch) from unittest. This file does the heavy lifting of setting up jobs for tests, i.e. providing a test SparkSession and mocks, and creating the tables and DataFrames locally from the CSV files. The mapping is defined in the test_bed.json file. This config is pretty self-explanatory: the DataFrame and table details are defined under the “data” key.
If the job accepts any dynamic parameter as job-args (i.e. process_date), that override should be part of the “config” key. It is sent to the job as a dictionary argument. setup_testbed, a helper method, is responsible for producing the DataFrames and tables once the test_bed.json file is configured. The file format can be configured as needed in the conftest. For read and write operations, we encourage teams to use generic methods such as “read.load” and “write” instead of “read.csv” or “read.orc”, so that our mocks can be more generic.
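
A helper along these lines might look as follows. It is a sketch, not the repo's setup_testbed: only the “data” and “config” keys come from the description above, while the per-entry "file" and "table" fields, the default CSV format, and the use of temporary views for tables are assumptions.

```python
import json


def setup_testbed(spark, testbed_path, file_format="csv"):
    """Build DataFrames (and temp views for tables) from locally stored files."""
    with open(testbed_path, encoding="utf-8") as handle:
        testbed = json.load(handle)

    dataframes = {}
    for name, entry in testbed["data"].items():
        # Generic load call; header=True assumes the sample files have a header row.
        df = spark.read.load(entry["file"], format=file_format, header=True)
        if entry.get("table"):
            # Make table-backed inputs reachable by name through spark.table() or SQL.
            df.createOrReplaceTempView(entry["table"])
        dataframes[name] = df
    # The "config" section carries dynamic job arguments such as process_date.
    return dataframes, testbed.get("config", {})
```

A session-scoped fixture in conftest.py could call this helper once with a local SparkSession and hand the resulting DataFrames and config to every test.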

test_pipeline: We have created a session-level Pytest fixture that bundles all the hard work done in the conftest into a single object. As the following sections show, we perform all of the testing through its member attributes.

Now let’s test our transform method, which takes the incremental and historical DataFrames as input and produces the final DataFrame.

Since the I/O operations have already been separated out, we can introspect the calling behavior of extract and load using mocks. These mocks are set up in the conftest file.

Since the individual methods have already been tested, we can use patching for the integration test: we patch the outcomes of the different functions and so avoid the side effects of writing to disk. These tests can be run from an IDE or by simply running the pytest command.
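
The mock-based checks and the patched integration test could be written roughly like this. PageviewsJob is a hypothetical stand-in for the pipeline module, kept as a small class so that the example is self-contained, and its config keys are assumptions as well.

```python
from unittest.mock import MagicMock, patch


class PageviewsJob:
    """Hypothetical stand-in for the pipeline module."""

    def extract(self, spark, config):
        return (spark.read.load(config["page_views"], format="csv", header=True),
                spark.table(config["user_pageviews_tab"]))

    def transform(self, incremental_df, historical_df):
        return historical_df.join(incremental_df, on="email", how="full_outer")

    def load(self, df, config):
        df.write.save(path=config["output_path"], format="csv", mode="overwrite")

    def run(self, spark, config):
        incremental_df, historical_df = self.extract(spark, config)
        self.load(self.transform(incremental_df, historical_df), config)
        return True


CONFIG = {"page_views": "in.csv", "user_pageviews_tab": "user_pageviews",
          "output_path": "out"}


def test_extract_calls_generic_reader():
    spark = MagicMock()  # stands in for the session; no Spark is started
    PageviewsJob().extract(spark, CONFIG)
    spark.read.load.assert_called_once_with("in.csv", format="csv", header=True)
    spark.table.assert_called_once_with("user_pageviews")


def test_load_writes_to_configured_path():
    df = MagicMock()
    PageviewsJob().load(df, CONFIG)
    df.write.save.assert_called_once_with(path="out", format="csv", mode="overwrite")


def test_run_integration_without_disk_writes():
    incremental_df, historical_df = MagicMock(), MagicMock()
    # Patch the I/O ends so that run() only exercises the wiring between steps.
    with patch.object(PageviewsJob, "extract",
                      return_value=(incremental_df, historical_df)), \
         patch.object(PageviewsJob, "load") as load_mock:
        assert PageviewsJob().run(MagicMock(), CONFIG) is True
    # load must receive exactly what transform produced from the patched inputs.
    load_mock.assert_called_once_with(historical_df.join.return_value, CONFIG)
```

Using the generic read.load and write.save calls is what lets a single mock cover every file format. In the real test suite, the mocked DataFrames in the integration test would be replaced by the ones built from the local CSV files.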

In a complex production scenario, related pipeline methods can be connected through their inputs and expected outputs, which are immutable. A fair understanding of the application and a clear segregation of the different subject areas can provide valuable, regression-like confidence for CICD integration.

CICD

Dockerfile: Defines the Docker image, with the virtual environment set up, for the Jenkins agent.
Makefile: Zips all the code, dependencies, and config into the packages.zip file so that Jenkins can create the artifact and the CD process can upload it into a repository. The final code can be submitted as below:
$SPARK_HOME/bin/spark-submit \
    --py-files packages.zip \
    --files configs/config.json \
    dependencies/job_submitter.py --job pipeline --conf-file configs/config.json

Jenkinsfile: Defines the CICD process. In the prepare step, the Jenkins agent runs the Docker container defined in the Dockerfile, and then runs the tests. Once the tests succeed, the prepare artifact step uses the Makefile to create a zipped artifact. The final step publishes the artifact, which is the deployment step.


Source code: https://github.com/soyelherein/pyspark-cicd-template

Published on 18th December 2020 ©soyelherein.github.io
