Minimal implementations in Modern C++: Producer-Consumer problem

Apr 20, 2020

This implementation was inspired by Stackoverflow user Yakk - Adam Nevraumont ’s answer for a question about std::condition_variable . I extended it to make it a working example, and I plan to use it as a base for a new project I have in mind.

What is the Producer-Consumer problem?

This problem is a classic example of synchronization and parallel computing. Suppose you have available multiple execution threads. In addition to that, you also have tasks that can be executed at the same time, independent from each other. The problem now is how we distribute the tasks between those threads.

Producer - Consumer problem

In a simple producer-consumer problem, the producer will create tasks and push them to a queue. The queue thread will handle the synchronization. The consumer threads will wake up only when they have to execute a task. When the consumer thread finishes a task, it sleeps until the queue thread sends a new signal.

To the implementation

In C++11 a new synchronization primitive was introduced: std::condition_variable . This primitive allows us to block one or more threads until a shared state is modified and the thread, or threads, has been modified. This primitive needs a std::mutex for the blocker thread to modify the stated state, and for the blocked threads to wait on the condition variable.

As an overview, our producer and queue will be on the same thread. Our consumers will run on separate threads and will get the std::condition_variable and std::mutex from the queue. They will start a loop in which they will wait on the condition variable and check if there is a new task, or if the queue has stopped pushing new tasks. Our sample program will send 10 tasks, wait for an amount of time, and stop the queue.

The task we are going to push into the queue is going to be very simple. It will require an Id number, a duration in milliseconds, and a mutex.

 1#pragma once
 2#include <chrono>
 3#include <mutex>
 4
 5class Task {
 6   public:
 7    Task(unsigned int id, std::chrono::milliseconds duration,
 8         std::mutex& coutMutex);
 9
10    void Execute();
11
12    unsigned int GetId() const { return m_Id; }
13
14   private:
15    unsigned int m_Id = 0;
16    std::chrono::milliseconds m_Duration;
17    std::mutex& m_CoutMutex;
18};

We are going to use this mutex to have exclusive use in the std::cout buffer. Otherwise, our messages are going to be all mixed out in the standard output.

The Execute method will also be very simple: it will sleep the thread for the m_Duration milliseconds, and print out this duration before exiting.

1void Task::Execute() {
2    std::this_thread::sleep_for(m_Duration);
3    {
4        std::scoped_lock<std::mutex> guard(m_CoutMutex);
5        std::cout << "Task {" << m_Id << "} finished in " << m_Duration.count()
6                  << "ms.\n";
7    }
8}

The std::scoped_lock was introduced in C++17 and it works similarly as std::lock_guard. One difference is that std::scoped_lock allows us to try to acquire more than one std::mutex, preventing possible deadlocks.

It is very important to surround the critical section in brackets. That will make the lock to release the std::mutex as soon as it is no longer needed. Remember that std::scoped_lock behaves according to the RAII principle .

We are going to implement a TaskQueue class to wrap our std::queue and our synchronization mechanism.

 1#pragma once
 2#include <mutex>
 3#include <queue>
 4#include <tuple>
 5
 6class Task;
 7
 8class TaskQueue {
 9   public:
10    ~TaskQueue();
11
12    // Thread safe functions for producers
13    void PushTask(Task* t);
14    void PushTasks(std::vector<Task*>& tasks);
15    void StopQueue();
16    
17    // Way for consumers to get the sync variables
18    std::tuple<std::mutex&, std::condition_variable&> Subscribe();
19    
20    // Non-thread safe function. Consumers must ensure
21    // lock acquisition
22    bool HasPendingTask() const { return !m_Queue.empty(); }
23    bool IsQueueStopped() const { return m_QueueIsStopped; }
24    Task* GetNextTask();
25   
26   private:
27    bool m_QueueIsStopped = false;
28    std::queue<Task*> m_Queue;
29    std::mutex m_Mutex;
30    std::condition_variable m_ConditionVariable;
31};

Producers will interact with this queue by pushing Task instances. In our case, we will handle the object destruction in the queue, but it can be improved with smart pointers. StopQueue will allow the producer to stop the queue and send a signal to the consumers that the queue is no longer pushing new tasks.

Consumers will get access to the std::mutex, and std::condition_variable to wait for new tasks. The PushTask and PushTasks functions will acquire the mutex to exclusively add new tasks. After that, the lock is no longer needed to wake up the waiting threads.

 1void TaskQueue::PushTask(Task* t) {
 2    {
 3        std::scoped_lock l{m_Mutex};
 4        m_Queue.push(t);
 5    }
 6    m_ConditionVariable.notify_one();
 7}
 8
 9void TaskQueue::PushTasks(std::vector<Task*>& tasks) {
10    {
11        std::scoped_lock l{m_Mutex};
12        for (Task* t : tasks) {
13            m_Queue.push(t);
14        }
15    }
16    m_ConditionVariable.notify_all();
17}

