Simple F# producer-consumer pattern

The producer-consumer pattern allows a producer thread to generate data (e.g. loaded from disk or read from a web site) and put it in a queue, concurrently with a consumer thread that takes the data off the queue.

A typical application of this is the processing of multiple disk files. When done in a single-threaded, sequential manner, the CPU mostly sits idle while waiting for the disk during file load. The disk, in turn, doesn’t do anything useful while waiting for the CPU to finish processing a file before the next one is loaded. Using the producer-consumer pattern, files are loaded from disk and queued while the CPU processes files already in the queue, speeding up the processing of the second and all subsequent files. To boot, you get the possibility of running multiple producers and multiple consumers concurrently, as well as the smug feeling that goes with it.

The producer-consumer pattern is imperative rather than functional by nature, but nevertheless useful in functional programs. To make the ProducerConsumer class below look a bit ‘functionalish’, I’ve made the input (producer) a sequence, and defined member functions similar to Seq.map, Seq.fold and Seq.iter for consumption. In contrast to Seq.map/fold/iter, you can only perform consumption once, since you consume elements from a queue, leaving it empty afterwards. In the ProducerConsumer class, I’ve emphasized this by naming the consumption functions MapConsume, FoldConsume and IterConsume.

The way we use the finished class will be something like:

let someBigFiles = ["foo.big"; "bar.big"; "baz.big"]
let someOtherBigFiles = ["foo.mp3"; "bar.mp3"; "baz.mp3"]
let producers =
    [Seq.map File.ReadAllBytes someBigFiles; 
     Seq.map File.ReadAllBytes someOtherBigFiles]

let processFile fileBytes =
    // Some lengthy operations here ...

let pc = ProducerConsumer(producers, queueLength)
let results = pc.MapConsume(processFile, degreeOfParallelism)

Let’s start by defining the class’ constructor arguments and the necessary housekeeping variables:

type ProducerConsumer<'T>(inputSeqs : seq<seq<'T>>, qSize : int) as this =
    let queue            = Array.create qSize (None : option<'T>)  // The queue is a wrap-around array.
    let qNextItemToRead  = ref 0  // Next item no to read from the input stream.
    let qNextItemToQueue = ref 0  // Next item no to put in the queue.
    let emptyCount       = new Semaphore(qSize, qSize)  // Empty slots in the queue.
    let fullCount        = new Semaphore(0, qSize)      // Filled slots in the queue.
    let endOfInput       = new EventWaitHandle(false, EventResetMode.ManualReset)
    let canceled         = new EventWaitHandle(false, EventResetMode.ManualReset)

Note that the input is a sequence of sequences, seq<seq<'T>>. Each seq<'T> is a producer, so the class allows multiple producers.

The simplest way to define a function that runs a single producer would be:

let produce (inputSeq : seq<'T>) =
    for item in inputSeq do
        emptyCount.WaitOne() |> ignore
        let itemToQueue = System.Threading.Interlocked.Increment(&qNextItemToQueue.contents) - 1
        queue.[itemToQueue % qSize] <- Some item
        fullCount.Release() |> ignore

The queue is a wrap-around array, so the index for item number itemNo is itemNo % qSize. Waiting on the emptyCount semaphore ensures that we don’t overwrite items before the consumers have read them.
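The wrap-around indexing is easy to verify in isolation. This standalone sketch (not part of the class; qSize of 4 is just a hypothetical value) shows how consecutive item numbers map onto queue slots:

```fsharp
// Standalone illustration of wrap-around (ring buffer) indexing.
let qSize = 4

// Item numbers 0..9 land in slots 0,1,2,3,0,1,2,3,0,1.
let slots = [ for itemNo in 0 .. 9 -> itemNo % qSize ]
printfn "%A" slots  // [0; 1; 2; 3; 0; 1; 2; 3; 0; 1]
```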

The corresponding simple function to consume a single item would be:

member x.ConsumeOne() =
    fullCount.WaitOne() |> ignore
    let itemToRead = System.Threading.Interlocked.Increment(&qNextItemToRead.contents) - 1
    let result = queue.[itemToRead % qSize]
    queue.[itemToRead % qSize] <- None
    emptyCount.Release() |> ignore
    result

Cancellation support

The above functions illustrate the principle of how the producer-consumer pattern works. However, they’re not quite complete. For realistic use, we need a way for the consumers to detect that there is no more input coming, and support for cancellation. Fortunately, adding both is easy. Setting and getting the cancellation status is done by a couple of member functions:

member x.Cancel() = canceled.Set() |> ignore
member x.IsCanceled = canceled.WaitOne(0)
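The manual-reset event behaves like a sticky boolean flag, and WaitOne(0) polls it without blocking. A minimal standalone sketch of this polling pattern:

```fsharp
open System.Threading

// A manual-reset event used as a cancellation flag.
let canceled = new EventWaitHandle(false, EventResetMode.ManualReset)

// WaitOne(0) polls the flag without blocking.
let before = canceled.WaitOne(0)  // false: not yet canceled
canceled.Set() |> ignore
let after = canceled.WaitOne(0)   // true: Set is sticky for a manual-reset event
printfn "before=%b after=%b" before after
```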

The produce function with added cancellation support looks like:

let produce (inputSeq : seq<'T>) =
    let cancellableSeq = inputSeq |> Seq.takeWhile (fun _ -> not this.IsCanceled)
    for item in cancellableSeq do
        let isCanceled = WaitHandle.WaitAny([|canceled; emptyCount|]) = 0
        if not isCanceled then
            let itemToQueue = System.Threading.Interlocked.Increment(&qNextItemToQueue.contents) - 1
            queue.[itemToQueue % qSize] <- Some item
            fullCount.Release() |> ignore

There is a slight problem with this code. Seq.takeWhile will read each item from the input sequence (i.e. cause an item to be produced) before it evaluates the predicate function (fun _ -> not this.IsCanceled). This is no big problem, since it will only delay the cancellation a little, but it can be solved quite easily by copying the source code for Seq.takeWhile and modifying it to a function that evaluates the predicate function before it grabs an item from the sequence:

let takeWhilePre p (source: seq<_>) = 
    seq { use e = source.GetEnumerator() 
          let latest = ref Unchecked.defaultof<_>
          while p() && e.MoveNext() do
              latest := e.Current
              yield !latest }

In the produce function, the cancellableSeq assignment is changed to

let cancellableSeq = inputSeq |> takeWhilePre (fun _ -> not this.IsCanceled)
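To see the difference in isolation, here is a small standalone check (my own test harness, not part of the class) demonstrating that takeWhilePre never touches the source when the predicate is false from the start:

```fsharp
let takeWhilePre p (source: seq<_>) =
    seq { use e = source.GetEnumerator()
          let latest = ref Unchecked.defaultof<_>
          while p() && e.MoveNext() do
              latest := e.Current
              yield !latest }

// A source that records whether it was ever enumerated.
let touched = ref false
let src = seq { touched := true; yield 1 }

// The predicate is false from the start, so no item is ever produced.
let result = src |> takeWhilePre (fun () -> false) |> Seq.toList
printfn "result = %A, touched = %b" result !touched  // result = [], touched = false
```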

Supporting multiple producers and denoting end of input

Now it’s time to make good on our promise to support multiple producers, as well as devising a way to tell the consumers when all input has been produced. If you’re familiar with F# async, that’s actually easier done than said:

let produceAll (inputSeqs : seq<seq<'T>>) =
    [ for i in inputSeqs -> async { produce i } ]
    |> Async.Parallel
    |> Async.Ignore
    |> Async.RunSynchronously
    endOfInput.Set() |> ignore

On the consumer side of things, there’s need for a bit more careful thought:

member x.ConsumeOne() =
    // The order of WaitHandles in WaitAny arg is important.
    let waitHandleIndex = WaitHandle.WaitAny([|canceled; fullCount; endOfInput|])
    let isCanceledOrEndOfInput = waitHandleIndex = 0 || waitHandleIndex = 2

    if isCanceledOrEndOfInput then
        None
    else 
        let itemToRead = System.Threading.Interlocked.Increment(&qNextItemToRead.contents) - 1
        let result = queue.[itemToRead % qSize]
        queue.[itemToRead % qSize] <- None
        emptyCount.Release() |> ignore
        result

What we need to think about is the order of the WaitHandles in the WaitAny array parameter. When several WaitHandles satisfy the wait, WaitAny returns the lowest array index among them. E.g. if canceled isn’t set, fullCount is nonzero and endOfInput is set, the return value will be the index of fullCount, which is 1 in the [|canceled; fullCount; endOfInput|] array.

As one would expect, canceled should be first in the array. endOfInput signifies the end of producer input, so it may be set when there’s still consumable data in the queue. Thus, it’s important that fullCount comes before endOfInput in the WaitAny array, so that we don’t prematurely set isCanceledOrEndOfInput to true. When all data is consumed, fullCount will be zero and endOfInput will be set.
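The priority rule is easy to demonstrate on its own. In this standalone sketch, both handles are signaled up front, so WaitAny picks the lower index:

```fsharp
open System.Threading

// Two manual-reset events, both signaled from the start.
let first  = new EventWaitHandle(true, EventResetMode.ManualReset)
let second = new EventWaitHandle(true, EventResetMode.ManualReset)

// Both handles satisfy the wait; WaitAny returns the lowest satisfying index.
let index = WaitHandle.WaitAny([| first :> WaitHandle; second :> WaitHandle |])
printfn "WaitAny returned %d" index  // 0
```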

By now, you may wonder why ConsumeOne is a member function while produceAll isn’t. The reason is that produceAll is a local function in the constructor, and the constructor sets production in motion by calling it:

do Async.Start (async { produceAll inputSeqs })

Convenient consumption

While users of the ProducerConsumer class could consume all produced data by repeatedly calling ConsumeOne until it returns None, it’s more convenient to consume by functions reminiscent of fold, map and iter, as described in the introduction. The FoldConsume will be the basis for MapConsume and IterConsume, so we define it first. As a matter of course, we’ll want to support parallelism:

member x.FoldConsume(folder, state, degreeOfParallelism) =
    let rec consumeLoop folder state =
        let item = x.ConsumeOne()
        match item with
        | Some i ->
            let state' = folder state i
            consumeLoop folder state'
        | None -> state

    [ for parallelSlot in 0..degreeOfParallelism-1 -> async { return (consumeLoop folder state) } ]
    |> Async.Parallel
    |> Async.RunSynchronously

F# Seq.fold threads an accumulator state through the computation and returns the final state as the computation’s result, but ProducerConsumer.FoldConsume will return one final state for each degree of parallelism. You’ll obviously have to merge these states into a single state, if you want to accomplish something akin to using Seq.fold.
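The merge step can be sketched on its own, without the class. Here, four hypothetical parallel slots each fold their share of 1..100 into a partial sum; merging the per-slot states with Array.sum recovers the single Seq.fold-style result. (With the real class, the split between slots is nondeterministic, but the merged result is not.)

```fsharp
// Each 'parallel slot' folds the items it happened to consume.
let shares = [| [1 .. 25]; [26 .. 50]; [51 .. 75]; [76 .. 100] |]

// One accumulator state per slot, as FoldConsume would return.
let partialStates = shares |> Array.map (List.fold (+) 0)

// Merging the per-slot states gives the overall fold result.
let total = partialStates |> Array.sum
printfn "%d" total  // 5050
```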

Having defined FoldConsume, we can now define MapConsume and IterConsume:

member x.MapConsume(mapping, degreeOfParallelism) =
    let result =
        x.FoldConsume((fun acc i -> (mapping i)::acc), [], degreeOfParallelism)
        |> List.concat
    if degreeOfParallelism = 1 then result |> List.rev else result

member x.IterConsume(action, degreeOfParallelism) =
    x.FoldConsume((fun _ i -> action i), (), degreeOfParallelism)
    |> ignore

Note that the last line of MapConsume reverses the list of results when there is no parallelism (degreeOfParallelism = 1). This returns the results in an order corresponding to the order of the input, in the special case of a single producer and a non-parallel consumer. A short example illustrates this:

let pc = ProducerConsumer([Seq.ofList [0..3]], 4)
let mapped = pc.MapConsume((fun i -> i*10), 1)

In the example above, the value of mapped will be [0; 10; 20; 30], whereas the order of that list would be nondeterministic for any degree of parallelism higher than 1. In the latter cases, it doesn’t make sense to waste CPU cycles reversing the list, so we don’t.

Putting it all together

Assembling all our code fragments into a finished class gives us:

open System
open System.Threading

// Type for the producer/consumer pattern. Both producers and consumers
// can be multiple and run in parallel.
// Construct with a sequence of input sequences, which are the producers and 
// will be enumerated in parallel, and the queue size.
type ProducerConsumer<'T>(inputSeqs : seq<seq<'T>>, qSize : int) as this =
    let queue            = Array.create qSize (None : option<'T>)  // The queue is a wrap-around array.
    let qNextItemToRead  = ref 0  // Next item no to read from the input stream.
    let qNextItemToQueue = ref 0  // Next item no to put in the queue.
    let emptyCount       = new Semaphore(qSize, qSize)  // Empty slots in the queue.
    let fullCount        = new Semaphore(0, qSize)      // Filled slots in the queue.
    let endOfInput       = new EventWaitHandle(false, EventResetMode.ManualReset)
    let canceled         = new EventWaitHandle(false, EventResetMode.ManualReset)

    // Copied from F# source code for Seq.takeWhile and modified so that p's
    // argument is unit and p is checked before e.MoveNext instead of after.
    let takeWhilePre p (source: seq<_>) = 
        seq { use e = source.GetEnumerator() 
              let latest = ref Unchecked.defaultof<_>
              while p() && e.MoveNext() do
                  latest := e.Current
                  yield !latest }

    let produce (inputSeq : seq<'T>) =
        let cancellableSeq = inputSeq |> takeWhilePre (fun _ -> not this.IsCanceled)
        for item in cancellableSeq do
            let isCanceled = WaitHandle.WaitAny([|canceled; emptyCount|]) = 0
            if not isCanceled then
                let itemToQueue = System.Threading.Interlocked.Increment(&qNextItemToQueue.contents) - 1
                queue.[itemToQueue % qSize] <- Some item
                fullCount.Release() |> ignore

    let produceAll (inputSeqs : seq<seq<'T>>) =
        [ for i in inputSeqs -> async { produce i } ]
        |> Async.Parallel
        |> Async.Ignore
        |> Async.RunSynchronously
        endOfInput.Set() |> ignore

    do Async.Start (async { produceAll inputSeqs })

    member x.Cancel() = canceled.Set() |> ignore
    member x.IsCanceled = canceled.WaitOne(0)

    // Get the next item.
    // Returns None if the ProducerConsumer is canceled or there are no more items,
    // otherwise returns Some 'T.
    member x.ConsumeOne() =
        // The order of WaitHandles in WaitAny arg is important.
        let waitHandleIndex = WaitHandle.WaitAny([|canceled; fullCount; endOfInput|])
        let isCanceledOrEndOfInput = waitHandleIndex = 0 || waitHandleIndex = 2

        if isCanceledOrEndOfInput then
            None
        else 
            let itemToRead = System.Threading.Interlocked.Increment(&qNextItemToRead.contents) - 1
            let result = queue.[itemToRead % qSize]
            queue.[itemToRead % qSize] <- None
            emptyCount.Release() |> ignore
            result

    // Analogous to Seq.fold.
    // Returns a result state for each degree of parallelism.
    member x.FoldConsume(folder, state, degreeOfParallelism) =
        let rec consumeLoop folder state =
            let item = x.ConsumeOne()
            match item with
            | Some i ->
                let state' = folder state i
                consumeLoop folder state'
            | None -> state

        [ for parallelSlot in 0..degreeOfParallelism-1 -> async { return (consumeLoop folder state) } ]
        |> Async.Parallel
        |> Async.RunSynchronously

    // Analogous to List.map.
    // If degreeOfParallelism = 1 and there is a single input sequence, the 
    // results are in the same order as the input sequence, otherwise the
    // order is nondeterministic.
    member x.MapConsume(mapping, degreeOfParallelism) =
        let result =
            x.FoldConsume((fun acc i -> (mapping i)::acc), [], degreeOfParallelism)
            |> List.concat
        if degreeOfParallelism = 1 then result |> List.rev else result

    // Analogous to Seq.iter.
    member x.IterConsume(action, degreeOfParallelism) =
        x.FoldConsume((fun _ i -> action i), (), degreeOfParallelism)
        |> ignore

Taking the ProducerConsumer class for a spin

With the class in place, the next step is to take it for a spin by using it from some test code. To be able to print to the console without mixing output from different threads into an indecipherable character salad, all printing is done in a critical section:

let mutexPrintLock = new Object()
let mutexPrint s = lock mutexPrintLock (fun () -> printf "%s" s)

As producers, we use sequences that loop over a collection of items, simulating work by pausing before each item is yielded. We’ll create such sequences with this function:

let producerSeq items (sleepTime : int) = 
    seq { for i in items do
              mutexPrint (sprintf "Generating value %d.\n" i)
              Thread.Sleep(sleepTime)  // Simulate work, e.g. database access.
              yield i }

Production is set in motion when we create an instance of the ProducerConsumer class, in this case feeding it 2 producers and specifying a queue length of 8 items:

let list1 = [1..20]
let list2 = [1000..1000..4000]

let pc = ProducerConsumer([producerSeq list1 500; producerSeq list2 2000], 8)

Consumption is performed by a function that also fakes work by taking a nap:

let consumer i =
    mutexPrint (sprintf "Thread %d got %d from the queue.\n" Thread.CurrentThread.ManagedThreadId i)
    Thread.Sleep(3000)  // Simulate work.
    i

Finally, we start the consumption on 4 parallel threads and wait for the result:

let result = pc.MapConsume(consumer, 4)
printfn "\nResult: %A" result

The test code in its entirety, a bit more fleshed out, looks like:

let mutexPrintLock = new Object()
let mutexPrint s = lock mutexPrintLock (fun () -> printf "%s" s)

let producerSeq items (sleepTime : int) = 
    seq { for i in items do
              mutexPrint (sprintf "Generating value %d.\n" i)
              Thread.Sleep(sleepTime)  // Simulate work, e.g. database access.
              yield i }

let list1 = [1..20]
let list2 = [1000..1000..4000]

let pc = ProducerConsumer([producerSeq list1 500; producerSeq list2 2000], 8)

// Uncomment to try cancellation.
// let cancelTimer = new Timer((fun _ -> mutexPrint "Cancelling!\n"; pc.Cancel()), null, 4900, Timeout.Infinite)

let consumer i =
    mutexPrint (sprintf "Thread %d got %d from the queue.\n" Thread.CurrentThread.ManagedThreadId i)
    Thread.Sleep(3000)  // Simulate work.
    i

let result = pc.MapConsume(consumer, 4)
let sortedResult = result |> List.sort

printfn "\nResult: %A" result
printfn "Sorted result: %A\n" sortedResult

if sortedResult = list1 @ list2 then printfn "Results test OK."
else printfn "Incorrect results generated."

And that, ladies and gentlemen, is how we do that.
