I've been toying around with C# iterators a bit more lately, particularly regarding non-mainstream applications. I discussed partial algorithm computation during a couple earlier posts (here and here), but now wish to turn to producer/consumer models. A producer/consumer model is one in which a single producer is responsible for generating items of interest, which are later processed by one or more consumers.
I really should do a bit more explanation, but for now I'll simply present some code examples with brief comments.
To wire up threaded producers and consumers, quite a bit of plumbing is required; thus, I have encapsulated it all into a couple common abstract classes:
public abstract class Producer<T>
{
public Producer()
{
worker = new Thread(new ThreadStart(this.ProductionCycle));
}
private Queue<T> buffer = new Queue<T>();
public Thread worker;
private bool done;
public bool Done
{
get
{
return done;
}
}
public IEnumerable<T> ConsumerChannel
{
get
{
if (done)
throw new InvalidOperationException("Production is not currently active");
while (!done)
{
Nullable<T> consumed = new Nullable<T>();
//BUG: compiler crashes when using lock(...) construct within iterator
Monitor.Enter(buffer);
if (buffer.Count == 0)
Monitor.Wait(buffer);
if (buffer.Count > 0)
consumed = new Nullable<T>(buffer.Dequeue());
Monitor.Exit(buffer);
if (consumed.HasValue)
yield return consumed.Value;
}
yield break;
}
}
public void BeginProduction()
{
done = false;
worker.Start();
}
public void EndProduction()
{
done = true;
lock (buffer)
{
Monitor.PulseAll(buffer);
}
}
private void ProductionCycle()
{
while (!done)
{
T t = ProduceNext();
lock (buffer)
{
buffer.Enqueue(t);
Monitor.Pulse(buffer);
}
}
}
protected abstract T ProduceNext();
}
public abstract class Consumer<T>
{
public Consumer(Producer<T> producer)
{
this.producer = producer;
worker = new Thread(new ThreadStart(this.ConsumerCycle));
}
private Producer<T> producer;
public Thread worker;
private bool done = false;
public bool Done
{
get
{
return done;
}
}
public void BeginConsumption()
{
done = false;
worker.Start();
}
public void EndConsumption()
{
done = true;
}
private void ConsumerCycle()
{
foreach (T t in producer.ConsumerChannel)
{
Consume(t);
if (done)
break;
}
}
protected abstract void Consume(T t);
}
(Note: I haven't spent enough time on the threading issues, and as such may have overlooked a thing or two. It's pretty messy as it stands, but it'll do for now...)
To create specific concrete classes from these, it's a matter of simply overriding a couple methods. For instance, the following producer generates a never-ending sequence of random numbers, while the consumer simply prints them out:
class RandomNumberProducer : Producer<int>
{
public RandomNumberProducer() : base()
{
rand = new Random();
}
private Random rand;
protected override int ProduceNext()
{
return rand.Next();
}
}
class RandomNumberConsumer : Consumer<int>
{
public RandomNumberConsumer(RandomNumberProducer p) : base(p)
{
}
private static int counter = 0;
private int id = ++counter;
protected override void Consume(int t)
{
Console.Out.WriteLine("#{0}: consumed {1}", id, t);
}
}
To wire them up, and kick off the cycles, the following test code does the trick:
RandomNumberProducer p = new RandomNumberProducer();
RandomNumberConsumer c1 = new RandomNumberConsumer(p);
RandomNumberConsumer c2 = new RandomNumberConsumer(p);
RandomNumberConsumer c3 = new RandomNumberConsumer(p);
p.BeginProduction();
c1.BeginConsumption();
c2.BeginConsumption();
c3.BeginConsumption();
Thread.Sleep(2500);
c3.EndConsumption();
c2.EndConsumption();
c1.EndConsumption();
p.EndProduction();
These examples probably don't quite illustrate the advantages of this approach very well. Obviously, using well factored base classes provides a lot of benefit, namely that they encapsulate and implement the threading and synchronization boilerplate. Interestingly, the iterators remove the need for the consumer to worry about any synchronization.
This means that the following simple consumer code is actually thread safe - even if there are other consumers wired up to the producer:
foreach (int i in p.ConsumerChannel)
{
Console.Out.WriteLine("consumed: {0}", i);
}
In theory, the threading and blocking is all handled by the producer's iterator (although I question if my implementation is entirely correct).
Pretty nifty, if you ask me...
[I apologize for the hideous HTML for the code samples - I really wanted to retain the Visual Studio color scheme, and the only easy way I could find to accomplish this was to cut and paste into Word. Probably not an issue on most browsers, but just in case...]