Wrap Your Threads in an Abstraction
Up until now, all of the example code in the book has been of a trivial nature: the kind of thing you might write in a simple script to accomplish a task on your local machine, but probably wouldn’t imagine putting into production. Now I want to turn you toward the real world.
This chapter will help you put threads at the right level of abstraction in your code, rather than having them mixed up alongside your domain logic.
Single level of abstraction
In the book Clean Code, Robert Martin laid out principles for improving code readability and maintainability. One of the principles that’s relevant to this discussion is “one level of abstraction per function.” It states that all code in a given function should be at the same level of abstraction; i.e., high-level code and low-level code don’t mix well.
What constitutes high-level code and low-level code is certainly up for debate, but I would label code that works with threads, mutexes, and the like as low-level code. It is almost certainly a lower level of abstraction than your domain logic.
Let me use an analogy to make my point.
When you use a database adapter (like mysql2 or redis-rb), do you ever see the Socket constant? No. All of these adapters are using a socket internally for communication, but that’s at a different level of abstraction than writing SQL statements.
The database adapters abstract away the complexities of working with sockets, handling network exceptions, buffering, etc. The same is true when working with threads.
Threads are also a low-level concept with their own complexities. They’re something provided by your operating system, and not likely part of the main domain logic of your application.
Given this, it’s best to wrap threads in an abstraction where possible. In certain circumstances your domain logic might need direct access to a Mutex or might need to spawn threads very deliberately. This isn’t a hard and fast rule, just a guideline.
Let’s start with a simple case. Way back at the beginning of the book we looked at a trivial FileUploader class. Here it is, to refresh your memory.
require 'thread'

class FileUploader
  def initialize(files)
    @files = files
  end

  def upload
    threads = []

    @files.each do |(filename, file_data)|
      threads << Thread.new {
        status = upload_to_s3(filename, file_data)
        results << status
      }
    end

    threads.each(&:join)
  end

  def results
    @results ||= Queue.new
  end

  def upload_to_s3(filename, file)
    # omitted
  end
end

uploader = FileUploader.new('boots.png' => '*pretend png data*', 'shirts.png' => '*pretend png data*')
uploader.upload
puts uploader.results.size
I even made a case way back at the start of the book that the Thread.new here sticks out like a sore thumb. It’s quite obviously at a different level of abstraction than the rest of the code. Let’s extract that to somewhere else so that this code can, once again, focus on its intent.
module Enumerable
  def concurrent_each
    threads = []

    each do |element|
      threads << Thread.new {
        yield element
      }
    end

    threads.each(&:join)
  end
end
This is a simple wrapper around Enumerable#each that will spawn a thread for each element being iterated over. It wouldn’t be wise to use this code in production yet because it has no upper bound on the number of threads it will spawn. Give it an Array with 100,000 elements and it will attempt to spawn 100,000 threads!
A better approach here would be to spawn a fixed-size pool of threads, like Puma does, then keep passing work to those threads as they can handle it.
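To make the fixed-pool idea concrete, here’s a minimal sketch of what such a bounded variant might look like. The method name `concurrent_each_pooled`, the `pool_size` parameter, and the `:done` sentinel are my own illustrative choices, not code from Puma or from elsewhere in this book.

```ruby
require 'thread'

module Enumerable
  # A bounded variant of concurrent_each: a fixed pool of worker
  # threads pulls elements from a shared queue, so at most
  # `pool_size` threads run at once, regardless of collection size.
  # (Illustrative sketch; a production version would also bound the
  # queue and handle exceptions raised inside workers.)
  def concurrent_each_pooled(pool_size = 4)
    queue = Queue.new
    each { |element| queue << element }

    # One sentinel per worker tells each thread when to stop.
    # (Assumes the collection doesn't itself contain :done.)
    pool_size.times { queue << :done }

    workers = pool_size.times.map do
      Thread.new do
        while (element = queue.pop) != :done
          yield element
        end
      end
    end

    workers.each(&:join)
    self
  end
end
```

With this in place, iterating over 100,000 elements still only ever runs `pool_size` threads at a time; the queue simply holds the pending work.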
Now we can rewrite the FileUploader class to make use of this new method, still preserving its behaviour, but no longer mixing levels of abstraction.
require 'thread'
require_relative 'concurrent_each'

class FileUploader
  attr_reader :results

  def initialize(files)
    @files = files
    @results = Queue.new
  end

  def upload
    @files.concurrent_each do |(filename, file_data)|
      status = upload_to_s3(filename, file_data)
      @results << status
    end
  end

  def upload_to_s3(filename, file)
    # omitted
  end
end

uploader = FileUploader.new('boots.png' => '*pretend png data*', 'shirts.png' => '*pretend png data*')
uploader.upload
puts uploader.results.size
This is much easier to grok at a glance, with the concurrency details now handled by concurrent_each.
Actor model
In some cases, it will make sense for you to write your own simple abstractions on top of your multi-threaded code, as in the above example. In other cases, especially when multi-threading concerns are at the core of your program (think servers, networking, etc.), you’ll get more benefit from a more mature abstraction.
The Actor model is a well-known and proven technique for doing multi-threaded concurrency. As an abstraction, it’s much more mature than the simple concurrent_each you wrote in the last section.
At a high level, an Actor is a long-lived ‘entity’ that communicates by sending messages. When I say long-lived entity, I’m talking about a long-running thread, not something that’s spawned ad-hoc. In the Actor model, each Actor has an ‘address’. If you know the address of an Actor, you can send it a message.
These messages go to the Actor’s mailbox, where they’re processed asynchronously when the Actor gets around to it.
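The mailbox idea itself doesn’t need any special machinery; it can be sketched with a plain thread draining a queue. The `TinyActor` class below is my own hand-rolled illustration of the concept, not Celluloid’s implementation:

```ruby
require 'thread'

# A minimal illustration of an actor: a long-running thread that
# drains a Queue (the mailbox) of incoming messages. Holding a
# reference to the actor is "knowing its address".
class TinyActor
  def initialize(&handler)
    @mailbox = Queue.new
    @thread = Thread.new do
      # Process messages one at a time, in arrival order,
      # until a shutdown sentinel arrives.
      while (message = @mailbox.pop) != :shutdown
        handler.call(message)
      end
    end
  end

  # Sending a message is just pushing onto the mailbox;
  # the caller doesn't wait for the message to be processed.
  def send_message(message)
    @mailbox << message
  end

  def shutdown
    @mailbox << :shutdown
    @thread.join
  end
end
```

The caller returns immediately from `send_message`; the actor’s thread gets to the message when it gets to it. That asynchrony is the heart of the model.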
This describes the core of the Actor model at a high level, but let’s start talking about implementation. There’s obviously more than one way that this kind of system can be implemented, and indeed, there are several implementations available in the Ruby community. After Celluloid, rubinius-actor is probably the most widely used. For the rest of this chapter, we’ll be looking at Celluloid.
What sets Celluloid apart is that it takes this conceptual idea of the Actor model and marries it to Ruby’s object model. Let’s see the basic operations of a Celluloid actor, and how they map to the overview I gave above.
require 'celluloid/autostart'
require 'net/http'

class XKCDFetcher
  include Celluloid

  def next
    response = Net::HTTP.get_response('dynamic.xkcd.com', '/random/comic/')
    random_comic_url = response['Location']

    random_comic_url
  end
end
The only real detail that separates this class from a regular Ruby class is the inclusion of the Celluloid module.
class XKCDFetcher
  include Celluloid
Including the Celluloid module into any Ruby class will turn instances of that class into full-fledged Celluloid actors. That’s all it takes.
From now on, any time that a new instance of XKCDFetcher is created, it will be wrapped by a Celluloid actor. Each Celluloid actor is housed by a separate thread: one thread per actor.
When you create a new actor, you immediately know its ‘address’. So long as you hold a reference to that object, you can send it messages. In Celluloid, sending messages to an actor equates to calling methods on an object. However, Celluloid does preserve the behaviour of regular method calls, so it still feels like a regular Ruby object.
# this spawns a new thread containing a Celluloid actor
fetcher = XKCDFetcher.new
# these behave like regular method calls
fetcher.object_id
fetcher.inspect
# this will fire the `next` method without
# waiting for its result
fetcher.async.next
fetcher.async.next
Regular method calls are sent to the actor’s mailbox but, behind the scenes, Celluloid will block the caller until it receives the result, just like a regular method call.
However, Celluloid also allows you to send a message to the actor’s mailbox without waiting for the result. You can see this in the example above using the async syntax. In this way, you can asynchronously fire off some work, then continue execution.
The async approach is good if you don’t care about the result, but what if you want to fire off a method asynchronously and get its result? That’s what Celluloid futures accomplish, and that rounds out this code example.
fetcher = XKCDFetcher.new
futures = []

10.times do
  futures << fetcher.future.next
end

futures.each do |future|
  puts "You should check out #{future.value}"
end
This example begins by spawning a new actor. Then the next method is called 10 times using the future syntax. First, call the future method on an actor, then the message you want to send, which must correspond to a public method on the actor. Celluloid kicks off that method asynchronously and returns you a Celluloid::Future object.
Calling #value on that future object will block until the value has been computed. In this example, 10 fetches to xkcd are kicked off asynchronously, then the results are all collected using the call to value.
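The future pattern itself needs nothing exotic: a plain Thread plus Thread#value gives you the same “kick off now, block on the result later” shape. This is an illustrative parallel using only the Ruby standard library, not how Celluloid implements its futures:

```ruby
# Kick off ten computations concurrently; each Thread stands in
# for a future. (The i * i computation is a stand-in for a slow
# operation like an HTTP fetch.)
futures = 10.times.map do |i|
  Thread.new { i * i }
end

# Thread#value joins the thread and returns its block's result,
# blocking only if that result isn't ready yet.
results = futures.map(&:value)
```

Celluloid’s futures add conveniences on top (the work runs inside the actor’s own thread, and you call methods rather than passing blocks), but the blocking-on-demand behaviour of `value` is the same idea.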
This is the basic idea behind Celluloid. It actually has a lot more features, and the wiki has great coverage of what it offers, but this concept of ‘objects as actors’ is really at its core.
Recall, for any class that includes the Celluloid module, instances of that class will each be independent actors. Actors are independent by virtue of being in their own thread, so there’s one thread per actor. Passing a message to an actor is as simple as calling one of its public methods. The behaviour associated with any particular message is simply the body of the method.
Celluloid is a great solution to concurrency that puts the abstraction at the right level and wraps up a lot of best practices. The next chapter will take a deeper look at a real-world Celluloid project.