Thread-safe Data Structures

Implementing a thread-safe, blocking queue

Now that you’ve got an understanding of mutexes and condition variables, you can use these concepts to build a quintessential thread-safe data structure: a blocking queue.

This will be something you can use like this:

q = BlockingQueue.new
q.push 'thing'
q.pop #=> 'thing'

That’s a pretty simple API, but the other part of the contract is that you don’t want users of BlockingQueue to have to use a mutex. Rather, it should be internally thread-safe.

The last requirement is that the pop operation, when called on an empty queue, should block until something is pushed onto the queue. That way, callers don’t have to write a busy loop like this:

# this is a busy loop
loop do
  # constantly pop off the queue, but only
  # process non-nil results
  if item = q.pop
    # process item
  end
end

# this is what we want
loop do
  # when the queue pops something, you can be sure
  # that `item` contains a real, intended value
  item = q.pop
  # process item
end

Let’s start with a really simple base, working up incrementally.

class BlockingQueue
  def initialize
    @storage = Array.new
  end

  def push(item)
    @storage.push(item)
  end

  def pop
    @storage.shift
  end
end

This provides the framework for us to build on. Right away you should see a problem here.

The underlying Array isn’t thread-safe, and this class doesn’t use a Mutex, so nothing protects the Array from concurrent modifications made by the push and pop methods.

A Mutex will rectify this.

require 'thread'

class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
    end
  end

  def pop
    @mutex.synchronize do
      @storage.shift
    end
  end
end

Now the initialize method creates a Mutex local to this object. There’s no need for a global mutex here. Different instances of this class will provide their own thread-safety guarantees. So while one instance is pushing data into its Array, there’s no problem with another instance pushing data into its Array concurrently. The issue only arises when the concurrent modification is happening on the same instance.
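To see the per-instance locking in action, here is a small check against the Mutex-protected version above. Two threads push into one instance while two others push into a second instance, and each queue ends up with every item. Peeking at `@mutex` with `instance_variable_get` is only for demonstration; it simply confirms the two instances hold different locks.

```ruby
require 'thread'

# The Mutex-protected BlockingQueue from the text (no blocking pop yet).
class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
    end
  end

  def pop
    @mutex.synchronize do
      @storage.shift
    end
  end
end

a = BlockingQueue.new
b = BlockingQueue.new

# Each instance owns its own Mutex, so holding a's lock never blocks b.
separate_locks = !a.instance_variable_get(:@mutex).equal?(b.instance_variable_get(:@mutex))

# Two threads push into `a` and two into `b`, all at the same time.
threads = [a, a, b, b].map do |queue|
  Thread.new { 1_000.times { |i| queue.push(i) } }
end
threads.each(&:join)

# Drain each queue; this version's pop returns nil once the queue is empty.
count_a = 0
count_a += 1 while a.pop
count_b = 0
count_b += 1 while b.pop

puts separate_locks # prints true
puts count_a        # prints 2000
puts count_b        # prints 2000
```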

So, this is looking better. But you’re missing the blocking pop behaviour: currently, pop returns nil if the queue is empty. A ConditionVariable can rectify this.

require 'thread'

class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
      @condvar.signal
    end
  end

  def pop
    @mutex.synchronize do
      while @storage.empty?
        @condvar.wait(@mutex)
      end

      @storage.shift
    end
  end
end

This gets you the behaviour you need. We have one ConditionVariable object being shared among any threads that will be using this object instance.

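When a thread calls the pop method and the underlying Array is empty, it calls ConditionVariable#wait, which releases the mutex and puts the thread to sleep. Releasing the mutex is what allows another thread to enter push. When push is called, it signals the condition variable, waking up one thread that’s waiting on it. That thread reacquires the mutex before wait returns, then shifts the item off of the Array. The check is a while loop rather than an if because, by the time the woken thread reacquires the mutex, another thread may already have taken the item; looping makes it re-check and go back to sleep if the Array is empty again.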
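A quick way to watch the blocking behaviour is to start a consumer on an empty queue, wait until it is parked, and only then push. This sketch reuses the ConditionVariable version from the text; polling `Thread#status` until it reports "sleep" is just a demonstration device, not something you need in real code.

```ruby
require 'thread'

# The ConditionVariable-based BlockingQueue from the text.
class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
      @condvar.signal
    end
  end

  def pop
    @mutex.synchronize do
      while @storage.empty?
        @condvar.wait(@mutex)
      end

      @storage.shift
    end
  end
end

q = BlockingQueue.new

# The consumer pops from an empty queue, so it sleeps inside
# ConditionVariable#wait (with the mutex released).
consumer = Thread.new { q.pop }

# Thread#status reports "sleep" while the consumer is waiting.
sleep 0.01 until consumer.status == 'sleep'

q.push('thing')          # the signal wakes the sleeping consumer
result = consumer.value  # joins the thread and returns what pop returned
puts result              # prints thing
```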

Queue, from the standard lib

Ruby’s standard library ships with a class called Queue. Apart from its bounded subclass, SizedQueue, this is the only thread-safe data structure that ships with Ruby. It’s part of the set of utilities that’s loaded when you require 'thread'.

In this example, you came dangerously close to mirroring the implementation of Queue! Queue has a few more methods than your BlockingQueue, but its behaviour regarding push and pop is exactly the same.
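A few of those extra methods are shown below. The FIFO push/pop behaviour matches BlockingQueue. One difference is `pop(true)`, which is non-blocking: on an empty queue it raises ThreadError instead of putting the caller to sleep.

```ruby
require 'thread'

q = Queue.new
q.push(1)
q << 2                  # << is an alias for push
size_before = q.size    # 2
first = q.pop           # 1 (first in, first out)
second = q.pop          # 2
now_empty = q.empty?    # true

# Non-blocking pop: raises ThreadError rather than sleeping on an empty queue.
begin
  q.pop(true)
rescue ThreadError
  non_blocking_raised = true
end

puts non_blocking_raised # prints true
```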

Queue is very useful because of its blocking behaviour. Typically, you would use a Queue to distribute workloads to multiple threads, with one thread pushing to the queue, and multiple threads popping. The popping threads are put to sleep until there’s some work for them to do.

require 'thread'

queue = Queue.new

producer = Thread.new do
  10.times do
    queue.push(Time.now.to_i)
    sleep 1
  end
end

consumers = []

3.times do
  consumers << Thread.new do
    loop do
      unix_timestamp = queue.pop
      formatted_timestamp = unix_timestamp.to_s.reverse.
                            gsub(/(\d{3})(?=\d)/, '\1,').reverse

      puts "It's been #{formatted_timestamp} seconds since the epoch!"
    end
  end
end

producer.join
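The consumers above loop forever. They only stop because the whole program exits once `producer.join` returns. When workers need to finish cleanly, one common pattern is to push one sentinel value per worker after all the real work. Because Queue is FIFO, every job is taken before any sentinel. This is a sketch; the `STOP` symbol and the "double the number" work are hypothetical stand-ins.

```ruby
require 'thread'

jobs = Queue.new
results = Queue.new          # a Queue also works as a thread-safe collector
worker_count = 3
STOP = :stop                 # hypothetical sentinel value for this sketch

workers = Array.new(worker_count) do
  Thread.new do
    loop do
      job = jobs.pop         # sleeps while the queue is empty
      break if job == STOP   # each worker exits after taking one sentinel
      results.push(job * 2)
    end
  end
end

10.times { |i| jobs.push(i) }
worker_count.times { jobs.push(STOP) }  # one sentinel per worker, queued after the work
workers.each(&:join)                    # returns once every worker has stopped

doubled = Array.new(results.size) { results.pop }
puts doubled.sort.inspect # prints [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
```

On Ruby 2.3 and later, Queue#close is an alternative: once a closed queue has been drained, pop returns nil instead of blocking.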

Array and Hash

Queue is useful, but sometimes you do need a regular ol' Array or Hash to get the job done. Unfortunately, Ruby doesn’t ship with any thread-safe Array or Hash implementations.

The core Array and Hash classes are not thread-safe by default, nor should they be. Thread-safety concerns would add overhead to their implementation, which would hurt performance for single-threaded use cases.

You might be thinking: “With all of the great concurrency support available to Java on the JVM, surely the JRuby Array and Hash are thread-safe?” They’re not. For the exact reason mentioned above, using a thread-safe data structure in a single-threaded context would reduce performance.

Indeed, even in a language like Java, these basic data structures aren’t thread-safe. However, unlike Ruby, Java does have dependable, thread-safe alternatives built-in.

In Ruby, when you need a thread-safe Array or Hash, I suggest the thread_safe rubygem. This gem provides thread-safe versions under its own namespace.

  • ThreadSafe::Array can be used in place of Array.
  • ThreadSafe::Hash can be used in place of Hash.

Note that these data structures aren’t re-implementations; they actually wrap the core Array and Hash, ensuring that each method call is protected by a Mutex.
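To make the "wrap and lock every call" idea concrete, here is a minimal sketch. `SynchronizedArray` is a hypothetical class written for illustration. It is not the thread_safe gem’s code, only the same general technique: every forwarded method runs while holding one Mutex.

```ruby
require 'thread'

# Hypothetical SynchronizedArray: wraps a core Array and guards every
# forwarded call with a single Mutex. Not the thread_safe gem's implementation.
class SynchronizedArray
  def initialize
    @array = []
    @mutex = Mutex.new
  end

  # Forward any public Array method to the wrapped array while holding the lock.
  def method_missing(name, *args, &block)
    if @array.respond_to?(name)
      @mutex.synchronize { @array.public_send(name, *args, &block) }
    else
      super
    end
  end

  def respond_to_missing?(name, include_private = false)
    @array.respond_to?(name) || super
  end
end

list = SynchronizedArray.new
threads = Array.new(10) do
  Thread.new { 100.times { |i| list.push(i) } }
end
threads.each(&:join)
puts list.size # prints 1000
```

A sketch this small has sharp edges. Methods that return the array itself, such as push, hand the unprotected inner Array back to the caller. Ruby’s Mutex is not reentrant, so calling another method on the same wrapper from inside an `each` block raises ThreadError. Finally, a lock per call does not make multi-step operations like "check empty?, then shift" atomic.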

Immutable data structures

Immutable data structures are inherently thread-safe. Read more about them in the appendix on Immutability.
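As a small taste, a frozen object can be shared by any number of threads without a Mutex because nothing can modify it, and a write attempt fails loudly instead of racing with readers. Note that #freeze is shallow, so this sketch freezes the inner strings separately. FrozenError is the exception class on Ruby 2.5 and later; older versions raise RuntimeError.

```ruby
# Freeze the strings and then the array itself (#freeze is shallow).
COLORS = ['red', 'green', 'blue'].map(&:freeze).freeze

# Readers need no lock: the shared data can never change underneath them.
readers = Array.new(4) { Thread.new { COLORS.map(&:upcase) } }
results = readers.map(&:value)

# A write attempt raises instead of silently corrupting shared state.
begin
  COLORS << 'purple'
rescue FrozenError => e
  error = e
end

puts results.uniq.inspect # prints [["RED", "GREEN", "BLUE"]]
puts error.class          # prints FrozenError
```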