Ruby concurrency: in praise of condition variables

In a previous post, we talked about the benefits conferred by Ruby mutexes. While a programmer’s familiarity with mutexes is likely to depend on what kind of programs she usually writes, most developers tend to be at least somewhat familiar with these particular synchronization primitives. This article, however, is going to focus on a much lesser-known synchronization construct: the condition variable.

Condition variables are used for putting threads to sleep and waking them back up once a certain condition is met. Don’t worry if this sounds a bit vague; we’ll go into a lot more detail later. As condition variables always need to be used in conjunction with mutexes, we’ll lead with a quick mutex recap. Next, we’ll introduce consumer-producer problems and how to elegantly solve them with the aid of condition variables. Then, we’ll have a look at how to use these synchronization primitives for implementing blocking method calls. Finishing up, we’ll describe some curious condition variable behavior and how to safeguard against it.

A mutex recap

A mutex is a data structure for protecting shared state between multiple threads. When a piece of code is wrapped inside a mutex, the mutex guarantees that only one thread at a time can execute this code. If another thread wants to start executing this code, it’ll have to wait until our first thread is done with it. I realize this may all sound a bit abstract, so now is probably a good time to bring in some example code.

Writing to shared state

In this first example, we’ll have a look at what happens when two threads try to modify the same shared variable. The snippet below shows two methods: counters_with_mutex and counters_without_mutex. Both methods start by creating a zero-initialized counters array before spawning 5 threads. Each thread will perform 100,000 loops, with every iteration incrementing all elements of the counters array by one. Both methods are the same in every way except for one thing: only one of them uses a mutex.

def counters_with_mutex
  mutex = Mutex.new
  counters = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

  5.times.map do
    Thread.new do
      100000.times do
        mutex.synchronize do
          counters.map! { |counter| counter + 1 }
        end
      end
    end
  end.each(&:join)

  counters.inspect
end

def counters_without_mutex
  counters = [0, 0, 0, 0, 0, 0, 0, 0, 0, 0]

  5.times.map do
    Thread.new do
      100000.times do
        counters.map! { |counter| counter + 1 }
      end
    end
  end.each(&:join)

  counters.inspect
end

puts counters_with_mutex
# => [500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000, 500000]

puts counters_without_mutex
# => [500000, 447205, 500000, 500000, 500000, 500000, 203656, 500000, 500000, 500000]
# note that we seem to have lost some increments here due to not using a mutex

As you can see, only the method that uses a mutex ends up producing the correct result. The method without a mutex seems to have lost some increments. This is because the lack of a mutex makes it possible for our second thread to interrupt our first thread at any point during its execution. This can lead to some serious problems.

For example, imagine that our first thread has just read the first entry of the counters array, incremented it by one, and is now getting ready to write this incremented value back to our array. However, before our first thread can write this incremented value, it gets interrupted by the second thread. This second thread then goes on to read the current value of the first entry, increments it by one, and succeeds in writing the result back to our counters array. Now we have a problem!

We have a problem because the first thread got interrupted before it had a chance to write its incremented value to the array. When the first thread resumes, it will end up overwriting the value that the second thread just placed in the array. This will cause us to essentially lose an increment operation, which explains why our program output has entries in it that are less than 500,000.

All these problems can be avoided by using a mutex. Remember that a thread executing code wrapped by a mutex cannot be interleaved with another thread wanting to execute this same code. Therefore, our second thread would never have gotten interleaved with the first thread, thereby avoiding the possibility of results getting overwritten.

Reading from shared state

There’s a common misconception that a mutex is only required when writing to a shared variable, and not when reading from it. The snippet below shows 50 threads flipping the boolean values in the flags array over and over again. Many developers think this snippet is without error as the code responsible for changing these values was wrapped inside a mutex. If that were true, then every line of the output of puts flags.to_s should consist of 10 repetitions of either true or false. As we can see below, this is not the case.

mutex = Mutex.new
flags = [false, false, false, false, false, false, false, false, false, false]

threads = 50.times.map do
  Thread.new do
    100000.times do
      # don't do this! Reading from shared state requires a mutex!
      puts flags.to_s

      mutex.synchronize do
        flags.map! { |f| !f }
      end
    end
  end
