A pipeline is a common architectural style for sequential
processing of data based on the pipes and filters, interceptor or intercepting
filter patterns. This implementation is based on the intercepting filter
pattern.
A pipeline can be thought of as water flowing through a
pipe. The pipe connects a series of stages where processing occurs. Because all
the stages use the same interface they can be composed into different solutions
by connecting the stages to different pipes. This allows for adding, omitting
or rearranging stages without having to change the individual stages
themselves.
The problem this is trying to solve is how to implement a
configurable series of steps for processing data. Typically this logic is
implemented by hard-coding the sequence with many if or case statements.
By breaking the sequence into pieces, simpler classes can be designed that
are ignorant of the other processing steps. This allows developers to focus
on the small parts of the solution and not get overwhelmed by the entire
problem. Another advantage of smaller component parts is greater reuse and
a pluggable architecture.
A pipeline is for simple linear processing and is not
orchestration. An orchestration provides conditional branching, whereas a
pipeline implies a linear flow from stage to stage. A typical scenario for a
pipeline would be order processing. This may include checking inventory,
validating credit card information, applying sales tax, calculating shipping
charges, etc., prior to saving the final order. Another possible use is
decompressing and decrypting a stream before processing. This is also the
pattern used for applying IHttpModules to an ASP.NET request.
Structure
The following is the structure of the pipeline.

Sequence

PipelineManager
The PipelineManager manages filter processing. It contains
the FilterChain and initiates processing. It also calls the processor that will
complete the processing.
FilterChain
The FilterChain is an ordered collection of independent
filters.
FilterOne, FilterTwo, FilterThree
These are the individual filters. The FilterChain
coordinates their processing.
CoreProcessor
The CoreProcessor processes the result of the filters.
Pipeline Class Design
The pipeline is composed of four main types: PipelineManager,
FilterChain, IFilter and ICoreProcessor, which are elaborated on later. The
pipeline accepts the data as a reference parameter and returns a Boolean
indicating success or failure of the processing steps. During processing the
data can be modified or completely replaced with new data. An optional second
parameter indicates that processing should stop on failure, that is, as soon
as a step returns false.
The intercepting filter pattern uses a controller to call
each stage in the pipeline, as opposed to the other pattern implementations,
which chain the stages into a linked list where each stage calls the next. In
my opinion this is more advantageous since it prevents the issue of a developer
forgetting to call the next stage in their filter code.
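To make that risk concrete, here is a minimal sketch of the linked-list style that this implementation avoids. All of the names (ChainedStage, TrimStage, ForgetfulUpperStage, ExclaimStage, ChainedDemo) are hypothetical and are not part of the sample code; they only illustrate how one stage that forgets to forward the call silently cuts off the rest of the chain.

```csharp
using System;

// Hypothetical linked-list style stage: every stage must remember to call the next one.
public abstract class ChainedStage
{
    public ChainedStage Next;

    public abstract string Handle(string data);

    // Forwards to the next stage, or returns the data when this is the last stage.
    protected string CallNext(string data)
    {
        return Next == null ? data : Next.Handle(data);
    }
}

public class TrimStage : ChainedStage
{
    public override string Handle(string data)
    {
        return CallNext(data.Trim());
    }
}

public class ForgetfulUpperStage : ChainedStage
{
    public override string Handle(string data)
    {
        // Bug: returns without calling CallNext, so later stages never run.
        return data.ToUpper();
    }
}

public class ExclaimStage : ChainedStage
{
    public override string Handle(string data)
    {
        return CallNext(data + "!");
    }
}

public static class ChainedDemo
{
    public static string Run(string input)
    {
        ChainedStage first = new TrimStage();
        first.Next = new ForgetfulUpperStage();
        first.Next.Next = new ExclaimStage();
        return first.Handle(input);
    }
}
```

Calling ChainedDemo.Run with the input "  hi " yields "HI" rather than "HI!", because ExclaimStage is never reached. With a controller iterating over the stages, a filter has no forwarding call to forget.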
The classes for the Pipeline are as shown.

The IFilter interface provides the processing stages for
each pipeline. The ref data parameter allows the data to be completely replaced
during processing. The Process method returns a Boolean value to indicate
success or failure of the step. This can be useful in determining whether
processing needs to be terminated due to a failed step. It also allows the
pipeline to be used as a non-classic Chain of Responsibility.
public interface IFilter<T>
{
    bool Process(ref T data);
}
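As an illustration, the following sketch shows two filters written against this interface. TrimFilter, NotEmptyFilter and FilterDemo are hypothetical names chosen for the example and do not come from the sample code. The first filter replaces the data through the ref parameter, the second only reports success or failure.

```csharp
using System;

// Repeated from the article so the sketch compiles on its own.
public interface IFilter<T>
{
    bool Process(ref T data);
}

// Hypothetical filter: swaps in a trimmed copy of the string.
public class TrimFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        if (data == null)
            return false;       // nothing to trim, report failure
        data = data.Trim();     // strings are immutable, so ref is what lets us replace the value
        return true;
    }
}

// Hypothetical filter: leaves the data alone and fails on null or empty input.
public class NotEmptyFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        return !string.IsNullOrEmpty(data);
    }
}

public static class FilterDemo
{
    // Runs both filters by hand; in the pipeline the FilterChain does this.
    public static bool Run(ref string text)
    {
        IFilter<string> trim = new TrimFilter();
        IFilter<string> notEmpty = new NotEmptyFilter();
        return trim.Process(ref text) && notEmpty.Process(ref text);
    }
}
```

Neither filter knows about the other, which is what allows them to be added, removed or reordered freely.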
The FilterChain is an extension of the List class. It is
responsible for executing the individual filters in the pipeline. An optional
parameter can be used to indicate if it should stop on any filter returning a
false (failure).
public class FilterChain<T> : List<IFilter<T>>
{
    internal bool Process(ref T data, bool stopOnFailure)
    {
        bool success = true;
        foreach (IFilter<T> processor in this)
        {
            // Remember that at least one filter failed.
            if (!processor.Process(ref data))
                success = false;
            if (stopOnFailure && !success)
                break;
        }
        return success;
    }

    internal bool Process(ref T data)
    {
        return Process(ref data, false);
    }
}
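The effect of the stopOnFailure flag can be seen in a small sketch. It repeats the chain in condensed form, assuming the failure branch assigns success = false, and adds a hypothetical NamedFilter that records its name and returns a fixed result. NamedFilter and ChainDemo are illustration-only names.

```csharp
using System.Collections.Generic;

// Condensed copies of the article types so the sketch compiles on its own.
public interface IFilter<T> { bool Process(ref T data); }

public class FilterChain<T> : List<IFilter<T>>
{
    internal bool Process(ref T data, bool stopOnFailure)
    {
        bool success = true;
        foreach (IFilter<T> filter in this)
        {
            if (!filter.Process(ref data))
                success = false;
            if (stopOnFailure && !success)
                break;
        }
        return success;
    }
}

// Hypothetical filter: appends its name to the log and returns a fixed result.
public class NamedFilter : IFilter<List<string>>
{
    private readonly string name;
    private readonly bool result;

    public NamedFilter(string name, bool result)
    {
        this.name = name;
        this.result = result;
    }

    public bool Process(ref List<string> data)
    {
        data.Add(name);
        return result;
    }
}

public static class ChainDemo
{
    // Returns the names of the filters that actually ran.
    public static List<string> Run(bool stopOnFailure, out bool success)
    {
        FilterChain<List<string>> chain = new FilterChain<List<string>>();
        chain.Add(new NamedFilter("one", true));
        chain.Add(new NamedFilter("two", false));   // this stage fails
        chain.Add(new NamedFilter("three", true));

        List<string> log = new List<string>();
        success = chain.Process(ref log, stopOnFailure);
        return log;
    }
}
```

With stopOnFailure set to true the log contains "one" and "two" only. With false, all three filters run. In both cases the chain reports failure, since the result stays false once any filter has failed.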
The PipelineManager contains the filter chain and the final
processor of the data. This is the class that should be used by the client when
using the pipeline. On a design note, the final processor was implemented as an
interface rather than a delegate to be more explicit and preserve the OO
design. This can be implemented with a delegate with minimal changes to the
code. The processor can also be null if no further processing is required.
public class PipelineManager<T>
{
    private FilterChain<T> filters;
    private ICoreProcessor<T> processor;

    public PipelineManager()
    {
        filters = new FilterChain<T>();
    }

    public PipelineManager(FilterChain<T> filters)
    {
        this.filters = filters;
    }

    public FilterChain<T> Filters
    {
        get { return filters; }
    }

    public ICoreProcessor<T> Processor
    {
        get { return processor; }
        set { processor = value; }
    }

    public bool ProcessFilter(ref T data, bool stopOnFailure)
    {
        bool success = filters.Process(ref data, stopOnFailure);
        // As written, the core processor runs only when stopOnFailure is
        // false and every filter succeeded.
        if (!stopOnFailure && success && processor != null)
            processor.Execute(data);
        return success;
    }

    public bool ProcessFilter(ref T data)
    {
        return ProcessFilter(ref data, false);
    }
}
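The design note above mentions that a delegate could take the place of the processor interface. The following is one possible sketch of that variation, not the sample's implementation. DelegatePipelineManager, UpperFilter and DelegateDemo are hypothetical names, the filter list is a plain List to keep the example short, and the final step is assumed to run only when every filter succeeded.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the article so the sketch compiles on its own.
public interface IFilter<T> { bool Process(ref T data); }

// Hypothetical variant of PipelineManager: the final step is an Action<T> delegate.
public class DelegatePipelineManager<T>
{
    private readonly List<IFilter<T>> filters = new List<IFilter<T>>();
    private Action<T> processor;    // stays null when no final step is needed

    public List<IFilter<T>> Filters
    {
        get { return filters; }
    }

    public Action<T> Processor
    {
        get { return processor; }
        set { processor = value; }
    }

    public bool ProcessFilter(ref T data)
    {
        bool success = true;
        foreach (IFilter<T> filter in filters)
        {
            if (!filter.Process(ref data))
                success = false;
        }
        // Assumption for this sketch: invoke the delegate only on overall success.
        if (success && processor != null)
            processor(data);
        return success;
    }
}

// Hypothetical filter used by the demo.
public class UpperFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        data = data.ToUpper();
        return true;
    }
}

public static class DelegateDemo
{
    // Returns what the delegate received, or null if it was never invoked.
    public static string Run(string input)
    {
        string captured = null;
        DelegatePipelineManager<string> pipeline = new DelegatePipelineManager<string>();
        pipeline.Filters.Add(new UpperFilter());
        pipeline.Processor = delegate(string s) { captured = s; };
        pipeline.ProcessFilter(ref input);
        return captured;
    }
}
```

The delegate version saves the caller from writing a class for a one-line final step, at the cost of the explicit, named contract that the interface provides.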
The ICoreProcessor interface provides the final target for
the data. It is used by the PipelineManager.
public interface ICoreProcessor<T>
{
void Execute(T data);
}
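Putting the pieces together, the order-processing scenario mentioned earlier might look like the sketch below. The article types are repeated in condensed form so that the example compiles on its own, and only the default call (stopOnFailure = false) is reproduced. Order, ValidateCardFilter, SalesTaxFilter, SaveOrderProcessor, OrderDemo and the flat 5% tax rate are all assumptions made for illustration.

```csharp
using System.Collections.Generic;

// Condensed copies of the article types.
public interface IFilter<T> { bool Process(ref T data); }
public interface ICoreProcessor<T> { void Execute(T data); }

public class FilterChain<T> : List<IFilter<T>>
{
    internal bool Process(ref T data, bool stopOnFailure)
    {
        bool success = true;
        foreach (IFilter<T> filter in this)
        {
            if (!filter.Process(ref data)) success = false;
            if (stopOnFailure && !success) break;
        }
        return success;
    }
}

public class PipelineManager<T>
{
    private readonly FilterChain<T> filters = new FilterChain<T>();
    private ICoreProcessor<T> processor;

    public FilterChain<T> Filters { get { return filters; } }
    public ICoreProcessor<T> Processor { get { return processor; } set { processor = value; } }

    // Mirrors the article's default call, where stopOnFailure is false.
    public bool ProcessFilter(ref T data)
    {
        bool success = filters.Process(ref data, false);
        if (success && processor != null) processor.Execute(data);
        return success;
    }
}

// Hypothetical order type and stages.
public class Order
{
    public decimal Subtotal;
    public decimal Tax;
    public bool CardValid;
}

public class ValidateCardFilter : IFilter<Order>
{
    public bool Process(ref Order data) { return data.CardValid; }
}

public class SalesTaxFilter : IFilter<Order>
{
    public bool Process(ref Order data)
    {
        data.Tax = data.Subtotal * 0.05m;   // assumed flat 5% rate
        return true;
    }
}

// Hypothetical core processor: "saves" the order to an in-memory list.
public class SaveOrderProcessor : ICoreProcessor<Order>
{
    public readonly List<Order> Saved = new List<Order>();
    public void Execute(Order data) { Saved.Add(data); }
}

public static class OrderDemo
{
    // Returns how many orders were saved by the core processor.
    public static int Run(Order order, out bool success)
    {
        PipelineManager<Order> pipeline = new PipelineManager<Order>();
        pipeline.Filters.Add(new ValidateCardFilter());
        pipeline.Filters.Add(new SalesTaxFilter());
        SaveOrderProcessor saver = new SaveOrderProcessor();
        pipeline.Processor = saver;
        success = pipeline.ProcessFilter(ref order);
        return saver.Saved.Count;
    }
}
```

An order with a valid card passes both filters and is handed to SaveOrderProcessor. An order with an invalid card still has its tax calculated, because the chain does not stop on failure here, but it is never saved.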
Creating Pipelines from a Configuration File
To increase the usefulness of the pipelines, it should be
possible to specify them in a configuration file. This allows each stage to
become a pluggable part of the pipeline. Several classes were added to support
specifying a pipeline in configuration. The configuration classes and the
pipeline factory are as follows.

The configuration section uses the add/remove/clear
semantics for the list of processing stages in a pipeline. The type of pipeline
created from a particular section must match the type of stages specified in
the configuration. This results in a configuration section that has the
following structure.
<filterPipeline>
    <handlers>
        <add type="PipelineTest.StringFilter, PipelineTest" />
    </handlers>
</filterPipeline>
One of the issues in loading from a configuration file was
determining the actual type a pipeline can handle. Generics require the type
arguments to be known at compile time, while the configuration file is only
read at run time. This issue was resolved by using a generic method in a
factory, which forces the pipeline user to determine the type that will be
handled. The factory contains one method for creating the pipeline from the
configuration.
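The idea of a generic factory method can be sketched as follows. This is not the factory from the sample code: PipelineFactorySketch and CreateFilters are hypothetical names, and the sketch takes the type names as a plain list instead of reading a configuration section, so that it stays self-contained. The caller supplies T, and each configured type is checked against IFilter<T> at run time.

```csharp
using System;
using System.Collections.Generic;

// Repeated from the article so the sketch compiles on its own.
public interface IFilter<T> { bool Process(ref T data); }

// Hypothetical stand-in for a configured stage.
public class StringFilter : IFilter<string>
{
    public bool Process(ref string data)
    {
        data = data.Trim();
        return true;
    }
}

public static class PipelineFactorySketch
{
    // T is fixed by the caller at compile time; the type names would come
    // from the add entries under handlers in the configuration section.
    public static List<IFilter<T>> CreateFilters<T>(IEnumerable<string> typeNames)
    {
        List<IFilter<T>> filters = new List<IFilter<T>>();
        foreach (string name in typeNames)
        {
            // Throws if the type cannot be resolved.
            Type type = Type.GetType(name.Trim(), true);

            // Requires a public parameterless constructor on the stage.
            IFilter<T> filter = Activator.CreateInstance(type) as IFilter<T>;
            if (filter == null)
                throw new InvalidOperationException(
                    type.FullName + " does not implement IFilter<" + typeof(T).Name + ">.");

            filters.Add(filter);
        }
        return filters;
    }
}
```

A call such as PipelineFactorySketch.CreateFilters<string>(names) then yields stages that are statically typed for strings, and a stage configured for the wrong data type is rejected when the pipeline is built rather than when it first runs.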
Using a configuration file for the stage determination
allows for a very flexible processing framework. This allows for completely
reconfiguring the pipeline without modifying the code. Since the name of the
configuration section for each pipeline is specified in the client code,
multiple pipelines can be created and used within a single application.
This pattern can be used with other patterns to create a
powerful and flexible framework. Download the sample code, written in Visual Studio 2005, and look at this
simple yet powerful pattern.