How Sidekiq Uses Celluloid

Sidekiq is a multi-threaded background job processing system backed by Redis. Yet if you take a close look through the source code, you won't see many of the constructs you've learned in this book: no Thread.new, no Mutex#synchronize. This is because all of Sidekiq's multi-threaded processing is implemented on top of Celluloid.

This is for good reason. Mike Perham, author of Sidekiq, suggests:

As soon as you introduce the Thread constant, you’ve probably just introduced 5 new bugs into your code.

Sidekiq is a system composed of Celluloid actors. At the top level, there’s a Manager actor that holds the state of the whole system and mediates between the collaborator actors. The collaborator actors are the Fetcher, which fetches new jobs from Redis, and the Processors, which perform the actual work of the jobs.

That is a rough sketch of Sidekiq's actor architecture. In this walkthrough, we're going to focus specifically on how the fetch, assign, and process messages are handled.

Into the source

Normally, when walking through code like this, I would focus on a particular class or a particular method. But given the nature of Sidekiq’s architecture, and that of most actor-based systems, the building block isn’t classes or methods, it’s messages. So the focus will be on messages. That being said, in a Celluloid actor, messages and their associated behaviour are defined as Ruby methods, so you will be looking at methods, but they won’t be used strictly in the traditional sense.
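To make the "messages are methods" idea concrete, here is a minimal sketch built only on Ruby's standard library. It is not Celluloid's implementation. MiniActor, AsyncProxy, and ToyFetcher are hypothetical names for illustration. Calling a method on the object returned by async doesn't run it; the call is queued as a message and later executed by the actor's own thread.

```ruby
# Hypothetical MiniActor: not Celluloid's code, only an illustration of the
# idea that an actor's messages are ordinary methods, invoked later from a
# mailbox by the actor's own thread.
class MiniActor
  # Returned by #async: any method call on it is queued, not executed.
  class AsyncProxy < BasicObject
    def initialize(mailbox)
      @mailbox = mailbox
    end

    def method_missing(name, *args)
      @mailbox << [name, args]
      nil # fire-and-forget: no return value reaches the caller
    end
  end

  def initialize
    @mailbox = Queue.new
    # One thread per actor, handling one message at a time.
    @thread = Thread.new do
      loop do
        name, args = @mailbox.pop
        break if name == :__terminate__
        __send__(name, *args)
      end
    end
  end

  def async
    AsyncProxy.new(@mailbox)
  end

  # Handles any messages already queued, then stops the actor's thread.
  def terminate
    @mailbox << [:__terminate__, []]
    @thread.join
  end
end

class ToyFetcher < MiniActor
  attr_reader :fetch_count

  def initialize
    @fetch_count = 0 # set before super starts the actor's thread
    super
  end

  # A plain Ruby method that doubles as the "fetch" message.
  def fetch
    @fetch_count += 1
  end
end

fetcher = ToyFetcher.new
fetcher.async.fetch # queued in the mailbox; returns nil immediately
fetcher.terminate
p fetcher.fetch_count # => 1
```

The caller never invokes fetch directly. It only drops a message in the mailbox, and the method body runs on the actor's thread.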

Our journey starts in the Manager actor.

fetch

Once the Manager has been initialized, it's started with the start method.

def start
  @ready.each { dispatch }
end

def dispatch
  return if stopped?
  # This is a safety check to ensure we haven't leaked
  # processors somehow.
  raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
  raise "No ready processor!?" if @ready.empty?

  @fetcher.async.fetch
end

In the Manager, the @ready variable holds references to the Processor actors that are ready to process jobs. So it calls the dispatch method once for each idle Processor. The dispatch method does some housekeeping at the beginning, but the last line is the important one.

The async method on the last line sends the fetch message to the @fetcher actor without waiting for a return value. It's essentially a fire-and-forget call: the message is sent, and the Manager carries on without waiting for a response.
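The fire-and-forget behaviour can be shown with plain threads and queues. This is a hypothetical sketch, not Sidekiq or Celluloid code. The redis queue merely stands in for Redis, and the message names are assumptions. If fetch were an ordinary synchronous call here, the main thread would block inside it forever, because the job it waits for is only pushed on a later line.

```ruby
mailbox = Queue.new # the Fetcher's mailbox
redis   = Queue.new # stand-in for Redis: pop blocks until a job is pushed
events  = Queue.new # thread-safe log of what happened, in order

fetcher = Thread.new do
  while (message = mailbox.pop) != :stop
    next unless message == :fetch
    job = redis.pop           # may block for a long time...
    events << [:fetched, job] # ...but only this thread is waiting
  end
end

# The "Manager" sends :fetch and carries on without waiting for a result.
mailbox << :fetch
events << [:manager_continued, nil]

redis << "job-1" # a job eventually shows up in "Redis"
mailbox << :stop
fetcher.join

log = []
log << events.pop until events.empty?
p log # => [[:manager_continued, nil], [:fetched, "job-1"]]
```

The sender is never stuck waiting. Any long wait happens entirely on the receiving actor's thread.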

Since the fetch message is asynchronous, it can be sent multiple times and queued in the Fetcher actor’s mailbox until it can process the backlog. This is expected behaviour for Sidekiq. For instance, with 25 Processor actors, the fetch message will be sent 25 times. The Fetcher actor will process each fetch in turn, as new jobs get pushed into Redis.
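Here is a sketch of that backlog, again with plain Thread and Queue rather than Celluloid. PROCESSOR_COUNT and the redis queue are hypothetical stand-ins. Twenty-five fetch messages pile up in the mailbox before the fetcher thread even starts. Once it runs, it handles them strictly one at a time, in the order they were sent, as jobs arrive.

```ruby
PROCESSOR_COUNT = 25

mailbox  = Queue.new # the Fetcher's mailbox
redis    = Queue.new # stand-in for Redis: pop blocks until a job arrives
assigned = []        # written only by the fetcher thread until it is joined

# Dispatch once per ready Processor. The fetcher thread hasn't started yet,
# so every message simply waits in the mailbox.
PROCESSOR_COUNT.times { mailbox << :fetch }
backlog = mailbox.size
mailbox << :stop

fetcher = Thread.new do
  # One message at a time, in the order they were sent.
  while mailbox.pop == :fetch
    assigned << redis.pop # wait for the next job, then "assign" it
  end
end

# Jobs trickle into "Redis"; each one satisfies the next queued fetch.
PROCESSOR_COUNT.times { |i| redis << "job-#{i}" }
fetcher.join

p backlog        # => 25
p assigned.size  # => 25
p assigned.first # => "job-0"
```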

This is an excerpt from the Fetcher#fetch method.

work = @strategy.retrieve_work

if work
  @mgr.async.assign(work)
else
  after(0) { fetch }
end

The Fetcher first tries to retrieve a unit of work. If it doesn't retrieve any, it uses Celluloid's after(0) timer to schedule another fetch on itself, rather than looping in place. This, again, points to the asynchronous nature of sending messages. The Manager sent the fetch message, but since it isn't waiting for a return value, the Fetcher is free to take as long as necessary before sending a message back to the Manager.
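Why reschedule instead of looping until work appears? The sketch below uses a stdlib-only actor with the hypothetical names RetryingFetcher, tell, and push_job. It approximates after(0) { fetch } by re-sending :fetch to the actor's own mailbox rather than using a real timer. If fetch looped internally, the :push_job message queued behind it would never be handled, and the actor would hang. Because it re-sends :fetch instead, other messages get processed between attempts.

```ruby
class RetryingFetcher
  attr_reader :assigned, :attempts

  def initialize
    @pending  = [] # stand-in for Redis; touched only by the actor thread
    @assigned = []
    @attempts = 0
    @mailbox  = Queue.new
    @thread   = Thread.new do
      loop do
        name, args = @mailbox.pop
        break if name == :stop
        send(name, *args)
      end
    end
  end

  # Fire-and-forget: enqueue a message for the actor's thread.
  def tell(name, *args)
    @mailbox << [name, args]
    nil
  end

  def join
    @thread.join
  end

  private

  def fetch
    @attempts += 1
    work = @pending.shift # non-blocking stand-in for retrieve_work
    if work
      @assigned << work
      tell(:stop)  # end of this demo
    else
      tell(:fetch) # retry later, after any messages already queued
    end
  end

  def push_job(job)
    @pending << job
  end
end

fetcher = RetryingFetcher.new
fetcher.tell(:fetch)
fetcher.tell(:push_job, "job-1") # handled between fetch attempts
fetcher.join
p fetcher.assigned # => ["job-1"]
```

Because the check here is non-blocking, the sketch may spin through a few retries before the job arrives. It is only meant to show how the messages interleave.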

When it’s finally able to retrieve work, it asynchronously sends the assign message to the Manager, passing along the unit of work.

assign

Here’s the Manager#assign method that receives that unit of work.

def assign(work)
  watchdog("Manager#assign died") do
    if stopped?
      # Race condition between Manager#stop if Fetcher
      # is blocked on redis and gets a message after
      # all the ready Processors have been stopped.
      # Push the message back to redis.
      work.requeue
    else
      processor = @ready.pop
      @in_progress[processor.object_id] = work
      @busy << processor
      processor.async.process(work)
    end
  end
end

This method, too, has some housekeeping at the beginning, as is typical of real-world code. The else block contains the real business logic.

First, the Manager grabs the next available Processor, then keeps track of its status in its internal data structures.

Notice that the Manager is using plain Ruby arrays here with @ready and @busy. Even though they're used in a multi-threaded context, there's no thread-safety concern. The Manager actor lives in its own thread, owns these instance variables, and doesn't share them with other actors. Since the variables are confined to this thread, multiple threads can't interact with them concurrently.
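Here is a sketch of that confinement, assuming a stdlib-only actor. ConfinedManager and its :assign, :done, and :snapshot messages are hypothetical. Eight threads send messages concurrently, but only the actor's own thread ever reads or writes @ready and @busy. Each move between the two arrays therefore happens as one uninterrupted step, so no Processor is ever lost or duplicated.

```ruby
class ConfinedManager
  def initialize(processors)
    @ready   = processors.dup # plain Array, owned by the actor thread
    @busy    = []             # plain Array, owned by the actor thread
    @mailbox = Queue.new      # the only object shared across threads
    @thread  = Thread.new { run }
  end

  # Safe to call from any thread: it only pushes onto the mailbox.
  def tell(name, arg = nil)
    @mailbox << [name, arg]
    nil
  end

  # Asks the actor thread for a copy of its state, then stops it.
  def snapshot_and_stop
    reply = Queue.new
    tell(:snapshot, reply)
    state = reply.pop
    tell(:stop)
    @thread.join
    state
  end

  private

  def run
    loop do
      name, arg = @mailbox.pop
      case name
      when :assign # move a Processor from ready to busy
        @busy << @ready.pop unless @ready.empty?
      when :done   # move a Processor from busy back to ready
        @ready << @busy.pop unless @busy.empty?
      when :snapshot
        arg << { ready: @ready.dup, busy: @busy.dup }
      when :stop
        break
      end
    end
  end
end

manager = ConfinedManager.new(%w[p1 p2 p3 p4])

# Eight threads send messages concurrently; none touches @ready or @busy.
senders = Array.new(8) do
  Thread.new do
    50.times do
      manager.tell(:assign)
      manager.tell(:done)
    end
  end
end
senders.each(&:join)

state = manager.snapshot_and_stop
all_processors = (state[:ready] + state[:busy]).sort
p all_processors # => ["p1", "p2", "p3", "p4"]
```

The final split between ready and busy varies from run to run, but the four Processors are always all accounted for.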

On the last line, another asynchronous message is sent: the chosen Processor receives the process message along with the unit of work. Again, this is a fire-and-forget message, and the Manager doesn't wait for a response. Instead, the Processor performs the work, then sends a message back to the Manager when it's finished.
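Here is the assign, process, and reply round trip as a standalone sketch. It is built on a hypothetical ToyActor base (one thread and one mailbox per actor) rather than Celluloid. The tell method plays the role of async, and processor_done is an assumed name for the Processor's reply. Fetching is left out, and jobs are assigned directly, at most one per Processor. Note that no return value flows back from process; the result arrives as a new message to the Manager.

```ruby
# Hypothetical actor base (not Celluloid): one thread, one mailbox, and
# messages dispatched to ordinary methods on that thread.
class ToyActor
  def initialize
    @mailbox = Queue.new
    @thread = Thread.new do
      loop do
        name, args = @mailbox.pop
        break if name == :__stop__
        send(name, *args)
      end
    end
  end

  # Fire-and-forget, playing the role of Celluloid's `async.some_method`.
  def tell(name, *args)
    @mailbox << [name, args]
    nil
  end

  def stop
    tell(:__stop__)
    @thread.join
  end
end

class ToyProcessor < ToyActor
  def initialize(manager)
    @manager = manager
    super()
  end

  private

  def process(work)
    result = "done: #{work}" # stand-in for actually performing the job
    # No return value goes back; the Processor sends a message instead.
    @manager.tell(:processor_done, self, result)
  end
end

class ToyManager < ToyActor
  def initialize(size, outbox)
    @outbox = outbox
    @ready  = Array.new(size) { ToyProcessor.new(self) }
    @all    = @ready.dup.freeze # never mutated, so safe to read anywhere
    @busy   = []
    super()
  end

  def shutdown
    @all.each(&:stop)
    stop
  end

  private

  # Like Manager#assign: pick a ready Processor, mark it busy, hand off work.
  def assign(work)
    processor = @ready.pop
    @busy << processor
    processor.tell(:process, work)
  end

  # The Processor's reply: return it to the ready pool and report the result.
  def processor_done(processor, result)
    @busy.delete(processor)
    @ready << processor
    @outbox << result
  end
end

outbox  = Queue.new
manager = ToyManager.new(3, outbox)
%w[a b c].each { |job| manager.tell(:assign, job) } # at most one job per Processor

results = Array.new(3) { outbox.pop }.sort
manager.shutdown
p results # => ["done: a", "done: b", "done: c"]
```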

I won't share the definition of the process method because it's almost entirely focused on actually performing the job; there's very little fodder in terms of multi-threading primitives.

Wrap-up

You saw how Sidekiq handled the fetch, assign, and process messages. This paradigm should feel a bit different from traditional Ruby code. In traditional Ruby code you might achieve the same behaviour with something like the following:

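class Manager
  # A traditional, synchronous style: each call blocks until it returns
  # a value, which is then passed along to the next step.
  def dispatch
    loop do
      work = @fetcher.fetch                 # blocks until a job is available

      result = processor_pool.process(work) # blocks until the job finishes
      log_result(result)
    end
  end
  # ...
end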

The most obvious difference I see between the Sidekiq codebase and a more traditional Ruby codebase is the lack of dependence upon return values. In my example above there’s one method that calls a bunch of others, collecting return values and passing them around.

When sending messages in Sidekiq, return values are seldom used. Instead, when an actor sends a message, it expects a message to be sent back to it in return. This keeps things asynchronous. Besides this stylistic difference, the Sidekiq codebase is remarkably free of threading primitives. This is due to how well Celluloid respects Ruby's object system, as we explored in the last chapter.

Sidekiq is a great example of how simple it can be to integrate multi-threaded concurrency, via actors, with your business logic.