Multithreading

Source Code

All source code can be found on GitHub here.

This is part of my HowTo in .NET seriies. An overview can be seen here,


Intro

Multithreading is the concept of creating and utilising multiple threads. There are a number of reasons for this:

* Split processor heavy work between all available processors to increase performance
* Prevent synchronous method calls from blocking the current thread:
* Web services
* Database Calls
* Prevent calls from the UI appearing unresponsive

ThreadStart & ParameterizedThreadStart

ThreadStart and the ParameterizedThreadStart class provide functionality for executing code within a new thread, with or without parameters.

ThreadStart

ThreadStart allows spawning code in a new thread for a parameterless delegate which returns void. Execution only starts once Start has been called upon the thread.

var aThread = new Thread(new ThreadStart(MyMethod))
{
    Name = "My Thread",     
    Priority = ThreadPriority.Highest
};      
    
aThread.Start();   

ParameterizedThreadStart

ParameterizedThreadStart allows spawning code in a new thread for a delegate which takes one parameter of type object and returns void. The object parameter is passed into the start method.

Where multiple parameters are required, these should be wrapped up in a class and passed in.

var aThread = new Thread (new ParameterizedThreadStart (MyMethod)) {
    Name = "My Thread",     
    Priority = ThreadPriority.Highest
};      

aThread.Start (new object());

Cancelling

A CancellationTokenSource instance can be used to cancel a thread.

CancellationTokenSource can be passed in directly, via a member variable of a user defined params object or simply made accessible to all required threads.

Cancel() is called upon the token if the thread should be cancelled.

var cts = new CancellationTokenSource ();

var aThread = new Thread (new ParameterizedThreadStart (MyMethod)) {
    Name = "My Thread",     
    Priority = ThreadPriority.Highest
};      

aThread.Start (cts.Token);

Thread.Sleep (10000);

cts.Cancel ();

The executing thread should monitor IsCancellationRequested or alternatively call ThrowIfCancellationRequested. ThrowIfCancellationRequested is semantically equal to throwing an OperationCanceledException when IsCancellationRequested returns true;

private void MyMethod (object obj)
{
    CancellationToken token = (CancellationToken)obj;


    for (int i = 0; i < Int32.MaxValue; i++) {    
        // Causes an exception to be raised if cancellation is called
        token.ThrowIfCancellationRequested ();


        // Alternatively check the token manually if cancellation has been raised
        if (token.IsCancellationRequested) {
            throw new OperationCanceledException (token);                
        }
    }
}

Timer Class

System.Threading.Timer allows periodic event signalling via the TimerCallback delegate.

The TimerCallback delegate returns void and takes a single parameter of type object.

A Timer is invoked with the delegate, the param object, the time in milliseconds before starting, and the time in milliseconds between events.

The following calls the method MyMethod with a params object after 2 seconds and every second afterwards:

var t = new Timer(
new TimerCallback(MyMethod), new object() , 2000, 1000); 

CLR ThreadPool

Threads are an expensive resource to spawn and terminate, as such the CLR maintains a pool of worker threads you can use.

All threads in the ThreadPool are background threads with a normal priority.

You can request work from a pool thread with the WaitCallback delegate. It takes one parameter of type object and returns void:

var wcb = new WaitCallback(DoMethod);    

The delegate is assigned via the ThreadPool class:

ThreadPool.QueueUserWorkItem(wcb, new object()); 

ThreadPool can help to ensure that the number of concurrent running threads is kept within a required tolerance, this can ensure that a server does not buckle over from trying to do too many things at one time.

Due to the nature of ThreadPool and the limited number of threads contained within, you can not guarantee the length of time before your process begins.

CancellationTokenSource can be used to cancel an executing thread on the ThreadPool.

CancellationTokenSource cts = new CancellationTokenSource ();
ThreadPool.QueueUserWorkItem (new WaitCallback (DoMethod), cts.Token);
cts.Cancel ();

