Pattern: Thread Pool
Overview
This pattern is to Preforking what Thread Per Connection is to Process Per Connection. Much like Preforking, this pattern will spawn a number of threads when the server boots and defer connection handling to each independent thread.
The flow of this architecture is the same as in the previous pattern; just substitute ‘thread’ for ‘process’.
Implementation
require 'socket'
require 'thread'
require_relative '../command_handler'

module FTP
  Connection = Struct.new(:client) do
    CRLF = "\r\n"

    def gets
      client.gets(CRLF)
    end

    def respond(message)
      client.write(message)
      client.write(CRLF)
    end

    def close
      client.close
    end
  end

  class ThreadPool
    CONCURRENCY = 25

    def initialize(port = 21)
      @control_socket = TCPServer.new(port)
      trap(:INT) { exit }
    end

    def run
      Thread.abort_on_exception = true
      threads = ThreadGroup.new

      CONCURRENCY.times do
        threads.add spawn_thread
      end

      sleep
    end

    def spawn_thread
      Thread.new do
        loop do
          conn = Connection.new(@control_socket.accept)
          conn.respond "220 OHAI"

          handler = CommandHandler.new(conn)

          loop do
            request = conn.gets

            if request
              conn.respond handler.handle(request)
            else
              conn.close
              break
            end
          end
        end
      end
    end
  end
end

server = FTP::ThreadPool.new(4481)
server.run
Again, there are two main methods here: one spawns the threads, the other encapsulates each thread’s behaviour. Since we’re working with threads, we’ll once again be using the Connection class.
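To see what the Connection wrapper buys us, here is a small standalone sketch that exercises it over an in-process socket pair. The socket pair and the sample FTP exchange are hypothetical test fixtures, not part of the server itself.

require 'socket'

CRLF = "\r\n"

# Same wrapper as in the server, reproduced so this snippet runs standalone.
Connection = Struct.new(:client) do
  def gets
    client.gets(CRLF)
  end

  def respond(message)
    client.write(message)
    client.write(CRLF)
  end

  def close
    client.close
  end
end

# A UNIX socket pair stands in for a real TCP client.
server_side, client_side = UNIXSocket.pair

conn = Connection.new(server_side)

client_side.write("USER anonymous#{CRLF}")
p conn.gets # => "USER anonymous\r\n"

conn.respond "331 password required"
p client_side.gets(CRLF) # => "331 password required\r\n"

The wrapper simply pins the line terminator to CRLF on both reads and writes, so the rest of the server never has to think about framing.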
CONCURRENCY = 25

def initialize(port = 21)
  @control_socket = TCPServer.new(port)
  trap(:INT) { exit }
end

def run
  Thread.abort_on_exception = true
  threads = ThreadGroup.new

  CONCURRENCY.times do
    threads.add spawn_thread
  end

  sleep
end
The run method creates a ThreadGroup to keep track of all the threads. ThreadGroup is a bit like a thread-aware Array. You add threads to the ThreadGroup, but when a member thread finishes execution it’s silently dropped from the group.
You can use ThreadGroup#list to get a list of all the threads currently in the group, all of which will be active. We don’t actually use this in this implementation, but ThreadGroup would be useful if we wanted to act on all active threads (to join them, for instance).
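The “silently dropped” behaviour is easy to demonstrate in isolation. A minimal sketch (the two worker bodies are placeholders, not server code):

require 'thread'

group = ThreadGroup.new

finished = Thread.new { }       # exits immediately
running  = Thread.new { sleep } # stays alive until killed

group.add(finished)
group.add(running)

finished.join # guarantee the first thread has terminated

# Dead threads are dropped from the group automatically,
# so only the live thread shows up in the list.
live = group.list
p live.size # => 1

running.kill

This is why ThreadGroup#list is a reliable snapshot of the active pool: there is no bookkeeping to do when a worker dies.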
Much like in the last chapter, we simply call the spawn_thread method as many times as CONCURRENCY calls for. Notice how the CONCURRENCY number is higher here than in Preforking? Again, that’s because threads are lighter weight and, therefore, we can have more of them. Just keep in mind that the MRI GIL mitigates some of this gain.
The end of this method calls sleep to prevent it from exiting. The main thread remains idle while the pool does the work. Theoretically it could be doing its own work, such as monitoring the pool, but here it simply sleeps so the process doesn’t exit.
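One hypothetical alternative to the bare sleep, sketched below: the main thread could join the pool, so it exits when (and only when) every worker exits. In the real server the workers loop forever, so this would block just like sleep does; the placeholder workers here finish immediately so the snippet can run to completion.

require 'thread'

threads = ThreadGroup.new

# Placeholder workers standing in for spawn_thread.
workers = Array.new(3) { Thread.new { 21 * 2 } }
workers.each { |t| threads.add(t) }

# Instead of sleeping, wait on whatever is still alive in the group.
# Workers that have already finished were dropped from the list,
# so there is nothing to join for them.
threads.list.each(&:join)

Either way, the point is the same: the main thread must be kept from falling off the end of run while the pool serves connections.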
def spawn_thread
  Thread.new do
    loop do
      conn = Connection.new(@control_socket.accept)
      conn.respond "220 OHAI"

      handler = CommandHandler.new(conn)

      loop do
        request = conn.gets

        if request
          conn.respond handler.handle(request)
        else
          conn.close
          break
        end
      end
    end
  end
end
This method is pretty unexciting. It follows the same pattern as Preforking: spawn a thread that loops around the connection handling code. Again, the kernel ensures that a single connection can only be accepted into a single thread.
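That kernel guarantee can be observed directly. In this hypothetical demonstration, two threads block in accept on the same listening socket; each incoming connection wakes exactly one of them, with no locking on our part:

require 'socket'
require 'thread'

server = TCPServer.new('127.0.0.1', 0) # port 0: let the OS pick a free port
port   = server.addr[1]

results = Queue.new

# Two acceptor threads blocked on the SAME listening socket.
acceptors = Array.new(2) do
  Thread.new do
    client = server.accept # only one thread wins each connection
    results << client.gets
    client.close
  end
end

# Two clients; the kernel hands each connection to exactly one acceptor.
2.times do |i|
  socket = TCPSocket.new('127.0.0.1', port)
  socket.puts "hello #{i}"
  socket.close
end

acceptors.each(&:join)
p results.size # => 2

Both messages arrive, each handled by a different thread, which is exactly the load-distribution behaviour the thread pool relies on.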
Considerations
Many of the considerations of this pattern are shared with the previous one.
Besides the obvious thread vs. process tradeoffs, this pattern doesn’t need to spawn a thread each time it handles a connection, doesn’t require any tricky locks or invite race conditions, yet still provides parallel processing.