Thread Safe Queue - Revisit

Introduction

One of my engineering assignments from my computing module was to create a thread safe queue. I completed the assignment correctly, but I have recently revisited concurrency theory and it has left me thinking about what could have been done better.

Here's my original attempt; I have omitted parts for brevity but kept what I think is important. In short, I locked over the body of every method:

struct item {
    string str;
    int num;

    item(string s, int n) {
        str = s;
        num = n;
    }
};

class ts_queue {
    private:
        vector<item> v;
        mutex m;

    public:
        void add (item i) {
            lock_guard<mutex> l(m);
            v.insert(v.begin(), i);
        }
        item remove () {
            lock_guard<mutex> l(m);
            item i = v.back();
            v.pop_back();
            return i;
        }
        void populate (int n_items);
        void reverse () {
            lock_guard<mutex> l(m);
            // reverse in place by index
        }
        void print ();
        void random_delete () {
            lock_guard<mutex> l(m);
            // v.erase random element
        }
        bool empty ();
};

Add

To achieve queue-like behaviour I inserted at the front of the vector and popped from the back. Vectors are contiguous in memory for index access, and inserting at the front shifts every subsequent element along in memory, leading to O(N) time complexity. In particular, it is O(N) in memory reads and writes, which could cause a lot of cache-coherence traffic between cores. Furthermore, the shift invalidates the cache line holding the back element, which was likely cached by the thread that was removing elements.

Using a ring buffer, in which producer and consumer indices progress through an array and wrap at the end, would make both add and remove O(1).

Contention between add and remove

In many cases add and remove operate on different parts of the data structure. The concurrency lectures showed me that semaphores can be used to elegantly synchronise the producer and the consumer so that they never touch the same state. Combining this with separate mutexes for the producer and the consumer allows them to work concurrently.

Reverse

The improvements so far invite a much better treatment of the reverse method: you can simply switch the direction in which you move around the circular buffer. The switch itself needs to be done under a lock.

Other ops

Less common operations (such as print or random_delete) could still acquire a global lock for simplicity; I will start with this for now. I also want to look at using atomics.

Plan

To experiment with different thread safe queues I will create tests to validate correct behaviour and a benchmark app which will measure performance for an example task.

My engineering work was a single file; to facilitate my experimentation I have broken it up into the following directory structure:

thread-safe-queue/
├── CMakeLists.txt
├── include/
│   └── ts_queue.h      - queue interface
├── src/
│   ├── naive_queue.cc  - naive implementation
│   └── ring_queue.cc   - ring implementation
├── tests/
│   └── main.cc
└── benchmarks/
    └── main.cc

Usage:

cmake -S . -B build && cmake --build build

./build/bench_naive
./build/bench_ring

./build/tests_naive
./build/tests_ring

Implementation

I would like to compile different implementations for my original attempt and my new ring-based queue. My instinct from C is to define a header and compile different source files. But this is a C++ object with methods.

I could mark the methods as virtual, which would allow me to override them in derived classes for each of my implementations. But this dynamic dispatch adds runtime overhead, and I don't need it; I want to pick the implementation at compile time.

So I can leave the methods as normal functions on the ts_queue class in the header. But my two implementations want to store different state on the object, e.g. a ring vs. a vector.

PIMPL

This is a use case for the PIMPL pattern. Instead of holding its state directly, the header declares the object with two members:
- impl, a forward-declared struct holding the implementation-specific members
- pimpl, a pointer to an impl

...
    private:
    struct impl;
    std::unique_ptr<impl> pimpl;
};

This allows the interface to be declared in the header while each implementation hides its own state behind the pointer; note that the impl itself is heap-allocated through the unique_ptr, even if the queue object lives on the stack.

Then each implementation defines the impl struct and, in its constructor, instantiates it and initialises pimpl as a unique pointer to it:

template<typename T>
struct ts_queue<T>::impl {
    T ring[N];                              // N: compile-time capacity
    std::atomic<size_t> head{0};            // consumer index
    std::atomic<size_t> tail{0};            // producer index
    std::counting_semaphore<N> spaces{N};   // free slots
    std::counting_semaphore<N> items{0};    // filled slots
    std::mutex m;
};

template<typename T>
ts_queue<T>::ts_queue() : pimpl(std::make_unique<impl>()) {}

Templating

Early on I realised that any logging I do from my threads gets mashed together on stdout. The very thread-safe queue that I am creating can be used to solve this, so I made the object type agnostic. The implementations instantiate the template with a concrete type (e.g. ts_queue<string>), generating the required code at compile time.

segfault...

Inevitable. So at this point I've implemented my initial ring-based queue. I wrote a benchmark that works happily on the ring queue; it looks like this:

void worker_remove (ts_queue<item> &q, ts_queue<string> &log_q) {
    while (true) {
        item it = q.remove();
        ...
    }
}

void worker_log(ts_queue<string> &log_q) {
    while (true) {
        string s = log_q.remove();
        cout << s << "\n";
    }
}

int main () {
    ts_queue<item> q;
    ts_queue<string> log_q;
    thread t_add(worker_add, ref(q), ref(log_q));
    thread t_remove(worker_remove, ref(q), ref(log_q));
    thread t_log(worker_log, ref(log_q));
    thread_guard g1(t_add);
    thread_guard g2(t_remove);
    thread_guard g3(t_log);
}

But my original queue segfaulted. This is because in my benchmark the thread calls remove even if the queue is empty. That is fine on my ring queue, because remove just blocks until there is an element to remove. But it is not safe at all on my original: calling v.back() on an empty vector is undefined behaviour. (My original queue was tested against a different sample exercise, which checked that the queue was not empty before calling remove.)

API contracts

This has opened a can of worms:

remove():
    - original: unsafe and non-blocking
    - new: blocking
add():
    - original: non-blocking and unbounded
    - new: blocking and bounded

Whether a function blocks is part of the contract of the API, so it needs to be consistent. And for a fair comparison, I'll make both queues bounded.

I need to modify my original implementation: I'm going to make removal block and bound the queue size so that an apples-to-apples comparison can be made.

condition variable

Benchmarking

So I now have two implementations. I'd like to improve my benchmark app so that I can time it and, in theory, observe a difference in performance.

My benchmark program will take