static void DoMethod (object obj)
{
    CancellationToken token = (CancellationToken)obj;


    for (int i = 0; i < Int32.MaxValue; i++) {    
        token.ThrowIfCancellationRequested ();            
    }
}

Task Parallel Library

Microsoft introduced the Task Parallel Library (TPL) in .NET 4.0.

The library automatically splits up an applications workload across all available CPUs dynamically by requesting worker threads within the CLR thread pool. It provides the classes Parallel and Task.

Parallel

System.Threading.Tasks.Parallel can be used to perform an operation upon all elements in a collection or a custom counter.

The operations can take any of the System.Func or System.Action delegates.

Parallel does not guarantee the order of execution and as such should not be used when the order of execution matters.

Parallel should not be used upon small sets of data or when threads requires resources which require synchronization as this can actually decrease performance.

ForEach

To perform a delegate for each element in a collection:

Parallel.ForEach (new List<int>(), anElement => { 
    /*Some Process */ 
});

For

To perform a countered loop, where the iteration count i is available inside the iteration:

Parallel.For (0, Int32.MaxValue, i => { 
    /* Some Process */ 
});

Break and Stop

ParallelLoopState exposes Stop() and Break(). Break() will cause all iterations with an index less than the callers to finish before the loop is terminated. Stop will simply stop as soon as it is conveniently possible.

Parallel.For (0, Int32.MaxValue, (i, loopState) => { 
    loopState.Stop();
    loopState.Break();
    /*Some Process */ 
});

Invoke

Allows executing a series of operations, the CLR will attempt to do them in parallel.

If the operations modify a shared resource the operation will automatically not be executed in parallel.

Parallel.Invoke( 
    () => { /* Do something #1 */ }, 
    () => { /* Do something #2 */ }, 
    () => { /* Do something #3 */ }, 
    () => { /* o something #4  */ });

Task Class

A simple alternative to asynchronous delegates:

Task.Factory.StartNew (() => {
    DoSomething ();
});  

A task scheduler is responsible for starting and managing tasks. It will by default use threads from the ThreadPool.

The task can be created and started in separate steps:

var aTask = new Task (() => {
    DoSomething ();
});

aTask.Start ();

The wait function can be called to pause the current thread until the task has finished:

aTask.Wait ();

Alternatively the task can be run synchronously:

aTask.RunSynchronously ();

Results can be returned with the Result property. This will block the current thread until the thread has finished executing. Result can only be called on Task:

var aTask = new Task<bool> (() => {
    return true;
});

var result = aTask.Result;

Continuation Task

The ContinueWith method can be used to execute another process as soon as a Task finishes.

Task<bool> t = Task.Run (() => {
    return true;
}).ContinueWith ((x) => {
    return !x.Result;
}); 

var result = t.Result;

ContinueWith can also take a TaskContinuationOptions enum to denote when the method should run; OnlyOnRanToCompletion,OnlyOnFaulted and OnlyOnCanceled

Task<string> t = Task.Run (() => {
    return string.Empty;
}); 

t.ContinueWith ((i) => {
    return "OnlyOnCanceled";
}, TaskContinuationOptions.OnlyOnCanceled); 

t.ContinueWith ((i) => {
    return "OnlyOnFaulted";
}, TaskContinuationOptions.OnlyOnFaulted); 

t = t.ContinueWith ((i) => {
    return "OnlyOnRanToCompletion";
}, TaskContinuationOptions.OnlyOnRanToCompletion); 

var result = t.Result;

Child Tasks

Child tasks can also be assigned when running a Task. Here the parent task will not be deemed as finished until all child tasks have finished. Child Tasks are added with TaskCreationOptions.AttachedToParent.

  Task<int[]> parent = Task.Run (() => {
    var results = new int[2]; 
    new Task (() => results [0] = 0,                     
        TaskCreationOptions.AttachedToParent).Start ();                 
    new Task (() => results [1] = 1, 
        TaskCreationOptions.AttachedToParent).Start ();                       
    return results;
});             

