One major challenge with data pipelines is reliably testing the pipeline code, because the outcome of the logic is tightly coupled with the data it processes.
One way to overcome this is to test the pipeline against immutable, known data.
Obviously, this requires a good knowledge of the application and of how well the data matches the business requirements.
It also requires some setup so that developers can focus on building the application logic.
In this blog post, we will discuss a pluggable approach that provides a testing environment based on locally stored files, enabling developers to follow test-driven development, catch bugs early, and release code via CI/CD based on the test outcome.
We will use Apache PySpark to develop our pipeline, and pytest to prepare the test environment and test the code.
The follow-along code is available at https://github.com/soyelherein/pyspark-cicd-template
Let’s consider a pipeline that consumes a “pageviews” file and merges it into the final target table. The existing target table looks like this:

email | page_view | created_date | last_active
---|---|---|---
james@example.com | 10 | 2020-01-01 | 2020-07-04
mary@example.com | 100 | 2020-02-04 | 2020-02-04
john@example.com | 1 | 2020-03-04 | 2020-06-04
The incoming pageviews file contains:

```
email,pages
james@example.com,home
james@example.com,about
patricia@example.com,home
```
After the merge, the target table becomes:

email | page_view | created_date | last_active
---|---|---|---
james@example.com | 12 | 2020-01-01 | 2020-07-21
mary@example.com | 100 | 2020-02-04 | 2020-02-04
john@example.com | 1 | 2020-03-04 | 2020-06-04
patricia@example.com | 1 | 2020-07-21 | 2020-07-21
There are typically five major sections in the pipeline code.
Extract — Reads the incremental file and the historical data from the target table and returns two DataFrames.
Transform — Calculates the metrics from the incremental and historical DataFrames and returns the final DataFrame.
Load — Writes the data to the final output path.
Run — Ties the ETL steps together and is the method exposed to the job submitter module. It accepts the Spark session, the job configuration, and a logger object, and executes the pipeline.
configs and ddl — We take the static configuration out and place it in a JSON file (configs/config.json) so that it can be overridden by the test configuration.
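To make the discussion concrete, here is a minimal sketch of how such a pipeline module could be laid out. The column logic follows the example tables above, but the function bodies and config keys (pageviews_path, target_path, output_path) are illustrative; the actual implementation lives in the repo.

```python
# jobs/pipeline.py: illustrative sketch of an ETL job split into testable units
from typing import Tuple

from pyspark.sql import DataFrame, SparkSession, functions as F


def extract(spark: SparkSession, config: dict) -> Tuple[DataFrame, DataFrame]:
    """Read the incremental pageviews file and the historical target data."""
    incremental_df = spark.read.load(config["pageviews_path"], format="csv", header=True)
    historical_df = spark.read.load(config["target_path"])
    return incremental_df, historical_df


def transform(incremental_df: DataFrame, historical_df: DataFrame, process_date: str) -> DataFrame:
    """Aggregate the new page views per email and merge them into the history."""
    agg_df = incremental_df.groupBy("email").agg(F.count("pages").alias("new_views"))
    joined_df = historical_df.join(agg_df, "email", "full_outer")
    return joined_df.select(
        "email",
        (F.coalesce("page_view", F.lit(0)) + F.coalesce("new_views", F.lit(0))).alias("page_view"),
        F.coalesce("created_date", F.lit(process_date)).alias("created_date"),
        F.when(F.col("new_views").isNotNull(), F.lit(process_date))
         .otherwise(F.col("last_active")).alias("last_active"),
    )


def load(df: DataFrame, config: dict) -> None:
    """Write the final DataFrame to the output path."""
    df.write.save(config["output_path"], mode="overwrite")


def run(spark: SparkSession, config: dict, logger) -> None:
    """Entry point invoked by the job submitter."""
    incremental_df, historical_df = extract(spark, config)
    final_df = transform(incremental_df, historical_df, config["process_date"])
    load(final_df, config)
    logger.info("pipeline completed")
```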
Given that we have structured our ETL job into testable modules, the last thing to sort out before jumping into tests is the Spark environment management.
Testing and debugging Spark jobs by sending them to a cluster (spark-submit) quickly becomes tedious and impractical, and teams can turn into Sherlock Holmes, investigating clues in stack traces to figure out what could have gone wrong.
pipenv — To avoid these scenarios, we can create an isolated environment (say thanks to pipenv) in which to initiate a PySpark session, where:
all development and production dependencies are described in the Pipfile
pipenv helps us manage project dependencies and Python environments (i.e. virtual environments)
dependencies can be managed conveniently on an ad-hoc basis, e.g. development dependencies are installed with pipenv install --dev, as shown below
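A typical local workflow with pipenv then looks roughly like this (commands shown for illustration):

```sh
# install both the runtime and the development dependencies declared in the Pipfile
pipenv install --dev

# run the test suite inside the managed virtual environment
pipenv run pytest
```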
dependencies.job_submitter — Since a data application can have numerous upstream and downstream pipelines, it makes sense to move the Spark environment management and other common tasks into a shared entry point so that each application can focus only on its business logic.
This submitter module takes the job name as an argument and executes the functionality defined in that job. The pipeline itself has to expose a run method (discussed in the Decouple Application section) that is the entry point for the ETL.
This standalone module is entrusted with starting and stopping the Spark session, parsing the configuration file containing static variables (configs/config.json unless mentioned otherwise) and any dynamic command-line arguments, and then executing the requested pipeline.
Please head back to the Github repo for the details.
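As a rough orientation, a stripped-down submitter could look like the sketch below; the --job and --conf-file arguments appear in the commands later in this post, while the --job-args flag and the internal details are illustrative assumptions.

```python
# dependencies/job_submitter.py: simplified sketch of a shared job submitter
import argparse
import importlib
import json
import logging

from pyspark.sql import SparkSession


def main() -> None:
    parser = argparse.ArgumentParser(description="Generic PySpark job submitter")
    parser.add_argument("--job", required=True, help="name of the job module inside jobs/")
    parser.add_argument("--conf-file", default="configs/config.json", help="static job configuration")
    parser.add_argument("--job-args", nargs="*", default=[], help="dynamic key=value overrides")
    args = parser.parse_args()

    # merge the static configuration with any dynamic command-line overrides
    with open(args.conf_file) as f:
        config = json.load(f)
    config.update(dict(item.split("=", 1) for item in args.job_args))

    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(args.job)

    spark = SparkSession.builder.appName(args.job).getOrCreate()
    try:
        # every pipeline module is expected to expose run(spark, config, logger)
        job_module = importlib.import_module(f"jobs.{args.job}")
        job_module.run(spark=spark, config=config, logger=logger)
    finally:
        spark.stop()


if __name__ == "__main__":
    main()
```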
With this submitter module, the command to submit the pipeline becomes:
```sh
$SPARK_HOME/bin/spark-submit \
  --py-files dependencies/job_submitter.py,jobs/pipeline_wo_modules.py \
  dependencies/job_submitter.py --job pipeline_wo_modules
```
conftest — We have used pytest-style tests for our pipeline, while also leveraging a few features (i.e. mock, patch) from unittest.
This file does the heavy lifting of setting up the jobs for tests, i.e. providing the test SparkSession and mocks, and creating the tables and DataFrames locally from the CSV files. The mapping is defined in the testbed.json file.
The config is pretty self-explanatory: the DataFrame and table details are defined under the “data” key.
If the job accepts any dynamic parameter as job args (i.e. process_date), that override should be part of the “config” key; it is passed to the job as a dictionary argument.
The setup_testbed helper method is responsible for producing the DataFrames and tables once the testbed.json file is configured.
The file format can be configured as needed in the conftest; the default is as shown below.
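As a rough illustration, a testbed.json built around those two top-level keys might look like the snippet below; the entry names, paths, and per-entry fields are assumptions for this example, so check the repo for the actual structure.

```json
{
  "config": {
    "process_date": "2020-07-21"
  },
  "data": {
    "incremental_pageviews": {
      "path": "tests/test_data/pageviews.csv",
      "format": "csv"
    },
    "target_table": {
      "path": "tests/test_data/target.csv",
      "format": "csv",
      "table": "db.pageviews_target"
    }
  }
}
```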
For read and write operations we encourage teams to use the generic methods like “read.load” and “write”, instead of “read.csv” or “read.orc”, so that our mocks can be more generic.
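A condensed sketch of such a conftest is shown below; the SparkTestBed container, the fixture name, and the way entries are materialized are illustrative assumptions, and the repo's setup_testbed does more (for example, wiring up the read/write mocks).

```python
# tests/conftest.py: simplified sketch of the shared test setup
import json
from dataclasses import dataclass, field
from unittest.mock import MagicMock

import pytest
from pyspark.sql import SparkSession


@dataclass
class SparkTestBed:
    spark: SparkSession
    config: dict = field(default_factory=dict)
    dataframes: dict = field(default_factory=dict)
    mock_load: MagicMock = field(default_factory=MagicMock)


@pytest.fixture(scope="session")
def testbed():
    spark = (
        SparkSession.builder.master("local[2]")
        .appName("pipeline-tests")
        .getOrCreate()
    )
    with open("tests/testbed.json") as f:
        mapping = json.load(f)

    bed = SparkTestBed(spark=spark, config=mapping.get("config", {}))
    # build a DataFrame (and a temp view, if a table name is given) for every entry under "data"
    for name, detail in mapping["data"].items():
        df = spark.read.load(
            detail["path"], format=detail.get("format", "csv"), header=True, inferSchema=True
        )
        bed.dataframes[name] = df
        if "table" in detail:
            df.createOrReplaceTempView(detail["table"].split(".")[-1])

    yield bed
    spark.stop()
```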
test_pipeline — We have created a session-level pytest fixture that collects all the hard work done in the conftest into a single object. As you will see in the later sections, we perform the entire testing using its member attributes.
Now let’s test our transform method, which takes the incremental and historical DataFrames as input and produces the final DataFrame.
Since the I/O operations have already been separated out, we can introspect the calling behavior of extract and load using mocks. These mocks are set up in the conftest file.
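A unit test for transform along those lines might look like this; it builds on the illustrative pipeline and conftest sketches above, and the expected values follow the example tables.

```python
# tests/test_pipeline.py: unit test sketch for the transform step
from jobs import pipeline


def test_transform(testbed):
    incremental_df = testbed.dataframes["incremental_pageviews"]
    historical_df = testbed.dataframes["target_table"]

    final_df = pipeline.transform(
        incremental_df=incremental_df,
        historical_df=historical_df,
        process_date="2020-07-21",
    )

    # james gains two page views, patricia shows up as a brand-new record
    result = {row["email"]: row["page_view"] for row in final_df.collect()}
    assert result["james@example.com"] == 12
    assert result["patricia@example.com"] == 1
```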
Since we have already tested the individual methods, we can use patching for the integration test, patching the outcomes of the different functions to avoid the side effects of writing to disk.
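An integration-style test might then patch extract and load so that run is exercised end to end without touching the disk; again, this is a sketch built on the assumptions above rather than the repo's exact test.

```python
# tests/test_pipeline.py: integration test sketch for the run method
from unittest.mock import MagicMock, patch

from jobs import pipeline


def test_run(testbed):
    incremental_df = testbed.dataframes["incremental_pageviews"]
    historical_df = testbed.dataframes["target_table"]

    # patch the I/O boundaries so no real files or tables are read or written
    with patch.object(pipeline, "extract", return_value=(incremental_df, historical_df)) as mock_extract, \
         patch.object(pipeline, "load") as mock_load:
        pipeline.run(spark=testbed.spark, config={"process_date": "2020-07-21"}, logger=MagicMock())

    mock_extract.assert_called_once()
    mock_load.assert_called_once()
    # the first positional argument handed to load is the final transformed DataFrame
    final_df = mock_load.call_args[0][0]
    assert final_df.count() == 4
```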
These tests can be run from the IDE or by simply running the pytest command and inspecting its output.
In a complex production scenario, related pipeline methods can be connected in terms of immutable inputs and expected outputs. A fair understanding of the application and a clean segregation of the different subject areas can provide valuable regression-like confidence for CI/CD integration. Once the job is packaged, the production pipeline can be submitted as:
```sh
$SPARK_HOME/bin/spark-submit \
  --py-files packages.zip \
  --files configs/config.json \
  dependencies/job_submitter.py --job pipeline --conf-file configs/config.json
```
Jenkinsfile — This defines the CI/CD process: in the Prepare step the Jenkins agent runs the Docker container defined in the Dockerfile, followed by running the tests. Once the tests are successful, the Prepare Artifact step uses the Makefile to create a zipped artifact, and the final step publishes the artifact, which is the deployment step.
Source code: https://github.com/soyelherein/pyspark-cicd-template
Published on 18th December 2020 ©soyelherein.github.io