Marc Paré
RVIT Co.

Building the Analytics Infrastructure for an IoT Startup

I spent 2014 and 2015 in charge of the backend for an IoT startup. I built the system from scratch, coordinated contributions from a team of developers and data scientists, and kept it running 24/7. At its peak, the system received 170 GB of raw data a day from sensors deployed across the United States, collecting over 20 TB of data by the end of the project.

Here's how I designed and scaled the system and what surprised me along the way.

Setting the stage

The Persistent Efficiency team brought me on after they had completed a few iterations of their hardware. This was pretty interesting stuff: a novel energy sensor that was completely non-invasive; you could just tape it directly onto breaker panels and start collecting data.

The team had run into difficulties storing and navigating the data streams from even a single installation with off-the-shelf tools. The root of the issue was that the sensors were quite chatty, sending full waveforms of data. This resulted in payloads of tens of kilobytes each, and the volume added up quickly given that each of the 10+ sensors on a panel sent these payloads every few seconds.

Early hardware. That's a Raspberry Pi almost out of frame in the bottom right.

Final generation hardware: smaller, faster, and stickier.

Initial design

I worked with the team to design a system that could handle their large streams of data as well as support continued innovation in their metrics pipeline.

The first version of this system was named WATT (Where All the Time Series are Tabulated). We evolved it significantly a few months later, naming it "Faraday", after another influential energy scientist.

I broke the system into two main chunks: one for real-time metrics and another for offline data processing.

Incoming data is immediately queued for processing into real-time metrics as well as archived in the offline data system.

I chose Heroku as a starting point, with certain performance-critical components offloaded to AWS. Heroku was extremely useful for facilitating contributions from junior developers and for multiplying the effort of a small number of engineers along the path to finding product-market fit.

Real-time Metrics

To be useful to users, raw sensor data had to be run through some fairly complex algorithms to deconvolute crosstalk from the sensor array.

At peak scale, about 170 GB of raw data passed through this pipeline every day, generating about 500,000 rows of metrics in Postgres.

A queue was used to allow scaling the capacity of the real-time metrics process. Raw payloads were pushed to a queue by the web app, then popped off by workers for deconvolution. The resulting metrics were pushed to Postgres by the worker.
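
To make this concrete, here is a minimal sketch of what such a worker could look like, assuming SQS via boto3 and Postgres via psycopg2; the payload shape, table, and deconvolute() stub are placeholders rather than the real pipeline:

    import json

    import boto3
    import psycopg2

    sqs = boto3.resource("sqs")
    queue = sqs.get_queue_by_name(QueueName="raw-payloads")  # hypothetical queue name
    db = psycopg2.connect("dbname=faraday")

    def deconvolute(payload):
        """Placeholder for the crosstalk-deconvolution algorithm."""
        return [{"panel_id": payload["panel_id"],
                 "timestamp": payload["timestamp"],
                 "watts": sum(payload["waveform"])}]

    while True:
        for message in queue.receive_messages(WaitTimeSeconds=20, MaxNumberOfMessages=10):
            payload = json.loads(message.body)
            metrics = deconvolute(payload)
            with db, db.cursor() as cur:  # commit once per payload
                for m in metrics:
                    cur.execute(
                        "INSERT INTO metrics (panel_id, ts, watts) VALUES (%s, %s, %s)",
                        (m["panel_id"], m["timestamp"], m["watts"]))
            message.delete()  # acknowledge only after a successful write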

Caching was used judiciously throughout the metric worker process. A clever bit of caching for real-time metrics used Redis expiring keys. A complex blob of metadata was needed for each metrics computation, and it was very difficult to properly invalidate in the cache. Instead of tackling this head on, I just cached it with a five-minute expiring key. This metadata changed very rarely (perhaps monthly), which made this dead simple approach very effective.
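
A sketch of that pattern with redis-py; the key format and fetch_panel_metadata() are made up for illustration:

    import json

    import redis

    r = redis.StrictRedis()
    METADATA_TTL = 300  # five minutes

    def fetch_panel_metadata(panel_id):
        """Placeholder for the slow metadata lookup (e.g. against Postgres)."""
        return {"panel_id": panel_id, "sensors": []}

    def panel_metadata(panel_id):
        key = "panel-metadata:%s" % panel_id
        cached = r.get(key)
        if cached is not None:
            return json.loads(cached)
        metadata = fetch_panel_metadata(panel_id)
        r.setex(key, METADATA_TTL, json.dumps(metadata))  # no invalidation needed; it just expires
        return metadata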

Rollup Metrics

To make queries fast, metrics were rolled up across various time intervals: minutely, five-minutely, hourly, daily, etc. Rollups were a simple sum: a minutely rollup is the sum of all the metrics received during that minute.
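
In SQL terms, a minutely rollup might be computed roughly like this (table and column names are hypothetical):

    import psycopg2

    db = psycopg2.connect("dbname=faraday")

    def rollup_minute(panel_id, minute_start):
        # minute_start is a datetime at the start of the minute being rolled up.
        with db, db.cursor() as cur:
            cur.execute(
                """
                INSERT INTO rollups_minutely (panel_id, ts, watts)
                SELECT %s, %s, COALESCE(SUM(watts), 0)
                FROM metrics
                WHERE panel_id = %s
                  AND ts >= %s AND ts < %s + interval '1 minute'
                """,
                (panel_id, minute_start, panel_id, minute_start, minute_start))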

This was a tricky piece to get right. The initial implementation produced far too much concurrent write load on Postgres, causing constantly increasing database bloat.

To reduce concurrent write load on Postgres, I came up with a scheme for delaying rollups. First, some words on approaches that don't work. One approach you could take is to trigger a rollup every time a new metric comes in. This is the aforementioned case that causes too much write load, since you recompute every interval of rollup (minutely, five-minutely, etc.) for every single metric that comes in. Another approach that doesn't work is to schedule rollups based on a clock. For example, at 12:01PM, you roll up the data for 12:00PM to 12:01PM. There are many cases where data doesn't hit the server at exactly the right time. Also, timestamps sometimes drift slightly as clocks get out of sync on data acquisition devices. So what do you do?

I developed a scheme using Redis sorted sets to delay rolling up metrics intelligently. When a new metric enters the system, a timestamp tracking the latest metric receipt is refreshed in a Redis sorted set (members follow the scheme "[panel-id]-[minute]"). Rollup workers periodically check whether the oldest member in this sorted set is old enough to trigger a rollup. The trick here is that as long as metrics are still coming in for a given minutely interval, the timestamp keeps getting refreshed and is never old enough to trigger a rollup. Once readings stop coming in for that minute, the member stops being refreshed and eventually gets old enough to trigger a rollup. Then the rollup is created (e.g. all the power readings for that minute are summed) and another member is touched in Redis, this time for the next rollup in the chain -- first minutely, then five-minutely, fifteen-minutely, hourly, etc. During normal operation, a rollup for a given interval will only ever be created once. The scheme is also resilient to many failure scenarios, such as a customer network outage that delays data acquisition.
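
A rough sketch of the scheme using redis-py; only the "[panel-id]-[minute]" member format comes from the description above, while the quiet period and helper names are illustrative:

    import time

    import redis

    r = redis.StrictRedis()
    PENDING = "pending-minutely-rollups"
    QUIET_PERIOD = 120  # seconds without new data before a minute is rolled up

    def create_minutely_rollup(panel_id, minute):
        """Placeholder: sum the metrics for this minute (as in the rollup sketch above)."""

    def on_metric_received(panel_id, minute):
        # Every metric refreshes the score for its minute, pushing the
        # rollup further into the future while data is still arriving.
        r.zadd(PENDING, {"%s-%s" % (panel_id, minute): time.time()})

    def rollup_worker_pass():
        cutoff = time.time() - QUIET_PERIOD
        # Members whose score hasn't been refreshed recently are ready to roll up.
        for member in r.zrangebyscore(PENDING, 0, cutoff):
            panel_id, minute = member.decode().rsplit("-", 1)
            create_minutely_rollup(panel_id, minute)
            r.zrem(PENDING, member)
            # ...then touch the member for the five-minutely interval that
            # contains this minute, and so on up the chain.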

While we're on the subject of write load, database bloat is a problem particular to Postgres because of its MVCC model. Basically, if you continuously update the same rows, dead row versions pile up faster than the engine can clean them up, and queries to those rows start to slow down and eventually fail. Uber's engineering team ran into a variation of this issue recently. Fortunately, I was able to find a workaround for our case instead of having to migrate to MySQL. I found this scaling problem to be a particularly valuable lesson, as it showed the importance of understanding the particulars of a database engine rather than just high-level SQL.

Offline Data Processing

Every bit of data that entered Faraday was archived into a data lake. This was a key requirement for the system as the algorithms for interpreting raw sensor data were under continuous development. As the algorithms changed, the entire set of historical metrics had to be rebuilt. This sort of architecture has become increasingly prevalent because of the decreasing cost of storage and compute power as well as improved tooling for distributed computation (in other words: Big Data!).

Data lake archival was relatively straightforward and inexpensive thanks to Amazon S3. The only wrinkle with this process was that multiple sensor payloads needed to be combined into a single file, since MapReduce jobs are much more efficient over large files.

To achieve this, I built an accumulator with DynamoDB. Payloads were initially pushed to DynamoDB, keyed by panel id and timestamp. Workers periodically checked whether enough payloads had accumulated for a panel to archive. After archiving, the payloads were deleted from DynamoDB.
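
A sketch of the accumulator's flush step with boto3; the table schema, batch threshold, and abbreviated S3 key are assumptions for illustration:

    import gzip
    import io
    import json

    import boto3
    from boto3.dynamodb.conditions import Key

    dynamodb = boto3.resource("dynamodb")
    s3 = boto3.client("s3")
    table = dynamodb.Table("payload-accumulator")  # hypothetical table name
    BATCH_SIZE = 500  # illustrative threshold

    def flush_panel(panel_id):
        items = table.query(KeyConditionExpression=Key("panel_id").eq(panel_id))["Items"]
        if len(items) < BATCH_SIZE:
            return  # not enough payloads accumulated yet
        body = io.BytesIO()
        with gzip.GzipFile(fileobj=body, mode="wb") as gz:
            gz.write("\n".join(json.dumps(i, default=str) for i in items).encode())
        first_ts = min(i["timestamp"] for i in items)
        key = "faraday/payloads/%s/%s.json.gz" % (panel_id, first_ts)  # abbreviated; full layout below
        s3.put_object(Bucket="raw-production-payloads", Key=key, Body=body.getvalue())
        with table.batch_writer() as batch:
            for item in items:
                batch.delete_item(Key={"panel_id": item["panel_id"],
                                       "timestamp": item["timestamp"]})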

Data archived to S3 was binned by the ID of the panel being monitored as well as the timestamp at which it was received. The keys of the objects in S3 looked something like this:

faraday/payloads/[panel-id]/[year]/[month]/[day]/[hour]/[minute]/[timestamp].json.gz
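
A small helper to build keys in this layout might look like this (assuming the trailing [timestamp] is a Unix timestamp of receipt):

    from datetime import datetime

    def archive_key(panel_id, received_at):
        # received_at: Unix timestamp at which the payload hit the server.
        ts = datetime.utcfromtimestamp(received_at)
        return "faraday/payloads/%s/%d/%02d/%02d/%02d/%02d/%d.json.gz" % (
            panel_id, ts.year, ts.month, ts.day, ts.hour, ts.minute, received_at)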

This key format was chosen to make MapReduce jobs easy to run on subsets of the data. For example, to reprocess all the data for a panel, you would issue a command that looked something like:

$> recompute s3://raw-production-payloads/[panel-id]/*

The last benefit of the data lake was that it helped me sleep at night, since it was very hard to lose data and bugs in metrics computations could be retroactively fixed.

MapReduce

MapReduce was a key component in Faraday.

The data science team of Persistent Efficiency worked constantly to improve the deconvolution algorithms to retrieve more signal from their noisy sensors. Each time these algorithms were updated, every historical metric had to be recomputed. MapReduce on the AWS platform made this possible.

I designed a small framework for metrics computations that allowed the same code to be run as part of the real-time system as well as the MapReduce system. This meant that any update to the algorithms would be immediately ready for rebuilding via MapReduce without having to write new code.
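
A minimal sketch of that idea with mrjob: the same compute_metrics() function (a stand-in for the real deconvolution code, which in practice would live in a shared module) can be called by both the real-time worker and the batch job:

    import json

    from mrjob.job import MRJob

    def compute_metrics(payload):
        """Shared with the real-time worker: raw payload -> list of metric dicts."""
        return [{"panel_id": payload["panel_id"],
                 "minute": payload["timestamp"] // 60 * 60,
                 "watts": sum(payload["waveform"])}]

    class RecomputeMetrics(MRJob):
        def mapper(self, _, line):
            payload = json.loads(line)
            for metric in compute_metrics(payload):
                yield (metric["panel_id"], metric["minute"]), metric["watts"]

        def reducer(self, key, watts):
            # One output row per (panel, minute): the rebuilt minutely metric.
            yield key, sum(watts)

    if __name__ == "__main__":
        RecomputeMetrics.run()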

One tricky part of rebuilding metrics tables was loading the rebuilt metrics into Postgres. Sending a bunch of INSERT and UPDATE queries was too slow. Eventually, I worked out a variation of the approach described in this StackOverflow answer to bulk upsert large quantities of data. I also found it's important to remove indexes and CLUSTER before bulk upserts. All of this would eventually be combined into a series of no-brainer command-line scripts like so:

$> recompute --runner emr --algorithm panelytics [panel-id] [s3-url]
$> prep-import 
$> import-metrics --target prod [filename.csv]
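
The bulk-upsert step behind a script like import-metrics might look roughly like this: COPY the rebuilt metrics into a staging table, then merge them into the live table in two statements. This is a sketch of the staging-table idea, not the original code; table and column names are illustrative.

    import psycopg2

    def import_metrics(csv_path, db):
        with db, db.cursor() as cur, open(csv_path) as f:
            cur.execute("CREATE TEMP TABLE staging (LIKE metrics INCLUDING DEFAULTS)")
            cur.copy_expert("COPY staging (panel_id, ts, watts) FROM STDIN WITH CSV", f)
            # Update rows that already exist...
            cur.execute("""
                UPDATE metrics m SET watts = s.watts
                FROM staging s WHERE m.panel_id = s.panel_id AND m.ts = s.ts
            """)
            # ...then insert the ones that don't.
            cur.execute("""
                INSERT INTO metrics (panel_id, ts, watts)
                SELECT s.panel_id, s.ts, s.watts FROM staging s
                WHERE NOT EXISTS (
                    SELECT 1 FROM metrics m
                    WHERE m.panel_id = s.panel_id AND m.ts = s.ts)
            """)

    db = psycopg2.connect("dbname=faraday")
    import_metrics("rebuilt-metrics.csv", db)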

Finally, some words on working with Amazon's MapReduce offering, EMR. Overall, it was easy to get started with thanks to mrjob. Eventually, though, I had to spend some quality time with the AWS EMR best practices white paper. There were a number of dramatic performance improvements possible by tweaking memory settings, Python packages, and data serialization. Nothing too fancy, mostly basic tweaks after inspecting hot spots using the various profiling tools. It was important to dedicate time to performance right away, as it is easy to inadvertently write very slow jobs.

Capacity Testing with Synthetic Load

In the early stages of the project, it was very difficult to estimate the eventual load that the app would receive. To get ahead of possible scaling issues, I created a tool to generate synthetic load on the application.

The tool was very simple: generate realistic payloads and push them through the system. I implemented it as a Heroku app so that the load could be scaled up as high as desired. I ran tests simulating extreme loads and smoked out a number of scale issues far before they affected customer installations.
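
Something in this spirit would do the job; the endpoint URL, payload shape, and rates below are invented for the sketch:

    import json
    import random
    import time

    import requests

    INGEST_URL = "https://example-faraday.herokuapp.com/payloads"  # hypothetical endpoint

    def fake_payload(panel_id, sensor_id):
        return {
            "panel_id": panel_id,
            "sensor_id": sensor_id,
            "timestamp": int(time.time()),
            "waveform": [random.gauss(0, 1) for _ in range(2000)],  # a few tens of KB as JSON
        }

    def run(panels=50, sensors_per_panel=12, interval=3):
        while True:
            for panel in range(panels):
                for sensor in range(sensors_per_panel):
                    requests.post(INGEST_URL, json=fake_payload(panel, sensor))
            time.sleep(interval)

    if __name__ == "__main__":
        run()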

I thought my testing put me way ahead of capacity. Fortunately for the company, though, I was wrong, and we eventually grew past the upper limit of my tests. Funnily enough, almost exactly at the scale where my tests had stopped, we ran into a new (and quite severe) bottleneck. It doesn't work until you test it!

Why not NoSQL?

The topic of NoSQL stores for analytics data came up quite a few times over the course of this project. Each time, I decided against them, and it did not turn out to be a limitation. Indeed, relational storage in Postgres was extremely useful as we built out aggregations that required complex combinations of metadata, which would have been more difficult to accomplish with the more limited query functionality of NoSQL stores.

One of the key arguments for NoSQL stores is raw performance. However, I found that using Postgres properly was more than performant enough. SQL is surprisingly easy to make slow, but also surprisingly fast when used well.

This whole thread on Postgres scalability on Hacker News was a useful guide. I liked this scaling rule of thumb:

Approximately, if you have something like 10+ billion items, use Cassandra. If you have less than 10 billion items, Postgres will be fine, and is easier to manage IMO. If you do use postgres, you should vertically partition the table. This will help keep indexes smaller, improve the cache hit rate, vastly improve the ease with which you can drop older data, and make various other admin tasks easier.

We never ended up needing to do the table partitioning upgrade, but it would have allowed us to scale even further with Postgres when the time came.

Tools

We leaned on open source and SaaS for everything in this project.

Conclusion

Two of the driving considerations for the design of a system like Faraday are scaling and resilience. You want to be able to turn a dial to increase the capacity of the system (scale), and you want the system to fail as gracefully as possible when you encounter inevitable bugs and failures (resilience). We definitely achieved those goals over the course of the project.

For resilience, in the worst case, as long as data was making it into cold storage, anything in the database could be rebuilt at any time.

If upstream components such as metrics workers or Postgres were acting up, payloads would queue in SQS until the errors cleared. This meant that many classes of bugs resulted in only degraded performance rather than complete system downtime.

Each of the pools of workers in Faraday could be dialed up at any time to an arbitrary number of servers. Well, at least in theory. In practice, workers would sometimes bottleneck at a certain scale. Fortunately, none of these bottlenecks proved catastrophic, and I was able to continually improve the system over time (rather than being stuck redesigning it from scratch because of fundamental architectural mistakes).

Overall, this was quite a fun system to design and operate. There were a few restless nights while debugging tricky scaling issues, but only because the problems were so interesting. Indeed, the much harder problem to solve on this project was finding product-market fit; the company ran out of runway far before the backend failed to scale!