Mutterings and Musings

Realtime Metrics Using Ordered Sets and Redis

At IQ Engines, our system generates several different types of features when we process images; with the release of our SmartAlbum API, we wanted to be able to show what is happening across multiple feature detectors in as near real-time as possible. Additionally, we have the constraint that we'd like to keep a record of all events; in other words, we want this to be as lossless as possible.

I've found Redis to be a great data storage tool; it's very fast, robust, mature, and provides operations at the data structure level. It pretty much takes your Intro to Data Structures class and turns it into a database. It's the Swiss Army knife to MySQL's hammer.

Prior research

Before rolling our own, we took a good look at another solution out there, Spool's metrics tracking. Their method was clever, and involved using Redis as well (the bitset (now SETBIT) and BITCOUNT operations), which -- provided one uses a hash with reasonably low collision potential -- would total up unique events within a certain window. The windows could then be identified via the string name, e.g., unique_users_2007_12_04 (unique users for December 4, 2007). Redis' SETBIT operation runs in O(1) time, and its BITCOUNT operation runs in O(N) time. Therefore, each time a user logs in, it's an operation of ultimate simplicity to record, and retrieval (a much less frequent task) takes respectably little time as well. Spool also gets clever with calculating other time frames using Redis' BITOP operation, performing unions across multiple time frames to retrieve longer-spanned metric information.
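To make that concrete, here's a rough sketch of the bitmap approach with redis-py (the key names and user ID are invented for illustration, and it assumes each user maps to a small integer ID):

    from redis import Redis

    conn = Redis()

    ## record that user 42 was active on 2007-12-04; SETBIT is O(1)
    conn.setbit('unique_users_2007_12_04', 42, 1)

    ## count the day's unique actives; BITCOUNT is O(N) in the key size
    daily = conn.bitcount('unique_users_2007_12_04')

    ## union two days' bitmaps to count uniques across both
    conn.bitop('OR', 'unique_users_dec_4_5',
               'unique_users_2007_12_04', 'unique_users_2007_12_05')
    two_day = conn.bitcount('unique_users_dec_4_5')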

Although this is a great example of Redis' power and the tools it offers, there are a few caveats with this approach:

  1. It only works for events that have a unique identifier; and
  2. It requires defining the metrics' window in advance.

We wanted to be able to reconstruct the statistics in various ways post-recording; in short, to record events, not summaries.

Our approach

I wanted a system that would record all of the events that occurred, and would do so on objects for which computing a unique hash might be difficult. I basically wanted a running stream of events, categorized by time but not prescribed into certain windows.

Enter ordered sets

Redis offers native operations on hashes, lists, strings, and (un)ordered sets, among others. It's the ordered sets that we utilized in our system. Ordered sets arrange unique members by (possibly repeating) scores, e.g., Bob->1, Susie->2, Jean->2, Fred->3, etc. Redis provides an efficient array of tools for working with ordered sets, including O(log(N)) insertion and O(log(N)) retrieval. Yes, that's right; although it's a bit more expensive to insert into a set than a string, retrieval is actually faster.
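As a quick illustration of the semantics (the set name and members are made up; note that redis-py 3.x+ takes a {member: score} mapping for ZADD):

    from redis import Redis

    conn = Redis()

    ## members are unique; scores may repeat
    conn.zadd('example', {'Bob': 1, 'Susie': 2, 'Jean': 2, 'Fred': 3})

    ## range queries by score are cheap; ties are broken lexically
    conn.zrangebyscore('example', 1, 2)  ## Bob, Jean, Susie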

The basic idea is this: for each event, give it a unique name and assign its score to be its timestamp. Then, when you go to retrieve the events, you can use ZRANGEBYSCORE to get just the events that occurred between time A and time B.

The obvious caveat here is that every event must have a unique name. However, this name can be anything, from something timestamp-ish, to a random number, to (my favorite) a ticker stored in Redis that provides a unique number every time using the INCR command. (With this last one, you are absolutely assured of no collisions, provided the relationship between the db storing the ticker and the one recording the events is kept consistent.)

Example

So let's say we're recording events for when somebody submits a blob to our API. We have a key in Redis called blob:submit, and our counter is called blob:index. In Python:

    ## we're using Andy McCurdy's redis-py,
    ## https://github.com/andymccurdy/redis-py
    from redis import Redis
    import time

    conn = Redis(host='localhost', db=0)  ## fill in your own host/db

    def add_blob(*args, **kwargs):
        ## stuff stuff stuff

        ## add it to the metrics database! (redis-py 3.x+ wants a
        ## {member: score} mapping; older versions took member/score args)
        conn.zadd('blob:submit', {conn.incr('blob:index'): time.time()})

That's it! We've successfully recorded an event of type 'submit' for object 'blob'!

Retrieval

Ok, so we're plugging along, recording data. But data's pretty meaningless unless you can display it in a pretty manner using d3, right? Well, of course. Now let's assume you want to retrieve the data from the last day using a window size of 5 seconds, left-aligned (meaning here, for a window count of N > 1, the first window is the full window size and the last may be cut off). Let's pull a list of event counts:

    conn = Redis(host='localhost', db=0)  ## fill in your own host/db

    def retrieve(window, timespan):
        """
        retrieve a sequence of windows of size <window> over the course
        of the last <timespan> seconds; time.time() returns
        <seconds>.<microseconds>, so our scores are in seconds
        """
        now = time.time()  ## keep a consistent reference
        start = now - timespan  ## e.g., timespan=86400 for the last day

        ## withscores=True gives us (member, timestamp) pairs
        data = conn.zrangebyscore('blob:submit', start, now, withscores=True)

        ## optionally chop into windows
        windowed_data = []

        ## python's forced-int in the division below is actually helpful
        ## here for window alignment
        for i in xrange(timespan / window):
            item = []
            while len(data) > 0 and data[0][1] < start + (i + 1) * window:
                item.append(data.pop(0))
            windowed_data.append(item)
        return windowed_data

And voilà! Data broken up into windows.

If we're just interested in totals, and not the datapoints themselves, it might be tempting to use ZCOUNT, like so:

    def retrieve(window, timespan):
        now = time.time()

        windowed_data = []

        for i in xrange(timespan / window):
            windowed_data.append(conn.zcount('blob:submit',
                                             (now - timespan) + (i * window),
                                             (now - timespan) + ((i + 1) * window)))
        return windowed_data

It certainly seems easier, but we have to remember that ZCOUNT, like ZRANGEBYSCORE, is an O(log(N)) function, where N is the size of the entire set, including all of the data outside of the interesting range; thus, where our first example pays that cost only once to pull the whole range, the second example pays it once for each window. Yikes. Instead of using ZCOUNT, we can modify the loop in the first example a bit:

    for i in xrange(timespan / window):
        count = 0
        while len(data) > 0 and data[0][1] < start + (i + 1) * window:
            count += 1
            data.pop(0)
        windowed_data.append(count)

And there we go -- the count, instead of the data itself.

Caching

As with any system where aggregated information is regenerated over and over, we want some caching to keep things lean in subsequent requests. Well, not surprisingly, Redis can help us out here! A simple, solid option for caching static data is the Redis hash, and its attendant functions HGET and HSET (and, if we do it right, the bulk equivalents HMGET and HMSET).
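Here's a minimal taste of the bulk form: fetching several cached windows in one roundtrip with HMGET (the key and field values are invented; the <start>:<window> field scheme is the one we'll set up below):

    ## assumes the conn from the earlier examples
    fields = ['%s:%s' % (start, 5) for start in (1000, 1005, 1010)]
    cached = conn.hmget('statcache:stat', fields)
    ## cached is a list aligned with fields; None marks a missing window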

Let's go ahead and write some simple caching code. We don't need anything complex; a hash per event type, statcache:<type>, with fields of the form <start>:<window>, should be sufficient.

    import json  ## we'll serialize cached windows as JSON strings
    from collections import defaultdict

    def get_cached(type, span, window):
        """
        In this example, we are going to return all of the timestamps.
        This could be easily modified to return a simple count of
        events, too.
        """
        now = time.time()

        values = defaultdict(list)
        missing_indices = []

        for step in xrange(int(now - span), int(now), window):
            ## retrieve from our cached hash, statcache:<type> => <start>:<window>
            cached = conn.hget('statcache:%s' % type, '%s:%s' % (step, window))
            if not cached:
                missing_indices.append(step)
            else:
                values[step] = json.loads(cached)

        replacement_indices = missing_indices[:]  ## make a copy for modification

        ## now cache all values that we don't have
        if len(replacement_indices) > 0:
            data = conn.zrangebyscore('blob:submit', now - span, now,
                                      withscores=True)

            ## we can assume the data is ordered by score
            for d, tstamp in data:
                if tstamp >= replacement_indices[0]:
                    while len(replacement_indices) > 1 and tstamp >= replacement_indices[1]:
                        ## move to the next window
                        replacement_indices.pop(0)
                    ## we've advanced to the relevant window; only record the
                    ## event if that window was actually missing from the cache
                    if tstamp < replacement_indices[0] + window:
                        values[replacement_indices[0]].append(tstamp)

            ## now cache the new values
            for i in missing_indices:
                conn.hset('statcache:%s' % type, '%s:%s' % (i, window),
                          json.dumps(values[i]))

        return values

Looks pretty good so far, but we're leaving out one detail that's crucial for consistency: key normalization. In this case, we need to normalize our cache indices somehow, or else the cache jumps all over the place. For example, if we have a 5-second window and timespan, and we poll for it every second, then we will get a new set of data every time, i.e.:

    ## assuming instantaneous calculation

    now = time.time()

    get_cached('stat', 5, 5)  ## returns [now-5, now]
    time.sleep(1)
    get_cached('stat', 5, 5)  ## returns [now+1-5, now+1], oh noes!

We can correct this problem with some simple window alignment. Let's add the following to the beginning of the function above:

    ...
    now = time.time()
    offset = now % window
    now = now - offset  ## now everything will be aligned!
    ...

Now, given the same window size, we will be able to use the same cached values moving forward, and our beautiful d3 graphs will transition smoothly.

Further improvements

It is probably obvious that storing all of this information could take a whole bunch of memory. Redis is an in-memory database, so, depending on how beefy your machine is, this could become a problem sooner or later.

This problem could be mostly eliminated by periodic preening. As a simple example, let's say that you only need to know about events in the last month. Let's also assume that the only window size being employed is the 5-second window (obviously this can change depending on timespan). In that case, Redis' ZREMRANGEBYSCORE could be employed:

    import re

    ## we only want the latest month's data
    now = time.time()
    one_month = 2592000  ## 30 days, in seconds

    conn.zremrangebyscore('blob:submit', '-inf', now - one_month)

    ## of course we want to remove the cached values, too
    keys = conn.hkeys('statcache:%s' % type)

    ## let's pipeline this to keep roundtrips down
    pipe = conn.pipeline(transaction=False)
    for k in keys:
        cur_start = re.match('^(?P<start>.*):(?P<window>.*)$', k).groupdict()['start']
        if int(cur_start) < now - one_month:
            pipe.hdel('statcache:%s' % type, k)
    pipe.execute()

You can run this (or some variant) however often you feel is necessary. Additionally, you can cache to different time scales for different retention periods, and clean those out accordingly; for example, you may want 5-second windows for recent data, but only need 1-hour windows for the last year, so cleaning out 5-second windows older than a month, and all windows older than a year, would be appropriate.
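A hypothetical sketch of that tiered cleanup, with the retention numbers assumed and the <start>:<window> fields parsed with a simple split instead of the regex above:

    import time

    ## assumes the conn from the examples above; hypothetical retention:
    ## 5-second windows kept for a month, everything else for a year
    now = time.time()
    one_month, one_year = 2592000, 31536000

    pipe = conn.pipeline(transaction=False)
    for k in conn.hkeys('statcache:stat'):
        start, window = [int(x) for x in k.split(':')]
        if start < now - one_year:
            pipe.hdel('statcache:stat', k)  ## too old for any window size
        elif window == 5 and start < now - one_month:
            pipe.hdel('statcache:stat', k)  ## fine-grained data has expired
    pipe.execute()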

Conclusion

Redis provides a very powerful toolbox for doing all sorts of neat tricks. We've coerced the ordered set operations into doing our bidding for tracking events across time; more generally, they can be used to track any numerically-ordered set of unique events. More advanced usage could use UUIDs as the keys into a more complex dataset, and then index into that dataset using several metrics via the ordered set ops. And this is just one data type; I encourage everyone to check out the Redis docs to see what else this amazing tool can do.
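As a parting sketch of that last idea (the key names here are entirely hypothetical, using the same redis-py conventions as above): store each event's details in a hash keyed by a UUID, then index that UUID into one ordered set per metric:

    import time
    import uuid

    from redis import Redis

    conn = Redis()

    def record_event(user, size):
        ## one hash per event holds the full record...
        event_id = str(uuid.uuid4())
        conn.hset('event:%s' % event_id, mapping={'user': user, 'size': size})
        ## ...and one ordered set per metric indexes into it
        conn.zadd('events:by_time', {event_id: time.time()})
        conn.zadd('events:by_size', {event_id: size})

    ## e.g., the UUIDs of the ten largest events, via the size index
    biggest = conn.zrevrange('events:by_size', 0, 9)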