end
threads.each(&:join)
$ ruby flags.rb > output.log
$ grep 'true, false' output.log | wc -l
    30

What’s happening here is that our mutex only guarantees that no two threads can modify the flags array at the same time. However, it is perfectly possible for one thread to start reading from this array while another thread is busy modifying it, thereby causing the first thread to read an array that contains both true and false entries. Luckily, all of this can be easily avoided by wrapping puts flags.to_s inside our mutex. This will guarantee that only one thread at a time can read from or write to the flags array.

Before moving on, I would just like to mention that even very experienced people have gotten tripped up by not using a mutex when accessing shared state. In fact, at one point there even was a Java design pattern that assumed it was safe to not always use a mutex to do so. Needless to say, this pattern has since been amended.

Consumer-producer problems

With that mutex refresher out of the way, we can now start looking at condition variables. Condition variables are best explained by trying to come up with a practical solution to the consumer-producer problem. In fact, consumer-producer problems are so common that Ruby already has a data structure aimed at solving these: the Queue class. This class uses a condition variable to implement the blocking variant of its shift() method. In this article, we made a conscious decision not to use the Queue class. Instead, we’re going to write everything from scratch with the help of condition variables.

Let’s have a look at the problem that we’re going to solve. Imagine that we have a website where users can generate tasks of varying complexity, e.g. a service that allows users to convert uploaded jpg images to pdf. We can think of these users as producers of a steady stream of tasks of random complexity. These tasks will get stored on a backend server that has several worker processes running on it. Each worker process will grab a task, process it, and then grab the next one. These workers are our task consumers.

With what we know about mutexes, it shouldn’t be too hard to write a piece of code that mimics the above scenario. It’ll probably end up looking something like this.

tasks = []
mutex = Mutex.new
threads = []

class Task
  def initialize
    @duration = rand()
  end

  def execute
    sleep @duration
  end
end

# producer threads
threads += 2.times.map do
  Thread.new do
    while true
      mutex.synchronize do
        tasks << Task.new
        puts "Added task: #{tasks.last.inspect}"
      end
      # limit task production speed
      sleep 0.5
    end
  end
end

# consumer threads
threads += 5.times.map do
  Thread.new do
    while true
      task = nil
      mutex.synchronize do
        if tasks.count > 0
          task = tasks.shift
          puts "Removed task: #{task.inspect}"
        end
      end
      # execute task outside of mutex so we don't unnecessarily
      # block other consumer threads
      task.execute unless task.nil?
    end
  end
end

threads.each(&:join)

The above code should be fairly straightforward. There is a Task class for creating tasks that take between 0 and 1 seconds to run. We have 2 producer threads, each running an endless while loop that safely appends a new task to the tasks array every 0.5 seconds with the help of a mutex. Our 5 consumer threads are also running an endless while loop, each iteration grabbing the mutex so as to safely check the tasks array for available tasks. If a consumer thread finds an available task, it removes the task from the array and starts processing it. Once the task had been processed, the thread moves on to its next iteration, thereby repeating the cycle anew.

While the above implementation seems to work just fine, it is not optimal as it requires all consumer threads to constantly poll the tasks array for available work. This polling does not come for free. The Ruby interpreter has to constantly schedule the consumer threads to run, thereby preempting threads that may have actual important work to do. To give an example, the above code will interleave consumer threads that are executing a task with consumer threads that just want to check for newly available tasks. This can become a real problem when there is a large number of consumer threads and only a few tasks.

If you want to see for yourself just how inefficient this approach is, you only need to modify the original code for consumer threads with the code shown below. This modified program prints well over a thousand lines of This thread has nothing to do for every single line of Removed task. Hopefully, this gives you an indication of the general wastefulness of having consumer threads constantly poll the tasks array.

# modified consumer threads code
threads += 5.times.map do
  Thread.new do
    while true
      task = nil
      mutex.synchronize do
        if tasks.count > 0
          task = tasks.shift
          puts "Removed task: #{task.inspect}"
        else
          puts 'This thread has nothing to do'
        end
      end
      # execute task outside of mutex so we don't unnecessarily
      # block other consumer threads
      task.execute unless task.nil?
    end
  end