When the consumers awake, they will use the non-thread safe functions to verify if they need to either: consume a new task, or finish execution because the queue will no longer send new tasks.

For the consumer thread, I will break the method for a more detailed look:

1void WorkerThread(const int workerId, TaskQueue& taskQueue,
2                  std::mutex& coutMutex) {
3    auto& [m, cv] = taskQueue.Subscribe();
4    

This declaration is a new feature introduced in C++17 called Structured binding declaration . It makes the declaration more readable, and it is the same as declaring and assigning to a std::tuple. Now that we have the synchronization mechanism, we can go into our main loop:

 5    while (true) {
 6        auto data = [&]() -> std::optional<Task*> {
 7            std::unique_lock l{m};
 8            cv.wait(l, [&] {
 9                return taskQueue.IsQueueStopped() || taskQueue.HasPendingTask();
10            });
11            if (taskQueue.IsQueueStopped()) {
12                return {};
13            } else {
14                Task* taskToProcess = taskQueue.GetNextTask();
15                assert(taskToProcess != nullptr);
16                return taskToProcess;
17            }
18        }();

We will try to get out Task using C++17’s std::optional. This container allows us to create an instance that may not have a value after initialization. The worker thread will wait for it to be woken up inside of the std::optional constructor. If the task queue has a pending task, we will get it and return it as the value inside the std::optional.

17        if (!data) {
18            break;
19        }
20
21        Task* taskPtr = *task;
22        {
23            std::scoped_lock<std::mutex> guard(coutMutex);
24            std::cout << "Worker {" << workerId << "} is executing task {"
25                      << taskPtr->GetId() << "}.\n";
26        }
27
28        // process the data
29        taskPtr->Execute();
30        delete taskPtr;
31    }
32}

If the std::optional returns without a value, the thread ends execution. Otherwise we print a debug message and execute the task.

Let’s put it all together in our main function:

 1int main(int argc, char* argv[]) {
 2    const unsigned int thread_pool_size =
 3        std::thread::hardware_concurrency() - 1;
 4    assert(thread_pool_size > 0);
 5
 6    std::mutex coutMutex;
 7
 8    std::vector<std::thread> thread_pool(thread_pool_size);
 9    TaskQueue taskQueue;
10
11    for (size_t i = 0; i < thread_pool_size; ++i) {
12        thread_pool[i] = std::thread(WorkerThread, i, std::ref(taskQueue),
13                                     std::ref(coutMutex));
14    }

Here we create an array of worker threads. At this point they will start execution and immediately sleep until we push new tasks.

15    std::random_device rd;
16    std::mt19937 gen(rd());
17    std::uniform_int_distribution<> taskDuration(0, 1000);
18
19    for (int i = 0; i < 5; ++i) {
20        taskQueue.PushTask(
21            new Task(i, std::chrono::milliseconds(taskDuration(gen)),
22                     std::ref(coutMutex)));
23    }
24
25    std::vector<Task*> taskBatch;
26    taskBatch.resize(5);
27    for (int i = 0; i < 5; ++i) {
28        taskBatch[i] =
29            new Task(i + 5, std::chrono::milliseconds(taskDuration(gen)),
30                     std::ref(coutMutex));
31    }
32    taskQueue.PushTasks(taskBatch);

Usually, the producer thread is also a loop that creates tasks until the program finishes. For our example, we will create only 10 tasks with random durations. The duration is a random value from 0 to 1000 milliseconds.

33    std::this_thread::sleep_for(std::chrono::seconds(10));
34    taskQueue.StopQueue();
35
36    for (size_t i = 0; i < thread_pool_size; ++i) {
37        thread_pool[i].join();
38    }
39    return 0;
40}

To make the end of our program simple, we will sleep the producer thread for 10 seconds to hopefully process all tasks, and stop the queue. In the last statement, we wait for all consumers to finish, and we exit the program.

An example output of this program may be:

Worker {0} is executing task {0}.
Worker {1} is executing task {1}.
Worker {2} is executing task {2}.
Worker {3} is executing task {3}.
Worker {4} is executing task {4}.
Task {4} finished in 418ms.
Worker {4} is executing task {5}.
Task {3} finished in 526ms.
Worker {3} is executing task {6}.
Task {5} finished in 179ms.
Worker {4} is executing task {7}.
Task {2} finished in 640ms.
Worker {2} is executing task {8}.
Task {1} finished in 781ms.
Worker {1} is executing task {9}.
Task {6} finished in 277ms.
Task {0} finished in 912ms.
Task {7} finished in 337ms.
Task {8} finished in 527ms.
Task {9} finished in 861ms.
Cpp DevelopmentC++Design patterns

Memory allocators in C++ - Part 1

Make your pointers smart - C++ good practices