Blog

August 31, 2011

I am going to discuss a SQLstream application for monitoring traffic flow in real time. In this application, vehicles with GPS-enabled devices transmit their position along with other vehicle information such as speed and engine state. SQLstream receives this information as a real-time data stream and uses streaming SQL analytics to detect and predict the rapid onset of congestion on the road network.

Streaming SQL for Congestion Detection
The SQLstream application for congestion detection uses a typical streaming SQL processing pipeline. In this case, data is fed into the SQLstream pipeline using our Log File Adapter. SQLstream adapters provide an interface to sources and targets such as databases, log files, network sockets and mail servers. Adapters are built using the SQL/MED specification, which is part of the ANSI SQL standard. In this application, each log file contains the vehicle positions on the road network for the latest minute.

The conditioning pipeline first performs data cleansing operations such as rejecting poor quality data (records with missing or out-of-bounds columns). It then maps each vehicle position (lat/long pair) to a "road element" of the road network, using a UDX to perform geo-spatial lookups in an external road network database.
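
As a rough illustration of the cleansing step, here is a minimal Python sketch of a record filter. The field names and the bounds are assumptions made up for this example; the actual application does this in the streaming SQL pipeline, and its columns and limits are not shown in this post.

```python
# Hypothetical field names and limits, chosen only for illustration.
REQUIRED_FIELDS = ("vehicle_id", "lat", "lon", "speed_kmh")
MAX_PLAUSIBLE_SPEED_KMH = 250.0


def is_clean(record):
    """Return True if no required column is missing or out of bounds."""
    if any(record.get(field) is None for field in REQUIRED_FIELDS):
        return False
    return (
        -90.0 <= record["lat"] <= 90.0
        and -180.0 <= record["lon"] <= 180.0
        and 0.0 <= record["speed_kmh"] <= MAX_PLAUSIBLE_SPEED_KMH
    )


def cleanse(records):
    """Keep only the records that pass the quality checks."""
    return [r for r in records if is_clean(r)]
```

Records that fail the check are simply dropped here; a real pipeline might route them to a rejects stream instead.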

The diagram and the example SQL below show our implementation of a streaming SQL pipeline for congestion detection. Each vehicle reports its position and speed every minute. Two consecutive vehicle positions are then used to interpolate vehicle speeds for each road element on the vehicle path between the reporting positions. The interpolated speed is based on the actual distance traveled by the vehicle between two consecutive reports, and is calculated in a User Defined Transform (UDX) written in Java. The UDX also associates a confidence factor with each interpolated speed value, based on the position of the road element relative to the endpoints of the vehicle path.
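
To make the speed interpolation concrete, here is a simplified Python sketch. It is not the Java UDX: it assumes the vehicle travels in a straight (great-circle) line between two reports, whereas the real UDX uses the distance along the road network and also produces a confidence factor. The tuple layout is likewise an assumption.

```python
import math

EARTH_RADIUS_M = 6_371_000.0  # mean Earth radius in metres


def haversine_m(lat1, lon1, lat2, lon2):
    """Great-circle distance in metres between two lat/long points."""
    phi1, phi2 = math.radians(lat1), math.radians(lat2)
    d_phi = phi2 - phi1
    d_lambda = math.radians(lon2 - lon1)
    a = (math.sin(d_phi / 2) ** 2
         + math.cos(phi1) * math.cos(phi2) * math.sin(d_lambda / 2) ** 2)
    return 2 * EARTH_RADIUS_M * math.asin(math.sqrt(a))


def average_speed_kmh(report_a, report_b):
    """Average speed between two reports, each (epoch_seconds, lat, lon)."""
    t1, lat1, lon1 = report_a
    t2, lat2, lon2 = report_b
    elapsed = t2 - t1
    if elapsed <= 0:
        raise ValueError("reports must be in increasing time order")
    metres = haversine_m(lat1, lon1, lat2, lon2)
    return metres / elapsed * 3.6  # m/s to km/h
```

The same average speed would then be assigned to every road element on the path between the two reports.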

Streaming Traffic Flow Analytics
As illustrated below, the analytics pipeline calculates 15-, 5-, 4-, 3-, 2- and 1-minute moving average speeds for each road element, weighting each interpolated speed by its confidence factor. Each road element is color coded based on the 15-minute moving average speed. The results are streamed to a Google Earth display.

CREATE OR REPLACE VIEW "EstimatedReSpeeds" AS
SELECT STREAM "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
  SUM("reVehicles") OVER "last1" AS "reVehiclesLast1",
  SUM("reVehicles") OVER "last2" AS "reVehiclesLast2",
  SUM("reVehicles") OVER "last3" AS "reVehiclesLast3",
  SUM("reVehicles") OVER "last4" AS "reVehiclesLast4",
  SUM("reVehicles") OVER "last5" AS "reVehiclesLast5",
  SUM("reVehicles") OVER "last15" AS "reVehiclesLast15",
  SUM("reSpeed" * "reConfidence") OVER "last1" /
  SUM("reConfidence") OVER "last1" AS "reSpeedLast1",
  SUM("reSpeed" * "reConfidence") OVER "last2" /
  SUM("reConfidence") OVER "last2" AS "reSpeedLast2",
  SUM("reSpeed" * "reConfidence") OVER "last3" /
  SUM("reConfidence") OVER "last3" AS "reSpeedLast3",
  SUM("reSpeed" * "reConfidence") OVER "last4" /
  SUM("reConfidence") OVER "last4" AS "reSpeedLast4",
  SUM("reSpeed" * "reConfidence") OVER "last5" /
  SUM("reConfidence") OVER "last5" AS "reSpeedLast5",
  SUM("reSpeed" * "reConfidence") OVER "last15" /
  SUM("reConfidence") OVER "last15" AS "reSpeedLast15"
FROM "Stage3"
WINDOW "last1" AS (PARTITION BY "RE"
  RANGE INTERVAL '1' MINUTE PRECEDING),
     "last2" AS (PARTITION BY "RE"
  RANGE INTERVAL '2' MINUTE PRECEDING),
     "last3" AS (PARTITION BY "RE"
  RANGE INTERVAL '3' MINUTE PRECEDING),
     "last4" AS (PARTITION BY "RE"
  RANGE INTERVAL '4' MINUTE PRECEDING),
     "last5" AS (PARTITION BY "RE"
  RANGE INTERVAL '5' MINUTE PRECEDING),
     "last15" AS (PARTITION BY "RE"
  RANGE INTERVAL '15' MINUTE PRECEDING);
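
For readers less familiar with windowed SQL, the confidence-weighted average that the view computes per road element can be sketched in plain Python. This is only an illustration of the arithmetic, SUM(speed * confidence) / SUM(confidence) over a trailing time range; the tuple layout is an assumption, and the sketch handles a single road element rather than partitioning a stream.

```python
def weighted_speed(reports, now, minutes):
    """Confidence-weighted average speed over the trailing `minutes`.

    `reports` holds (t_seconds, speed, confidence) tuples for ONE road
    element; `now` is the timestamp of the current row, in seconds.
    Returns None when the window holds no confidence at all.
    """
    cutoff = now - minutes * 60
    recent = [(s, c) for t, s, c in reports if cutoff <= t <= now]
    total_confidence = sum(c for _, c in recent)
    if total_confidence == 0:
        return None
    return sum(s * c for s, c in recent) / total_confidence
```

Calling this with minutes set to 1, 2, 3, 4, 5 and 15 gives the six speed columns of the view for one road element.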

Detecting the rapid onset of congestion
Congestion is detected by comparing the moving average for a larger time window with that for a smaller one, for example a 2-minute average with a 1-minute average. The rule below chains four such comparisons, from the 1-minute average up to the 5-minute average:

CREATE OR REPLACE VIEW "CongestionRule1" AS
SELECT STREAM
  -- name, ID, highway name, speed limit etc. for each road element
  "RE", "reID", "Carriageway", "rePrescribed", "reSpeedLimit",
  -- volume of vehicle reports in each time window
  "reVehiclesLast1", "reVehiclesLast2", "reVehiclesLast3",
  "reVehiclesLast4", "reVehiclesLast5", "reVehiclesLast15",
  -- estimated avg speed for each road element
  "reSpeedLast1", "reSpeedLast2", "reSpeedLast3",
  "reSpeedLast4", "reSpeedLast5", "reSpeedLast15"
FROM "EstimatedReSpeeds"
WHERE "reSpeedLast1" < 0.80 * "reSpeedLast2" AND -- slowdown by 20%
  "reSpeedLast2" < 0.80 * "reSpeedLast3" AND
  "reSpeedLast3" < 0.80 * "reSpeedLast4" AND
  "reSpeedLast4" < 0.80 * "reSpeedLast5";
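
The WHERE clause amounts to a chain of pairwise comparisons. A small Python sketch of the same test, with the function name and argument layout chosen here purely for illustration:

```python
def rapid_slowdown(avg_speeds, ratio=0.80):
    """True if every shorter-window average is below `ratio` times the
    next longer-window average.

    `avg_speeds` is ordered from the shortest window to the longest,
    e.g. [last1, last2, last3, last4, last5].
    """
    pairs = zip(avg_speeds, avg_speeds[1:])
    return all(shorter < ratio * longer for shorter, longer in pairs)
```

A road element whose averages are 20, 30, 40, 55 and 70 passes all four comparisons, while one that drifts gently from 62 down to 50 does not.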

SQLstream Traffic Congestion Detection - Visualization

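Note that these estimated speeds are calculated over overlapping windows: each longer window contains all of the shorter ones, so the averages are not independent. The slowdown thresholds are set with that overlap in mind.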
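
To see why the overlap matters, consider a simplified case that is an assumption of this sketch, not something stated by the application: one report per minute, all with equal confidence. If the older minute has speed a and the newer minute has speed b, then the 1-minute average is b and the 2-minute average is (a + b) / 2, so the condition b < threshold * (a + b) / 2 rearranges to b / a < threshold / (2 - threshold).

```python
def implied_minute_ratio(threshold):
    """Minute-over-minute speed ratio implied by a nested-window threshold.

    Assumes one equally weighted report per minute, so that
    avg1 = b and avg2 = (a + b) / 2, and avg1 < threshold * avg2
    is equivalent to b / a < threshold / (2 - threshold).
    """
    return threshold / (2 - threshold)
```

Under these assumptions a threshold of 0.80 on the nested averages corresponds to the newest minute running at less than two thirds of the previous minute's speed, a noticeably sharper drop than the 20% the threshold suggests at first glance.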

Fine-tuning the slowdown thresholds, and taking into account other information such as the proximity of traffic lights and the volume of vehicle reports in each time window, improves the quality of the congestion detection algorithm.

The Google Earth screenshot shows the real-time traffic view, with detected slowdowns marked as pins. The severity of each slowdown is indicated by a different shade of red.

August 23, 2011

At SQLstream we have a comprehensive implementation of standard SQL windowing operations such as SUM, COUNT and AVG. Recently though we needed a more sophisticated function for a decaying weighted average that would emphasize more recent samples over older samples. We implemented a new EXP_AVG() operation, as shown in this example query:

SELECT rowtime, ticker, price,
  EXP_AVG(price, INTERVAL '10' SECOND) OVER w
FROM t
WINDOW w AS (PARTITION BY ticker
             ORDER BY rowtime
             RANGE INTERVAL '30' SECOND PRECEDING);

EXP_AVG takes a value expression and an interval constant giving the half-life. In this example, if two samples within the WINDOW are separated by 10 seconds, the older one will be given half as much weight as the newer one.
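
The weighting just described can be sketched in a few lines of Python. This is my reading of the semantics stated above (a sample's weight halves for every half-life of age), not SQLstream's implementation; the function name and argument layout are assumptions.

```python
def exp_avg(samples, now, half_life, window):
    """Exponentially decaying average of (t_seconds, value) samples.

    Only samples with now - window <= t <= now take part, and a sample
    that is `half_life` seconds older than another gets half its weight.
    Returns None if no sample falls inside the window.
    """
    weighted_sum = 0.0
    weight_total = 0.0
    for t, value in samples:
        age = now - t
        if 0 <= age <= window:
            weight = 0.5 ** (age / half_life)
            weighted_sum += value * weight
            weight_total += weight
    if weight_total == 0.0:
        return None
    return weighted_sum / weight_total
```

With a 10 second half-life, a price of 100 seen 10 seconds ago and a price of 200 seen now average to (0.5 * 100 + 1 * 200) / 1.5.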

How would this look in standard SQL without an EXP_AVG function?

I thought it would be interesting to look at how a decaying average would be implemented in SQL without an EXP_AVG operation, as under the covers we implement it using standard SQL windowed sums. If we knew that the times for our rows would always fall within a narrow range, we could increase the weights of the samples as time goes forwards and simply scale down the total (rather than lowering the weights of samples as time goes backwards from the most recent sample). This would be straightforward. The SQL would look something like this.

First, we need a function to turn our time based units into something on which we can do arithmetic:

CREATE FUNCTION toSeconds(t TIMESTAMP, offset TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t - offset) SECOND(9,3) AS DECIMAL(12,3));

Then, we need a function for calculating exponential weight:

CREATE FUNCTION expWeight (seconds DOUBLE, halfLife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN EXP(seconds * (LN(2)/halfLife));

The complete SQL will then be:

SELECT rowtime, ticker,
  SUM(price * expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
  / SUM(expWeight(toSeconds(rowtime, OFFSET), 10)) OVER w
  AS avgPrice
FROM t
WINDOW w AS (PARTITION BY ticker
             ORDER BY rowtime
             RANGE INTERVAL '30' SECOND PRECEDING);
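
The reason this works is that the common scale factor cancels between numerator and denominator: weights that grow forwards from a fixed offset give the same average as weights that decay backwards from the newest sample. A short Python sketch, with function names invented for this illustration, makes the equivalence easy to check:

```python
def forward_avg(samples, offset, half_life):
    """Weights grow with time measured forwards from a fixed offset."""
    weights = [2.0 ** ((t - offset) / half_life) for t, _ in samples]
    total = sum(w * v for w, (_, v) in zip(weights, samples))
    return total / sum(weights)


def backward_avg(samples, now, half_life):
    """Weights decay with age measured backwards from `now`."""
    weights = [2.0 ** (-(now - t) / half_life) for t, _ in samples]
    total = sum(w * v for w, (_, v) in zip(weights, samples))
    return total / sum(weights)
```

Each forward weight is the matching backward weight multiplied by the same constant, 2 ** ((now - offset) / half_life), so the two averages agree up to floating point rounding.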

Unfortunately this would not work in practice, as the expWeight function would overflow once the row times advanced far enough beyond OFFSET. We can't reset the offset as long as there are any non-zero values still in the window. This gives us our out. What we can do is partition the incoming values into two windowed aggregates, always sending zeros to one of them, and switching and resetting the offset when the window for that aggregate fills with zeros. This way we're never summing values whose weights are calculated using different starting offsets.
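
The overflow is easy to reproduce outside SQL. The Python sketch below mirrors expWeight using double precision floats, which is an assumption about how a SQL DOUBLE behaves; Python raises OverflowError where a database might instead return an error or infinity.

```python
import math


def exp_weight(seconds, half_life):
    """Same formula as expWeight: EXP(seconds * (LN(2) / halfLife))."""
    return math.exp(seconds * (math.log(2) / half_life))


def overflows(seconds, half_life):
    """True if the weight no longer fits in a double precision float."""
    try:
        exp_weight(seconds, half_life)
    except OverflowError:
        return True
    return False
```

With a 10 second half-life the weight doubles every 10 seconds, so a double (which tops out near 2 to the power 1024) is exhausted after roughly 10,240 seconds, a little under three hours of stream time.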

Here’s the SQL for this. We’ll need some more functions describing when and how we reset our offsets. We’ll need an arbitrary reference for our time calculations:

CREATE FUNCTION toSeconds(t TIMESTAMP)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST((t - TIMESTAMP '1970-01-01 00:00:00') SECOND(9,3)
           AS DECIMAL(12,3));

To use modulo arithmetic in SQL, convert to an integer:

CREATE FUNCTION toMillis(t TIMESTAMP)
RETURNS BIGINT CONTAINS SQL
RETURN CAST(toSeconds(t)*1000 AS BIGINT);

We’ll want to partition time into window sized epochs starting at our arbitrary time reference:

CREATE FUNCTION isEvenEpoch (t TIMESTAMP, windowSeconds INT)
RETURNS BOOLEAN CONTAINS SQL
RETURN MOD(toMillis(t),
  windowSeconds*2000) < windowSeconds*1000;

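We'll use the start of each epoch as the offset for all rows that fall in that epoch. Despite its name, the function below returns the number of seconds elapsed since the start of the row's epoch, which is the value we pass to expWeight: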

CREATE FUNCTION epochStart (t TIMESTAMP, windowSeconds INT)
RETURNS DECIMAL(12,3) CONTAINS SQL
RETURN CAST(MOD(toMillis(t), windowSeconds*1000)
           AS DECIMAL(12,3))/1000;
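
The epoch helpers are easier to follow with concrete numbers. Here is a Python sketch of the same arithmetic; times are plain seconds since the reference instant, and rounding to whole milliseconds is an assumption about how the CAST to BIGINT behaves.

```python
def to_millis(t_seconds):
    """Whole milliseconds since the arbitrary time reference."""
    return int(round(t_seconds * 1000))


def is_even_epoch(t_seconds, window_seconds):
    """Epochs are window-sized slices of time: 0 is even, 1 is odd, ..."""
    return to_millis(t_seconds) % (window_seconds * 2000) < window_seconds * 1000


def seconds_into_epoch(t_seconds, window_seconds):
    """Seconds elapsed since the start of the epoch containing t."""
    return (to_millis(t_seconds) % (window_seconds * 1000)) / 1000
```

With a 30 second window, times from 0 up to (but not including) 30 fall in an even epoch, 30 up to 60 in an odd one, and a row at 45 seconds is 15 seconds into its epoch.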

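The following two functions supply the correction needed while zeros are being inserted into one of the windows, when all the non-zero values still in that window come from the previous epoch. Those values are scaled down by one window length so that they line up with the weights of the current epoch: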

CREATE FUNCTION evenAgingFactor (t TIMESTAMP,
             windowSeconds INT, halflife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
             THEN 1
             ELSE expWeight(-windowSeconds, halflife) END;

CREATE FUNCTION oddAgingFactor (t TIMESTAMP,
             windowSeconds INT, halflife DOUBLE)
RETURNS DOUBLE CONTAINS SQL
RETURN CASE WHEN isEvenEpoch(t, windowSeconds)
             THEN expWeight(-windowSeconds, halflife)
             ELSE 1 END;

The complete SQL that uses the two-window approach to calculate the decaying average, using a 30 second window and a 10 second half-life, is:

SELECT rowtime, ticker,
   (oddAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN 0
        ELSE price * expWeight(epochStart(rowtime, 30), 10) END) OVER w
    + evenAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN price * expWeight(epochStart(rowtime, 30), 10)
        ELSE 0 END) OVER w)
    /
    (oddAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN 0
        ELSE expWeight(epochStart(rowtime, 30), 10) END) OVER w
    + evenAgingFactor(rowtime, 30, 10) *
    SUM(CASE WHEN isEvenEpoch(rowtime, 30)
        THEN expWeight(epochStart(rowtime, 30), 10)
        ELSE 0 END) OVER w)
  AS avgPrice
FROM t WINDOW w AS (PARTITION BY ticker
                    ORDER BY rowtime
                    RANGE INTERVAL '30' SECOND PRECEDING);

To understand how this would work in practice, let's assume we start our query at 12:00. We'll only look at the numerator of our query, as the denominator uses the same technique to calculate a weighted count.

Time | Window A state | Window B state
--- | --- | ---
12:00:00 | Empty | Empty
12:00:00 to 12:00:30 | Values with increasing weights being added. | Zeros being added.
12:00:30 | Has weighted sum * expWeight(30,10) | Zero
12:00:30 to 12:01:00 | Zeros being added. Will have older part of weighted sum. | Values with increasing weights being added. Will have newer part of weighted sum.
12:01:00 | Zero. All non-zero values will have been aged out. We can safely reset our scaling factor. | Has weighted sum * expWeight(30,10)
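
The two-window technique can be checked against a direct decaying average with a small Python simulation. This is a sketch of the idea rather than a translation of the engine: it handles a single ticker, takes times as whole seconds since the reference instant, and recomputes each window from scratch instead of maintaining running sums. All names are chosen for this illustration.

```python
import math

WINDOW = 30        # window length in seconds
HALF_LIFE = 10.0   # half-life in seconds


def exp_weight(seconds, half_life):
    return math.exp(seconds * (math.log(2) / half_life))


def is_even_epoch(t):
    return t % (2 * WINDOW) < WINDOW


def two_window_avg(rows, now):
    """Decaying average at time `now` using per-epoch offsets."""
    sums = {True: [0.0, 0.0], False: [0.0, 0.0]}  # even/odd: [num, den]
    for t, price in rows:
        if now - WINDOW <= t <= now:
            w = exp_weight(t % WINDOW, HALF_LIFE)  # offset within epoch
            sums[is_even_epoch(t)][0] += price * w
            sums[is_even_epoch(t)][1] += w
    aged = exp_weight(-WINDOW, HALF_LIFE)
    # The epoch containing `now` keeps factor 1; the older one is aged.
    factor = {is_even_epoch(now): 1.0, not is_even_epoch(now): aged}
    num = sum(factor[k] * sums[k][0] for k in (True, False))
    den = sum(factor[k] * sums[k][1] for k in (True, False))
    return num / den


def direct_avg(rows, now):
    """Reference: weights decay backwards from `now`."""
    pairs = [(0.5 ** ((now - t) / HALF_LIFE), p)
             for t, p in rows if now - WINDOW <= t <= now]
    return sum(w * p for w, p in pairs) / sum(w for w, _ in pairs)
```

Because no weight is ever computed from more than one window length of elapsed time, the exponent stays small however long the stream runs, which is the point of switching offsets at each epoch.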

With care taken to apply the appropriate scaling to each part, this SQL can be used to calculate a weighted average using standard windowed operations. However, I think you'll agree, it's easier with our EXP_AVG() operation.

Posted under Streaming SQL

Vice President Marketing
August 10, 2011

The latest product update of SQLstream, version 2.5.1, has just been released and shipped. This will be the final 2.x release prior to the SQLstream 3 launch, and although SQLstream 2.5.1 is predominantly a maintenance release, it does include a range of feature enhancements, including:

- Support for exponentially decaying averages in windowed aggregation functions

- Enhancements to the standard Log File and Socket Adapters

A number of our customers have already downloaded and upgraded to the new version, across a range of industries including transportation, sensor network management, e-Commerce and web analytics.

Click here if you'd like to register for SQLstream product downloads (note that registration is required), or would like to know more about solutions for real-time analytics and data management.

But what of SQLstream 3? Well, watch this space. SQLstream 3 will take a major step forward, not just in SQLstream's stream computing product capability, but also in the way stream computing solutions are built and deployed. More on this over the coming months.

Posted under Uncategorized