ElasticSearch 2.0 and Pipeline Aggregations


ElasticSearch 2.0.0 beta is out and, apart from many performance related updates, one major addition is pipeline aggregations. This has been one of the most anticipated features of the new version. As the name suggests, it allows us to set up a pipeline of aggregations which can perform computations on the buckets produced by an earlier aggregation.

ElasticSearch pipeline aggregations are broadly classified into two types –
• Parent – the pipeline aggregation computes its output (bucket/aggregation) and this output gets added to the buckets of the parent aggregation.
• Sibling – an existing aggregation becomes the input of the pipeline aggregation, and the new aggregation appears at the same level as the input aggregation instead of becoming part of its existing buckets.

We will look at some examples of each of these two types to get a better understanding of the concept.
Pipeline aggregations don’t support sub-aggregations, but they do support chaining, so in a chain of pipeline aggregations the final output contains the output of each aggregation in the chain. To reference the aggregation to be computed upon in a pipeline, the keyword used is “buckets_path”. The syntax is as follows –
"buckets_path": "Aggs>Metric"
As we see in the above syntax, buckets_path refers to an aggregation and a metric in that aggregation. Let’s see some examples.
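
For example, a sibling pipeline aggregation that averages a metric named “monthly_sum” computed inside a bucket aggregation named “sales_per_month” (both are defined in the examples below) would reference it like this –

"avg_monthly_sales": {
   "avg_bucket": {
      "buckets_path": "sales_per_month>monthly_sum"
   }
}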
Let us first create an index, based on the data provided in Elasticsearch – The Definitive Guide. For all the commands I have used the Sense extension for Chrome, as currently the 2.0.0 beta version doesn’t support installing the Marvel plugin from the command prompt.

POST /cars/transactions/_bulk
{ "index": {}}
{ "price" : 10000, "color" : "red", "make" : "honda", "sold" : "2014-10-28" }
{ "index": {}}
{ "price" : 20000, "color" : "red", "make" : "honda", "sold" : "2014-11-05" }
{ "index": {}}
{ "price" : 30000, "color" : "green", "make" : "ford", "sold" : "2014-05-18" }
{ "index": {}}
{ "price" : 15000, "color" : "blue", "make" : "toyota", "sold" : "2014-07-

For brevity I have only shared 4 documents here, but in all I have inserted 16 records; you can extend the above data with 12 more records in the same schema.
Link for sample data to get started – https://gist.github.com/tarunsapra/d2e5338bfb2cc032afe6
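
If you would rather type a few extra records by hand than use the gist, additional bulk lines in the same schema look like this (the values here are illustrative; the gist contains the exact dataset behind the outputs shown later):

POST /cars/transactions/_bulk
{ "index": {}}
{ "price" : 80000, "color" : "red", "make" : "bmw", "sold" : "2014-01-01" }
{ "index": {}}
{ "price" : 25000, "color" : "blue", "make" : "ford", "sold" : "2014-02-12" }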

Average Bucket Aggregation
This is a sibling aggregation as it calculates the average of a specified metric in a sibling aggregation. That sibling aggregation must be multi-bucket, i.e. it should group documents into multiple buckets on a certain field (e.g. grouping cars by the month they were sold). Each bucket then has its total sales for the month, and with the help of the avg bucket pipeline aggregation we can calculate the average of the monthly totals.

GET /cars/transactions/_search?search_type=count
{
   "aggs":{
      "sales_per_month":{
         "date_histogram":{
            "field":"sold",
            "interval":"month",
            "format":"yyyy-MM-dd"
         },
         "aggs":{
            "monthly_sum":{
               "sum":{
                  "field":"price"
               }
            }
         }
      },
      "avg_monthly_sales":{
         "avg_bucket":{
            "buckets_path":"sales_per_month>monthly_sum"
         }
      }
   }
}

Now we are calculating the average of the monthly sales totals, and the key piece of syntax is the expression
“buckets_path”: “sales_per_month>monthly_sum”
Here the aggregation “sales_per_month” and its metric “monthly_sum” are specified using the buckets_path syntax. The “sales_per_month” aggregation gives us the sum of prices of cars sold on a monthly basis, and the sibling aggregation “avg_monthly_sales” generates the average of the monthly totals.

"aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2014-01-01",
               "key": 1388534400000,
               "doc_count": 3,
               "monthly_sum": {
                  "value": 185000
               }
            },
           {
               "key_as_string": "2014-02-01",
               "key": 1391212800000,
               "doc_count": 1,
               "monthly_sum": {
                  "value": 25000
               }
            },            ………………  14 more records
         ]
      },
      "avg_monthly_sales": {
         "value": 41900
      }
   }

Thus we get “avg_monthly_sales” in parallel to the “sales_per_month” aggregation, which is why this is a sibling aggregation. In the aggregation query we can also change the interval from “month” to “quarter” and get the average of the quarterly totals.
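
A quick sketch of that variant (I have also renamed the aggregations to match the new interval):

GET /cars/transactions/_search?search_type=count
{
   "aggs":{
      "sales_per_quarter":{
         "date_histogram":{
            "field":"sold",
            "interval":"quarter",
            "format":"yyyy-MM-dd"
         },
         "aggs":{
            "quarterly_sum":{
               "sum":{
                  "field":"price"
               }
            }
         }
      },
      "avg_quarterly_sales":{
         "avg_bucket":{
            "buckets_path":"sales_per_quarter>quarterly_sum"
         }
      }
   }
}

and the pipeline aggregation in the response then becomes –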

"avg_quaterly_sales": {
"value": 104750
}

Maximum and Minimum bucket aggregations
Just like the average bucket aggregation, both the max and min bucket aggregations are sibling aggregations which produce their output in parallel to the input aggregation, in our case “sales_per_month”. The max and min pipeline aggregations were eagerly awaited by ES users, as it now becomes straightforward to find the bucket with the maximum or minimum value of a metric. In our previous example we can replace “avg_monthly_sales” by –

"max_monthly_sales": {
          "max_bucket": {
              "buckets_path": "sales_per_month>monthly_sum"
          }
      }

and then by

"min_monthly_sales": {
           "min_bucket": {
               "buckets_path": "sales_per_month>monthly_sum"
           }
       }

We get the following in the output

"min_monthly_sales": {
        "value": 10000,
        "keys": [
           "2014-10-01"
        ]
     }

and for maximum –

"max_monthly_sales": {
         "value": 185000,
         "keys": [
            "2014-01-01"
         ]
      }

Thus we get the max and min bucket keys along with their values (this is really cool!).
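
For reference, both pipeline aggregations can also sit side by side in a single request as siblings of “sales_per_month” – a sketch assembled from the snippets above:

GET /cars/transactions/_search?search_type=count
{
   "aggs": {
      "sales_per_month": {
         "date_histogram": {
            "field": "sold",
            "interval": "month",
            "format": "yyyy-MM-dd"
         },
         "aggs": {
            "monthly_sum": {
               "sum": { "field": "price" }
            }
         }
      },
      "max_monthly_sales": {
         "max_bucket": {
            "buckets_path": "sales_per_month>monthly_sum"
         }
      },
      "min_monthly_sales": {
         "min_bucket": {
            "buckets_path": "sales_per_month>monthly_sum"
         }
      }
   }
}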

Sum Bucket Aggregation
This aggregation is again a sibling aggregation and helps in calculating the sum of all the buckets’ metrics. For example, in our original aggregation request we can add the following query clause before the “aggs” section –

"query" : {
        "match" : {
            "make" : "bmw"
        }
   },
"aggs" …..
……

Now, to get the total sales for the maker BMW, just like the max and min bucket pipeline aggregations we can add the sibling aggregation “sum_bmw_sales” –

"sum_bmw_sales": {
      "sum_bucket": {
            "buckets_path": "sales_per_month>monthly_sum"
            }
        }
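
Putting the pieces together, the assembled request would look roughly like this (the query narrows the documents to BMW before the aggregations run):

GET /cars/transactions/_search?search_type=count
{
   "query": {
      "match": {
         "make": "bmw"
      }
   },
   "aggs": {
      "sales_per_month": {
         "date_histogram": {
            "field": "sold",
            "interval": "month",
            "format": "yyyy-MM-dd"
         },
         "aggs": {
            "monthly_sum": {
               "sum": { "field": "price" }
            }
         }
      },
      "sum_bmw_sales": {
         "sum_bucket": {
            "buckets_path": "sales_per_month>monthly_sum"
         }
      }
   }
}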

Thus we now have the monthly sales totals of the BMWs as well as the yearly total for the BMW make. In a similar manner, instead of the car make we can also run the search and sum over a date range or a color.

Derivative Aggregation
This aggregation is a parent aggregation, as the computed derivative of the specified metric becomes part of the buckets of the input aggregation.

GET /cars/transactions/_search?search_type=count
{
   "aggs": {
      "sales_per_month": {
         "date_histogram": {
            "field": "sold",
            "interval": "month",
            "format": "yyyy-MM-dd"
         },
         "aggs": {
            "monthly_sum": {
               "sum": {
                  "field": "price"
               }
            },
            "sales_deriv": {
               "derivative": {
                  "buckets_path": "monthly_sum"
               }
            }
         }
      }
   }
}

In the above query we are calculating the derivative of “monthly_sum”, and the output is –

"aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2014-01-01",
               "key": 1388534400000,
               "doc_count": 3,
               "monthly_sum": {
                  "value": 185000
               }
            },
            {
               "key_as_string": "2014-02-01",
               "key": 1391212800000,
               "doc_count": 1,
               "monthly_sum": {
                  "value": 25000
               },
               "sales_deriv": {
                  "value": -160000
               }
            },
            {
               "key_as_string": "2014-03-01",
               "key": 1393632000000,
               "doc_count": 1,
               "monthly_sum": {
                  "value": 30000
               },
               "sales_deriv": {
                  "value": 5000
               }
            }, ……..13 more records

For the first bucket there is no derivative, as a derivative needs at least 2 points.

Cumulative Sum Aggregation
This is another parent pipeline aggregation and calculates the cumulative sum of the specified metric of the input aggregation. In our case it gives us the cumulative sum of the total sales on a monthly basis.
We can replace the “sales_deriv” part in our previous query with this –

"cumulative_sales": {
               "cumulative_sum": {
                  "buckets_path": "monthly_sum"
               }
            }

and get the following output

"aggregations": {
      "sales_per_month": {
         "buckets": [
            {
               "key_as_string": "2014-01-01",
               "key": 1388534400000,
               "doc_count": 3,
               "monthly_sum": {
                  "value": 185000
               },
               "cumulative_sales": {
                  "value": 185000
               }
            },
            {
               "key_as_string": "2014-02-01",
               "key": 1391212800000,
               "doc_count": 1,
               "monthly_sum": {
                  "value": 25000
               },
               "cumulative_sales": {
                  "value": 210000
               }
            },
            {
               "key_as_string": "2014-03-01",
               "key": 1393632000000,
               "doc_count": 1,
               "monthly_sum": {
                  "value": 30000
               },
               "cumulative_sales": {
                  "value": 240000
               }
            }, ..13 more records..

With this aggregation we can easily visualize the cumulative sum over peak periods for various products to get more insights.

Bucket Script Aggregation
This is a parent pipeline aggregation and uses scripts to perform arithmetic computations on specified metrics of each bucket of a multi-bucket aggregation. A use-case can be to add/subtract metrics or to calculate the percentage of a sub-aggregation in the context of a bucket. For example, if you want to calculate the monthly percentage of total sales contributed by BMW cars, we first need a sub-aggregation in each bucket for the BMW make and then calculate the percentage of BMW sales relative to the total monthly sales.
This pipeline aggregation uses scripting; please read the Elasticsearch scripting documentation for more details. Here I am using inline scripting which, as advised by Elastic, is not secure for production environments. To enable inline scripting, add the following line to your elasticsearch.yml file in the config folder and restart the node.
script.inline: on

"aggs": {
      "sales_per_month": {
         "date_histogram": {
            "field": "sold",
            "interval": "month",
            "format": "yyyy-MM-dd"
         },
         "aggs": {
            "monthly_sum": {
               "sum": {
                  "field": "price"
               }
            },
            "bmw_car": {
               "filter": {
                  "term": {
                     "make": "bmw"
                  }
               },
                "aggs": {
                    "sales": {
                      "sum": {
                        "field": "price"
                      }
                    }
                  }
            },
            "bmw_percentage": {
                    "bucket_script": {
                        "buckets_path": {
                          "bmwSales": "bmw_car>sales",
                          "totalSales": "monthly_sum"
                        },
                        "script": "bmwSales / totalSales * 100"
                    }
                }
         }
      }
   }
}

Response is –

{
   "key_as_string": "2014-01-01",
   "key": 1388534400000,
   "doc_count": 3,
   "monthly_sum": {
      "value": 185000
   },
   "bmw_car": {
      "doc_count": 2,
      "sales": {
         "value": 160000
      }
   },
   "bmw_percentage": {
      "value": 86.48
   }
},

Thus for the month of January we can see that 3 cars were sold, 2 of them BMWs worth 160000 out of the 185000 total, which gives us the percentage of about 86.48.

Bucket Selector Aggregation
This parent pipeline aggregation is very useful in scenarios where you don’t want certain buckets in the output, based on a condition supplied by you: total sum greater than some value X, percentage greater than some value, count > X, etc.
This is again a script based aggregation, so we need to have scripting enabled. We just need to add the following snippet in the exact place where we added the “sales_deriv” aggregation, as this aggregation is also a parent aggregation.

"sales_bucket_filter": {
    "bucket_selector": {
        "buckets_path": {
            "totalSales": "monthly_sum"
                  },
            "script": "totalSales >= 30000"
           }
        }
     }

and now in the output we would only see the months where the monthly sales total is at least 30000.
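For completeness, the whole request with the selector in place would look roughly like this (a sketch, with inline scripting enabled as described above):

GET /cars/transactions/_search?search_type=count
{
   "aggs": {
      "sales_per_month": {
         "date_histogram": {
            "field": "sold",
            "interval": "month",
            "format": "yyyy-MM-dd"
         },
         "aggs": {
            "monthly_sum": {
               "sum": { "field": "price" }
            },
            "sales_bucket_filter": {
               "bucket_selector": {
                  "buckets_path": {
                     "totalSales": "monthly_sum"
                  },
                  "script": "totalSales >= 30000"
               }
            }
         }
      }
   }
}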
Here’s the gist for posting the 16 initial records – https://gist.github.com/tarunsapra/d2e5338bfb2cc032afe6

Conclusion
There are a lot of real-world use-cases for pipeline aggregations, and they will surely help in getting more insights from the data stored in ES. I haven’t covered the moving average aggregation, as that is too vast for this blog post and will be covered in a separate one. Feel free to share your feedback/comments.