Task Parallel Library: A scheduler with priority, apartment state and maximum degree of parallelism

Here is an alternative scheduler for the Task Parallel Library. This one lets you set the maximum degree of parallelism, the apartment state and the thread priority for your work queue. To use it, simply pass a new instance to the TaskFactory constructor and then schedule tasks on that TaskFactory.

[csharp] using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Threading; using System.Collections.Concurrent; using System.Diagnostics;

namespace Utility
{
    /// <summary>
    /// TaskScheduler that uses the task parallel library, complete with all the new wonderful task cancellation features.
    /// This one adds the ability to set ApartmentState, ThreadPriority, MaximumConcurrency and does some logging.
    /// </summary>
    /// <remarks>
    /// The scheduler owns a fixed set of dedicated background threads that drain a shared
    /// blocking queue of tasks. Call <see cref="Dispose"/> to stop accepting work and wait
    /// for the worker threads to finish whatever is still queued.
    /// </remarks>
    public class TaskScheduler2 : TaskScheduler, IDisposable
    {
        private readonly ApartmentState apartmentState;
        private readonly ThreadPriority threadPriority;

        // Dedicated worker threads; the count doubles as the maximum concurrency level.
        private readonly List<Thread> threads;

        // Queue of pending tasks; set to null once the scheduler has been disposed.
        private BlockingCollection<Task> tasks;

        /// <summary>
        /// An MTA, BelowNormal TaskScheduler with the appropriate number of threads
        /// </summary>
        /// <param name="numberOfThreads">Number of worker threads to create; must be at least 1.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberOfThreads"/> is less than 1.</exception>
        public TaskScheduler2(int numberOfThreads)
            : this(numberOfThreads, ApartmentState.MTA, ThreadPriority.BelowNormal)
        {
        }

        /// <summary>
        /// Creates the scheduler and immediately starts its worker threads.
        /// </summary>
        /// <param name="numberOfThreads">Number of worker threads to create; must be at least 1.</param>
        /// <param name="apartmentState">Apartment state applied to every worker thread before it is started.</param>
        /// <param name="threadPriority">Priority applied to every worker thread.</param>
        /// <exception cref="ArgumentOutOfRangeException"><paramref name="numberOfThreads"/> is less than 1.</exception>
        public TaskScheduler2(int numberOfThreads, ApartmentState apartmentState, ThreadPriority threadPriority)
        {
            if (numberOfThreads < 1)
            {
                throw new ArgumentOutOfRangeException("numberOfThreads");
            }

            this.apartmentState = apartmentState;
            this.threadPriority = threadPriority;

            tasks = new BlockingCollection<Task>();

            threads = Enumerable.Range(0, numberOfThreads).Select(i =>
            {
                // Each worker blocks on the queue until CompleteAdding() is called
                // and the queue has been drained, at which point the loop ends.
                var thread = new Thread(() =>
                {
                    foreach (var task in tasks.GetConsumingEnumerable())
                    {
                        ExecuteTaskWithTiming(task, "queued");
                    }
                });
                thread.IsBackground = true;
                thread.Priority = this.threadPriority;
                // The apartment state has to be set before the thread is started.
                thread.SetApartmentState(this.apartmentState);
                return thread;
            }).ToList();

            threads.ForEach(t => t.Start());
        }

        /// <summary>
        /// Adds the task to the queue consumed by the worker threads.
        /// </summary>
        /// <param name="task">The task to be queued.</param>
        protected override void QueueTask(Task task)
        {
            tasks.Add(task);
        }

        /// <summary>
        /// Returns a snapshot of the tasks currently waiting in the queue.
        /// </summary>
        /// <returns>A copy of the queued tasks.</returns>
        protected override IEnumerable<Task> GetScheduledTasks()
        {
            return tasks.ToArray();
        }

        /// <summary>
        /// Attempts to run the task on the calling thread, but only when that thread
        /// matches the apartment state and priority configured for this scheduler.
        /// </summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="taskWasPreviouslyQueued">Whether the task was already passed to <see cref="QueueTask"/>; not consulted here.</param>
        /// <returns>true if the task was executed inline; otherwise false.</returns>
        protected override bool TryExecuteTaskInline(Task task, bool taskWasPreviouslyQueued)
        {
            // this is used to execute the Task on the thread that is waiting for it - i.e. INLINE
            // it needs to check the Apartment state and any other requirements
            if (Thread.CurrentThread.GetApartmentState() != this.apartmentState)
            {
                return false; // can't execute on wrong Appt state
            }

            if (Thread.CurrentThread.Priority != this.threadPriority)
            {
                return false; // can't execute on wrong priority of thread either
            }

            return ExecuteTaskWithTiming(task, "inline");
        }

        /// <summary>
        /// Runs the task via <see cref="TaskScheduler.TryExecuteTask"/> and writes start/end
        /// messages, including the elapsed milliseconds, to the debug output.
        /// </summary>
        /// <param name="task">The task to execute.</param>
        /// <param name="contextInfo">Label included in the debug messages ("queued" or "inline").</param>
        /// <returns>The result of <see cref="TaskScheduler.TryExecuteTask"/>.</returns>
        private bool ExecuteTaskWithTiming(Task task, string contextInfo)
        {
            Stopwatch sw = Stopwatch.StartNew();
            Debug.WriteLine("Starting " + contextInfo + " task");
            bool ok = TryExecuteTask(task);
            Debug.WriteLine("Ending " + contextInfo + " task, took " + sw.ElapsedMilliseconds + "ms");
            return ok;
        }

        /// <summary>
        /// Defers to the base implementation; this scheduler does not remove individual tasks from its queue.
        /// </summary>
        /// <param name="task">The task to dequeue.</param>
        /// <returns>Whatever the base implementation returns.</returns>
        protected override bool TryDequeue(Task task)
        {
            return base.TryDequeue(task);
        }

        /// <summary>
        /// The number of worker threads owned by this scheduler.
        /// </summary>
        public override int MaximumConcurrencyLevel
        {
            get { return threads.Count; }
        }

        /// <summary>
        /// Stops accepting new tasks, waits for every worker thread to exit, then releases the queue.
        /// Calling it again after the first call has completed does nothing.
        /// </summary>
        public void Dispose()
        {
            if (tasks != null)
            {
                // Lets the workers' consuming loops end once the queue is empty.
                tasks.CompleteAdding();

                foreach (var thread in threads)
                {
                    thread.Join();
                }

                tasks.Dispose();
                tasks = null;
            }
        }
    }
}
[/csharp]

Thu Nov 25 2010 07:24:13 GMT-0800 (Pacific Standard Time)

Next page: Holiday Season (Christmas) in our Smart Home

Previous page: MongoDB Map-Reduce - Hints and Tips