end

Condition variables to the rescue

So how we can create a more efficient solution to the consumer-producer problem? That is where condition variables come into play. Condition variables are used for putting threads to sleep and waking them only once a certain condition is met. Remember that our current solution to the producer-consumer problem is far from ideal because consumer threads need to constantly poll for new tasks to arrive. Things would be much more efficient if our consumer threads could go to sleep and be woken up only when a new task has arrived.

Shown below is a solution to the consumer-producer problem that makes use of condition variables. We’ll talk about how this works in a second. For now though, just have a look at the code and perhaps have a go at running it. If you were to run it, you would probably see that This thread has nothing to do does not show up anymore. Our new approach has completely gotten rid of consumer threads busy polling the tasks array.

The use of a condition variable will now cause our consumer threads to wait for a task to be available in the tasks array before proceeding. As a result of this, we can now remove some of the checks we had to have in place in our original consumer code. I’ve added some comments to the code below to help highlight these removals.

tasks = []
mutex = Mutex.new
cond_var = ConditionVariable.new
threads = []

class Task
  def initialize
    @duration = rand()
  end

  def execute
    sleep @duration
  end
end

# producer threads
threads += 2.times.map do
  Thread.new do
    while true
      mutex.synchronize do
        tasks << Task.new
        cond_var.signal
        puts "Added task: #{tasks.last.inspect}"
      end
      # limit task production speed
      sleep 0.5
    end
  end
end

# consumer threads
threads += 5.times.map do
  Thread.new do
    while true
      task = nil
      mutex.synchronize do
        while tasks.empty?
          cond_var.wait(mutex)
        end

        # the `if tasks.count == 0` statement will never be true as the thread
        # will now only reach this line if the tasks array is not empty
        puts 'This thread has nothing to do' if tasks.count == 0

        # similarly, we can now remove the `if tasks.count > 0` check that
        # used to surround this code. We no longer need it as this code will
        # now only get executed if the tasks array is not empty.
        task = tasks.shift
        puts "Removed task: #{task.inspect}"
      end
      # Note that we have now removed `unless task.nil?` from this line as
      # our thread can only arrive here if there is indeed a task available.
      task.execute
    end
  end
end

threads.each(&:join)

Aside from us removing some if statements, our new code is essentially identical to our previous solution. The only exception to this are the five new lines shown below. Don’t worry if some of the accompanying comments don’t quite make sense yet. Now is also a good time to point out that the new code for both the producer and consumer threads was added inside the existing mutex synchronization blocks. Condition variables are not thread-safe and therefore always need to be used in conjunction with a mutex!

# declaring the condition variable
cond_var = ConditionVariable.new
# a producer thread now signals the condition variable
# after adding a new task to the tasks array
cond_var.signal
# a consumer thread now goes to sleep when it sees that
# the tasks array is empty. It can get woken up again
# when a producer thread signals the condition variable.
while tasks.empty?
  cond_var.wait(mutex)
end

Let’s talk about the new code now. We’ll start with the consumer threads snippet. There’s actually so much going on in these three lines that we’ll limit ourselves to covering what cond_var.wait(mutex) does for now. We’ll explain the need for the while tasks.empty? loop later. The first thing to notice about the wait method is the parameter that’s being passed to it. Remember how a condition variable is not thread-safe and therefore should only have its methods called inside a mutex synchronization block? It is that mutex that needs to be passed as a parameter to the wait method.

Calling wait on a condition variable causes two things to happen. First of all, it causes the thread that calls wait to go to sleep. That is to say, the thread will tell the interpreter that it no longer wants to be scheduled. However, this thread still has ownership of the mutex as it’s going to sleep. We need to ensure that the thread relinquishes this mutex because otherwise all other threads waiting for this mutex will be blocked. By passing this mutex to the wait method, the wait method internals will ensure that the mutex gets released as the thread goes to sleep.

Let’s move on to the producer threads. These threads are now calling cond_var.signal. The signal method is pretty straightforward in that it wakes up exactly one of the threads that were put to sleep by the wait method. This newly awoken thread will indicate to the interpreter that it is ready to start getting scheduled again and then wait for its turn.

