Singleton tasks: A TaskFactory for the Task Parallel Library with 'run-only-one' semantics
When developing .NET applications there is often a need to execute some slow background process repeatedly: fetching a feed from a remote site, updating a user's last-logged-in time, and so on. Typically you queue actions like these onto the thread pool. Under load, however, that becomes problematic: requests may come in faster than you can service them, the queue builds up, and you end up executing multiple requests for the same action when you only needed to do it once. Even when not under load, if two users request a web page that requires the same image to be loaded and resized for display, you only want to fetch and resize it once. What you really want is an intelligent work queue that can coalesce multiple requests for the same action into a single action that gets executed just once.
The new Task Parallel Library doesn't handle these 'run-only-one' actions directly, but it does provide all the necessary building blocks: you can create a custom TaskFactory and use Task continuations to get the behavior you want.
Below is a TaskFactory that gives you 'run-only-one' actions. To use it you simply create a new TaskFactoryLimitOneByKey and then call StartNewOrUseExisting() with a suitable key, e.g. "FETCH/cache/image12345.jpg". This method returns a normal Task object that you can wait on or add further continuations to. All the usual TaskFactory constructor overloads are provided, so you can specify a different TaskScheduler, a common cancellation token, and other options.
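For example, a request handler might queue the work like this (a minimal usage sketch; LoadAndResizeImage is a hypothetical method standing in for your own slow operation):

[csharp]
// Minimal usage sketch. LoadAndResizeImage is a hypothetical method standing in
// for whatever slow work you want to coalesce.
var factory = new TaskFactoryLimitOneByKey();

Task render = factory.StartNewOrUseExisting(
    "FETCH/cache/image12345.jpg",
    token => LoadAndResizeImage("image12345.jpg", token));

// Treat it like any other Task: block on it, or chain further work onto it
render.ContinueWith(t => Debug.WriteLine("Image is ready"));
render.Wait();
[/csharp]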
Note also that it expects an Action&lt;CancellationToken&gt;, not just a plain Action. This is so your action can be polite and monitor the cancellation token to know when to stop early. If you don't need that you can always pass in a closure that tosses the CancellationToken, e.g. (token) => MyAction().
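Here is what both styles look like (a sketch; pagesToFetch, FetchPage and MyAction are hypothetical placeholders):

[csharp]
var factory = new TaskFactoryLimitOneByKey();

// A polite action that checks the token between chunks of work
factory.StartNewOrUseExisting("FETCH/feed/news", token =>
{
    foreach (var page in pagesToFetch)        // hypothetical work items
    {
        if (token.IsCancellationRequested)
            return;                           // stop early when cancellation is requested
        FetchPage(page);                      // hypothetical helper
    }
});

// Or, if your method doesn't care about cancellation, just toss the token
factory.StartNewOrUseExisting("UPDATE/lastlogin/42", token => MyAction());
[/csharp]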
[csharp]
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;

namespace Utility
{
    /// <summary>
    /// A task factory where Tasks are queued up with a key and only one of that key is allowed to exist
    /// either in the queue or executing
    /// </summary>
    /// <remarks>
    /// This is useful for tasks like fetching a file from backing store, or updating local information
    /// from a remote service. You want to be able to queue up a Task to go do the work but you don't
    /// want it to happen 5 times in quick succession.
    /// NB: This does not absolve you from using file locking and other techniques in your method to
    /// handle simultaneous requests, it just greatly reduces the chances of it happening. Another example
    /// would be updating a user's last logged in date in a database. Under heavy load the queue to write
    /// to the database may be getting long and you don't want to update it for the same user repeatedly
    /// if you can avoid it with a single write.
    /// </remarks>
    public class TaskFactoryLimitOneByKey : TaskFactory
    {
        /// <summary>
        /// Tasks currently queued or executing, indexed by key
        /// </summary>
        Dictionary<string, Task> inUse = new Dictionary<string, Task>();

        public TaskFactoryLimitOneByKey()
            : base() { }

        public TaskFactoryLimitOneByKey(CancellationToken cancellationToken)
            : base(cancellationToken) { }

        public TaskFactoryLimitOneByKey(TaskScheduler scheduler)
            : base(scheduler) { }

        public TaskFactoryLimitOneByKey(TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions)
            : base(creationOptions, continuationOptions) { }

        public TaskFactoryLimitOneByKey(CancellationToken cancellationToken, TaskCreationOptions creationOptions,
            TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
            : base(cancellationToken, creationOptions, continuationOptions, scheduler) { }

        protected virtual void FinishedUsing(string key, Task taskThatJustCompleted)
        {
            lock (this.inUse)
            {
                // If the key is present AND it points to the task that just finished THEN we are done
                // and can clear the key so that the next task coming in using it will get to execute ...
                if (this.inUse.ContainsKey(key))
                {
                    if (this.inUse[key] == taskThatJustCompleted)
                    {
                        this.inUse.Remove(key);
                        Debug.WriteLine("Finished using " + key + " completely");
                    }
                    else
                    {
                        Debug.WriteLine("Finished an item for " + key);
                    }
                }
            }
        }

        /// <summary>
        /// Queue only one of a given action based on a key. A singleton pattern for Tasks with the same key.
        /// </summary>
        /// <remarks>
        /// This allows you to queue up a request to, for example, render a file based on the file name.
        /// Even if multiple users all request the file at the same time, only one render will ever run
        /// and they can all wait on that Task to complete.
        /// </remarks>
        public Task StartNewOrUseExisting(string key, Action<CancellationToken> action)
        {
            return StartNewOrUseExisting(key, action, base.CancellationToken);
        }

        /// <summary>
        /// Queue only one of a given action based on a key. A singleton pattern for Tasks with the same key.
        /// </summary>
        /// <remarks>
        /// This allows you to queue up a request to, for example, render a file based on the file name.
        /// Even if multiple users all request the file at the same time, only one render will ever run
        /// and they can all wait on that Task to complete.
        /// </remarks>
        public Task StartNewOrUseExisting(string key, Action<CancellationToken> action, CancellationToken cancellationToken)
        {
            // Combine the caller's token with the factory's token so either one can cancel the work
            CancellationToken combined = cancellationToken == base.CancellationToken
                ? base.CancellationToken
                : CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, base.CancellationToken).Token;

            lock (inUse)
            {
                if (inUse.ContainsKey(key))
                {
                    Debug.WriteLine("Reusing existing action for " + key);
                    return inUse[key]; // and toss the new action away
                }

                // Otherwise, make a new one and add it ... with a continuation on the end to pull it off ...
                Task result = new Task(() => action(combined), combined);
                inUse.Add(key, result);

                // Queue up the check after it
                result.ContinueWith((finished) => this.FinishedUsing(key, result));

                Debug.WriteLine("Starting a new action for " + key);

                // And finally start it (fall back to the current scheduler if none was supplied to a constructor,
                // since Task.Start throws on a null scheduler)
                result.Start(this.Scheduler ?? TaskScheduler.Current);
                return result;
            }
        }
    }
}
[/csharp]
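To see the coalescing in action, issue two requests with the same key back to back; the second call simply returns the Task created by the first (a sketch, again using the hypothetical LoadAndResizeImage):

[csharp]
// Sketch: two simultaneous requests for the same key share a single Task
var factory = new TaskFactoryLimitOneByKey();

Task first = factory.StartNewOrUseExisting("FETCH/cache/image12345.jpg",
    token => LoadAndResizeImage("image12345.jpg", token));
Task second = factory.StartNewOrUseExisting("FETCH/cache/image12345.jpg",
    token => LoadAndResizeImage("image12345.jpg", token));

Debug.Assert(ReferenceEquals(first, second)); // the second caller reuses the first Task
Task.WaitAll(first, second);
[/csharp]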