Producer/Consumer Queue and BlockingCollection in C# 4.0

By Peter Bromberg

Shows how to use the new BlockingCollection class in C# 4.0

A producer/consumer queue is a common requirement in threading. A queue is set up to describe work items or data on which work is performed. When a task needs to be executed, it is enqueued, allowing the caller to perform other actions. One or more worker threads work in the background, dequeueing and executing queued items.

One advantage of this model is that you have precise control over how many worker threads execute at once. This allows you to limit consumption of CPU time and other resources. If the tasks perform intensive disk I/O you might have just one worker thread to avoid starving the operating system. Another type of application may have 100 worker threads. You can also dynamically add and remove workers throughout the queue’s life. The CLR ThreadPool itself is a kind of producer/consumer queue.

A producer/consumer queue typically holds items of data on which the same task is performed. By making the item a delegate, you can write a very general-purpose producer/consumer queue where each item can do anything.
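A minimal sketch of that idea, using the BlockingCollection<T> class covered later in this article. The names ActionQueue and ActionQueueDemo are hypothetical and are not part of the article's download; error handling is deliberately left out.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration: a queue whose items are delegates.
public sealed class ActionQueue : IDisposable
{
    private readonly BlockingCollection<Action> _queue = new BlockingCollection<Action>();
    private readonly Task[] _workers;

    public ActionQueue(int workerCount)
    {
        _workers = new Task[workerCount];
        for (int i = 0; i < workerCount; i++)
            _workers[i] = Task.Factory.StartNew(Consume, TaskCreationOptions.LongRunning);
    }

    // Producer side: hand over any piece of work as a delegate.
    public void Enqueue(Action work) { _queue.Add(work); }

    // Consumer side: run delegates until the queue is completed and drained.
    // A delegate that throws ends its worker; Dispose then rethrows the failure.
    private void Consume()
    {
        foreach (Action work in _queue.GetConsumingEnumerable())
            work();
    }

    // Stop accepting work, then wait for the workers to finish what is queued.
    public void Dispose()
    {
        _queue.CompleteAdding();
        Task.WaitAll(_workers);
        _queue.Dispose();
    }
}

public static class ActionQueueDemo
{
    public static int RunDemo()
    {
        int total = 0;
        using (var queue = new ActionQueue(2))
        {
            for (int i = 1; i <= 10; i++)
            {
                int n = i; // copy so each delegate sees its own value
                queue.Enqueue(() => Interlocked.Add(ref total, n));
            }
        } // Dispose waits for every queued delegate to run
        return total;
    }

    public static void Main()
    {
        Console.WriteLine(RunDemo()); // prints 55
    }
}
```

Because each item is just an Action, the same two workers could run unrelated jobs back to back; the trade-off is that the queue no longer knows anything about what it is executing.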

.NET Framework 4.0 provides a set of new collections in the System.Collections.Concurrent namespace. These are fully thread-safe, and are designed to be the Parallel Extensions (PFX) counterparts of the generic collections in System.Collections.Generic:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
BlockingCollection<T>
ConcurrentDictionary<TKey,TValue>

ConcurrentBag<T>, which has no non-parallel counterpart, is unique in that it stores an unordered collection of objects with duplicates permitted. ConcurrentBag<T> is suitable in situations when you don’t care which element you get when calling TryTake. The benefit of ConcurrentBag<T> over ConcurrentQueue<T> or ConcurrentStack<T> is that the bag’s Add method has almost no contention when called by many threads at once.
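As a small illustration of the bag's behavior, here is a sketch in which many threads add to a ConcurrentBag<int> at once and a single thread then drains it. The class name BagDemo is hypothetical; the order in which elements come back is unspecified, so only the count is checked.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

public static class BagDemo
{
    public static int FillAndDrain()
    {
        var bag = new ConcurrentBag<int>();

        // Many threads add concurrently; i % 10 produces plenty of duplicates.
        Parallel.For(0, 1000, i => bag.Add(i % 10));

        // TryTake returns false once the bag is empty.
        int taken = 0;
        int item;
        while (bag.TryTake(out item))
            taken++;
        return taken;
    }

    public static void Main()
    {
        Console.WriteLine(FillAndDrain()); // prints 1000
    }
}
```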


The IProducerConsumerCollection<T>


A producer/consumer collection is one for which the two primary use cases are:
- Adding an element (“producing”)
- Retrieving an element while removing it (“consuming”)

The typical examples would be Stacks and Queues. Producer/consumer collections are significant in parallel programming because they are conducive to efficient lock-free implementations.

The IProducerConsumerCollection<T> interface represents a thread-safe producer/consumer collection. The following classes implement this interface:

ConcurrentStack<T>
ConcurrentQueue<T>
ConcurrentBag<T>
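The interface makes it possible to write producing and consuming code once and decide on the ordering later. In this sketch (the class and method names are hypothetical), the same method drains a queue in FIFO order and a stack in LIFO order:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;

public static class ProducerConsumerInterfaceDemo
{
    // Works against any IProducerConsumerCollection<int>.
    public static string AddThenDrain(IProducerConsumerCollection<int> collection)
    {
        for (int i = 1; i <= 3; i++)
            collection.TryAdd(i);          // "producing"

        var taken = new List<int>();
        int item;
        while (collection.TryTake(out item)) // "consuming"; false when empty
            taken.Add(item);

        return string.Join(",", taken);
    }

    public static void Main()
    {
        Console.WriteLine(AddThenDrain(new ConcurrentQueue<int>())); // prints 1,2,3
        Console.WriteLine(AddThenDrain(new ConcurrentStack<int>())); // prints 3,2,1
    }
}
```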


The BlockingCollection<T>


If you call TryTake on any of the producer/consumer collections mentioned and the collection is empty, the method returns false. Sometimes it would be more useful to wait until an element is available.

The designers of PFX encapsulated this functionality into a wrapper class called BlockingCollection<T>. A blocking collection wraps any collection that implements IProducerConsumerCollection<T> and lets you Take an element from the wrapped collection — blocking if no element is available.

A blocking collection also lets you limit the total size of the collection, blocking the producer once that size has been reached.

To use BlockingCollection<T>:
1. Instantiate the class, optionally specifying the IProducerConsumerCollection<T> to wrap and the maximum size (bound) of the collection.
2. Call Add (blocking) or TryAdd (non-blocking) to add elements to the underlying collection.
3. Call Take or TryTake to remove (consume) elements from the underlying collection.

If you call the constructor without passing in a collection, the class will automatically instantiate a ConcurrentQueue<T>. The producing and consuming methods let you specify cancellation tokens and timeouts. Add blocks while a bounded collection is full, and Take blocks while the collection is empty. TryAdd and TryTake return immediately by default, and wait only as long as the timeout you pass them.
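The difference between the blocking and non-blocking calls is easiest to see on a small bounded collection. This is a single-threaded sketch (the class name is hypothetical) with a bound of 2 and the default ConcurrentQueue<T> underneath:

```csharp
using System;
using System.Collections.Concurrent;

public static class BoundedCollectionDemo
{
    public static string Run()
    {
        // No collection is passed in, so a ConcurrentQueue<int> is used; the bound is 2.
        using (var numbers = new BlockingCollection<int>(2))
        {
            bool first = numbers.TryAdd(1);   // room available
            bool second = numbers.TryAdd(2);  // room available; the collection is now full
            bool third = numbers.TryAdd(3);   // full: returns false immediately

            // Still full: waits up to 50 ms for a consumer, then gives up.
            bool timed = numbers.TryAdd(3, TimeSpan.FromMilliseconds(50));

            int taken = numbers.Take();       // removes the oldest element (FIFO)
            bool retry = numbers.TryAdd(3);   // succeeds now that there is room

            return string.Format("{0} {1} {2} {3} {4} {5}",
                first, second, third, timed, taken, retry);
        }
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints True True False False 1 True
    }
}
```

Calling Add instead of TryAdd at the point where the collection is full would block this thread forever, since nothing else is consuming.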

Another way to consume elements is to call GetConsumingEnumerable, which I use in the following example. This returns a sequence that yields elements as they become available. The sequence ends once you call CompleteAdding and the remaining elements have been consumed. CompleteAdding also prevents further elements from being enqueued.
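Before the full example, here is that consuming pattern in isolation: one consumer task, five items, then CompleteAdding. The class name is hypothetical and this is only a sketch of the mechanism.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ConsumingEnumerableDemo
{
    public static string Run()
    {
        var received = new List<int>();
        bool addRejected = false;

        using (var numbers = new BlockingCollection<int>())
        {
            // The foreach blocks while the collection is empty and ends
            // only after CompleteAdding has been called and the queue is drained.
            Task consumer = Task.Factory.StartNew(() =>
            {
                foreach (int n in numbers.GetConsumingEnumerable())
                    received.Add(n);
            });

            for (int i = 0; i < 5; i++)
                numbers.Add(i);

            numbers.CompleteAdding();
            consumer.Wait(); // returns once the consumer has seen every item

            // Further adds are refused after CompleteAdding.
            try { numbers.Add(99); }
            catch (InvalidOperationException) { addRejected = true; }
        }

        return string.Join(",", received) + " rejected=" + addRejected;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints 0,1,2,3,4 rejected=True
    }
}
```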

The example code here borrows from the sample code in "C# 4.0 in a Nutshell" by the Albahari brothers (O'Reilly), and uses the example model from my previous article. I've modified the code to pass a state object that carries an MSDN forums "short name", so that each work item downloads the MSDN forum XML document for its forum from an HTTP URL template:

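using System;
using System.Collections.Concurrent;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;

class Program
{
    static void Main(string[] args)
    {
        // forums.txt holds one MSDN forum "short name" per line.
        string[] forums = File.ReadAllLines(Path.Combine(Environment.CurrentDirectory, "forums.txt"));
        Task[] tasks = new Task[forums.Length];
        // Disposing the queue calls CompleteAdding, which lets the consumers exit.
        using (ProducerConsumerQueue q = new ProducerConsumerQueue(tasks.Length, forums))
        {
            DateTime start = DateTime.Now;
            for (int i = 0; i < tasks.Length; i++)
            {
                // The forum short name travels with the work item as its state object.
                tasks[i] = q.EnqueueTask(null, forums[i]);
            }
            Task.WaitAll(tasks);
            TimeSpan elapsed = DateTime.Now - start;
            Console.WriteLine("Done in " + elapsed.TotalMilliseconds + " ms.");
        }

        Console.ReadKey();
    }
}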

// One unit of work: the state to process, an optional cancellation token,
// and the completion source that controls the Task handed back to the caller.
public class WorkItem
{
    public readonly TaskCompletionSource<object> TaskSource;
    public readonly CancellationToken? CancelToken;
    public readonly object State;

    public WorkItem(TaskCompletionSource<object> taskSource,
                    CancellationToken? cancelToken,
                    object state)
    {
        TaskSource = taskSource;
        CancelToken = cancelToken;
        State = state;
    }
}

public class ProducerConsumerQueue : IDisposable
{
    private const string ForumTemplate = "http://social.msdn.microsoft.com/Forums/en-US/{0}/threads?outputas=xml";
    private readonly BlockingCollection<WorkItem> _taskQ = new BlockingCollection<WorkItem>();

    public ProducerConsumerQueue(int workerCount, string[] states)
    {
        // Create and start a separate Task for each consumer:
        for (int i = 0; i < workerCount; i++)
            Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning);
    }

    // Lets the consuming enumerables end once the queue has drained.
    public void Dispose() { _taskQ.CompleteAdding(); }

    public Task EnqueueTask(object state)
    {
        return EnqueueTask(null, state);
    }

    public Task EnqueueTask(CancellationToken? cancelToken, object state)
    {
        var tcs = new TaskCompletionSource<object>();
        _taskQ.Add(new WorkItem(tcs, cancelToken, state));
        return tcs.Task;
    }

    // The state argument supplied by StartNew is not used here;
    // each work item carries its own state.
    void Consume(object state)
    {
        foreach (WorkItem workItem in _taskQ.GetConsumingEnumerable())
        {
            if (workItem.CancelToken.HasValue &&
                workItem.CancelToken.Value.IsCancellationRequested)
            {
                workItem.TaskSource.SetCanceled();
            }
            else
            {
                try
                {
                    DoSomeWork(workItem.State);
                    workItem.TaskSource.SetResult(workItem.State); // Indicate completion
                }
                catch (Exception ex)
                {
                    workItem.TaskSource.SetException(ex);
                }
            }
        }
    }

    void DoSomeWork(object state)
    {
        string forumShortName = (string)state;
        string url = string.Format(ForumTemplate, forumShortName);
        using (WebClient wc = new WebClient())
        {
            try
            {
                wc.DownloadString(url);
                Console.WriteLine("retrieved: " + forumShortName);
            }
            catch
            {
                // We probably timed out here, so report it and move on.
                Console.WriteLine("failed: " + forumShortName);
            }
        }
    }
}


In EnqueueTask, we enqueue a work item that carries the state object, an optional cancellation token (null in this case), and a task completion source, which lets us later control the task that we return to the caller.

In Consume, after dequeuing each work item, we first check whether it has been canceled. If not, we run the DoSomeWork method and then call SetResult on the task completion source to indicate its completion.

We can now wait on the returned task, attach continuations to it, have exceptions propagate to continuations or parent tasks, and so on. In other words, you get the advantages of the Task model while implementing your own scheduler. While the above example does not take advantage of all these features, it provides a decent base that shows how they may be used.
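To make that concrete, here is a sketch of what a caller can do with a task that is backed by a TaskCompletionSource<object>. The two completion sources stand in for work items completed by a consumer; the class name and the strings are made up for the illustration.

```csharp
using System;
using System.Threading.Tasks;

public static class CompletionSourceDemo
{
    public static string Run()
    {
        // Stand-ins for the completion sources stored in two work items.
        var succeeded = new TaskCompletionSource<object>();
        var failed = new TaskCompletionSource<object>();

        // The caller only sees the tasks and can attach continuations to them.
        Task<string> okNote = succeeded.Task.ContinueWith(
            t => "result: " + t.Result);
        Task<string> failNote = failed.Task.ContinueWith(
            t => t.IsFaulted ? "faulted: " + t.Exception.InnerException.Message : "ok");

        // The consumer side decides how each task ends.
        succeeded.SetResult("forum-a");
        failed.SetException(new InvalidOperationException("download failed"));

        return okNote.Result + "; " + failNote.Result;
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // prints result: forum-a; faulted: download failed
    }
}
```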

BlockingCollection<T> also provides static methods called AddToAny and TakeFromAny, which let you add or take an element while specifying several blocking collections. The action is then honored by the first collection able to service the request.

As in the example from the previous article, I've used Task.Factory.StartNew(Consume, states[i], TaskCreationOptions.LongRunning). The LongRunning option hints to PFX that each consumer should get its own dedicated thread rather than a thread-pool thread, no matter how many cores are available.
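A short sketch of the static methods, using two collections that are each bounded to a single element (the class name is hypothetical). Each call returns the index of the collection that serviced the request. Which collection is tried first is not something I would rely on, and nothing here should be read as load balancing, so the sketch only checks facts that follow from the bounds.

```csharp
using System;
using System.Collections.Concurrent;

public static class AnyCollectionDemo
{
    public static string Run()
    {
        var queues = new[]
        {
            new BlockingCollection<string>(1),
            new BlockingCollection<string>(1)
        };

        // Each queue holds one element, so "a" and "b" must land in different queues.
        int firstIndex = BlockingCollection<string>.AddToAny(queues, "a");
        int secondIndex = BlockingCollection<string>.AddToAny(queues, "b");

        // Both queues are full now; the non-blocking variant reports -1.
        int thirdIndex = BlockingCollection<string>.TryAddToAny(queues, "c");

        // Takes one element from whichever queue can supply it.
        string item;
        int takenFrom = BlockingCollection<string>.TakeFromAny(queues, out item);
        Console.WriteLine("took " + item + " from queue " + takenFrom);

        int remaining = queues[0].Count + queues[1].Count;
        return string.Format("distinct={0} third={1} remaining={2}",
            firstIndex != secondIndex, thirdIndex, remaining);
    }

    public static void Main()
    {
        Console.WriteLine(Run()); // last line printed: distinct=True third=-1 remaining=1
    }
}
```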

Most developers I've worked with write single-threaded blocking code constructs to perform multiple operations that are essentially the same. This is highly inefficient and takes a long time to get a job done, because the paradigm is "WaitUntilIAmDoneBeforeICanDoItAgain".

A few developers have mastered using the ThreadPool and BackgroundWorker classes, which is certainly a big improvement. That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTime".

But if you really want to be able to have your code "sing" on multiple cores with highly efficient multithreaded code constructs, you need to take the time to study and understand PFX. That extends the paradigm to "ICanDoThisOnMultipleThreadsAtTheSameTimeAndUseAllYourCPUCoresToo".

Does your PC or Server have more than one Core? I bet it does. Don't let them sit around with nothing to do on your concurrent programming needs - better to learn how to put them all to work!

You can download the complete Visual Studio 2010 solution here.

Biography - Peter Bromberg
Peter Bromberg is a C# MVP, MCP, and .NET expert who has worked in banking, finance, and telephony for over 20 years. Pete focuses exclusively on the .NET Platform, and currently develops SOA and other .NET applications for a Fortune 500 clientele. Peter enjoys producing digital photo collage with Maya, playing jazz flute, the beach, and fine wines. You can view Peter's UnBlog and IttyUrl sites.
Article Discussion: Producer/Consumer Queue and BlockingCollection in C# 4.0
Peter Bromberg posted at Saturday, May 22, 2010 6:46 PM
Mickael replied to Peter Bromberg at Wednesday, October 06, 2010 2:30 PM
Thank you Peter for this clear explanation.

I have implemented this PCQueue inside a UDP socket listener and it works fine. I have replaced

ThreadPool.QueueUserWorkItem

with this PCQ, and the stacking performance was greatly enhanced.
Instead of 18 msec for a QueueUserWorkItem call, it is about three times faster (between 4 and 6 msec).

Nevertheless, if I receive continuous datagrams on the UDP socket, I see that the consumers cannot take work items from the BlockingCollection. I guess it is because the Add method has a higher priority than the GetConsumingEnumerable method.

I will modify the PCQ class to use several BlockingCollections and, instead of using Add, will use TryAddToAny(BlockingCollection<T>[], T) and will create several tasks to consume the work items. Hopefully TryAddToAny doesn't fill the first BlockingCollection and then the second one, but load-balances between them.
Have you already experienced that?

Thank you
Peter Bromberg replied to Mickael at Wednesday, October 06, 2010 2:30 PM
No, I have not experimented with that but it sounds like you are on the right track. When you are done, perhaps you can write a new article on your findings and we will publish it.
Mickael replied to Peter Bromberg at Wednesday, October 06, 2010 2:30 PM
Hi Peter,
I have finally implemented load balancing between several BlockingCollections. The drawback of this implementation is that FIFO ordering is no longer guaranteed; nevertheless, it fits my needs.
The TakeFromAny and AddToAny methods don't implement load balancing: they drain or fill the first BlockingCollection, and move on to the second one only once the first is empty or full. I have not run many tests, but the ones I have done demonstrate this behaviour.

Below is my source code. I would be happy if you could comment on whether it can be optimized.
Once more, thank you for your sample.

Mickael
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
  
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
  
namespace OT.Eqpt.WorkShop.LogListen
{
  public class PCQueueAction : IDisposable
  {
    protected class WorkItem
    {
      public readonly TaskCompletionSource<object> TaskSource;
      public readonly Action Action;
      public readonly CancellationToken? CancelToken;
  
      public WorkItem(TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken)
      {
        TaskSource = taskSource;
        Action = action;
        CancelToken = cancelToken;
      }
    }
  
    /// <summary>
    /// Cancel workitem tasks and BlockingCollection methods like Take, Add tryTake...
    /// </summary>
    protected CancellationTokenSource _CancelToken = new CancellationTokenSource();
  
    /// <summary>
    /// BlockingCollection array which store the workitems.
    /// </summary>
    protected BlockingCollection<WorkItem>[] _taskQ;
  
    /// <summary>
    /// Used for inserting a workitem in a BlockingCollection with loadbalancing
    /// </summary>
    protected int _intCurrentTaskQ = 0;
  
    /// <summary>
    /// Instantiate Producer Consumer Queue Object.
    /// <para>Consumer tasks are divided equally on all BlockingCollection. For instance 2 BC and 5 tasks => 3 tasks for first BC and 2 tasks for 2nd BC </para>
    /// </summary>
    /// <param name="pintConsumerCount">Consumer Task quantity splitted by BlockingCollection</param>
    /// <param name="pintBlockingCollCount">BlockingCollection quantity for load balancing</param>
    public PCQueueAction(int pintConsumerCount, int pintBlockingCollCount)
    {
      if ((pintConsumerCount <= 0) || (pintBlockingCollCount <= 0))
        throw new ArgumentOutOfRangeException("pintConsumerCount", "Consumer and BlockingCollection quantities must be > 0");
      if (pintConsumerCount < pintBlockingCollCount)
        throw new ArgumentException("Consumer count must be greater than or equal to BlockingCollection count.");
  
      _taskQ = new BlockingCollection<WorkItem>[pintBlockingCollCount];
      for (int intLoop = 0; intLoop < pintBlockingCollCount; intLoop++)
        _taskQ[intLoop] = new BlockingCollection<WorkItem>();
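      // Create and start a separate Task for each consumer, spreading the consumers evenly across the BlockingCollections.
      for (int intLoop = 0; intLoop < pintConsumerCount; intLoop++)
      {
        // Pick the queue before creating the lambda: capturing intLoop itself would let a task read a later loop value.
        BlockingCollection<WorkItem> queue = _taskQ[intLoop % _taskQ.Length];
        Task.Factory.StartNew(() => Consume(queue), TaskCreationOptions.LongRunning);
      }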
    }
  
    public void Dispose()
    {
      _CancelToken.Cancel(); //Cancel the TryTakeAny, TryTake or Take in Consume method if in blocking mode
      for (int intLoop = 0; intLoop < _taskQ.Length; intLoop++)
        _taskQ[intLoop].CompleteAdding();
    }
  
    public Task EnqueueTask(Action action)
    {
      return EnqueueTask(action, _CancelToken.Token);//By default set Canceltoken to the class field CancelToken
    }
  
    public Task EnqueueTask(Action action, CancellationToken? cancelToken)
    {
      try
      {
        var tcs = new TaskCompletionSource<object>();
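        //Load balance work items across the BlockingCollection array (round robin).
        //Interlocked.Increment keeps the counter consistent when several producers enqueue at once;
        //the uint cast keeps the index non-negative after the counter wraps around.
        uint next = unchecked((uint)Interlocked.Increment(ref _intCurrentTaskQ));
        _taskQ[next % (uint)_taskQ.Length].Add(new WorkItem(tcs, action, cancelToken));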
  
        return tcs.Task;
      }
      catch (OperationCanceledException)
      {
        return null;
        //Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
      }
      catch (Exception)
      {
        throw;
      }
    }
  
    void Consume(BlockingCollection<WorkItem> pBlockColl)
    {
      try
      {
        foreach (WorkItem workItem in pBlockColl.GetConsumingEnumerable())
        {
          if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
            workItem.TaskSource.SetCanceled();
          else
            try
            {
              workItem.Action();
              workItem.TaskSource.SetResult(null);   // Indicate completion
            }
            catch (Exception ex)
            {
              workItem.TaskSource.SetException(ex);
            }
        }
      }
      catch (OperationCanceledException)
      {
        //Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
      }
      catch (Exception)
      {//TODO add a log entry
        throw;
      }
  
      //WorkItem workItem;
      //while (CheckStillWorkItems())
      //{
      //  try
      //  {
      //    if (BlockingCollection<WorkItem>.TakeFromAny(_taskQ, out workItem, _CancelToken.Token) != -1)
      //    {
      //    if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
      //      workItem.TaskSource.SetCanceled();
      //    else
      //      try
      //      {
      //      System.Diagnostics.Debug.WriteLine("PCQueueLength => " + _taskQ[0].Count + " / " + _taskQ[1].Count);
      //      workItem.Action();
      //      //workItem.TaskSource.SetResult(null);   // Indicate completion
      //      }
      //      catch (Exception ex)
      //      {
      //      workItem.TaskSource.SetException(ex);
      //      }
      //    }
      //    else
      //    System.Diagnostics.Debug.WriteLine("Error while taking from taskQ");
      //  }
      //  catch (OperationCanceledException)
      //  {
      //    //Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
      //  }
      //  catch (Exception)
      //  {//TODO add a log entry
      //    throw;
      //  }
    }
  
    //private bool CheckStillWorkItems()
    //{
    //  bool booResult = true;
    //  for (int intLoop = 0; intLoop < _taskQ.Length; intLoop++)
    //    booResult &= _taskQ[intLoop].IsCompleted;
    //  return !booResult;
    //}
  }
}