So what code does our newly awoken thread start executing once it gets scheduled again? It starts executing from where it left off. Essentially, a newly awoken thread will return from its call to cond_var.wait(mutex) and resume from there. Personally, I like to think of calling wait as creating a save point inside a thread from which work can resume once the thread gets woken up and rescheduled again. Please note that since the thread wants to resume from where it originally left off, it’ll need to reacquire the mutex in order to get scheduled. This mutex reacquisition is very important, so be sure to remember it.

This segues nicely into why we need to use while tasks.empty? when calling wait in a consumer thread. When our newly awoken thread resumes execution by returning from cond_var.wait, the first thing it’ll do is complete its previously interrupted iteration through the while loop, thereby evaluating while tasks.empty? again. This actually causes us to neatly avoid a possible race condition.

Let’s say we don’t use a while loop and use an if statement instead. The resulting code would then look like shown below. Unfortunately, there is a very hard to find problem with this code. Note how we now need to re-add the previously removed if tasks.count > 0 and unless task.nil? statements to our code below in order to ensure its safe execution.

# consumer threads
threads += 5.times.map do
  Thread.new do
    while true
      task = nil
      mutex.synchronize do
        cond_var.wait(mutex) if tasks.empty?

        # using `if tasks.empty?` forces us to once again add this
        # `if tasks.count > 0` check. We need this check to protect
        # ourselves against a nasty race condition.
        if tasks.count > 0
          task = tasks.shift
          puts "Removed task: #{task.inspect}"
        else
          puts 'This thread has nothing to do'
        end
      end
      # using `if tasks.empty?` forces us to re-add `unless task.nil?`
      # in order to safeguard ourselves against a now newly introduced
      # race condition
      task.execute unless task.nil?
    end
  end
end

Imagine a scenario where we have:

A consumer thread that’s awake will go back to sleep only when there are no more tasks in the tasks array. That is to say, a single consumer thread will keep processing tasks until no more tasks are available. Now, let’s say one of our producer threads adds a new task to the currently empty tasks array before calling cond_var.signal at roughly the same time as our active consumer thread is finishing its current task. This signal call will awaken one of our sleeping consumer threads, which will then try to get itself scheduled. This is where a race condition is likely to happen!

We’re now in a position where two consumer threads are competing for ownership of the mutex in order to get scheduled. Let’s say our first consumer thread wins this competition. This thread will now go and grab the task from the tasks array before relinquishing the mutex. Our second consumer thread then grabs the mutex and gets to run. However, as the tasks array is empty now, there is nothing for this second consumer thread to work on. So this second consumer thread now has to do an entire iteration of its while true loop for no real purpose at all.

We now find ourselves in a situation where a complete iteration of the while true loop can occur even when the tasks array is empty. This is a not unlike the position we were in when our program was just busy polling the tasks array. Sure, our current program will be more efficient than busy polling, but we will still need to safeguard our code against the possibility of an iteration occurring when there is no task available. This is why we needed to re-add the if tasks.count > 0 and unless task.nil? statements. Especially the latter of these two is important, as otherwise our program might crash with a NilException.

Luckily, we can safely get rid of these easily overlooked safeguards by forcing each newly awakened consumer thread to check for available tasks and having it put itself to sleep again if no tasks are available. This behavior can be accomplished by replacing the if tasks.empty? statement with a while tasks.empty? loop. If tasks are available, a newly awoken thread will exit the loop and execute the rest of its code. However, if no tasks are found, then the loop is repeated, thereby causing the thread to put itself to sleep again by executing cond_var.wait. We’ll see in a later section that there is yet another benefit to using this while loop.

Building our own Queue class

At the beginning of a previous section, we touched on how condition variables are used by the Queue class to implement blocking behavior. The previous section taught us enough about condition variables for us to go and implement a basic Queue class ourselves. We’re going to create a thread-safe SimpleQueue class that is capable of:

It’s easy enough to write code that meets these first two criteria. It will probably end up looking something like the code shown below. Note that our SimpleQueue class is using a mutex as we want this class to be thread-safe, just like the original Queue class.