var finalTask = parent.ContinueWith (
                 parentTask => {


        var count = 0;
        foreach (int i in parentTask.Result) {
            count += i;
        }
        return count;              
    });  

finalTask.Wait ();       
var finalResult = finalTask.Result;    

TaskFactory

A TaskFactory can be used to define settings for child tasks:

Task<int[]> parent = Task.Run (() => {

    var results = new int[2];                   


    TaskFactory tf = new TaskFactory (
                         TaskCreationOptions.AttachedToParent,                    
                         TaskContinuationOptions.ExecuteSynchronously);                 

    tf.StartNew (() => 0);                 
    tf.StartNew (() => 1);                 

    return results;
});               

var finalTask = parent.ContinueWith (
                    parentTask => { 
        var count = 0;
        foreach (int i in parentTask.Result) {
            count += i;
        }

        return count;                
    });

finalTask.Wait ();
var finalResult = finalTask.Result;    

WaitAll

WaitAll can be used to wait for all child tasks to finish executing:

var tasks = new Task[3];            
tasks [0] = Task.Run (() => {
    return 1;
});             

tasks [1] = Task.Run (() => { 
    return 2;
}); 

tasks [2] = Task.Run (() => {
    return 3;
}
);

Task.WaitAll (tasks);  

WaitAny

WaitAny causes the current thread to block until any task has completed processing.

Task<int>[] tasks = new Task<int>[3];

tasks [0] = Task.Run (() => {
    return 1;
});

tasks [1] = Task.Run (() => {
    return 2;
});

tasks [2] = Task.Run (() => {
    return 3;
});

int result = 0;

while (result < 6) { 
    int i = Task.WaitAny (tasks);
    var completedTask = tasks [i];                 

    result += completedTask.Result;

    var taskList = tasks.ToList ();
    taskList.RemoveAt (i);

    tasks = taskList.ToArray ();
}

WhenAll

WhenAll can be used to schedule a continuation task to run after all child tasks have finished executing:

var tasks = new Task<int>[3];

tasks [0] = Task.Run (() => {
    return 1;
});

tasks [1] = Task.Run (() => {
    return 2;
});

tasks [2] = Task.Run (() => {
    return 3;
});

int result = 0;
var completionTask = Task.WhenAll (tasks);

completionTask.ContinueWith (x => result += x.Result.Sum( y => y));    

Timing Out A Task

An overload of the WaitAny method takes a maximum wait time. This can be used to cancel a task after a set time period.

var longRunning = Task.Run (() => {
    Thread.Sleep (10000);
    return 1;
}); 

int index = Task.WaitAny (new[] { longRunning }, 1000); 

var result = 0;

if( longRunning.IsCompleted)
    result += longRunning.Result;

Canceling Tasks

Tasks can be called with a CancellationTokenSource.

When cancelling a task with a CancellationTokenSource it will appear to end as if no error has happened. If you require the task to error out upon cancellation then you should throw a OperationCanceledException. An alternative to catching the exception is to use a continuation task with the TaskContinuationOptions.OnlyOnCanceled option.

var cts = new CancellationTokenSource ();
var token = cts.Token;
var isCancelled = false;

Task<bool> task = Task.Run (() => {
    while (!token.IsCancellationRequested) {  
        Thread.Sleep (1000);
    }                       
    throw new OperationCanceledException ();
}, token).ContinueWith ((t) => { 
    return true;
}, TaskContinuationOptions.OnlyOnCanceled);

Thread.Sleep (3000);     

cts.Cancel ();

var result = task.Result;    

Async Keyword

.NET 4.5 now supports the Async keyword, the CLR can call any method asynchronously virtually automatically.

Methods requiring asynchronous calling are marked as async and return either Task or Task depending upon if they return type T or void.

