Marc Claesen edited this page Oct 8, 2013 · 28 revisions

The MultistagePipe class defines a sequence of elementary data processing steps. Each elementary step is implemented in a BasicBlock. The MultistagePipe plays a crucial role in implementing aggregation schemes in EnsembleSVM. For a list of currently offered aggregation schemes, please click here.

We will first show some example sequences to give an idea of the concept. Click here for guidelines on implementing new MultistagePipe functionality.

Some basic examples of multistage pipelines

Some example workflows using the BasicBlocks included in EnsembleSVM. From left to right: (i) weighted majority voting, (ii) preprocessing by linear scaling followed by the average of base model probabilities obtained via Platt scaling and (iii) a linear logistic regression model to aggregate base model predictions.

Example implementation of a multistage pipeline

Here we will guide you through the code involved in building a multistage pipeline. In this example we construct a pipeline for aggregation through (weighted) majority voting. This pipeline accepts base model predictions as input in a std::vector<double> and involves the following steps to compute its output double:

  1. Threshold base predictions based on their sign: f(x) = x > 0 ? 1 : 0.

  2. Scale thresholded values to perform weighted voting.

  3. Average the votes, with correct denominator in case non-uniform weights were used.
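Before turning to the EnsembleSVM classes, the three steps above can be sketched as a single standalone function. This is only an illustration of the arithmetic: the name weighted_majority_vote and its signature are hypothetical and not part of EnsembleSVM, and the sketch assumes the weights sum to a nonzero value.

```cpp
#include <cassert>
#include <cstddef>
#include <numeric>
#include <vector>

// Hypothetical helper, NOT part of EnsembleSVM: it mirrors the three steps
// (threshold, scale, average) on plain vectors.
// Assumption: the weights sum to a nonzero value.
double weighted_majority_vote(const std::vector<double>& predictions,
                              const std::vector<double>& weights,
                              double threshold = 0.0) {
    assert(predictions.size() == weights.size());
    // Denominator for step 3: the sum of the weights, so that
    // non-uniform weights are normalized correctly.
    const double divisor =
        std::accumulate(weights.begin(), weights.end(), 0.0);
    double weighted_votes = 0.0;
    for (std::size_t i = 0; i < predictions.size(); ++i) {
        // Step 1: threshold the base prediction to a 0/1 vote.
        const double vote = predictions[i] > threshold ? 1.0 : 0.0;
        // Step 2: scale the vote by the weight of its base model.
        weighted_votes += weights[i] * vote;
    }
    // Step 3: average the weighted votes.
    return weighted_votes / divisor;
}
```

With predictions {0.5, -0.3, 2.0} and weights {1, 1, 2}, the votes are 1, 0 and 1, so the result is (1 + 0 + 2) / 4 = 0.75. The pipeline built below computes the same quantity, but as three separate, reusable blocks.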

The code associated with this multistage pipeline takes the following form:

MULTISTAGEPIPELINE(MajorityVote,double,std::vector<double>)

This macro defines the MajorityVote class and takes care of the necessary functions. Classes created with the MULTISTAGEPIPELINE(Foo,result_type,argument_type) macro have a private constructor that accepts a std::unique_ptr<MultistagePipe<result_type(argument_type)>>, and they declare Factory<Foo> as a friend class.

This factory allows the user to build the internal pipeline as desired. In the next step, this factory must be defined and implemented for the newly defined class.

template <>
struct Factory<MajorityVote>{
    MULTISTAGEPIPELINE_FACTORY_TYPEDEFS(MajorityVote)
    std::unique_ptr<MajorityVote>
    operator()(std::vector<double>&& coeffs, double threshold=0.0) const;
    static std::unique_ptr<MultistagePipe<Res(Arg)>> deserialize(std::istream& is);
};

The Factory to construct and deserialize a MajorityVote object. The macro inside the factory declaration provides some convenient typedefs. The factory's functions can be implemented in a few straightforward lines of code based on existing basic blocks:

typedef std::vector<double> Vector;
std::unique_ptr<MajorityVote>
Factory<MajorityVote>::operator()(Vector&& coeffs, double threshold) const{ // default for threshold is given in the declaration only
	double divisor = std::accumulate(coeffs.begin(),coeffs.end(),0.0);
	Factory<Threshold<Vector(Vector)>> fact_thresh;
	auto thresh = fact_thresh(threshold,1.0,0.0,coeffs.size());
	Factory<Scale<Vector(Vector)>> fact_scaled;
	auto scaled = fact_scaled(std::move(thresh),std::move(coeffs));
	Factory<Average<double(Vector)>> fact_avg;
	auto avg = fact_avg(std::move(scaled),divisor);
	return std::unique_ptr<MajorityVote>(new MajorityVote(std::move(avg)));
}
std::unique_ptr<MultistagePipe<Res(Arg)>> 
Factory<MajorityVote>::deserialize(std::istream& is){
	auto thresh = Factory<Threshold<Vector(Vector)>>::deserialize(is);
	auto scaled = Factory<Scale<Vector(Vector)>>::deserialize(is,std::move(thresh));
	auto avg = Factory<Average<double(Vector)>>::deserialize(is,std::move(scaled));
	return std::unique_ptr<MultistagePipe<Res(Arg)>>(new MajorityVote(std::move(avg)));
}

The code shows that operator() is very straightforward to implement, using the appropriate BasicBlock factories to take care of pipeline concatenation and to parametrize every block. Finally, the deserialize function defines the order in which the blocks must be deserialized. Because C++ is statically typed, the concrete block types cannot be deduced from the stream at run time, so this order must be spelled out explicitly.

At some point after declaring the Factory, a macro call is required to finalize the implementation of the new MultistagePipe:

MULTISTAGEPIPELINE_POST_FACTORY(MajorityVote)

This macro call simply connects MajorityVote::deserialize with Factory<MajorityVote>::deserialize, which is an internal requirement for deserialization. The macro call can be placed in a header or an implementation file of your choosing. Currently, we have placed these calls in src/pipelines.cpp.

Finally, we must register the new class' deserialization procedure so we can deserialize it automatically. It would have been possible to do this implicitly through global objects inside the MULTISTAGEPIPELINE_POST_FACTORY macro, but to keep the structure clear we do it explicitly. All the pipes included in EnsembleSVM get registered in include/Registration.hpp, specifically in the registerMultistagePipes function. This header is implicitly included once in every executable through include/Executable.hpp.
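To give an idea of what explicit registration buys us, here is a generic sketch of a name-to-deserializer registry. It is not the code in include/Registration.hpp: every name in it (Pipe, Offset, registry, register_pipe, register_pipes, deserialize_any) is hypothetical, and the serialized format (a class name followed by its parameters) is an assumption made for the example.

```cpp
#include <cassert>
#include <functional>
#include <istream>
#include <map>
#include <memory>
#include <sstream>
#include <stdexcept>
#include <string>
#include <utility>

// Hypothetical minimal pipe interface (stand-in, not EnsembleSVM's).
struct Pipe {
    virtual ~Pipe() = default;
    virtual double operator()(double x) const = 0;
};

// Hypothetical concrete pipe with a single serialized parameter.
struct Offset : Pipe {
    double b;
    explicit Offset(double b_) : b(b_) {}
    double operator()(double x) const override { return x + b; }
    static std::unique_ptr<Pipe> deserialize(std::istream& is) {
        double b;
        if (!(is >> b)) throw std::runtime_error("Offset: missing parameter");
        return std::unique_ptr<Pipe>(new Offset(b));
    }
};

typedef std::function<std::unique_ptr<Pipe>(std::istream&)> Deserializer;

// Function-local static avoids initialization-order problems between files.
std::map<std::string, Deserializer>& registry() {
    static std::map<std::string, Deserializer> instance;
    return instance;
}

void register_pipe(const std::string& name, Deserializer fn) {
    assert(fn && "deserializer must be callable");
    registry()[name] = std::move(fn);
}

// Explicit registration: one visible line per known pipe class.
void register_pipes() {
    register_pipe("Offset", &Offset::deserialize);
}

// Reads the class name, then dispatches to the registered deserializer.
std::unique_ptr<Pipe> deserialize_any(std::istream& is) {
    std::string name;
    if (!(is >> name)) throw std::runtime_error("missing pipe name");
    auto it = registry().find(name);
    if (it == registry().end())
        throw std::runtime_error("unregistered pipe: " + name);
    return it->second(is);
}
```

After calling register_pipes(), feeding a std::istringstream containing "Offset 2.5" to deserialize_any yields a pipe that adds 2.5 to its input. A class that was never registered triggers the exception instead, which is why every new pipe needs its own registration line.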

What actually happens in MultistagePipes

The factories and C++11's auto keyword are a big help when concatenating basic blocks. To see this, consider that the types of thresh, scaled and avg are as follows:

decltype(thresh) ==
std::unique_ptr<Threshold<std::vector<double>(std::vector<double>)>>

decltype(scaled) ==
std::unique_ptr<Scale<std::vector<double>(std::vector<double>),
    Threshold<std::vector<double>(std::vector<double>)>>>

decltype(avg) ==
std::unique_ptr<Average<double(std::vector<double>),
    Scale<std::vector<double>(std::vector<double>),
        Threshold<std::vector<double>(std::vector<double>)>>>>
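The nesting can be reproduced with a few simplified stand-in templates. The sketch below is not the EnsembleSVM implementation: the block templates are empty shells, and the helpers make_scale and make_average and the function build_chain are hypothetical. The only assumption carried over from the types listed above is that each block takes its signature as the first template parameter and the type of the preceding block as an optional second one.

```cpp
#include <memory>
#include <type_traits>
#include <utility>
#include <vector>

typedef std::vector<double> Vector;

// Hypothetical stand-ins for the BasicBlocks. The second template
// parameter names the type of the preceding block in the chain.
struct NoInput {};
template <typename Sig, typename Prev = NoInput>
struct Threshold {};
template <typename Sig, typename Prev = NoInput>
struct Scale { std::unique_ptr<Prev> prev; };
template <typename Sig, typename Prev = NoInput>
struct Average { std::unique_ptr<Prev> prev; };

// Hypothetical factory helpers: Sig is given explicitly, Prev is deduced
// from the argument, so the caller never spells out the nested type.
template <typename Sig, typename Prev>
std::unique_ptr<Scale<Sig, Prev>> make_scale(std::unique_ptr<Prev> prev) {
    std::unique_ptr<Scale<Sig, Prev>> block(new Scale<Sig, Prev>());
    block->prev = std::move(prev);
    return block;
}
template <typename Sig, typename Prev>
std::unique_ptr<Average<Sig, Prev>> make_average(std::unique_ptr<Prev> prev) {
    std::unique_ptr<Average<Sig, Prev>> block(new Average<Sig, Prev>());
    block->prev = std::move(prev);
    return block;
}

// The full type that auto hides from the user.
typedef std::unique_ptr<
    Average<double(Vector),
            Scale<Vector(Vector), Threshold<Vector(Vector)>>>> Chain;

Chain build_chain() {
    auto thresh = std::unique_ptr<Threshold<Vector(Vector)>>(
        new Threshold<Vector(Vector)>());
    auto scaled = make_scale<Vector(Vector)>(std::move(thresh));
    auto avg = make_average<double(Vector)>(std::move(scaled));
    // The deduced type of avg is exactly the nested type shown above.
    static_assert(std::is_same<decltype(avg), Chain>::value,
                  "auto deduces the fully nested pipeline type");
    return avg;
}
```

Each concatenation wraps the previous type in one more template layer, so the written-out type grows with every block. With auto and factories that deduce the preceding block's type, the calling code stays the same length regardless of how long the pipeline becomes.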