Constrained parallelism for the Task Parallel Library
Sep 1st
When developing .NET applications there is often a need to run multiple background processes, for example, fetching and rendering thumbnails of different sizes for images. Typically you queue actions like these onto the thread pool. Thumbnail generation has an ordering constraint, though: you want to fetch the base image first and then perform the resize operations on it. If five web pages each request a different thumbnail size simultaneously, you may end up fetching the same image five times before processing it. Of course, you can add file-based locking around this to ensure that only the first request gets to fetch the data. It would be much better, however, if you could instruct the Task Parallel Library to execute co-dependent tasks sequentially.
The new Task Parallel Library has continuations that let one task chain onto the end of a previous task. You still need a way to track all the tasks that are currently active, though, so that a new task can find the right task to chain onto. In a multi-threaded ASP.NET environment that is not so easy.
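Here is a minimal sketch of that building block in isolation. It shows that a continuation created with ContinueWith does not start until its antecedent task has completed. The class name ContinuationSketch and the log strings are illustrative only, and the sketch assumes .NET 4.5 or later for Task.Run:

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Illustrative sketch: two actions chained with ContinueWith run one after the other.
public static class ContinuationSketch
{
    public static string[] RunChain()
    {
        var log = new ConcurrentQueue<string>();

        Task fetch = Task.Run(() =>
        {
            Thread.Sleep(100);          // simulate a slow download of the base image
            log.Enqueue("fetched");
        });

        // The continuation is only scheduled once 'fetch' has completed,
        // so "resized" can never be logged before "fetched".
        Task resize = fetch.ContinueWith(antecedent => log.Enqueue("resized"));

        resize.Wait();
        return log.ToArray();
    }
}
```

What the TPL does not provide is the bookkeeping. Something has to remember, per key, which task is currently last in line so that a new request can be chained onto it.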
Below is a TaskFactory that gives you constrained parallelism. It lets you queue up tasks in such a way that no two tasks with the same key ever execute in parallel. To use it, create a new TaskFactorySequentiallyByKey and then call StartNewChainByKey() with a suitable key, e.g. “RENDERimage12345.jpg”. This method returns a normal Task object that you can Wait on or add more continuations to. All the usual TaskFactory constructors are provided, so you can supply a different TaskScheduler, a common cancellation token, and other options.
Note also that it expects an Action<CancellationToken>, not just a plain Action. This lets your Action be polite and monitor the cancellation token so that it knows when to stop early. If you don’t need that, you can always pass in a closure that ignores the CancellationToken, e.g. (token) => MyAction().
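Here is a minimal sketch of both options, assuming .NET 4.5 or later. PoliteActionSketch, Polite and Adapt are hypothetical names. The polite action calls ThrowIfCancellationRequested between small units of work. Adapt wraps a plain Action in a closure that ignores the token:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class PoliteActionSketch
{
    // An Action<CancellationToken> that checks the token between units of work
    // and stops early once cancellation has been requested.
    public static readonly Action<CancellationToken> Polite = token =>
    {
        for (int i = 0; i < 1000; i++)
        {
            token.ThrowIfCancellationRequested();
            Thread.Sleep(10); // one small unit of work
        }
    };

    // A plain Action adapted to the expected delegate type by ignoring the token.
    public static Action<CancellationToken> Adapt(Action plain)
    {
        return token => plain();
    }
}
```

Suppose the task running the action was created with the same token. In that case, the OperationCanceledException thrown by ThrowIfCancellationRequested leaves the task in the Canceled state rather than the Faulted state.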
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace Utility
{
/// <summary>
/// The TaskFactorySequentiallyByKey factory limits concurrency when actions are passed with the same key. Those actions are executed sequentially
/// and never in parallel.
/// </summary>
/// <remarks>
/// For example, suppose you have an action that fetches an image from the web to a local hard drive and then renders a specific size of thumbnail for it.
/// The action checks whether the original image is already on disk and fetches it if not.
/// It then checks whether the requested thumbnail size has been rendered and renders it if not.
/// You want to be able to fire off thumbnail requests from multiple ASP.NET web pages and ensure that any two requests for the
/// same original image execute sequentially, so that the image is fetched from the web only once before both thumbnail renders run.
/// </remarks>
public class TaskFactorySequentiallyByKey : TaskFactory
{
/// <summary>
/// Tasks currently queued based on key
/// </summary>
Dictionary<string, Task> inUse = new Dictionary<string, Task>();
public TaskFactorySequentiallyByKey()
: base()
{
}
public TaskFactorySequentiallyByKey(CancellationToken cancellationToken)
: base(cancellationToken)
{ }
public TaskFactorySequentiallyByKey(TaskScheduler scheduler)
: base(scheduler)
{ }
public TaskFactorySequentiallyByKey(TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions)
: base(creationOptions, continuationOptions)
{ }
public TaskFactorySequentiallyByKey(CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
: base(cancellationToken, creationOptions, continuationOptions, scheduler)
{ }
protected virtual void FinishedUsing(string key, Task taskThatJustCompleted)
{
lock (this.inUse)
{
// If the key is present AND it points to the task that just finished, then we are done
// and can clear the key for the next task that comes in. If it points to a later task,
// that task is still queued or running and keeps the key in use.
Task last;
if (this.inUse.TryGetValue(key, out last))
{
if (last == taskThatJustCompleted)
{
this.inUse.Remove(key);
Debug.WriteLine("Finished using " + key + " completely");
}
else
{
Debug.WriteLine("Finished an item for " + key);
}
}
}
}
/// <summary>
/// Queue an action but prevent parallel execution of items having the same key. Instead, run them sequentially.
/// </summary>
/// <remarks>
/// This allows you to, for example, queue up tasks to fetch an image from the web to a cache and render a thumbnail for it at different sizes
/// while ensuring that the image is fetched to the cache only once, before any of the different-size thumbnails are generated.
/// </remarks>
public Task StartNewChainByKey(string key, Action<CancellationToken> action)
{
return StartNewChainByKey(key, action, base.CancellationToken);
}
/// <summary>
/// Queue an action but prevent parallel execution of items having the same key. Instead, run them sequentially.
/// </summary>
/// <remarks>
/// This allows you to, for example, queue up tasks to fetch an image from the web to a cache and render a thumbnail for it at different sizes
/// while ensuring that the image is fetched to the cache only once, before any of the different-size thumbnails are generated.
/// </remarks>
public Task StartNewChainByKey(string key, Action<CancellationToken> action, CancellationToken cancellationToken)
{
CancellationToken combined = cancellationToken == base.CancellationToken ? base.CancellationToken :
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, base.CancellationToken).Token;
// A factory built with the parameterless constructor has a null Scheduler, and Task.Start(null)
// throws ArgumentNullException, so fall back to TaskScheduler.Current as TaskFactory itself does.
TaskScheduler scheduler = this.Scheduler ?? TaskScheduler.Current;
lock (inUse)
{
Task result;
if (inUse.TryGetValue(key, out result))
{
// Chain the supplied action after the last task for this key, on the factory's scheduler.
// LazyCancellation (.NET 4.5+) stops a canceled continuation from completing before its
// antecedent does; without it, the next action in the chain could overlap the running one.
result = result.ContinueWith((task) => action(combined), combined,
TaskContinuationOptions.LazyCancellation, scheduler);
// And then schedule a completion check after that
result.ContinueWith((task) => FinishedUsing(key, task));
// Update the dictionary so that it tracks the new LAST task in line, not any of the earlier ones
inUse[key] = result;
Debug.WriteLine("Chained onto " + key);
return result;
}
// Otherwise create the task and remember that the key is in use before starting it
result = new Task(() => action(combined), combined, this.CreationOptions);
inUse.Add(key, result);
// Queue up the completion check after it
result.ContinueWith((task) => FinishedUsing(key, task));
Debug.WriteLine("Starting a new action for " + key);
// And finally start it
result.Start(scheduler);
return result;
}
}
}
}
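StartNewChainByKey passes the cancellation token to ContinueWith when it chains an action, and that detail matters for ordering. By default, a continuation whose token is canceled completes immediately, even while its antecedent is still running. Anything chained after it may then start early. The following standalone sketch shows the difference. It assumes .NET 4.5 or later, where TaskContinuationOptions.LazyCancellation exists, and C# 7 tuples. The class name is illustrative:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

public static class LazyCancellationSketch
{
    // Reports whether each continuation had completed while its antecedent was still running.
    public static (bool eagerDoneEarly, bool lazyDoneEarly, bool lazyCanceled) Run()
    {
        using (var gate = new ManualResetEventSlim(false))
        using (var cts = new CancellationTokenSource())
        {
            // Stands in for a long-running action earlier in the chain for some key.
            Task running = Task.Run(() => gate.Wait());

            Task eager = running.ContinueWith(_ => { }, cts.Token,
                TaskContinuationOptions.None, TaskScheduler.Default);
            Task lazy = running.ContinueWith(_ => { }, cts.Token,
                TaskContinuationOptions.LazyCancellation, TaskScheduler.Default);

            cts.Cancel();
            WaitQuietly(eager, 1000);   // the eager continuation is canceled right away
            bool eagerDoneEarly = eager.IsCompleted && !running.IsCompleted;
            bool lazyDoneEarly = lazy.IsCompleted;

            gate.Set();                 // let the antecedent finish
            WaitQuietly(lazy, 5000);    // the lazy one completes only after the antecedent
            return (eagerDoneEarly, lazyDoneEarly, lazy.IsCanceled);
        }
    }

    private static void WaitQuietly(Task task, int millisecondsTimeout)
    {
        try { task.Wait(millisecondsTimeout); }
        catch (AggregateException) { } // waiting on a canceled task throws AggregateException
    }
}
```

This is why a sequential chain should request LazyCancellation when it hands a token to ContinueWith.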
Singleton tasks: A TaskFactory for the Task Parallel Library with ‘run-only-one’ semantics
Sep 1st
When developing .NET applications there is often a need to execute some slow background process repeatedly, for example, fetching a feed from a remote site or updating a user’s last-logged-in time. Typically you queue actions like these onto the thread pool. Under load that becomes problematic. Requests may arrive faster than you can service them, the queue builds up, and you end up executing the same action several times when you only needed to run it once. Even when not under load, two users might request a web page that requires the same image to be loaded and resized for display. In that case you want to fetch and resize it only once. What you really want is an intelligent work queue that can coalesce multiple requests for the same action into a single action that gets executed just once.
The new Task Parallel Library doesn’t have anything that handles these ‘run-only-one’ actions directly. It does, however, have all the building blocks needed to create one: a custom TaskFactory and Task continuations.
Below is a TaskFactory that gives you ‘run-only-one’ actions. To use it, create a new TaskFactoryLimitOneByKey and then call StartNewOrUseExisting() with a suitable key, e.g. “FETCH/cache/image12345.jpg”. This method returns a normal Task object that you can Wait on or add more continuations to. All the usual TaskFactory constructors are provided, so you can supply a different TaskScheduler, a common cancellation token, and other options.
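One detail concerns the constructor options. A TaskFactory created with the parameterless constructor reports a null Scheduler, and when it starts work itself it uses TaskScheduler.Current. Code that creates a cold Task and calls Task.Start(scheduler) must apply the same fallback, because Task.Start throws ArgumentNullException for a null scheduler. Here is a small sketch; the helper name StartOn is hypothetical:

```csharp
using System;
using System.Threading.Tasks;

public static class SchedulerFallbackSketch
{
    // Starts a cold task with the factory's defaults, falling back to TaskScheduler.Current
    // when the factory was constructed without a scheduler (its Scheduler property is null).
    public static Task StartOn(TaskFactory factory, Action action)
    {
        var task = new Task(action, factory.CancellationToken, factory.CreationOptions);
        task.Start(factory.Scheduler ?? TaskScheduler.Current);
        return task;
    }
}
```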
Note also that it expects an Action<CancellationToken>, not just a plain Action. This lets your Action be polite and monitor the cancellation token so that it knows when to stop early. If you don’t need that, you can always pass in a closure that ignores the CancellationToken, e.g. (token) => MyAction().
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;
namespace Utility
{
/// <summary>
/// A task factory where Tasks are queued up with a key, and only one Task per key is allowed to exist, either queued or executing.
/// </summary>
/// <remarks>
/// This is useful for tasks like fetching a file from a backing store or updating local information from a remote service.
/// You want to be able to queue up a Task to do the work, but you don't want it to happen five times in quick succession.
/// NB: This does not absolve you from using file locking and other techniques in your method to handle simultaneous requests;
/// it just greatly reduces the chance of them happening. Another example is updating a user's last-logged-in data in a
/// database. Under heavy load the queue of database writes may get long, and you don't want to update the same
/// user repeatedly when a single write would do.
/// </remarks>
public class TaskFactoryLimitOneByKey : TaskFactory
{
/// <summary>
/// Tasks currently queued based on key
/// </summary>
Dictionary<string, Task> inUse = new Dictionary<string, Task>();
public TaskFactoryLimitOneByKey()
: base()
{
}
public TaskFactoryLimitOneByKey(CancellationToken cancellationToken)
: base(cancellationToken)
{ }
public TaskFactoryLimitOneByKey(TaskScheduler scheduler)
: base(scheduler)
{ }
public TaskFactoryLimitOneByKey(TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions)
: base(creationOptions, continuationOptions)
{ }
public TaskFactoryLimitOneByKey(CancellationToken cancellationToken, TaskCreationOptions creationOptions, TaskContinuationOptions continuationOptions, TaskScheduler scheduler)
: base(cancellationToken, creationOptions, continuationOptions, scheduler)
{ }
protected virtual void FinishedUsing(string key, Task taskThatJustCompleted)
{
lock (this.inUse)
{
// If the key is present AND it points to the task that just finished, then we are done
// and can clear the key so that the next task coming in with it will get to execute.
Task current;
if (this.inUse.TryGetValue(key, out current))
{
if (current == taskThatJustCompleted)
{
this.inUse.Remove(key);
Debug.WriteLine("Finished using " + key + " completely");
}
else
{
Debug.WriteLine("Finished an item for " + key);
}
}
}
}
/// <summary>
/// Queue only one of a given action based on a key. A singleton pattern for Tasks with the same key.
/// </summary>
/// <remarks>
/// This allows you to queue up a request to, for example, render a file based on its file name.
/// Even if multiple users request the file at the same time, only one render runs,
/// and they can all wait on that Task to complete.
/// </remarks>
public Task StartNewOrUseExisting(string key, Action<CancellationToken> action)
{
return StartNewOrUseExisting(key, action, base.CancellationToken);
}
/// <summary>
/// Queue only one of a given action based on a key. A singleton pattern for Tasks with the same key.
/// </summary>
/// <remarks>
/// This allows you to queue up a request to, for example, render a file based on its file name.
/// Even if multiple users request the file at the same time, only one render runs,
/// and they can all wait on that Task to complete.
/// </remarks>
public Task StartNewOrUseExisting(string key, Action<CancellationToken> action, CancellationToken cancellationToken)
{
lock (inUse)
{
Task existing;
if (inUse.TryGetValue(key, out existing))
{
Debug.WriteLine("Reusing existing action for " + key);
return existing; // and discard the new action (and its cancellation token)
}
// Only combine tokens when a new task is actually created, so reused requests don't leak a linked source
CancellationToken combined = cancellationToken == base.CancellationToken ? base.CancellationToken :
CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, base.CancellationToken).Token;
// Otherwise make a new task and add it, with a continuation on the end to pull it off again
Task result = new Task(() => action(combined), combined, this.CreationOptions);
inUse.Add(key, result);
// Queue up the check after it
result.ContinueWith((finished) => this.FinishedUsing(key, result));
Debug.WriteLine("Starting a new action for " + key);
// And finally start it; a factory built with the parameterless constructor has a null Scheduler,
// and Task.Start(null) throws, so fall back to TaskScheduler.Current as TaskFactory itself does
result.Start(this.Scheduler ?? TaskScheduler.Current);
return result;
}
}
}
}
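For comparison, the same run-only-one behavior can also be sketched without an explicit lock by using ConcurrentDictionary.GetOrAdd with Lazy<Task>. This is a different technique from the factory above and is shown only as a sketch. The class and method names are hypothetical, it ignores cancellation and custom schedulers, and it assumes .NET 4.5 or later for Task.Run:

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical lock-free variant of run-only-one: at most one in-flight Task per key.
public sealed class SingleFlight
{
    private readonly ConcurrentDictionary<string, Lazy<Task>> _inFlight =
        new ConcurrentDictionary<string, Lazy<Task>>();

    public Task RunOnce(string key, Action action)
    {
        // GetOrAdd may build more than one Lazy under contention, but only one is stored,
        // and Lazy<T> (thread-safe by default) starts the stored one's Task exactly once.
        Lazy<Task> entry = _inFlight.GetOrAdd(key, _ => new Lazy<Task>(() => Task.Run(action)));
        Task task = entry.Value;

        // Remove the entry once its Task finishes, but only if the key still maps to this
        // exact Lazy, so a newer entry for the same key is left alone. Every caller attaches
        // this continuation; removals after the first simply find nothing to remove.
        task.ContinueWith(_ =>
        {
            ((ICollection<KeyValuePair<string, Lazy<Task>>>)_inFlight)
                .Remove(new KeyValuePair<string, Lazy<Task>>(key, entry));
        }, TaskScheduler.Default);

        return task;
    }
}
```

Like the lock-based factory, this version can still hand out an already-finished Task to a request that arrives after the work completes but before the cleanup continuation runs.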