class SimpleQueue
  def initialize
    @elems = []
    @mutex = Mutex.new
  end

  def <<(elem)
    @mutex.synchronize do
      @elems << elem
    end
  end

  def shift(blocking = true)
    @mutex.synchronize do
      if blocking
        raise 'yet to be implemented'
      end
      @elems.shift
    end
  end
end

simple_queue = SimpleQueue.new
simple_queue << 'foo'

simple_queue.shift(false)
# => "foo"

simple_queue.shift(false)
# => nil

Now let’s have a look at what’s needed to implement the blocking shift behavior. As it turns out, this is actually very easy. We only want the thread to block if the shift method is called when the @elems array is empty. This is all the information we need to determine where we need to place our condition variable’s call to wait. Similarly, we want the thread to stop blocking once the << operator appends a new element, thereby causing @elems to no longer be empty. This tells us exactly where we need to place our call to signal.

In the end, we just need to create a condition variable that makes the thread go to sleep when a blocking shift is called on an empty SimpleQueue. Likewise, the << operator just needs to signal the condition variable when a new element is added, thereby causing the sleeping thread to be woken up. The takeaway from this is that blocking methods work by causing their calling thread to fall asleep. Also, please note that the call to @cond_var.wait takes place inside a while @elems.empty? loop. Always use a while loop when calling wait on a condition variable! Never use an if statement!

class SimpleQueue
  def initialize
    @elems = []
    @mutex = Mutex.new
    @cond_var = ConditionVariable.new
  end

  def <<(elem)
    @mutex.synchronize do
      @elems << elem
      @cond_var.signal
    end
  end

  def shift(blocking = true)
    @mutex.synchronize do
      if blocking
        while @elems.empty?
          @cond_var.wait(@mutex)
        end
      end
      @elems.shift
    end
  end
end

simple_queue = SimpleQueue.new

# this will print "blocking shift returned with: foo" after 5 seconds
# that is to say, the first thread will go to sleep until the second
# thread adds an element to the queue, thereby causing the first thread
# to be woken up again
threads = []
threads << Thread.new { puts "blocking shift returned with: #{simple_queue.shift}" }
threads << Thread.new { sleep 5; simple_queue << 'foo' }
threads.each(&:join)

One thing to point out in the above code is that @cond_var.signal can get called even when there are no sleeping threads around. This is a perfectly okay thing to do. In these types of scenarios calling @cond_var.signal will just do nothing.

Spurious wakeups

A “spurious wakeup” refers to a sleeping thread getting woken up without any signal call having been made. This is an impossible to avoid edge-case in condition variables. It’s important to point out that this is not being caused by a bug in the Ruby interpreter or anything like that. Instead, the designers of the threading libraries used by your OS found that allowing for the occasional spurious wakeup greatly improves the speed of condition variable operations. As such, any code that uses condition variables needs to take spurious wakeups into account.

So does this mean that we need to rewrite all the code that we’ve written in this article in an attempt to make it resistant to possible bugs introduced by spurious wakeups? You’ll be glad to know that this isn’t the case as all code snippets in this article have always wrapped the cond_var.wait statement inside a while loop!

We covered earlier how using a while loop makes our code more efficient when dealing with certain race conditions as it causes a newly awakened thread to check whether there is actually anything to do for it, and if not, the thread goes back to sleep. This same while loop helps us deal with spurious wakeups as well.

When a thread gets woken up by a spurious wakeup and there is nothing for it to do, our usage of a while loop will cause the thread to detect this and go back to sleep. From the thread’s point of view, being awakened by a spurious wakeup isn’t any different than being woken up with no available tasks to do. So the same mechanism that helps us deal with race conditions solves our spurious wakeup problem as well. It should be obvious by now that while loops play a very important role when working with condition variables.

Conclusion

Ruby’s condition variables are somewhat notorious for their poor documentation. That’s a shame, because they are wonderful data structures for efficiently solving a very specific set of problems. Although, as we’ve seen, using them isn’t without pitfalls. I hope that this post will go some way towards making them (and their pitfalls) a bit better understood in the wider Ruby community.

I also feel like I should point out that while everything mentioned above is correct to the best of my knowledge, I’m unable to guarantee that absolutely no mistakes snuck in while writing this. As always, please feel free to contact me if you think I got anything wrong, or even if you just want to say hello.