Posts Tagged ‘streaming SQL’

Real-time congestion detection with Streaming SQL

Wednesday, August 31st, 2011

I am going to discuss a SQLstream application for monitoring traffic flow in real-time. In this application, vehicles with GPS enabled devices transmit vehicle position along with other vehicle information such as speed and engine state. SQLstream receives this information as a real-time data stream and uses streaming SQL analytics to detect and predict the rapid onset of congestion on the road network in real-time.

Streaming SQL for Congestion Detection
The SQLstream application for congestion detection uses a typical streaming SQL processing pipeline. In this case, data is fed into the SQLstream pipeline using our Log File Adapter. SQLstream adapters provide an interface to sources and targets such as databases, log files, network sockets and mail servers. Adapters are built using SQL/MED specification which is part of ANSI SQL standard. In this application, each log file contains the vehicle positions on the road network for the latest minute.

Streaming SQL Pipeline for Real-time Traffic Congestion Detection (click to enlarge)

The conditioning pipeline performs data cleansing operations such as rejecting poor quality data (records with missing or out-of-bounds columns) followed by mapping of vehicle positions (lat/long pair) to a “road element” of the road network using a UDX to perform geo-spatial lookups in an external road network database.

The diagram and the example SQL below show our implementation of a streaming SQL pipeline for congestion detection. Each vehicle reports its position and speed every minute. Two consecutive vehicle positions are then used to interpolate vehicle speeds for each road element on the vehicle path between reporting positions. The interpolated speed is based on actual distance traveled by the vehicle between two consecutive reports. The interpolated speed is calculated in a User Defined Transform(UDX). The UDX is written in Java. The UDX also associates a confidence factor with each interpolated speed value based on the position of the road element relative to endpoints of the vehicle path.

Streaming Traffic Flow Analytics
As illustrated below, the analytics pipeline calculates 15, 5, 4, 3, 2 & 1 minute moving average speeds for each road element. Each road element is color coded based on the 15-minute moving average speed. The results are streamed to a Google Earth display.

CREATE OR REPLACE VIEW “EstimatedReSpeeds” AS
SELECT STREAM “RE”, “reID”, “Carriageway”, “rePrescribed”, “reSpeedLimit”,
++SUM(“reVehicles”) OVER “last1″ AS “reVehiclesLast1″,
++SUM(“reVehicles”) OVER “last2″ AS “reVehiclesLast2″,
++SUM(“reVehicles”) OVER “last3″ AS “reVehiclesLast3″,
++SUM(“reVehicles”) OVER “last4″ AS “reVehiclesLast4″,
++SUM(“reVehicles”) OVER “last5″ AS “reVehiclesLast5″,
++SUM(“reVehicles”) OVER “last15″ AS “reVehiclesLast15″,
++SUM(“reSpeed” * “reConfidence”) OVER “last1″ /
++SUM(“reConfidence”) OVER “last1″ AS “reSpeedLast1″,
++SUM(“reSpeed” * “reConfidence”) OVER “last2″ /
++SUM(“reConfidence”) OVER “last2″ AS “reSpeedLast2″,
++SUM(“reSpeed” * “reConfidence”) OVER “last3″ /
++SUM(“reConfidence”) OVER “last3″ AS “reSpeedLast3″,
++SUM(“reSpeed” * “reConfidence”) OVER “last4″ /
++SUM(“reConfidence”) OVER “last4″ AS “reSpeedLast4″,
++SUM(“reSpeed” * “reConfidence”) OVER “last5″ /
++SUM(“reConfidence”) OVER “last5″ AS “reSpeedLast5″,
++SUM(“reSpeed” * “reConfidence”) OVER “last15″ /
++SUM(“reConfidence”) OVER “last15″ AS “reSpeedLast15″
FROM “Stage3″
WINDOW “last1″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘1′ MINUTE PRECEDING),
+++++“last2″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘2′ MINUTE PRECEDING),
+++++“last3″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘3′ MINUTE PRECEDING),
+++++“last4″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘4′ MINUTE PRECEDING),
+++++“last5″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘5′ MINUTE PRECEDING),
+++++“last15″ AS (PARTITION BY “RE”
++RANGE INTERVAL ‘15′ MINUTE PRECEDING);

Detecting the rapid onset of congestion
Congestion is detected by comparing moving averages for the larger time window with that for the smaller time window. For example, comparing a 2-minute average with a 1-minute average:

