RSS 2.0

Personal Info:

Joe Send mail to the author(s) is a lead architect on an OS incubation project at Microsoft, and was the architect for Parallel Extensions to .NET. He is an author and frequent speaker.

Blogroll:
Other
News
 C|Net
 Kuro5hin
 The Register
Technology
 <?xmlhack?>
 Daily WTF
 DevX
 Hacknot
 Java Today
 Microsoft Top 10 Downloads
 MSDN
 MSDN: "Longhorn"
 MSDN: XML Developer Center
 Slashdot
 Techdirt
 theserverside.com
 W3C
 Web Pages That Suck
 XML Cover Pages
 XML Journal
 xml.com
Technology Blogs
 Aaron Skonnard [PluralSight]
 Adam Bosworth [Google]
 Andy Rich [MS/C++]
 Arpan Desai [MS/XML]
 BCL Team [MS]
 Bill Clementson [Lisp]
 Bill de hÓra
 Bruce Eckel [J]
 Bruce Tate [J]
 Casey Chestnut
 Cedric Beust [Google]
 Chris Anderson [MS/Avalon]
 Chris Lyon [MS]
 Christian Weyer
 Clemens Vasters [newtelligence]
 Craig Andera [PluralSight]
 Dan Sugalski [Parrot]
 Daniel Cazzulino
 Dave Chappel
 Dave Roberts [Lisp]
 Dave Thomas [PragProg]
 Dave Winer
 Dion Almaer [J]
 Don Demsak
 Doug Purdy [MS/Indigo]
 Drew Marsh
 Eric Gunnerson [MS]
 Eric Rudder [MS]
 Eric Sink
 Fritz Onion [PluaralSight]
 Gavin King [J/Hibernate]
 Grady Booch [IBM]
 Hervey Wilson [MS/Indigo]
 Hillel Cooperman [MS/Shell]
 Howard Lewis Ship [J/Apache]
 Ingo Rammer [PluralSight]
 James Gosling [J/Sun]
 James Strachan [J/Groovy]
 Jason Matusow [MS/OSS]
 Jeffrey Schlimmer [MS/Indigo]
 Joe Beda [Google]
 Joel Spoelsky
 Jon Udell
 Josh Ledgard [MS/Evang]
 Joshua Allen [MS]
 Lambda
 Larry Osterman [MS]
 Maoni Stephens [MS/CLR]
 Mark Fussell [MS/XML]
 Martin Fowler
 Martin Gudgin [MS/Indigo]
 Me
 Michael Howard [MS]
 Miguel de Icaza [Mono]
 Mike Clark
 Omri Gazitt [MS/Indigo]
 Pat Helland [MS/PAG]
 Pinku Surana
 Raymond Chen [MS]
 Rich Lander [MS/CLR]
 Rob Howard
 Rob Relyea [MS/Avalon]
 Robert Cringely
 S. Somasegar [MS/DevDiv]
 Sam Gentile
 Scoble [MS/Evang]
 Scott Guthrie [MS/WebNet]
 Scott Hanselman
 Sean McGrath [J]
 Simon Fell
 Stanley Lippman [MS/C++]
 Steve Maine
 Steve Swartz [MS/Indigo]
 Steve Vinoski
 Steven Clarke [MS/Usability]
 Stuart Halloway
 Ted Leung
 Ted Neward [DM]
 Tim Bray [Sun]
 Tim Ewald [Mindreef]
 Tim O'Reilly
 Werner Vogels [Amazon]
 Wintellect
 Yasser Shohoud [MS/Indigo]
