Puma's Thread Pool Implementation

Puma is a concurrent web server for Rack apps that achieves its concurrency using multiple threads. Other popular Ruby web servers are backed by multiple processes, or by a single-threaded event loop, which makes Puma the front-runner when it comes to multi-threaded servers.

At Puma’s multi-threaded core is a thread pool implementation. I’m going to walk you through the main threaded logic so you can see how a real-world, battle-tested project handles things.

A what now?

Puma’s thread pool is responsible for spawning the worker threads and feeding them work. The thread pool wraps up all of the multi-threaded concerns so that the rest of the server is just concerned with networking and the actual domain logic.

The Puma::ThreadPool is actually a totally generic class, not Puma-specific. This makes it a good study, and it could potentially be extracted into something generally useful.

Once initialized, the pool is responsible for receiving work and feeding it to an available worker thread. The ThreadPool also has an auto-trimming feature, whereby the number of active threads is kept to a minimum, but more threads can be spawned during times of high load. Afterwards, the pool is trimmed back down to that minimum. Note: I edited the example methods slightly to remove this logic, as it didn’t add anything to the discussion.
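To make those responsibilities concrete before we dig into the real thing, here’s a minimal pool in the same spirit: fixed size, no auto-trimming. The class and method names here are my own illustration, not Puma’s actual API.

```ruby
class SimplePool
  def initialize(size, &block)
    @block    = block              # the work each worker performs
    @todo     = []                 # shared queue of pending work
    @mutex    = Mutex.new
    @cond     = ConditionVariable.new
    @shutdown = false
    @threads  = Array.new(size) { spawn_thread }
  end

  # Receive a unit of work and wake up one sleeping worker.
  def <<(work)
    @mutex.synchronize do
      @todo << work
      @cond.signal
    end
  end

  # Ask the workers to finish the remaining work, then exit.
  def shutdown
    @mutex.synchronize do
      @shutdown = true
      @cond.broadcast
    end
    @threads.each(&:join)
  end

  private

  def spawn_thread
    Thread.new do
      loop do
        work = nil

        @mutex.synchronize do
          while @todo.empty?
            break if @shutdown
            @cond.wait(@mutex)
          end
          work = @todo.pop unless @todo.empty?
        end

        break if work.nil?  # only nil when shutting down with nothing left
        @block.call(work)
      end
    end
  end
end
```

You’ll recognize the skeleton of this sketch, signal and all, in the real method below.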

The whole thing

Here’s the whole implementation of the Puma::ThreadPool#spawn_thread method. This gets called once for each worker thread to be spawned for the pool. I’ll walk through it section by section.

Just for reference, one instance of this class will spawn many threads. Much like in our examples, instance variables are shared among multiple threads.

   def spawn_thread
      @spawned += 1

      th = Thread.new do
        todo  = @todo
        block = @block
        mutex = @mutex
        cond  = @cond

        extra = @extra.map { |i| i.new }

        while true
          work = nil
          continue = true

          mutex.synchronize do
            while todo.empty?
              if @shutdown
                continue = false
                break
              end

              @waiting += 1
              cond.wait mutex
              @waiting -= 1
            end

            work = todo.pop if continue
          end

          break unless continue

          block.call(work, *extra)
        end

        mutex.synchronize do
          @spawned -= 1
          @workers.delete th
        end
      end

      @workers << th

      th
    end

In bits

Now you’ll see it bit by bit.

   def spawn_thread
      @spawned += 1

I want to highlight this very first line in the method because it illustrates an important point that I mentioned in the chapter on Mutexes. This @spawned instance variable is shared among all the active threads and, as you know, this += operation is not thread-safe! From what we can see, there’s no mutex being used. What gives?

In the source file, there’s this very important comment right above this method:

    # Must be called with @mutex held!

This is a great example of mutexes being opt-in. This method must be called with the shared @mutex held, but it doesn’t do any internal checking, so it would be possible to call this method without a mutex, potentially corrupting the value of @spawned.
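Here’s that opt-in convention in isolation, with hypothetical names rather than Puma’s actual code: the method does no locking itself, so a well-behaved caller wraps the call in the shared mutex.

```ruby
class Pool
  def initialize
    @mutex   = Mutex.new
    @spawned = 0
  end

  attr_reader :spawned

  # Must be called with @mutex held! Nothing enforces this; a caller
  # that skips the mutex can corrupt @spawned.
  def spawn_thread
    @spawned += 1
  end

  # A well-behaved caller honours the contract:
  def add_worker
    @mutex.synchronize { spawn_thread }
  end
end
```

As long as every caller goes through something like add_worker, the unguarded += stays safe.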

Just a good reminder that mutexes only work if callers respect the implicit contract they offer. Moving on.

      th = Thread.new do
        todo  = @todo
        block = @block
        mutex = @mutex
        cond  = @cond

        extra = @extra.map { |i| i.new }

The first line here spawns a thread that will become part of the pool. This is only part of the block that’s passed to Thread.new.

At first glance, this bit of code looks like it might be assigning local variables so as not to share references with other threads. If each thread needed to re-assign its mutex, for instance, it would want to switch to a local reference so as not to affect other threads.

But the git blame for this bit of code suggests otherwise. Since this is a hot code path for Puma, using local variables will slightly improve performance over using instance variables. The references are never re-assigned by the individual threads, and this does nothing to prevent the threads from sharing references. In this case, the threads must share the references to the mutex and condition variable in order for their guarantees to hold.

These kinds of optimizations are common for web servers, but rare for application logic.
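The key point is worth demonstrating in a standalone snippet: copying an instance variable into a local makes lookups a hair faster, but both names still refer to the very same object.

```ruby
@mutex = Mutex.new

# Copying the reference into a local does not copy the object.
mutex = @mutex

# Both names point at the same Mutex instance.
same = mutex.equal?(@mutex)

# So locking through one name is visible through the other;
# it's one lock, whichever name you use.
mutex.lock
held = @mutex.locked?
mutex.unlock
```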

        while true
          work = nil
          continue = true

Now we get into the real meat of this method.

The first line enters an endless loop: this thread will execute until it hits its exit condition further down. The work and continue variables are just initialized here; we’ll see them in action shortly.

          mutex.synchronize do
            while todo.empty?
              if @shutdown
                continue = false
                break
              end

              @waiting += 1
              cond.wait mutex
              @waiting -= 1
            end

            work = todo.pop if continue
          end

OK, that’s a big paste. I’ll highlight some of the outer constructs, then re-focus on the inner stuff.

First, all of the code in this block happens inside of the mutex.synchronize call. So other threads have to wait while the current thread executes this block.
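To see that mutual exclusion in action, here’s a small standalone example (not Puma code): two threads enter a synchronized block, and their enter/exit events never interleave in the log.

```ruby
mutex = Mutex.new
log   = []

threads = 2.times.map do |i|
  Thread.new do
    mutex.synchronize do
      log << [:enter, i]
      sleep 0.01            # the other thread must wait out here
      log << [:exit, i]
    end
  end
end
threads.each(&:join)

# log is always enter/exit, enter/exit -- never enter, enter.
```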

            while todo.empty?
              if @shutdown
                continue = false
                break
              end

              @waiting += 1
              cond.wait mutex
              @waiting -= 1
            end

This little block of code came straight out of the one earlier; you’re still inside the mutex here. This block only runs if todo is empty. todo is a shared array that holds work to be done, so if it’s empty, there’s currently no work to do.

If there’s no work to do, this worker thread will check to see if it should shut down. If so, it sets the continue variable to false and breaks out of this inner while loop.

If it doesn’t need to shut down, things get more interesting.

First, it increments a global counter saying that it’s going to wait. This operation is safe because the shared mutex is still locked here. Next, it waits on the shared condition variable. Remember that this releases the mutex and puts the current thread to sleep. It won’t be woken up again until there’s some work to do. Since it released the shared mutex, another thread can go through the same routine.

Also notice that a while loop is used as the outer construct here, rather than an if statement. Remember that once signaled by a condition variable, a thread should re-check the condition to ensure that another thread hasn’t already processed the work.
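Here’s that pattern in isolation (an illustrative sketch, not Puma code): the consumer re-checks the queue in a while loop every time it wakes, then a producer hands it work and signals.

```ruby
mutex = Mutex.new
cond  = ConditionVariable.new
todo  = []

consumer = Thread.new do
  mutex.synchronize do
    # `while`, not `if`: after waking up, re-check the condition in case
    # another thread already took the work (or the wakeup was spurious).
    while todo.empty?
      cond.wait(mutex)    # releases the mutex and sleeps
    end
    todo.pop
  end
end

# Crude synchronization for the example: wait until the consumer is asleep.
sleep 0.01 until consumer.stop?

mutex.synchronize do
  todo << :job
  cond.signal             # wake one waiting thread
end

result = consumer.value   # the value of the thread's block
```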

Once some work arrives, this thread will get woken up. As part of being signaled by the condition variable, it will re-acquire the shared mutex, which once again makes it safe to decrement the global counter.

            work = todo.pop if continue

Now that the thread has been woken up, re-acquired the mutex, and found todo to contain some work, it pops a unit of work from todo. This is the last bit of code still inside the mutex.synchronize block.

          break unless continue

          block.call(work, *extra)

This little bit of code is outside the mutex.synchronize block and past the inner while loop around the condition variable, but still inside the outer infinite loop. If it’s time to shut down, this thread needs to break out of that outer loop, and the first line accomplishes exactly that.

If it’s not time to shut down, then this worker thread can process the work to do. In this case, it simply calls the shared block with the work object that it received. The block is passed in to the constructor and is the block of code that each worker thread will perform.

        mutex.synchronize do
          @spawned -= 1
          @workers.delete th
        end
      end

The body of the thread ends with a little housekeeping. Once the thread leaves its infinite loop, it needs to re-acquire the mutex to remove its reference from some shared variables.

      @workers << th

      th

The last two lines are outside the scope of the block passed to Thread.new. So they’ll execute immediately after the thread is spawned. And remember, even here the mutex is held by the caller of this method!

Here the current thread is added to @workers, then returned.

Wrap-up

This implementation nicely illustrates many of the concepts that were covered in this book. And as far as abstractions go, Puma does a superb job of isolating the concurrency-primitive logic from the actual domain logic of the server. I definitely recommend checking out how the ThreadPool is used in Puma, and the lack of threading primitives throughout the rest of the codebase.

Similarly, I encourage you to check out the other methods in the ThreadPool class, tracing the flow from initialization, to work units being added to the thread pool, to work units being processed from the thread pool, all the way to shutdown.