Time Series Guide

IMPORTANT NOTE:

The time series features have been deprecated and will be replaced in a future release.

Overview

Vantiq provides support for operations on series data. Series are defined as sequences of data elements that occur over time. Examples include the track of a user’s position as they move from location to location within a store, or the readings from a temperature sensor over time.

Vantiq assumes the series is presented to the system in real time with new records arriving continuously from the source of the data. Vantiq leverages this behavior to keep the series up-to-date and allow real time analysis.

Within Vantiq, a series is a set of time-based records that fall within a defined rolling window, usually bounded in time.

Dates and Intervals

Time is an important characteristic of a time series. Vantiq supports two time-related abstractions: dates and intervals.

Dates are implemented as java.time.Instant objects that represent time as seconds and nanoseconds relative to epoch, January 1st, 1970 00:00:00 GMT.

Dates can be accepted and converted into any of the following forms:

Converting from a higher-precision representation to a lower-precision representation truncates the extra precision.
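To make the truncation rule concrete, the following Python sketch reduces an Instant-style pair of (seconds, nanoseconds) since the epoch to milliseconds and to whole seconds. The function names are hypothetical illustrations, not Vantiq procedures, and the sketch assumes instants at or after the epoch.

```python
def to_epoch_millis(seconds, nanos):
    """Truncate an Instant-style (seconds, nanos) pair to epoch milliseconds."""
    # Integer division discards the sub-millisecond part; it is never rounded up.
    return seconds * 1000 + nanos // 1_000_000


def to_epoch_seconds(seconds, nanos):
    """Truncate to whole epoch seconds: the nanosecond field is simply dropped."""
    return seconds


# 1.999999999 seconds after the epoch
print(to_epoch_millis(1, 999_999_999))   # 1999
print(to_epoch_seconds(1, 999_999_999))  # 1
```

Note that 1.999999999 seconds becomes 1999 milliseconds and 1 second, not 2000 milliseconds or 2 seconds.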

Intervals can be specified in one of the following units:

Date and Interval Procedures

The following procedures support the manipulation of dates and/or intervals:

The following procedures are used to extract components of a date:

Note that all operations that require a timezone use GMT.

Series Definition

A series is a non-persistent (in-memory) collection of objects ordered in time. A series may also have computed properties representing aggregation operations on the series. Because the series is an in-memory construct, the records within it may be lost during a system restart. Typically, a series is used to process newly arriving data, for example to filter it and aggregate it into a more compact or more easily analyzed form. Once the data is prepared, it is stored persistently for long-term storage or published to a downstream system.

Because a single series often needs to track a number of similar data streams, a series supports the notion of a key. For example, a factory may have 10 identical machines, each with a voltage sensor. By defining a key, the system can store and process the data for all the machines in a single series.

Series Creation

A series is created using the CREATE SERIES statement. For example, the following statement creates a series named voltages that uses the sensorId as the key property, the timestamp property to define the time window, and computes the average voltage in the avgVoltage computed aggregation property. In this case, the window is defined to be no more than the last 5 records received.

CREATE SERIES voltages (
    sensorId   String,
    timestamp  DateTime,
    voltage    Real,
    avgVoltage Real = avg(voltage)
) WITH
   window:    "timestamp",
   maxCount:  5,
   groupBy:   "sensorId"

The available aggregation functions are:

Note that the sum, avg, rise, min, and max functions support only Integer, Real, and DateTime data types.

Record Insertion

Use the INSERT INTO SERIES statement to add a new record to the series. From the above definition, the required properties for a series record are sensorId, timestamp, and voltage. An example message is:

{
    sensorId:  "Machine ABC",
    timestamp: "2016-03-02T12:34:23.432Z",
    voltage:   2.34
}
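The example message carries its timestamp as an ISO 8601 string, while the series declares timestamp as a DateTime. As an illustration of that kind of conversion (a Python sketch, not Vantiq's implementation; the helper name is hypothetical), the string can be turned into epoch milliseconds without floating-point rounding:

```python
from datetime import datetime, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def iso_to_epoch_millis(text):
    """Convert an ISO 8601 UTC timestamp string to epoch milliseconds."""
    # datetime.fromisoformat() only accepts a trailing "Z" from Python 3.11 on,
    # so rewrite it as an explicit +00:00 offset first.
    moment = datetime.fromisoformat(text.replace("Z", "+00:00"))
    delta = moment - EPOCH  # exact timedelta arithmetic, no float rounding
    return (delta.days * 86_400 + delta.seconds) * 1000 + delta.microseconds // 1000


message = {"sensorId": "Machine ABC", "timestamp": "2016-03-02T12:34:23.432Z", "voltage": 2.34}
print(iso_to_epoch_millis(message["timestamp"]))  # 1456922063432
```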

The following statement inserts a record into the series from the message m:

INSERT INTO SERIES voltages (
    sensorId:  m.sensorId,
    timestamp: m.timestamp,
    voltage:   m.voltage
)

On insert, the series is updated so that its records reflect the series definition. For example, in the voltages series above, if the series already contains 5 records when a new record is inserted, the series prunes the oldest record to maintain a maximum of 5 records.

After pruning, the defined aggregate properties in the series are recalculated so that the aggregates are available immediately after the insert statement.
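The following Python sketch models the insert, prune, and recompute cycle for the voltages series. It is not Vantiq's implementation: the class VoltageSeries and its methods are hypothetical, it assumes records arrive in timestamp order, and it assumes (the text does not state this explicitly) that the maxCount window is kept separately for each groupBy value. Its select() method mirrors the one-record-per-sensorId output of SELECT FROM SERIES.

```python
from collections import defaultdict, deque
from statistics import mean


class VoltageSeries:
    """Hypothetical in-memory model of the voltages series (not a Vantiq API)."""

    def __init__(self, max_count=5):
        self.max_count = max_count
        # Assumption: one bounded window per groupBy value (sensorId).
        # deque(maxlen=...) drops the oldest entry when a new one is appended.
        self.windows = defaultdict(lambda: deque(maxlen=self.max_count))
        self.aggregates = {}

    def insert(self, sensor_id, timestamp, voltage):
        window = self.windows[sensor_id]
        window.append((timestamp, voltage))  # prunes the oldest record if full
        # Recompute the aggregate right away, so it is current after the insert.
        self.aggregates[sensor_id] = {
            "sensorId": sensor_id,
            "avgVoltage": mean(v for _, v in window),
        }

    def select(self):
        # One record per distinct sensorId, like SELECT FROM SERIES voltages.
        return list(self.aggregates.values())


series = VoltageSeries(max_count=5)
for ts, volts in enumerate([1.0, 2.0, 3.0, 4.0, 5.0, 6.0]):
    series.insert("Machine ABC", ts, volts)
print(series.select())  # [{'sensorId': 'Machine ABC', 'avgVoltage': 4.0}]
```

The sixth insert pushes out the 1.0 reading, so the average is taken over 2.0 through 6.0.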

Series Data

To retrieve aggregate data from the series, issue a SELECT on the series, e.g.:

SELECT FROM SERIES voltages

This will produce a stream of objects, each of which will contain the group by property and any aggregate properties defined for the series. There will be one record for each unique value of the group by property that has been recorded, e.g.:

{
    sensorId: "Machine ABC",
    avgVoltage: 2.11
}

To filter the results of a SELECT on a SERIES, use the WHERE clause to specify conditions using the group by property or any of the aggregate properties. Here’s an example using the voltages series:

SELECT FROM SERIES voltages WHERE sensorId == "MySpecificId"

which would result in the retrieval of only 1 record, the voltages record with sensorId equal to “MySpecificId”, or:

SELECT FROM SERIES voltages WHERE avgVoltage > 2.0

which would result in the retrieval of the voltages records (one per sensorId) whose aggregate avgVoltage is greater than 2.0.
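Because WHERE on a series sees only the group by property and the aggregate properties, it behaves like a filter over the per-group records that SELECT returns. A minimal Python sketch of that behavior, using hypothetical sample rows and a hypothetical helper name:

```python
# Hypothetical rows shaped like SELECT FROM SERIES voltages output (one per sensorId).
rows = [
    {"sensorId": "Machine ABC", "avgVoltage": 2.11},
    {"sensorId": "MySpecificId", "avgVoltage": 1.75},
    {"sensorId": "Machine XYZ", "avgVoltage": 2.40},
]


def select_where(rows, predicate):
    """Keep only the per-group records for which the condition holds."""
    return [r for r in rows if predicate(r)]


# WHERE sensorId == "MySpecificId"
by_id = select_where(rows, lambda r: r["sensorId"] == "MySpecificId")
# WHERE avgVoltage > 2.0
high = select_where(rows, lambda r: r["avgVoltage"] > 2.0)
print([r["sensorId"] for r in high])  # ['Machine ABC', 'Machine XYZ']
```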

SERIES types also support SELECT ONE queries, which either produce a single record or raise an error if zero or more than one matching records are found. Here’s an example using the voltages series:

SELECT ONE FROM SERIES voltages WHERE sensorId == "MySpecificId"

which would result in the retrieval of exactly 1 record, the voltages record with sensorId equal to “MySpecificId”, or produce an error if no voltages record has sensorId “MySpecificId”.
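The SELECT ONE contract (exactly one match, otherwise an error) can be sketched in Python as follows. The exception class and helper are hypothetical stand-ins for whatever error Vantiq actually raises, and the rows are sample data.

```python
class SelectOneError(Exception):
    """Hypothetical stand-in for the error raised by SELECT ONE."""


def select_one(rows, predicate):
    matches = [r for r in rows if predicate(r)]
    # Zero matches and several matches are both errors.
    if len(matches) != 1:
        raise SelectOneError(f"expected exactly one matching record, found {len(matches)}")
    return matches[0]


rows = [
    {"sensorId": "Machine ABC", "avgVoltage": 2.11},
    {"sensorId": "MySpecificId", "avgVoltage": 1.75},
]
record = select_one(rows, lambda r: r["sensorId"] == "MySpecificId")
print(record["avgVoltage"])  # 1.75
```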

Time Series Example

This section describes a simple but complete example of processing readings from a sensor.

The sensor is assumed to produce voltage readings between 0 and 5 volts and, nominally, produces one reading every second. Each reading is published to an MQTT topic as a JSON object in the following format:

{ 
    "id": <sensorId>, 
    "ts": <epochMilliseconds>, 
    "vl": <voltage> 
}
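The short property names in the payload (id, ts, vl) have to be mapped onto the series properties sensorId, timestamp, and voltage, which the rule below does in its INSERT INTO SERIES statement. A Python sketch of the same mapping, assuming the payload arrives as raw JSON bytes and using a hypothetical function name:

```python
import json
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_series_record(payload):
    """Map a raw {"id", "ts", "vl"} reading to the voltages series fields."""
    msg = json.loads(payload)
    return {
        "sensorId": str(msg["id"]),
        # ts is epoch milliseconds; integer timedelta keeps the conversion exact.
        "timestamp": EPOCH + timedelta(milliseconds=msg["ts"]),
        "voltage": float(msg["vl"]),
    }


record = to_series_record(b'{"id": "sensor-1", "ts": 1456922063432, "vl": 2.34}')
print(record["timestamp"].isoformat())  # 2016-03-02T12:34:23.432000+00:00
```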

Vantiq is tasked with reading these values and maintaining a series that consists of the last 5 readings. When a new reading arrives, the average of the last 5 values is computed and recorded along with the new reading. In other words, we are computing a very simple moving average.
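Written out, with $v_i$ the $i$-th reading from one sensor and $n$ the number of readings received so far, the moving average is given below. The $\min(n,5)$ term covers startup, when (as the rule comments note) the average is taken over only the records available. The readings 1.0 through 6.0 V are hypothetical values chosen for illustration.

```latex
\bar{v}_n = \frac{1}{\min(n,5)} \sum_{i=\max(1,\,n-4)}^{n} v_i
\qquad\text{e.g.}\qquad
\bar{v}_2 = \frac{1.0 + 2.0}{2} = 1.5,\quad
\bar{v}_6 = \frac{2.0 + 3.0 + 4.0 + 5.0 + 6.0}{5} = 4.0
```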

Finally, if the moving average ever exceeds 2.0 volts, produce a notification. For the purposes of this example the notification is delivered to a service. In a real system, the notification could be delivered to an application or a user via SMS, email or some other notification system.

Type Definition

The first step in the process is to define a type that will hold the current reading from the sensor. It is not essential to create a type, but it is convenient to keep a persistent copy of the current state of the sensor for use by external applications. The type definition, in JSON format, is as follows:

{ 
  "name": "Sensor",
  "properties": {
    "sensorId"   : { "type": "String"  },
    "timestamp"  : { "type": "Integer" },
    "voltage"    : { "type": "Real"    },
    "avgVoltage" : { "type": "Real"    }
  },
  "naturalKey": [ "sensorId" ]
}

This definition is in a form that can be loaded directly to Vantiq either via the Vantiq CLI or the developer console.

Source Definition

The source is the channel through which data is ingested into Vantiq. The source must be defined to obtain the voltage readings from MQTT, a commonly used connectivity protocol that is directly supported by Vantiq.

In our example, we define a source that connects to an MQTT server running locally:

{
  "name":      "sensorMqtt",
  "type":      "MQTT",
  "direction": "SOURCE",
  "config": {
      "serverURIs": [ "tcp://localhost:1883" ],
      "topics":     [ "com.accessg2.mqtt.sensor.rdg" ]
  }
}

This definition is in a form that can be loaded directly to Vantiq either via the Vantiq CLI or the developer console. The serverURIs and topics properties will need to be adjusted to reflect the address of your MQTT server and the topic on which you elect to publish the voltage readings.

Series Definition

A series must be defined that stores the last 5 records. This can be accomplished by defining a setup() procedure that can be executed to define the series:

PROCEDURE setup()

//
// Create a series that stores the last 5 records
//
CREATE SERIES voltages (
    sensorId   String,
    timestamp  DateTime,
    voltage    Real,
    avgVoltage Real = avg(voltage)
) WITH
   window:    "timestamp",
   maxCount:  5,
   groupBy:   "sensorId"

RETURN { status: "success" }

This procedure can be loaded through the Vantiq CLI or directly within the developer console. To execute the procedure, the simplest method is to click the “play” button in the developer console.

Rule Definition

A rule is defined to process the voltage readings. In the following, we provide detailed comments to explain the processing flow for the inbound data.

RULE voltageReading
WHEN MESSAGE ARRIVES FROM "sensorMqtt" AS message

//
// The message arrives (on an MQTT topic) with three JSON properties:
//
//      id - the sensor id as a String
//      ts - the timestamp as epoch time in milliseconds
//      vl - the sensor reading as a real value between 0 and 5 (the voltage)
//
// During startup conditions the series may not be fully populated.
// The series computes the average across the records available in
// the series.
//

//
// Insert the newly arrived record into the voltages series.
// The series holds 5 records.  When a sixth record is added the oldest
// record is removed from the series leaving a series that contains 
// the most recent five records.
//
INSERT INTO SERIES voltages (
    sensorId:  message.id,
    timestamp: message.ts,
    voltage:   message.vl
)

//
// Upon insert into the series, all the defined aggregation operations
// are recalculated.  In this case, the avgVoltage now has the 
// average including the newly inserted record.
//
// To access the series aggregates, use the following syntax:
//
//   SELECT FROM SERIES voltages
//   for (v in voltages) {
//      ... process record ...
//   }
//
// Here we look for the record associated with our sensor
//
var avgVoltage = 0.0
SELECT FROM SERIES voltages
for (v in voltages UNTIL avgVoltage > 0)  {
    if (v.sensorId == message.id) {
        avgVoltage = v.avgVoltage
    }
}
// Alternatively we could specify a where clause to filter to the voltages
// records containing sensorId == message.id and avgVoltage > 0 using the 
// following syntax:
//
//   SELECT FROM SERIES voltages 
//   WHERE sensorId == message.id AND avgVoltage > 0
//   for (v in voltages) {
//      ... process records with sensorId == message.id AND avgVoltage > 0 ...
//   }
//
// Because we know only one voltages record should have sensorId == message.id
// we can do this query as a SELECT ONE using the following syntax
// 
//   SELECT ONE FROM SERIES voltages WHERE sensorId == message.id 
//
// which guarantees the result is either a single record or an exception.
//
// Update the persistent state of the sensor.  If there is
// already a record, update that record.  Otherwise, create a
// new record.  In other words, we perform an upsert.
//
SELECT UNIQUE* Sensor:s WHERE sensorId == message.id
if(s == null) {
    INSERT Sensor(sensorId   : message.id, 
                  timestamp  : message.ts,
                  voltage    : message.vl, 
                  avgVoltage : avgVoltage)
} else {
    UPDATE Sensor(timestamp  : message.ts, 
                  voltage    : message.vl, 
                  avgVoltage : avgVoltage)
     WHERE sensorId == message.id
}
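The SELECT, INSERT, and UPDATE sequence above is an upsert keyed on the Sensor type's naturalKey, sensorId. A Python sketch of the same logic follows, with a dict standing in for the persistent Sensor type and a hypothetical upsert_sensor function. Returning which branch ran makes visible that the first reading for a sensor takes the insert path rather than the update path.

```python
def upsert_sensor(store, sensor_id, timestamp, voltage, avg_voltage):
    """Insert-or-update the current state of one sensor; returns which path ran."""
    record = store.get(sensor_id)
    if record is None:
        # No record yet: create it (the INSERT branch of the rule).
        store[sensor_id] = {
            "sensorId": sensor_id,
            "timestamp": timestamp,
            "voltage": voltage,
            "avgVoltage": avg_voltage,
        }
        return "insert"
    # Record exists: overwrite the changing fields (the UPDATE branch).
    record.update(timestamp=timestamp, voltage=voltage, avgVoltage=avg_voltage)
    return "update"


sensors = {}  # stands in for the persistent Sensor type, keyed by sensorId
print(upsert_sensor(sensors, "sensor-1", 1000, 2.5, 2.5))  # insert
print(upsert_sensor(sensors, "sensor-1", 2000, 1.5, 2.0))  # update
```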

Identifying Situations

At this point, Vantiq is now ingesting the sensor data and computing the moving average over the voltage readings. The final step is to produce a notification whenever the moving average exceeds 2.0 volts. To do this, we define a rule to identify this situation.

RULE voltageTooHigh
WHEN UPDATE OCCURS ON Sensor AS s

//
// Define threshold in Volts
//
var THRESHOLD = 2.0

//
// Simple if condition to test against the threshold
//
if(s.avgVoltage > THRESHOLD) {

    //
    // Use PUBLISH to trigger a notification through 
    // an outbound source named "alertService"
    //
    PUBLISH { body: { sensor: s.sensorId, 
                     message: "voltage for last five readings > 2.0" }} 
    TO SOURCE alertService
}

This very simple rule is triggered each time the avgVoltage is updated for a Sensor. The rule then checks whether the new average exceeds 2.0 volts. If it does, a notification is sent to the REST source named alertService. The notification is delivered as a JSON object containing the sensorId of the sensor that has exceeded the voltage limit and a message identifying the specific condition detected.
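The rule's check and the shape of the published notification can be sketched in Python as below. The PUBLISH ... TO SOURCE alertService step is replaced by a caller-supplied publish function, a hypothetical stand-in rather than a Vantiq API, so the logic can be exercised without a real REST service.

```python
THRESHOLD = 2.0  # volts


def voltage_too_high(sensor, publish):
    """Publish an alert if the sensor's moving average exceeds THRESHOLD."""
    # Strictly greater than: an average of exactly 2.0 does not alert.
    if sensor["avgVoltage"] > THRESHOLD:
        publish({"body": {"sensor": sensor["sensorId"],
                          "message": "voltage for last five readings > 2.0"}})
        return True
    return False


sent = []  # collects notifications in place of the alertService source
voltage_too_high({"sensorId": "sensor-1", "avgVoltage": 2.5}, sent.append)
print(sent)
```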

Details for defining a REST service as a source can be found in the Remote Source Integration document.