Constrained parallelism for the Task Parallel Library
When developing .NET applications there is often the need to execute multiple background processes, for example, fetching and rendering different size thumbnails for images. Typically you queue actions like these onto the thread pool. But in the case of thumbnail generation you typically want to fetch a base image first and then perform the resize operations on it. If five web pages each request a different thumbnail size simultaneously you may end up fetching the same image five times before processing it. Of course, you can add file based locking around this to ensure that only the first once gets to fetch the data but it would be much better if you could instead instruct the Task Parallel Library to execute co-dependent tasks sequentially.
The new Task parallel library has continuations that allow one task to chain onto the end of a previous task but you still a way to track all the tasks currently active so you can find the other task to chain onto it. In a multi-threaded asp.net environment that's not so easy.
Below is a TaskFactory that gives you constrained parallelism allowing you to queue up tasks in such a way that no two tasks with the same key will execute in parallel. To use it you simply create a new TaskFactorySequentiallyByKey and then call StartNewChainByKey() with a suitable key, e.g. “RENDERimage12345.jpg”. This method returns a normal Task object that you can Wait on or add more continuations. All the usual TaskFactory constructor options are provided so you can have a different TaskScheduler, common cancellation token, and other options.
Note also that it expects an Action<CancellationToken> not just a plain Action. This is so your Action can be polite and monitor the cancellation token to know when to stop early. If you don’t need that you can always pass in a closure that tosses the CancellationToken, i.e. (token) => MyAction().
[csharp] using System; using System.Collections.Generic; using System.Linq; using System.Text; using System.Threading.Tasks; using System.Threading; using System.Diagnostics;
namespace Utility { /// <summary> /// The TaskFactorySequentiallyByKey factory limits concurrency when actions are passed with the same key. Those actions are executed sequentially /// and never in parallel. /// </summary> /// <remarks> /// For example, you have an action to fetch an image from the web to a local hard drive and then render a specific size of thumbnail for it. /// The action includes code to check if the original image is already on disk, if not it fetches it. /// It then checks if the correct size thumbnail has been rendered, if not it renders it. /// You want to be able to fire off requests for thumbnails from multiple different asp.net web pages and ensure that any two requests for the /// same original image are executed sequentially so that the image is only fetched once from the web before both thumbnail renders run. /// </remarks> public class TaskFactorySequentiallyByKey : TaskFactory { /// <summary> /// Tasks currently queued based on key /// </summary> Dictionary<string, Task> inUse = new Dictionary<string, Task>();
public TaskFactorySequentiallyByKey() : base() { }
public TaskFactorySequentiallyByKey(CancellationToken cancellationToken) : base(cancellationToken) { }
public TaskFactorySequentiallyByKey(TaskScheduler scheduler) : base(scheduler) { }
public TaskFactorySequentiallyByKey(TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions) : base(creationOptions, continuationOptions) { }
public TaskFactorySequentiallyByKey(CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions, TaskScheduler scheduler) : base(cancellationToken, creationOptions, continuationOptions, scheduler) { }
protected virtual void FinishedUsing(string key, Task taskThatJustCompleted) { lock (this.inUse) { // If the key is present AND it point to the task that just finished THEN we are done // and can clear the key for the next task that comes in ... if (this.inUse.ContainsKey(key)) if (this.inUse[key] == taskThatJustCompleted) { this.inUse.Remove(key); Debug.WriteLine("Finished using " + key + " completely"); } else { Debug.WriteLine("Finished an item for " + key); }
} }
/// <summary> /// Queue an action but prevent parallel execution of items having the same key. Instead, run them sequentially. /// </summary> /// <remarks> /// This allows you to, for example, queue up tasks to fetch an image from the web to a cache and render a thumbnail for it at different sizes /// while ensuring that the image is only fetched to the cache once before each different size thumbnail is generated /// </remarks> public Task StartNewChainByKey(string key, Action<CancellationToken> action) { return StartNewChainByKey(key, action, base.CancellationToken); }
/// <summary> /// Queue an action but prevent parallel execution of items having the same key. Instead, run them sequentially. /// </summary> /// <remarks> /// This allows you to, for example, queue up tasks to fetch an image from the web to a cache and render a thumbnail for it at different sizes /// while ensuring that the image is only fetched to the cache once before each different size thumbnail is generated /// </remarks> public Task StartNewChainByKey(string key, Action<CancellationToken> action, CancellationToken cancellationToken) { CancellationToken combined = cancellationToken == base.CancellationToken ? base.CancellationToken : CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, base.CancellationToken).Token;
lock (inUse) { Task result; if (inUse.TryGetValue(key, out result)) { // chain the supplied action after it ... result = result.ContinueWith((task) => action(combined), combined);
// And then schedule a completion check after that result.ContinueWith((task) => FinishedUsing(key, task));
// Update the dictionary so that it tracks the new LAST task in line, not any of the earlier ones inUse[key] = result;
Debug.WriteLine("Chained onto " + key);
return result; }
// otherwise simply create it and start it after remembering that the key is in use result = new Task(() => action(combined), combined);
inUse.Add(key, result);
// queue up the check after it result.ContinueWith((task) => FinishedUsing(key, task));
Debug.WriteLine("Starting a new action for " + key);
// And finally start it result.Start(this.Scheduler);
return result; } } } }