© 2012, Joe Duffy
 Friday, September 26, 2008
I just returned from TechEd Australia, which was a lot of fun.
I have a fair number of additional speaking engagements coming up:
As of the PDC the book will also be readily available. Wahoo!
If you'll be at any of the conferences and want to meet up, please drop me a line.
 Sunday, September 21, 2008
The enumeration pattern in .NET unfortunately implies some overhead that makes it difficult to compete with ordinary for loops. In fact, the difference between
T[] a = …;
for (int i = 0, c = a.Length; i < c; i++) …action(a[i])…;
and
T[] a = …;
IEnumerator<T> ae = ((IEnumerable<T>)a).GetEnumerator();
while (ae.MoveNext()) …action(ae.Current)…;
is about 3X. That is, the former is 1/3rd the expense of the latter, in terms of raw enumeration overhead. Clearly as action becomes more expensive the significance of this overhead lessens. But if your plan is to invoke a small action over a large number of elements, using an enumerator instead of indexing directly into the array could in fact cause your algorithm to take 3X longer to finish.
There are many reasons for this problem, and they are probably obvious. Using an enumerator requires at least two interface method calls (MoveNext and Current) just to extract a single element from the array. Because there are O(length) of these operations, the overhead imposed is O(length) as well. Contrast that with the nice, compact for loop, which emits ldelem IL instructions that access the array directly. This ends up computing some offset (e.g., i * sizeof(T)) and dereferencing right into the array memory. The enumerator needs to do that too, of course, but only after the two interface calls are made. Additionally, it is possible for the JIT compiler to omit the bounds check on the array access if it knows ‘c’ in the predicate ‘i < c’ was computed from ‘a.Length’, because the length of a .NET array is fixed when it is allocated and cannot change.
(Strangely, it appears going through IList<T> is even slower than enumeration. In fact, it appears to be more than 3X the cost of going through IList<T>’s enumerator, and over 10X that of indexing into the array using true ldelem instructions instead of interface calls to IList<T>’s element indexer.)
All of this actually makes it somewhat difficult for those on my team building PLINQ to compete with hand written programs. That’s true of LINQ generally. In fact, LINQ tends to be worse, because you string several enumerators together to form a query, often leading to even more overhead attributed to enumeration. So you might reasonably wonder: if people care about performance, then why would they willingly start off 3X “in the hole” in hopes that they will eventually gain it back when they use machines with >= 4 cores? It’s a completely fair criticism (although you must recall that everything I’m talking about is “pure overhead” and once you begin to have sizable computations in the per-element action it matters less and less). We continually do a lot of work to try to recoup these costs.
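To put rough numbers on that break-even point, here is a back-of-the-envelope model. It is only a sketch under assumptions that are not part of the measurements in this post: N elements, a per-element enumeration cost e for the plain for loop, an enumeration overhead factor k (about 3 here), a per-element action cost w, and n cores with ideal linear speedup and no other parallel overhead.

```latex
\begin{aligned}
T_{\text{for}} &= N\,(e + w), \qquad
T_{\text{enum}}(n) = \frac{N\,(k\,e + w)}{n} \\[4pt]
T_{\text{enum}}(n) < T_{\text{for}} &\iff n > \frac{k\,e + w}{e + w} \\[4pt]
k = 3,\ w = 0 &: \quad n > 3 \\
k = 3,\ w = 10\,e &: \quad n > \tfrac{13}{11} \approx 1.18
\end{aligned}
```

Under these assumptions, an empty action needs more than three cores just to break even, while an action that costs ten times the enumeration step already wins on two.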
There are actually many alternative enumeration models, and I think .NET needs to change direction in the future. In addition to the overhead associated with the pattern, .NET’s enumeration pattern is a “pull” model (versus “push”), which makes it incredibly hard to tolerate blocking within calls to MoveNext. Over time, I think we will need to pursue the push model more seriously.
I’ve thrown together a few different examples of alternative enumeration techniques. To cut to the chase, here is a simple micro-benchmark test that enumerates over 1,000,000 elements 25 times, invoking an empty (non-inlineable) method for each element. The per-element work here is quite small (although not empty) and so the results are a bit more extreme than a real workload would show:
| Test | Ticks | % of baseline |
|---|---|---|
| For loop (int[]) | 739255 | 100% (baseline) |
| For loop (IList<int>) | 7534609 | 1019.216% |
| ForEach loop (int[]) | 829617 | 112.2234% |
| int[] IEnumerator<int> | 2152414 | 291.1599% |
| IEnumerator<int> | 2062876 | 279.048% |
| IFastEnumerator<int> | 1758992 | 237.9412% |
| IForEachable<int> [s] | 1103745 | 149.305% |
| IForEachable<int> [i] | 976742 | 132.1252% |
| IForEachable2<int> | 957883 | 129.5741% |
These are:
- “For loop (int[])” is an ordinary for loop over the array directly.
- “For loop (IList<int>)” is an ordinary for loop over the array’s IList<T> interface.
- “ForEach loop (int[])” is an ordinary foreach loop over the array directly.
- “int[] IEnumerator<int>” uses the array’s implementation of IEnumerator<T>.
- “IEnumerator<int>” is a custom IEnumerator<T> implementation.
- “IFastEnumerator<int>” is an implementation of a new pull interface (defined below).
- “IForEachable<int>” is an implementation of a new push interface (defined below) that uses delegates to represent the per-element action. The only difference between the “[s]” and “[i]” variants is that the delegate is bound to a static method for “[s]” and an instance method for “[i]”.
- “IForEachable2<int>” is a slight variant of IForEachable<T> (also defined below).
Notice that with IForEachable2<T>, we’ve gotten within 30% of the efficient for loop. Unfortunately, I do get somewhat different numbers when compiling with the /o+ switch:
| Test | Ticks | % of baseline |
|---|---|---|
| For loop (int[]) | 777746 | 100% (baseline) |
| For loop (IList<int>) | 7569517 | 973.2634% |
| ForEach loop (int[]) | 735846 | 94.61264% |
| int[] IEnumerator<int> | 2340361 | 300.9159% |
| IEnumerator<int> | 2063039 | 265.2587% |
| IFastEnumerator<int> | 1806568 | 232.2825% |
| IForEachable<int> [s] | 1090644 | 140.2314% |
| IForEachable<int> [i] | 946090 | 121.6451% |
| IForEachable2<int> | 1234201 | 158.6895% |
For comparison purposes, I get numbers like this if the loop body is completely empty except for accessing the current element:
| Test | Ticks | % of baseline |
|---|---|---|
| For loop (int[]) | 452039 | 100% (baseline) |
| For loop (IList<int>) | 422732 | 93.51671% |
| ForEach loop (int[]) | 461274 | 102.043% |
| int[] IEnumerator<int> | 1958711 | 433.3058% |
| IEnumerator<int> | 1730502 | 382.8214% |
| IFastEnumerator<int> | 1372421 | 303.6068% |
| IForEachable<int> [s] | 1091720 | 241.5101% |
| IForEachable<int> [i] | 958401 | 212.0173% |
| IForEachable2<int> | 664572 | 147.0165% |
And this (with /o+):
| Test | Ticks | % of baseline |
|---|---|---|
| For loop (int[]) | 262146 | 100% (baseline) |
| For loop (IList<int>) | 263302 | 100.441% |
| ForEach loop (int[]) | 372924 | 142.2581% |
| int[] IEnumerator<int> | 1889132 | 720.6412% |
| IEnumerator<int> | 1635837 | 624.0175% |
| IFastEnumerator<int> | 1479579 | 564.4103% |
| IForEachable<int> [s] | 1096712 | 418.3592% |
| IForEachable<int> [i] | 962261 | 367.0706% |
| IForEachable2<int> | 698340 | 266.3935% |
These numbers aren’t quite as meaningful because we have no idea what’s being optimized away by the C# and JIT compilers. For example, they may notice we’re not using the current element at all and therefore eliminate the access altogether. Nevertheless, the relative ranking of efficiency has remained nearly the same (with the notable exception of the array’s IList<T> test, which fares far better here).
(All of these numbers were gathered on a 32-bit OS on a 64-bit machine. Because the JIT compilers for 32-bit and 64-bit are so different, you can expect vastly different results across architectures.)
Anyway, here is what IFastEnumerator<T>, IForEachable<T>, and IForEachable2<T> look like:
interface IFastEnumerable<T>
{
IFastEnumerator<T> GetEnumerator();
}
interface IFastEnumerator<T>
{
bool MoveNext(ref T elem);
}
interface IForEachable<T>
{
void ForEach(Action<T> action);
}
interface IForEachable2<T>
{
void ForEach(Functor<T> functor);
}
abstract class Functor<T>
{
public abstract void Invoke(T t);
}
I also have a data type called SimpleList<T> that implements each of these, including IEnumerable<T>. This is what the test harness uses for its benchmarking. So any boneheaded mistakes I’ve made in the implementation of this class could cause us to draw the wrong conclusions about the interfaces themselves. Hopefully there are none:
class SimpleList<T> :
IEnumerable<T>, IFastEnumerable<T>, IForEachable<T>, IForEachable2<T>
{
private T[] m_array;
public SimpleList(T[] array) { m_array = array; }
// Etc …
}
The class of course implements IEnumerable<T> in the standard way:
IEnumerator<T> IEnumerable<T>.GetEnumerator()
{
return new ClassicEnumerable(m_array);
}
System.Collections.IEnumerator System.Collections.IEnumerable.GetEnumerator()
{
return new ClassicEnumerable(m_array);
}
class ClassicEnumerable : IEnumerator<T>
{
private T[] m_a;
private int m_index = -1;
internal ClassicEnumerable(T[] a) { m_a = a; }
public bool MoveNext() { return ++m_index < m_a.Length; }
public T Current { get { return m_a[m_index]; } }
object System.Collections.IEnumerator.Current { get { return Current; } }
public void Reset() { m_index = -1; }
public void Dispose() { }
}
The idea behind IFastEnumerable<T> (and specifically IFastEnumerator<T>) is to return the current element during the call to MoveNext itself. This cuts the number of interface method calls necessary to enumerate a list in half. The impact to performance isn’t huge, but it was enough to cut our overhead from about 3X to 2.3X. Every little bit counts:
IFastEnumerator<T> IFastEnumerable<T>.GetEnumerator()
{
return new FastEnumerable(m_array);
}
class FastEnumerable : IFastEnumerator<T>
{
private T[] m_a;
private int m_index = -1;
internal FastEnumerable(T[] a) { m_a = a; }
public bool MoveNext(ref T elem)
{
if (++m_index >= m_a.Length)
return false;
elem = m_a[m_index];
return true;
}
}
(Update: after writing the blog post, I made a couple of slight optimizations that make this a bit tighter (fewer field fetches):
class FastEnumerable : IFastEnumerator<T>
{
private T[] m_a;
private int m_index = -1;
internal FastEnumerable(T[] a) { m_a = a; }
public bool MoveNext(ref T elem)
{
T[] a = m_a;
int i;
if ((i = ++m_index) >= a.Length)
return false;
elem = a[i];
return true;
}
}
The impact to performance isn't huge, but does improve the performance to about 2.1X of the baseline.)
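For reference, this is roughly what the consuming side of the pattern looks like. It is a minimal sketch: the interface is assumed to match the one defined earlier, while ArrayFastEnumerator<T> and FastEnumeratorDemo are hypothetical names introduced only for this example.

```csharp
using System;

// Assumed to match the IFastEnumerator<T> interface shown earlier.
interface IFastEnumerator<T>
{
    bool MoveNext(ref T elem);
}

// Hypothetical array-backed enumerator, mirroring FastEnumerable above.
sealed class ArrayFastEnumerator<T> : IFastEnumerator<T>
{
    private readonly T[] m_a;
    private int m_index = -1;

    public ArrayFastEnumerator(T[] a) { m_a = a; }

    public bool MoveNext(ref T elem)
    {
        T[] a = m_a;
        int i;
        if ((i = ++m_index) >= a.Length)
            return false;
        elem = a[i];
        return true;
    }
}

static class FastEnumeratorDemo
{
    // One interface call per element: MoveNext both advances and yields.
    public static long Sum(IFastEnumerator<int> e)
    {
        long total = 0;
        int x = 0; // a ref argument must be assigned before it is passed
        while (e.MoveNext(ref x))
            total += x;
        return total;
    }
}
```

Calling FastEnumeratorDemo.Sum(new ArrayFastEnumerator<int>(new[] { 1, 2, 3 })) walks the array with a single interface dispatch per element instead of the MoveNext/Current pair.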
The IForEachable<T> interface is a push model in the sense that the caller provides a delegate and the ForEach method is responsible for invoking it once per element in the collection. ForEach doesn’t return until this is done. In addition to making far fewer method calls to enumerate a collection, there isn’t a single interface method call per element; the only one is the call to ForEach itself. Delegate dispatch is also much faster than interface method dispatch. The result is nearly twice as fast as the classic IEnumerator<T> pattern (when /o+ isn’t specified). Now we’re really getting somewhere!
void IForEachable<T>.ForEach(Action<T> action)
{
T[] a = m_array;
for (int i = 0, c = a.Length; i < c; i++)
action(a[i]);
}
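A caller of the push model typically passes a lambda and keeps any running state in a captured variable. The following is a minimal sketch of that; ArrayForEachable<T> and PushSumDemo are hypothetical names for this example, and the interface is assumed to match the one defined earlier.

```csharp
using System;

// Assumed to match the IForEachable<T> interface shown earlier.
interface IForEachable<T>
{
    void ForEach(Action<T> action);
}

// Hypothetical array-backed implementation, same loop as SimpleList<T>.
sealed class ArrayForEachable<T> : IForEachable<T>
{
    private readonly T[] m_array;

    public ArrayForEachable(T[] array) { m_array = array; }

    public void ForEach(Action<T> action)
    {
        T[] a = m_array;
        for (int i = 0, c = a.Length; i < c; i++)
            action(a[i]);
    }
}

static class PushSumDemo
{
    public static long Sum(IForEachable<int> source)
    {
        long total = 0;
        // The lambda captures 'total', so the compiler hoists it into a
        // closure object and binds the delegate to an instance method.
        source.ForEach(x => total += x);
        return total;
    }
}
```

The collection drives the loop here, so the caller cannot stop early without extra protocol (for example a flag the action checks); that is one of the trade-offs of push over pull.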
Delegate dispatch still isn’t quite the speed of virtual method dispatch. And delegates bound to static methods are actually slightly slower than those bound to instance methods, which is why you’ll notice a slight difference in the original “[s]” versus “[i]” measurements. The reason is subtle. There is a delegate dispatch stub that is meant to call the target method: when the delegate refers to an instance method, the ‘this’ reference passed in ECX points to the delegate object when it is invoked and the stub can simply replace it with the target object and jump; for static methods, however, all of the arguments need to be “shifted” downward, because there is no ‘this’ reference to be passed and therefore the first actual argument to the static method must take the place of the current value in ECX.
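You can observe which kind of delegate you have through the Delegate.Target property, which is null for a delegate over a static method. This small sketch (DelegateBindingDemo and its members are hypothetical names) shows the two bindings side by side; it says nothing about the dispatch stubs themselves.

```csharp
using System;

static class DelegateBindingDemo
{
    private static void StaticTarget(int x) { }

    private sealed class Receiver
    {
        public void InstanceTarget(int x) { }
    }

    // A delegate over a static method carries no receiver object.
    public static bool StaticDelegateHasTarget()
    {
        Action<int> act = new Action<int>(StaticTarget);
        return act.Target != null;
    }

    // A delegate over an instance method stores the receiver in Target.
    public static bool InstanceDelegateHasTarget()
    {
        Receiver r = new Receiver();
        Action<int> act = new Action<int>(r.InstanceTarget);
        return ReferenceEquals(act.Target, r);
    }
}
```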
The IForEachable2<T> interface just replaces delegate calls with virtual method calls. Somebody calling it will pass an instance of the Functor<T> class with the Invoke method overridden. The implementation of ForEach then looks quite a bit like IForEachable<T>’s, just with virtual method calls in place of delegate calls:
void IForEachable2<T>.ForEach(Functor<T> functor)
{
T[] a = m_array;
for (int i = 0, c = a.Length; i < c; i++)
functor.Invoke(a[i]);
}
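With the functor variant, per-call state lives in fields of the Functor<T> subclass instead of a compiler-generated closure. A minimal sketch follows, where SumFunctor, ArrayForEachable2<T>, and FunctorDemo are hypothetical names and the base types are assumed to match the definitions earlier.

```csharp
using System;

// Assumed to match the definitions shown earlier.
abstract class Functor<T>
{
    public abstract void Invoke(T t);
}

interface IForEachable2<T>
{
    void ForEach(Functor<T> functor);
}

// Hypothetical array-backed implementation, same loop as SimpleList<T>.
sealed class ArrayForEachable2<T> : IForEachable2<T>
{
    private readonly T[] m_array;

    public ArrayForEachable2(T[] array) { m_array = array; }

    public void ForEach(Functor<T> functor)
    {
        T[] a = m_array;
        for (int i = 0, c = a.Length; i < c; i++)
            functor.Invoke(a[i]); // virtual call, no delegate involved
    }
}

// The functor carries its own state in a field.
sealed class SumFunctor : Functor<int>
{
    public long Total;
    public override void Invoke(int x) { Total += x; }
}

static class FunctorDemo
{
    public static long Sum(int[] data)
    {
        SumFunctor f = new SumFunctor();
        new ArrayForEachable2<int>(data).ForEach(f);
        return f.Total;
    }
}
```

The cost is ergonomic: every distinct action needs its own class, whereas the delegate version accepts a lambda.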
And that’s it. Here is the program that drives the little micro-benchmark tests that I showed output for at the beginning:
using System;
using System.Collections.Generic;
using System.Diagnostics;

class Program
{
public static void Main()
{
const int size = 2500000;
Random r = new Random();
int[] array = new int[size];
for (int i = 0; i < size; i++) array[i] = r.Next();
SimpleList<int> list = new SimpleList<int>(array);
const int iters = 25;
long baseline = 0;
GC.Collect();
GC.WaitForPendingFinalizers();
// Regular for loop
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
for (int j = 0, c = array.Length; j < c; j++)
DoNothing(array[j]);
}
baseline = sw.ElapsedTicks;
Console.WriteLine("For loop (int[])\t{0} tcks\t% of baseline", baseline);
}
// Regular for loop (IList<int>)
{
Stopwatch sw = Stopwatch.StartNew();
IList<int> ia = array;
for (int i = 0; i < iters; i++)
{
for (int j = 0, c = ia.Count; j < c; j++)
DoNothing(ia[j]);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("For loop (IList<int>)\t{0} tcks\t{1}%",
elapsed, 100*(elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// Regular foreach loop
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
foreach (int x in array)
DoNothing(x);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("ForEach loop (int[])\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// int[] via its IEnumerator<int>
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
IEnumerator<int> e = ((IEnumerable<int>)array).GetEnumerator();
while (e.MoveNext())
DoNothing(e.Current);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("int[] IEnumerator<int>\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
// IEnumerator<T>
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
IEnumerator<int> e = ((IEnumerable<int>)list).GetEnumerator();
while (e.MoveNext())
DoNothing(e.Current);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("IEnumerator<int>\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// IFastEnumerator<T>
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
int x = 0;
IFastEnumerator<int> e = ((IFastEnumerable<int>)list).GetEnumerator();
while (e.MoveNext(ref x))
DoNothing(x);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("IFastEnumerator<int>\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// IForEachable<T> (delegate bound to a static method)
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
Action<int> act = new Action<int>(DoNothing);
((IForEachable<int>)list).ForEach(act);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("IForEachable<int> [s]\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// IForEachable<T> (delegate bound to an instance method)
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
DoNothingClosure dnc = new DoNothingClosure();
Action<int> act = new Action<int>(dnc.DoNothing);
((IForEachable<int>)list).ForEach(act);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("IForEachable<int> [i]\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
GC.Collect();
GC.WaitForPendingFinalizers();
// IForEachable2<T>
{
Stopwatch sw = Stopwatch.StartNew();
for (int i = 0; i < iters; i++)
{
DoNothingFunctor dnf = new DoNothingFunctor();
((IForEachable2<int>)list).ForEach(dnf);
}
long elapsed = sw.ElapsedTicks;
Console.WriteLine("IForEachable2<int>\t{0} tcks\t{1}%",
elapsed, 100 * (elapsed / (float)baseline));
}
}
[System.Runtime.CompilerServices.MethodImpl(
System.Runtime.CompilerServices.MethodImplOptions.NoInlining)]
private static void DoNothing(int x) { }
class DoNothingClosure
{
[System.Runtime.CompilerServices.MethodImpl(
System.Runtime.CompilerServices.MethodImplOptions.NoInlining)]
public void DoNothing(int x) { }
}
class DoNothingFunctor : Functor<int>
{
public override void Invoke(int x) { DoNothing(x); }
}
}
To summarize, .NET enumeration costs something over typical for loops that index straight into arrays. Most programs needn’t worry about these kinds of overheads. If you’re accessing a database, manipulating a large complicated object, or what have you, inside of the individual iterations, then the overheads we’re talking about here are minuscule. In fact, walking 1,000,000 elements is in the microsecond range for all of the benchmarks I showed, even the slowest ones. So none of this is anything to lose sleep over. But if you have a closed system that controls all of its enumeration, it may be worth doing some targeted replacement of enumerators with the more efficient patterns, particularly if you tend to enumerate lots and lots of elements lots and lots of times in your program.
 Wednesday, September 17, 2008
In part 2 of this series, I described a new work stealing queue data structure used for work item management. This structure allows us to push and pop elements into a thread-local work queue without heavy-handed synchronization. Moreover, this distributes a large amount of the scheduling responsibility across the threads (and hence processors). The result is that, for recursively queued work items, scalability is improved and pressure on the typical bottleneck in a thread pool (i.e., the global lock) is alleviated.
What we didn’t do last time was actually integrate the new queue into the thread pool that was shown in part 1. This extension is actually somewhat simple. We’ll continue to use the IThreadPool interface so that we can easily harness and benchmark the various thread pool implementations against each other.
We’ll add a new class LockAndWsqThreadPool, which mimics the design of the original SimpleLockThreadPool class. We’ll only need to add two fields to it:
- private WorkStealingQueue<WorkItem>[] m_wsQueues: This is an array of queues—one per thread in the pool—that will be used to store recursively queued work.
- [ThreadStatic] private static WorkStealingQueue<WorkItem> m_wsq: This represents the unique work stealing queue for a particular thread in the pool.
OK, so with these extensions there are clearly three specific changes we need to make:
- A new thread pool thread needs to allocate its work stealing queue.
- When queuing a new work item, we must check to see if we’re on a pool thread. If so, we will queue the work item into the work stealing queue instead of the global queue.
- When a pool thread looks for work, it needs to:
- First consult its local work stealing queue.
- If that fails, it then looks at the global queue.
- Lastly, if that fails, it needs to steal from other work stealing queues.
Let’s review each one individually. Later we’ll see the full code.
#1 is handled in the DispatchLoop function:
private WorkStealingQueue<WorkItem>[] m_wsQueues = new WorkStealingQueue<WorkItem>[Environment.ProcessorCount];

private void DispatchLoop()
{
    // Register a new WSQ.
    WorkStealingQueue<WorkItem> wsq = new WorkStealingQueue<WorkItem>();
    m_wsq = wsq; // Store in TLS.
    AddWsq(wsq);

    try { /* a whole bunch of stuff … */ }
    finally { RemoveWsq(wsq); }
}

private void AddWsq(WorkStealingQueue<WorkItem> wsq)
{
    lock (m_wsQueues)
    {
        for (int i = 0; i < m_wsQueues.Length; i++)
        {
            if (m_wsQueues[i] == null)
            {
                m_wsQueues[i] = wsq;
                return; // Registered; stop so we don't fill every free slot.
            }
            else if (i == m_wsQueues.Length - 1)
            {
                // No free slot was found: grow the array and append.
                WorkStealingQueue<WorkItem>[] queues =
                    new WorkStealingQueue<WorkItem>[m_wsQueues.Length*2];
                Array.Copy(m_wsQueues, queues, i+1);
                queues[i+1] = wsq;
                m_wsQueues = queues;
                return;
            }
        }
    }
}

private void RemoveWsq(WorkStealingQueue<WorkItem> wsq)
{
    lock (m_wsQueues)
    {
        for (int i = 0; i < m_wsQueues.Length; i++)
        {
            if (m_wsQueues[i] == wsq)
            {
                m_wsQueues[i] = null;
            }
        }
    }
}
#2, of course, happens within the QueueUserWorkItem function:
public void QueueUserWorkItem(WaitCallback work, object obj)
{
    WorkItem wi = …; /* as before … */

    // Now insert the work item into the queue, possibly waking a thread.
    WorkStealingQueue<WorkItem> wsq = m_wsq;
    if (wsq != null)
    {
        // Single TLS to determine if we're on a pool thread.
        wsq.LocalPush(wi);
        if (m_threadsWaiting > 0) // OK to read lock-free.
            lock (m_queue) { Monitor.Pulse(m_queue); }
    }
    else
    {
        /* as before… queue to the global queue */
    }
}
Lastly, #3 is the most complicated. Searching the local queue is done with a call to wsq.LocalPop. If that fails, the work stealing queue is empty, and the logic then looks a lot like the original thread pool’s dispatch loop logic in that we then look for work in the global queue. If that fails, we will just iterate over the other threads’ work stealing queues, doing a TrySteal operation. If none of them had work, we go back to the global queue, try again, and then finally wait for work to arrive. (See the full code sample below for details.) Notice that there’s a fairly tricky race condition here that we’re leaving unhandled: if we search for work, try to steal, and ultimately find no work, we will then embark on a trip back to the global queue; during this trip, another pool thread might recursively queue work into its work stealing queue and we will miss it. Generally speaking, this is OK because that thread will eventually get to it (presumably) but with some clever synchronization trickery we can actually handle this case. Perhaps I will show such a solution in a subsequent part in this series.
Anyway, what we’re left with is code that looks something like this:
using System;
using System.Security.Permissions;
using System.Threading;

public class LockAndWsqThreadPool : IThreadPool
{
    // Constructors--
    // Two things may be specified:
    //   ConcurrencyLevel == fixed # of threads to use
    //   FlowExecutionContext == whether to capture & flow ExecutionContexts for work items

    public LockAndWsqThreadPool() :
        this(Environment.ProcessorCount, true) { }
    public LockAndWsqThreadPool(int concurrencyLevel) :
        this(concurrencyLevel, true) { }
    public LockAndWsqThreadPool(bool flowExecutionContext) :
        this(Environment.ProcessorCount, flowExecutionContext) { }

    public LockAndWsqThreadPool(int concurrencyLevel, bool flowExecutionContext)
    {
        if (concurrencyLevel <= 0)
            throw new ArgumentOutOfRangeException("concurrencyLevel");

        m_concurrencyLevel = concurrencyLevel;
        m_flowExecutionContext = flowExecutionContext;

        // If suppressing flow, we need to demand permissions.
        if (!flowExecutionContext)
            new SecurityPermission(SecurityPermissionFlag.Infrastructure).Demand();
    }

    // Each work item consists of a closure: work + (optional) state obj + context.
    struct WorkItem
    {
        internal WaitCallback m_work;
        internal object m_obj;
        internal ExecutionContext m_executionContext;

        internal WorkItem(WaitCallback work, object obj)
        {
            m_work = work;
            m_obj = obj;
            m_executionContext = null;
        }

        internal void Invoke()
        {
            // Run normally (delegate invoke) or under context, as appropriate.
            if (m_executionContext == null)
                m_work(m_obj);
            else
                ExecutionContext.Run(m_executionContext, s_contextInvoke, this);
        }

        private static ContextCallback s_contextInvoke = delegate(object obj)
        {
            WorkItem wi = (WorkItem)obj;
            wi.m_work(wi.m_obj);
        };
    }

    private readonly int m_concurrencyLevel;
    private readonly bool m_flowExecutionContext;
    private readonly System.Collections.Queue m_queue = new System.Collections.Queue();
    private WorkStealingQueue<WorkItem>[] m_wsQueues =
        new WorkStealingQueue<WorkItem>[Environment.ProcessorCount];
    private Thread[] m_threads;
    private int m_threadsWaiting;
    private bool m_shutdown;

    [ThreadStatic]
    private static WorkStealingQueue<WorkItem> m_wsq;

    // Methods to queue work.

    public void QueueUserWorkItem(WaitCallback work)
    {
        QueueUserWorkItem(work, null);
    }

    public void QueueUserWorkItem(WaitCallback work, object obj)
    {
        WorkItem wi = new WorkItem(work, obj);

        // If execution context flowing is on, capture the caller's context.
        if (m_flowExecutionContext)
            wi.m_executionContext = ExecutionContext.Capture();

        // Make sure the pool is started (threads created, etc).
        EnsureStarted();

        // Now insert the work item into the queue, possibly waking a thread.
        WorkStealingQueue<WorkItem> wsq = m_wsq;
        if (wsq != null)
        {
            // Single TLS to determine if we're on a pool thread.
            wsq.LocalPush(wi);
            if (m_threadsWaiting > 0) // OK to read lock-free.
                lock (m_queue) { Monitor.Pulse(m_queue); }
        }
        else
        {
            lock (m_queue)
            {
                m_queue.Enqueue(wi);
                if (m_threadsWaiting > 0)
                    Monitor.Pulse(m_queue);
            }
        }
    }

    // Ensures that the threads have begun executing.
    private void EnsureStarted()
    {
        if (m_threads == null)
        {
            lock (m_queue)
            {
                if (m_threads == null)
                {
                    m_threads = new Thread[m_concurrencyLevel];
                    for (int i = 0; i < m_threads.Length; i++)
                    {
                        m_threads[i] = new Thread(DispatchLoop);
                        m_threads[i].Start();
                    }
                }
            }
        }
    }

    private void AddWsq(WorkStealingQueue<WorkItem> wsq)
    {
        lock (m_wsQueues)
        {
            for (int i = 0; i < m_wsQueues.Length; i++)
            {
                if (m_wsQueues[i] == null)
                {
                    m_wsQueues[i] = wsq;
                    return; // Registered; stop so we don't fill every free slot.
                }
                else if (i == m_wsQueues.Length - 1)
                {
                    // No free slot was found: grow the array and append.
                    WorkStealingQueue<WorkItem>[] queues =
                        new WorkStealingQueue<WorkItem>[m_wsQueues.Length*2];
                    Array.Copy(m_wsQueues, queues, i+1);
                    queues[i+1] = wsq;
                    m_wsQueues = queues;
                    return;
                }
            }
        }
    }

    private void RemoveWsq(WorkStealingQueue<WorkItem> wsq)
    {
        lock (m_wsQueues)
        {
            for (int i = 0; i < m_wsQueues.Length; i++)
            {
                if (m_wsQueues[i] == wsq)
                {
                    m_wsQueues[i] = null;
                }
            }
        }
    }

    // Each thread runs the dispatch loop.
    private void DispatchLoop()
    {
        // Register a new WSQ.
        WorkStealingQueue<WorkItem> wsq = new WorkStealingQueue<WorkItem>();
        m_wsq = wsq; // Store in TLS.
        AddWsq(wsq);

        try
        {
            while (true)
            {
                WorkItem wi = default(WorkItem);

                // Search order: (1) local WSQ, (2) global Q, (3) steals.
                if (!wsq.LocalPop(ref wi))
                {
                    bool searchedForSteals = false;
                    while (true)
                    {
                        lock (m_queue)
                        {
                            // If shutdown was requested, exit the thread.
                            if (m_shutdown)
                                return;

                            // (2) try the global queue.
                            if (m_queue.Count != 0)
                            {
                                // We found a work item! Grab it ...
                                wi = (WorkItem)m_queue.Dequeue();
                                break;
                            }
                            else if (searchedForSteals)
                            {
                                m_threadsWaiting++;
                                try { Monitor.Wait(m_queue); }
                                finally { m_threadsWaiting--; }

                                // If we were signaled due to shutdown, exit the thread.
                                if (m_shutdown)
                                    return;

                                searchedForSteals = false;
                                continue;
                            }
                        }

                        // (3) try to steal.
                        WorkStealingQueue<WorkItem>[] wsQueues = m_wsQueues;
                        int i;
                        for (i = 0; i < wsQueues.Length; i++)
                        {
                            // Skip empty (null) slots and our own queue.
                            WorkStealingQueue<WorkItem> victim = wsQueues[i];
                            if (victim != null && victim != wsq && victim.TrySteal(ref wi))
                                break;
                        }

                        if (i != wsQueues.Length)
                            break;

                        searchedForSteals = true;
                    }
                }

                // ...and Invoke it. Note: exceptions will go unhandled (and crash).
                wi.Invoke();
            }
        }
        finally
        {
            RemoveWsq(wsq);
        }
    }

    // Disposing will signal shutdown, and then wait for all threads to finish.
    public void Dispose()
    {
        m_shutdown = true;
        if (m_threads != null) // Nothing to wake or join if the pool never started.
        {
            lock (m_queue)
            {
                Monitor.PulseAll(m_queue);
            }

            for (int i = 0; i < m_threads.Length; i++)
                m_threads[i].Join();
        }
    }
}
I have a little harness that measures the throughput of the different thread pool implementations for varying degrees of recursively queued work. I’ll share this out too in a subsequent part in this series, once we have a few more variants to pit against each other. Anyway, as you’d imagine, there is very little difference between LockAndWsqThreadPool and SimpleLockThreadPool when all work is queued from external (non-pool) threads. However, when I queue 10,000 items externally and, from each of those, queue 100 items recursively, I see a 3X throughput improvement on my four core machine. When I queue 100 items externally and, from each of those, queue 10,000 items recursively, the improvement is more than 8X. And so on. As the number of cores increases, the improvement only becomes greater.
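The shape of that recursive workload can be sketched as follows. This is not the harness mentioned above: it uses the built-in System.Threading.ThreadPool as a stand-in for an IThreadPool implementation, assumes .NET 4 or later for CountdownEvent, and RecursiveQueueDemo is a hypothetical name. Timing it is left out; wrap the call in a Stopwatch to compare pools.

```csharp
using System;
using System.Threading;

static class RecursiveQueueDemo
{
    // Queues 'outer' items from the calling thread; each of those queues
    // 'inner' more items from inside the pool (the "recursive" case).
    // Returns the number of work items that ran.
    public static int Run(int outer, int inner)
    {
        int executed = 0;
        CountdownEvent done = new CountdownEvent(outer * (1 + inner));

        for (int i = 0; i < outer; i++)
        {
            ThreadPool.QueueUserWorkItem(delegate
            {
                for (int j = 0; j < inner; j++)
                {
                    // Queued from a pool thread: this is the work that a
                    // work stealing pool would keep in the local queue.
                    ThreadPool.QueueUserWorkItem(delegate
                    {
                        Interlocked.Increment(ref executed);
                        done.Signal();
                    });
                }
                Interlocked.Increment(ref executed);
                done.Signal();
            });
        }

        done.Wait(); // block until every outer and inner item has run
        return executed;
    }
}
```

RecursiveQueueDemo.Run(10000, 100) and RecursiveQueueDemo.Run(100, 10000) correspond to the two configurations described above.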
Another aspect not shown—because of the very limited QueueUserWorkItem-style API we’re building on—is something called “wait inlining.” We do this in TPL. When you recursively queue work items in a divide-and-conquer kind of problem, there’s often more latent parallelism than will be realized. Instead of requiring all of that parallelism to consume a thread, and blocking each time a work item is waited on, we can run work items inline if they haven’t started yet.
One easy way to do this is to limit inlining to only threads that do so from their own local work stealing queue. Because we are guaranteed the local pop/push methods won’t interleave with such inlines, we can just acquire the stealing lock and search the list for the particular element, e.g.:
public bool Remove(T obj)
{
    for (int i = m_tailIndex - 1; i > m_headIndex; i--)
    {
        if (m_array[i & m_mask] == obj)
        {
            lock (m_foreignLock)
            {
                if (m_array[i & m_mask] != obj)
                    return false; // lost a race.

                // Adjust indices or leave a null in our wake.
                if (i == m_tailIndex - 1)
                    m_tailIndex--;
                else if (i == m_headIndex + 1)
                    m_headIndex++;
                else
                    m_array[i & m_mask] = null;

                return true;
            }
        }
    }

    // Searched the whole queue without finding the element.
    return false;
}
This is just a new method on the WorkStealingQueue<T> data structure. It requires that the local and foreign pop methods now check for null values and restart the relevant operation should one be found, because if the work item to be removed is not the head or tail item, we cannot prevent subsequent removals from seeing it (i.e., the indices must remain the same).
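To make the idea of wait inlining concrete, here is a deliberately simplified sketch. It swaps in a different mechanism from the Remove method above: instead of pulling the item out of the queue, each item carries a claim flag, and whichever thread claims it first (a pool thread that dequeues it, or a waiter) runs it. InlinableWorkItem is a hypothetical type, ManualResetEventSlim assumes .NET 4 or later, and this is not how TPL implements it.

```csharp
using System;
using System.Threading;

// Hypothetical work item that a waiter may run inline.
sealed class InlinableWorkItem
{
    private const int NotStarted = 0, Claimed = 1;

    private int m_state = NotStarted;
    private readonly Action m_work;
    private readonly ManualResetEventSlim m_done = new ManualResetEventSlim(false);

    public InlinableWorkItem(Action work) { m_work = work; }

    // Called by a pool thread that dequeues the item, and by Wait.
    // Returns true if this caller executed the work.
    public bool TryRun()
    {
        // Only one thread can move the state from NotStarted to Claimed.
        if (Interlocked.CompareExchange(ref m_state, Claimed, NotStarted) != NotStarted)
            return false; // already claimed elsewhere; skip it

        try { m_work(); }
        finally { m_done.Set(); }
        return true;
    }

    // Runs the work inline if nobody has started it yet; otherwise blocks
    // until the thread that claimed it finishes. Returns true if inlined.
    public bool Wait()
    {
        if (TryRun())
            return true;
        m_done.Wait();
        return false;
    }
}
```

The trade-off is that a claimed item stays in the queue until some thread pops it and discovers there is nothing left to do, whereas removing it from the local queue, as above, avoids that wasted dequeue.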
Next time, in part 4 of this series, we’ll take a look at what it takes to share threads among multiple instances of the LockAndWsqThreadPool class. This allows many pools to be created within a single AppDomain without requiring entirely separate sets of threads to service each one of them. This capability enables you to isolate different work queues from one another, to ensure that certain components aren’t starved by other (potentially misbehaving) ones.
 Saturday, September 13, 2008
Most programs are tangled webs of data and control dependencies. For sequential programs, this doesn’t matter much aside from putting constraints on the legal optimizations available to a compiler. But it gets worse. Imperative programs today are also full of side-effect dependencies. Unlike data and control dependence—which most compilers can identify and understand the semantics of (aliasing aside)—side-effect dependencies are hidden and the semantic meaning of them is entirely ad-hoc. These can include scribbling to shared memory, writing to the disk, or printing to the console.
One of my goals is to push programming languages in the direction of full disclosure of all kinds of dependencies. I believe this will eventually help to foster ubiquitous parallelism. These dependencies, after all, are what inherently limit the latent parallelism in a program and are “real” in the sense that they are typically algorithmic. I would prefer that developers think about how to modify or rewrite their algorithm to eliminate any unnecessary dependencies, and also to be clever about eliminating necessary ones, rather than trying to navigate a minefield of dependencies that are implicit, undocumented, and often hard to understand. Our tools should be oriented towards aiding such endeavors.
That’s not to say that knowing about dependencies will immediately make all programs parallel programs. Research in automatic parallelism for purely functional languages like Haskell has shown that this is a naïve point of view. My belief is that this is a key step along the path, however. With it new models and patterns can emerge that reduce dependencies so that parallelism can be introduced without accidentally violating subtle and hidden dependencies, causing races.
The biggest question left unanswered in my mind is the role state will play in software of the future.
That seems like an absurd statement, or a naïve one at the very least. State is everywhere:
- The values held in memory.
- Data locally on disk.
- Data in-flight that is being sent over a network.
- Data stored in the cloud, including on a database, remote filesystem, etc.
Certainly all of these kinds of state will continue to exist far into the future. Data is king, and is one major factor that will drive the shift to parallel computing. The question then is how will concurrent programs interact with this state, read and mutate it, and what isolation and synchronization mechanisms are necessary to do so?
I’ve been working on or around software transactional memory (STM) for over 3 years now. Many think it’s a panacea, and it has been held up as somewhat of a “last hope for mankind” kind of technology. As with anything, it’s best to temper the enthusiasm with some realism. Things are never so simple. STM will be one tool (of many) in the toolkit of programmers writing the next generation of concurrent code. In fact, I have over time come to believe that it’s one of the least radical ones that we need. This is probably bad news, given the vast number of difficulties the community has uncovered in our attempts to efficiently and correctly implement an STM system.
Many programs have ample gratuitous dependencies, simply because of the habits we’ve grown accustomed to over 30 odd years of imperative programming. Our education, mental models, books, best-of-breed algorithms, libraries, and languages all push us in this direction. We like to scribble intermediary state into shared variables because it’s simple to do so and because it maps to our von Neumann model of how the computer works.
We need to get rid of these gratuitous dependencies. Merely papering over them with a transaction—making them “safe”—doesn’t do anything to improve the natural parallelism that a program contains. It just ensures it doesn’t crash. Sure, that’s plenty important, but providing programming models and patterns to eliminate the gratuitous dependencies also achieves the goal of not crashing but with the added benefit of actually improving scalability too. Transactions have worked so well in enabling automatic parallelism in databases because the basic model itself (without transactions) already implies natural isolation among queries. Transactions break down and scalability suffers for programs that aren’t architected in this way. We should learn from the experience of the database community in this regard.
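To make "gratuitous dependency" concrete, here is a minimal sketch. The class and method names are hypothetical, chosen only for illustration. The first method scribbles intermediate state into a shared variable, so every iteration depends on the previous one through a side effect. The second computes the same value as independent per-element work plus an associative sum, which leaves a runtime such as PLINQ free to run it in parallel.

```csharp
using System;
using System.Linq;

public static class SumOfSquares
{
    // Shared scratch variable: every iteration reads and writes it, so the
    // loop iterations depend on one another through a side effect.
    private static long s_total;

    public static long WithSharedState(int[] values)
    {
        s_total = 0;
        foreach (int v in values)
            s_total += (long)v * v;
        return s_total;
    }

    // Same result with no shared mutable state: each square is computed
    // independently and the only combining step is an associative sum.
    public static long WithoutSharedState(int[] values)
    {
        return values.AsParallel().Select(v => (long)v * v).Sum();
    }
}
```

Wrapping the first loop in a transaction would make it safe to call concurrently, but it would not make the iterations any more independent.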
There is a kind of natural taxonomy for the structure of concurrent programs:
A. Agents, where isolation is king and interactions are loosely coupled. This is classically referred to as “message passing”, but there are many different reifications of this idea that expose the idea of messages differently: actors (e.g., as in Scheme circa 1980’s), active objects, Ada tasks, Erlang processes, web services, and so on.
B. Task parallelism, where logically independent activities (from a dependence point of view) may be run concurrently. This can range from coarse- to fine-grained, but is normally fixed in number.
C. Data parallelism, in which data drives the coarseness of concurrency.
There is also a natural taxonomy for the way concurrent programs manipulate state:
1. At a coarse-grained level, any changes to state are committed via transactions.
2. At a fine-grained level, all computations are purely functional and without side-effects.
You’ll notice a nice correlation between { (A) & (1) }, and { (B), (C), & (2) }.
And you’ll also notice that I explicitly didn’t mention mutable shared state at all, except for implying mutations would only occur at a coarse granularity and with transactions. This is an oversimplification. Even within the fine-grained computations, guaranteed isolation can allow computations to allocate new state and manipulate it in a myriad of ways. The key here is that the state must be guaranteed to be isolated, and that within such pockets of guaranteed isolation familiar imperative programming can be used. This spans graphs of structured task and data parallelism.
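A pocket of guaranteed isolation can be as small as a single method. The sketch below (hypothetical names, and isolation here is only by convention since C# does not check it) looks pure from the outside, yet uses ordinary imperative mutation on state that nothing else can reach until it is returned.

```csharp
using System;
using System.Collections.Generic;

public static class IsolatedPocket
{
    // Externally pure: the result depends only on the argument, and the only
    // state mutated is freshly allocated inside this call.
    public static int[] PrefixSums(int[] input)
    {
        var sums = new List<int>(input.Length);   // private to this call
        int running = 0;                          // familiar imperative mutation
        foreach (int x in input)
        {
            running += x;
            sums.Add(running);
        }
        return sums.ToArray();
    }
}
```

Because the mutable list never escapes while it is being built, many calls to `PrefixSums` can run concurrently without synchronization.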
Even this is an oversimplification, but as a broadly appealing programming model I think it is what we ought to strive for. There will always be hidden mutation of shared state inside lower level system components. These are often called “benevolent side-effects,” thanks to Hoare, and apply to things like lazy initialization and memoization caches. These will be done by concurrency ninjas who understand locks. And their effects will be isolated by convention.
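A memoization cache is a typical benevolent side-effect. The following is a small sketch with hypothetical names, assuming a simple lock is acceptable for the cache. Callers observe a pure function, and the shared dictionary is a hidden detail.

```csharp
using System;
using System.Collections.Generic;

public sealed class MemoizedFunction
{
    private readonly Func<int, long> m_compute;
    private readonly Dictionary<int, long> m_cache = new Dictionary<int, long>();
    private readonly object m_lock = new object();
    private int m_misses;

    public MemoizedFunction(Func<int, long> compute) { m_compute = compute; }

    // Number of times the underlying function actually ran.
    public int Misses { get { lock (m_lock) return m_misses; } }

    // The cache update mutates shared state, but the lock keeps it safe and
    // the result is the same as calling m_compute directly.
    public long Invoke(int arg)
    {
        lock (m_lock)
        {
            long result;
            if (!m_cache.TryGetValue(arg, out result))
            {
                result = m_compute(arg);
                m_cache[arg] = result;
                m_misses++;
            }
            return result;
        }
    }
}
```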
Any true effects that must escape a pocket of isolation then get communicated transactionally to others.
Efforts in Haskell have led to similar conclusions. Monads, of course, are the way to get side-effects into a purely functional language like Haskell: http://portal.acm.org/citation.cfm?id=262011. The state monad allows one to manipulate state lazily via a monad, in a semi-imperative way, and a paper called “Lazy functional state threads” by Launchbury and Peyton Jones shows how to combine the state monad with threading to enable a model very similar to the one I describe: http://portal.acm.org/citation.cfm?id=178243.178246. Combine this with STM and we’re getting somewhere: http://portal.acm.org/citation.cfm?id=1065944.1065952. Sadly, I do think Haskell’s syntax is too mathematical for most and that we need a fair bit of sugar on top of the raw use of monads and combining stateful effects. But as an underlying model of computation I think the kernel of the idea is just right.
I admit that I’m a little sad that F# has taken an impure-by-default stance. Given the roots in ML and O’Caml, and the more pragmatic goals of the language, this stance isn’t a surprise. And a bunch of people will be wildly successful and happy using it as-is. F# is, however, Microsoft’s first attempt to foist functional programming upon our professional development community, and pure-by-default is actually a fairly innocuous (but subtly crucial) position to take. (Except for those damn impure libraries.) I fear we may be missing our “once in every 5 years” chance to do the right thing. But I guess we don’t quite know for sure what the right thing is just yet; we simply didn’t take the leap of faith.
Even with all of this support, we’d be left with an ecosystem of libraries like the .NET Framework itself which have been built atop a fundamentally mutable and imperative system. The path forward here is less clear to me, although having the ability to retain a mutable model within pockets of guaranteed isolation certainly makes me think the libraries are salvageable. Thankfully, the shift will likely be very gradual, and the pieces that pose substantial problems can be rewritten in place incrementally over time. But we need the fundamental language and type system support first.
 Tuesday, August 12, 2008
Miguel de Icaza recently blogged about the addition of Parallel Extensions to the Mono family.
C# 3.0 and Parallel FX/LINQ in Mono
"For a while I wanted to blog about the open source implementation of the Parallel Extensions for Mono that Jeremie Laval has been working on. Jeremie is one of our mentored students in the 2008 Google Summer of Code. Dual CPU laptops are becoming the norm; quad-core computers are now very affordable, and eight CPU machines are routinely purchased as developer workstations. The Parallel Extension API makes it easy to prepare your software to run on multi processor machines by providing constructs that take care of distributing the work to various CPUs based on the computer load and the number of processors available."
 Monday, August 11, 2008
The primary reason a traditional thread pool doesn’t scale is that there’s a single work queue protected by a global lock. For obvious reasons, this can easily become a bottleneck. Two primary things contribute heavily to whether the global lock becomes a limiting factor for a particular workload’s throughput:
- As the size of work items becomes smaller, the frequency at which the pool’s threads must acquire the global lock increases. Moving forward, we expect the granularity of latent parallelism to become smaller such that programs can scale as more processors are added.
- As more processors are added, the arrival rate at the lock will increase when compared to the same workload run with fewer processors. This inherently limits the ability to “get more work through” that single straw that is the global queue.
For coarse-grained work items, and for small numbers of processors, these problems simply aren’t too great. That has been the CLR ThreadPool’s forte for quite some time; most work items range in the 1,000s to 10,000s (or more) of CPU cycles, and 8 processors was considered pushing the limits. Clearly the direction the whole industry is headed in exposes these fundamental flaws very quickly. We’d like to enable work items with 100s and 1,000s of cycles and must scale well beyond 4, 8, 16, 32, 64, ... processors.
Decentralized scheduling techniques can be used to combat this problem. In other words, if we give different components their own work queues, we can eliminate the central bottleneck. This approach works to a degree but becomes complicated very quickly because clearly we don’t want each such queue to have its own pool of dedicated threads. So we’d need some way of multiplexing a very dynamic and comparatively large number of work pools onto a mostly-fixed and comparatively small number of OS threads.
Introducing work stealing
Another technique – and the main subject of this blog post – is to use a so-called work stealing queue (WSQ). A WSQ is a special kind of queue in that it has two ends, and allows lock-free pushes and pops from one end (“private”), but requires synchronization from the other end (“public”). When the queue is sufficiently small that private and public operations could conflict, synchronization is necessary. It is array-based and can grow dynamically. This data structure was made famous in the 90’s when much work on dynamic work scheduling was done in the research community.
In the context of a thread pool, the WSQ can augment the traditional global queue to enable more efficient private queuing and dequeuing. It works roughly as follows:
- We still have a global queue protected by a global lock.
  - (We can of course consider the ability to have separate pools to reduce pressure on this.)
- Each thread in the pool has its own private WSQ.
- When work is queued from a pool thread, the work goes into the WSQ, avoiding all locking.
- When work is queued from a non-pool thread, it goes into the global queue.
- When threads are looking for work, they can have a preferred search order:
  - Check the local WSQ. Work here can be dequeued without locks.
  - Check the global queue. Work here must be dequeued using locks.
  - Check other threads’ WSQs. This is called “stealing”, and requires locks.
If you haven’t guessed, this is by-and-large how the Task Parallel Library (TPL) schedules work.
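The search order in the list above can be sketched as follows. This is an illustration only, not TPL's actual code. `SketchScheduler` and its members are hypothetical, and for simplicity every deque here is guarded by a lock, whereas the real point of a WSQ is that the local end needs none.

```csharp
using System;
using System.Collections.Generic;

public sealed class SketchScheduler
{
    private readonly Queue<string> m_global = new Queue<string>();
    private readonly LinkedList<string>[] m_local;   // one deque per pool thread

    public SketchScheduler(int threadCount)
    {
        m_local = new LinkedList<string>[threadCount];
        for (int i = 0; i < threadCount; i++)
            m_local[i] = new LinkedList<string>();
    }

    public void QueueGlobal(string work) { lock (m_global) m_global.Enqueue(work); }

    public void QueueLocal(int thread, string work)
    {
        lock (m_local[thread]) m_local[thread].AddLast(work);
    }

    // Search order: own deque, then the global queue, then other threads' deques.
    public bool TryFindWork(int thread, out string work)
    {
        lock (m_local[thread])
        {
            if (m_local[thread].Count > 0)
            {
                work = m_local[thread].Last.Value;   // newest item first
                m_local[thread].RemoveLast();
                return true;
            }
        }
        lock (m_global)
        {
            if (m_global.Count > 0) { work = m_global.Dequeue(); return true; }
        }
        for (int i = 1; i < m_local.Length; i++)
        {
            LinkedList<string> victim = m_local[(thread + i) % m_local.Length];
            lock (victim)
            {
                if (victim.Count > 0)
                {
                    work = victim.First.Value;       // steal the oldest item
                    victim.RemoveFirst();
                    return true;
                }
            }
        }
        work = null;
        return false;
    }
}
```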
For workloads that recursively queue a lot of work, the use of a per-thread WSQ substantially reduces the synchronization necessary to complete the work, leading to far better throughput. There are also fewer cache effects due to sharing of the global queue information. “Stealing” is our last course of action in the abovementioned search logic, because it has the secondary effect of causing another thread to have to visit the global queue (or steal) sooner. In some sense, it is double the cost of merely getting an item from the global queue.
Another (subtle) aspect of WSQs is that they are LIFO for private operations and FIFO for steals. This is inherent in how the WSQ’s synchronization works (and is key to enabling lock-freedom), but has additional rationale:
- By executing the work most recently pushed into the queue in LIFO order, chances are that memory associated with it will still be hot in the cache.
- By stealing in FIFO order, chances are that a larger “chunk” of work will be stolen (possibly reducing the chance of needing additional steals). The reason for this is that many work stealing workloads are divide-and-conquer in nature; in such cases, the recursion forms a tree, and the oldest items in the queue lie closer to the root; hence, stealing one of those implicitly also steals a (potentially) large subtree of computations that will unfold once that piece of work is stolen and run.
This decision clearly changes the regular order of execution when compared to a mostly-FIFO system, and is the reason we’re contemplating exposing options to control this behavior from TPL.
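A tiny simulation shows why the oldest item tends to be the largest in divide-and-conquer workloads. The sketch below uses hypothetical names and a plain `LinkedList` standing in for the WSQ. A worker repeatedly splits a range, keeps the left half, and pushes the right half at the private end.

```csharp
using System;
using System.Collections.Generic;

public static class StealOrderDemo
{
    // Simulates one worker splitting the range [lo, hi): it keeps the left
    // half and pushes the right half onto its deque, until one element is left.
    public static LinkedList<int[]> Split(int lo, int hi)
    {
        var deque = new LinkedList<int[]>();
        while (hi - lo > 1)
        {
            int mid = lo + (hi - lo) / 2;
            deque.AddLast(new[] { mid, hi });   // push at the private (tail) end
            hi = mid;
        }
        return deque;
    }

    // Number of elements covered by a [start, end) range.
    public static int Size(int[] range) { return range[1] - range[0]; }
}
```

For `Split(0, 8)` the deque holds the ranges [4,8), [2,4), and [1,2) in push order. A local pop takes the last one (a single element, most likely still hot in cache), while a steal takes the first one, which covers half of the original problem.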
A simple WorkStealingQueue<T> type
With all that background behind us, let’s jump straight into a really simple implementation of a work stealing queue written in C#.
public class WorkStealingQueue<T>
{
The queue is array-based, and we keep two indexes: a head and a tail. The tail represents the private end and the head represents the public end. We also maintain a mask that is always equal to the size of the array minus one, helping with some of the bounds-checking arithmetic and handling automatic wraparound for indexing into the array. Because of the way we use the mask (we will assume all legal bits for indexing into the array are on), the size must always be a power of two. We arbitrarily select the number 32 as the queue’s initial (power of two) size.
private const int INITIAL_SIZE = 32;
private T[] m_array = new T[INITIAL_SIZE];
private int m_mask = INITIAL_SIZE - 1;
private volatile int m_headIndex = 0;
private volatile int m_tailIndex = 0;
We also need a lock to protect the operations that require synchronization.
private object m_foreignLock = new object();
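The mask trick relies on a simple identity: for a power-of-two size, `index & (size - 1)` equals `index % size` for any non-negative index. A minimal sketch (the helper name is hypothetical):

```csharp
using System;

public static class MaskDemo
{
    // Maps an ever-growing logical index onto a slot in a power-of-two array.
    public static int Slot(int index, int size)
    {
        if (size <= 0 || (size & (size - 1)) != 0)
            throw new ArgumentException("size must be a power of two", "size");
        return index & (size - 1);
    }
}
```

With a size of 32, index 37 lands in slot 5, the same slot that index 5 used one lap earlier. This is why the head and tail fields can keep counting up without being wrapped themselves.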
Although they aren’t exercised very much in the code, we have some helper properties. The queue is empty when the head is equal to or greater than the tail, and the count can be computed by subtracting the head from the tail. Because these fields never wrap (because we use the mask), this is correct.
public bool IsEmpty
{
get { return m_headIndex >= m_tailIndex; }
}
public int Count
{
get { return m_tailIndex - m_headIndex; }
}
OK, let’s get into the meat of the implementation. Pushing is the obvious place to start, and, for obvious reasons, we only support private pushes. Public pushes are useless given the protocol explained above, i.e., the only public operation we will support is stealing. Keep in mind when reading this code that m_tailIndex and m_headIndex are both volatile variables.
public void LocalPush(T obj)
{
int tail = m_tailIndex;
First we must check whether there is room in the queue. To do so, we just see if m_tailIndex is less than the sum of m_mask (the size of the list minus one) and m_headIndex. False negatives are OK, and are certainly possible because a concurrent steal may come along and take an element, making room, immediately after the check. We will handle this by synchronizing in a moment.
if (tail < m_headIndex + m_mask)
{
If there is indeed room, we can merely stick the object into the array (masking m_tailIndex with m_mask to ensure we’re within the legal range) and then increment m_tailIndex by one. This may look unsafe, but it is in fact safe: writes retire in order in .NET’s memory model, and we know no other thread is changing m_tailIndex (only private operations write to it) and that no thread will try to access the current array slot into which we’re storing the element.
m_array[tail & m_mask] = obj;
m_tailIndex = tail + 1;
}
Otherwise, we need to head down the slow path which involves resizing.
else
{
We will take the lock and check that we still need to make room.
lock (m_foreignLock)
{
int head = m_headIndex;
int count = m_tailIndex - m_headIndex;
if (count >= m_mask)
{
Assuming we need to make more room, we will just double the size of the array, copy elements, fix up the fields, and move on. Remember that the array length is always a power of two, so we can get the next power of two by simply bitshifting to the left by one. We do that for the mask too, but need to remember to “turn on” the least significant bit by oring one into the mask.
T[] newArray = new T[m_array.Length << 1];
for (int i = 0; i < m_array.Length; i++)
newArray[i] = m_array[(i + head) & m_mask];
m_array = newArray;
m_headIndex = 0;
m_tailIndex = tail = count;
m_mask = (m_mask << 1) | 1;
After we’re done resizing, the m_headIndex is reset to 0, and the m_tailIndex is the previous size of the queue. We can then store into the queue in the same way we would have earlier.
}
m_array[tail & m_mask] = obj;
m_tailIndex = tail + 1;
}
}
}
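The resize step can be looked at in isolation. The helper below is a hypothetical sketch, not part of the class. Unlike the code above, it copies only `count` live elements rather than every slot, but the unwrapping idea is the same: element i of the new array comes from slot `(i + head) & mask` of the old one.

```csharp
using System;

public static class GrowDemo
{
    // Copies the live elements, starting at the logical head, to the front
    // of a new array of twice the length. 'head' is the unmasked head index.
    public static T[] Grow<T>(T[] array, int head, int count)
    {
        int mask = array.Length - 1;              // array.Length is a power of two
        T[] bigger = new T[array.Length << 1];
        for (int i = 0; i < count; i++)
            bigger[i] = array[(i + head) & mask];
        return bigger;
    }
}
```

For a 4-slot array `{30, 40, 10, 20}` with the head at index 2 and three live elements, the result starts with 10, 20, 30. The mask grows from 3 to `(3 << 1) | 1`, which is 7, matching the new length minus one.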
And that’s that: we’ve added an item into the queue with a local push. Now let’s look at the reverse: removing an element with a local pop. Remember, it’s impossible for a local push and pop to interleave with one another because they must be executed by the same thread serially.
public bool LocalPop(ref T obj)
{
First we read the current value of m_tailIndex. If the queue is currently empty, i.e., m_headIndex >= m_tailIndex, then we just return false right away. This is how “emptiness” is conveyed to callers.
int tail = m_tailIndex;
if (m_headIndex >= tail)
return false;
Next we disable an annoying C# compiler warning (CS0420, raised when a volatile field is passed by reference).
#pragma warning disable 0420
Now we have determined there is at least one element in the queue (or was during our previous check). We will now subtract one from the tail, which effectively removes the element. There is still a chance that we will “lose” in a race with another thread doing a steal, so we’ll need to be very careful. In fact, there is a subtle .NET memory model gotcha to be aware of: we must guarantee our write to take the element does not get trapped in the write buffer beyond a subsequent read of the m_headIndex. If that could happen, we might mistakenly think we took the element, while at the same time a stealing thread thought it took the same element! The result would be that the same item will be dequeued by two threads which could lead to disaster. In a thread pool, it’d amount to the same work item being run twice. To ensure this reordering can’t happen, we must use an XCHG to perform the write to m_tailIndex.
tail -= 1;
Interlocked.Exchange(ref m_tailIndex, tail);
We detect whether we lost the race by checking to see if our dequeuing of the element has made the queue empty. If it hasn’t, we can just read the array element in the new m_tailIndex position and return it.
if (m_headIndex <= tail)
{
obj = m_array[tail & m_mask];
return true;
}
else
{
Otherwise, we take the lock and see what to do. This blocks out all steals. Either we will find that there indeed is an element remaining, and we can just return it as we would have done above, or we must “put the element back” by just incrementing the m_tailIndex. If we have to back out our modification, we just return false to indicate that the queue has become empty. We know we aren’t racing with it becoming non-empty because only private pushes are supported.
lock (m_foreignLock)
{
if (m_headIndex <= tail)
{
// Element still available. Take it.
obj = m_array[tail & m_mask];
return true;
}
else
{
// We lost the race, element was stolen, restore the tail.
m_tailIndex = tail + 1;
return false;
}
}
}
}
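The write-then-read hazard described above is the classic store-load pattern. The sketch below is a hypothetical demo, not part of the queue. Two threads each publish a flag with `Interlocked.Exchange` and then read the other thread's flag. Because interlocked operations act as full fences, at least one thread must observe the other's write, so "both read 0" should never occur. With plain writes, that outcome would be legal on common hardware.

```csharp
using System;
using System.Threading;

public static class StoreLoadDemo
{
    private static int s_x, s_y;

    // Returns the number of iterations in which both threads read 0.
    public static int CountBothZero(int iterations)
    {
        int bothZero = 0;
        for (int i = 0; i < iterations; i++)
        {
            s_x = 0;
            s_y = 0;
            int r1 = -1, r2 = -1;
            // Each thread: write own flag (full fence), then read the other's.
            var t1 = new Thread(() => { Interlocked.Exchange(ref s_x, 1); r1 = Volatile.Read(ref s_y); });
            var t2 = new Thread(() => { Interlocked.Exchange(ref s_y, 1); r2 = Volatile.Read(ref s_x); });
            t1.Start();
            t2.Start();
            t1.Join();
            t2.Join();
            if (r1 == 0 && r2 == 0)
                bothZero++;
        }
        return bothZero;
    }
}
```

In the queue, the local pop plays the role of one thread (write m_tailIndex, read m_headIndex) and the steal plays the other (write m_headIndex, read m_tailIndex).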
Lastly, let’s take a look at the public pop capability. We allow a timeout to be supplied, because it’s often useful during the stealing logic to use a 0-timeout on the first pass through all the WSQs. This can help to eliminate lock wait times and more evenly distribute contention across the list of WSQs.
public bool TrySteal(ref T obj, int millisecondsTimeout)
{
First we acquire the WSQ’s lock, ensuring mutual exclusion among all other concurrent steals, resize operations, and local pops that may make the queue empty.
bool taken = false;
try
{
taken = Monitor.TryEnter(m_foreignLock, millisecondsTimeout);
if (taken)
{
Once inside the lock, we must increment m_headIndex by one. This moves the head towards the tail, and has the effect of taking an element. Now this part gets quite tricky. We must ensure that we don’t remove the last element when racing with a local pop that went down its fast path (i.e., it didn’t acquire the lock). Given two threads racing to take an element—a steal and a local pop—we must ensure precisely one of them “wins”. Having both succeed will lead to the same element being popped twice, and having neither succeed could lead to reporting back an empty queue when in fact an element exists.
To do that, we will write to the m_headIndex variable to tentatively take the element, and must then read the m_tailIndex right afterward to ensure that the queue is still non-empty. As with the pop logic earlier, we need to use an XCHG operation to write the m_headIndex field, otherwise we will potentially suffer from a similar legal memory reordering bug.
int head = m_headIndex;
Interlocked.Exchange(ref m_headIndex, head + 1);
If the queue is non-empty, we just read the element as we usually do: by indexing into the array with the previous m_headIndex value (still held in head) using the proper masking. We then return true to indicate an element was found.
if (head < m_tailIndex)
{
obj = m_array[head & m_mask];
return true;
}
Otherwise, the queue is empty and we must return. Clearly this is racy and by the time we return the queue may be non-empty. If the pool will subsequently wait for work to arrive, this must be taken into consideration so as not to incur lost wake-ups.
else
{
m_headIndex = head;
return false;
}
}
}
We of course need to release the lock at the end of it all.
finally
{
if (taken)
Monitor.Exit(m_foreignLock);
}
return false;
}
}
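The timeout parameter enables a two-pass stealing strategy: a first pass that never blocks, followed by a pass that is willing to wait. Here is a minimal sketch under stated assumptions. `LockedQueue` is a hypothetical stand-in that only mirrors the shape of TrySteal, and `Stealer` is an invented helper.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public sealed class LockedQueue
{
    private readonly Queue<int> m_items = new Queue<int>();
    private readonly object m_lock = new object();

    public void Add(int item) { lock (m_lock) m_items.Enqueue(item); }

    // Gives up if the lock cannot be acquired within the timeout.
    public bool TrySteal(ref int item, int millisecondsTimeout)
    {
        bool taken = false;
        try
        {
            taken = Monitor.TryEnter(m_lock, millisecondsTimeout);
            if (taken && m_items.Count > 0)
            {
                item = m_items.Dequeue();
                return true;
            }
        }
        finally
        {
            if (taken)
                Monitor.Exit(m_lock);
        }
        return false;
    }
}

public static class Stealer
{
    // First pass uses a 0 timeout and skips contended victims;
    // the second pass waits for each victim's lock.
    public static bool TryStealFromAny(LockedQueue[] victims, ref int item)
    {
        foreach (int timeout in new[] { 0, Timeout.Infinite })
            foreach (LockedQueue victim in victims)
                if (victim.TrySteal(ref item, timeout))
                    return true;
        return false;
    }
}
```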
And that’s it! As with most lock-free algorithms, the core idea is surprisingly simple but deceptively subtle and intricate. After seeing it written out and explained in detail, I hope that you’ll have that “Ah hah!” moment that always happens after staring at this kind of code for a little while. In future posts, we’ll take a closer look at the performance differences between this and a traditional globally synchronized queue, and discuss what it takes to merge the two ideas implementation-wise.
Appendix
For reference, here’s the full code without all the explanation intertwined:
using System;
using System.Threading;
public class WorkStealingQueue<T>
{
    private const int INITIAL_SIZE = 32;
    private T[] m_array = new T[INITIAL_SIZE];
    private int m_mask = INITIAL_SIZE - 1;
    private volatile int m_headIndex = 0;
    private volatile int m_tailIndex = 0;
    private object m_foreignLock = new object();
    public bool IsEmpty
    {
        get { return m_headIndex >= m_tailIndex; }
    }
    public int Count
    {
        get { return m_tailIndex - m_headIndex; }
    }
    public void LocalPush(T obj)
    {
        int tail = m_tailIndex;
        if (tail < m_headIndex + m_mask)
        {
            m_array[tail & m_mask] = obj;
            m_tailIndex = tail + 1;
        }
        else
        {
            lock (m_foreignLock)
            {
                int head = m_headIndex;
                int count = m_tailIndex - m_headIndex;
                if (count >= m_mask)
                {
                    T[] newArray = new T[m_array.Length << 1];
                    for (int i = 0; i < m_array.Length; i++)
                        newArray[i] = m_array[(i + head) & m_mask];
                    // Reset the field values, incl. the mask.
                    m_array = newArray;
                    m_headIndex = 0;
                    m_tailIndex = tail = count;
                    m_mask = (m_mask << 1) | 1;
                }
                m_array[tail & m_mask] = obj;
                m_tailIndex = tail + 1;
            }
        }
    }
    public bool LocalPop(ref T obj)
    {
        int tail = m_tailIndex;
        if (m_headIndex >= tail)
            return false;
#pragma warning disable 0420
        tail -= 1;
        Interlocked.Exchange(ref m_tailIndex, tail);
        if (m_headIndex <= tail)
        {
            obj = m_array[tail & m_mask];
            return true;
        }
        else
        {
            lock (m_foreignLock)
            {
                if (m_headIndex <= tail)
                {
                    // Element still available. Take it.
                    obj = m_array[tail & m_mask];
                    return true;
                }
                else
                {
                    // We lost the race, element was stolen, restore the tail.
                    m_tailIndex = tail + 1;
                    return false;
                }
            }
        }
    }
    // Public so that other pool threads can steal from this queue.
    public bool TrySteal(ref T obj, int millisecondsTimeout)
    {
        bool taken = false;
        try
        {
            taken = Monitor.TryEnter(m_foreignLock, millisecondsTimeout);
            if (taken)
            {
                int head = m_headIndex;
                Interlocked.Exchange(ref m_headIndex, head + 1);
                if (head < m_tailIndex)
                {
                    obj = m_array[head & m_mask];
                    return true;
                }
                else
                {
                    m_headIndex = head;
                    return false;
                }
            }
        }
        finally
        {
            if (taken)
                Monitor.Exit(m_foreignLock);
        }
        return false;
    }
}
 Wednesday, July 30, 2008
 Tuesday, July 29, 2008
This is the first part in a series I am going to do on building a custom thread pool. Not that I’m advocating you do such a thing, but I figured it could be interesting to explore the intricacies involved. We’ll start off really simple:
- A CLR monitor based queuing mechanism.
- A static, fixed number of threads.
- The ability to create multiple pools that are isolated from one another.
- Flowing of ExecutionContexts and the ability to turn it off.
As the series progresses, I intend to incorporate some interesting facets such as:
- Dynamic thread injection, so that the number of threads is not fixed.
- Thread sharing among multiple pools in an AppDomain.
- A per-thread work stealing queue to increase the efficiency of recursively queued work.
- Interoperability with I/O completion ports.
- Returning an IAsyncResult object for seamless APM integration.
- Cancelation.
- Anything else that readers suggest might be interesting. Let me know.
And with that, let’s begin.
For now, we’ll use a very simple interface, IThreadPool, under which we can implement various mechanisms and policies. This will make it easier to write generic test harnesses that compare different implementations. For this post we won’t really make use of that capability (much), but we will use it to compare the stock CLR ThreadPool against a very simple custom one.
interface IThreadPool : IDisposable
{
void QueueUserWorkItem(WaitCallback work, object obj);
}
So that we can subsequently compare implementations, we have two simple implementations of IThreadPool. One does safe ThreadPool.QueueUserWorkItem calls, and the other does unsafe ThreadPool.UnsafeQueueUserWorkItem calls. The only difference is that the latter doesn’t flow the ExecutionContext across threads.
class CLRThreadPool : IThreadPool
{
public void QueueUserWorkItem(WaitCallback work, object obj)
{
ThreadPool.QueueUserWorkItem(work, obj);
}
public void Dispose() { }
}
class CLRUnsafeThreadPool : IThreadPool
{
public void QueueUserWorkItem(WaitCallback work, object obj)
{
ThreadPool.UnsafeQueueUserWorkItem(work, obj);
}
public void Dispose() { }
}
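Because everything goes through IThreadPool, a test helper can be written once and reused across pools. The sketch below repeats the interface and the CLRThreadPool wrapper so that it stands alone. `PoolHarness` is a hypothetical helper, not part of the series' code. It queues a number of trivial work items and blocks until all of them have run.

```csharp
using System;
using System.Threading;

interface IThreadPool : IDisposable
{
    void QueueUserWorkItem(WaitCallback work, object obj);
}

class CLRThreadPool : IThreadPool
{
    public void QueueUserWorkItem(WaitCallback work, object obj)
    {
        ThreadPool.QueueUserWorkItem(work, obj);
    }
    public void Dispose() { }
}

static class PoolHarness
{
    // Queues 'count' trivial work items and blocks until all have run.
    // Returns how many callbacks actually executed.
    public static int RunAndWait(IThreadPool pool, int count)
    {
        int executed = 0;
        int remaining = count;
        using (var done = new ManualResetEvent(count == 0))
        {
            WaitCallback callback = delegate
            {
                Interlocked.Increment(ref executed);
                // The last work item to finish releases the waiting caller.
                if (Interlocked.Decrement(ref remaining) == 0)
                    done.Set();
            };
            for (int i = 0; i < count; i++)
                pool.QueueUserWorkItem(callback, null);
            done.WaitOne();
        }
        return executed;
    }
}
```

Any other IThreadPool implementation can be passed to `RunAndWait` unchanged, which makes side-by-side comparisons straightforward.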
Our simple thread pool, SimpleLockThreadPool, will have 6 fields:
- private int m_concurrencyLevel: the number of threads to create statically, specified at construction time (w/ a default of Environment.ProcessorCount);
- private bool m_flowExecutionContext: whether execution context flowing is turned on (the default) or off. Turning it off can provide some performance gains.
- private Queue<WorkItem> m_queue: the queue of actual work items. This object is also used as a monitor. We’ll see what the WorkItem data structure looks like momentarily.
- private Thread[] m_threads: the set of threads actively dequeuing and running work items from this pool. Each instance of SimpleLockThreadPool has its own set.
- private int m_threadsWaiting: a hint used to avoid pulsing on enqueue when no threads are waiting. Threads increment and decrement before and after (respectively) waiting for work.
- private bool m_shutdown: set to true when threads are requested to exit.
Each WorkItem is a struct with three fields. Using a struct avoids superfluous heap allocations.
- internal WaitCallback m_work: the delegate to invoke.
- internal object m_obj: some optional state to pass as the argument to m_work.
- internal ExecutionContext m_executionContext: a context captured at enqueue time, to be used when running the callback. This ensures the appropriate security context and logical call context flow to the work item’s stack, for example.
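To see what capturing a context at enqueue time buys, consider the sketch below. It is an illustration with hypothetical names, and it assumes `AsyncLocal<T>`, which was added to .NET well after this post was written, as a convenient stand-in for ambient data that travels with the ExecutionContext. The callback runs under the context captured earlier, so it sees the value from "enqueue time" rather than the current one.

```csharp
using System;
using System.Threading;

public static class ContextFlowDemo
{
    private static readonly AsyncLocal<string> s_ambient = new AsyncLocal<string>();

    // Returns the ambient value observed by a callback run under a context
    // captured earlier, even though the ambient value changed afterwards.
    public static string ObserveUnderCapturedContext()
    {
        s_ambient.Value = "at-enqueue";
        ExecutionContext captured = ExecutionContext.Capture();   // "enqueue time"
        s_ambient.Value = "later";

        if (captured == null)
            return s_ambient.Value;   // flow was suppressed; nothing to run under

        string observed = null;
        ExecutionContext.Run(captured, delegate { observed = s_ambient.Value; }, null);
        return observed;
    }
}
```

This is the same Capture and Run pairing that WorkItem performs, only compressed into a single thread for demonstration.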
There are just 4 methods of interest:
- public void QueueUserWorkItem(WaitCallback work, object obj): implements the IThreadPool interface, and does a few things. It allocates a new WorkItem, optionally captures and stores an ExecutionContext, ensures the pool has started, and then enqueues the WorkItem into the pool, possibly pulsing a single thread (if any are waiting). There’s also a convenient overload that doesn’t take an obj for situations where it isn’t needed.
- private void EnsureStarted(): a simple helper method that will lazily initialize and start the set of threads in a particular pool. These threads just sit in a loop and dequeue work. The lazy aspect ensures that a pool that doesn’t ever get used won’t allocate threads.
- private void DispatchLoop(): this is the main method run by each pool thread. All it does is sit in a loop dequeuing and (if the queue is empty) waiting for new work to arrive. When shutdown is initiated, the method voluntarily quits. If any pool work items throw an exception, this top-level method lets them go unhandled, resulting in a crash of the thread.
- public void Dispose(): shuts down all the threads in a pool. It is synchronous, so it actually waits for them to complete before returning. If work items take a long time to finish, this could be a problem. Extending this to timeout, etc., would be trivial.
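One way a timeout could be added to the shutdown wait is sketched below. `ShutdownHelper` is a hypothetical helper, not part of SimpleLockThreadPool. It joins every thread against a single overall time budget and reports whether all of them exited in time.

```csharp
using System;
using System.Diagnostics;
using System.Threading;

public static class ShutdownHelper
{
    // Waits for all threads to exit, sharing one overall time budget.
    // Returns false if the budget runs out before every thread has finished.
    public static bool JoinAll(Thread[] threads, int millisecondsTimeout)
    {
        Stopwatch elapsed = Stopwatch.StartNew();
        foreach (Thread t in threads)
        {
            long remaining = millisecondsTimeout - elapsed.ElapsedMilliseconds;
            if (remaining < 0)
                remaining = 0;        // still poll threads that have already exited
            if (!t.Join((int)remaining))
                return false;
        }
        return true;
    }
}
```

A Dispose variant built on this would still have to decide what to do with threads that fail to exit, for example leaving them running as background threads.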
And that’s really it. This is a very simple and naïve start, but it will prove to be a good starting point for all of the extensions I mentioned at the outset. Here’s the full code.
public class SimpleLockThreadPool : IThreadPool
{
// Constructors--
// Two things may be specified:
// ConcurrencyLevel == fixed # of threads to use
// FlowExecutionContext == whether to capture & flow ExecutionContexts for work items
public SimpleLockThreadPool() :
this(Environment.ProcessorCount, true) { }
public SimpleLockThreadPool(int concurrencyLevel) :
this(concurrencyLevel, true) { }
public SimpleLockThreadPool(bool flowExecutionContext) :
this(Environment.ProcessorCount, flowExecutionContext) { }
public SimpleLockThreadPool(int concurrencyLevel, bool flowExecutionContext)
{
if (concurrencyLevel <= 0)
throw new ArgumentOutOfRangeException("concurrencyLevel");
m_concurrencyLevel = concurrencyLevel;
m_flowExecutionContext = flowExecutionContext;
// If suppressing flow, we need to demand permissions.
if (!flowExecutionContext)
new SecurityPermission(SecurityPermissionFlag.Infrastructure).Demand();
}
// Each work item consists of a closure: work + (optional) state obj + context.
struct WorkItem
{
internal WaitCallback m_work;
internal object m_obj;
internal ExecutionContext m_executionContext;
internal WorkItem(WaitCallback work, object obj)
{
m_work = work;
m_obj = obj;
m_executionContext = null;
}
internal void Invoke()
{
// Run normally (delegate invoke) or under context, as appropriate.
if (m_executionContext == null)
m_work(m_obj);
else
ExecutionContext.Run(m_executionContext, ContextInvoke, null);
}
private void ContextInvoke(object obj)
{
m_work(m_obj);
}
}
private readonly int m_concurrencyLevel;
private readonly bool m_flowExecutionContext;
private readonly Queue<WorkItem> m_queue = new Queue<WorkItem>();
private Thread[] m_threads;
private int m_threadsWaiting;
private bool m_shutdown;
// Methods to queue work.
public void QueueUserWorkItem(WaitCallback work)
{
QueueUserWorkItem(work, null);
}
public void QueueUserWorkItem(WaitCallback work, object obj)
{
WorkItem wi = new WorkItem(work, obj);
// If execution context flowing is on, capture the caller's context.
if (m_flowExecutionContext)
wi.m_executionContext = ExecutionContext.Capture();
// Make sure the pool is started (threads created, etc).
EnsureStarted();
// Now insert the work item into the queue, possibly waking a thread.
lock (m_queue)
{
m_queue.Enqueue(wi);
if (m_threadsWaiting > 0)
Monitor.Pulse(m_queue);
}
}
// Ensures tha threads have begun executing.
private void EnsureStarted()
{
if (m_threads == null)
{
lock (m_queue)
{
if (m_threads == null)
{
m_threads = new Thread[m_concurrencyLevel];
for (int i = 0; i < m_threads.Length; i++)
{
m_threads[i] = new Thread(DispatchLoop);
m_threads[i].Start();
}
}
}
}
}
// Each thread runs the dispatch loop.
private void DispatchLoop()
{
while (true)
{
WorkItem wi = default(WorkItem);
lock (m_queue)
{
// If shutdown was requested, exit the thread.
if (m_shutdown)
return;
// Find a new work item to execute.
while (m_queue.Count == 0)
{
m_threadsWaiting++;
try { Monitor.Wait(m_queue); }
finally { m_threadsWaiting--; }
// If we were signaled due to shutdown, exit the thread.
if (m_shutdown)
return;
}
// We found a work item! Grab it ...
wi = m_queue.Dequeue();
}
// ...and Invoke it. Note: exceptions will go unhandled (and crash).
wi.Invoke();
}
}
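// Disposing will signal shutdown, and then wait for all threads to finish.
public void Dispose()
{
Thread[] threads;
// Set the flag under the lock, like every other access to it, and wake all waiters.
lock (m_queue)
{
m_shutdown = true;
Monitor.PulseAll(m_queue);
threads = m_threads;
}
// Threads are created lazily, so there may be nothing to join.
if (threads != null)
{
for (int i = 0; i < threads.Length; i++)
threads[i].Join();
}
}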
}
I think everything should be self-explanatory given the earlier explanation of all the fields and types. Let’s take a look at a simple test harness for this. There are a myriad of useful tests, and the one that I will show right now is but one of them. It queues a whole lot of work items, and then blocks waiting for them to complete. I have two variants: one of them allows work items to begin executing before the queuing is done, while the other separates the phases. Here is the general test.
class Program
{
public static void Main(string[] args)
{
bool separateQueueFromDrain = bool.Parse(args[0]);
const int warmupRunsPerThreadPool = 100;
const int realRunsPerThreadPool = 1000000;
IThreadPool[] threadPools = new IThreadPool[]
{
new CLRThreadPool(),
new CLRUnsafeThreadPool(),
new SimpleLockThreadPool(true), // Flow EC
new SimpleLockThreadPool(false), // Don't flow EC
};
long[] queueCost = new long[threadPools.Length];
long[] drainCost = new long[threadPools.Length];
Console.WriteLine("+ Running benchmarks ({0}) +", threadPools.Length);
for (int i = 0; i < threadPools.Length; i++)
{
IThreadPool itp = threadPools[i];
Console.Write("#{0} {1}: ", i, itp.ToString().PadRight(26));
// Warm up:
using (CountdownEvent cev = new CountdownEvent(warmupRunsPerThreadPool))
{
WaitCallback wc = delegate { cev.Decrement(); };
for (int j = 0; j < warmupRunsPerThreadPool; j++)
itp.QueueUserWorkItem(wc, null);
cev.Wait();
}
// Now do the real thing:
int g0collects = GC.CollectionCount(0);
int g1collects = GC.CollectionCount(1);
int g2collects = GC.CollectionCount(2);
using (CountdownEvent cev = new CountdownEvent(realRunsPerThreadPool))
using (ManualResetEvent gun = new ManualResetEvent(false))
{
WaitCallback wc = delegate {
if (separateQueueFromDrain) { gun.WaitOne(); }
cev.Decrement();
};
Stopwatch sw = Stopwatch.StartNew();
for (int j = 0; j < realRunsPerThreadPool; j++)
itp.QueueUserWorkItem(wc, null);
queueCost[i] = sw.ElapsedTicks;
sw = Stopwatch.StartNew();
if (separateQueueFromDrain) { gun.Set(); }
cev.Wait();
drainCost[i] = sw.ElapsedTicks;
}
g0collects = GC.CollectionCount(0) - g0collects;
g1collects = GC.CollectionCount(1) - g1collects;
g2collects = GC.CollectionCount(2) - g2collects;
Console.WriteLine("q: {0}, d: {1}, t: {2} (collects: 0={3},1={4},2={5})",
queueCost[i].ToString("#,##0"),
drainCost[i].ToString("#,##0"),
(queueCost[i] + drainCost[i]).ToString("#,##0"),
g0collects,
g1collects,
g2collects
);
itp.Dispose();
GC.Collect(2);
GC.WaitForPendingFinalizers();
}
Console.WriteLine();
Console.WriteLine("+ Comparison against baseline ({0}) +", threadPools[0]);
for (int i = 0; i < threadPools.Length; i++)
{
Console.WriteLine("#{0} {1}: q: {2}x, d: {3}x, t: {4}x",
i,
threadPools[i].ToString().PadRight(26),
queueCost[i] / (float)queueCost[0],
drainCost[i] / (float)drainCost[0],
(queueCost[i] + drainCost[i]) / ((float)queueCost[0] + drainCost[0])
);
}
}
}
If we pass ‘true’ on the command line, the phases are separated, and if we pass ‘false’ they are not. The ‘true’ variant allows us to home in on the source of overhead (is it the queuing itself, or the dispatching of work items?), but at the expense of needing to keep more of the work items in memory at once (because pool threads can’t drain them as we queue them). We run the test over an array of IThreadPool implementations, and for each one print out the cost to queue work, the cost to drain work, and the number of Gen0, Gen1, and Gen2 collections performed. The GC statistics are interesting because they tell us how much more memory (roughly speaking) we are allocating for the same workload on different pool implementations. As our pool gets more complicated, this will be something to keep your eye on.
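The GC bookkeeping in the harness can be pulled out into a small helper. The following is only a sketch of that idea: the GcDelta class, its Measure method, and the allocation workload are hypothetical names made up for illustration, while GC.CollectionCount is the same call the harness uses.

```csharp
using System;

static class GcDelta
{
    // Runs 'action' and returns how many Gen0, Gen1, and Gen2 collections
    // were counted while it ran (index = generation).
    public static int[] Measure(Action action)
    {
        int g0 = GC.CollectionCount(0);
        int g1 = GC.CollectionCount(1);
        int g2 = GC.CollectionCount(2);
        action();
        return new[]
        {
            GC.CollectionCount(0) - g0,
            GC.CollectionCount(1) - g1,
            GC.CollectionCount(2) - g2,
        };
    }

    public static void Main()
    {
        // Hypothetical workload: many small, short-lived allocations.
        int[] d = Measure(() =>
        {
            for (int i = 0; i < 1000000; i++)
            {
                object o = new byte[64];
                GC.KeepAlive(o);
            }
        });
        Console.WriteLine("collects: 0={0},1={1},2={2}", d[0], d[1], d[2]);
    }
}
```

The counts are process-wide, so anything else allocating at the same time shows up in the deltas too. That is why they are best read as a rough comparison between pools rather than an exact measurement.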
Here are some sample numbers on my dual-core laptop. Your results will vary. When ‘true’ is passed, I see numbers like the following:
+ Running benchmarks (4) +
#0 CLRThreadPool : q: 3,163,506, d: 5,137,893, t: 8,301,399 (collects: 0=16,1=8,2=3)
#1 CLRUnsafeThreadPool : q: 1,285,806, d: 4,428,451, t: 5,714,257 (collects: 0=5,1=4,2=1)
#2 SimpleLockThreadPool : q: 4,208,686, d: 11,839,614, t: 16,048,300 (collects: 0=104,1=14,2=4)
#3 SimpleLockThreadPool : q: 499,575, d: 3,992,190, t: 4,491,765 (collects: 0=1,1=1,2=1)
+ Comparison against baseline (CLRThreadPool) +
#0 CLRThreadPool : q: 1x, d: 1x, t: 1x
#1 CLRUnsafeThreadPool : q: 0.4064497x, d: 0.8619196x, t: 0.6883487x
#2 SimpleLockThreadPool : q: 1.330387x, d: 2.304371x, t: 1.933204x
#3 SimpleLockThreadPool : q: 0.1579181x, d: 0.7770092x, t: 0.5410853x
And when ‘false’ is passed, I see similar but subtly different numbers:
+ Running benchmarks (4) +
#0 CLRThreadPool : q: 3,476,630, d: 27,592, t: 3,504,222 (collects: 0=20,1=6,2=0)
#1 CLRUnsafeThreadPool : q: 2,636,319, d: 140,653, t: 2,776,972 (collects: 0=5,1=2,2=0)
#2 SimpleLockThreadPool : q: 4,850,171, d: 6,227,052, t: 11,077,223 (collects: 0=95,1=14,2=4)
#3 SimpleLockThreadPool : q: 826,987, d: 132,755, t: 959,742 (collects: 0=1,1=1,2=1)
+ Comparison against baseline (CLRThreadPool) +
#0 CLRThreadPool : q: 1x, d: 1x, t: 1x
#1 CLRUnsafeThreadPool : q: 0.7582973x, d: 5.097601x, t: 0.7924646x
#2 SimpleLockThreadPool : q: 1.395078x, d: 225.6832x, t: 3.161108x
#3 SimpleLockThreadPool : q: 0.2378703x, d: 4.811358x, t: 0.2738816x
Notice right away that we are handily beating the heck out of the CLR thread pool in the case where we don’t flow ExecutionContext objects (the #3 case). In fact, we come in at only 27% of the total cost for the ‘false’ variant. But we unfortunately don’t fare nearly as well when we flow ExecutionContext objects (the #2 case). It turns out that’s because the CLR has a unique advantage over us when compared to our naïve call to ExecutionContext.Capture. Just look at the sizeable difference in Gen0 collections; we are clearly allocating a lot more memory. This will be a topic for a subsequent post.
 Sunday, July 20, 2008
Here's a slightly more formal approach to proving that the CLR MM is improperly implemented for the particular example I showed earlier.
As the Java MM folks have done, I will use a combination of happens-before and synchronizes-with relations, which allows order in a properly synchronized program to be described as a "flat" sequence with total ordering among elements. Assume < means synchronizes-with. If a happens-before b, and a < b, then any writes in a are visible to any loads in b. This relation is transitive: if a < b and b < c, then a < c. Given this, we can take an observed set of results (the values held in memory locations), a hypothesized execution order (which we can infer from the observation), and validate it against the program order (as written in the source); we do this by taking the MM-specific synchronizes-with relation rules, and seeing if we can produce the observed output given our belief of the execution order. If we find a contradiction (the execution order required to produce the output could not be produced given the program order and MM rules), either there is an alternative execution order we failed to guess, or we have found a violation of the memory model.
Single threaded programs are easy. Multi threaded programs are hard. We must manually "sequentialize" the program by constructing an interleaving of all executed program operations into a single flat sequence, and permute them as needed to produce the output in order to formulate a hypothesis of the execution order. This is of course very difficult to do, so it only works with very small programs (like the one I will show below).
I will try to define the CLR 2.0 MM in terms of synchronizes-with, although I have to admit it’s going to be difficult to do off the top of my head:
- a < b, given a volatile load a that precedes any other memory operation b. (Loads are acquire.)
- a < b, given any memory operation a that precedes any other store b. (Stores are release.)
- a < b, given two separate memory operations a which precedes b that work with the same memory location. (Data dependence.)
- a < b, given any memory operation a that precedes a full fence b. (Cannot move after a fence.)
- a < b, given a full fence a that precedes some memory operation b. (Cannot move before a fence.)
- a < b, given a lock acquire a that precedes some memory operation b. (Lock acquires are acquire fences.)
- a < b, given a memory operation a that precedes a lock release b. (Lock releases are release fences.)
Let’s take the disturbing example, assuming all loads and stores are volatile.
P0            P1
==========    ==========
X = 1;        Y = 1;
R0 = X;       R2 = Y;
R1 = Y;       R3 = X;
Let’s hypothesize about execution order.
To produce an output in which R1 == R3 == 0, observe that neither X = 1 nor Y = 1 can be the first operation to execute. If either one does occur first, then any possible outcome leads to R1 and/or R3 holding the value 1. That is because of rule 3: if X = 1 happened first, then X = 1 < R3 = X, leading to R3 == 1, and similarly if Y = 1 happened first, then Y = 1 < R1 = Y, leading to R1 == 1. So let us try to make X = 1 and Y = 1 not happen first.
Indeed, it is impossible for R0 = X or R2 = Y to happen first. This is because of CLR MM rule 3: X = 1; R0 = X leads to data dependence, and thus X = 1 < R0 = X. Similarly, Y = 1 < R2 = Y. Dead end. Let’s try the only other route.
The only remaining possibility to produce the output R1 == R3 == 0 is if R1 = Y or R3 = X occurs first. Let us try to make R1 = Y occur first. Ah-hah! We cannot! Given CLR MM rule 1, R0 = X < R1 = Y. And because of transitivity, this necessarily implies that X = 1 < R1 = Y. The same holds for the other thread’s instructions: Y = 1 < R3 = X. The output R1 == R3 == 0 is therefore a contradiction and disallowed by the CLR MM.
Now, this is light years from a formal proof, but it is the reasoning I’ve been using in my mind to explain why this new realization is fundamentally very disturbing and is explicitly not allowed by the CLR MM. Thankfully it seems the JIT team agrees and is willing to fix this for the next release. And, I'm still in search of an example of code that is broken by this problem ...
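The argument above can be sanity-checked by brute force, under the assumption that each thread's three operations take effect in program order against a single shared memory (which is what the rules above are meant to guarantee for volatile accesses). The sketch below enumerates all 20 interleavings of the two threads and collects the (R1, R3) pairs that occur. The InOrderInterleavings class and its helpers are hypothetical names for illustration; this is a toy model, not a model of the CLR or of real hardware.

```csharp
using System;
using System.Collections.Generic;

static class InOrderInterleavings
{
    const int X = 0, Y = 1;

    // P0: X = 1; R0 = X; R1 = Y;      P1: Y = 1; R2 = Y; R3 = X;
    // regs holds [R0, R1, R2, R3]; mem holds [X, Y].
    static void Step(int thread, int pc, int[] mem, int[] regs)
    {
        if (thread == 0)
        {
            if (pc == 0) mem[X] = 1;
            else if (pc == 1) regs[0] = mem[X];
            else regs[1] = mem[Y];
        }
        else
        {
            if (pc == 0) mem[Y] = 1;
            else if (pc == 1) regs[2] = mem[Y];
            else regs[3] = mem[X];
        }
    }

    // Explores every schedule that preserves each thread's program order.
    static void Explore(int pc0, int pc1, int[] mem, int[] regs, HashSet<string> outcomes)
    {
        if (pc0 == 3 && pc1 == 3)
        {
            outcomes.Add(regs[1] + "," + regs[3]);   // record "R1,R3"
            return;
        }
        if (pc0 < 3)
        {
            int[] m = (int[])mem.Clone(), r = (int[])regs.Clone();
            Step(0, pc0, m, r);
            Explore(pc0 + 1, pc1, m, r, outcomes);
        }
        if (pc1 < 3)
        {
            int[] m = (int[])mem.Clone(), r = (int[])regs.Clone();
            Step(1, pc1, m, r);
            Explore(pc0, pc1 + 1, m, r, outcomes);
        }
    }

    public static HashSet<string> Outcomes()
    {
        var outcomes = new HashSet<string>();
        Explore(0, 0, new int[2], new int[4], outcomes);
        return outcomes;
    }

    public static void Main()
    {
        foreach (string o in Outcomes())
            Console.WriteLine("R1,R3 = " + o);
        Console.WriteLine("0,0 reachable: " + Outcomes().Contains("0,0"));
    }
}
```

In this model only the pairs 0,1 and 1,0 and 1,1 show up. The pair 0,0 never does, which matches the conclusion that the output is disallowed as long as neither thread's operations are reordered.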
 Wednesday, July 16, 2008
The adjacent release/acquire problem is well known. As an example, given the program:
P0            P1
==========    ==========
X = 1;        Y = 1;
R0 = Y;       R1 = X;
The outcome R0 == R1 == 0 is entirely legal. This could happen because writes are delayed in processor store buffers; so before R0 = Y retires, the store X = 1 may not have even left the local processor P0; similarly, before R1 = X retires, the store Y = 1 may not have even left processor P1. It is as if the program was written as follows:
P0            P1
==========    ==========
R0 = Y;       R1 = X;
X = 1;        Y = 1;
The standard way to fix this is to emit a full fence:
P0            P1
==========    ==========
X = 1;        Y = 1;
XCHG;         XCHG;
R0 = Y;       R1 = X;
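In C#, one way to ask for such a full fence is Thread.MemoryBarrier(). The sketch below runs the fenced program repeatedly and counts how often both loads see 0. The StoreLoadFenceDemo class, the CountBothZero method, and the iteration count are hypothetical choices for illustration. Spinning up two fresh threads per iteration is a crude way to provoke the race, so this is a demonstration of the pattern rather than a stress test.

```csharp
using System;
using System.Threading;

static class StoreLoadFenceDemo
{
    static int X, Y;

    // Runs the two-processor program 'iterations' times with a full fence
    // between each store and the following load, and returns how many runs
    // ended with R0 == R1 == 0.
    public static int CountBothZero(int iterations)
    {
        int bothZero = 0;
        for (int i = 0; i < iterations; i++)
        {
            X = 0;
            Y = 0;
            int r0 = -1, r1 = -1;

            // P0: X = 1; fence; R0 = Y;
            Thread p0 = new Thread(() => { X = 1; Thread.MemoryBarrier(); r0 = Y; });
            // P1: Y = 1; fence; R1 = X;
            Thread p1 = new Thread(() => { Y = 1; Thread.MemoryBarrier(); r1 = X; });

            p0.Start();
            p1.Start();
            p0.Join();
            p1.Join();

            if (r0 == 0 && r1 == 0)
                bothZero++;
        }
        return bothZero;
    }

    public static void Main()
    {
        Console.WriteLine("both zero: " + CountBothZero(200));
    }
}
```

With the fences in place the count should stay at zero, because each store must become globally visible before the load that follows it executes. Removing the two MemoryBarrier calls brings back the legal R0 == R1 == 0 outcome, although a harness this coarse may run for a long time without ever observing it.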
But here is one that may be a little surprising:
P0            P1
==========    ==========
X = 1;        Y = 1;
R0 = X;       R2 = Y;
R1 = Y;       R3 = X;
Assuming X and Y are "volatile" to the compiler, is R1 == R3 == 0 a possible outcome in this program?
Based on the rules we provide for .NET's MM, and Intel's whitepaper, one could reasonably argue "no". The reasoning goes as follows. True data dependence prohibits R0 = X from moving before X = 1, and the no load/load reordering rule (e.g. Intel's Rule 2.1) prohibits R1 = Y from moving before R0 = X. Thus, transitively, R1 = Y may not move before X = 1. Similarly, true data dependence prohibits R2 = Y from moving before Y = 1, and the no load/load reordering rule prohibits R3 = X from moving before R2 = Y, and therefore R3 = X may not move before Y = 1. Given this reasoning, the individual instruction streams cannot be reordered in place. And therefore, no interleaving of them will yield R1 == R3 == 0, because either X = 1 or Y = 1 must happen first, and both R1 = Y and R3 = X must come later. Hence at least one of R1 or R3 will observe a value of 1.
Sadly, this reasoning is incorrect. Rule 2.4 in the Intel whitepaper states that "intra-processor forwarding is allowed." They even have an innocent example in the paper, but it actually doesn't exhibit load/load reordering. It does, however, illustrate that stores may be delayed for some time in a write buffer. Perhaps surprisingly, such intra-processor forwarding of buffered stores is actually permitted to satisfy subsequent loads from that location by the same processor before the store has left the processor. This can happen even if it means passing intermediate loads from different memory locations! The result is that load/load reordering is effectively possible under some circumstances. Loads still physically retire in order of course, but because they may be satisfied by pending writes that other processors cannot yet see, it is as if the original program were written as:
P0            P1
==========    ==========
R1 = Y;       R3 = X;
X = 1;        Y = 1;
R0 = X;       R2 = Y;
This fundamentally contradicts what most people believe about .NET's MM, and indeed, Intel's MM as specified in that whitepaper. To be fair, the whitepaper actually does call this out, but in a roundabout and misleading fashion. The text in Rule 2.1, which states that "no loads can be reordered with other loads", is far too strong.
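To see how forwarding produces the surprising result, here is a sketch of a toy machine in which each processor has a one-entry store buffer. A store first lands in the buffer, a load by the same processor from that location is satisfied from the buffer, and the buffered store drains to shared memory at some arbitrary later point. The StoreBufferModel class and everything in it are hypothetical and heavily simplified (each thread performs exactly one store, so a flag per thread is enough). It is an assumption-laden illustration, not a faithful model of any real processor.

```csharp
using System;
using System.Collections.Generic;

static class StoreBufferModel
{
    // Thread t runs: store 1 to location t; load location t; load location 1 - t.
    // For t = 0 that is X = 1; R0 = X; R1 = Y, and for t = 1 it is Y = 1; R2 = Y; R3 = X.
    // mem holds [X, Y]; regs holds [R0, R1, R2, R3]; buffered[t] means t's store
    // has been issued but is not yet visible in shared memory.
    static int Load(int t, int addr, int[] mem, bool[] buffered)
    {
        // Forwarding: a processor sees its own pending store before anyone else can.
        if (addr == t && buffered[t]) return 1;
        return mem[addr];
    }

    static void Explore(int[] pc, int[] mem, bool[] buffered, int[] regs, HashSet<string> outcomes)
    {
        if (pc[0] == 3 && pc[1] == 3)
        {
            outcomes.Add(regs[1] + "," + regs[3]);   // record "R1,R3"
            return;
        }
        for (int t = 0; t < 2; t++)
        {
            // Choice 1: thread t executes its next instruction, in program order.
            if (pc[t] < 3)
            {
                int[] p = (int[])pc.Clone(), m = (int[])mem.Clone(), r = (int[])regs.Clone();
                bool[] b = (bool[])buffered.Clone();
                if (p[t] == 0) b[t] = true;                          // store enters the buffer
                else if (p[t] == 1) r[2 * t] = Load(t, t, m, b);     // R0 or R2
                else r[2 * t + 1] = Load(t, 1 - t, m, b);            // R1 or R3
                p[t]++;
                Explore(p, m, b, r, outcomes);
            }
            // Choice 2: thread t's buffered store drains to shared memory.
            if (buffered[t])
            {
                int[] m = (int[])mem.Clone();
                bool[] b = (bool[])buffered.Clone();
                m[t] = 1;
                b[t] = false;
                Explore((int[])pc.Clone(), m, b, (int[])regs.Clone(), outcomes);
            }
        }
    }

    public static HashSet<string> Outcomes()
    {
        var outcomes = new HashSet<string>();
        Explore(new int[2], new int[2], new bool[2], new int[4], outcomes);
        return outcomes;
    }

    public static void Main()
    {
        Console.WriteLine("0,0 reachable: " + Outcomes().Contains("0,0"));
    }
}
```

In this model the pair 0,0 is reachable: both stores sit in their buffers, each processor reads its own location via forwarding, and each then reads the other location from memory before the other store has drained. Every load still executes in program order, so the apparent load/load reordering comes entirely from the delayed stores.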
Anytime a little hole in something as fundamental as MM axioms is uncovered, it is cause for concern. So I found this discovery deeply disturbing. Many abstractions and theorems are proved with the assumption that the MM is rock solid. I know a lot of code I have written relies on such proofs.
That said, I've been racking my brain (and in fact was having nightmares about it last evening) trying to uncover a case where this is worse than the existing release/acquire reordering issue that I opened this post with. Everything I come up with is saved at the last minute by rules 2.1 (for stores) and 2.5 "stores are transitively visible". The basic problem is that a processor can get stuck seeing its own written value for some time, during which other processors cannot, but ultimately it doesn't seem to matter because the buffer will eventually be flushed. Then any intermediary values that the destination may have held while that processor was stuck will have been overwritten anyway, so the outcome should be explainable (albeit racy). I'm still thinking hard about this.