

© 2012, Joe Duffy

 
 Wednesday, June 09, 2004

I've been toying around with C# iterators a bit more lately, particularly for non-mainstream applications. I discussed partial algorithm computation in a couple of earlier posts, but now wish to turn to producer/consumer models. A producer/consumer model is one in which a single producer generates items of interest, which are later processed by one or more consumers.

I really should do a bit more explanation, but for now I'll simply present some code examples with brief comments.

To wire up threaded producers and consumers, quite a bit of plumbing is required; thus, I have encapsulated it all into a couple of abstract base classes:

using System;
using System.Collections.Generic;
using System.Threading;

public abstract class Producer<T>
{
    public Producer()
    {
        worker = new Thread(new ThreadStart(this.ProductionCycle));
    }

    // Items produced but not yet consumed; also used as the lock object.
    private Queue<T> buffer = new Queue<T>();

    public Thread worker;

    // volatile: written and read by different threads
    private volatile bool done;

    public bool Done
    {
        get { return done; }
    }

    public IEnumerable<T> ConsumerChannel
    {
        get
        {
            if (done)
                throw new InvalidOperationException("Production is not currently active");

            while (!done)
            {
                // A flag plus default(T) replaces Nullable<T>, which only accepts value types.
                bool hasItem = false;
                T consumed = default(T);

                //BUG: compiler crashes when using lock(...) construct within iterator
                Monitor.Enter(buffer);
                try
                {
                    // Re-check after every wakeup: being pulsed does not guarantee an item is waiting.
                    while (buffer.Count == 0 && !done)
                        Monitor.Wait(buffer);
                    if (buffer.Count > 0)
                    {
                        consumed = buffer.Dequeue();
                        hasItem = true;
                    }
                }
                finally
                {
                    Monitor.Exit(buffer);
                }

                if (hasItem)
                    yield return consumed;
            }

            yield break;
        }
    }

    public void BeginProduction()
    {
        done = false;
        worker.Start();
    }

    public void EndProduction()
    {
        lock (buffer)
        {
            // Set the flag under the lock so a consumer about to wait cannot miss the wakeup.
            done = true;
            Monitor.PulseAll(buffer);
        }
    }

    private void ProductionCycle()
    {
        while (!done)
        {
            T t = ProduceNext();
            lock (buffer)
            {
                buffer.Enqueue(t);
                Monitor.Pulse(buffer);
            }
        }
    }

    protected abstract T ProduceNext();
}

 

public abstract class Consumer<T>
{
    public Consumer(Producer<T> producer)
    {
        this.producer = producer;
        worker = new Thread(new ThreadStart(this.ConsumerCycle));
    }

    private Producer<T> producer;

    public Thread worker;

    // volatile: set by the controlling thread, read by the worker thread
    private volatile bool done = false;

    public bool Done
    {
        get { return done; }
    }

    public void BeginConsumption()
    {
        done = false;
        worker.Start();
    }

    public void EndConsumption()
    {
        done = true;
    }

    private void ConsumerCycle()
    {
        // Blocks inside the producer's iterator until an item is available.
        foreach (T t in producer.ConsumerChannel)
        {
            Consume(t);
            if (done)
                break;
        }
    }

    protected abstract void Consume(T t);
}

 

(Note: I haven't spent enough time on the threading issues, and as such may have overlooked a thing or two. It's pretty messy as it stands, but it'll do for now...)
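One threading detail that is easy to overlook with Monitor is the guarded wait: the condition is re-checked in a loop after every wakeup, and the shutdown flag is set while holding the lock so that no waiting thread misses the final pulse. The following is only a minimal sketch of that general pattern, not the classes from this post. `BlockingBuffer<T>`, `TryTake`, `Close` and `GuardedWaitDemo` are hypothetical names introduced for illustration, and unlike the producer above, this sketch drains any remaining items after it is closed.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

// Hypothetical helper (not part of the post's classes): a queue whose
// TryTake blocks until an item arrives or the buffer is closed.
public class BlockingBuffer<T>
{
    private readonly Queue<T> items = new Queue<T>();
    private bool closed;

    public void Add(T item)
    {
        lock (items)
        {
            items.Enqueue(item);
            Monitor.Pulse(items);      // wake one waiting consumer
        }
    }

    public void Close()
    {
        lock (items)
        {
            closed = true;             // set under the lock so no waiter misses it
            Monitor.PulseAll(items);   // wake every waiting consumer
        }
    }

    // Returns false once the buffer is closed and fully drained.
    public bool TryTake(out T item)
    {
        lock (items)
        {
            // Guarded wait: re-check the condition after every wakeup.
            while (items.Count == 0 && !closed)
                Monitor.Wait(items);

            if (items.Count > 0)
            {
                item = items.Dequeue();
                return true;
            }

            item = default(T);
            return false;
        }
    }
}

public static class GuardedWaitDemo
{
    // Pushes 1..count through the buffer on a producer thread and sums them here.
    public static int SumThroughBuffer(int count)
    {
        BlockingBuffer<int> buffer = new BlockingBuffer<int>();
        Thread producer = new Thread(() =>
        {
            for (int i = 1; i <= count; i++)
                buffer.Add(i);
            buffer.Close();
        });
        producer.Start();

        int sum = 0;
        int value;
        while (buffer.TryTake(out value))
            sum += value;

        producer.Join();
        return sum;
    }

    public static void Main()
    {
        Console.WriteLine(SumThroughBuffer(100)); // prints 5050
    }
}
```

Setting `closed` inside the lock is the design choice that matters here: a consumer is either already waiting, in which case `PulseAll` wakes it, or has not yet taken the lock, in which case it sees the flag before deciding to wait.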

To create concrete classes from these, it's simply a matter of overriding one method in each. For instance, the following producer generates a never-ending sequence of random numbers, while the consumer simply prints them out:

class RandomNumberProducer : Producer<int>
{
    public RandomNumberProducer() : base()
    {
        rand = new Random();
    }

    private Random rand;

    protected override int ProduceNext()
    {
        return rand.Next();
    }
}

class RandomNumberConsumer : Consumer<int>
{
    public RandomNumberConsumer(RandomNumberProducer p) : base(p)
    {
    }

    private static int counter = 0;

    private int id = ++counter;

    protected override void Consume(int t)
    {
        Console.Out.WriteLine("#{0}: consumed {1}", id, t);
    }
}

 

To wire them up, and kick off the cycles, the following test code does the trick:

 

RandomNumberProducer p = new RandomNumberProducer();

RandomNumberConsumer c1 = new RandomNumberConsumer(p);
RandomNumberConsumer c2 = new RandomNumberConsumer(p);
RandomNumberConsumer c3 = new RandomNumberConsumer(p);

p.BeginProduction();

c1.BeginConsumption();
c2.BeginConsumption();
c3.BeginConsumption();

Thread.Sleep(2500);

c3.EndConsumption();
c2.EndConsumption();
c1.EndConsumption();

p.EndProduction();

 

These examples probably don't illustrate the advantages of this approach very well. Obviously, well-factored base classes provide a lot of benefit: they encapsulate the threading and synchronization boilerplate. More interestingly, the iterator removes the need for the consumer to worry about any synchronization.

This means that the following simple consumer code is actually thread-safe, even if there are other consumers wired up to the producer:

foreach (int i in p.ConsumerChannel)
{
    Console.Out.WriteLine("consumed: {0}", i);
}

In theory, the threading and blocking are all handled by the producer's iterator (although I question whether my implementation is entirely correct).
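To make the "several consumers, one iterator" idea concrete, here is a small self-contained sketch, written under assumptions rather than taken from the classes above. `SharedChannelDemo`, `State`, `Channel` and `Run` are hypothetical names. Several threads each run a plain `foreach` over a blocking iterator backed by one shared queue, and the demo then checks that every item was handed to exactly one of them. The `yield` statements are deliberately kept outside the `lock` block, so no lock is held while a consumer is processing an item.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;

public static class SharedChannelDemo
{
    // Hypothetical shared state: the queue doubles as the lock object.
    private sealed class State
    {
        public readonly Queue<int> Items = new Queue<int>();
        public bool Closed;
    }

    // Blocking iterator: each call site gets its own enumerator,
    // but all enumerators dequeue from the same queue.
    private static IEnumerable<int> Channel(State state)
    {
        while (true)
        {
            int item = 0;
            bool got = false;
            lock (state.Items)
            {
                while (state.Items.Count == 0 && !state.Closed)
                    Monitor.Wait(state.Items);
                if (state.Items.Count > 0)
                {
                    item = state.Items.Dequeue();
                    got = true;
                }
            }
            if (!got)
                yield break;       // closed and drained
            yield return item;     // yielded outside the lock
        }
    }

    // Returns { total items consumed, number of distinct items consumed }.
    public static int[] Run(int itemCount, int consumerCount)
    {
        State state = new State();
        List<int> seen = new List<int>();   // guarded by lock (seen)

        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            consumers[c] = new Thread(() =>
            {
                // No synchronization visible at the consuming site.
                foreach (int i in Channel(state))
                {
                    lock (seen) { seen.Add(i); }
                }
            });
            consumers[c].Start();
        }

        for (int i = 0; i < itemCount; i++)
        {
            lock (state.Items)
            {
                state.Items.Enqueue(i);
                Monitor.Pulse(state.Items);
            }
        }
        lock (state.Items)
        {
            state.Closed = true;
            Monitor.PulseAll(state.Items);
        }

        foreach (Thread t in consumers)
            t.Join();

        return new int[] { seen.Count, new HashSet<int>(seen).Count };
    }

    public static void Main()
    {
        int[] r = Run(1000, 3);
        Console.WriteLine("{0} consumed, {1} distinct", r[0], r[1]); // prints "1000 consumed, 1000 distinct"
    }
}
```

Both counts equal the number of items produced: each dequeue happens under the lock, so no item is delivered twice, and the iterator only ends once the queue is closed and empty, so none is dropped.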

Pretty nifty, if you ask me...

[I apologize for the hideous HTML for the code samples - I really wanted to retain the Visual Studio color scheme, and the only easy way I could find to accomplish this was to cut and paste into Word. Probably not an issue on most browsers, but just in case...]

 