In the following example we have two methods which are tagged as async. They each call a method which returns a Task or Task param.

public async Task<bool> RunWithReturn ()
{
    return await TaskWithReturn ();
}

public async Task RunNoReturn ()
{
    await TaskNoReturn ();
}

private Task<bool>  TaskWithReturn ()
{
    var t = new Task<bool> (() => {
        Thread.Sleep (1);
        return true;
    });

    t.Start ();
    return t;
}

private Task TaskNoReturn ()
{
    var t = new Task (() => {
        Thread.Sleep (1);
        IsSet = true;
    });

    t.Start ();
    return t;
}

These methods returning Task and Task are called as normal. They could be called like:

Task<bool> t1 = TaskWithReturn ();
Task t2 = TaskNoReturn ();

The method is not actually executed until the returned task is called with the await keyword:

var result = await t1;
await t2;

The method call and the await can be wrapped up in one line:

await TaskWithReturn ();
await TaskNoReturn ();

Asynchronous Delegates

Delegates allow asynchronous invocation by BeginInvoke() and EndInvoke()

BeginInvoke() and EndInvoke() are automatically generated by the compiler and have API based upon the parameters and return type of the delegate they were defined against.

BeginInvoke method:

* Takes the the parameters the delegate was defined against
* Takes an optional callback method
* Takes an optional state parameter of type object. This can be cast into any type inside the callback method.
* Returns an IAsyncResult which can help with getting the results back and also with working with the callback

EndInvoke method:

* Returns the same type the delegate was defined against
* Blocks the thread until the the delegate has finished executing

IAsyncResult provides various ways of waiting for the results:

* IsCompleted
* AsyncState
* AsyncWaitHandle
* CompletedSynchronously

IsCompleted

?

Returns true when the delegate has finished running:

delegate  bool AnAction(bool paramA, int param2 );

var d = new AnAction(ActionMethod);   

IAsyncResult ar = d.BeginInvoke(true, 1, null, null);   

do
{  
Thread.Sleep(10000);
 } while(!ar.IsCompleted);   

var result = d.EndInvoke(ar);

AsyncWaitHandle

Wraps up the IsComplete and Sleep from above into one handy function:

var d = new AnAction (DoAction);  
IAsyncResult ar = d.BeginInvoke (true, 1, null, null);   

do {
} while (!ar.AsyncWaitHandle.WaitOne (1000, true));

var result = d.EndInvoke (ar);

AsyncCallback With State

Triggers a callback delegate upon completion.

The callback will be on the same thread as the delegate and not on the main thread.

Here we create a new Foo object which will have all the parameters of the callback wrapped up into one class.

var d = new AnAction(DoAction);  

var ar = d.BeginInvoke(true, 10, 
    new AsyncCallback(MyCallback), 
    new object());

public void MyCallback(IAsyncResult iar) 
{   
    AsyncResult ar = (AsyncResult)iar;   
    var d = (AnAction)ar.AsyncDelegate;
    var result = d.EndInvoke(ar);
    var obj = (object)ar.AsyncState;
} 

Cancelling An Asynchronous Delegate

Cancellation of an executing asynchronous delegate can be performed with the CancellationTokenSource ( See cancelling Tasks).

Parallel LINQ Queries (PLINQ)

PLINQ libraries allow an easy API for running queries in a parallel manner.

PLINQ is requested in code though the CLR will decide if parallel execution will beneficial or not.

System.Linq.ParallelEnumerable contains all the extension methods for PLINQ.

Spawning

Requesting parallel execution is as simple as calling the Parallel method.

Extension methods:

var data = myData.AsParallel().Select( x => x );

Natural methods:

var data = ( from value in myData.AsParallel() select value );

WithDegreeOfParallelism

PLINQ will normally use all available processors to process a query though never more than 64. WithDegreeOfParallelism can be used to restrict the number processors that it it will consume.

var data = 
myData.AsParallel ().WithDegreeOfParallelism(2).Select (x => x);

