finally I have implemented a load balancing between several blockingcollection. The drawback with this implementation is that the FIFO option si no more implemented nevertheless, it fits my meeds.
The takefromany and addtoany methods doesn't implement loadbalacning, it clears or add to the first blockingcollection and then the second one once this one is full. I have not made lot of tests but the ones I have done demonstrate this behaviour.
Below is my source code. I would be happy if you could add some comments on it if it can be potimized.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Collections.Concurrent;
namespace OT.Eqpt.WorkShop.LogListen
{
public class PCQueueAction : IDisposable
{
protected class WorkItem
{
public readonly TaskCompletionSource<object> TaskSource;
public readonly Action Action;
public readonly CancellationToken? CancelToken;
public WorkItem(TaskCompletionSource<object> taskSource, Action action, CancellationToken? cancelToken)
{
TaskSource = taskSource;
Action = action;
CancelToken = cancelToken;
}
}
/// <summary>
/// Cancel workitem tasks and BlockingCollection methods like Take, Add tryTake...
/// </summary>
protected CancellationTokenSource _CancelToken = new CancellationTokenSource();
/// <summary>
/// BlockingCollection array which store the workitems.
/// </summary>
protected BlockingCollection<WorkItem>[] _taskQ;
/// <summary>
/// Used for inserting a workitem in a BlockingCollection with loadbalancing
/// </summary>
protected int _intCurrentTaskQ = 0;
/// <summary>
/// Instantiate Producer Consumer Queue Object.
/// <para>Consumer tasks are divided equally on all BlockingCollection. For instance 2 BC and 5 tasks => 3 tasks for first BC and 2 tasks for 2nd BC </para>
/// </summary>
/// <param name="pintConsumerCount">Consumer Task quantity splitted by BlockingCollection</param>
/// <param name="pintBlockingCollCount">BlockingCollection quantity for load balancing</param>
public PCQueueAction(int pintConsumerCount, int pintBlockingCollCount)
{
if ((pintConsumerCount == 0) || (pintBlockingCollCount == 0))
throw new ArgumentOutOfRangeException("Consumer or BlockingCollection quantity must be > 0");
if (pintConsumerCount < pintBlockingCollCount)
throw new ArgumentException("Consumer count must be superior to BlockingCollection count.");
_taskQ = new BlockingCollection<WorkItem>[pintBlockingCollCount];
for (int intLoop = 0; intLoop < pintBlockingCollCount; intLoop++)
_taskQ[intLoop] = new BlockingCollection<WorkItem>();
// Create and start a separate Task for each consumer and attribute equally threads for all BlockingCollections.
for (int intLoop = 0; intLoop < pintConsumerCount; intLoop++)
Task.Factory.StartNew(() => Consume(_taskQ[intLoop % _taskQ.Length]), TaskCreationOptions.LongRunning);
}
public void Dispose()
{
_CancelToken.Cancel(); //Cancel the TryTakeAny, TryTake or Take in Consume method if in blocking mode
for (int intLoop = 0; intLoop < _taskQ.Length; intLoop++)
_taskQ[intLoop].CompleteAdding();
}
public Task EnqueueTask(Action action)
{
return EnqueueTask(action, _CancelToken.Token);//By default set Canceltoken to the class field CancelToken
}
public Task EnqueueTask(Action action, CancellationToken? cancelToken)
{
try
{
var tcs = new TaskCompletionSource<object>();
//Load balance work items in Blocking Collection array
_taskQ[_intCurrentTaskQ % _taskQ.Length].Add(new WorkItem(tcs, action, cancelToken));
_intCurrentTaskQ = (_intCurrentTaskQ == Int32.MaxValue) ? 0 : _intCurrentTaskQ + 1;
return tcs.Task;
}
catch (OperationCanceledException)
{
return null;
//Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
}
catch (Exception)
{
throw;
}
}
void Consume(BlockingCollection<WorkItem> pBlockColl)
{
try
{
foreach (WorkItem workItem in pBlockColl.GetConsumingEnumerable())
{
if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
workItem.TaskSource.SetCanceled();
else
try
{
workItem.Action();
workItem.TaskSource.SetResult(null); // Indicate completion
}
catch (Exception ex)
{
workItem.TaskSource.SetException(ex);
}
}
}
catch (OperationCanceledException)
{
//Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
}
catch (Exception)
{//TODO add a log entry
throw;
}
//WorkItem workItem;
//while (CheckStillWorkItems())
//{
// try
// {
// if (BlockingCollection<WorkItem>.TakeFromAny(_taskQ, out workItem, _CancelToken.Token) != -1)
// {
// if (workItem.CancelToken.HasValue && workItem.CancelToken.Value.IsCancellationRequested)
// workItem.TaskSource.SetCanceled();
// else
// try
// {
// System.Diagnostics.Debug.WriteLine("PCQueueLength => " + _taskQ[0].Count + " / " + _taskQ[1].Count);
// workItem.Action();
// //workItem.TaskSource.SetResult(null); // Indicate completion
// }
// catch (Exception ex)
// {
// workItem.TaskSource.SetException(ex);
// }
// }
// else
// System.Diagnostics.Debug.WriteLine("Error while taking from taskQ");
// }
// catch (OperationCanceledException)
// {
// //Ex thrown by TryTake, Take, TakeAny when CancelationToken is set at Cancel => No action
// }
// catch (Exception)
// {//TODO add a log entry
// throw;
// }
}
//private bool CheckStillWorkItems()
//{
// bool booResult = true;
// for (int intLoop = 0; intLoop < _taskQ.Length; intLoop++)
// booResult &= _taskQ[intLoop].IsCompleted;
// return !booResult;
//}
}
}