Integrating Apache Spark into your existing Hadoop system – Part I

June 22, 2016

As evidenced by our previous blog post, Statistics is Eating the World, data is at the very center of Wealthfront’s values. At Wealthfront, teams ranging from research and analytics to marketing, client services, human resources and even employee productivity all rely heavily on data in their decision making. Such a variety of data sources and requirements calls for a sophisticated and robust platform. This previous blog post offers an excellent overview of the data technology stack used by Wealthfront. In this two-part blog series, I will discuss how we integrated Apache Spark into our existing platform.

Wealthfront embarked on the adoption of Apache Spark at the beginning of this year. A few months into the process, nearly 15% of our production data processing jobs now run on Spark, which has also become our default framework for writing new data computation jobs. In this part, I will briefly discuss some of the integration work we performed to interface with our existing data infrastructure; in the next post, I will discuss how we deploy Spark jobs to run in the cloud and tune their performance against both new and existing jobs.

Background

Apache Spark, the new poster child of Big Data, is quickly becoming the de facto big data processing framework. Compared to Hadoop MapReduce, Spark offers several advantages, such as faster processing for iterative jobs, simpler APIs and the potential for streaming and real-time processing. On the other hand, it is also notoriously hard to integrate Spark effectively with production-grade data pipelines. A widely used metaphor is that setting up Spark is like building a fighter jet: once you build it, the good news is that you now have a fighter jet, but you still need to learn how to fly it. In our experience, this gap between theory and practice shows up mainly in two areas:

  1. While Spark can in theory be 10x to 100x faster than Hadoop MapReduce, it is not a panacea that should take over all your data processing jobs. For traditional batch processing jobs, which typically involve huge volumes of raw data (hundreds of GBs or more) and a non-iterative, linear flow, Spark does not offer a clear performance advantage. In many cases you may even need to throw a much larger amount of hardware at it for it to simply stay afloat. This means you will most likely want to keep your existing Hadoop system running in parallel with Spark to cater for different kinds of use cases, which in turn translates to more integration and maintenance work.
  2. Setting integration with an existing system aside, standing up a Spark cluster is easy, almost deceptively so. However, as the fighter jet metaphor goes, there are hundreds, if not thousands, of levers to pull once you start running Spark jobs. To optimize Spark performance, you have to understand intimately how it works under the hood, and even then it can be frustratingly hard to find the correct knob to turn. This constant worry about the underlying implementation can become a drain on productivity.

To understand how Wealthfront overcame some of the issues described here, let’s look at the computation stack we are currently using. Before Spark was introduced, we used Apache Crunch to carry out most of the data heavy lifting. Crunch is a high-level framework for writing Hadoop MapReduce jobs. It is particularly suited to development in Java, with its compile-time type-checked collections and its ease of handling POJOs (Plain Old Java Objects). Furthermore, our data records are saved in Avro format, a serialization framework commonly used with MapReduce systems. Hence most of our infrastructure was designed to work with this Hadoop ecosystem, and it would be best if Spark could be embedded in the same ecosystem to minimize both system and productivity friction.

Now, Crunch can run on top of both Hadoop MapReduce and Spark, so any of our existing Crunch jobs could already be deployed on a Spark cluster with minimal code modification. Nevertheless, we still wanted to add the capability of writing native Spark jobs in Scala to our platform, for two main reasons:

  1. Test runs showed that the same logic implemented in native Spark code performs better than the equivalent Crunch job running on Spark.
  2. We are looking at introducing Spark as an interactive analytics platform in addition to our current SQL and R analytics systems. Comparatively, Scala offers a lower entry barrier to non-engineers. (Crunch has its own Scala wrapper, Scrunch, but it is experimental at this stage and has very scarce documentation.)

Now let’s look at two aspects of how we integrated Spark into our data infrastructure: how we adapted Spark to read and write our Avro data records in the same way Crunch does, and how we blend the execution of the two types of jobs together seamlessly.

I/O Integration

To integrate native Spark code into our existing data platform, with all the existing data files, I/O interfaces and helper utilities, we “hijacked” the I/O components of Apache Crunch and inserted Spark in the middle. The core of the input path follows the pattern below:
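
In spirit, the glue code looks like the sketch below. The Avro record class TradeEvent, the helper method name and the exact key type are illustrative stand-ins (Crunch’s Avro input format wraps each record in an AvroWrapper); the production version carries more configuration:

import org.apache.avro.mapred.AvroWrapper
import org.apache.crunch.impl.mr.run.CrunchInputFormat
import org.apache.crunch.io.From
import org.apache.crunch.types.avro.Avros
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD

// TradeEvent stands in for one of our Avro-generated record classes.
def readAvro(sc: SparkContext, inputPath: String): RDD[(AvroWrapper[TradeEvent], NullWritable)] = {
  // Wrap the input path and record type in a Crunch Source, just as a Crunch job would.
  val source = From.avroFile(inputPath, Avros.specifics(classOf[TradeEvent]))

  // Let Crunch record the input paths and input configuration in the job configuration,
  // so every node can reconstruct the read in parallel.
  val job = Job.getInstance(new Configuration())
  source.configureSource(job, -1)

  // Hand the Crunch-prepared configuration to Spark's new-API Hadoop reader.
  // The whole Avro record travels in the key; the value stays null.
  sc.newAPIHadoopRDD(
    job.getConfiguration,
    classOf[CrunchInputFormat[AvroWrapper[TradeEvent], NullWritable]],
    classOf[AvroWrapper[TradeEvent]],
    classOf[NullWritable])
}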

Here we take advantage of the Source class provided by Apache Crunch, which is a wrapper around the input file paths and input configurations. The input DAG visualization can be seen below.

To get a little bit under the hood, let’s look at how this integration works.

When source.configureSource() is called, the input configurations and input paths are concatenated into one long string with appropriate delimiters, and the entire string is then recorded in the Spark job configuration. This allows all nodes to receive a copy of the information and enables parallel reading of the source data.

The key part of the input process is the newAPIHadoopRDD() method, which Spark uses to read HDFS data with the new mapreduce API. It is standard practice to store the entire Avro record in the Hadoop key alone, leaving the value as null. This RDD construction requires an org.apache.hadoop.mapreduce.InputFormat object. In our case we piggybacked on the CrunchInputFormat class, but it is perfectly feasible to implement your own flavor of InputFormat. Two functions need to be overridden (a sketch follows the list):

  • List<InputSplit> getSplits(JobContext context), which recovers the input path info from the job configuration and then provides a way to split the input data (our usage of CrunchInputFormat internally simply initializes a wrapper around org.apache.hadoop.mapreduce.lib.input.CombineFileInputFormat and uses its getSplits(…) method)
  • RecordReader<K, V> createRecordReader(InputSplit inputSplit, TaskAttemptContext context), which takes a split and reads the Hadoop key/value pairs from it. In the case of Avro files, a simple implementation will extend org.apache.hadoop.mapreduce.RecordReader and utilize a DatumReader to read all Avro records into keys, leaving the values as null
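
For reference, a bare-bones InputFormat of that shape, sketched here on top of the stock avro-mapred classes rather than Crunch internals (it is essentially what Avro’s own AvroKeyInputFormat does), could look like this:

import org.apache.avro.generic.GenericRecord
import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyRecordReader}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.{InputSplit, RecordReader, TaskAttemptContext}
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat

// Entire Avro record in the key, NullWritable in the value, as described above.
class AvroRecordInputFormat extends FileInputFormat[AvroKey[GenericRecord], NullWritable] {

  // getSplits(JobContext) is inherited from FileInputFormat, which already reads the
  // input paths from the job configuration and splits files by HDFS block.
  // (Our CrunchInputFormat usage delegates to CombineFileInputFormat here instead.)

  override def createRecordReader(
      split: InputSplit,
      context: TaskAttemptContext): RecordReader[AvroKey[GenericRecord], NullWritable] = {
    // The reader schema is expected in the job configuration,
    // e.g. set beforehand via AvroJob.setInputKeySchema(job, schema).
    val readerSchema = AvroJob.getInputKeySchema(context.getConfiguration)
    new AvroKeyRecordReader[GenericRecord](readerSchema)
  }
}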


The last, but often forgotten, part of the input process is to map over and deep copy each record. Since Hadoop reuses Writables, if a deep copy is not performed, the RDD produced at the end will behave strangely when cached. If you find yourself with an input RDD whose records are all identical to each other, this is most likely the step that was missed.
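
Concretely, the fix amounts to a single map over the pair RDD. Continuing with the illustrative TradeEvent type from the input sketch above:

import org.apache.avro.mapred.AvroWrapper
import org.apache.avro.specific.SpecificData
import org.apache.hadoop.io.NullWritable
import org.apache.spark.rdd.RDD

// Hadoop hands back the same wrapper object for every record, so copy the datum
// out before the RDD is cached or reused downstream.
def materialize(raw: RDD[(AvroWrapper[TradeEvent], NullWritable)]): RDD[TradeEvent] =
  raw.map { case (wrapper, _) =>
    val record = wrapper.datum()
    // deepCopy allocates a fresh record; skipping it is what produces a cached RDD
    // whose elements all look identical (the symptom described above).
    SpecificData.get().deepCopy(record.getSchema, record)
  }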

The output integration is very similar, with the saveAsNewAPIHadoopFile(…) method sitting at the center of the action. A subclass implementation of org.apache.hadoop.mapreduce.OutputFormat is required. In our case we used the AvroOutputFormat provided by Crunch, an implementation of org.apache.hadoop.mapreduce.lib.output.FileOutputFormat that reads the output paths from the job configuration and uses an org.apache.avro.file.DataFileWriter to write the files to those paths.
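
The write side can be sketched in the same way; here the stock avro-mapred AvroKeyOutputFormat stands in for the Crunch-provided format, and TradeEvent is again an illustrative record class:

import org.apache.avro.mapred.AvroKey
import org.apache.avro.mapreduce.{AvroJob, AvroKeyOutputFormat}
import org.apache.hadoop.io.NullWritable
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.rdd.RDD

// Write an RDD of (hypothetical) TradeEvent records back out as Avro files.
def writeAvro(records: RDD[TradeEvent], outputPath: String): Unit = {
  val job = Job.getInstance()
  // Tell the output format which schema to write with; getClassSchema is the
  // static accessor on Avro-generated classes.
  AvroJob.setOutputKeySchema(job, TradeEvent.getClassSchema)

  records
    // Mirror of the input convention: record in the key, NullWritable as the value.
    .map(record => (new AvroKey(record), NullWritable.get()))
    .saveAsNewAPIHadoopFile(
      outputPath,
      classOf[AvroKey[TradeEvent]],
      classOf[NullWritable],
      classOf[AvroKeyOutputFormat[TradeEvent]],
      job.getConfiguration)
}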

Execution Control

Our existing execution control relies on the Reflections library to fetch all children of a BaseCrunchExecutor class (which is itself a child of BaseExecutor). In addition, we added an annotation to every job, like the one below.

@RunIn(batches = {...}, runAfter = {...}, runWith = {...})

This annotation determines the batch in which each job runs as well as any associated runtime parameters. We use inheritance to make native Spark jobs extend a different abstract class, BaseNativeSparkExecutor, which inherits from the same BaseExecutor class. The class hierarchy is shown below.
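
In code form, the hierarchy is roughly the following (the execute() signature is illustrative):

// Shared contract for every scheduled job; the concrete method is illustrative.
abstract class BaseExecutor {
  def execute(): Unit
}

// Crunch jobs (run on Hadoop MapReduce, or on Spark via Crunch's Spark support).
abstract class BaseCrunchExecutor extends BaseExecutor

// Native Spark jobs written directly against the Spark API in Scala.
abstract class BaseNativeSparkExecutor extends BaseExecutor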

This way, the runtime logic can still fetch all children of BaseExecutor and branch based on whether a job class inherits from BaseCrunchExecutor or BaseNativeSparkExecutor to kick off the Crunch or Spark engine. We also added a new element (runWith in the code example above) to the annotation definition to override the choice based on parent class, so that it is possible to run a Crunch job written in Java on the Spark framework. The flow control is outlined below.
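
A simplified sketch of that dispatch, written against the classes and annotation above, might look like the following; the package name, the string encoding of runWith and the two launcher hooks are illustrative:

import org.reflections.Reflections
import scala.collection.JavaConverters._

// Discover every job on the classpath and route it to the right engine.
def launchBatch(
    batch: String,
    runOnSpark: Class[_ <: BaseExecutor] => Unit,      // illustrative hook into the Spark launcher
    runOnMapReduce: Class[_ <: BaseExecutor] => Unit   // illustrative hook into the MapReduce launcher
): Unit = {
  val jobClasses = new Reflections("com.wealthfront.jobs")   // illustrative package prefix
    .getSubTypesOf(classOf[BaseExecutor]).asScala

  for (jobClass <- jobClasses) {
    val runIn = jobClass.getAnnotation(classOf[RunIn])
    // Only launch jobs annotated into this batch.
    if (runIn != null && runIn.batches.contains(batch)) {
      // Default engine comes from the parent class; the runWith element can override it.
      val isSparkJob =
        classOf[BaseNativeSparkExecutor].isAssignableFrom(jobClass) ||
        runIn.runWith.contains("SPARK")                  // illustrative runWith encoding
      if (isSparkJob) runOnSpark(jobClass) else runOnMapReduce(jobClass)
    }
  }
}

Passing the two launchers in as functions keeps the reflection-driven scheduling logic agnostic of how each engine is actually invoked.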

Conclusion

The above integrations are two of the key parts we assembled to gain the capability of writing native Spark jobs while maintaining our interaction with the underlying data infrastructure. This allows the Spark jobs to work smoothly alongside all the Crunch jobs. Perhaps more importantly, both types of jobs give a very similar feel to the end developer. The steps for setting up the I/O are almost identical in both frameworks, and from an execution/scheduling perspective, the end user does not even need to care which job is of which type. Both types of jobs can run in the same batch and depend on each other. Such integration allows developers to focus solely on the computation logic of the job, a benefit that can often be overlooked in the adoption of new frameworks.