Wrap Your Threads in an Abstraction

Up until now, all of the example code in the book has been of a trivial nature: the kind of thing you might write in a simple script to accomplish a task on your local machine, but probably wouldn’t put into production. Now I want to turn you toward the real world.

This chapter will help you put threads at the right level of abstraction in your code, rather than having them mixed up alongside your domain logic.

Single level of abstraction

In the book Clean Code, Robert Martin laid out principles for improving code readability and maintainability. One of the principles that’s relevant to this discussion is “one level of abstraction per function.” It states that all code in a given function should be at the same level of abstraction; i.e., high-level code and low-level code don’t mix well.

What constitutes high-level code and low-level code is certainly up for debate, but I would label code that works with threads, mutexes, and the like as low-level code. It is almost certainly a lower level of abstraction than your domain logic.

Let me use an analogy to make my point.

When you use a database adapter (like mysql2 or redis-rb), do you ever see the Socket constant? No. All of these adapters use a socket internally for communication, but that’s at a different level of abstraction than writing database queries.

The database adapters abstract away the complexities of working with sockets: handling network exceptions, buffering, and so on. The same is true when working with threads.

Threads are also a low-level concept with their own complexities. They’re provided by your operating system and are unlikely to be part of your application’s main domain logic.

Given this, it’s best to wrap threads in an abstraction where possible. In certain circumstances your domain logic might need direct access to a Mutex or might need to spawn threads very deliberately. This isn’t a hard and fast rule, just a guideline.

Let’s start with a simple case. Way back at the beginning of the book we looked at a trivial FileUploader class. Here it is, to refresh your memory.

require 'thread'

class FileUploader
  def initialize(files)
    @files = files
  end

  def upload
    threads = []

    @files.each do |(filename, file_data)|
      threads << Thread.new {
        status = upload_to_s3(filename, file_data)
        results << status
      }
    end

    threads.each(&:join)
  end

  def results
    # Note: lazily initializing the Queue from multiple threads is itself
    # a race; the rewrite later in this chapter initializes it up front.
    @results ||= Queue.new
  end

  def upload_to_s3(filename, file)
    # omitted
  end
end

uploader = FileUploader.new('boots.png' => '*pretend png data*', 'shirts.png' => '*pretend png data*')
uploader.upload

puts uploader.results.size

I even made a case way back at the start of the book that the Thread.new here sticks out like a sore thumb. It’s quite obviously at a different level of abstraction than the rest of the code. Let’s extract that to somewhere else so that this code can, once again, focus on its intent.

module Enumerable
  def concurrent_each
    threads = []

    each do |element|
      threads << Thread.new {
        yield element
      }
    end

    threads.each(&:join)
  end
end

This is a simple wrapper around Enumerable#each that will spawn a thread for each element being iterated over. It wouldn’t be wise to use this code in production yet because it has no upper bound on the number of threads it will spawn. Give it an Array with 100,000 elements and it will attempt to spawn 100,000 threads!

A better approach here would be to spawn a fixed-size pool of threads, like Puma does, then keep passing work to those threads as they can handle it.
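To give you an idea of what that might look like, here’s a rough sketch of a pooled variant. The pooled_each name, the default pool size, and the sentinel trick are all my own choices for illustration; a production-grade pool (like the one inside Puma) handles shutdown, exceptions, and backpressure much more carefully.

require 'thread'

module Enumerable
  # Sketch only: process elements with a fixed number of worker threads,
  # rather than spawning one thread per element.
  def pooled_each(pool_size = 4)
    queue = Queue.new
    each { |element| queue << element }

    # One sentinel per worker tells it when to stop.
    stop = Object.new
    pool_size.times { queue << stop }

    workers = Array.new(pool_size) do
      Thread.new do
        until (element = queue.pop).equal?(stop)
          yield element
        end
      end
    end

    workers.each(&:join)
  end
end

(1..10).pooled_each { |n| puts "processed #{n}" }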

Now we can rewrite the FileUploader class to make use of the new concurrent_each method, still preserving its behaviour, but no longer mixing levels of abstraction.

require 'thread'
require_relative 'concurrent_each'

class FileUploader
  attr_reader :results

  def initialize(files)
    @files = files
    @results = Queue.new
  end

  def upload
    @files.concurrent_each do |(filename, file_data)|
      status = upload_to_s3(filename, file_data)
      @results << status
    end
  end

  def upload_to_s3(filename, file)
    # omitted
  end
end

uploader = FileUploader.new('boots.png' => '*pretend png data*', 'shirts.png' => '*pretend png data*')
uploader.upload

puts uploader.results.size

This is much easier to grok at a glance, with the concurrency details now handled by Enumerable#concurrent_each.

Actor model

In some cases, it will make sense for you to write your own simple abstractions on top of your multi-threaded code, as in the above example. In other cases, especially when multi-threading concerns are at the core of your program (think servers, networking, etc.), you’ll get more benefit from a more mature abstraction.

The Actor model is a well-known and proven technique for doing multi-threaded concurrency. As an abstraction, it’s much more mature than the simple concurrent_each you wrote in the last section.

At a high level, an Actor is a long-lived ‘entity’ that communicates by sending messages. When I say long-lived entity, I’m talking about a long-running thread, not something that’s spawned ad-hoc. In the Actor model, each Actor has an ‘address’. If you know the address of an Actor, you can send it a message.

These messages go to the Actor’s mailbox, where they’re processed asynchronously when the Actor gets around to it.
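To make that concrete, here’s a toy sketch of the idea using nothing more than a thread and a Queue as the mailbox. The TinyActor class and send_message method are made-up names for illustration; real Actor libraries add supervision, error handling, and much more.

require 'thread'

# A toy actor: one long-lived thread, one mailbox,
# messages processed one at a time in the order received.
class TinyActor
  def initialize(&handler)
    @mailbox = Queue.new
    @thread = Thread.new do
      loop do
        message = @mailbox.pop # blocks until a message arrives
        handler.call(message)
      end
    end
  end

  # Holding a reference to the actor is effectively its 'address';
  # sending a message just appends to the mailbox and returns immediately.
  def send_message(message)
    @mailbox << message
  end
end

logger = TinyActor.new { |msg| puts "got: #{msg}" }
logger.send_message('hello')
logger.send_message('world')
sleep 0.1 # give the actor thread a moment to drain its mailbox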

This describes the core of the Actor model at a high level, but let’s look at a real implementation. There’s obviously more than one way that this kind of system can be built, and indeed there are several implementations available in the Ruby community. Celluloid is the most widely used; rubinius-actor is probably the next most common. For the rest of this chapter, we’ll be looking at Celluloid.

What sets Celluloid apart is that it takes this conceptual idea of the Actor model and marries it to Ruby’s object model. Let’s see the basic operations of a Celluloid actor, and how they map to the overview I gave above.

require 'celluloid/autostart'
require 'net/http'

class XKCDFetcher
  include Celluloid

  def next
    # xkcd's random comic endpoint responds with a redirect;
    # the Location header points at a random comic
    response = Net::HTTP.get_response('dynamic.xkcd.com', '/random/comic/')
    random_comic_url = response['Location']

    random_comic_url
  end
end

The only real detail that separates this class from a regular Ruby class is the inclusion of the Celluloid module.

class XKCDFetcher
  include Celluloid

Including the Celluloid module into any Ruby class will turn instances of that class into full-fledged Celluloid actors. That’s all it takes.

From now on, any time that a new instance of XKCDFetcher is created, it will be wrapped by a Celluloid actor. Each Celluloid actor is housed by a separate thread, one thread per actor.

When you create a new actor, you immediately know its ‘address’. So long as you hold a reference to that object, you can send it messages. In Celluloid, sending messages to an actor equates to calling methods on an object. However, Celluloid does preserve the behaviour of regular method calls, so it still feels like a regular Ruby object.

# this spawns a new thread containing a Celluloid actor
fetcher = XKCDFetcher.new

# these behave like regular method calls
fetcher.object_id
fetcher.inspect

# this will fire the `next` method without
# waiting for its result
fetcher.async.next
fetcher.async.next

Regular method calls are sent to the actor’s mailbox, but behind the scenes Celluloid blocks the caller until it receives the result, so it behaves just like a normal method call.

However, Celluloid also allows you to send a message to the actor’s mailbox without waiting for the result. You can see this in the example above using the async syntax. In this way, you can asynchronously fire off some work, then continue execution.
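To make the distinction concrete, here’s the same XKCDFetcher used both ways. (This is just for illustration; the synchronous call needs network access to actually return a URL.)

fetcher = XKCDFetcher.new

# A regular call: the message still goes through the mailbox, but
# Celluloid blocks the caller until the actor replies with the result.
puts fetcher.next

# An async call: the message goes to the mailbox and the caller carries
# on immediately; the return value of this line is not the comic URL.
fetcher.async.next
puts "carrying on without waiting for the fetch"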

The async approach is good if you don’t care about the result, but what if you want to fire off a method asynchronously and get its result later? That’s what Celluloid futures accomplish, and they round out this code example.

fetcher = XKCDFetcher.new
futures = []

10.times do
  futures << fetcher.future.next
end

futures.each do |future|
  puts "You should check out #{future.value}"
end

This example begins by spawning a new actor. Then the next method is called 10 times using the future syntax. To use it, call the future method on the actor, followed by the message you want to send, which must correspond to a public method on the actor. Celluloid kicks off that method asynchronously and returns you a Celluloid::Future object.

Calling #value on that future object will block until the value has been computed. In this example, 10 fetches to xkcd are kicked off asynchronously, then the results are all collected using the call to value.

This is the basic idea behind Celluloid. It actually has a lot more features (the wiki has great coverage of what it offers), but this concept of ‘objects as actors’ is really at its core.
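One of those extra features worth a quick mention is actor pools. The sketch below assumes the pool API described on the Celluloid wiki (the exact syntax may vary between versions): calling .pool on a Celluloid class spawns a group of identical actors behind a single proxy and spreads calls across them.

# Spawn several XKCDFetcher actors behind one pool proxy.
# The size: option follows the Celluloid wiki; check your version.
pool = XKCDFetcher.pool(size: 5)

# Futures work through the pool just as they do on a single actor,
# but the work is spread across the pooled actors.
futures = 10.times.map { pool.future.next }
futures.each { |future| puts "You should check out #{future.value}" }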

Recall, for any class that includes the Celluloid module, instances of that class will each be independent actors. Actors are independent by virtue of being in their own thread, so there’s one thread per actor. Passing a message to an actor is as simple as calling one of its public methods. The behaviour associated with any particular message is simply the body of the method.

Celluloid is a great solution to concurrency that puts the abstraction at the right level and wraps up a lot of best practices. The next chapter will take a deeper look at a real-world Celluloid project.