Run Spark Jobs on HDInsight with Azure Data Factory

One of the newer compute activities available in Azure Data Factory is the ability to execute Apache Spark scripts on HDInsight clusters (basic docs here).  I've been excited for a while now about Spark and its potential as a powerful, do-it-all analytics and general data engineering compute engine, so I was glad to see this functionality show up in Data Factory.

In this post, we'll look at 2 examples (one simple and one more complex) of how to use this functionality.  You can find all the code for the examples here.  To use the examples on your own, you'll need an Azure subscription with an HDInsight Spark cluster and a Data Factory deployed.

Simple Example

Let's start with a simple example.  The script below (pysparkSimple.py from the code files) will use the sample HVAC data from the HDInsight cluster.  It will read the data into a Spark DataFrame and do some basic aggregations before writing the results out to a new text file.

pysparksimple.png

Notice the entry point into the application.  In contrast to working in the Jupyter Notebooks environment provided on the HDInsight cluster, stand-alone apps and scripts have to define the context in which they will operate.  The value passed to .appName will be useful later for identifying the application when looking at the Spark logs.
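For reference, here's a minimal sketch of what a script along these lines could look like (Spark 2.x syntax).  The wasb:// paths and column names are assumptions based on the standard HDInsight HVAC sample data, and the output folder is hypothetical, so adjust both to match your own cluster.

```python
# Minimal sketch of a pysparkSimple.py-style script (assumed paths/columns).
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Stand-alone scripts create their own entry point; the app name is what
# shows up later in the Spark History Server.
spark = SparkSession.builder.appName("SparkonADF - Simple").getOrCreate()

# Read the HVAC sample data that ships with the HDInsight cluster.
hvac = (spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("wasb:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv"))

# A basic aggregation: average target vs. actual temperature per building.
agg = (hvac.groupBy("BuildingID")
           .agg(F.avg("TargetTemp").alias("AvgTargetTemp"),
                F.avg("ActualTemp").alias("AvgActualTemp")))

# Write the results back out to Blob storage (hypothetical output folder).
agg.write.mode("overwrite").csv("wasb:///sparksimpleoutput/")

spark.stop()
```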

sciptsonblob.png

We'll copy pysparkSimple.py to a Blob store where the cluster can access it.  This is the file that will be called by the Data Factory pipeline.  Here you see it placed in the scripts folder in the cluster container.  

Now that the PySpark script file is deployed, let's look at the Data Factory objects needed to execute it.

For this simple data flow, the Data Factory objects will consist of the following from the code files:

  • ls_azstore_storageaccount -- Linked Service for the storage account where the cluster/files are located
  • ls_hdi_cluster -- Linked Service for the HDI Spark Cluster
  • ds_hvacsource -- Dataset used as the source for the PySpark script
  • ds_sparksimpleoutput -- Dataset used as the output for the PySpark script
  • pipe_sparksimple -- Pipeline that executes the pysparkSimple.py file

There's nothing too unique about the linked services and datasets for this example, so we won't spend any time looking at them, but let's take a quick look at the pipeline.

simplepipe.png

This pipeline will simply execute the pysparkSimple.py file that was deployed.  For the example, the pipeline is set to execute for a single day.  Pay attention to the "rootPath" property, as the path format (container\\folder, with an escaped backslash) is a little different from other paths in Data Factory.
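If you're building the pipeline from scratch rather than from the code files, the activity definition looks roughly like the sketch below.  The container name, activity name, scheduler settings, and start/end window shown here are placeholders, not values from the code files.

```json
{
  "name": "pipe_sparksimple",
  "properties": {
    "activities": [
      {
        "name": "SparkSimpleActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": "ls_hdi_cluster",
        "inputs": [ { "name": "ds_hvacsource" } ],
        "outputs": [ { "name": "ds_sparksimpleoutput" } ],
        "typeProperties": {
          "rootPath": "<clustercontainer>\\scripts",
          "entryFilePath": "pysparkSimple.py"
        },
        "scheduler": { "frequency": "Day", "interval": 1 }
      }
    ],
    "start": "<single-day window start>",
    "end": "<single-day window end>"
  }
}
```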

After the pipeline is deployed, we should be able to go to the Monitor and Manage app for Data Factory and see that the pipeline executed (take note of the "start" and "end" datetime used in the pipeline).  We can also go to the Spark History Server (https://#clustername#.azurehdinsight.net/sparkhistory/) and find information about the run there.

sparkhistorysimple.png

A record with the App Name from our PySpark script (SparkonADF - Simple) should be shown in the list of completed applications.

simpleoutput.png

We should also see the output file from the script in our Blob storage account.  As with other output datasets in Data Factory, it's important to note that any output files are actually created directly by the script, not by the Data Factory output dataset.  The dataset in Data Factory is just a reference to the actual output.

Example with Arguments and Partitioning

Now let's look at a more complex example that uses arguments in the PySpark script (pysparkArgs.py) to process the same HVAC file for only a specific date and then partition the output based on that date.

pysparkargs.png

Notice the two spots in the code that use arguments passed in at execution time.  The first spot is used to filter a DataFrame for a specific date, and the second spot is used in the folder name of the output file that is written.
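As a rough sketch (again with assumed paths, and assuming the date arrives as the first command-line argument in yyyy-MM-dd form), those two spots could look something like this:

```python
# Sketch of the argument handling in a pysparkArgs.py-style script.
import sys
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SparkonADF - with Args").getOrCreate()

# Spot 1: the date passed in by Data Factory (e.g. "2013-06-13") is used to
# filter the DataFrame.  This assumes the Date column in the source file
# matches (or has been converted to) the same yyyy-MM-dd format.
run_date = sys.argv[1]

hvac = (spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("wasb:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv"))

filtered = hvac.filter(hvac.Date == run_date)

# Spot 2: the same date becomes part of the output folder name, producing a
# partition-style path like .../sparkargsoutput/Date=2013-06-13/.
output_path = "wasb:///sparkargsoutput/Date={0}/".format(run_date)
filtered.write.mode("overwrite").csv(output_path)

spark.stop()
```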

After we've deployed this script file to the same scripts folder used in the previous example, we can look at the Data Factory objects for this data flow.  They consist of the following from the code files:

  • ls_azstore_storageaccount -- Linked Service for the storage account where the cluster/files are located
  • ls_hdi_cluster -- Linked Service for the HDI Spark Cluster
  • ds_hvacsource -- Dataset used as the source for the PySpark script
  • ds_sparkargsoutput -- Dataset used as the output for the PySpark script
  • pipe_sparkargs -- Pipeline that executes the pysparkArgs.py file

Only the output dataset and pipeline files have changed, so let's take a look at them.

argsoutputdataset.png

The output dataset is set up similarly to the simple example, except that it expects a directory structure partitioned by the date (yyyy-MM-dd) of the SliceStart.  This is a common pattern for data lakes and distributed data platforms, as it typically speeds up data access and query times against larger datasets when the partitioned field is used as a filter.
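In Data Factory terms, that partitioning is expressed with a partitionedBy entry referenced from the dataset's folderPath.  A trimmed-down sketch (the container and folder names are placeholders) looks something like this:

```json
{
  "name": "ds_sparkargsoutput",
  "properties": {
    "type": "AzureBlob",
    "linkedServiceName": "ls_azstore_storageaccount",
    "typeProperties": {
      "folderPath": "<container>/sparkargsoutput/Date={SliceDate}",
      "format": { "type": "TextFormat" },
      "partitionedBy": [
        {
          "name": "SliceDate",
          "value": { "type": "DateTime", "date": "SliceStart", "format": "yyyy-MM-dd" }
        }
      ]
    },
    "availability": { "frequency": "Day", "interval": 1 }
  }
}
```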

Now let's look at the pipeline file (pipe_sparkargs).

argspipe.png

Notice we've added an additional argument for... well... "arguments".  This is where any arguments that the PySpark script is expecting are defined.  If you remember, the PySpark script expects a date value to filter the DataFrame and partition the output.  Our pipeline uses Data Factory's built-in Text.Format function along with the built-in SliceStart variable (more details on Data Factory's built-in functions and variables here) to pass the start date of the executing slice as the argument to the pysparkArgs.py script.

This pipeline also has the "getDebugInfo" argument set to "Always".  This will generate a log file in our scripts folder to help debug any issues we might encounter when the script runs. 

Finally, take note of the "start" and "end" values for the pipeline.  Because the slice dates will be used by the script to filter data, these have been set to dates that actually exist in the HVAC source file.
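Pulling those pieces together, the activity definition looks roughly like the sketch below.  The container name, activity name, scheduler settings, and exact start/end window are placeholders; the parts to focus on are the arguments array with the Text.Format expression and getDebugInfo set to Always.

```json
{
  "name": "pipe_sparkargs",
  "properties": {
    "activities": [
      {
        "name": "SparkArgsActivity",
        "type": "HDInsightSpark",
        "linkedServiceName": "ls_hdi_cluster",
        "inputs": [ { "name": "ds_hvacsource" } ],
        "outputs": [ { "name": "ds_sparkargsoutput" } ],
        "typeProperties": {
          "rootPath": "<clustercontainer>\\scripts",
          "entryFilePath": "pysparkArgs.py",
          "arguments": [ "$$Text.Format('{0:yyyy-MM-dd}', SliceStart)" ],
          "getDebugInfo": "Always"
        },
        "scheduler": { "frequency": "Day", "interval": 1 }
      }
    ],
    "start": "<first date present in the HVAC data>",
    "end": "<a few days later>"
  }
}
```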

After deploying the Data Factory objects, let's take a look at the results.  First, we should be able to see the slices either staged or completed in the Monitor and Manage app for Data Factory.  Remember that the slices are for dates that are pretty far in the past, so you will likely need to change the date range of slices you are filtering for.

adfargs.png

As we did previously, we should also be able to go to the Spark History Server and find a completed application run for each of the slice runs (App Name will be SparkonADF - with Args this time).

argsparkhistory.png

Remember that in the pipeline we also set the "getDebugInfo" argument to "Always".  There is now a log folder in the location where our scripts were deployed, which provides more info for debugging each of the individual runs.

arglogs.png

Let's see if our process really filtered by each individual date and partitioned the output.  First, we'll browse to the output location specified in pysparkArgs.py.

argpartitioned.png

As you can see, we have a sub-folder for each date that a slice ran.  Finally, let's open one of the folders and see what the contents of the file(s) are.  In this case, the file in the Date=2013-06-13 folder was opened, and based on the values in the Date column, it appears we have the results we expected.

argsoutput.png

Drops Mic

Hopefully these examples let you see how Data Factory can be used to automate both simple and complex Apache Spark scripts on HDInsight.  When used together, these services can make the management of and visibility into your Spark applications much more efficient.  Check back soon for a follow-up post where we'll look at how we can create on-demand Spark clusters to use with these pipelines.