OPTIONS

Java Driver and Aggregation Framework

Release 2.2.0 of MongoDB introduces the aggregation framework. Designed to be both performant and easy to use, the aggregation framework calculates aggregate values, (such as counts, totals and averages), without the need for complex map-reduce operations. The aggregation framework is both multithreaded and written in C++, thus it executes natively across nodes.

Note

The following code snippets come from the AggregationExample.java example code that can be found with the driver source.

Aggregation tasks are built around the concept of the aggregation pipeline. Just as UNIX-like shells use the pipe operator | to connect a series of command-line operations together, the aggregation framework passes documents through a pipeline of operations which transform these objects as they go. Version 2.9.0 of the Java driver provides a new helper method, DBCollection.aggregate() which can be used to create aggregation tasks.

Let’s use a simple example to demonstrate how the aggregation helper works. Suppose I am using MongoDB to store my employee’s travel expenses. I’ve created a collection named expenses, which store individual expenses by employee and by department. Here’s a sample document:

{  "_id" : ObjectId("503d5024ff9038cdbfcc9da4"),
   "employee" : 61,
   "department" : "Sales",
   "amount" : 77,
   "type" : "airfare"
}

First add some sample data:

DBCollection coll = db.getCollection("aggregationExample");
coll.insert(new BasicDBObjectBuilder()
       .add("employee", 1)
       .add("department", "Sales")
       .add("amount", 71)
       .add("type", "airfare")
       .get());
coll.insert(new BasicDBObjectBuilder()
       .add("employee", 2)
       .add("department", "Engineering")
       .add("amount", 15)
       .add("type", "airfare")
       .get());
coll.insert(new BasicDBObjectBuilder()
       .add("employee", 4)
       .add("department", "Human Resources")
       .add("amount", 5)
       .add("type", "airfare")
       .get());
coll.insert(new BasicDBObjectBuilder()
       .add("employee", 42)
       .add("department", "Sales")
       .add("amount", 77)
       .add("type", "airfare")
       .get());

I am auditing three departments: Sales, Engineering and Human Resources. I want to calculate each department’s average spend on airfare. I’d like to use the Aggregation Framework for the audit, so I think of the operation in terms of a pipeline:

  1. Operation: Match documents where type = "airfare"; then pipe into
  2. Operation: Pass only the department and the amount fields through the pipeline; then pipe into
  3. Operation: Average the expense amount, grouped by department.
  4. Operation: Sort the deparments by amount.

I will use the aggregation operators $match, $project and $group to perform each operation. Individual aggregation operations can be expressed as JSON objects, so I can think of my pipeline in JSON as:

  1. First operation:

    $match: { type: "airfare"}
    
  2. Piped into:

    $project: { department: 1, amount: 1 }
    
  3. Piped into:

    $group: { _id: "$department",
              average: { $avg: "$amount" } }
    
  4. Piped into:

    $sort: { "$amount": -1 }
    

I use the Java Driver’s aggregation helper to build out this pipeline in my application. Let’s take a look at the aggregate() method signature.

public AggregationOutput aggregate(final List<DBObject> pipeline)

The aggregate() method takes a list of DBObjects representing the aggregation operations, which will be chained together by the helper method to form the aggregation pipeline. Callers of the aggregate() method must pass at least one aggregation operation. Here’s the Java code we’ll use to perform the aggregation task:

// create our pipeline operations, first with the $match
DBObject match = new BasicDBObject("$match", new BasicDBObject("type", "airfare"));

// build the $projection operation
DBObject fields = new BasicDBObject("department", 1);
fields.put("amount", 1);
fields.put("_id", 0);
DBObject project = new BasicDBObject("$project", fields );

// Now the $group operation
DBObject groupFields = new BasicDBObject( "_id", "$department");
groupFields.put("average", new BasicDBObject( "$avg", "$amount"));
DBObject group = new BasicDBObject("$group", groupFields);

// Finally the $sort operation
DBObject sort = new BasicDBObject("$sort", new BasicDBObject("amount", -1));

// run aggregation
List<DBObject> pipeline = Arrays.asList(match, project, group, sort);
AggregationOutput output = coll.aggregate(pipeline);

Let’s take a look at the results of my audit:

for (DBObject result : output.results()) {
    System.out.println(result);
}
{ "_id" : "Sales" , "average" : 74.0}
{ "_id" : "Engineering" , "average" : 15.0}
{ "_id" : "Human Resources" , "average" : 5.0}

Aggregation cursors

MongoDB 2.6 adds the ability to return a cursor from the aggregation framework. To do that simply use AggregationOptions with the aggregation command:

AggregationOptions aggregationOptions = AggregationOptions.builder()
        .batchSize(100)
        .outputMode(AggregationOptions.OutputMode.CURSOR)
        .allowDiskUse(true)
        .build();

Cursor cursor = coll.aggregate(pipeline, aggregationOptions);

You can iterate the results of the aggregation as a normal cursor.

while (cursor.hasNext()) {
    System.out.println(cursor.next());
}

New in version 2.12.0: Aggregation Cursors are available in the 2.12.0 Java driver and work with MongoDB 2.6.0 and above.

To learn more about aggregation see the aggregation tutorial and the aggregation reference documentation.