blog image

Anomaly Detection Using Data Stored In Apache Druid

Apache Druid is an open source database which is well suited for real time, analytical workloads.  Druid is the database which underlies Timeflow, our real time event processing platform, which we make fully available to our customers to use for their steam processing or subsequent offline analysis.

In a previous article we discussed how to create simple visualizations using Plotly and Dash based on real time data stored within Druid.  We also added interactive features such as sliders and selectors to allow for interactive ad-hoc analysis of the data within the web application.  We then moved the project into Dash so we had a standalone application.  

To continue our example, we now wish to move to more sophisticated analysis examples, in this case anomaly detection, to explain how this logic can be injected into the process.  

Imagine that using the interactive application outlined above, our data scientists or business users have observed the obvious seasonal behavior as well as a number of potential anomalies in the data that occur throughout the business day. Our aim is to further investigate these seasonal patterns and the anomaly data in a more rigorous way.

The data scientist would like to fit a model to the time series which allows to capture the seasonal behavior and also to identify the anomalies automatically.  To do this, he decides to make use of Facebooks Prophet library.

Extract Druid via JSON HTTP API 

Similar to the last article, the first step is to extract our data of interest from Druid via our preferred HTTP API:

import json
import requests

# define the Druid URL
url = 'http://druid_ip:druid_port/druid/v2/?pretty'

# define the Druid query
query = {'queryType': 'scan', 
              'dataSource': 'table_name', 
              'intervals': ['2020-06-03T07:00:00.000Z/2020-06-03T12:00:00.000Z'],
              'granularity': 'all'}

# run the Druid query
results = json.dumps(requests.post(url, headers={'Content-Type': 'application/json'}, json=query).json()[1]['events'])

Convert Data Into Pandas and Perform Exploratory Analysis

Again, we will convert our dataset to a Pandas dataframe and carry out some exploratory analysis of the dataset to get a feel for the data returned.

# organize the results of the Druid query in a pandas data frame
import pandas as pd

df = pd.read_json(results, orient='records')
df = df[['__time', 'Value']]
df.rename(columns={'__time': 'time', 'Value': 'value'}, inplace=True)
df.sort_values(by='time', inplace=True)

df.head()
df.tail()
df.describe()
(df['value'] > 150).sum()

Anomaly Detection

In order to identify the anomalies, we fit a model to our time series and extract the corresponding 99.99% prediction interval. We then classify as anomalies all the data points outside the 99.99% prediction interval. Given that our previous analysis has shown that our time series displays intraday seasonal behavior, the Prophet model is an appropriate choice in our case.

The following code fits the Prophet model to our time series.

from fbprophet import Prophet
X = pd.DataFrame({'ds': df['time'], 'y': df['value']})
m = Prophet(interval_width=0.9999).fit(X)

The following code extracts the data frame with model predictions. The interpretation of the different data frame columns is the following: ‘ds’ is the time index, ‘yhat_lower’ is the lower bound of the 99.99% prediction interval, ‘yhat_upper’ is the upper bound of the 99.99% prediction interval, and ‘yhat’ is the model prediction or fitted value. For convenience we also include the actual values in the same data frame, which we call ‘ytrue’.

predictions = m.predict(X)
predictions = predictions[['ds', 'yhat_lower', 'yhat_upper', 'yhat']]
predictions['ytrue'] = df['value'].values
predictions.head()

The following code extracts the anomalies, which as outlined above are the data points falling outside the 99.99% prediction interval, i.e. either below the lower bound of the interval, or above the upper bound of the interval.

predictions['anomaly'] = np.where((predictions['ytrue'] < predictions['yhat_lower']) | (predictions['ytrue'] > predictions['yhat_upper']), True, False)
predictions.head()

Visualizing Anomalies with Plotly

We now wish to expose our anomalies to the business users so they can analyse the identified orders to understand what is special about them. The sample code below demonstrates how to do this:

import plotly.graph_objects as go

# create the layout
layout = {'plot_bgcolor': 'white', 
          'paper_bgcolor': 'white',
          'margin': {'t':10, 'b':10, 'l':10, 'r':10, 'pad':0},
          'yaxis': {'showgrid': True, 
                    'zeroline': False, 
                    'mirror': True, 
                    'color': '#737373', 
                    'linecolor': '#d9d9d9',
                    'gridcolor': '#d9d9d9',
                    'tickformat': '$,.0f'},
          'xaxis': {'range':[predictions['ds'].min(), predictions['ds'].max()],
                    'autorange': False,
                    'showgrid': True, 
                    'zeroline': False, 
                    'mirror': True, 
                    'color': '#737373', 
                    'linecolor': '#d9d9d9', 
                    'gridcolor': '#d9d9d9',
                    'type': 'date', 
                    'tickformat': '%d %b %y %H:%M', 
                    'tickangle': 0,
                    'nticks': 5}}

# create the traces
data = []

data.append(go.Scatter(x=predictions.query('anomaly == False')['ds'],
                       y=predictions.query('anomaly == False')['ytrue'],
                       mode='markers',
                       marker=dict(color='#000000', size=5),
                       name='Orders',
                       hovertemplate='<b>Orders</b><br>'
                       '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                       '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

data.append(go.Scatter(x=predictions.query('anomaly == True')['ds'],
                       y=predictions.query('anomaly == True')['ytrue'],
                       mode='markers',
                       marker=dict(color='#e83e8c', size=5), 
                       name='Anomalies',
                       hovertemplate='<b>Anomalies</b><br>'
                       '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                       '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

data.append(go.Scatter(x=predictions['ds'],
                       y=predictions['yhat'],
                       mode='lines',
                       line=dict(color='#8A348E', width=3, dash='dot'),
                       name='Model Predictions',
                       hovertemplate='<b>Model Predictions</b></br>'
                       '<b>Time:</b> %{x|%d %b %Y %H:%M:%S}<br>'
                       '<b>Value:</b> %{y: $,.2f}<extra></extra>'))

# create the figure
fig = go.Figure(data=data, layout=layout)

# display the figure
fig.show()

# export the figure
fig.write_html('model_plot.html')

And here is the finished interactive result:

As in our previous blog post, the user can hover on the data points with the mouse in order to display a tooltip with their corresponding time and value.

Furthermore, the user can double click on the legend items to switch on and off the visibility of the different series. For instance, double clicking on the ‘Orders’ and ‘Model Predictions’ legend items will remove them from the plot, allowing the user to focus only on the anomalies.

As with the previous post, this could of course be added into a richer Dash application where we need to create a more interactive experience.

Though it wasn’t a particularly sophisticated example of anomaly detection, hopefully this example builds on the previous one to demonstrate how we can integrate much more sophisticated analysis between Druid and Plotly using standard Python libraries and frameworks.

All code examples can be downloaded from this Github repository.