Lost In The Queue...



… found in translation.

A while back, Kris wrote about some of the interesting ways redis configuration choices can harm the performance of your applications. The investigation he described was the most illuminating so far in our efforts to understand and tame the microservice beasts we have bred, but I’d like to talk a bit about my own pet preoccupations and how I came by them: specifically, how in translating the library we use to build Python workers into Clojure, I learned a few new things about the code that had been under my nose all along.

A Pattern for Distributing Synchronous Tasks Using Redis

So Kris mentioned PSH, which stands for Psomething Service Hub. (Well, almost—the Psomething part is Top Psecret!) To build it, we wrote a couple of “tasklib” libraries, one in Ruby and one in Python, to enqueue synchronous1 “tasks”—remote function calls that return all the disparate pieces of context we need to build our pages—in redis. On the other end, worker processes use one of those same libraries (though not necessarily the same language’s implementation as the caller’s) to listen for and receive tasks, dispatch them to the correct function, and return the results. While each worker may provide several different tasks, we strive to keep them focused on a single business domain. The web layer in turn is focused on nothing other than serving requests by orchestrating the concurrent execution of these tasks and ensuring that their results make it into the rendering context for the page being requested.

Basic PSH Architecture

The ecosystem is rounded out with a registry that helps the calling application find the correct queue for the task it would like to call, and a UI on top of that where we can see which workers are providing which tasks on which nodes, what their arguments are, and metrics around how busy they are, how quickly they are executing, and how often they are erroring. There’s even an automatically generated form for each task that we can use to play around with different arguments to see how they behave.

It’s all pretty cool in my opinion, and I personally designed and built almost none of it. What I knew about the system, I learned by implementing specific workers and building sites with the services those workers provided, or by fixing the occasional bug or adding the odd tweak… but I didn’t really grok it in full until I decided to try to port the worker functionality of the Python version of tasklib to Clojure. Only then did I learn to start worrying and fear the queue.

Building a PSH Worker in Clojure

Here, more or less2, is the Python code I was starting with:

import json
from Queue import Queue
from threading import Thread

class Worker(object):
    def __init__(self, tasks, redis_conn, redis_queue, threads=1):
        self.tasks = tasks
        self.conn = redis_conn
        self.redis_queue = redis_queue
        self.threads = threads
        self.queue = Queue()

    def start(self):
        self.start_threads()
        self.listen()

    def start_threads(self):
        for i in range(self.threads):
            t = Thread(target=self.exec_from_queue, args=())
            t.daemon = True
            t.start()

    def listen(self):
        while 1:
            _, job = self.conn.blpop(self.redis_queue)
            self.queue.put(json.loads(job))

    def exec_from_queue(self):
        while 1:
            job = self.queue.get(block=True)
            result = self.tasks[job['task']](**job['args'])
            self.send_result(job, result)

    def send_result(self, job, result):
        self.conn.rpush(job['result_queue'], json.dumps(result))

Simple enough, right? We prefork a fixed number of threads that wait for jobs on an internal queue, then blpop jobs from redis in a loop and hand them off to those threads, which execute them and send back the results. Setting up tasks and starting a worker for this version of the code would look something like this:

import redis

def my_task(**kwargs):
    return {k: 10 + v for k, v in kwargs.items()}

worker = Worker({'my_task:1.0': my_task},
                redis.StrictRedis(),
                'my_worker',
                threads=4)
worker.start()

Here is my fairly literal translation of the above library code into Clojure:

(ns "worker"
  (:require [clojure.core.async :refer [chan <!! >!!]]
            [cheshire.core :refer [generate-string parse-string]]
            [taoensso.carmine :as redis]))

(defn send-result [redis-pool job result]
  (->> (generate-string result)
       (redis/rpush (job "result_queue"))
       (redis/wcar redis-pool)))

(defn exec [task-map job-msg redis-pool]
  (let [job (parse-string job-msg)
        task (task-map (job "task"))
        result (task (job "args"))]
    (send-result redis-pool job result)))

(defn start-thread [task-map job-chan redis-pool]
  (loop []
    (let [job (<!! job-chan)]
      (exec task-map job redis-pool))
    (recur)))

(defn listen [job-chan redis-pool redis-queue]
  (loop []
    (let [[_ job-msg] (redis/wcar redis-pool (redis/blpop redis-queue 0))]
      (>!! job-chan job-msg))
    (recur)))

(defn start [task-map redis-pool redis-queue num-threads]
  (let [job-chan (chan)]
    (dotimes [n num-threads]
      (future (start-thread task-map job-chan redis-pool)))
    (listen job-chan redis-pool redis-queue)))

Once again, we’re preforking a fixed number of threads to wait for jobs, and then setting up a loop to blpop from a redis queue. You might use it like this:

(ns "my-worker"
  (:require [worker :refer [start]]))

(defn my-task [kwargs]
  (into {} (for [[k v] kwargs] [k (+ v 10)])))

(defn -main []
  (start {"my_task:1.0" my-task}
         {}
         "my_worker"
         4))
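
One thing neither version shows is the calling side. For orientation only, here is a rough sketch of what a Clojure caller would have to do, inferred purely from the fields the worker reads (task, args, result_queue); the real tasklibs live in Ruby and Python and also deal with exceptions, stats, encryption, and so on:

(ns my-caller
  (:require [cheshire.core :refer [generate-string parse-string]]
            [taoensso.carmine :as redis]))

;; Hypothetical caller, not part of tasklib; it just shapes the job message
;; the way the worker above expects to find it.
(defn call-task [redis-pool worker-queue task-name args]
  (let [result-queue (str "results:" (java.util.UUID/randomUUID))
        job          {"task"         task-name
                      "args"         args
                      "result_queue" result-queue}]
    ;; enqueue the job on the worker's request list...
    (redis/wcar redis-pool (redis/rpush worker-queue (generate-string job)))
    ;; ...then block until the worker rpushes a result onto our private list
    (let [[_ result] (redis/wcar redis-pool (redis/blpop result-queue 0))]
      (parse-string result))))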

Pretty straightforward, right? What’s the big deal?

The Garden of Forking Queues

One of the ways we monitor the health of our PSH workers is by keeping an eye on the length of each redis list that acts as a request queue. If the queues are backed up, we get nagios alerts. And we can look at pretty grafana graphs to see spikes and trends.
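
The “length” here is nothing fancier than the length of the worker’s redis list. Against the same carmine setup as the worker code above, checking it by hand would look something like this (the real monitoring feeds nagios and grafana rather than ad-hoc calls):

;; how long is the my_worker request queue right now?
(redis/wcar redis-pool (redis/llen "my_worker"))
;; => 0, on a good day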

Queue Length

But I don’t recall getting many such alerts, and the graph pretty much always looks like that: flat. Somehow, come bots, scanners, or fat-fingered display campaign publishes, our PSH workers were always keeping up! Or were they?

Here is another graph that measures a stat we called queue_wait:

Queue Wait

You might think that this graph measures the (worst-case) length of time jobs are spending in redis before some worker process blpops them, and that would be a perfectly reasonable thing to measure.3 But that’s not what this stat is measuring. Instead, it’s measuring the time it takes to get from the blpop to the start of execution:


def listen(self):
    while 1:
        _, job = self.conn.blpop(self.redis_queue)
        job = json.loads(job)
        job['recv'] = time.time()
        self.queue.put(job)

def exec_from_queue(self):
    while 1:
        job = self.queue.get(block=True)
        self.send_timing('queue_wait', 1000 * (time.time() - job['recv']))
        result = self.tasks[job['task']](**job['args'])

As the graph shows, sometimes we’re not keeping up after all.

What does this have to do with porting the library to Clojure? Among a dozen other mysteries that plagued me as I was building this out,4 there was a brief period where my greatest difficulty was understanding why, at various times as I rearranged the order of things, everything would suddenly grind to a halt. This forced me to look harder at the behavior of channels, blocking put (>!!), and blocking take (<!!) in core.async.

Here again is the listener function in the Clojure version:

(defn listen [job-chan redis-pool redis-queue]
  (loop []
    (let [[_ job-msg] (redis/wcar redis-pool (redis/blpop redis-queue 0))]
      (>!! job-chan job-msg))
    (recur)))

What is the loop doing? It’s

  • Grabbing a job from the redis queue with blpop. This will block until there’s a job available.
  • Putting the job onto a channel. Because we created the channel with no arguments, it is unbuffered, which means that if none of the threads we started are at the point in their loop at which they’re ready to take from the channel, this operation will block until one of them is. And if the listener loop is blocking at this point, it’s not trying to blpop another job from redis.

To restate this another way, a worker’s listener loop will grab at most one job if none of its worker threads are available.
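
If that blocking behavior is unfamiliar, here is a tiny REPL experiment (nothing to do with the worker code itself) showing >!! parking the producer on an unbuffered channel until a consumer shows up:

(require '[clojure.core.async :refer [chan >!! <!!]])

(let [c (chan)]                      ; unbuffered: every put must rendezvous with a take
  (future (Thread/sleep 2000)
          (println "took" (<!! c)))  ; a consumer arrives two seconds later
  (time (>!! c :job)))               ; blocks for roughly those two seconds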

Now contrast the behavior of the Python implementation:


def listen(self):
    while 1:
        _, job = self.conn.blpop(self.redis_queue)
        job = json.loads(job)
        job['recv'] = time.time()
        self.queue.put(job)

Here, we are

  • Grabbing a job from the redis queue with blpop. Like before, this will block until there’s a job available.
  • Putting the job onto a Queue. Because we created this Queue with no arguments, it has a max size of 0, which means we can put objects without regard to how many objects are already resident in the queue. So we re-enter the loop as soon as we finish putting the job on the internal queue, and if there’s a job waiting for us we grab it and re-enter the loop as soon as we finish putting the job on the internal queue, and…

To restate this another way, a worker’s listener loop will keep grabbing jobs without regard to whether there are worker threads available to execute them. In fact, the diagram I drew at the beginning of the article should look more like this:

The Actual Architecture
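
To see the contrast in the same REPL style, swap the channel for something with Queue()-like semantics; an unbounded java.util.concurrent.LinkedBlockingQueue will do as a stand-in. Puts sail through no matter how far behind the consumers are:

(def q (java.util.concurrent.LinkedBlockingQueue.)) ; no capacity bound, like Python's Queue()

(time (dotimes [n 10000] (.put q n))) ; returns almost immediately; nothing ever blocks
(.size q)                             ; => 10000 jobs hoarded inside the process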

There are many reasons why this might be undesirable. For one thing, it was preventing us from measuring our backlog of pending tasks and alerting us when that backlog backs up. Even worse, a worker process in this configuration can hoard tasks that another worker process might be able to get to sooner (much like some engineers I’ve, uh, worked with). At the very least, you would want to have some insight into just how many tasks are squirreled away within your various worker nodes.

Down The Redis Hole5

So if we accept that these additional hidden queues are a problem, how do we fix it? One possibility is to place an upper bound on the size of your queue and always wait before adding to it when it’s full:

class Worker(object):
    def __init__(self, tasks, redis_conn, redis_queue, threads=1):
        # ...
        self.queue = Queue(maxsize=threads)

    def listen(self):
        while 1:
            _, job = self.conn.blpop(self.redis_queue)
            self.queue.put(json.loads(job), block=True)

This doesn’t quite reproduce the Clojure behavior: with maxsize=threads, the listener can still pull a few jobs ahead (up to the queue’s capacity, plus the one it’s blocked on trying to put), but each worker process can no longer hoard an unbounded backlog.

That’s better, but now I’m obsessed with ensuring that my workers don’t grab any jobs they’re not ready to process! There are several ways to accomplish this: you could drop the internal queue and give each thread its own listener loop, or you could take an approach like my teammate Nat’s.
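
The first of those options is easy to sketch against the Clojure code above: drop the channel entirely and let each thread own its blpop loop, so redis itself does the fan-out and nothing is ever prefetched. Something along these lines (reusing exec from earlier; a sketch, not what we actually shipped):

(defn start [task-map redis-pool redis-queue num-threads]
  (let [threads (doall
                  (for [_ (range num-threads)]
                    (future
                      (loop []
                        (let [[_ job-msg] (redis/wcar redis-pool
                                                      (redis/blpop redis-queue 0))]
                          (exec task-map job-msg redis-pool))
                        (recur)))))]
    ;; park the calling thread forever, like the original listen loop did
    (run! deref threads)))

One thing to keep in mind with this shape: each thread holds a redis connection open while it blocks on blpop, so the connection pool needs to be at least num-threads deep.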

You might just as reasonably decide to monitor these internal queues so that any backlog of tasks they build up remains visible. You might reasonably decide to separate your workers according to the nature of their workloads, and let workers that handle jobs that are uniformly and predictably fast prefetch as many as they can.

But I am not a reasonable person, so here is what I did with my Clojure worker:

(defn start-thread [task-map job-chan ready-chan redis-pool]
  (loop []
    (>!! ready-chan true)
    (let [job (<!! job-chan)]
      (exec task-map job redis-pool))
    (recur)))

(defn listen [job-chan ready-chan redis-pool redis-queue]
  (loop []
    (let [is-somebody-ready? (<!! ready-chan)
          [_ job-msg] (redis/wcar redis-pool (redis/blpop redis-queue 0))]
      (>!! job-chan job-msg))
    (recur)))

(defn start [task-map redis-pool redis-queue num-threads]
  (let [job-chan (chan)
        ready-chan (chan num-threads)]
    (dotimes [n num-threads]
      (future (start-thread task-map job-chan ready-chan redis-pool)))
    (listen job-chan ready-chan redis-pool redis-queue)))

Here, in addition to creating one channel for incoming requests, we create a second channel for work requests from the worker threads. As each worker thread enters its loop, it pushes true onto this channel (which is a buffered channel able to accept as many values as there are worker threads). And now the first thing the listener thread does as it enters its loop is to try to take a value from that “request request” channel. If all the threads are busy, the channel will be empty and the listener thread will block. Once a thread comes free and puts a value on that channel, the listener thread will succeed in taking it and then start trying to blpop a request from the redis queue. With this configuration, we keep our backlog in redis, and we know that the worker taking a task from that backlog is really ready to perform the work: no more distributing tasks to workers that are already underwater.

Coda

In the course of testing the version of the Clojure code presented here, I managed to emit the following to my console:

"thread waiting for work"
"thread waiting for work"
"thread waiting for work"
""ltihsrteeande rw awiatiitnign gf ofro rw ojrokbs""

The reader is referred to Ned Batchelder’s excellent discussion of the two problems (or perhaps I should say tprowblo ems) joke.6


  1. Synchronous, as in we want the results before responding to some request. For true background jobs, we’re happy to stick with preexisting solutions like resque.

  2. This is a stripped-down version of the real code, which handles exceptions, logs things, sends stats to graphite, provides decorators to create task functions, decrypts encrypted requests, and so on. But it preserves the basic structure. 

  3. Reasonable, but trickier than it sounds. You could put a timestamp in the job before enqueuing it, and measure the time elapsed when the worker pulls the job out, but as you’ll recall from Kris’s article, there are times when the enqueueing operation itself takes long enough to skew the measurement. Or you could write a pair of Lua scripts for enqueueing and dequeueing: timestamp in, rpush; timestamp out, blpop. A project for another day maybe. 

  4. Probably the most difficult of which was figuring out a way to use metadata to annotate functions so that the worker could dynamically read them from a namespace specified as a command-line argument and describe them to the registry; tl;dr: functions are not vars!
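
     In case the gotcha is not obvious, here is a tiny illustration (the :task-name key is made up for the example, not the real annotation): metadata attached in a defn lands on the var, not on the function value it holds.

     (defn ^{:task-name "my_task:1.0"} my-task [kwargs] kwargs)

     (meta #'my-task) ;; => includes {:task-name "my_task:1.0"} along with :name, :ns, :arglists, ...
     (meta my-task)   ;; => nil; the function value itself carries no metadata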

  5. If this article were about pun-driven development, this would be the point in the plot where we drop redis for rabbitmq. On the other hand, if I didn’t like the consonantal near-rhyme of the current title better, I might have more accurately called this section “Down Async Hole.”

  6. If the reader has no time for jokes, this post does a good job of explaining why this happened and how to avoid it (though I never saw it in the real implementation, because I was logging with timbre instead of prn). 



Jon Stern
Director, Software Engineering