Archive for the ‘streaming SQL’ Category

Real-time QoS Monitoring for IP Services

Thursday, January 26th, 2012

QoS and service level monitoring has always presented a challenge for telecommunications companies. With the growing uptake of IP voice and video services, the vast data volumes generated and the lack of an end-to-end view make monitoring the service experience in real-time increasingly difficult.

Diagram 1: Real-time QoS Monitoring

In this blog I’m looking at the core building blocks of a real-time IP service monitoring solution, using a much simplified view of a real-time application. Diagram 1 illustrates the basic problem: how to monitor an IP service when the end-to-end view is only possible by piecing together large volumes of events from many different sources, including the core network provider’s network, the home network and cable modem, and the service provider’s platforms.

In SQLstream we capture each event stream in real-time. Applications are built as streaming pipelines. Unlike a traditional database solution, where event data must first be stored and then processed, SQLstream streams the data through processing views that capture, combine, filter, aggregate and apply analytics to the event streams, without having to store the data.  This enables real-time operational intelligence at extremely high volumes and with very low latency.

The first views in the pipeline capture the data streams. The declaration below defines an external data feed, the real-time MyEvent stream, where the source of events is an external system agent or integration adapter.


   CREATE OR REPLACE STREAM MyEvent (
       "eventName"    VARCHAR(10),
       "eventSeq"     BIGINT,
       "eventVal1"    INTEGER,
       "eventVal2"    SMALLINT,
       "eventVal3"    BIGINT
   ) DESCRIPTION 'source of events'; 

The raw MyEvent data is first filtered to find the events of interest. As illustrated in the code example below, these initial views tend to be as simple as possible in order to maximize reuse, the simplest being a SELECT STREAM * statement with a WHERE clause. Streams can be combined, grouped or joined in a single view, or a single view can be provided per stream, or both.


   CREATE OR REPLACE VIEW RawEvents AS
        SELECT STREAM *
        FROM MyEvent
        WHERE "eventName" = 'RawEvent';

Diagram 2 illustrates the concept of the streaming data pipeline, using a simplified example for exception detection. The SQL view illustrated above is the Stream Capture #1 view in the diagram. The use case is built on a real world example, raising an exception if a number of events of a particular type or value are detected within a specified time window.

Diagram 2: Real-time Stream Processing Pipeline

The second view in the pipeline, Stream Processor #1, is shown below. In this example the view is responsible for the basic processing of the stream, counting the number of events that occur within a time window, in this case 180 seconds.


  CREATE OR REPLACE VIEW CountedEvents AS
   SELECT STREAM
      *,
      COUNT("eventName") OVER win AS "eventCount",
      FIRST_VALUE(RE.ROWTIME) OVER win AS "firstEventTime",
      FIRST_VALUE("eventSeq") OVER win AS "firstEventSeq"
      FROM RawEvents AS RE
   WINDOW win AS (RANGE INTERVAL '180' SECOND(3) PRECEDING);

The final stage in this particular processing pipeline is the detection of the alert.

  CREATE OR REPLACE VIEW FlagTriggerEvents AS
      SELECT STREAM *,
          "eventCount" >= 3 AS "alert"
      FROM CountedEvents;
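
To make the behaviour of the three views concrete, here is a minimal Python sketch that simulates them outside SQLstream. It is only an illustration under stated assumptions: events arrive in time order as (timestamp in seconds, event name, sequence number) tuples, the 180 second window includes its lower boundary, and the function name flag_trigger_events is hypothetical rather than part of any SQLstream API.

```python
from collections import deque

WINDOW_SECONDS = 180   # mirrors RANGE INTERVAL '180' SECOND PRECEDING
ALERT_THRESHOLD = 3    # mirrors "eventCount" >= 3


def flag_trigger_events(events):
    """Simulate RawEvents -> CountedEvents -> FlagTriggerEvents.

    events: iterable of (timestamp_seconds, event_name, event_seq),
    assumed to be sorted by timestamp.
    """
    window = deque()
    for ts, name, seq in events:
        # RawEvents view: keep only the events of interest.
        if name != "RawEvent":
            continue
        window.append((ts, seq))
        # CountedEvents view: drop rows that fell out of the time window.
        while window[0][0] < ts - WINDOW_SECONDS:
            window.popleft()
        first_ts, first_seq = window[0]
        count = len(window)
        # FlagTriggerEvents view: add the boolean alert column.
        yield {
            "eventSeq": seq,
            "eventCount": count,
            "firstEventTime": first_ts,
            "firstEventSeq": first_seq,
            "alert": count >= ALERT_THRESHOLD,
        }
```

Each output row carries the running count for the window ending at that row, so a downstream consumer only needs to look at the alert flag.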

It would of course be possible to include all processing in a single view. However, maximizing reuse of views is a major consideration when building a stream processing application. The example illustrates how a pipeline can be constructed so that each view can have any number of consumers. For example, any number of Rule views can read from the Stream Processor #1 view, and any number of views can read directly from the stream capture view.

The application includes significantly more sophisticated integrations, features and analytics than illustrated here. For example:

  • Multiple rules
  • Recording and forwarding the events responsible for the generation of the alerts
  • Escalation detection
  • Detection of clearance events
  • Join with alert history to identify exceptional events that deviate significantly from historical norms

These use cases are important components of a complete solution, and I’ll be providing examples in subsequent blogs, explaining how these have been implemented.

Tutorial: Streaming applications: Geospatial Visualization – Part 1

Friday, October 7th, 2011

A streaming SQLstream application will feel very familiar to anyone with some basic knowledge of SQL and traditional RDBMS applications.  SQLstream uses standards-based SQL, except that streaming SQL queries run forever, processing data as they arrive over specified time windows.

This blog is the first in a series of tutorials for SQLstream developers, describing how to build streaming SQL applications.  Over the coming months, these tutorials will address the different components of streaming data applications, and provide worked examples and guidance.

Streaming Visualization, Part 1: Setting up

We’ll begin the series by looking at a typical streaming use case – displaying real-time sensor data on a map.  We have a source of geo-located data flowing in SQLstream that we’d like to visualize. Using Google Earth and Ruby on Rails, I’ll demonstrate an easily-implemented solution with lots of room for expansion.

Google Earth - Real-time streaming data visualization

For this example, our approach is to connect the SQLstream pipeline to Google Earth using a staging database, a common deployment scenario. We’ll be using PostgreSQL for the staging database, but MySQL or any other database supported by Rails will work. A SQLstream pump will use TableUpdate to write a record of latitude, longitude, and description for each event to a display list in PostgreSQL. When Google Earth places a web request for data, Rails will service the request by rendering the contents of the display list as KML, Earth’s dialect of XML. We’ll start with SQLstream, Ruby, and PostgreSQL already installed and focus on what’s necessary to get them all talking to each other.

Getting the Data

With SQLstream installed, make sure all of the distributed plugins are installed (if you haven’t done this already) and start the server:

linux> cd $SQLSTREAM_HOME/plugin/autocp
linux> ln -s ../*.jar .
linux> cd $SQLSTREAM_HOME
linux> bin/sqlstreamd

We’re going to get our data from a web feed of recent earthquakes provided by the US Geological Survey. In another shell:

linux> cd $SQLSTREAM_HOME/examples/webfeed
linux> sqllineClient < webfeed.sql
linux> sqllineClient < usgs.sql

We now have several streams available to us within SQLstream. The one we want to visualize is SmallQuakesDay, which includes columns containing the location (‘point’ as lat/lon) and magnitude (‘mag’) of the quake.

Creating the Display List

We’ll use Rails to do all of the work of creating the display list. If you don’t have Rails installed yet, start by installing Ruby’s Gem package management system (in Ubuntu, this is the rubygems package). You’ll also need the development files for Postgres installed (postgres-server-dev in Ubuntu). You can now use gem to install rails and associated tools with this command:

linux> gem install mongrel rails pg rubyzip

I recommend that you add gem’s bin directory to your path (on my system it’s /var/lib/gems/1.8/bin) so that the commands ‘rails’, ‘rake’, and ‘bundle’ are found. Create an empty directory to work in (we’ll call it ‘$QUAKE’ here), cd there, and create a new rails server in the sub-directory ‘quakekml’ with these commands:

linux> cd $QUAKE
linux> rails new quakekml -d postgresql

You can test the server by starting it with these commands and visiting http://localhost:3000 in a web browser:

linux> cd $QUAKE/quakekml
linux> rails server

Use ^C to shut the server down so we can configure the database access. Edit the file $QUAKE/quakekml/config/database.yml, which should already contain sections describing the development, test, and production databases. Edit the username and password settings in each section to match a user you’ve configured in Postgres who can create databases. The only database we’ll be using is ‘quakekml_development’, but Rails will create all three when you issue this command:

linux> rake db:create:all

Create a display list consisting of a timestamp, lat/lon, and magnitude for each quake with the commands:

linux> rails generate scaffold quake_event when:timestamp lat:float lon:float mag:float
linux> rake db:migrate

You now not only have an empty table in Postgres, you also have a full web interface for viewing and editing that table. Start the server again and visit http://localhost:3000/quake_events to see it. Our next steps are to generate KML for Google Earth from this table, and to feed the table from SQLstream. The scaffolding created by Rails is a handy debugging tool we can use to inspect the table and manually add items to test the visualization.

Next time

Parts 2 and 3 of the visualization tutorial will be published over the coming weeks.  Part 2 focuses on how to render streaming analytics in Google Earth, and the final part of the tutorial will discuss how to get the data flowing.

Real-time congestion detection with Streaming SQL

Wednesday, August 31st, 2011

I am going to discuss a SQLstream application for monitoring traffic flow in real-time. In this application, vehicles with GPS enabled devices transmit vehicle position along with other vehicle information such as speed and engine state. SQLstream receives this information as a real-time data stream and uses streaming SQL analytics to detect and predict the rapid onset of congestion on the road network in real-time.

Streaming SQL for Congestion Detection
The SQLstream application for congestion detection uses a typical streaming SQL processing pipeline. In this case, data is fed into the SQLstream pipeline using our Log File Adapter. SQLstream adapters provide an interface to sources and targets such as databases, log files, network sockets and mail servers. Adapters are built using the SQL/MED specification, which is part of the ANSI SQL standard. In this application, each log file contains the vehicle positions on the road network for the latest minute.

Streaming SQL Pipeline for Real-time Traffic Congestion Detection

The conditioning pipeline performs data cleansing operations such as rejecting poor quality data (records with missing or out-of-bounds columns) followed by mapping of vehicle positions (lat/long pair) to a “road element” of the road network using a UDX to perform geo-spatial lookups in an external road network database.

The diagram and the example SQL below show our implementation of a streaming SQL pipeline for congestion detection. Each vehicle reports its position and speed every minute. Two consecutive vehicle positions are then used to interpolate vehicle speeds for each road element on the vehicle path between reporting positions. The interpolated speed is based on the actual distance traveled by the vehicle between two consecutive reports, and is calculated in a User Defined Transform (UDX) written in Java. The UDX also associates a confidence factor with each interpolated speed value, based on the position of the road element relative to the endpoints of the vehicle path.

Streaming Traffic Flow Analytics
As illustrated below, the analytics pipeline calculates 15, 5, 4, 3, 2 & 1 minute moving average speeds for each road element. Each road element is color coded based on the 15-minute moving average speed. The results are streamed to a Google Earth display.

CREATE OR REPLACE VIEW "EstimatedReSpeeds" AS
SELECT STREAM "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
  SUM("reVehicles") OVER "last1" AS "reVehiclesLast1",
  SUM("reVehicles") OVER "last2" AS "reVehiclesLast2",
  SUM("reVehicles") OVER "last3" AS "reVehiclesLast3",
  SUM("reVehicles") OVER "last4" AS "reVehiclesLast4",
  SUM("reVehicles") OVER "last5" AS "reVehiclesLast5",
  SUM("reVehicles") OVER "last15" AS "reVehiclesLast15",
  SUM("reSpeed" * "reConfidence") OVER "last1" /
  SUM("reConfidence") OVER "last1" AS "reSpeedLast1",
  SUM("reSpeed" * "reConfidence") OVER "last2" /
  SUM("reConfidence") OVER "last2" AS "reSpeedLast2",
  SUM("reSpeed" * "reConfidence") OVER "last3" /
  SUM("reConfidence") OVER "last3" AS "reSpeedLast3",
  SUM("reSpeed" * "reConfidence") OVER "last4" /
  SUM("reConfidence") OVER "last4" AS "reSpeedLast4",
  SUM("reSpeed" * "reConfidence") OVER "last5" /
  SUM("reConfidence") OVER "last5" AS "reSpeedLast5",
  SUM("reSpeed" * "reConfidence") OVER "last15" /
  SUM("reConfidence") OVER "last15" AS "reSpeedLast15"
FROM "Stage3"
WINDOW "last1" AS (PARTITION BY "RE"
  RANGE INTERVAL '1' MINUTE PRECEDING),
     "last2" AS (PARTITION BY "RE"
  RANGE INTERVAL '2' MINUTE PRECEDING),
     "last3" AS (PARTITION BY "RE"
  RANGE INTERVAL '3' MINUTE PRECEDING),
     "last4" AS (PARTITION BY "RE"
  RANGE INTERVAL '4' MINUTE PRECEDING),
     "last5" AS (PARTITION BY "RE"
  RANGE INTERVAL '5' MINUTE PRECEDING),
     "last15" AS (PARTITION BY "RE"
  RANGE INTERVAL '15' MINUTE PRECEDING);
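
Each "reSpeedLastN" column in the view above is a confidence-weighted average: the sum of speed times confidence divided by the sum of confidence over the window. As a rough illustration, the same arithmetic for a single road element can be sketched in Python. The function name weighted_speed, the tuple layout and the use of whole minutes as timestamps are assumptions made for the example, not part of the application.

```python
def weighted_speed(reports, now, window_minutes):
    """Confidence-weighted average speed for one road element.

    reports: list of (minute, speed, confidence) tuples.
    Returns None when no report with non-zero confidence is in the window.
    """
    recent = [(speed, conf) for (minute, speed, conf) in reports
              if now - window_minutes <= minute <= now]
    total_conf = sum(conf for _, conf in recent)
    if total_conf == 0:
        return None
    return sum(speed * conf for speed, conf in recent) / total_conf


reports = [(0, 60.0, 1.0), (1, 40.0, 0.5), (2, 20.0, 1.0)]
last1 = weighted_speed(reports, now=2, window_minutes=1)    # uses minutes 1 and 2
last15 = weighted_speed(reports, now=2, window_minutes=15)  # uses all three reports
```

Low-confidence interpolations pull the average less than high-confidence ones, which is the point of carrying the confidence factor through the pipeline.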

Detecting the rapid onset of congestion
Congestion is detected by comparing the moving average for a larger time window with that for a smaller time window, for example a 2-minute average with a 1-minute average:

CREATE OR REPLACE VIEW "CongestionRule1" AS
SELECT STREAM
  -- name, ID, highway name, speed limit etc. for each road element
  "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
  -- volume of vehicle reports in each time window
  "reVehiclesLast1", "reVehiclesLast2", "reVehiclesLast3",
  "reVehiclesLast4", "reVehiclesLast5", "reVehiclesLast15",
  -- estimated avg speed for each road element
  "reSpeedLast1", "reSpeedLast2", "reSpeedLast3",
  "reSpeedLast4", "reSpeedLast5", "reSpeedLast15"
FROM "EstimatedReSpeeds"
WHERE "reSpeedLast1" < 0.80 * "reSpeedLast2" AND -- slowdown by 20%
  "reSpeedLast2" < 0.80 * "reSpeedLast3" AND
  "reSpeedLast3" < 0.80 * "reSpeedLast4" AND
  "reSpeedLast4" < 0.80 * "reSpeedLast5";
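
The WHERE clause above only fires when every shorter window is at least 20% slower than the next longer one. A small Python sketch of the same test follows; the function name is_congested and the list layout are hypothetical and used only for illustration.

```python
def is_congested(speeds, factor=0.80):
    """speeds: average speeds ordered [last1, last2, last3, last4, last5].

    True when each shorter-window average is below `factor` times the
    average of the next longer window, as in the CongestionRule1 filter.
    """
    return all(shorter < factor * longer
               for shorter, longer in zip(speeds, speeds[1:]))


slowing = is_congested([20, 30, 40, 55, 70])   # every step drops by more than 20%
steady = is_congested([30, 30, 40, 55, 70])    # 1-minute speed matches 2-minute speed
```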

SQLstream Traffic Congestion Detection - Visualization

Note that these estimated speeds are calculated over overlapping windows, and the slowdown thresholds are set accordingly.

Fine tuning the slowdown thresholds and taking account of other information, such as the proximity of traffic lights and the volume of vehicle reports in each time window, improves the quality of the congestion detection algorithm.

The Google Earth screenshot shows the real-time traffic view, with detected slowdowns marked as pins. The severity of the slowdown is indicated by different shades of red.

Calculating Decaying Averages in Streaming SQL

Tuesday, August 23rd, 2011

At SQLstream we have a comprehensive implementation of standard SQL windowing operations such as SUM, COUNT and AVG. Recently though we needed a more sophisticated function for a decaying weighted average that would emphasize more recent samples over older samples. We implemented a new EXP_AVG() operation, as shown in this example query:

SELECT rowtime, ticker, price,
  EXP_AVG(price, INTERVAL '10' SECOND) OVER w
FROM t
WINDOW w AS (PARTITION BY ticker
             ORDER BY rowtime
             RANGE INTERVAL '30' SECOND PRECEDING);

EXP_AVG takes a value expression and an interval constant giving the half-life. In this example, if two samples within the WINDOW are separated by 10 seconds, the older one will be given half as much weight as the newer one.
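
To illustrate the half-life weighting, here is a small Python sketch of a decaying average over a time window. It follows the description above (weight halves for every half-life of age) and is not SQLstream’s implementation; the function name exp_avg and the (time, value) tuple layout are assumptions for the example.

```python
def exp_avg(samples, now, half_life, window):
    """Half-life weighted average of the samples inside the window.

    samples: list of (time_seconds, value); now, half_life and window
    are in seconds. Returns None if the window is empty.
    """
    num = 0.0
    den = 0.0
    for t, value in samples:
        age = now - t
        if 0 <= age <= window:
            weight = 0.5 ** (age / half_life)  # 1.0 for the newest sample
            num += value * weight
            den += weight
    return num / den if den else None


# Two samples 10 seconds apart with a 10 second half-life:
# the older one gets weight 0.5, the newer one weight 1.0.
avg = exp_avg([(0, 100.0), (10, 200.0)], now=10, half_life=10, window=30)
```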

How would this look in standard SQL without an EXP_AVG function?

I thought it would be interesting to look at how a decaying average would be implemented in SQL without an EXP_AVG operation, as under the covers we implement it using standard SQL windowed sums. If we knew that the times for our rows would always fall within a narrow range, we could increase the weights of the samples as time goes forwards and simply scale down the total (rather than lowering the weights of samples as time goes backwards from the most recent sample). This would be straightforward. The SQL would look something like this.

First, we need a function to turn our time based units into something on which we can do arithmetic:

CREATE FUNCTION toSeconds(t TIMESTAMP, offset TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t - offset) SECOND(9,3) AS DECIMAL(12,3));

Then, we need a function for calculating exponential weight:

CREATE FUNCTION expWeight (seconds DOUBLE, halfLife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN EXP(seconds * (LN(2)/halfLife));

The complete SQL will then be:

SELECT rowtime, ticker,
  SUM(price * expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
  / SUM(expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
  AS avgPrice
FROM t
WINDOW w AS (PARTITION BY ticker
             ORDER BY rowtime
             RANGE INTERVAL '30' SECOND PRECEDING);

Unfortunately this would not work in practice, as the expWeight function would overflow once the row times advanced more than a few half-lives beyond OFFSET. We can’t reset the offset as long as there are any non-zero values still in the window. This gives us our out. What we can do is partition the incoming values into two windowed aggregates, always sending zeros to one of them, and switching and resetting the offset when the window for that aggregate fills with zeros. This way we’re never summing values whose weights are calculated using different starting offsets.

Here’s the SQL for this. We’ll need some more functions describing when and how we reset our offsets. We’ll need an arbitrary reference for our time calculations:

CREATE FUNCTION toSeconds(t TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t - TIMESTAMP '1970-01-01 00:00:00') SECOND(9,3)
           AS DECIMAL(12,3));

To use modulo arithmetic in SQL, convert to an integer:

CREATE FUNCTION toMillis(t TIMESTAMP)
RETURNS BIGINT CONTAINS SQL
RETURN CAST(toSeconds(t) * 1000 AS BIGINT);

We’ll want to partition time into window sized epochs starting at our arbitrary time reference:

CREATE FUNCTION isEvenEpoch (t TIMESTAMP, windowSeconds INT)
RETURNS BOOLEAN CONTAINS SQL
RETURN MOD(toMillis(t),
  windowSeconds*2000) < windowSeconds*1000;

We’ll use the start of each epoch as the offset for all rows that fall in that epoch:

CREATE FUNCTION epochStart (t TIMESTAMP, windowSeconds INT)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST(MOD(toMillis(t), windowSeconds*1000)
           AS DECIMAL(12,3))/1000;

The following two functions supply the correction factor needed when zeros are being inserted into a window and all the non-zero values still in that window are from the previous epoch:

CREATE FUNCTION evenAgingFactor (t TIMESTAMP,
             windowSeconds INT, halflife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
             THEN 1
             ELSE expWeight(-windowSeconds, halflife) END;

CREATE FUNCTION oddAgingFactor (t TIMESTAMP,
             windowSeconds INT, halflife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
             THEN expWeight(-windowSeconds, halflife)
             ELSE 1 END;

The complete SQL that uses the two window approach to calculate the decaying average using a 30 second window and a 10 second half life is:

SELECT rowtime, ticker,
   (oddAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN 0
        ELSE price * expWeight(epochStart(rowtime, 30), 10) END) OVER w
    + evenAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN price * expWeight(epochStart(rowtime, 30), 10)
        ELSE 0 END) OVER w)
    /
    (oddAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN 0
        ELSE expWeight(epochStart(rowtime, 30), 10) END) OVER w
    + evenAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN expWeight(epochStart(rowtime, 30), 10)
        ELSE 0 END) OVER w)
AS avgPrice
FROM t WINDOW w AS (PARTITION BY ticker
                    ORDER BY rowtime
                    RANGE INTERVAL '30' SECOND PRECEDING);

To understand how this would work in practice, let’s assume we start our query at 12:00. We’ll only look at the numerator of our query, as the denominator uses the same technique to calculate a weighted count.

Time | Window A state | Window B state
12:00:00 | Empty | Empty
12:00:00 – 12:00:30 | Values with increasing weights being added. | Zeros being added.
12:00:30 | Has weighted sum * expWeight(30,10) | Zero
12:00:30 – 12:01:00 | Zeros being added. Will have older part of weighted sum. | Values with increasing weights being added. Will have newer part of weighted sum.
12:01:00 | Zero. All non zero values will have been aged out. We can safely reset our scaling factor. | Has weighted sum * expWeight(30,10)

With care to apply appropriate scaling to the parts, this SQL can be used to calculate a weighted average using standard windowed operations. However, I think you’ll agree, it’s easier with our EXP_AVG() operation.
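
As a sanity check on the two window approach, the Python sketch below computes the decaying average both ways: directly, by weighting each sample by its age, and with per-epoch offsets plus an aging factor for the previous epoch. It is a simplified model rather than a translation of the SQL: timestamps are assumed to be non-negative numbers of seconds, the window is taken to include its lower boundary, and the function names are chosen for this example.

```python
import math


def exp_weight(seconds, half_life):
    """Weight that doubles for every half-life of elapsed time."""
    return math.exp(seconds * math.log(2) / half_life)


def direct_avg(samples, now, window, half_life):
    """Reference result: weight each sample by its age relative to `now`."""
    rows = [(t, v) for t, v in samples if now - window <= t <= now]
    num = sum(v * exp_weight(t - now, half_life) for t, v in rows)
    den = sum(exp_weight(t - now, half_life) for t, _ in rows)
    return num / den


def two_epoch_avg(samples, now, window, half_life):
    """Same average using weights relative to the start of each epoch."""
    rows = [(t, v) for t, v in samples if now - window <= t <= now]
    sums = {0: [0.0, 0.0], 1: [0.0, 0.0]}  # epoch parity -> [sum(v*w), sum(w)]
    for t, v in rows:
        epoch = int(t // window)
        w = exp_weight(t - epoch * window, half_life)  # offset within its epoch
        sums[epoch % 2][0] += v * w
        sums[epoch % 2][1] += w
    current = int(now // window) % 2
    # Rows from the previous epoch used an offset one window older,
    # so they are scaled down by expWeight(-window, half_life).
    factor = {current: 1.0, 1 - current: exp_weight(-window, half_life)}
    num = sum(factor[p] * sums[p][0] for p in (0, 1))
    den = sum(factor[p] * sums[p][1] for p in (0, 1))
    return num / den
```

Because the weights never grow beyond expWeight(window, half_life), the per-epoch sums stay bounded no matter how long the query runs.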

Real-Time Seismic Monitoring in the Cloud with SQLstream

Wednesday, July 20th, 2011

SQLstream is helping to predict earthquakes across the world in real time. The system has been developed by a consortium of universities and government agencies, with funding from NSF (National Science Foundation), to provide an infrastructure of networked tools for research in ocean science – constructing an internet-based system to collect and share data.

Ocean Research Program Overview

Real-time Seismic Monitoring Program

This is a large system with 16,000 land and sea-based sensors, each sending several channels of seismic event data at a rate of 40 data points per second. The application executes in an Amazon EC2 Cloud on a cluster of SQLstream servers, connected by an AMQP message bus. SQLstream’s AMQP adapter (built with RabbitMQ) enables the streaming SQL application to view the AMQP bus as a domain of input and output data streams. The initial prototype was upgraded to a full-scale system running on a cluster of servers without any changes to the streaming SQL application.

SQLstream’s contribution to the project, an application that processes seismographic data in real-time, demonstrates:

  1. An operational deployment of streaming SQL for scientific calculations in real-time
  2. Real-time distributed processing in an Amazon EC2 cloud using SQLstream and AMQP
  3. Rapid development and rollout of real-time data applications using standards-based streaming SQL.

Seismic Monitoring
The sensor network contains about 16,000 seismic sensors, organized into grids, covering large parts of the North American continent and the adjacent oceans (see illustration below). Each sensor measures the motion of the ground under it in three dimensions, and transmits its data as several digitized channels, typically sampled 40 times a second.

Seismic Sensor Map

While the rate of each signal channel is modest (since seismic waves are low-frequency), this adds up to a large amount of data to process in real time. Moreover, the rules for detecting a seismic event are heuristics that apply to a time interval of several minutes, so the application has to calculate some quantities from the raw data and store these calculated values over a time window.

But to detect an earthquake reliably, it’s better to monitor all the sensors at once, looking for a disturbance in the signals that first appears in one place, and then appears in nearby places: a disturbance signal that propagates and changes shape in a way consistent with the physics of a seismic wave.

Real-time sensor network management in an EC2 Cloud
Monitoring tens of thousands of signal channels arriving at 40 sample points per second is a complicated problem, but it can be made simpler by breaking it into stages. We’re interested in earthquakes, which are infrequent, so SQLstream first reduces the amount of data by scanning each channel for patterns that suggest the beginning, the peak or the end of a quake: in other words reduce the dense signal to a sequence of interesting events. Then we can look for events detected on other channels that could be due to the same quake propagating in physical space and time.

In the first phase, we built a real-time seismic event detector in streaming SQL. We translated a scientific algorithm into streaming SQL, and connected to the scientific sensor data infrastructure using our AMQP adapter. This involved less than 100 lines of streaming SQL.

In the second phase, the prototype was scaled up to a full-size system, dealing with 16,000 sensor channels, running on multiple SQLstream server nodes created automatically in an Amazon Elastic Cloud. This expansion required no changes to the streaming SQL application developed for the initial prototype – simply running the same streaming SQL application inside an elastic container/manager.

Real-time Event Detection with Streaming SQL
The sensor data processing pipeline has five functional stages:

  1. Reading Messages – over the AMQP adapter. A configuration parameter specifies which “topics” SQLstream subscribes to, that is, which set of sensor channels it receives.
  2. Unpack Data – a user defined transform (UDX) unpacks data channel messages into individual data points.
  3. Signal Processing – extract higher order information from the raw data to identify seismic events given background noise and non-seismic ‘bumps’. An example function would be to calculate the ratios of multiple rolling averages of the signal value (x) over different time windows.

    Plot of Seismic Events

  4. Event Detection – The next stage applies heuristic rules to the processed data streams – the output is much sparser: a stream of significant events, each indicating a possible start/peak/stop of a seismic wave on a particular channel. If events of the correct type occur within a correct interval of each other (as shown in the Signal Plot Diagram), they are accepted as significant.
  5. Writing Messages – output (publish) significant events over the AMQP adapter.
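
To give a feel for the signal processing stage, here is a small Python sketch of one ratio of rolling averages: a short window mean of the absolute signal divided by a long window mean. The window lengths, the use of absolute values and the function name rolling_ratio are assumptions for illustration; the project’s actual algorithm and thresholds are not described in this post.

```python
from collections import deque


def rolling_ratio(samples, short_n, long_n):
    """Yield short-window mean / long-window mean of |x| for each sample.

    A ratio well above 1 means the recent signal is much stronger than
    the longer-term background level.
    """
    short = deque(maxlen=short_n)   # keeps only the last short_n values
    long_ = deque(maxlen=long_n)    # keeps only the last long_n values
    for x in samples:
        short.append(abs(x))
        long_.append(abs(x))
        long_mean = sum(long_) / len(long_)
        short_mean = sum(short) / len(short)
        yield short_mean / long_mean if long_mean else 0.0


# A quiet signal followed by a sudden bump pushes the ratio above 1.
ratios = list(rolling_ratio([1, 1, 1, 1, 10], short_n=2, long_n=4))
```

An event detection rule could then compare the ratio against a threshold, which is the kind of heuristic stage 4 applies.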

Automatic EC2 scaling for Big Data sensor volume
The pipeline described has been proven to scale well to handle even greater data volume:

  1. Channels are processed independently, so scale as O(N).
  2. Additional channels are processed by adding more pipelines
  3. Each pipeline starts and ends with AMQP messages – the SQLstream / AMQP interoperability has been shown to scale well.
  4. To add a pipeline, we simply add another Elastic Compute server, running the same pipeline streaming SQL, but configured to subscribe to its own set of sensor channels.

This is the first part of a series of blogs describing the seismic monitoring solution. The next blogs will focus on the streaming SQL used in the application, and the SQLstream / AMQP architecture for Big Data scalability.

SQLstream and Mozilla Firefox 4, a look at the SQL behind the scene

Wednesday, March 23rd, 2011

Since Tuesday’s announcement that the Firefox Download Monitor is powered by SQLstream, we’ve received a number of questions about how it all fits together. I hope this description helps answer some of those questions.

Mozilla Firefox Real-Time Download Monitor - Day 2

The SQLstream server executes SQL statements just like a standard SQL database, except that SQLstream’s queries run continuously, analyzing input data in real-time as it arrives. Statements are presented via JDBC or user friendly tools which use JDBC internally. Statements are compiled/prepared, the planner/optimizer chooses an access plan, and a runtime engine executes the plan. SQLstream is compliant with SQL:2008 and SQL:2003 with just a couple of extensions. One extension is the keyword STREAM as part of a SELECT statement. The STREAM keyword indicates that the results are a continuous stream rather than a point-in-time TABLE.

Applications in SQLstream are constructed out of a set of SQL CREATE STREAM statements and SQL VIEWS against streams and other views. Those statements are assembled into a pipeline. When describing a pipeline we refer to statements on the source side as being upstream, and statements closer to the destination as being downstream.

In the middle of the pipeline, we define a stream named FirefoxDownloadStream_ which contains the results of the parsed and conditioned download events. The stream declaration is identical to a table definition with the exception of the type of the object being a STREAM rather than TABLE.

CREATE STREAM "FirefoxDownloadStream_" (
   "download_type"          VARCHAR(15),
   "utc_timestamp"          TIMESTAMP,
   "product_name"           VARCHAR(12),
   "product_version"        VARCHAR(12),
   "product_major_version"  VARCHAR(12),
   "product_os"             VARCHAR(10),
   "locale_code"            VARCHAR(5),
   "country_code"           VARCHAR(2),
   "city_name"              VARCHAR(32),
   "region_code"            VARCHAR(2),
   "longitude"              VARCHAR(8),
   "latitude"               VARCHAR(8)
);

The stream is populated with a SQL INSERT-SELECT statement. Again, standard SQL statements are used. The WHERE clause defines that the downloads include new first time downloads, complete upgrades of prior versions of Firefox, or partial upgrades of prior versions of Firefox.

INSERT INTO "FirefoxDownloadStream_"
  ("download_type",
   "utc_timestamp",
   "product_name",
   "product_version",
   "product_major_version",
   "product_os",
   "locale_code",
   "country_code",
   "city_name",
   "region_code",
   "longitude",
   "latitude"
  )
SELECT STREAM
   "dlType"   AS "download_type",
   "dlTime"   AS "utc_timestamp",
   "product"  AS "product_name",
   "version"  AS "product_version",
   "GetMajorVersion"("version") AS "product_major_version",
   "os"       AS "product_os",
   "lang"     AS "locale_code",
   "cc"       AS "country_code",
   "city"     AS "city_name",
   "rg"       AS "region_code",
   CAST("longitude" AS VARCHAR(10)) AS "longitude",
   CAST("latitude" AS VARCHAR(10)) AS "latitude"
FROM "FirefoxCountryFilter"
WHERE (("dlType" IS NULL) OR ("dlType" = 'complete') OR ("dlType" = 'partial'));

The download events contain the time of each download. Mozilla has a number of download servers feeding the worldwide requests to download Firefox. Each of these servers feeds the results of the download requests to a common logfile which is “tailed” by SQLstream. As the time for each download differs due to each client’s network capacity, the download requests may be slightly out of order. In practice the biggest gap we’ve seen is 4 seconds. Since we’re measuring downloads over a long period of time, it was deemed sufficient to adjust the download time of late arrivals to match the most recent download time. The following SQL statement does that adjustment.


CREATE OR REPLACE VIEW "FirefoxDownloadStream" AS
   SELECT STREAM MAX("utc_timestamp") OVER(ROWS UNBOUNDED PRECEDING)
          AS ROWTIME,
          *
   FROM "FirefoxDownloadStream_";
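
The effect of MAX(...) OVER (ROWS UNBOUNDED PRECEDING) is a running maximum: a late arrival simply inherits the most recent timestamp seen so far. A minimal Python sketch of that adjustment, with timestamps simplified to integers and a hypothetical function name, looks like this:

```python
from itertools import accumulate


def adjust_rowtimes(timestamps):
    """Replace each timestamp with the largest timestamp seen so far."""
    return list(accumulate(timestamps, max))


# The rows stamped 11 and 13 arrived late and are pulled forward.
adjusted = adjust_rowtimes([10, 12, 11, 15, 13, 16])
```

The result never decreases, which is what a ROWTIME column requires.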

SQLstream associates a ROWTIME with each row in a STREAM. The ROWTIME is a monotonically increasing SQL timestamp. In the default case, the ROWTIME is the current time expressed in UTC. Most applications require time to be defined according to time associated with the data itself. Associating the ROWTIME of a row in a stream based on the data contents of the row, is done by the AS ROWTIME clause for an individual column. In the Mozilla pipeline, we set the ROWTIME to be the maximum of the values in the “utc_timestamp” column to be that rows ROWTIME.
The analytics portion of the pipeline is implemented with a standard SQL statement. For example, every 10 seconds the number of downloads for each product, version, … country, city, region is calculated.


CREATE OR REPLACE VIEW "FirefoxStreamForLocationCounters"
DESCRIPTION 'Compute product counters for a minute' AS
   SELECT STREAM
          "download_type",
          "product_name",
          "product_major_version",
          "product_version",
          "country_code",
          "region_code",
          "city_name",
          "latitude",
          "longitude",
          count(*) AS "count"
   FROM "FirefoxDownloadStream" F
   GROUP BY FLOOR(F.ROWTIME TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '10' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '20' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '30' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '40' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '50' SECOND TO MINUTE),
            "product_name",
            "download_type",
            "product_major_version",
            "product_version",
            "country_code",
            "region_code",
            "city_name",
            "latitude",
            "longitude";
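
It may not be obvious why six FLOOR(… TO MINUTE) expressions produce 10-second groups. The Python sketch below (an illustration only; the helper names are invented) rebuilds the same six-part key for every second of one minute. Each shifted timestamp crosses a minute boundary at a different multiple of 10 seconds, so the combined key changes exactly six times per minute.

```python
from datetime import datetime, timedelta

def floor_to_minute(ts):
    """Truncate a timestamp to the start of its minute."""
    return ts.replace(second=0, microsecond=0)

def group_key(rowtime):
    """Mirror the six FLOOR(... TO MINUTE) terms of the GROUP BY clause."""
    return tuple(
        floor_to_minute(rowtime - timedelta(seconds=offset))
        for offset in range(0, 60, 10)
    )

start = datetime(2011, 3, 22, 14, 0, 0)
keys = [group_key(start + timedelta(seconds=s)) for s in range(60)]
distinct_keys = len(set(keys))  # number of time buckets within one minute
```
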

There is a similar view declaration where similar calculations are done for each product. Most of the interest since Tuesday is of course related to Firefox 4.0 downloads. This second view allows Mozilla to drill down on downloads by platform as well as downloads for previous (and future) Firefox versions.


CREATE OR REPLACE VIEW "FirefoxStreamForProductCounters"
DESCRIPTION 'Compute product counters for a minute' AS
   SELECT STREAM
          "download_type",
          "product_name",
          "product_major_version",
          "product_version",
          "product_os",
          count(*) AS "count"
   FROM "FirefoxDownloadStream" F
   GROUP BY FLOOR(F.ROWTIME TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '10' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '20' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '30' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '40' SECOND TO MINUTE),
            FLOOR(F.ROWTIME - INTERVAL '50' SECOND TO MINUTE),
            "product_name",
            "download_type",
            "product_major_version",
            "product_version",
            "product_os";

Each of these views (FirefoxStreamForLocationCounters and FirefoxStreamForProductCounters) is based on the FirefoxDownloadStream. Each defined stream and view is a point where the application can access data either directly or via another VIEW or INSERT…SELECT.

One component of the solution is a piece of code we call the HBaseAgent. The agent uses the JDBC interface to SQLstream and issues a SELECT * FROM each of the described views containing the location and product counter 10-second download counts. The HBaseAgent maps each fetched row to the HBase schema as defined by Mozilla.

I write this blog about 24 hours after Firefox 4 launched. So far there are more than 8 million downloads of Firefox 4. It certainly has been an exciting day for Mozilla and I congratulate everyone who contributed. I’m happy that SQLstream has been able to contribute to their success.

SQLstream and Mozilla Firefox 4.0

Tuesday, March 22nd, 2011

SQLstream has been powering Mozilla’s Firefox Download Monitor since 2009. A SQLstream based application has been continually aggregating hundreds of millions of download events, receiving minute by minute aggregations via a continuously running SQL SELECT statement using the SQLstream JDBC driver. A continuously running SELECT statement is syntactically and semantically identical to other SELECT statements with the addition that end of data is never returned in SQLSTATE by the FETCH associated with the cursor.

Mozilla 4.0 Real-Time Download Monitor is Powered By SQLstream

For the launch of Firefox 4, Mozilla again turned to SQLstream to enhance the download monitor. Applications in SQLstream are built by defining a series of SQL stream definitions and SQL views. Business rules are embedded in these definitions. Definitions are assembled into a pipeline and each definition provides a point where data is available to applications. A stream definition is analogous to a SQL table definition and contains the column names and data types for each defined element. Each stream has an implicit ROWTIME column, a monotonically increasing value associated with each row.

The download monitor tails the log files written by all of Mozilla’s download servers which provide new versions of Firefox. Each entry in the log is parsed. The IP address of each download is converted to a country, city, region, latitude, and longitude. For the Firefox 4 release, Mozilla wanted the results of the download aggregation to be stored in an HBase table in their HBase/Hadoop cluster. Storing the data allows future historical analysis to complement the realtime analysis provided by SQLstream.

SQLstream aggregates downloads to two separate column families in a single HBase table. The ‘product’ column family contains the overall download count. The ‘location’ column family contains the count of downloads for each country, region, city, latitude, longitude.

SQLstream uses a GROUP BY clause along with a COUNT(*) to calculate the number of downloads for each 10 seconds. The author wrote a new piece of SQLstream code which provides an interface from SQLstream to HBase. The HBaseAgent maps the results of the GROUP BY and calls the HBase API to persist data in HBase. The incrementColumnValue API is key in that it allows SQLstream to aggregate download counts on a realtime basis and efficiently update HBase by providing incremental values.
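
The value of incremental updates can be sketched with an in-memory stand-in. The Python snippet below does not call HBase. The dictionary, the `increment_column_value` helper, the row keys, and the column names are all hypothetical and only mimic the idea of adding each 10-second count to a stored total, rather than reading, summing, and rewriting the total on every update.

```python
from collections import defaultdict

# In-memory stand-in for the HBase table: (row key, column) -> running total.
table = defaultdict(int)

def increment_column_value(row_key, column, amount):
    """Add amount to the stored counter and return the new total (stand-in only)."""
    table[(row_key, column)] += amount
    return table[(row_key, column)]

# Hypothetical 10-second aggregates: (product, version, count).
batches = [("firefox", "4.0", 120), ("firefox", "4.0", 95), ("firefox", "3.6", 7)]
for product, version, count in batches:
    increment_column_value(product, "product:" + version, count)
```

Because each call carries only a delta, the agent never needs to know the current total, which keeps the write path simple when counts arrive every few seconds.
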

The application periodically reads data from HBase and sends formatted data to each connected browser. See for yourself at http://glow.mozilla.org/. The map provides a running count of Mozilla Firefox 4.0 downloads, with raindrops on the map indicating each location where one or more downloads have just occurred. As I write this blog entry, I can see that Europe and Asia are hot while North America is just waking up. Clicking on the colored rings in the lower left hand corner of the map allows drill down to the geographic locations.

Mozilla’s Daniel Einspanjer has also blogged about their new real-time download visualization application. The blog explains the overall architecture of the real-time application using SQLstream, and the SQLstream integration with HBase.

You can read more about the previous Firefox 3 download monitor on Julian Hyde’s blog. Julian is the CTO of SQLstream.

Mozilla also blogged about the history of the Firefox 3 download monitor on the Mozilla Webdev blog.

Concepts in Streaming SQL

Tuesday, October 26th, 2010

A streaming SQL query is a continuous, standing query that executes over streaming data. Data streams are processed using familiar SQL relational operators augmented to handle time sensitive data. Streaming queries are similar to database queries in how they analyze data; they differ by operating continuously on data as they arrive and by updating results in real-time.

Streaming SQL queries process dynamic, flowing data, in contrast to traditional RDBMSs, which process static, stored data with repeated single-shot queries. Streaming SQL is simple to configure using existing IT skills, dramatically reducing integration cost and complexity. Combining the intuitive power of SQL with this simplicity of configuration enables much faster implementation of business ideas, while retaining the scalability and investment protection important for business-critical systems.

By processing transactions continuously, streaming SQL directly addresses the real-time business needs for low latency, high volume, and rapid integration. Complex, time-sensitive transformations and analytics, operating continuously across multiple input data sources, are simple to configure and generate streaming-analytics answers as input data arrive. Sources can include any application inputs or outputs, or any of the data feeds processed or generated within an enterprise. Examples include financial trading data, internet clickstream data, sensor data, and exception events. SQL can process multiple input and output streams of data, for multiple publishers and subscribers. To learn more about Streaming SQL, please read our “Concepts in Streaming SQL” mini-white paper.

How SQLstream Can Reduce Complex Business Logic to a Single SQL Query

Monday, October 18th, 2010

In the game industry, complex game logic needs to be applied to streams of events generated by gameplay.  In single player games, this logic is simply handled by applying the correct computations.  However, in an Internet based social game where millions of players interact together online, the problem takes on an entirely different dimension.  Storing the game events on disk inside of a database becomes increasingly difficult as the rate of gameplay events increases.  Logic and computation must be applied to the data and a disparate set of data must be queried to correctly update the game state.

The solution can be surprisingly simple and similar in form to storing all gameplay events in a traditional database.  SQLstream’s powerful streaming and windowed aggregation capabilities can reduce this use case and complex logic to a single query.

For example, consider a game where users create videos that are viewed and rated by other users:

  • A score for the video must be computed based upon the last 4 weeks of gameplay.
  • Let’s say a view is worth 12 points and various rating levels are worth between 0 and 30 points.
  • The content’s score is the total points accumulated in the last week, plus 50% of the points in the week before that, plus 20% of the points in the week before that, plus 10% of the points in the week before that.

Let’s assume we have two streams of gameplay events:

--
-- A stream containing 1 row per individual view of a video
--
CREATE STREAM "s_video_view" (
                "video_id" INTEGER,     -- the viewed video
                "user_id" INTEGER,      -- the id of the viewer
                "performer_id" INTEGER  -- the id of the performer
                );


--
-- A stream containing 1 row per individual rating of a video
--
CREATE STREAM "s_video_rating" (
                "video_id" INTEGER,     -- the rated video
                "performer_id" INTEGER, -- the id of the performer
                "rater_id" INTEGER,     -- the id of the rater
                "rating" INTEGER        -- the rating given
                );

To handle all the described business logic, we must first compute the total number of points generated from each gameplay event and add that to the stream of tuples entering the system.  The following query accomplishes this:

SELECT STREAM "performer_id", "video_id", 12 AS "points"
     FROM "s_video_view"
UNION ALL
SELECT STREAM "performer_id", "video_id",
          CASE "s_video_rating"."rating" WHEN 2 THEN 1
                                         WHEN 3 THEN 8
                                         WHEN 4 THEN 20
                                         WHEN 5 THEN 30
                                         ELSE 0
                                         END AS "points"
     FROM "s_video_rating"

This query will produce a stream of data containing the performer_id, video_id, and points for each scoring event in the system.
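
For readers less familiar with CASE expressions, the same point assignment can be written as a lookup table. This Python sketch is only an illustration of the rule in the query above; the function and constant names are invented here.

```python
VIEW_POINTS = 12
# Rating value -> points, as in the CASE expression; anything else scores 0.
RATING_POINTS = {2: 1, 3: 8, 4: 20, 5: 30}

def view_event(performer_id, video_id):
    """Scoring row for one view: (performer_id, video_id, points)."""
    return (performer_id, video_id, VIEW_POINTS)

def rating_event(performer_id, video_id, rating):
    """Scoring row for one rating; ratings without an entry fall through to 0."""
    return (performer_id, video_id, RATING_POINTS.get(rating, 0))

# Merging both kinds of rows into one list plays the role of UNION ALL.
scoring_events = [view_event(1, 7), rating_event(1, 7, 5), rating_event(1, 7, 1)]
```
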

Next we must compute a rolling, time-based score over the event stream for each unique video_id.  SQLstream provides the SQL:99, SQL:2003, and SQL:2008 standard WINDOW facility to make this easy:

WINDOW "last_7_days" AS ( PARTITION BY "video_id"
                          RANGE INTERVAL '7' DAY PRECEDING),
       "last_14_days" AS ( PARTITION BY "video_id"
                           RANGE INTERVAL '14' DAY PRECEDING),
       "last_21_days" AS ( PARTITION BY "video_id"
                           RANGE INTERVAL '21' DAY PRECEDING),
       "last_28_days" AS ( PARTITION BY "video_id"
                           RANGE INTERVAL '28' DAY PRECEDING)

These rolling windows contain all the scoring events over the last 7, 14, 21, and 28 days respectively, partitioned by video_id.  This means that any aggregation function applied to one of these windows is computed only over the stream events with the same video_id as the current row.  So, COUNT(*) OVER "last_7_days" would emit, for each incoming scoring event, the number of scoring events recorded in the last week for that event’s video_id.

By subtracting the SUM of the points in the 7 day window from the SUM of the points in the 14 day window, we can compute the number of points in the week starting two weeks ago and ending one week ago.  This technique allows us to implement computations on rolling windows that are not bounded by the current time.
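
The subtraction trick can be checked with a few numbers. The Python sketch below uses made-up events for a single video and a hypothetical `points_in_last` helper. It assumes both window boundaries are inclusive, which may differ in detail from the engine's RANGE semantics.

```python
from datetime import datetime, timedelta

def points_in_last(events, now, days):
    """Sum the points of events that fall within the last `days` days."""
    cutoff = now - timedelta(days=days)
    return sum(points for ts, points in events if cutoff <= ts <= now)

now = datetime(2010, 10, 18)
# Hypothetical (timestamp, points) scoring events for one video.
events = [
    (now - timedelta(days=2), 12),
    (now - timedelta(days=9), 30),
    (now - timedelta(days=12), 8),
    (now - timedelta(days=20), 20),
]

# Points earned between two weeks ago and one week ago: 14-day sum minus 7-day sum.
week_before_last = points_in_last(events, now, 14) - points_in_last(events, now, 7)
```

Here the 14-day sum is 50 and the 7-day sum is 12, so the week before last contributes 38 points, matching the two events that are 9 and 12 days old.
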

Putting the entire example together, we get the following view:

CREATE VIEW "v_video_score" AS
     SELECT STREAM "video_id", "performer_id",
               SUM("points") OVER "last_7_days" +
                 ((SUM("points") OVER "last_14_days" -
                   SUM("points") OVER "last_7_days") * 0.5) +
                 ((SUM("points") OVER "last_21_days" -
                   SUM("points") OVER "last_14_days") * 0.2) +
                 ((SUM("points") OVER "last_28_days" -
                   SUM("points") OVER "last_21_days") * 0.1) AS "score"
        FROM ( SELECT STREAM "performer_id", "video_id", 12 AS "points"
                    FROM "s_video_view"
               UNION ALL
               SELECT STREAM "performer_id", "video_id",
                           CASE "s_video_rating"."rating" WHEN 2 THEN 1
                                                 WHEN 3 THEN 8
                                                 WHEN 4 THEN 20
                                                 WHEN 5 THEN 30
                                                 ELSE 0
                                                 END AS "points"
                    FROM "s_video_rating" )
        WINDOW "last_7_days" AS ( PARTITION BY "video_id"
                                  RANGE INTERVAL '7' DAY PRECEDING),
               "last_14_days" AS ( PARTITION BY "video_id"
                                   RANGE INTERVAL '14' DAY PRECEDING),
               "last_21_days" AS ( PARTITION BY "video_id"
                                   RANGE INTERVAL '21' DAY PRECEDING),
               "last_28_days" AS ( PARTITION BY "video_id"
                                   RANGE INTERVAL '28' DAY PRECEDING);

which produces a stream of rows containing the video_id, performer_id, and a score computed over the rolling windows.  This view outputs a row every time a new event is added to the system.  The complex business logic has been reduced to one SQL query.
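
To make the weighting concrete, here is the score expression from the view as a small Python function, applied to made-up window sums. The function name and the sample totals are assumptions for illustration. The arithmetic follows the view: the most recent week counts in full, and each earlier week is weighted by 0.5, 0.2, and 0.1.

```python
def video_score(sum_7, sum_14, sum_21, sum_28):
    """Weighted score from cumulative 7, 14, 21 and 28 day point totals."""
    return (sum_7
            + (sum_14 - sum_7) * 0.5
            + (sum_21 - sum_14) * 0.2
            + (sum_28 - sum_21) * 0.1)

# Hypothetical cumulative totals: 100 points this week, then 60, 50 and 40
# points in each of the three preceding weeks.
score = video_score(100, 160, 210, 250)
# 100 + 60 * 0.5 + 50 * 0.2 + 40 * 0.1, which is 144 up to floating-point rounding
```
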

By attaching the results of this query to a foreign table, a nearly real-time cache of the videos and their current scores can be maintained in your database.  Alternatively, by using SQLstream’s JDBC driver, a distributed caching system such as memcached could be kept updated with the latest scores for each video simply by calling this query.  But those are topics for another post.

I hope you enjoyed this simplified real world example of how streaming SQL can:

  • simplify your business logic processing
  • improve your ability to deliver real-time data to your customers, clients, and colleagues.

The Business Case for Streaming SQL

Monday, October 4th, 2010

Businesses need to respond faster than ever to customer information and demands, which are arriving in rapidly increasing volumes from ever more diverse and distributed systems. This need for real-time business models cannot be addressed by traditional integration and business intelligence solutions, because streaming analytics and related concepts are central to the solution. The real-time model means responding immediately to new information as it arrives, and streaming analytics is at the core of these next generation IT systems.

Increasing the speed of business under these pressures of rapidly increasing data volume and more diverse data sources has been expensive and complex. Rapid responsiveness has proved elusive because real-time needs simply cannot be met by delivering more information faster from historical data. Real-time businesses require distributed technology that provides low latency and high-performance processing of data and event streams. By using continuous, streaming SQL queries, business answers can be generated as soon as input data becomes available. Whereas databases query historical data, streaming SQL queries and transforms data on the wire without any prior staging in a database.

As a result, streaming SQL is complementary to traditional EAI, business intelligence, and data warehousing solutions. By completing real-time processing and analysis before storing the data, streaming SQL reduces the cost of processing rapidly arriving data. Even better, streaming SQL makes existing, in-house SQL skills immediately applicable to real-time analysis, reducing integration time and costs.

To learn more about the Business Case for Streaming SQL, please read our “Concepts in Streaming SQL” mini-white paper.

Streaming SQL and Bollinger Bands

Thursday, June 10th, 2010

The last year has been an interesting experience, as I participated in a number of customer “Proof Of Concept” projects for SQLstream. Developing these real-time, stream computing projects greatly increased my appreciation for the advantages of an open, extensible and standards-compliant middleware infrastructure.

For example, I needed to implement an “edge detection” mechanism for a POC project. My colleagues at SQLstream recommended using “Bollinger bands” for determining outliers. So, I browsed through the Wikipedia entry for Bollinger Bands to learn more. Bollinger bands are very similar to standard deviations or quartile deviations. A standard deviation measures variability or dispersion in a data distribution. Bollinger bands, on the other hand, provide thresholds to filter outliers in the data. In fact, Bollinger bands are based on the moving average and moving standard deviation of the data set. For typical data sets, Bollinger bands can be defined as:

lowerBB (lower Bollinger Band) = avg – (k * stddev)

upperBB (upper Bollinger Band) = avg + (k * stddev)

where avg and stddev are the average and standard deviation over a sufficiently large time window, and k is a constant that needs to be determined for the activity being monitored. For roughly normally distributed data, k = 2 places the bands so that about 95% of the values fall between them (the upper Bollinger band alone sits near the 98th percentile of the data set).
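
The two formulas translate directly into code. The Python sketch below computes the bands for a small made-up sample. It uses the population standard deviation (`statistics.pstdev`), which is an assumption: whether a given STDDEV function uses the population or the sample form should be checked for the engine in use.

```python
from statistics import mean, pstdev

def bollinger_bands(values, k=2.0):
    """Return (lowerBB, upperBB) = avg -/+ k * stddev for the given window."""
    avg = mean(values)
    stddev = pstdev(values)  # population standard deviation (assumed)
    return avg - k * stddev, avg + k * stddev

# Hypothetical window of observations with average 5 and standard deviation 2.
window = [2, 4, 4, 4, 5, 5, 7, 9]
lower_bb, upper_bb = bollinger_bands(window)
```

With an average of 5 and a standard deviation of 2, k = 2 gives a lower band of 1 and an upper band of 9.
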

Bollinger Bands are widely used in the financial services industry. However, Bollinger Bands can be applied to solve problems in other industries. (As I am not claiming to be a statistics expert, I would certainly appreciate honest feedback on our application of Bollinger bands in streaming queries.)

Bollinger bands certainly are a good tool to identify sudden spikes in the activity being monitored in real-time. A number of examples come to mind:

  • Sudden spikes in the price for a ticker symbol in a stock exchange. For example,

SELECT STREAM ROWTIME, ticker, price
FROM (SELECT STREAM ROWTIME, ticker, price,
             AVG(price) OVER (PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING) AS "avgLastHour",
             STDDEV(price) OVER (PARTITION BY ticker RANGE INTERVAL '1' HOUR PRECEDING) AS "stdDevLastHour",
             AVG(price) OVER (PARTITION BY ticker ROWS 5 PRECEDING) AS "avgLast5Trades"
      FROM BIDS) AS S
WHERE S."avgLast5Trades" > S."avgLastHour" + 2 * S."stdDevLastHour";

  • Spikes in the error rate on a web server. For example,

SELECT STREAM ROWTIME, url, "numErrorsLastMinute"
FROM (SELECT STREAM ROWTIME, url, "numErrorsLastMinute",
             AVG("numErrorsLastMinute") OVER (PARTITION BY url RANGE INTERVAL '1' HOUR PRECEDING) AS "avgErrorsPerMinute",
             STDDEV("numErrorsLastMinute") OVER (PARTITION BY url RANGE INTERVAL '1' HOUR PRECEDING) AS "stdDevErrorsPerMinute"
      FROM "HttpRequestsPerMinute") AS S
WHERE S."numErrorsLastMinute" > S."avgErrorsPerMinute" + 2 * S."stdDevErrorsPerMinute";

  • Monitoring call volumes in a call center.
  • Analytics on social/online gaming services.
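
The spike test used in the queries above can be sketched in a few lines of Python. This is a simplified illustration and not a translation of the SQL. It uses a fixed number of preceding rows instead of a one-hour RANGE window, it excludes the current value from the window, and the function name and sample data are invented.

```python
from collections import deque
from statistics import mean, pstdev

def detect_spikes(values, window=5, k=2.0):
    """Yield (index, value) for values above the upper band of the preceding window."""
    history = deque(maxlen=window)
    for i, value in enumerate(values):
        if len(history) == window:
            upper = mean(history) + k * pstdev(history)
            if value > upper:
                yield i, value
        history.append(value)

# Hypothetical per-minute error counts with one obvious burst.
errors_per_minute = [3, 4, 3, 5, 4, 4, 3, 25, 4, 3]
spikes = list(detect_spikes(errors_per_minute))
```

Only the burst of 25 errors is flagged. Note that once the burst enters the window it inflates the standard deviation, so the values that follow it are judged against a much wider band until it slides out again.
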

In the Stream Computing context, Bollinger bands provide the high/low-water marks for monitoring activity. Whenever the level of recent activity crosses these Bollinger Band thresholds, the activity can be flagged. The streaming analytics engine can then perform additional analytics to detect patterns in the activity and to provide actionable information to regulate the system that is being monitored. At the very least, Bollinger bands can be used to filter out “uninteresting” rows from the stream, thereby reducing the load on the streaming pipeline.

At SQLstream, we used windowed aggregation functions such as AVG() OVER (…) and STDDEV() OVER (…) to establish Bollinger bands. AVG and STDDEV must be computed over sufficiently large windows of time so that the bands are stable; as the window slides forward in time, the Bollinger bands reflect more recent activity levels. The current activity level can then be computed on a much smaller window, potentially including only the current row in the stream. Should the current activity level cross either of the Bollinger bands, we mark that as a spike in the activity level. The multiplier in the Bollinger band formula needs to be tuned to the data distribution, that is, to determine exactly what multiple of the standard deviation is appropriate.

Coming back to my point about openness and extensibility, as you can see in the example queries above, you could execute a very similar query in Oracle or SQL Server. Key features such as windowed aggregation functions, often called SQL OLAP functions, have been in SQLstream for a long time. Interestingly, SQLstream did not support the STDDEV() windowed aggregation function during the POC. Many SQL experts will know that STDDEV can be easily rewritten using a formula involving AVG. Our Chief Architect, Julian Hyde, was quick enough to “sweeten” the deal by adding the “syntactic sugar” necessary to support STDDEV natively.
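
One common form of that rewrite is the identity stddev = sqrt(AVG(x²) − AVG(x)²), which gives the population standard deviation. The Python sketch below checks the identity against the standard library. It illustrates the general formula and is not necessarily the exact rewrite used during the POC; the sample form would need an additional n / (n − 1) correction.

```python
from math import sqrt
from statistics import mean, pstdev

def stddev_from_averages(values):
    """Population standard deviation computed from two averages only."""
    avg_x = mean(values)
    avg_x2 = mean([v * v for v in values])
    # Clamp at zero to guard against tiny negative results from rounding.
    return sqrt(max(0.0, avg_x2 - avg_x * avg_x))

sample = [2, 4, 4, 4, 5, 5, 7, 9]
via_averages = stddev_from_averages(sample)
via_library = pstdev(sample)
```

Both values agree for this sample. In a windowed query the same idea means keeping two running averages, of the value and of its square, over the same window.
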

I am sure a lot of you readers have interesting ideas and questions. Please feel free to post them here and I will be happy to engage in conversation.