Top 20
 Brad Abrams [MS/CLR]
 Chris Brumme [MS/CLR]
 Chris Sells [MS/Ultra]
 Cyrus Najmabadi [MS/C#]
 Dominic Cooney [MS/XAF]
 Don Box [MS/Ultra]
 Don Syme [MS/R]
 Guido van Rossum [Python]
 Herb Sutter [MS/C++]
 Ian Griffiths
 Jason Zander [MS/CLR]
 Jim Hugunin [MS/CLR]
 Joel Pobar [MS/CLR]
 Krzysztof Cwalina [MS/CLR]
 Patrick Logan
 Paul Graham
 Rico Mariani [MS/CLR]
 Rory Blyth [MS/DN]
 Sam Ruby
 Wesner Moise
VC/Business Blogs
 Ed Sim
 Fred Wilson
 Jonathan Schwartz [J/Sun]
 Lawrence Lessig [Stanford]
 Mark Cuban
 Michael Hyatt
 Pierre Omidyar
 Ross Mayfield
 VentureBlog
 Weekly Read
Wine, Food & Tea
 The Silk Road of Wine
 Vinography: a wine blog
 Wine Whys

Disclaimer:
The content of this site are my own personal opinions and do not represent my employer's view in anyway.

© 2010, Joe Duffy

 
 Saturday, October 31, 2009

Well, Visual Studio 2010 Beta 2 is out on the street.  It contains plenty of neat new things to keep one busy for at least a rainy Saturday.  I proved this today.

Of course, Parallel Extensions is in the box.  .NET 4.0's Task and Task<T> abstractions are used to implement such things as PLINQ and Parallel.For loops, but of course they are great for representing asynchronous work too.  The FromAsync adapters move you from the dark ages of IAsyncResult to the glitzy new space age of tasks.

Not only are tasks tastier than hamburgers, but they enable complex dataflow graphs of asynchronous work to unfold dynamically at runtime, thanks to the ContinueWith method.  From a Task<T> you can get a Task<U> that was computed based on the T; ad infinitum.  We like dataflow.  It is the key to unlocking parallelism, or more accurately, boiling away all else except for dataflow is the key.  But what about control flow, you might ask?  We like it less.  But you can do it, so long as you put in some work.  F#'s async workflows make this sort of thing a tad easier, but the raw libraries in .NET 4.0 don't come with any sort of loops or conditional capabilities.  Perhaps in the future they will.  Nevertheless, in this post I shall demonstrate how to build a couple simple ones.

Not because the lack of them is going to cause unprecidented and unheard of horrors, but rather because in doing so we'll see some neat features of tasks.

The two methods I will illustrate in this post are:

public static class TaskControlFlow

{

    public static Task For(int from, int to, Func<int, Task> body, int width)

    public static Task While(Func<int, bool> condition, Func<int, Task> body, int width)

}

Notice that each body is given the iteration index and is expected to launch asynchronous work and return a Task.  The parameters that these methods take are probably obvious.  Well, except for the last one.  The "width" indicates how many outstanding asynchronous bodies should be in flight at once.  The Task returned by For and While won't be considered done until all iterations are done, and any exceptions will be propagated as you might hope.  It would be pretty useless otherwise.

For example, we could write a while loop that does something very silly:

TaskControlFlow.While(

    i => i < 100,

    i => { return CreateTimerTask(250).ContinueWith(_ => Console.WriteLine(i)); },

    4

).Wait();

This just prints returns a "timer task" that completes after 250ms and prints out the iteration to the console. We pass a width of 4, so only four tasks will be outstanding at any given time.  Notice we call Wait at the end, since both For and While return tasks representing the in flight work.  This could have instead been written using a For loop as follows:

TaskControlFlow.For(0, 100,

    i => { return CreateTimerTask(250).ContinueWith(_ => Console.WriteLine(i)); },

    4

).Wait();

The CreateTimerTask method, by the way, looks like this:

private static Task CreateTimerTask(int ms)

{

    var tcs = new TaskCompletionSource<bool>();

    new Timer(x => ((TaskCompletionSource<bool>)x).SetResult(true), tcs, ms, -1);

    return tcs.Task;

}

As something more realistic, imagine we wanted to do something with a large number of files, and don't want to block a whole bunch of threads in the process.  The following "simple" expression will count up all of the bytes for all of the files in a particular directory, without once blocking the thread -- well, except for the initial call to Directory.GetFiles:

string win = "c:\\...\\";

string[] files = Directory.GetFiles(win);

int total = 0;

TaskControlFlow.For(0, files.Length,

    i => {

        bool eof = false;

        int offset = 0;

        byte[] buff = new byte[4096];

        FileStream fs = File.OpenRead(files[i]);

        return TaskControlFlow.While(

            j => !eof,

            j => Task<int>.Factory.

                FromAsync<byte[],int,int>(

                    fs.BeginRead, fs.EndRead, buff, offset, buff.Length,

                    null, TaskCreationOptions.None

                ).

                ContinueWith(v => {

                    if (eof = v.Result < buff.Length) {

                        fs.Close();

                    }

                    offset += v.Result;

                    Interlocked.Add(ref total, v.Result);

                }),

                1

        );

    },

    8

).Wait();

Console.WriteLine(total);

Pretty neat.  We've somewhat arbitrarily chosen a width of 8 for this loop.  And notice something very subtle but important here: we've chosen a width of 1 for the inner loop that plows through the bytes of a file.  This is because we're sharing state, and it would not be safe to launch numerous iterations at once.  The same byte[], eof variable, and so forth, would become corrupt.  I will mention in passing that it's unfortunate that we've got that interlocked stuck in there to add to the total.  Refactoring this so that we could just do a LINQ reduce over the whole thing would be nice.  Indeed, it can be done.

We can do away with the For implementation very quickly.  It is just implemented in terms of While:

public static Task For(int from, int to, Func<int, Task> body, int width)

{

    return While(i => from + i < to, body, width);

}

And it turns out that the While implementation is not terribly complicated either.  Here it is:

public static Task While(Func<int, bool> condition, Func<int, Task> body, int width)

{

    var tcs = new TaskCompletionSource<bool>();

 

    int currIx = -1; // Current shared index.

    int currCount = width; // The number of outstanding tasks.

 

    int canceled = 0; // 1 if at least one body was cancelled.

    ConcurrentBag<Exception> exceptions = null; // A collection of exceptions, if any.

 

    // Generate a continuation action: this fires for each body that completes.

    Action<Task> fcont = null;

    fcont = tsk => {

        if (tsk.IsFaulted) {

            // Accumulate exceptions.

            LazyInitializer.EnsureInitialized(ref exceptions);

            foreach (Exception inner in tsk.Exception.InnerExceptions) {

                exceptions.Add(inner);

            }

        }

        else if (tsk.IsCanceled) {

            // Mark that cancellation has occurred.

            canceled = 1;

        }

        else if (canceled == 0 && exceptions == null) {

            // If no cancellations / exceptions are found, attempt to kick off more work.

            int ix = Interlocked.Increment(ref currIx);

            if (condition(ix)) {

                // Generate a new body task, handling exceptions.  Then make sure we

                // tack on the continuation on that new task, so we can keep on going...

                // If the condition yielded 'false', we'll simply fall through and try to finish.

                Task btsk;

                try {

                    btsk = body(ix);

                }

                catch (Exception ex) {

                    btsk = AlreadyFaulted(ex);

                }

                btsk.ContinueWith(fcont);

                return;

            }

        }

 

        // If this is the last task, signal completion.

        if (Interlocked.Decrement(ref currCount) == 0) {

            if (exceptions != null) {

                tcs.SetException(exceptions);

            }

            else if (canceled == 1) {

                tcs.SetCanceled();

            }

            else {

                tcs.SetResult(true);

            }

        }

    };

 

    // Fire off the right number of starting tasks.

    for (int i = 0; i < width; i++) {

        AlreadyDone.ContinueWith(fcont);

    }

 

    return tcs.Task;

}

I've commented the code inline to illustrate what is going on.  The only other part that isn't shown are the AlreadyDone and AlreadyFaulted members, which simply give Tasks that are already in a final state.  This isn't strictly necessary, but come in handy in a number of situations:

internal static Task AlreadyDone;

 

static TaskControlFlow()

{

    var tcs = new TaskCompletionSource<bool>();

    tcs.SetResult(true);

    AlreadyDone = tcs.Task;

}

 

private static Task AlreadyFaulted(Exception ex)

{

    var tcs = new TaskCompletionSource<bool>();

    tcs.SetException(ex);

    return tcs.Task;

}

And that's it.  I'm done for now.  Hope you enjoyed it.  I've got a few other posts in the works -- primarily the result of a day full of hacking (I got in the office at 7am this morning, and have been here ever since, 14 hours later) -- demonstrating how to do speculative asynchronous work for if/else branches.  Finally, I also have a neat example that illustrates how to do deep dataflow-based speculation without having to wait for work to complete.  This combines the new .NET 4.0 dynamic capabilities with parallelism, so I'm pretty excited to get it working and write about it.

10/31/2009 8:06:24 PM (Pacific Daylight Time, UTC-08:00)  #    Comments [0]

 

Recent Entries:

Search:

Browse by Date:
<November 2009>
SunMonTueWedThuFriSat
25262728293031
1234567
891011121314
15161718192021
22232425262728
293012345

Browse by Category:

Notables:

Currently Up To:

Reading...

Listening...

Watching...