AsOrdered

PLINQ does not guarantee the order of processing the iterations.

AsOrdered can be called to instruct PLINQ to preserve the ordering within the result set. The query will still be run in parallel.

var data = myData.AsParallel ().Select (x => x).AsOrdered();

AsSequential

AsSequential can be used to force parts of a Linq query to be executed sequentially and not in parallel. Other parts of the query will still be performed in parallel.

var data = myData.AsParallel ().Select (x => x).AsSequential();

ForAll

ForAll allows parallel iteration through a collection. Order is not guaranteed; it will remove any sort order upon a collection and will iterate through each member when the item is available.

var myData = new List<int> () { 1, 2, 3, 4, 5, 6, 7, 8, 9 };

var ints = new List<int> ();

myData.AsParallel ().ForAll (x => ints.Add (x));

Errors

PLINQ will throw an AggregateException when any errors occur within the query when processing. All iterations will be performed; not matter how many errors occur.

Cancelling

The CancellationTokenSource can be used to cancell an executing PLINQ query. This is passed in with the WithCancellation method:

var cs = new CancellationTokenSource ();

Extension methods:

var myData = new List<int> () { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
var data = myData.AsParallel ().
          Select (x => x).WithCancellation (cs.Token);

Natural Linq:

var myData = new List<int> () { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
var data =
    (from Value in myData.AsParallel ().WithCancellation (cs.Token)
     select Value);

On the original thread you can call Cancel upon the token:

cs.Cancel ();

WithExecutionMode

WithExecutionMode can be used for force PLINQ into a parallel query though this should be used with care.

myData.AsParallel ().Select (x => x).
WithExecutionMode (ParallelExecutionMode.ForceParallelism);

File IO

In high latency environments, asynchronous file IO can provide performance gains by releasing threads until the file system responds:

public async Task CreateAndWriteAsyncToFile (byte[] data, string aFileName)
{
    using (FileStream stream = new FileStream (aFileName, 
                                   FileMode.OpenOrCreate,         
                                   FileAccess.Write, 
                                   FileShare.Read, 1024 * 4, true)) {         
        await stream.WriteAsync (data, 0, data.Length);     
    }
}

public async Task<byte[]> ReadFileAsync (string aFileName, int length)
{
    byte[] data = new byte[length];


    using (FileStream stream = new FileStream (aFileName, 
                                   FileMode.Open,         
                                   FileAccess.Read, 
                                   FileShare.Read, 1024 * 4, true)) {         
        await stream.ReadAsync (data, 0, data.Length);     
    }

    return data;
}

Networking

Asynchronous network downloads can be achieved with the GetStringAsync method.

The GetStringAsync uses asynchronous code internally and returns a Task to the caller that will finish when the data is retrieved:

Seeing as GetStringAsync returns a Task instance WhenAll can be used to register a callback when multiple instances are returned:

public async Task<string> ReadAsyncHttpRequest ()
{     
    HttpClient client = new HttpClient ();     
    return await client.GetStringAsync ("http://www.google.com");
}

HttpClient can be found within the System.Net.Http namespace which is found within the assembly System.Net.Http.dll.

public async Task<string> ExecuteMultipleRequestsInParallel ()
{     
    HttpClient client = new HttpClient ();     

    Task<string> one = client.GetStringAsync ("http://www.google.co.uk/");     
    Task<string> two = client.GetStringAsync ("http://monodevelop.com/");     

    await Task.WhenAll (one, two); 

    return one.Result + two.Result;
}
Advertisements

Leave a Reply

Fill in your details below or click an icon to log in:

WordPress.com Logo

You are commenting using your WordPress.com account. Log Out / Change )

Twitter picture

You are commenting using your Twitter account. Log Out / Change )

Facebook photo

You are commenting using your Facebook account. Log Out / Change )

Google+ photo

You are commenting using your Google+ account. Log Out / Change )

Connecting to %s