Hadoop Streaming: Whole Files

Over the past few weeks, I've taken a long overdue dive into Hadoop to see if it could help with a risk calculation and reporting problem.

Current system

The existing system is a fairly traditional Unix-based batch system for a bank. A number of files are prepared for running through a C++ calculation engine; these files are scheduled and run on a single multi-processor box, and the output is loaded into an RDBMS for reporting. Nothing too fancy (well, the calculations are indeed fancy, but for this article, treat the calculation engine as a black box).

And it has all the usual performance bottlenecks of a traditional design: preparing the files for the engine relies on an RDBMS-backed ETL data transformation process that doesn't scale, the calculation engine works on a whole file of records rather than a stream of records, and finally the results of the calculations are loaded into a reporting RDBMS for aggregation and ad-hoc analysis. In the specific case I was looking at, the calculation engine wasn't the real bottleneck in this pipeline: in fact, the data transformation and the load of the reporting database absolutely dwarfed the computational phase by a factor of more than 20X. Yet another example of how expensive data movement is, and why one should minimize it when designing a system.

Could a Big Data[1] approach improve this?

As more and more banks are exploring data lakes[2], one very popular platform they are experimenting with is Hadoop's HDFS. It promises cheap, highly available storage, with the additional ability to run computations on the data stored within the platform (i.e., bring the processing to the data, rather than the data to the processing).

That last point is key: as noted in the description of the current system, the largest problem by far is moving data around. By refactoring this system in a MapReduce model, data does not need to be shovelled around between machines and processes. Assuming there are no huge penalties for using the framework, I'd expect there to be a performance improvement in the overall pipeline.

The design

I won't be discussing the first part of the problem (data transformation), as that is a fairly typical Hadoop Map implementation (read a record at a time, process, then output). However, invoking the calculation engine and processing the output of that is where it gets interesting.
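
For reference, that record-at-a-time style maps directly onto a plain streaming mapper that filters stdin to stdout - something like the sketch below. The field names are invented purely for illustration; the real transformation logic isn't shown here.

#!/bin/bash
# hypothetical record-at-a-time mapper: one CSV record in, one (reshaped) record out
while IFS=, read -r trade_id notional currency rest; do
    echo "$trade_id,$currency,$notional,$rest"
done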

Computation process

Given that the input was already prepared for the computation engine (approximately 2,500 computable elements in each file), the first thing to do was copy these files into a directory on HDFS.

cd input-data  
hdfs dfs -mkdir input  
hdfs dfs -put *.csv input/  

Conveniently, the files were named as a sequence, file-xxxxx.csv, where xxxxx went from 00001 through 03000.
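
As a quick sanity check (not part of the original runbook, just an illustration), the count of what landed in HDFS should match the number of local files:

ls *.csv | wc -l
hdfs dfs -ls input | grep -c 'file-.*\.csv$'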

Yarn distributed shell

The first attempt I made was to use the Yarn distributed shell jar. I wrote a bash script that wrapped the computation engine and used $CONTAINER_ID to identify which file it was operating on, and it worked. Sort of...
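
The wrapper looked roughly like the sketch below - a reconstruction rather than the production script, with the offset arithmetic simplified and error handling dropped. The idea is that the last field of $CONTAINER_ID is the container's sequence number within the application, which (combined with an offset passed on the command line) selects the input file:

#!/bin/bash
# rough sketch: batch-cmd <hdfs_input_dir> <offset>
input_dir=$1
offset=$2

# last field of e.g. container_1449000000000_0001_01_000002 is the container number
seq=`echo $CONTAINER_ID | awk -F_ '{print $NF}'`

# map the container number onto a file index (hypothetical arithmetic)
idx=`printf "%05d" $((10#$seq + offset))`

# no data affinity: both the input file and the shared static data get copied down from HDFS
hdfs dfs -get static-data .
hdfs dfs -get $input_dir/file-$idx.csv .
compute-risk -i file-$idx.csv -s static-data -o output.csv
hdfs dfs -put output.csv output/file-$idx.csv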

There were a few problems:

  • For some reason, the Application Master process died with our old buddy, a Java GC failure, whenever the number of containers in one job was greater than 1,000, which meant I had to get clever with the wrapper to "chunk" the 2,900 files into 3 batches. Barf.
  • There was no data affinity - containers were run based on CPU/memory availability, and the script needed to copy the input file to the local drive, without controlling for locality (i.e., breaking the "bring the processing to the data" goal).
  • Shared input data was a pain to move around, so I ended up putting that on HDFS as well, which involved another copy down to the local filesystem.

Here's how I invoked the code:

yarn org.apache.hadoop.yarn.applications.distributedshell.Client \  
    -jar $YARN_PREFIX/hadoop-yarn-applications-distributedshell.jar \
    -num_containers $num_containers \
    -container_memory $container_mem \
    -timeout  $timeout_ms \
    -shell_command "$batch_cmd input $offset"

I won't bother showing how some of the parameters are computed - I'll leave that to your imagination (and, as you have guessed, although I got it working, I abandoned this technique because it misses the boat on data affinity).

That said, it did perform quite well. Even without the data affinity, it was still a win.

Java Map program

The next approach I took was to see if I could use the "traditional" MapReduce approach and write a Java class that would take advantage of the Context object to determine the input file name (when passed input as the HDFS path). There are some nice benefits to this approach, such as not needing to rely on a file naming convention, and it provides data affinity. However, I did have to worry about Hadoop splitting the files into smaller pieces (which is solvable - see the next section), and, more importantly, I basically had to replicate most of the functionality of hadoop-yarn-applications-distributedshell.jar. With the usual Java verbosity, it ended up being hundreds of lines of code before I abandoned it (I had a basic version working, but it was brittle and awful).

I'm not even going to bother showing any code for that mess.

Hadoop Streaming

Now, this approach got my motor revving. It ended up fitting the model of what I wanted to achieve almost out of the box. The only things I needed to do were to force Hadoop not to split the files, and to make sure I didn't use the default TextInputFormat object as the file reader - instead, I used KeyValueTextInputFormat as the (non-intuitively named) file reader[3].

In the end, the "clever bit" is that a map task will be run once for each file, with data affinity, and the file will be available on stdin for the script. A simple cat - > $filename before launching the computation program is all that's needed.

First things first: I needed to override the isSplitable() method of KeyValueTextInputFormat to return false. This itty-bitty jar file was created as follows:

package com.example.hadoop;

import org.apache.hadoop.fs.*;  
import org.apache.hadoop.mapred.KeyValueTextInputFormat;

// output each line in a file verbatim, do not split file
public class NSTextInputFormat extends KeyValueTextInputFormat {  
    @Override
    protected boolean isSplitable(FileSystem fs, Path file) {
        return false;
    }
}

Saving that as NSTextInputFormat.java, compile and package it into a jar:

mkdir build  
javac -classpath `hadoop classpath` -d build NSTextInputFormat.java  
jar cf hadoop-wholefile.jar -C build .  
rm -r build  
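
Before blaming Hadoop for a ClassNotFoundException later, it's worth confirming the class is packaged under the expected path:

jar tf hadoop-wholefile.jar | grep NSTextInputFormat
# com/example/hadoop/NSTextInputFormat.class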

I next created a tar file with the static data assets needed by each process. These would be copied to Hadoop's distributed cache and made available to each map task (as a symbolic link to the extracted contents of the tar file).

cd static-data  
tar zcf ../static-data.tgz *  

Then I wrote the wrapper shell script:

#!/bin/bash

# create a local copy of the input file (the magic)
# HDFS path to file is in environment variable $map_input_file
filename=`basename $map_input_file`  
cat - > $filename

# run the computation engine
compute-risk -i $filename -s static-data -o output.csv 2> stderr.txt

# save the output and errors
test -s output.csv && hdfs dfs -put output.csv output/$filename  
test -s stderr.txt && hdfs dfs -put stderr.txt log/$filename  

And that's it! Pretty simple, no? Well, except for the compute-risk black-box of course...
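
Before submitting the full job, it's worth smoke-testing the mapper locally by faking what streaming does: set the environment variable and pipe a file in by hand. This assumes compute-risk is on the local PATH, the static data is unpacked in the current directory, and the output and log directories already exist on HDFS.

export map_input_file=input/file-00001.csv
cat input-data/file-00001.csv | ./compute-risk.sh
hdfs dfs -cat output/file-00001.csv | head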

To launch the job, it's just a matter of wrangling the options to present to hadoop:

#!/bin/bash

# remove status, output, and log dirs
hdfs dfs -rm -r -skipTrash status output log

# create output and log directories
hdfs dfs -mkdir -p output log

# mapreduce parameters
# http://hadoop.apache.org/docs/current/hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml
#
# run the job
hadoop jar $HADOOP_PREFIX/hadoop-streaming.jar \  
    -libjars hadoop-wholefile.jar \
    -archives static-data.tgz#static-data \
    -files compute-risk.sh \
    -D mapreduce.job.name="Streaming Risk Job" \
    -D mapreduce.job.reduces=0 \
    -D mapreduce.map.memory.mb=4096 \
    -inputformat com.example.hadoop.NSTextInputFormat \
    -outputformat org.apache.hadoop.mapred.lib.NullOutputFormat \
    -cmdenv passed_variable=$some_env_variable \
    -mapper compute-risk.sh \
    -input input \
    -output status

Couple of notes:

  • -libjars ships the extra jar to the distributed cache and puts it on the classpath of each map task; -archives copies the archive to the distributed cache, expands it, and creates a symbolic link to the expanded directory in the map task's current working directory. The nice bit about the distributed cache is that it's eventually cleaned up by Hadoop after it's no longer needed.
  • -files does the same, but for individual files.
  • -D mapreduce.job.reduces=0 tells Hadoop there is no reduce phase - just run mappers, and exit.
  • -inputformat tells it to use our custom input file reader class that we provide in hadoop-wholefile.jar.
  • -outputformat specifies in this case that there is no stdout from the mapper to capture, so just throw it away (although the job will still need an -output directory).
  • -cmdenv in this example is just there to show that you can pass a series of environment variables to each mapper task.
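
Once the run completes, a couple of quick checks (illustrative, not prescriptive) confirm that there is one result file and one log file per input file, and let you spot-check an engine log:

hdfs dfs -ls input  | grep -c '\.csv$'
hdfs dfs -ls output | grep -c '\.csv$'
hdfs dfs -ls log    | grep -c '\.csv$'
hdfs dfs -cat log/file-00001.csv | head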

Of course, there is no need for this to be a shell script - it can be any program that reads stdin and writes to stdout. But for this specific case, I needed to run compute-risk with a complete input file, and this model can be used for pretty much any program that doesn't fit the Hadoop record-based approach.

Reporting process

There's not much to talk about here at the moment. With the data landed in HDFS in CSV format, and the requirement to use a BI tool that connects through ODBC, Hive (with Tez) was the natural choice. And it worked wonderfully... on the test cluster (96 cores), with 3.5B rows, it can run a fairly complex aggregation in under a minute. I ❤️ Hive so much... Take that, old-school RDBMS engines!
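
For the curious, pointing Hive at the streaming output is little more than an external table over the output directory plus whatever aggregations the BI tool sends. The column names and HDFS path below are invented for illustration - the real schema belongs to the client:

# hypothetical Hive DDL and a sample aggregation over the CSV output
hive -e "
CREATE EXTERNAL TABLE risk_results (
    trade_id    STRING,
    scenario    STRING,
    risk_value  DOUBLE
)
ROW FORMAT DELIMITED FIELDS TERMINATED BY ','
STORED AS TEXTFILE
LOCATION '/user/riskbatch/output';

SELECT scenario, SUM(risk_value) AS total_risk
FROM risk_results
GROUP BY scenario;
"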

Wrap up

Overall, this was a stunning success for the client. By removing the database load/extract processes, and running processing locally, the end-to-end job went from over 7 hours to just over 20 minutes... Sure, it's using more computing power, but that's exactly the point: it is able to scale horizontally, where the existing solution cannot. Plus there's no wasted time copying data all over the place. And no more (very expensive) software license fees for the RDBMS.

From the end user's perspective, all that has changed is that the batch run now takes the length of a coffee break rather than a full work day. The analysis tool (their interface) has not changed (it is just sending SQL to Hive instead of a traditional RDBMS).

Now... not to get all fanboy on Hadoop/Hive, there are a few things to keep in mind. This setup would be a terrible idea as the underlying technology for an OLTP system, and there are no referential integrity checks on the data, which means much more care is required for implementing an OLAP system. But, depending on the use case, I'm certainly going to be considering using this technique more often...


  1. Yes, the term "Big Data" annoys me to no end. Buzzwords. I don't want to leverage their synergy.

  2. I don't actually have a problem with the term "data lake". It's a colourful and meaningful analogy, rather than a buzzword that sounds like a 5 year old child created it. Er... kinda like Google or Hadoop... Well, at least they are proper nouns, right? 😎

  3. I am definitely leaving it as an exercise for the reader to figure out the differences between the input format readers - it's character building.