All about reducers count

Finding the right number of reducers for a flow, or for part of it, is an important and tedious step toward optimal performance and cluster resource usage. This page details the behaviour Scalding exhibits and the facilities it provides to help with that process.

By default, Scalding reads the "mapred.reduce.tasks" value from the JobConf and uses it as the number of reducers throughout the whole flow. If the property is not present, it defaults to 1.

A job developer may specify the number of reducers for each aggregation using the withReducers method on *Grouped instances (Grouped, UnsortedGrouped, etc.), as shown in the sketch below.
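
For example, a minimal typed-API word count might look like this (the paths and the reducer count are illustrative, not prescribed by Scalding):

import com.twitter.scalding._

class WordCountJob(args: Args) extends Job(args) {
  TypedPipe.from(TextLine(args("input")))
    .flatMap(_.split("\\s+"))
    .map { word => (word, 1L) }
    .sumByKey            // group by word and sum; yields an UnsortedGrouped
    .withReducers(16)    // run this aggregation with 16 reducers
    .write(TypedTsv[(String, Long)](args("output")))
}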

Reducer count estimation

Scalding can optionally be instrumented to estimate the ideal number of reducers at each step of the flow, using the facilities located in the reducer_estimation package. Estimation logic is provided by ReducerEstimator implementations and injected through Cascading's FlowStepStrategy mechanism.

Instructing Scalding to apply a reducer estimation policy

The following command-line argument instructs Scalding to use one or more ReducerEstimator implementations:

-Dscalding.reducer.estimator.classes="comma-separated list of fully qualified estimator classes". Defaults to "" (no estimation).

A programmatic approach is also provided:

class MyJob(args: Args) extends Job(args) {
  ...
  scaldingConfig.addReducerEstimator(...) 
  ...
}
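
A concrete way to do the same thing, shown as a hedged sketch: register the estimator's class name under the same configuration key the command-line flag above uses, via the Job's config override (the estimator choice here is illustrative):

import com.twitter.scalding._
import com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator

class MyConfiguredJob(args: Args) extends Job(args) {
  // Register the estimator under the key read by the estimation machinery.
  override def config: Map[AnyRef, AnyRef] =
    super.config + ("scalding.reducer.estimator.classes" ->
      classOf[InputSizeReducerEstimator].getName)
  // ... job body ...
}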

Whether the estimator-computed value takes precedence over a programmatically defined number of reducers can be controlled with

-Dscalding.reducer.estimator.override=boolean Defaults to false

Selecting the reducer estimation policy

Scalding comes with two built-in ReducerEstimator implementations:

  • InputSizeReducerEstimator computes the number of reducers based on the input size and a fixed "bytesPerReducer" value, which can be set by passing -Dscalding.reducer.estimator.bytes.per.reducer=12345 (see the example after this list). Defaults to 8GB.
  • RatioBasedEstimator computes the average ratio of mapper bytes to reducer bytes over previous steps and uses it to scale the estimate produced by InputSizeReducerEstimator. RatioBasedEstimator optionally ignores history items whose input size is drastically different from the current job's. The -Dscalding.reducer.estimator.input.ratio.threshold=float parameter specifies the lower bound on the allowable input-size ratio. Defaults to 0.10 (10%), which sets the upper bound to 10x. The number of historical steps used for the computation can be set with -Dscalding.reducer.estimator.max.history=int. Defaults to 1.
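
For example, selecting the input-size policy with a 1 GB-per-reducer target might look like the following (the package path is assumed to be the reducer_estimation package mentioned above; the byte value is illustrative):

-Dscalding.reducer.estimator.classes=com.twitter.scalding.reducer_estimation.InputSizeReducerEstimator
-Dscalding.reducer.estimator.bytes.per.reducer=1073741824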

Custom ReducerEstimator implementations may also be provided.
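
As a starting point, here is a minimal sketch of a custom estimator. It assumes the ReducerEstimator contract from the reducer_estimation package: a single estimateReducers method returning Option[Int], where None means "no recommendation":

import com.twitter.scalding.reducer_estimation.{ FlowStrategyInfo, ReducerEstimator }

class FixedReducerEstimator extends ReducerEstimator {
  // Always recommend a fixed number of reducers; a real estimator
  // would inspect the current FlowStep and its inputs via `info`.
  override def estimateReducers(info: FlowStrategyInfo): Option[Int] =
    Some(32)
}

It could then be selected with -Dscalding.reducer.estimator.classes=com.example.FixedReducerEstimator (package name hypothetical).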

Monitoring and debugging

For each FlowStep, both the value originally present in the JobConf and the value computed by the estimator can be consulted in Hadoop's JobTracker through the following keys:

/** Output param: what the Reducer Estimator recommended, regardless of if it was used. */
"scalding.reducer.estimator.result"
/** Output param: what the original job config was. */
"scalding.reducer.estimator.original.mapred.reduce.tasks"