CREATE OR REPLACE VIEW “CongestionRule1″ AS
SELECT STREAM
++–- name, ID, highway name, speed limit etc. for each road element
++“RE”, “reID”, “Carriageway”, “rePrescribed”, “reSpeedLimit”,
++–- volume of vehicle reports in each time window
++“reVehiclesLast1″, “reVehiclesLast2″, “reVehiclesLast3″,
++“reVehiclesLast4″, “reVehiclesLast5″, “reVehiclesLast15″,
++–- estimated avg speed for each road element
++“reSpeedLast1″, “reSpeedLast2″, “reSpeedLast3″,
++“reSpeedLast4″, “reSpeedLast5″,”reSpeedLast15″
FROM “EstimatedReSpeeds”
WHERE “reSpeedLast1″ < 0.80 * “reSpeedLast2″ AND – slowdown by 20 %
++“reSpeedLast2″ < 0.80 * “reSpeedLast3″ AND
++“reSpeedLast3″ < 0.80 * “reSpeedLast4″ AND
++“reSpeedLast4″ < 0.80 * “reSpeedLast5″ ;

SQLstream Traffic Congestion Detection - Visualization

SQLstream Traffic Congestion Detection - Visualization (click to enlarge)

Note that these estimated speeds are over overlapping windows and as such slowdown thresholds are set accordingly.

Fine tuning slowdown thresholds and other information, such as the proximity of traffic lights and the volume of vehicle reports in each time window, improves the quality of congestion detection algorithm.

The Google Earth screenshot illustrates real-time traffic view as well as detected slowdowns as pins. The severity of the slowdown is indicated by different shades of red.

Calculating Decaying Averages in Streaming SQL

Tuesday, August 23rd, 2011

At SQLstream we have a comprehensive implementation of standard SQL windowing operations such as SUM, COUNT and AVG. Recently though we needed a more sophisticated function for a decaying weighted average that would emphasize more recent samples over older samples. We implemented a new EXP_AVG() operation, as shown in this example query:

SELECT rowtime, ticker, price,
++EXP_AVG(price, INTERVAL ‘10′ SECOND) OVER w
FROM t
WINDOW w AS (PARTITION BY ticker
+++++++++++++ORDER BY rowtime
+++++++++++++RANGE INTERVAL ‘30′ SECOND PRECEDING);

EXP_AVG takes a value expression and an interval constant half life. In this example, two samples within the WINDOW are separated by 10 seconds, the older one will be given half as much weight as the newer one.

How would this look in standard SQL without an EXP_AVG function?

I thought it would be interesting to look at how a decaying average would be implemented in SQL without an EXP_AVG operation, as under the covers we implement it using standard SQL windowed sums. If we knew that the times for our rows would always fall under a narrow range, we would increase the weights of the samples as time goes forwards and simply scale down the total (rather than lowering the weights of samples as time goes backwards from the most recent sample). This would be straightforward. The SQL would look something like this.

First, we need a function to turn our time based units into something on which we can do arithmetic:

CREATE FUNCTION toSeconds(t TIMESTAMP, offset TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t – offset) SECOND(9,3) AS DECIMAL(12,3));

Then, we need a function for calculating exponential weight:

CREATE FUNCTION expWeight (seconds DOUBLE, halfLife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN EXP(seconds * (LN(2)/halfLife));

The complete SQL will then be:

SELECT rowtime, ticker,
++SUM(price * expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
++/ SUM(expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
++as avgPrice
FROM t
WINDOW w AS (PARTITION BY ticker
+++++++++++++ORDER BY rowtime
+++++++++++++RANGE INTERVAL ‘30′ SECOND PRECEDING);

Unfortunately this would not work in practice as the expWeight function would overflow once the row times got more than a few halfLifes advanced from OFFSET. We can’t reset the offset as long as there are any non zero values still in the window. This gives us our out. What we can do is partition the incoming values into two windowed aggregates, always sending zeros to one of them and switching and reseting the offset when the window for that aggregate fills with zeros, this way we’re never summing values whose weights are calculated using different starting offsets.

Here’s the SQL for this. We’ll need some more functions describing when and how we reset our offsets. We’ll need an arbitrary reference for our time calculations:

CREATE FUNCTION toSeconds(t TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t – TIMESTAMP ‘1970-01-01 00:00:00′) SECOND(9,3)
+++++++++++AS DECIMAL(12,3));

To use modulo arithmetic in SQL, convert to an integer:

CREATE FUNCTION toMillis(t TIMESTAMP)
RETURNS BIGINT CONTAINS SQL
RETURN CAST(toSeconds(t))*1000 aS BIGINT);

We’ll want to partition time into window sized epochs starting at our arbitrary time reference:

CREATE FUNCTION isEvenEpoch (t TIMESTAMP, windowSeconds INT)
RETURNS BOOLEAN CONTAINS SQL
RETURN MOD(toMillis(t),
++windowSeconds*2000)< windowSeconds*1000;

We’ll use the start of each epoch as the offset for all rows that fall in that epoch:

CREATE FUNCTION epochStart (t TIMESTAMP, windowSeconds INT)
RETURNS decimal(12,3) CONTAINS SQL
RETURN CAST(MOD(toMillis(t), windowSeconds*1000)
+++++++++++AS DECIMAL(12,3))/1000;

The following two functions are used to correct the inserting of zeros into a window and all the non zero values still in the window are from the previous epoch:

CREATE FUNCTION evenAgingFactor (t TIMESTAMP,
+++++++++++++windowSeconds INT, halflife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
+++++++++++++THEN 1
+++++++++++++ELSE expWeight(-windowSeconds, halflife) END;
CREATE FUNCTION oddAgingFactor (t TIMESTAMP,
+++++++++++++windowSeconds INT, halflife DOUBLE)
RETURNS double CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
+++++++++++++THEN expWeight(-windowSeconds, halflife)
+++++++++++++ELSE 1 END;

The complete SQL that uses the two window approach to calculate the decaying average using a 30 second window and a 10 second half life is:

SELECT rowtime, ticker,
+++(oddAgingFactor(rowtime, 30, 10) *
++++SUM(CASE WHEN isEvenEpoch(rowtime, 30)
++++++++THEN 0
++++++++ELSE price * expWeight(epochStart(rowtime, 30), 10) END) OVER w
+++++ evenAgingFactor(rowtime, 30, 10) *
++++SUM(CASE WHEN isEvenEpoch(rowtime, 30)
++++++++THEN price * expWeight(epochStart(rowtime, 30), 10)
++++++++ELSE 0 END) OVER w)
++++/
++++(oddAgingFactor(rowtime, 30, 10) *
++++SUM(CASE WHEN isEvenEpoch(rowtime, 30)
++++++++THEN 0
++++++++ELSE expWeight(epochStart(rowtime, 30), 10) END) OVER w
+++++ evenAgingFactor(rowtime, 30, 10) *
++++SUM(CASE WHEN isEvenEpoch(rowtime, 30)
++++++++THEN expWeight(epochStart(rowtime, 30), 10)
++++++++ELSE 0 END) OVER w)
as avgPrice
FROM t WINDOW w AS (PARTITION BY TICKER
++++++++++++++++++++ORDER BY rowtime
++++++++++++++++++++RANGE INTERVAL ‘30′ SECOND PRECEDING);

To understand how this would work in practice lets assume we start our query at 12:00. We’ll only look at the numerator of our query as the denominator uses the same technique to calculate a weighted count.

Time Window A state Window B state
12:00:00 Empty Empty
12:00:00 – 12:00:30 Values with increasing weights being added. Zeros being added.
12:00:30 Has weighted sum * expWeight(30,10) Zero
12:00:30 – 12:01:00 Zeros being added. Will have older part of weighted sum. Values with increasing weights being added. Will have newer part of weighted sum.
12:01:00 Zero. All non zero values will have been aged out. We can safely reset our scaling factor. Has weighted sum * expWeight(30,10)

With care to apply appropriate scaling to the parts this SQL can be used to calculate a weighted average using standard windowed operations. However, I think you’ll agree, it’s easier with our EXP_AVG() operation.

SQLstream at PostgreSQL Conference: West 2010 (2)

Monday, November 8th, 2010

PostgreSQL Conference: West 2010Attendance at the PostgreSQL West 2010 Conference was encouraging considering a million people had gathered in the city to celebrate the World Series victory of the San Francisco Giants.

I’ve posted the presentation from the event (previously blogged about here) as given by myself and SQLstream’s chief architect Julian Hyde.

We presented the concepts of Streaming GIS, integrating SQLsteam’s real-time streaming data analytics with the PostgreSQL-based Geographic Information Systems (GIS) engine PostGIS. With examples from SQLstream’s commercial traffic congestion monitoring application, we discussed how sophisticated high performance real-time geospatial applications can be delivered quickly and easily using standards-based SQL.

Click here for a non-slideshare PDF version.

SQLstream at PostgreSQL Conference: West 2010

Tuesday, November 2nd, 2010

PostgreSQL Conference: West 2010SQLstream’s chief architect Julian Hyde and founding engineer Sunil Mujumdar are to present at PostgreSQL Conference: West 2010.

In a talk entitled Streaming GIS using PostGIS and SQLstream, Julian and Sunil will describe the SQLstream stream computing platform based on industry-standard SQL, and its integration with the PostgreSQL-based Geographic Information Systems (GIS) engine PostGIS.

Wireless sensors and internet services are generating data faster than conventional database technologies can process that data. In particular, mobile resource management requires the stream processing of high volume, location-based data. Solutions require new methods such as Streaming SQL to address these new sources of big data, in real-time. We’ll be discussing key concepts in stream computing, data warehouse feeds and the integration of PostGIS into a high performance streaming environment.

Using mobile resource management as a case study, we will illustrate with examples from SQLstream’s commercial traffic monitoring application.

Concepts in Streaming SQL

Tuesday, October 26th, 2010

A streaming SQL query is a continuous, standing query that executes over streaming data. Data streams are processed using familiar SQL relational operators augmented to handle time sensitive data. Streaming queries are similar to database queries in how they analyze data; they differ by operating continuously on data as they arrive and by updating results in real-time.

Streaming SQL queries process dynamic, flowing data, in contrast to traditional RDBMSs, which process static, stored data with repeated single-shot queries. Streaming SQL is simple to configure using existing IT skills, dramatically reducing integration cost and complexity. Combining the intuitive power of SQL with this simplicity of configuration enables much faster implementation of business ideas, while retaining the scalability and investment protection important for business-critical systems.

By processing transactions continuously, streaming SQL directly addresses the real-time business needs for low latency, high volume, and rapid integration. Complex, time-sensitive transformations and analytics, operating continuously across multiple input data sources, are simple to configure and generate streaming-analytics answers as input data arrive. Sources can include any application inputs or outputs, or any of the data feeds processed or generated within an enterprise. Examples include financial trading data, internet clickstream data, sensor data, and exception events. SQL can process multiple input and output streams of data, for multiple publishers and subscribers. To learn more about Streaming SQL, please read our “Concepts in Streaming SQL” mini-white paper.

How SQLstream Can Reduce Complex Business Logic to a Single SQL Query

Monday, October 18th, 2010

In the game industry, complex game logic needs to be applied to streams of events generated by gameplay.  In single player games, this logic is simply handled by applying the correct computations.  However, in an Internet based social game where millions of players interact together online, the problem takes on an entirely different dimension.  Storing the game events on disk inside of a database becomes increasingly difficult as the rate of gameplay events increases.  Logic and computation must be applied to the data and a disparate set of data must be queried to correctly update the game state.

The solution can be surprisingly simple and similar in form to storing all gameplay events in a traditional database.  SQLstream’s powerful streaming and windowed aggregation capabilities can reduce this use case and complex logic to a single query.

For example, consider a game where users create videos that are viewed and rated by other users:

  • A score for the video must be computed based upon the last 4 weeks of gameplay.
  • Let’s say a view is worth 12 points and various ratings levels are worth between 0 and 30 points.
  • The content’s score is the total points accumulated in the last week, plus 50% of the points in the week before that, plus 20% the points in the week before that and 10% of the points in the week before that.

Let’s assume we have two streams of data which contain streams of gameplay events:

--
-- A stream containing 1 row per an individual view of a video
--
CREATE STREAM "s_video_view" (
++++++++++++++++"video_id" INTEGER,+++++-- the viewed video
++++++++++++++++"user_id" INTEGER,++++++-- the id of the viewer
++++++++++++++++"performer_id" INTEGER++-- the id of the performer
++++++++++++++++);


--
-- A stream containing 1 row per an individual rating of a video
--
CREATE STREAM "s_video_rating" (
++++++++++++++++"video_id" INTEGER,+++++-- the viewed video
++++++++++++++++"performer_id" INTEGER,+-- the id of performer
++++++++++++++++"rater_id" INTEGER,+++++-- the id of the rater
++++++++++++++++"rating" INTEGER++++++++-– the rating given
++++++++++++++++);

To handle all the described business logic, we must first compute the total number of points generated from each gameplay event and add that to the stream of tuples entering the system.  The following query accomplishes this:

SELECT STREAM "performer_id", "video_id", 12 AS "points"
+++++FROM "s_video_view"
UNION ALL
SELECT STREAM "performer_id", "video_id",
++++++++++CASE "s_video_rating"."rating" WHEN 2 THEN 1
+++++++++++++++++++++++++++++++++++++++++WHEN 3 THEN 8
+++++++++++++++++++++++++++++++++++++++++WHEN 4 THEN 20
+++++++++++++++++++++++++++++++++++++++++WHEN 5 THEN 30
+++++++++++++++++++++++++++++++++++++++++ELSE 0
+++++++++++++++++++++++++++++++++++++++++END AS "points"
+++++FROM "s_video_rating"

This query will produce a stream of data containing the performer_id, video_id, and points for each scoring event in the system.

Next we must compute rolling time based score over the event stream for each unique video_id.  SQLstream provides the SQL:99, SQL:2003, and SQL:2008 standard WINDOW facility to make this easy:

WINDOW "last_7_days" AS ( PARTITION BY "video_id"
++++++++++++++++++++++++++RANGE INTERVAL '7' DAY PRECEEDING),
+++++++"last_14_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++RANGE INTERVAL '14' DAY PRECEEDING),
+++++++"last_21_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++RANGE INTERVAL '21' DAY PRECEEDING),
+++++++"last_28_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++RANGE INTERVAL '28' DAY PRECEEDING)

These rolling windows contain all the scoring events over the last 7, 14, 21, and 28 days respectively grouped by the video_id.  This means that any aggregation function applied to that window will be applied to the stream events with the same video_id.  So, COUNT(*) OVER "last_7_days" would produce one row for each unique video_id with a scoring event in the last week.  Those rows would contain a count of the number of scoring events for each unique video_id.

By subtracting the SUM of the points in the 7 day window from the number of points in the 14 day window, we can compute the number of points in the week starting two weeks ago and ending one week ago.  This technique allows us to implement computations on rolling windows that are not bounded by the current time.

Putting the entire example together, we get the following view:

CREATE VIEW "v_video_score" AS
+++++SELECT STREAM "video_id", "performer_id",
+++++++++++++++SUM("points") OVER "last_7_days" +
+++++++++++++++++((SUM("points") OVER "last_14_days" -
+++++++++++++++++++SUM("points") OVER "last_7_days") * 0.5) +
+++++++++++++++++((SUM("points") OVER "last_21_days" -
+++++++++++++++++++SUM("points") OVER "last_14_days") * 0.2) +
+++++++++++++++++((SUM("points") OVER "last_28_days" -
+++++++++++++++++++SUM("points") OVER "last_21_days") * 0.1) AS "score"
++++++++FROM ( SELECT STREAM "performer_id", "video_id", 12 AS "points"
++++++++++++++++++++FROM "s_video_view"
+++++++++++++++UNION ALL
+++++++++++++++SELECT STREAM "performer_id", "video_id",
+++++++++++++++++++++++++++CASE "s_video_rating"."rating" WHEN 2 THEN 1
+++++++++++++++++++++++++++++++++++++++++++++++++WHEN 3 THEN 8
+++++++++++++++++++++++++++++++++++++++++++++++++WHEN 4 THEN 20
+++++++++++++++++++++++++++++++++++++++++++++++++WHEN 5 THEN 30
+++++++++++++++++++++++++++++++++++++++++++++++++ELSE 0
+++++++++++++++++++++++++++++++++++++++++++++++++END AS "points"
++++++++++++++++++++FROM "s_video_rating" )
++++++++WINDOW "last_7_days" AS ( PARTITION BY "video_id"
++++++++++++++++++++++++++++++++++RANGE INTERVAL '7' DAY PRECEEDING),
+++++++++++++++"last_14_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++++++++++RANGE INTERVAL '14' DAY PRECEEDING),
+++++++++++++++"last_21_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++++++++++RANGE INTERVAL '21' DAY PRECEEDING),
+++++++++++++++"last_28_days" AS ( PARTITION BY "video_id"
+++++++++++++++++++++++++++++++++++RANGE INTERVAL '28' DAY PRECEEDING);

which produces a stream of rows containing the video_id, performer_id, and a score computed over the rolling windows.  This view outputs a row every time a new event is inserted added the system.  The complex business logic has been reduced to one SQL query.

By attaching the results of this query to a foreign table, a nearly real-time cache of the videos and their current scores can be maintained in your database.  Alternatively, by using SQLstream’s JDBC driver, a distributed caching system such as memcached could be kept updated with the latest scores for each video simply by calling this query.  But those are topics for another post.

I hope you enjoyed this simplified real world example of how streaming SQL can:

  • simplify your business logic processing
  • improve your ability to deliver real-time data to your customers, clients, and colleagues.

The Business Case for Streaming SQL

Monday, October 4th, 2010

Businesses need to respond faster than ever to customer information and demands, which are arriving in rapidly increasing volumes from ever more diverse and distributed systems. This need for real-time business models can not be addressed by traditional integration and business intelligence solutions because streaming analytics and related concepts are central to the solution. The real-time model means responding immediately to new information as it arrives and streaming analytics is at the core of these next generation IT systems.

Increasing the speed of business under these pressures of rapidly increasing data volume and more diverse data sources has been expensive and complex. Rapid responsiveness has proved elusive because real-time needs simply cannot be met by delivering more information faster from historical data. Real-time businesses require distributed technology that provides low latency and high-performance processing of data and event streams. By using continuous, streaming SQL queries, business answers can be generated as soon as input data becomes available. Whereas databases query historical data, streaming SQL queries and transforms data on the wire without any prior staging in a database.

As a result, streaming SQL is complementary to traditional EAI, business intelligence, and data warehousing solutions. By completing real-time processing and analysis before storing the data, streaming SQL delivers reduces the cost of processing rapidly arriving data. Even better, streaming SQL makes existing, in-house SQL skills immediately applicable to real-time analysis, reducing integration time and costs.

To learn more about the Business Case for Streaming SQL, please read our “Concepts in Streaming SQL” mini-white paper.

Big Data – Dealing with the Data Tsunami

Tuesday, June 22nd, 2010

GigaOM Structure 2010 Big Data and Cloud Computing There is a lot of buzz these days about the challenge of “Big Data”.  I’ll be speaking on the subject at GigaOM’s Structure2010, on the “DEALING WITH THE DATA TSUNAMI: THE BIG DATA” panel. There are many dimensions to the challenges posed by “Big Data”, which I’ve presented here as five separate but related themes.

Speed of data arrival

The first theme is speed.  When a lot of data arrive fast, it is often overlooked that they arrive in raw form and need to be processed or cooked before they can be of any real value. The processing normally comprises cleaning, filtering, aggregating and validating.  Sometimes the data need to be enhanced, normalized or de-normalized.  While there are a number of proprietary ETL tools out there that can help, most people prefer to perform these operations using SQL.  This approach has become known as ELT as the data are Extracted, Loaded and then Transformed (as opposed to Transformed then Loaded).  In the past, this has meant loading raw data into a data warehouse’s staging tables and then performing the ELT with SQL in batches until the data are fully cooked and ready to take part in the “main course” queries. 

One of the strengths of the SQLstream approach is that for the first time you can use standards-based SQL for performing these ELT steps but as Continuous ETL rather than operating upon the data after first storing it.  We call this “analyze-before-store” approach: Query the Future – as the scope of the continuous queries is from the moment they start until the end of future time (in contrast with historical queries whose scope is from the moment they start until as far back in time as the data are stored).  SQLstream’s queries continuously process, clean, aggregate and enhance the data in a highly parallelized dataflow pipelined process.  The staging is in main memory using 64-bit architecture and multiple cores and servers.  This provides a highly scalable efficient and cost effective solution to ETL, with the virtuous side-effect of enabling the data warehouse to be kept continuously up-to-date by feeding it a stream of fully cooked data and updating its aggregate tables continuously in near real-time.  All of this is done without stealing valuable cycles of the data warehouse server.

Data location

The second theme is data location.  Like houses, location is very important when it comes to assessing the value (or usefulness) of the data.  Location might be spatial or temporal.  If you wish to be alerted of a special price for gas at a specific gas station, clearly it is of greater value if you are currently in the immediate vicinity of the gas station.  This shows the value of both the location in space and the location in time.  In contrast, most data warehouses dumbly store all service data and records without regard to their value. 

Clearly, the value of the data in many cases greatly diminishes over time.  Many of the queries that a business might pose are better targeted at current data.  That is particularly true of targeted advertisements, but also when monitoring customer service level, cloud computing infrastructure and the like.  The data are much more valuable when the business is able to take proactive initiative to capitalize on the value – fixing problems or issues before they negatively impact customers, or making that promotion or sale before the customer purchases product or service from a competitor.  SQLstream’s continuous queries are all about focusing analytics where they have the most value by specifying explicit windows of focus for the queries in terms of time, quantity or space.  While many rows can flow into and out of the window of focus for any given query, the window represents the immediate focus of attention.

Pace of change

The third theme is the pace of change of data.  If you have a large quantity of data that is not changing very much, then historical queries and analysis will no doubt provide you with all of your answers.  However, if the data are changing constantly, or a lot of new data arriving constantly, or if you have a focus on a specific window of time or space, then historical analysis has little value.  What you care about is the derivative of the change – the rates of change.  For example, are our sales accelerating or decelerating?  Is the rate of acceleration unusually high or low?  What about service outages and error rates?  Or customer complaints?  The SQLstream approach enables you to see what is changing rather than what is staying the same.  It is analogous to predator vision: the predators want to see what is moving and their vision system prioritizes that over what remains motionless.  SQLstream provides such dynamic vision.

Balancing historical and continuous analysis

The fourth theme is the need to complement data mining and the results of historical analysis with continuous analysis.  Data warehousing allows you to find patterns and predictors from past data and to back test all of your hypotheses over extended periods of time.  The back testing of such hypotheses often takes the form of SQL queries that search for patterns of changes of data over time and check that the predicted results occurred and with what frequency.  Once you have mined and captured such valuable predictors, it is straightforward to take the SQL you have generated and tweak it to be used in real-time, continuously executed against live data.  Using this approach, SQLstream allows you to leverage you data mining results to perform real-time predictive analytics, giving your business a real-time heads up for key indicators of buying signals, or systems’ failure or what ad should be served up based on a customer’s web behavior.

Brain over brawn processing

My fifth and final theme is “smart declarative” versus “dumb brute force” when applied to data queries.  The latter is how I see Hadoop-based approaches.  You parallelize a problem to take advantage of a lot of available servers and related CPU cycles, but you do not rely on any intelligence on how you partition the problem.  In fact not having to “think” is one of the primary appeals of the technique.  It is a brute force method of brawn over brain.  However, where the problem space is truly huge, or the time or financial budget is more limited, there is always the attraction of the “brain over brawn” technique.  Declarative SQL processing draws upon the mathematical tractability of analyzing patterns and dependencies within the data, the use of keys and indexing, the rewriting of complex formulae into simpler ones and avoiding recalculation of intermediate results – in order to provide a faster, more efficient and smarter way of finding the solutions.  Such declarative techniques can still take extensive advantage of parallelism and inexpensive or available servers and CPU cycles, but they rely on smart analysis in order to optimize the calculations.  SQLstream, and all SQL-based data warehouses, heavily draw upon these mathematical SQL properties and patterns and analysis of the data to do the smart thing when it comes to query processing. 

Stream Computing of the kind embodied by SQLstream however has even greater potential to take advantage of parallelism over and above SQL data warehouses because SQLstream’s Stream Computing has no transactional bottleneck and is purely declarative.  Input streams are not “side-effected” by the execution of stream SQL statements, rather new streams are created from the original ones (which are left untouched and can be presented concurrently to other SQLstream servers).  The execution paradigm is one of parallel dataflow execution – a paradigm that lends itself not only to massive parallel execution but also to massively distributed execution.  I believe that as Hadoop becomes more widely understood and deployed, people will begin to see just how much of a better job could be performed by adding a little intelligence and just how powerful declarative stream computing can be.

Streaming SQL and Bollinger Bands

Thursday, June 10th, 2010

Last year has been an interesting experience as I participated in a number of customer “Proof Of Concept” projects for SQLstream. Developing these real-time, stream computing projects greatly increased my appreciation for the advantages of an open, extensible and standards-compliant middleware infrastructure.

For example, I needed to implement an “edge detection” mechanism for a POC project. My colleagues at SQLstream recommended using “Bollinger bands” for determining outliers. So, I browsed through the  wikipedia entry for Bollinger Bands to learn more. Bollinger bands are very similar to standard deviations or quartile deviations. A Standard deviation measures variability or dispersion in data distribution. Bollinger bands, on the other hand, provide thresholds to filter outliers in the data. In fact, Bollinger bands are based on the moving average and moving standard deviation of the data set. For typical data sets, Bollinger bands can be defined as:

lowerBB(lower Bollinger Band) = avg – (k * stddev),

upperBB(upper Bollinger Band) = avg + (k* stddev)

where avg and stddev are the average and standard deviation over a sufficiently large time window and k is the constant that needs to be determined for the activity being monitored. For typical data sets, k = 2 will create the upper bollinger band at 95th percentile of the data set.

Bollinger Bands are widely used in the financial services industry. However, Bollinger Bands can be applied to solve problems in other industries. (As I am not claiming to be a statistics expert, I would certainly appreciate honest feedback on our application of Bollinger bands in streaming queries.)

Bollinger bands certainly are a good tool to identify sudden spikes in the activity being monitored in real-time. A number of examples come to my mind,

  • Sudden spikes in the price for a ticker symbol in a stock exchange. For example,

SELECT STREAM ROWTIME, ticker, price,

FROM (SELECT STREAM ROWTIME, ticker, price,

AVG(price) OVER (PARTITION BY ticker RANGE INTERVAL ‘1′ HOUR PRECEDING) AS “avgLastHour”,

STDDEV(price) OVER (PARTITION BY ticker RANGE INTERVAL ‘1′ HOUR PRECEDING) AS “stdDevLastHour”,

AVG(price) OVER (PARTITION BY ticker ROWS 5 PRECEDING) AS “avgLast5Trades”

FROM BIDS) AS S

WHERE S.”avgLast5Trades” > S.”avgLastHour” + 2 * S.”stdDevLastHour”;

  • Spikes in the error rate on a web server. For example,

SELECT STREAM ROWTIME, url, “numErrorsLastMinute”,

FROM (SELECT STREAM ROWTIME, url, “numErrorsLastMinute”,

AVG(“numErrorsLastMinute”) OVER (PARTITION BY url RANGE INTERVAL ‘1′ HOUR PRECEDING) AS “avgErrorsPerMinute”,

STDDEV(“numErrorsLastMinute”) OVER (PARTITION BY url RANGE INTERVAL ‘1′ HOUR PRECEDING) AS “stdDevErrorsPerMinute”

FROM “HttpRequestsPerMinute”) AS S

WHERE S.”numErrorsLastMinute” > S.”avgErrorsPerMinute” + 2 * S.”stdDevErrorsPerMinute”;

  • Monitoring call volumes in a call center.
  • Analytics on social/online gaming services.

In the Stream Computing context, Bollinger bands provide the high/low-water marks for monitoring activity. Whenever the level of recent activity crosses these Bollinger Band thresholds, the activity can be flagged. The streaming analytics engine can then perform additional analytics to detect patterns in the activity and to provide actionable information to regulate the system that is being monitored. At the very least, Bollinger bands can be used to filter out “uninteresting” rows from the stream, thereby reducing the load on the streaming pipeline.

At SQLstream, we used windowed aggregation functions such as AVG() OVER (…) and STDDEV() OVER (…) to establish Bollinger bands. It is necessary to compute AVG and STDDEV on sufficiently large windows of time. In a streaming context, we used sufficiently large windows of time to calculate Bollinger bands. So, as the window slides forward in time, the Bollinger bands reflect more recent activity levels. The current activity levels can then be computed on a much smaller window, potentially including only the current row in the stream. Should the current activity level cross either of the Bollinger bands, we then mark that as a spike in the activity level. The formula for Bollinger bands needs to be changed based on the data distribution, that is, to determine exactly what multiple of standard deviation is appropriate.

Coming back to my point about openness and extensibility, as you can see in the example queries above, you could execute a very similar query in Oracle or SQL server. Key features such as windowed aggregation functions, often called SQL OLAP functions, have been in SQLstream for a long time. Interestingly, SQLstream did not support STDDEV() windowed aggregation function during the POC. A lot of the SQL experts will know STDDEV can be easily rewritten using a formula involving AVG. Our Chief Architect, Julian Hyde, was quick enough to “sweeten” the deal by adding the “syntactic sugar” necessary to support STDDEV natively.

I am sure a lot of you readers have interesting ideas and questions. Please feel free to post them here and I will be happy to engage in conversation.