How Sidekiq Uses Celluloid
Sidekiq is a multi-threaded background job processing system backed by Redis. Yet if you take a close look through the source code, you won’t see many of the constructs you’ve learned in this book: no Thread.new, no Mutex#synchronize. This is because all of Sidekiq’s multi-threaded processing is implemented on top of Celluloid.
This is for good reason. Mike Perham, author of Sidekiq, suggests:
As soon as you introduce the Thread constant, you’ve probably just introduced 5 new bugs into your code.
Sidekiq is a system composed of Celluloid actors. At the top level, there’s a Manager actor that holds the state of the whole system and mediates between the collaborator actors. The collaborator actors are the Fetcher, which fetches new jobs from Redis, and the Processors, which perform the actual work of the jobs.
This is a rough sketch of Sidekiq’s actor architecture. Specifically, we’re going to focus on how the fetch, assign, and process messages are handled.
Into the source
Normally, when walking through code like this, I would focus on a particular class or a particular method. But given the nature of Sidekiq’s architecture, and that of most actor-based systems, the building block isn’t classes or methods, it’s messages. So the focus will be on messages. That being said, in a Celluloid actor, messages and their associated behaviour are defined as Ruby methods, so you will be looking at methods, but they won’t be used strictly in the traditional sense.
Our journey starts in the Manager actor.
fetch
Once the manager has been initialized, it’s started with the start method.
def start
  @ready.each { dispatch }
end

def dispatch
  return if stopped?
  # This is a safety check to ensure we haven't leaked
  # processors somehow.
  raise "BUG: No processors, cannot continue!" if @ready.empty? && @busy.empty?
  raise "No ready processor!?" if @ready.empty?
  @fetcher.async.fetch
end
In the Manager, the @ready variable holds references to the Processor actors that are ready to process jobs. So start calls the dispatch method once for each idle Processor. The dispatch method does some housekeeping at the beginning, but the last line is the important one.
The async method used on the last line sends the fetch message to the @fetcher actor and does not wait for the return value. It’s essentially a fire-and-forget call: the message is sent, but the Manager doesn’t wait for the response.
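To make the fire-and-forget idea concrete, here is a minimal sketch in plain Ruby. It is not how Celluloid implements async; ToyAsyncProxy, ToyFetcher, and shutdown are hypothetical names invented for this illustration. The proxy simply turns a method call into a message on a queue and returns right away.

```ruby
require "thread"

# Hypothetical proxy (not Celluloid's): turns any method call into a
# queued message and returns immediately.
class ToyAsyncProxy
  def initialize(mailbox)
    @mailbox = mailbox
  end

  def method_missing(name, *args)
    @mailbox << [name, args]
    nil # nothing useful to return: the caller is not waiting
  end

  def respond_to_missing?(_name, _include_private = false)
    true
  end
end

# Hypothetical actor: one thread draining one mailbox.
class ToyFetcher
  attr_reader :log

  def initialize
    @log     = []
    @mailbox = Queue.new
    @thread  = Thread.new do
      while (message = @mailbox.pop) != :stop
        name, args = message
        public_send(name, *args)
      end
    end
  end

  def async
    ToyAsyncProxy.new(@mailbox)
  end

  def fetch
    sleep 0.05 # pretend this is slow
    @log << :fetched
  end

  # Let the actor finish its backlog, then wait for its thread to exit.
  def shutdown
    @mailbox << :stop
    @thread.join
  end
end

fetcher  = ToyFetcher.new
returned = fetcher.async.fetch # returns at once, before fetch has run
fetcher.shutdown
```

The call through the proxy hands back nil rather than the result of fetch, which is the whole point: the sender has nothing to wait for.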
Since the fetch message is asynchronous, it can be sent multiple times and queued in the Fetcher actor’s mailbox until the Fetcher can process the backlog. This is expected behaviour for Sidekiq. For instance, with 25 Processor actors, the fetch message will be sent 25 times. The Fetcher actor will process each fetch in turn, as new jobs get pushed into Redis.
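The mailbox behaviour described above can be imitated with a plain Queue. This is only a sketch of the idea, assuming a mailbox is a first-in, first-out queue drained by a single thread; the variable names are made up for the example.

```ruby
require "thread"

mailbox = Queue.new

# 25 fire-and-forget sends: each one just enqueues and returns.
25.times { |i| mailbox << [:fetch, i] }
backlog = mailbox.size # the messages wait here until the actor gets to them

handled = []
actor = Thread.new do
  loop do
    name, id = mailbox.pop # blocks while the mailbox is empty
    break if name == :stop
    handled << id          # messages are handled one at a time, in order
  end
end

mailbox << [:stop]
actor.join
```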
This is an excerpt from the Fetcher#fetch method.
work = @strategy.retrieve_work
if work
  @mgr.async.assign(work)
else
  after(0) { fetch }
end
The Fetcher first tries to retrieve a unit of work. You can see that if it doesn’t retrieve any work, it schedules itself to try again. This, again, points to the asynchronous nature of sending messages. The Manager sent the fetch message, but since it’s not waiting for a return value, the Fetcher is free to take as long as necessary before sending a message back to the Manager.
When it’s finally able to retrieve work, it asynchronously sends the assign message to the Manager, passing along the unit of work.
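The retry-then-reply flow can be sketched without Celluloid or Redis. In this illustration the jobs queue is a hypothetical stand-in for Redis, the two mailboxes are plain Queues, and the short sleep exists only so the sketch doesn’t spin; none of this is Sidekiq’s actual code.

```ruby
require "thread"

jobs            = Queue.new # hypothetical stand-in for Redis
fetcher_mailbox = Queue.new
manager_mailbox = Queue.new
attempts        = 0

fetcher = Thread.new do
  loop do
    message = fetcher_mailbox.pop
    break if message == :stop

    attempts += 1
    work = begin
      jobs.pop(true) # non-blocking pop; raises ThreadError when empty
    rescue ThreadError
      nil
    end

    if work
      manager_mailbox << [:assign, work] # reply with a message, not a return value
    else
      sleep 0.01
      fetcher_mailbox << :fetch          # no work yet: send ourselves another fetch
    end
  end
end

fetcher_mailbox << :fetch   # the Manager's fire-and-forget request
sleep 0.05                  # the Manager is free to do other things meanwhile
jobs << "job-1"             # a job finally shows up
reply = manager_mailbox.pop # the assign message arrives whenever it is ready

fetcher_mailbox << :stop
fetcher.join
```

However many attempts the fetcher needs, the Manager side never blocks on the original request; it only learns about the work when the assign message lands in its own mailbox.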
assign
Here’s the Manager#assign method that receives that unit of work.
def assign(work)
  watchdog("Manager#assign died") do
    if stopped?
      # Race condition between Manager#stop if Fetcher
      # is blocked on redis and gets a message after
      # all the ready Processors have been stopped.
      # Push the message back to redis.
      work.requeue
    else
      processor = @ready.pop
      @in_progress[processor.object_id] = work
      @busy << processor
      processor.async.process(work)
    end
  end
end
This method, too, has some housekeeping at the beginning; such is real-world code, after all. The else block contains the real business logic.
First, the Manager grabs the next available Processor, then keeps track of its status appropriately in its internal data structures.
Notice that the Manager is using plain Ruby arrays here with @ready and @busy. Even though they’re used in a multi-threaded context, there’s no thread-safety concern here. This Manager actor lives in its own thread, it owns these instance variables, and it doesn’t share them with other actors. Since the variables are confined to this thread, there can be no issue with multiple threads interacting with them concurrently.
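Here is a small sketch of that confinement idea, again in plain Ruby rather than Celluloid. ToyManager and async_assign are hypothetical names. Ten threads send messages concurrently, but only the manager’s own thread ever touches the two arrays, so no Mutex is needed around them.

```ruby
require "thread"

class ToyManager
  attr_reader :ready, :busy

  def initialize(processors)
    @ready   = processors.dup # plain Arrays, no Mutex
    @busy    = []
    @mailbox = Queue.new      # the only object shared between threads
    @thread  = Thread.new { run }
  end

  # Safe to call from any thread: it only enqueues a message.
  def async_assign(work)
    @mailbox << [:assign, work]
  end

  def shutdown
    @mailbox << [:stop]
    @thread.join
  end

  private

  def run
    loop do
      name, work = @mailbox.pop
      break if name == :stop
      assign(work)
    end
  end

  # Runs only on the manager's thread, one message at a time.
  def assign(_work)
    processor = @ready.pop
    @busy << processor
  end
end

manager = ToyManager.new((1..100).map { |i| "processor-#{i}" })

senders = 10.times.map do |t|
  Thread.new { 10.times { |i| manager.async_assign("job-#{t}-#{i}") } }
end
senders.each(&:join)
manager.shutdown
```

The thread-safe piece is the mailbox itself; everything behind it can be ordinary, unsynchronized Ruby.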
On the last line there’s another asynchronous message sent: the chosen Processor receives the process message along with the unit of work. Again, this will be a fire-and-forget style of message, with the Manager not waiting for a response. Instead, the Processor will perform the work, then send a message back to the Manager when it’s finished.
I won’t share the definition of the process method because it’s almost entirely focused on actually performing the job; there’s very little fodder in terms of multi-threading primitives.
Wrap-up
You saw how Sidekiq handled the fetch, assign, and process messages. This paradigm should feel a bit different from traditional Ruby code. In traditional Ruby code you might achieve the same behaviour with something like the following:
class Manager
  def dispatch
    loop do
      work = @fetcher.fetch
      result = processor_pool.process(work)
      log_result(result)
    end
  end
  # ...
end
The most obvious difference I see between the Sidekiq codebase and a more traditional Ruby codebase is the lack of dependence upon return values. In my example above there’s one method that calls a bunch of others, collecting return values and passing them around.
When sending messages in Sidekiq, return values are seldom used. Instead, when an actor sends a message, it expects a message to be sent back in return. This keeps things asynchronous. Besides this stylistic difference, the Sidekiq codebase is remarkably free of threading primitives. This is due to how well Celluloid respects Ruby’s object system, as we explored in the last chapter.
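As a contrast with the return-value style, here is a rough sketch of the message-back style using two plain Queues as mailboxes. The :done message name and the upcase "job" are invented for the illustration and are not taken from Sidekiq.

```ruby
require "thread"

manager_mailbox   = Queue.new
processor_mailbox = Queue.new

processor = Thread.new do
  loop do
    name, payload = processor_mailbox.pop
    break if name == :stop

    result = payload.upcase             # the "job" itself
    manager_mailbox << [:done, result]  # report back with a message
  end
end

# The sender never captures a return value from these sends.
processor_mailbox << [:process, "send email"]
processor_mailbox << [:process, "resize image"]
processor_mailbox << [:stop]
processor.join

# Results arrive as messages in the Manager's own mailbox.
completed = []
completed << manager_mailbox.pop until manager_mailbox.empty?
```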
Sidekiq is a great example of how simple it can be to integrate multi-threaded concurrency, via actors, with your business logic.