Saturday, May 4, 2013

Rafting With the Elephant: Cascading and Configuration Of Steps

At work we've been having a hard time using our Hadoop cluster efficiently. One major requirement of our project is that no data duplication may occur at any point in our processing. Such checks necessitate the use of reducers.

The problem with reducers is that a single reducer really shouldn't process more than 10 GB [1]. In our case the system was schlepping about 100 GB of data from the various nodes to a single reducer, since one reducer is Cascading's default setting. This made the processing arduous. However, some parts of the flow need all of the data to go to just one reducer (the statistical calculations). As a result, we can't simply set the job config for the whole flow to, say, 4 reducers.

This issue is compounded by the fact that we are currently on Cascading 2.0.8. This version has a bug where Pipe.getStepConfigDef does nothing, even on a GroupBy or, in our case, the GroupBy inside a Unique. Chris Wensel of Cascading fame has acknowledged the bug [2]. Frankly, I can't get too mad about this, for two reasons: 1) Cascading has to be hands down the best open source project I've ever used, and 2) we're on an unsupported version of the framework, so Chris' "too bad for you" [3] statement makes sense, and they're working on it.

But bugs aside, I have a real problem, and you might have the same one. How does someone like me fix this (especially when the flow is taking 16 hrs of a 24 hr window that resets every 16 hrs)? The answer lies in the magic of FlowStepStrategy. This interface gives developers the chance to set configurations on each step before it is sent out the door to the cluster. Our project had been using it all along, but improperly, so it took me a while to figure out how to leverage this great feature.

Basically, the key to the trick is the flowStep argument of the apply method. flowStep has a method called containsPipeNamed, and by combining it with the various dot files the flow produces, you can figure out which step each of your pipes runs in. Armed with this information, you can implement the interface so that it looks for pipes with certain names. When you hit one (preferably a named GroupBy pipe), you can customize that step's job configuration. In our case we look for a GroupBy pipe with a certain name, and when we find it we set "mapred.reduce.tasks" to 4. This dramatically decreases the time spent reducing while keeping the default of 1 reducer for the steps further down the line. Of course, you could add other configurations here as well.
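The pipe-name check can be sketched in plain Java. Because Cascading and Hadoop aren't on the classpath here, Step and StepStrategy are hypothetical stand-ins that only mirror the shape of Cascading 2.x's FlowStep.containsPipeNamed(String) and FlowStepStrategy.apply(...). The pipe name "dedupe-groupby" is also made up for illustration.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for Cascading's FlowStep: just enough to ask
// "which pipes run in this step?" and to hold the step's job config.
final class Step {
    private final String name;
    private final Set<String> pipeNames;
    private final Map<String, String> config = new HashMap<>();

    Step(String name, Set<String> pipeNames) {
        this.name = name;
        this.pipeNames = Set.copyOf(pipeNames);
    }

    String getName() { return name; }

    // Mirrors the idea of FlowStep.containsPipeNamed(String).
    boolean containsPipeNamed(String pipeName) { return pipeNames.contains(pipeName); }

    // Mutable per-step job configuration (a JobConf in real Hadoop code).
    Map<String, String> getConfig() { return config; }
}

// Hypothetical stand-in for FlowStepStrategy: called once per step,
// before that step is submitted to the cluster.
interface StepStrategy {
    void apply(List<Step> predecessorSteps, Step flowStep);
}

// Raises the reducer count only for the step containing the named GroupBy.
final class ReducerCountStrategy implements StepStrategy {
    private final String groupByName;
    private final int reducers;

    ReducerCountStrategy(String groupByName, int reducers) {
        this.groupByName = groupByName;
        this.reducers = reducers;
    }

    @Override
    public void apply(List<Step> predecessorSteps, Step flowStep) {
        if (flowStep.containsPipeNamed(groupByName)) {
            flowStep.getConfig().put("mapred.reduce.tasks", Integer.toString(reducers));
        }
        // Any other step is left untouched and keeps the default of 1 reducer.
    }
}
```

As I understand the 2.x API, the same logic against the real library would go in a class implementing cascading.flow.FlowStepStrategy<JobConf>. Its apply(Flow<JobConf> flow, List<FlowStep<JobConf>> predecessorSteps, FlowStep<JobConf> flowStep) method would call flowStep.getConfig().setNumReduceTasks(4) when the step contains the named pipe. Check the javadoc for your exact version.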

Once you've implemented the interface, you just add your class to the flow as its step strategy. Then POW! Things move faster.
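The wiring step can be modeled the same way. MiniFlow, PlannedStep, and FlowStepHook are hypothetical stand-ins, not Cascading classes. The sketch shows the ordering: the strategy is registered once, then invoked for each step, just before that step would be submitted. Any step it doesn't touch keeps the default of 1 reducer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical stand-in for a planned step: the pipes it runs and its job config.
final class PlannedStep {
    final Set<String> pipeNames;
    final Map<String, String> config = new HashMap<>();

    PlannedStep(Set<String> pipeNames) { this.pipeNames = Set.copyOf(pipeNames); }
}

// Hypothetical stand-in for a step strategy (single method, so lambdas work).
interface FlowStepHook {
    void apply(List<PlannedStep> predecessorSteps, PlannedStep step);
}

// Hypothetical stand-in for a Flow that calls the registered hook per step.
final class MiniFlow {
    private final List<PlannedStep> steps;
    private FlowStepHook hook;

    MiniFlow(List<PlannedStep> steps) { this.steps = List.copyOf(steps); }

    void setFlowStepStrategy(FlowStepHook hook) { this.hook = hook; }

    // Returns the reducer count each step would be submitted with.
    List<Integer> complete() {
        List<Integer> reducers = new ArrayList<>();
        for (int i = 0; i < steps.size(); i++) {
            PlannedStep step = steps.get(i);
            if (hook != null) {
                // Earlier steps are passed along as predecessors.
                hook.apply(steps.subList(0, i), step);
            }
            // Default of 1 reducer unless the hook overrode it.
            reducers.add(Integer.parseInt(step.config.getOrDefault("mapred.reduce.tasks", "1")));
            // A real flow would submit the job to the cluster here.
        }
        return reducers;
    }
}

class MiniFlowDemo {
    public static void main(String[] args) {
        MiniFlow flow = new MiniFlow(List.of(
                new PlannedStep(Set.of("dedupe-groupby")),
                new PlannedStep(Set.of("stats-groupby"))));
        flow.setFlowStepStrategy((preds, step) -> {
            if (step.pipeNames.contains("dedupe-groupby")) {
                step.config.put("mapred.reduce.tasks", "4");
            }
        });
        System.out.println(flow.complete()); // prints [4, 1]
    }
}
```

With Cascading itself, the equivalent is passing your strategy to flow.setFlowStepStrategy(...) before calling flow.complete().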

Hopefully this will help others facing the same issue. I also hope the getStepConfigDef bug gets fixed in a future release.