Pre-Aggregated Reports

Overview

This document outlines the basic patterns and principles for using MongoDB as an engine for collecting and processing events in real time, for use in generating up-to-the-minute or up-to-the-second reports.

Problem

Servers and other systems can generate a large number of documents, and it can be difficult to access and analyze such large collections of data originating from multiple servers.

This document makes the following assumptions about real-time analytics:

  • There is no need to retain transactional event data in MongoDB, and how your application handles transactions is outside of the scope of this document.
  • You require up-to-the-minute data, or up-to-the-second if possible.
  • The queries for ranges of data (by time) must be as fast as possible.

See also

“Storing Log Data.”

Solution

The solution described below assumes a simple scenario using data from web server access logs. With this data, you will want to return the number of hits to a collection of web sites at various levels of granularity based on time (i.e. by minute, hour, day, week, and month) as well as by the path of a resource.

To achieve the performance these tasks require, use upserts and increment operations, which allow you to calculate statistics, produce simple range-based queries, and generate filters to support time-series charts of aggregated data.

Schema

Schemas for real-time analytics systems must support simple and fast query and update operations. In particular, attempt to avoid the following situations which can degrade performance:

  • documents growing significantly after creation.

    Document growth forces MongoDB to move the document on disk, which can be time and resource consuming relative to other operations;

  • queries requiring MongoDB to scan documents in the collection without using indexes; and

  • deeply nested documents that make accessing particular fields slow.

Intuitively, you may consider keeping “hit counts” in individual documents with one document for every unit of time (i.e. minute, hour, day, etc.). However, queries must return multiple documents for all non-trivial time-range queries, which can slow overall query performance.

Preferably, to maximize query performance, use more complex documents, and keep several aggregate values in each document. The remainder of this section outlines several schema designs that you may consider for this real-time analytics system. While there is no single pattern for every problem, each pattern is better suited to specific classes of problems.

One Document Per Page Per Day

Consider the following example schema for a solution that stores all statistics for a single day and page in a single document:

{
    _id: "20101010/site-1/apache_pb.gif",
    metadata: {
        date: ISODate("2010-10-10T00:00:00Z"),
        site: "site-1",
        page: "/apache_pb.gif" },
    daily: 5468426,
    hourly: {
        "0": 227850,
        "1": 210231,
        ...
        "23": 20457 },
    minute: {
        "0": 3612,
        "1": 3241,
        ...
        "1439": 2819 }
}

This approach has a couple of advantages:

  • For every request on the website, you only need to update one document.
  • Reports for time periods within the day, for a single page, require fetching only a single document.

There are, however, significant issues with this approach. The most significant issue is that, as you upsert data into the hourly and minute fields, the document grows. Although MongoDB will pad the space allocated to documents, it will need to reallocate these documents multiple times throughout the day, which impacts performance.

Pre-allocate Documents

Simple Pre-Allocation

To mitigate the impact of repeated document migrations throughout the day, you can tweak the “one document per page per day” approach by adding a process that, throughout the previous day, “pre-allocates” documents with fields that hold 0 values. Thus, at midnight, the new documents will already exist.

Note

To avoid situations where your application must pre-allocate large numbers of documents at midnight, it’s best to create documents throughout the previous day by upserting randomly when you update a value in the current day’s data.

This requires some tuning, to balance two requirements:

  1. your application should have pre-allocated all or nearly all documents by the end of the day.
  2. your application should infrequently pre-allocate a document that already exists to save time and resources on extraneous upserts.

As a starting point, consider the average number of hits a day (h), and then upsert a blank document upon update with a probability of 1/h.
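As a rough check on this starting point, the following sketch computes the chance that at least one of a day's updates triggers pre-allocation. It treats every hit as an independent trial; the function name, the traffic figure, and the fivefold multiplier are illustrative assumptions rather than part of the design above.

```python
def prob_preallocated(hits_per_day, prob_per_hit):
    """Chance that at least one hit during the day triggers pre-allocation,
    assuming each hit is an independent trial."""
    return 1.0 - (1.0 - prob_per_hit) ** hits_per_day

h = 500000  # assumed average number of hits per day for one page

# Starting point from the text: upsert with probability 1/h on each hit
p_start = prob_preallocated(h, 1.0 / h)

# Assumed tuning step: raise the per-hit probability fivefold
p_tuned = prob_preallocated(h, 5.0 / h)

print(round(p_start, 3), round(p_tuned, 3))  # 0.632 0.993
```

Under these assumptions, a probability of 1/h leaves roughly a third of documents without pre-allocation by midnight, which is why the value needs tuning. Raising it covers more documents at the cost of more redundant upserts.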

Pre-allocating increases performance by initializing all documents with 0 values in all fields. After creation, documents will never grow. This means that:

  1. there will be no need to migrate documents within the data store, which is a problem in the “one document per page per day” approach.
  2. MongoDB will not add padding to the records, which leads to a more compact data representation and better use of your memory.

Add Intra-Document Hierarchy

Note

MongoDB stores BSON documents as a sequence of fields and values, not as a hash table. As a result, writing to the field minute.0 is considerably faster than writing to minute.1439.

BSON memory layout (unoptimized)

In order to update the value in minute #1439, MongoDB must skip over all 1439 entries before it.

To optimize update and insert operations you can introduce intra-document hierarchy. In particular, you can split the minute field up into 24 hourly fields:

{
    _id: "20101010/site-1/apache_pb.gif",
    metadata: {
        date: ISODate("2010-10-10T00:00:00Z"),
        site: "site-1",
        page: "/apache_pb.gif" },
    daily: 5468426,
    hourly: {
        "0": 227850,
        "1": 210231,
        ...
        "23": 20457 },
    minute: {
        "0": {
            "0": 3612,
            "1": 3241,
            ...
            "59": 2130 },
        "1": {
            "0": ... ,
        },
        ...
        "23": {
            ...
            "59": 2819 }
    }
}

This allows MongoDB to “skip forward” throughout the day when updating the minute data, which makes the update performance more uniform and faster later in the day.

BSON memory layout (optimized)

To update the value in minute #1439, MongoDB first skips the first 23 hours and then skips 59 minutes, for only 82 skips as opposed to 1439 skips in the previous schema.
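The difference can be sketched with a simplified model that counts only the fields MongoDB passes over inside the minute structure. The function names are hypothetical, and the model ignores every other field in the document.

```python
def flat_skips(minute_of_day):
    """Fields passed over when all 1440 minutes sit in one flat sub-document."""
    return minute_of_day

def nested_skips(minute_of_day):
    """Fields passed over when minutes are grouped under 24 hourly sub-documents:
    first the earlier hours, then the earlier minutes within the target hour."""
    hour, minute = divmod(minute_of_day, 60)
    return hour + minute

last_minute = 23 * 60 + 59  # index 1439, the last minute of the day
print(flat_skips(last_minute), nested_skips(last_minute))  # 1439 82

# Worst case over the whole day for the nested layout
worst_nested = max(nested_skips(m) for m in range(24 * 60))
print(worst_nested)  # 82
```

In this model the nested layout never needs more than 82 skips, while the flat layout's cost grows linearly through the day.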

Separate Documents by Granularity Level

Pre-allocating documents is a reasonable design for storing intra-day data, but the model breaks down when displaying data over longer multi-day periods like months or quarters. In these cases, consider storing daily statistics in a single document as above, and then aggregating monthly data into a separate document.

This introduces a second set of upsert operations to the data collection and aggregation portion of your application, but the reduction in disk seeks on the queries should be worth the cost. Consider the following example schema:

  1. Daily Statistics

    {
        _id: "20101010/site-1/apache_pb.gif",
        metadata: {
            date: ISODate("2010-10-10T00:00:00Z"),
            site: "site-1",
            page: "/apache_pb.gif" },
        hourly: {
            "0": 227850,
            "1": 210231,
            ...
            "23": 20457 },
        minute: {
            "0": {
                "0": 3612,
                "1": 3241,
                ...
                "59": 2130 },
            "1": {
                "0": ...,
            },
            ...
            "23": {
                "59": 2819 }
        }
    }
    
  2. Monthly Statistics

    {
        _id: "201010/site-1/apache_pb.gif",
        metadata: {
            date: ISODate("2010-10-01T00:00:00Z"),
            site: "site-1",
            page: "/apache_pb.gif" },
        daily: {
            "1": 5445326,
            "2": 5214121,
            ... }
    }
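
One way to see how the two documents relate: the counter stored under a day's key in the monthly document should equal the sum of that day's hourly counters. The sketch below checks this on small, made-up documents. The helper names and all counts are illustrative assumptions.

```python
from datetime import datetime

def daily_total(daily_doc):
    """Total hits for the day, derived from the hourly counters."""
    return sum(daily_doc['hourly'].values())

def monthly_key(daily_doc):
    """Key under 'daily' in the monthly document for this day (not zero-padded)."""
    return str(daily_doc['metadata']['date'].day)

# Made-up sample documents; real documents hold all 24 hourly counters
daily_doc = {
    '_id': '20101010/site-1/apache_pb.gif',
    'metadata': {'date': datetime(2010, 10, 10),
                 'site': 'site-1', 'page': '/apache_pb.gif'},
    'hourly': {'0': 3, '1': 5, '23': 2},
}
monthly_doc = {
    '_id': '201010/site-1/apache_pb.gif',
    'daily': {'10': 10},
}

consistent = monthly_doc['daily'][monthly_key(daily_doc)] == daily_total(daily_doc)
print(consistent)  # True
```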
    

Operations

This section outlines a number of common operations for building and interacting with a real-time analytics reporting system. The major challenge is in balancing query performance and write (i.e. upsert) performance. All examples in this document use the Python programming language and the PyMongo driver for MongoDB, but you can implement this system using any language you choose.

Log an Event

Logging an event such as a page request (i.e. “hit”) is the main “write” activity for your system. To maximize performance, you’ll be doing in-place updates with the upsert operation. Consider the following example:

from datetime import datetime, time

def log_hit(db, dt_utc, site, page):

    # Update daily stats doc
    id_daily = dt_utc.strftime('%Y%m%d/') + site + page
    hour = dt_utc.hour
    minute = dt_utc.minute

    # Get a datetime that only includes date info
    d = datetime.combine(dt_utc.date(), time.min)
    query = {
        '_id': id_daily,
        'metadata': { 'date': d, 'site': site, 'page': page } }
    update = { '$inc': {
            'hourly.%d' % (hour,): 1,
            'minute.%d.%d' % (hour,minute): 1 } }
    db.stats.daily.update_one(query, update, upsert=True)

    # Update monthly stats document
    id_monthly = dt_utc.strftime('%Y%m/') + site + page
    day_of_month = dt_utc.day
    query = {
        '_id': id_monthly,
        'metadata': {
            'date': d.replace(day=1),
            'site': site,
            'page': page } }
    update = { '$inc': {
            'daily.%d' % day_of_month: 1} }
    db.stats.monthly.update_one(query, update, upsert=True)

The upsert operation (i.e. upsert=True) performs an update if the document exists, and an insert if the document does not exist.

Note

This application requires upserts, because the pre-allocation method only pre-allocates new documents with a high probability, not with complete certainty.

Without preallocation, you end up with a dynamically growing document, slowing upserts as MongoDB moves documents to accommodate growth.
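
To see what a single hit sends to the server, the following sketch builds the same query and update documents as log_hit() without contacting MongoDB. The helper name build_hit_updates and the sample timestamp are hypothetical.

```python
from datetime import datetime, time

def build_hit_updates(dt_utc, site, page):
    """Return (query, update) pairs for the daily and monthly documents."""
    # Midnight of the hit's day, used as the daily document's date
    day_start = datetime.combine(dt_utc.date(), time.min)
    daily = (
        {'_id': dt_utc.strftime('%Y%m%d/') + site + page,
         'metadata': {'date': day_start, 'site': site, 'page': page}},
        {'$inc': {'hourly.%d' % dt_utc.hour: 1,
                  'minute.%d.%d' % (dt_utc.hour, dt_utc.minute): 1}})
    monthly = (
        {'_id': dt_utc.strftime('%Y%m/') + site + page,
         'metadata': {'date': day_start.replace(day=1),
                      'site': site, 'page': page}},
        {'$inc': {'daily.%d' % dt_utc.day: 1}})
    return daily, monthly

daily, monthly = build_hit_updates(
    datetime(2010, 10, 10, 13, 37, 5), 'site-1', '/index.html')
print(daily[0]['_id'])    # 20101010/site-1/index.html
print(daily[1]['$inc'])   # counters for hour 13 and minute 13:37
print(monthly[1]['$inc']) # counter for day 10 of the month
```

Each hit touches exactly two counters in the daily document and one in the monthly document, addressed with dot notation so that the rest of each document is left untouched.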

Pre-allocate

To prevent document growth, you can preallocate new documents before the system needs them. As you create new documents, set all values to 0 so that documents will not grow to accommodate updates. Consider the following preallocate() function:

def preallocate(db, dt_utc, site, page):

    # Get id values
    id_daily = dt_utc.strftime('%Y%m%d/') + site + page
    id_monthly = dt_utc.strftime('%Y%m/') + site + page

    # Get daily metadata
    daily_metadata = {
        'date': datetime.combine(dt_utc.date(), time.min),
        'site': site,
        'page': page }
    # Get monthly metadata
    monthly_metadata = {
        'date': daily_metadata['date'].replace(day=1),
        'site': site,
        'page': page }

    # Initial zeros for statistics
    hourly = dict((str(i), 0) for i in range(24))
    minute = dict(
        (str(i), dict((str(j), 0) for j in range(60)))
        for i in range(24))
    daily = dict((str(i), 0) for i in range(1, 32))

    # Perform upserts, setting metadata
    db.stats.daily.update_one(
        {
            '_id': id_daily,
            'hourly': hourly,
            'minute': minute},
        { '$set': { 'metadata': daily_metadata }},
        upsert=True)
    db.stats.monthly.update_one(
        {
            '_id': id_monthly,
            'daily': daily },
        { '$set': { 'metadata': monthly_metadata }},
        upsert=True)

The function pre-allocates both the monthly and daily documents at the same time. The performance benefits from separating these operations are negligible, so it’s reasonable to keep both operations in the same function.

Ideally, your application should pre-allocate documents before needing to write data to maintain consistent update performance. Additionally, it’s important to avoid causing a spike in activity and latency by creating documents all at once.

In the following example, document updates (i.e. “log_hit()”) will also pre-allocate a document probabilistically. However, by “tuning probability,” you can limit redundant preallocate() calls.

import random
from datetime import datetime, timedelta, time

# Example probability based on 500k hits per day per page
prob_preallocate = 1.0 / 500000

def log_hit(db, dt_utc, site, page):
    if random.random() < prob_preallocate:
        preallocate(db, dt_utc + timedelta(days=1), site, page)
    # Update daily stats doc
    ...

Using this method, there will be a high probability that each document will already exist before your application needs to issue update operations. You’ll also be able to prevent a regular spike in activity for pre-allocation, and be able to eliminate document growth.

Retrieving Data for a Real-Time Chart

This example describes fetching the data from the above MongoDB system, for use in generating a chart that displays the number of hits to a particular resource over the last hour.

Querying

Use the following query in a find_one operation at the Python/PyMongo console to retrieve the number of hits to a specific resource (i.e. /index.html) with minute-level granularity:

>>> db.stats.daily.find_one(
...     {'metadata': {'date':dt, 'site':'site-1', 'page':'/index.html'}},
...     { 'minute': 1 })

Use the following query to retrieve the number of hits to a resource over the last day, with hour-level granularity:

>>> db.stats.daily.find_one(
...     {'metadata': {'date':dt, 'site':'site-1', 'page':'/foo.gif'}},
...     { 'hourly': 1 })
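
A chart of the last hour needs the nested minute field flattened into an ordered series. The following sketch shows one way to do that on the document returned by the minute-level query. The helper names and the sample document are hypothetical, and the sketch stays within a single day; a window that crosses midnight would also need the previous day's document.

```python
from datetime import datetime

def minute_series(doc):
    """Flatten the nested 'minute' field into a list ordered by minute of day.
    Field names are strings, and missing counters are treated as 0."""
    minute = doc.get('minute', {})
    return [minute.get(str(h), {}).get(str(m), 0)
            for h in range(24) for m in range(60)]

def last_hour(doc, now_utc):
    """Counts for up to 60 minutes ending at now_utc, within the same day."""
    end = now_utc.hour * 60 + now_utc.minute + 1
    return minute_series(doc)[max(0, end - 60):end]

# Made-up result of the minute-level query, with only two counters set
doc = {'minute': {'13': {'36': 4, '37': 7}}}
series = last_hour(doc, datetime(2010, 10, 10, 13, 37))
print(len(series), series[-2:])  # 60 [4, 7]
```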

If you want a few days of hourly data, you can use a query in the following form:

>>> db.stats.daily.find(
...     {
...         'metadata.date': { '$gte': dt1, '$lte': dt2 },
...         'metadata.site': 'site-1',
...         'metadata.page': '/index.html'},
...     { 'metadata.date': 1, 'hourly': 1 },
...     sort=[('metadata.date', 1)])
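
The documents returned by the multi-day query can then be stitched into one hourly time series. The sketch below assumes the documents arrive sorted by metadata.date, as in the query above; the function name and sample data are illustrative.

```python
from datetime import datetime, timedelta

def hourly_series(docs):
    """Return (timestamp, hits) pairs for every hour of every daily document."""
    points = []
    for doc in docs:
        day = doc['metadata']['date']
        for h in range(24):
            # Counters missing from a document are treated as 0
            points.append((day + timedelta(hours=h),
                           doc['hourly'].get(str(h), 0)))
    return points

# Made-up query results for two consecutive days
docs = [
    {'metadata': {'date': datetime(2010, 10, 10)}, 'hourly': {'0': 5, '23': 2}},
    {'metadata': {'date': datetime(2010, 10, 11)}, 'hourly': {'0': 9}},
]
series = hourly_series(docs)
print(len(series))  # 48
print(series[24])   # first hour of the second day, with 9 hits
```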

Indexing

To support these query operations, create a compound index on the following daily statistics fields: metadata.site, metadata.page, and metadata.date (in that order). Use the following operation at the Python/PyMongo console:

>>> db.stats.daily.create_index([
...     ('metadata.site', 1),
...     ('metadata.page', 1),
...     ('metadata.date', 1)])

This index makes it possible to efficiently run the query for multiple days of hourly data. At the same time, any compound index on page and date will allow you to query efficiently for a single day’s statistics.

Get Data for a Historical Chart

Querying

To retrieve daily data for a single month, use the following query:

>>> db.stats.monthly.find_one(
...     {'metadata':
...         {'date':dt,
...         'site': 'site-1',
...         'page':'/index.html'}},
...     { 'daily': 1 })

To retrieve several months of daily data, use a variation on the above query:

>>> db.stats.monthly.find(
...     {
...         'metadata.date': { '$gte': dt1, '$lte': dt2 },
...         'metadata.site': 'site-1',
...         'metadata.page': '/index.html'},
...     { 'metadata.date': 1, 'daily': 1 },
...     sort=[('metadata.date', 1)])
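
Because preallocate() creates counters for days 1 through 31 in every monthly document, a historical chart should skip the days that do not exist in shorter months. The following sketch does this with the standard calendar module. The function name and sample data are illustrative assumptions.

```python
import calendar
from datetime import datetime

def daily_series(monthly_docs):
    """Return (date, hits) pairs for each real calendar day of each month."""
    points = []
    for doc in monthly_docs:
        first = doc['metadata']['date']  # first day of the month
        days_in_month = calendar.monthrange(first.year, first.month)[1]
        for day in range(1, days_in_month + 1):
            points.append((first.replace(day=day),
                           doc['daily'].get(str(day), 0)))
    return points

# Made-up monthly document for February 2010, pre-allocated with 31 counters
feb = {'metadata': {'date': datetime(2010, 2, 1)},
       'daily': dict((str(i), 0) for i in range(1, 32))}
feb['daily']['28'] = 12

series = daily_series([feb])
print(len(series))  # 28
print(series[-1])   # 28 February, with 12 hits
```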

Indexing

Create the following index to support these queries for monthly data on the metadata.site, metadata.page, and metadata.date fields:

>>> db.stats.monthly.create_index([
...     ('metadata.site', 1),
...     ('metadata.page', 1),
...     ('metadata.date', 1)])

This field order will efficiently support range queries for a single page over several months.

Sharding

The only potential limits on the performance of this system are the number of shards in your system, and the shard key that you use.

An ideal shard key will distribute upserts between the shards while routing all queries to a single shard, or a small number of shards.

While your choice of shard key may depend on the precise workload of your deployment, consider using { metadata.site: 1, metadata.page: 1 } as a shard key. The combination of site and page (or event) will lead to a well balanced cluster for most deployments.

Enable sharding for the daily statistics collection with the following shardCollection command in the Python/PyMongo console:

>>> db.command('shardCollection', 'stats.daily',
...     key={ 'metadata.site': 1, 'metadata.page': 1 })

Upon success, you will see the following response:

{ "collectionsharded" : "stats.daily", "ok" : 1 }

Enable sharding for the monthly statistics collection with the following shardCollection command in the Python/PyMongo console:

>>> db.command('shardCollection', 'stats.monthly',
...     key={ 'metadata.site': 1, 'metadata.page': 1 })

Upon success, you will see the following response:

{ "collectionsharded" : "stats.monthly", "ok" : 1 }

One downside of the { metadata.site: 1, metadata.page: 1 } shard key is that if one page dominates all your traffic, all updates to that page will go to a single shard. This is basically unavoidable, since all updates for a single page go to a single document.

You may wish to include the date in addition to the site and page fields so that MongoDB can split histories, letting you serve different historical ranges from different shards. Use the following shardCollection command to shard the daily statistics collection in the Python/PyMongo console:

>>> db.command('shardCollection', 'stats.daily',
...     key={'metadata.site':1,'metadata.page':1,'metadata.date':1})
{ "collectionsharded" : "stats.daily", "ok" : 1 }

Enable sharding for the monthly statistics collection with the following shardCollection command in the Python/PyMongo console:

>>> db.command('shardCollection', 'stats.monthly',
...     key={'metadata.site':1,'metadata.page':1,'metadata.date':1})
{ "collectionsharded" : "stats.monthly", "ok" : 1 }

Note

Determine your actual requirements and load before deciding to shard. In many situations a single MongoDB instance may be able to keep track of all events and pages.