Thread-safe Data Structures
Implementing a thread-safe, blocking queue
Now that you’ve got an understanding of mutexes and condition variables, you can use these concepts to build a quintessential thread-safe data structure: a blocking queue.
This will be something you can use like this:
q = BlockingQueue.new
q.push 'thing'
q.pop #=> 'thing'
That’s a pretty simple API, but the other part of the contract is that you don’t want users of BlockingQueue to have to use a mutex. Rather, it should be internally thread-safe.
The last requirement is that the pop operation, when called on an empty queue, should block until something is pushed onto the queue. That avoids busy loops like this:
# this is a busy loop
loop do
  # constantly pop off the queue, but only
  # process non-nil results
  if (item = q.pop)
    # ... process item ...
  end
end
# this is what we want
loop do
  # when the queue pops something, you can be sure
  # that `item` contains a real, intended value
  item = q.pop
  # ... process item ...
end
Let’s start with a really simple base and work up incrementally.
class BlockingQueue
  def initialize
    @storage = Array.new
  end
  def push(item)
    @storage.push(item)
  end
  def pop
    @storage.shift
  end
end
This provides the framework for us to build on. Right away you should see a problem here.
Given that the underlying Array isn’t thread-safe, and this class doesn’t use a Mutex, nothing protects the underlying Array from concurrent modifications in the push and pop methods.
A Mutex will rectify this.
require 'thread'
class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
  end
  def push(item)
    @mutex.synchronize do
      @storage.push(item)
    end
  end
  def pop
    @mutex.synchronize do
      @storage.shift
    end
  end
end
Now the initialize method creates a Mutex local to this object. There’s no need for a global mutex here. Different instances of this class will provide their own thread-safety guarantees. So while one instance is pushing data into its Array, there’s no problem with another instance pushing data into its Array concurrently. The issue only arises when the concurrent modification is happening on the same instance.
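As a quick sanity check of this Mutex-based version, the sketch below repeats the class so it runs on its own, has ten threads push 100 items each into one shared instance, then drains it. Every item should arrive, and once the queue is empty, pop hands back nil rather than waiting. The thread and item counts are arbitrary choices for the demo.

```ruby
require 'thread'

class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
  end

  def push(item)
    @mutex.synchronize { @storage.push(item) }
  end

  def pop
    @mutex.synchronize { @storage.shift }
  end
end

q = BlockingQueue.new

# Ten writer threads share a single instance; its Mutex serializes their pushes.
writers = 10.times.map do |i|
  Thread.new do
    100.times { |j| q.push(i * 100 + j) }
  end
end
writers.each(&:join)

# Drain the queue. With no blocking yet, pop returns nil once it's empty.
drained = []
while (item = q.pop)
  drained << item
end

puts drained.size   # => 1000
p q.pop             # => nil
```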
So, this is looking better. But you’re missing the blocking pop behaviour. Currently, pop returns nil if the queue is empty. A ConditionVariable can rectify this.
require 'thread'
class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end
  def push(item)
    @mutex.synchronize do
      @storage.push(item)
      @condvar.signal
    end
  end
  def pop
    @mutex.synchronize do
      while @storage.empty?
        @condvar.wait(@mutex)
      end
      @storage.shift
    end
  end
end
This gets you the behaviour you need. A single ConditionVariable is shared among all the threads that use a given instance.
When a thread calls the pop method while the underlying Array is empty, it calls ConditionVariable#wait, which releases the mutex and puts the thread to sleep. When the push method is called, it signals the condition variable, waking up one thread that’s waiting on it. That thread reacquires the mutex before wait returns, re-checks that the Array isn’t empty (which is why pop uses a while loop rather than an if), and then shifts the item off of the Array.
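To watch the blocking behaviour in action, the sketch below repeats the ConditionVariable version and starts three consumer threads before anything has been pushed. Each one parks inside ConditionVariable#wait, and each push wakes one of them. The item values, thread count, and the short sleep are arbitrary demo choices; the sleep only gives the consumers time to start waiting and isn't needed for correctness.

```ruby
require 'thread'

class BlockingQueue
  def initialize
    @storage = Array.new
    @mutex = Mutex.new
    @condvar = ConditionVariable.new
  end

  def push(item)
    @mutex.synchronize do
      @storage.push(item)
      @condvar.signal # wake one waiting consumer, if there is one
    end
  end

  def pop
    @mutex.synchronize do
      # `while`, not `if`: a woken thread re-checks, because another consumer
      # may have taken the item before this one reacquired the mutex.
      @condvar.wait(@mutex) while @storage.empty?
      @storage.shift
    end
  end
end

q = BlockingQueue.new

# Nothing has been pushed yet, so each consumer blocks inside pop.
consumers = 3.times.map { Thread.new { q.pop } }

sleep 0.1
p consumers.map(&:status)   # typically ["sleep", "sleep", "sleep"]

%w[a b c].each { |item| q.push(item) }

# Thread#value joins each thread and returns the value of its block.
p consumers.map(&:value).sort  # => ["a", "b", "c"]
```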
Queue, from the standard lib
Ruby’s standard library ships with a class called Queue. Together with its bounded subclass, SizedQueue, it’s the only thread-safe data structure that ships with Ruby. It’s part of the set of utilities that’s loaded when you require 'thread'.
In this example, you came dangerously close to mirroring the implementation of Queue! Queue has a few more methods than your BlockingQueue, but its behaviour regarding push and pop is exactly the same.
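Because Queue covers the same ground, the usage example from the start of this section works unchanged with it. The sketch below also touches a few of its extra methods: size, empty?, num_waiting, and the non-blocking form pop(true), which raises ThreadError on an empty queue instead of putting the caller to sleep.

```ruby
require 'thread'

q = Queue.new
q.push 'thing'
q << 'other thing'     # << is an alias for push

puts q.size            # => 2
puts q.pop             # => thing
puts q.pop             # => other thing
puts q.empty?          # => true
puts q.num_waiting     # => 0 (no threads are blocked in pop)

begin
  q.pop(true)          # non-blocking pop: raises instead of sleeping
rescue ThreadError => e
  puts "empty: #{e.message}"
end
```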
Queue is very useful because of its blocking behaviour. Typically, you would use a Queue to distribute workloads to multiple threads, with one thread pushing to the queue and multiple threads popping. The popping threads are put to sleep until there’s some work for them to do.
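require 'thread'
queue = Queue.new
producer = Thread.new do
  10.times do
    queue.push(Time.now.to_i)
    sleep 1
  end
end
consumers = []
3.times do
  consumers << Thread.new do
    loop do
      # blocks until the producer pushes something
      unix_timestamp = queue.pop
      # stop once the main thread signals that no more work is coming
      break if unix_timestamp == :done
      # insert thousands separators, e.g. 1700000000 => "1,700,000,000"
      formatted_timestamp = unix_timestamp.to_s.reverse.
        gsub(/(\d{3})(?=\d)/, '\1,').reverse
      puts "It's been #{formatted_timestamp} seconds since the epoch!"
    end
  end
end
producer.join
# one :done sentinel per consumer, then wait for every consumer to finish,
# so the program doesn't exit before the last timestamps are printed
consumers.size.times { queue.push(:done) }
consumers.each(&:join)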
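On Ruby 2.3 or later, Queue#close gives consumers a clean way to stop: once a queue has been closed and drained, pop returns nil instead of blocking, so each consumer can simply loop until it sees nil. The sketch below assumes Ruby 2.3+ and swaps the timestamps for plain integers so the result is predictable.

```ruby
require 'thread'

# Requires Ruby 2.3+ for Queue#close.
queue = Queue.new

consumers = 3.times.map do
  Thread.new do
    processed = []
    # pop blocks while the queue is open and empty; it returns nil only
    # after the queue has been closed and fully drained.
    while (n = queue.pop)
      processed << n * n
    end
    processed
  end
end

10.times { |i| queue.push(i + 1) }
queue.close   # no more work; pushing now raises ClosedQueueError

results = consumers.flat_map(&:value)
p results.sort  # => [1, 4, 9, 16, 25, 36, 49, 64, 81, 100]
```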
Array and Hash
Queue is useful, but sometimes you do need a regular ol' Array or Hash to get the job done. Unfortunately, Ruby doesn’t ship with any thread-safe Array or Hash implementations.
The core Array and Hash classes are not thread-safe by default, nor should they be. Thread-safety concerns would add overhead to their implementation, which would hurt performance for single-threaded use cases.
You might be thinking: “With all of the great concurrency support available to Java on the JVM, surely the JRuby Array and Hash are thread-safe?” They’re not. For the exact reason mentioned above, using a thread-safe data structure in a single-threaded context would reduce performance.
Indeed, even in a language like Java, these basic data structures aren’t thread-safe. However, unlike Ruby, Java does have dependable, thread-safe alternatives built into its standard library (the java.util.concurrent package).
In Ruby, when you need a thread-safe Array or Hash, I suggest the thread_safe rubygem. This gem provides thread-safe versions under its own namespace:
ThreadSafe::Array can be used in place of Array.
ThreadSafe::Hash can be used in place of Hash.
Note that these data structures aren’t re-implementations; they actually wrap the core Array and Hash, ensuring that each method call is protected by a Mutex.
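To make the wrapping idea concrete, here is a minimal sketch of a Mutex-guarded Array. SynchronizedArray is a hypothetical name for illustration, not the thread_safe gem’s code or API. It forwards every Array method to an internal Array while holding a lock.

```ruby
require 'thread'

# Hypothetical illustration only; this is not how the thread_safe gem is written.
class SynchronizedArray
  def initialize(*args)
    @array = Array.new(*args)
    @mutex = Mutex.new
  end

  # Forward any Array method to the wrapped Array while holding the lock.
  # Caveats: Mutex isn't reentrant, so a block passed to e.g. #each must not
  # call back into the same SynchronizedArray; and methods that return self
  # (like #push) hand back the raw, unguarded Array.
  def method_missing(name, *args, &block)
    if @array.respond_to?(name)
      @mutex.synchronize { @array.public_send(name, *args, &block) }
    else
      super
    end
  end

  def respond_to_missing?(name, include_private = false)
    @array.respond_to?(name) || super
  end
end

list = SynchronizedArray.new

threads = 10.times.map do
  Thread.new { 1_000.times { |i| list.push(i) } }
end
threads.each(&:join)

puts list.size  # => 10000
```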
Immutable data structures
Immutable data structures are inherently thread-safe. Read more about them in the appendix on Immutability.
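As a small taste of why: an object that can’t change can be read from any number of threads without a lock. The sketch below uses Ruby’s built-in freeze, which is shallow, so the strings are frozen individually as well. Producing an “updated” version means building a new object rather than mutating the shared one. The constant name and values are arbitrary.

```ruby
# freeze is shallow: freeze the elements as well as the Array itself.
COLORS = %w[red green blue].map(&:freeze).freeze

readers = 5.times.map do
  Thread.new { COLORS.map(&:upcase) }   # reads only; no Mutex needed
end
upper = readers.map(&:value)

# "Modifying" means creating a new Array; the shared one never changes.
more_colors = (COLORS + ['yellow']).freeze

begin
  COLORS << 'purple'
rescue RuntimeError => e   # FrozenError (Ruby 2.5+) is a RuntimeError subclass
  puts "can't modify: #{e.class}"
end
```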