Consumer-Producer in NETMF using a BoundedBuffer

I will try to add some more context here, but for now, I will try to post the essentials.  Passing data between threads can be done in various ways.  Probably the most common pattern is the consumer-producer pattern using a shared queue. The shared queue is normally a Bounded Buffer. A bounded buffer is perfect structure for this because you can block on an empty queue and block on a full queue. You could use an unbounded buffer, but not as useful in NETMF with limited RAM. Moreover, even if you have the RAM, you typically want to limit the number of items and let any consumers catch up and not use up too many resources.

Because the bounded buffer will be shared between threads, it needs to take synchronization and thread safety into account.  Here is a minimal implementation for NETMF with Add(…), Take() and Count interfaces.  Should note that NETMF does not implement Monitor with all the features it has in general .Net. For instance, it does not have the important Wait() and Pulse() methods. Fortunately, we can add that ability with a reset event. Here is the BoundedBuffer for NETMF:

public class BoundedBuffer
{
    private Queue queue = new Queue();
    private int consumersWaiting;
    private int producersWaiting;
    private const int maxBufferSize = 128;
    private AutoResetEvent are = new AutoResetEvent(false);

    public int Count
    {
        get { return queue.Count; }
    }

    public void Add(object obj)
    {
        Monitor.Enter(queue);
        try
        {
            while (queue.Count == (maxBufferSize - 1))
            {
                producersWaiting++;
                Monitor.Exit(queue);
                are.WaitOne();
                Monitor.Enter(queue);
                producersWaiting--;
            }
            queue.Enqueue(obj);
            if (consumersWaiting > 0)
                are.Set();
        }
        finally
        {
            Monitor.Exit(queue);
        }
    }

    public object Take()
    {
        object item;
        Monitor.Enter(queue);
        try
        {
            while (queue.Count == 0)
            {
                consumersWaiting++;
                Monitor.Exit(queue);
                are.WaitOne();
                Monitor.Enter(queue);
                consumersWaiting--;
            }
            item = queue.Dequeue();
            if (producersWaiting > 0)
                are.Set();
        }
        finally
        {
            Monitor.Exit(queue);
        }
        return item;
    }
}

And here is a usage example. The main thread is the consumer reads items from the queue. The Producer thread fills the queue and posts a Null to signal end.

private static void BoundedBufferTest()
{
    // Main thread here (or some Consumer thread).
    BoundedBuffer bb = new BoundedBuffer();
            
    // Producer worker. Could be a thread in another class.
    new Thread(() =>
        {
            for (int i = 0; i < 500; i++)
            {
                // Read some service here <==
                // Write output for consumer.
                bb.Add(i);
                Thread.Sleep(1); // Slow it down for ping-pong effect.
            }
            bb.Add(null); // End.
        }).Start();

    // Read and process items in this thread created in producer thread.
    while (true)
    {
        object item = bb.Take();
        if (item == null) break; // Done.
        Debug.Print("Item: " + item.ToString());
    }
    Debug.Print("Complete");
}

We can add more function to BoundedBuffer, but will keep the implementation  simple and easy to reason about. The big brother .Net has various other methods to signal data between threads. The newest addition is Task<T>. Task is both a way to run code asynchronously and way to pass results between threads. Task is now a core element in .Net and used by the new Async feature in C# 4.0.  Task is essentially a Future – A value that will be posted some time in the future. In some respects, our BoundedBuffer could be thought of as a potential stream of Future values.

Other references on Fezzer.com –
BlockingQueue -Another implementation using Monitor2 on NETMF.
Monitor2 – Adds Wait and Pulse methods to the standard NETMF Monitor.

More next time.
William

Advertisements
This entry was posted in .Net Micro Framework, C# and tagged , , , , . Bookmark the permalink.