The specification is intended for discussion within the RDF Stream Processing Community Group. Its content does not yet represent the consensus of the Community Group.

This specification is incomplete.

Introduction

Data streams are one of the main sources of information in a wide range of domains and applications, and it is required to make these streams available at Web scale. For this to become a reality, we need to define Web standards and guidelines on how to produce and consume structured data streams. RDF ([[!RDF11-primer]]) is a W3C recommendation for structuring and representing data on the Web. However, this model is based on a traditional persisted-data paradigm, where the focus is on maintaining a bounded set of data elements in a knowledge base. This paradigm does not fit the case of data streams, where data elements flow continuously over time, forming unbounded sequences of data.

In this context, the W3C RSP Community Group has taken the task to explore the existing technical and theoretical proposals that incorporate streams to the RDF model and to its query language, SPARQL [[!SPARQL11-OVERVIEW]]. More concretely, one of the main goals of the RSP Group is to define a common but extensible core model for RDF stream processing. It is not the intention of the RSP Group to propose a one-size-fits-all model, which is probably infeasible, but rather a common ground that can serve as a starting point for RSP engines to be able to interoperate.

This document proposes a set of requirements for RDF stream processing and design principles for an RDF stream data model and query language. The remainder of the document is structured in three parts. First we state the key requirements for the RSP model in Section 2. Second we provide a proposal of design principles for a data model that extends RDF to support streams in Section 3. Finally Section 4 describes the design principles for the SPARQL query feature extensions to comprehend the RDF Stream model.

Requirements

The requirements for RDF stream processing summarized below are the result of the RSP Group on existing use cases [[RSP-USECASES]], as well as the needs gathered by analyzing existing works of the state of the art. Many of them are also valid for non-RDF systems (e.g., see [[STONEBRAKER2005]]) while others come as the result of our individual or joint work on extending RDF for stream processing. The requirements are stated at a high level, and we do not fully detail them as in the use cases. The goal is to allow the reader to understand the challenges that RSP systems should be able to address.

Functional Requirements

Abstract Model

  1. RDF streams should be representable in an abstract model and the semantics of this abstract model should provide the basis of the results of RSP queries.
  2. The RDF stream abstract model should be serialized in concrete formats derived from standard formats extending beyond the standard format only when necessary.
  3. An RDF stream should be identifiable with an IRI.
  4. RDF streams may have timestamps based on different notions of time (time instants, intervals) with different semantics (application, validity, transactional).
  5. In case no timestamp is associated to an RDF stream data element, the system is responsible of managing time-based ordering of stream elements.
  6. The RDF stream model should be susceptible to be restricted in order to facilitate implementation and support efficient representation.

Query Language

  1. RSP queries should support combining multiple RDF streams as well as stored RDF (aka static RDF graphs or datasets).
  2. RSP queries should support the minimal set of features identified in this document.
  3. The semantics of the RSP data and query model should allow generating predictable results, so that correctness of the RSP query process can be evaluated.
  4. RSP queries should be able to access all knowledge explicitly expressed in the stream, including names of named graphs and triples containing such names.
  5. RSP queries should be able to refer to the named graphs of streams.

In addition to these general requirements, we define a set of mandatory features for RSPs that implement query processing and a set of optional features that they may support.

Mandatory Query Processing Features

  1. RSPs should be able to query streaming data from one or several RDF streams.
  2. RSPs should be able to query data from RDF graphs combined with RDF streams.
  3. RSPs should support SELECT and CONSTRUCT queries.
  4. RSPs should support defining one or more time windows over an RDF stream.
  5. RSPs should support all SPARQL 1.1 [[!SPARQL11-Query]] operators.
  6. RSPs should support nesting RSP queries.

Optional Query Processing Features

  1. RSPs may support defining count-based windows over RDF streams.
  2. RSPs may support RSTREAM, ISTREAM and DSTREAM operators [[ARASU2006]].
  3. RSPs may support sequence operators (a pattern followed by another pattern).
  4. RSPs may support the WITHIN operator (evaluate if a pattern occurs within an interval).
  5. RSPs may support combinations of sequence operators and window operators.
  6. RSPs may support accessing timestamp information of the RDF stream within a query.

RSP Engines

  1. RSPs should be capable of processing streams of data reactively.
  2. RSP query engines should support a declarative query language derived from (and compatible with) SPARQL, extended with operators that can consume and produce RDF streams.
  3. RSP engines should be able to query a portion of the knowledge expressed in an RDF stream.
  4. RSP engines should support defining a query that gets continuously executed over the RDF streams, producing continuous results.

Non-Functional Requirements

  1. The RSP data and query model should be compatible with RDF (e.g., reads it is as part of the dataset) and SPARQL 1.1 [[!SPARQL11-Query]] (e.g., uses the same operators and query forms).
  2. The RDF stream model should be extensible, so to allow different types of time notions (e.g., application time) using standard vocabularies.
  3. The RSP query model should allow extensions for specific operators beyond those described in this document (e.g., other types of windows, CEP derived operators).
  4. The expression of common query constructs should feel intuitive.

Requirements related to the performance, throughput, efficiency, availability of the system concern the engines that implement this model and not the model itself.

The use cases could be mapped to these requirements.

Out of Scope

  1. Record a stream.

Other requirements to be added as out of scope. Issue

RSP Data Model: Design Principles

The data model for representing RDF streams considers unbounded sequences of data elements that flow over time. More specifically, these data elements are RDF triples, assembled in graphs, as an event or observation typically requires more than one triple to be represented. Each of these graphs in the sequence respects a particular order given by time annotations which can be explicit or implicit. The main considerations for this abstract model are detailed below, first describing the time model, and then the RDF stream model itself.

Time in Stream Data Elements

The notion of time in an RDF stream is important as it establishes an order among the data elements. Time annotations (in some cases called timestamps) can be defined in a totally ordered domain with a distance metric. Time can be represented as:

A convenient time model in this context is the Time Ontology in OWL [[!OWL-TIME]], which defines Instant and Interval as subclasses of TemporalEntity.

Within the time model we do not specify mechanisms for dealing with delayed and out of order triples.

RDF Stream

An RDF stream is a sequence of RDF graphs, including associated metadata, as a flexible mechanism to add time-related metadata. The RSP group identified the following types of time metadata (although we do not exclude others):

In many cases the arrival order of the RDF graphs suffices as an “implicit timestamp” attached to each RDF graph. In such cases the timestamp can be annotated as a system timestamp. In any case, for the abstract model the RDF stream elements appear to be timestamped, even if this is done at the system level on arrival of data elements.

An RDF stream processor may attach additional metadata, e.g., describing the time at which it received the data element. Producers of RDF streams can attach the production time to the RDF graphs they stream. If the RDF graph represents information with a validity interval, the producer can also attach metadata to describe the start and end time of the validity interval.

An RDF stream should be defined under the following considerations:

A key difference wrt previous RDF stream models is the support of streaming graphs instead of triples. This allows to structure more complex events in a stream, as opposed to plain triples. For example, an observation typically requires several triples to be described (e.g., observed property, value, time, observer). Another example is related to the flexibility in timestamping (one vs. two timestamps or application time vs. system time), which is only possible if timestamps can be attached to an event structure. That cannot be achieved with plain triples.

Running Example: In the following, we use the following example based in social sensing: i.e., people detected in rooms over time. URL of the stream: . Or ex:social if we set the prefix ex: as http://example.org/streams/. Sample data on the stream:


:g1 {:axel :isIn :RedRoom. :darko :isIn :RedRoom} {:g1 prov:generatedAtTime t1}
:g2 {:axel :isIn :BlueRoom. }                     {:g2 prov:generatedAtTime t2}
:g3 {:minh :isIn :RedRoom. }                      {:g3 prov:generatedAtTime t3}
...
        

:g1 {:axel :isIn :RedRoom. :darko :isIn :RedRoom} {:g1 :atInterval 2016-03-01T13:00:00Z/2016-03-01T13:00:10Z}
:g2 {:axel :isIn :BlueRoom. }                     {:g2 :atInterval 2016-03-01T13:00:20Z/2016-03-01T13:00:40Z}
:g3 {:minh :isIn :RedRoom. }                      {:g3 :atInterval 2016-03-01T13:00:50Z/2016-03-01T13:00:60Z}
...
        

Considerations

Punctuation

A punctuation is a pattern p inserted into the data stream with the meaning that no data element i matching p will occur further on in the stream [[TUCKER2003]] [[MAIER2005]]. For streams of RDF graphs this can be used like this: A punctuation is a pattern p inserted into the graph stream with the meaning that no triples i from graph p will occur further on in the stream. In order to implement punctuation, special triples (ones that indicate punctuation, e.g., <:g rsp:punctuate rsp:now>) could be employed. However, as we use the Web stack we can do punctuation out-of-band, i.e., by doing punctuation on a lower layer of the stack. For example we can communicate through chunked transfer encoding ([[RFC2616]], Section 3.6.1) from HTTP 1.1 [[HTTP11]]. Each chunk contains a complete graph and the receiver will know that after a chunk is received the event is completely received and can be processed further in an atomic fashion. There is a guarantee that no triples for this graph will arrive later. Using HTTP chunked connections no special triples are needed.

Immutability and Event Derivation

In many event processing systems [[CUGOLA2012]], events are immutable [[LUCKHAM2011]]. This stems from the definition of what an event is: An event is "an occurrence within a particular system or domain; it is something that has happened, or is contemplated as having happened" [[ETZION2010]]. So events cannot be made to unhappen.

Open Question: Does this apply to all systems/applications/usecases or just to many as stated above?

Adopting immutability as a general assumption can be very useful for building systems, as it allows more control over the correctness and predictability of the system, especially in a distributed environment and when consistency is at stake in a concurrent processing setting. But then, how can a stream processing agent process events if they are immutable? Every processing task produces new derived events as results, and as an advantage the underived events are still available for other uses and remain immutable. For RSP this means: (1) create a new (unique) graph for the derived event and (2) possibly link back to the base event(s) thus enabling drill-down or root cause / provenance analysis of the derived event. The links can be made with DUL:hasConstituent from DOLCE Ultralight [[DUL]], or with sub-properties such as in [[HARTH2011]].

RDF Stream Query Language: Design Principles

RDF stream query processing languages have been proposed in the past, and some of the features described in this section have been presented in one or more of these languages. In this section we selected the most prominent query language features, which we consider would be essential or at least important in a standard RSP query language. When applicable, we compare with existing query languages, and if necessary, we propose new query constructs or combinations, if they were not supported in existing languages.

Input

The input for an RSP query may include streams and stored data. In SPARQL, the query input is encapsulated on a dataset that declares a default and named graphs. Based on this assumption, an RSP query is posed against an extended dataset, composed of (potentially):

For stored RDF graphs the declaration of named and unnamed graphs should follow the SPARQL specification of a dataset. For declaring streams a similar mechanism can be applied. For example in C-SPARQL this has been achieved with a similar declaration via the FROM clause:

As a consequence, windowed streams in this case are merged, so that the result can be accessed in the query body. Alternatively, in CQELS streams can be accessed individually inside the query body, therefore not requiring a merge previously:

The proposed approach should consider the declaration of streams (and window declarations), and the individual usage of windows inside the query body. We will analyze the window definitions later, but we already show how streams can be declared in the FROM clause and, thanks to a window identifier, they can be used in the query body:


FROM NAMED WINDOW :win ON ex:social [… window spec …] 
        

Notice that in a single query one may issue different windows for the same stream (e.g., with different window parameters, such as size or slide), on multiple streams, or combined with RDF graphs. The following example shows a declaration of multiple windows on several streams, combined with standard named graph declarations. We omit the window specifications, as they will be covered later on.


FROM NAMED WINDOW :win1 ON ex:social  [RANGE PT10M] 
FROM NAMED WINDOW :win2 ON ex:social  [RANGE PT1M] 
FROM NAMED WINDOW :win3 ON ex:sensors [RANGE PT1M] 
FROM NAMED :people  
        

Output

Depending on the Query form (e.g., SELECT, CONSTRUCT), the output can be of a different form. Also, depending if the results are returned as a stream or as finite sets, the output is of a different type. In summary, an RSP query answer could have the following types of output:

How are the timestamps of the graphs defined in the query results?

Results of a query could be part of a subscription, allowing the query client to obtain results as soon as they are available, or following subscription rules. These aspects are related to the architecture of an RSP engine, and are not part of this document.

RDF Stream Operators: Overview

Following the model as proposed in several previous works, we consider three main classes of operators over RDF streams and RDF graphs (for more information refer also to [[SR4LD2014]]):

In addition to these operators, we may consider S2S operators that take and produce a stream (e.g., stream filtering).

In these RSP operators the R denotes finite RDF graphs or mappings, as opposed to unbounded sequences of RDF graphs, i.e., streams. In addition to those operators (which can be thought of as part of a RSP Data Manipulation Language (DML) in SQL terms), there is also the need for a Data Definition Language (DDL) to register a stream, register continuous queries, etc. Of all known RSP languages, only C-SPARQL has DDL primitives, but they are limited to query registration, e.g., REGISTER (QUERY|STREAM) name AS.

In the case of R2R operators, existing SPARQL 1.1 [[SPARQL11-Query]] operators are already covered by the SPARQL specification and semantics, and should be supported in an RSP query language (semantics for RDF streams may need adaptations).

S2S Operators

While S2S operators can be seen as a combination of S2R and R2S, it can be argued that for some tasks, e.g., stream filtering or others, it is not necessarily desired that S2R operators must be used. Most window-based RSP systems do not support this type of operations, and therefore it is not technically possible to define this type of queries in these systems. Also, the lack of use of S2R operators in this scenario may lead to scalability issues as the number of stream elements to be considered is never limited. However, in cases where neither joins nor aggregations are performed, e.g., simple graph pattern matching and filtering, when past stream elements can be forgotten, this type of query can be of use.

SELECT

The projection is performed using the SELECT clause that includes the variable names in the query body that are requested in the results. As mentioned before, existing window-based RSP languages can only operate over windows and not directly on the streams. For instance, if we simply want to output who is in which room, following the previous example, this can be a achieved in CQELS by setting up a time window with any (reasonably small) window size. In this case the window size does not have any impact on the results, as in CQELS the output is provided as soon as there is a match (also known as a content change window policy).


SELECT ?room ?person
WHERE {
  STREAM ex:social [RANGE 1m]  {?person :isIn ?room}
}
          

In RSP-QL we could be able to omit the window specification altogether.


SELECT ?room ?person
FROM NAMED WINDOW :win ON ex:social
WHERE {
  WINDOW :win {?person :isIn ?room}
}
          

FILTER

The filter operator is a selection as in standard SPARQL. In this case, as it is in the context of a S2S operation, it follows the same considerations as the previous case. For illustrative purposes we show an example of how such type of query could be specified.


SELECT ?room
FROM NAMED WINDOW :win ON ex:social
WHERE {
  WINDOW :win {?person :isIn ?room. FILTER(?person = :axel) }
}          
// Or alternatively this filter can be done simply as a BGP:
SELECT ?room
FROM NAMED WINDOW :win ON ex:social
WHERE {
  WINDOW :win {:axel :isIn ?room.}
}          
           

Aggregations

Some aggregation functions can also be applied directly over the stream without the need for an S2S operation, e.g., MAX, MIN, SUM.

S2R Operators

Several types of windows exist [[CHAKRAVARTHY2009]]. Those illustrated below are a subset of interest to the RSP community.

Time based sliding window

A time-based sliding windows is defined by two parameters: the window size and the window slide. The size is specified in terms of a time duration (e.g., 1 minute, 30 seconds, 1.3 days) and indicates that the window content, at time t will contain only those elements whose timestamps are greater than t-size. The slide parameter, specified as a time duration, indicates how often the window will be computed, or *slide* over time.

Additional case: About supporting windows that are not terminated by the current time, e.g., something like a window from 10 minutes in the past until 5 minutes in the past? Issue

As an example, consider a query that obtains the rooms where Axel has been in the last 10 minutes, updating results every minute:


REGISTER QUERY TrackAxelSliding AS
SELECT ?room 
FROM STREAM ex:social [RANGE 10m SLIDE 1m]
WHERE { 
 :axel :isIn ?room
}
          

The same query can be specified in CQELS.


SELECT ?room
WHERE { 
  STREAM ex:social [RANGE 10m SLIDE 1m] 
  {:axel :isIn ?room}
}
          

The results will be different depending on the RSP Engine that is used. For example in C-SPARQL the evaluation of the window is performed each time the window closes. CQELS does it when the window content needs to be changed, i.e., in the event of a new update from incoming streams or the expiration of an element in a window. For a more detailed description of these differences refer to [[DELLAGLIO2013]].

In RSP-QL, this query would be written in a similar way,


SELECT ?room
FROM NAMED WINDOW :win ON ex:social [RANGE PT10M SLIDE PT01M]
WHERE {
  WINDOW :win { :axel :isIn ?room }
}
          

Notice that the window size and slide follow the [[ISO8601]] standard for specifying durations. As the window is declared at the beginning of the query, and it has an identifier (:win in this example), it can be reused as many times as necessary in the query body.

Time-based tumbling window

This is a special case of the general sliding window, when the size of the slide is equal to the window length. For example: Give me the room where Axel has been in the last 10 minutes, updating results every 10 minutes.


SELECT ?room
WHERE { 
  STREAM ex:social [RANGE 10m TUMBLING] {:axel :isIn ?room}
}
          

In RSP-QL we can use this syntax sugar or even omit the slide and default it to a tumbling window on that case.

Triple-based windows

In principle, triple-based windows were defined to emulate tuple count windows in CQL-like data stream systems. For example this query returns the last five entries of a price stream in CQL:


Select P.price From Prices[Rows 5] as P
          

For our example, we can get the last three people detected in the stream with the following C-SPARQL query.


REGISTER QUERY Track3latest AS
SELECT ?who 
FROM STREAM ex:social [RANGE TRIPLES 3]
WHERE {
 ?who :isIn ?room
}
          

In this example it works because each triple in the stream is an event on itself. However, in the general case one event requires several triples in the stream to be fully described. For example consider a stream containing observations as the following:


:obs1 a ssn:Observation;
      ssn:observedBy :sensor1;
      ssn:observedProperty :peopleInRoom;
      ssn:observedValue :axel;
      dul:locatedIn :redRoom.
          

If we ask for the latest three triples in the stream, we will not obtain the latest three observations, but only three triples, each one with incomplete information. For this reason, C-SPARQL triple-based windows are not always useful, although they work as specified. Other languages such as SPARQLStream do not include support for this altogether, as it has a demonstrated faulty behavior. In the RSP model discussed in this W3C Community Group, as we allow representing RDF streams as sequences of graphs, rather than just triples, it should be possible to redefine this operator, with cleaner semantics.

Count-based window based on Basic Graph Pattern

This BGP count-based window is introduced to overcome the above limitation of triple-based window on RDF streams defined as sequences of graphs. Instead of counting single triples, this count-based window will count the groups of triples(subgraphs) that match a certain basic graph pattern. For example, the above pattern can be used to query the last three observations (filtered by observedValue :axel) as events composed from a set of triples.


SELECT ?obs ?room 
FROM NAMED WINDOW :win ON ex:social [RANGE BGP 3]
WHERE {
 ?obs a ssn:Observation;
      ssn:observedBy ?sensor;
      ssn:observedProperty :peopleInRoom;
      ssn:observedValue :axel;
      due:locatedIn ?room.
}
          

The keyword TRIPLES above can be used instead of BGP as it also covers above triple window.

Partitioned Windows (not supported)

Deal with one input stream and several output streams (i.e., the partitions), over which the query is evaluated. Partitioned windows are based on knowing the schema, and deciding how to do the partition in a way that simplifies the query. This makes it complicated for RSP. Example queries (partitioned): In the examples below, we can partition the input stream by dividing it by people's name and then evaluate the query. (i) Find for each person the time spent until she leaves a room and enters another (time spent in a room). (ii) Find for each person the time elapsed between a person leaves room A and enters room B, independently of how many rooms are traversed in between (transition time).

Predicate-based window (not supported)

In Predicate-based windows, objects are qualified to be part of the window once they satisfy a certain query predicate. Similarly, objects expire only when they no longer satisfy a certain predicate. Predicate-based windows are a generalization of time-based and tuple-count sliding windows, and it needs some sort of caching mechanisms to be implemented.

Example queries: For each person, continuously report the elapsed time between each two consecutive readings. Only the latest reading of each person needs to be considered. Once the reading of a person is reported, the previous reading expires. In C-SPARQL we could in principle simulate predicate-based windows by using a network of queries.


REGISTER STREAM :s1 AS
CONSTRUCT { :s1 :matches ?who } 
FROM STREAM ex:social [RANGE ? STEP ?]
WHERE {
 ?who :isIn ?room
}

REGISTER QUERY :s2 AS
SELECT ?who 
FROM STREAM :s1 [RANGE 3 TRIPLES]
WHERE {
 :s1 :matches ?who
}
          

It is a first sketch and it may not work as the CQELS one.

In the general case, CQELS also needs nested queries to present such queries. However, in CQELS, there is a syntax for specifying last mappings that matched a certain basic graph pattern (using the BGP keyword). For example the query that retrieves the last three persons in the room can be represented via the following query.


SELECT ?who 
WHERE {
 STREAM ex:social [RANGE BGP 3] {?who :isIn ?room}
}
          

R2R Operators

The R2R operators are inherited by SPARQL and they should follow the semantics of [[SPARQL11-Query]]. We do not describe them all in this section but we focus on the most important ones, especially if there are specific details of interest for the RSP community.

SELECT

We again use the query that returns the rooms and persons every minute.


SELECT ?room ?person
FROM NAMED WINDOW :win ON ex:social [RANGE PT1M SLIDE PT1M]
WHERE {
  WINDOW :win { ?person :isIn ?room }
}   
          

GROUPBY

Aggregates are important operators in RSP and again we follow the SPARQL 1.1 aggregate semantics. The following example query illustrates its use: Find a person who has been to more than 5 different rooms during the past 5 minutes:


REGISTER QUERY FastAndFurious AS
SELECT ?person (count(distinct ?room) as ?rooms)
FROM STREAM  [RANGE 5m TUMBLING] 
WHERE {?person :isIn ?room}
GROUPBY ?person
HAVING  (?rooms >= 5)
          

SELECT ?person (count(distinct ?room) as ?rooms)
FROM NAMED WINDOW :win ON ex:social [RANGE PT5M]
WHERE { 
  WINDOW :win {?person :isIn ?room}
}
GROUPBY ?person
HAVING  (?rooms >= 5)
          

In a similar way, other aggregates such as SUM, COUNT, MIN, MAX can also be supported.

CONSTRUCT

The construct query allows to create new graphs based on que query bindings as in SPARQL 1.1. As an example query consider the following: Detect two different people in the same room for at least three seconds and create a graph of co-presence.


CONSTRUCT {?p1 :isWith ?p2}
WHERE {
  STREAM  [RANGE 3s SLIDE 1s]
  { ?p1 isIn ?room. ?p2 isIn ?room. FILTER(?p1!=?p2)}
}
          

An RSP-QL query would follow a similar approach:


CONSTRUCT {?p1 :isWith ?p2}
FROM NAMED WINDOW :win ON ex:social [RANGE PT3S SLIDE PT1S]
WHERE {
  WINDOW :win { ?p1 isIn ?room. ?p2 isIn ?room. FILTER(?p1!=?p2)}
}
          

However, this query does not ensure that both people are present in the window for the entire three seconds. In EP-SPARQL, there is a getDURATION() function that could be used for this type of query:


CONSTRUCT {?p1 :isWith ?p2 .}
WHERE     {STREAM  { ?p1 isIn ?room. } AND {?p2 isIn ?room. }}
FILTER    (?p1!=?p2 && getDURATION() < "PT3S"^^xsd:duration)
          

OPTIONAL

The Optional pattern matching provides the following feature: "if the optional part does not match, it creates no bindings but does not eliminate the solution" [[SPARQL11-Query]]. As an example consider que following query: detect persons who have entered either room1 or room2 in the past five minutes.


REGISTER QUERY OptionalQuery AS
SELECT ?person1 ?person2
FROM STREAM  [RANGE 5m TUMBLING] 
WHERE { 
    OPTIONAL {?person1 :isIn :room1} 
    OPTIONAL {?person2 :isIn :room2} 
    FILTER(bound(?person1) || bound(?person2))
}
          

In CQELS this is not supported, but it could work in two ways: first as in the previous example:


SELECT ?person1 ?person2
WHERE { 
STREAM  [RANGE 5M] { 
    OPTIONAL {?person1 :isIn :room1} 
    OPTIONAL {?person2 :isIn :room2} 
    FILTER(bound(?person1) || bound(?person2))
}}
          

Alternatively, using two individual sensor streams for room1 and room2 as follows, by using bound filters.


SELECT ?person1 ?person2
WHERE { 
   OPTIONAL { STREAM [RANGE 5M] {?person1 :isIn :room1}} 
   OPTIONAL { STREAM [RANGE 5M] {?person2 :isIn :room2}} 
   FILTER(bound(?person1) || bound(?person2))
}
          

FILTER MINUS

While the support of this operator is uneven in RSP engines, it is important for use cases where a certain type of negation is needed. More specifically, it "calculates solutions in the left-hand side that are not compatible with the solutions on the right-hand side" [[SPARQL11-Query]]. As an example, consider the following query: detect persons who entered room1 without a doctorate during the past five minutes.


SELECT ?person 
FROM STREAM  [RANGE 5m TUMBLING]
FROM NAMED 
WHERE {
  { { ?person :isIn :room1 }
    FILTER MINUS 
    {GRAPH  {?person :hasDegree :doctorate}
  }
}
          

It may parse, but it may not provide correct results. The problem is that all triples from the windows are merged in the default graph.Issue

In CQELS this is not supported although it is planned for future releases:


SELECT ?person 
WHERE {
  { STREAM [RANGE 5m] { ?person :isIn :room1}
    MINUS 
    {GRAPH  {?person :hasDegree :doctorate}
  }
}
          

What happens if the static part changes during the lifetime of the query?

R2S Operators

R2S operators produce a stream out of bounded RDF mappings. This operator is typically used as an output operator, producing a stream after the R2R and S2R operators have been applied. Of the existing RSP engines, only SPARQLStream has provided an implementation for these operators, namely:


SELECT ISTREAM ?room ?person
FROM NAMED STREAM ex:social [1 MINUTES SLIDE 1 MINUTES] 
WHERE {
 ?person :isIn ?room .
}
        

In this case the room and person will be output only if they were not in the previous window. Notice that implicitly C-SPARQL always RSTREAMs results and CQELS always ISTREAMs results, but does not allow for customizing the behavior, as reported in [[DELLAGLIO2013]]. In RSP-QL this should be explicit as in SPARQL-Stream:


SELECT ISTREAM ?room ?person
FROM NAMED WINDOW :win ON ex:social [RANGE PT1M SLIDE PT1M]
WHERE {
 WINDOW :win {?person :isIn ?room }
}
        

Time-aware Operators

These operators come from the Complex Event Processing community, especially those that exploit temporal relationships between events or stream elements. While these operators have been studied in RSP in works such as EP-SPARQL, it is still subject of research, especially when combining it with window-based approaches. Nevertheless, we present some of the most relevant operators of this kind, and how they could potentially be integrated with an RSP query language.

SEQ

The sequence operator (SEQ) specifies that the left hand event expression must be matched, and then the right hand event expression is evaluated. This type of expression is useful to find out when some event is followed by another event. As an example query, consider the following: Provide pairs of rooms, e.g., (room1, room2), where Axel and Darko have been together such that they are first in a room and then following each other in another room within 5 minutes.

In C-SPARQL this type of query is partially supported, through the timestamp function that returns the timestamp of a triple pattern. By comparing these timestamps, the result can be obtained, although the query results are a bit cumbersome.


REGISTER QUERY Seqence AS
SELECT DISTINCT ?room_x ?room_y
FROM STREAM  [RANGE 5m SLIDE 1s]
WHERE { 
GRAPH   { ?room_x lv:sameLevelWith ?room_y.}
 :axel :isIn ?room_x. 
 :darko :isIn ?room_x. 
 :axel :isIn ?room_y. 
 :darko :isIn ?room_y.
 FILTER (timestamp(:axel :isIn ?room_x) = timestamp(:darko :isIn ?room_x) && 
         timestamp(:axel :isIn ?room_y) = timestamp(:darko :isIn ?room_y) &&  
         timestamp(:axel :isIn ?room_x) < timestamp(:darko :isIn ?room_y) 
         )
} 
          

It may not be reactive

In CQELS this is not supported although it has been planned in a hybrid CQELS-CEP version as follows:


SELECT DISTINCT ?room_x ?room_y
WHERE { 
GRAPH   { ?room_x lv:sameLevelWith ?room_y.}
STREAM  [RANGE 5m]{
  {:axel :isIn ?room_x. :darko :isIn ?room_x.} SEQ 
  {:axel :isIn ?room_y. :darko :isIn ?room_y.}
} 
          

EP-SPARQL has a native way of expressing the SEQ operator providing a much simpler query as shown in the following example.


SELECT DISTINCT ?room_x ?room_y
WHERE { 
 {:axel :isIn ?room_x. :darko :isIn ?room_x.}  
SEQ
 {:axel :isIn ?room_y. :darko :isIn ?room_y.}}
 FILTER (getDURATION() < "PT5M"^^xsd:duration)
          

Alternatively, if the requirement is that Axel and Darko remain in room_x exactly the same time (and so in room_y), then the query would look like the following one:


SELECT DISTINCT ?room_x ?room_y
WHERE  {STREAM  
       { :axel :isIn ?room_x. }
EQUALS
       { :darko :isIn ?room_x.}
SEQ
       { :axel :isIn ?room_y. }
EQUALS
       { :darko :isIn ?room_y.}
}
FILTER (getDURATION() < " PT5M"^^xsd:duration)
          

REPETITION

The repetition in CEP matches repeating events over time. For example consider the following query: Find at least three different pairs of rooms, in which Axel and Darko have been together within ten minutes, moving from room_x to room_y. This query relies on a SEQ operator and on counting the repetition of such sequence. Again, as SEQ is not supported in RSPs as C-SPARQL, a workaround using an aggregation count as follows is needed:


REGISTER QUERY Q7 AS
SELECT COUNT (DISTINCT *) AS ?nroom
FROM STREAM  [RANGE 5m SLIDE 1s]
WHERE { 
GRAPH   { ?room_x lv:sameLevelWith ?room_y.}
 :axel :isIn ?room_x. 
 :darko :isIn ?room_x. 
 :axel :isIn ?room_y. 
 :darko :isIn ?room_y.
 FILTER (timestamp(:axel :isIn ?room_x) = timestamp(:darko :isIn ?room_x) && 
         timestamp(:axel :isIn ?room_y) = timestamp(:darko :isIn ?room_y) &&  
         timestamp(:axel :isIn ?room_x) < timestamp(:darko :isIn ?room_y) 
         )
} 
HAVING (?nroom >3) 

          

Since Esper has a native way of expressing the SEQ operator, this query might result easier if expressed in Esper. This has been tried in Esper (for illustrative purposes), using a simplistic TripleEvent object. Notice the use of the every xx -> pattern in the Esper query. For more information and the code leading to this query refer to [[KIAESPER]]. The query is expected to find at least three different pairs of rooms, in which Person-a and Person-b have been together within ten minutes, moving from room_a to room_b. This query relies on a SEQ operator and on counting the repetition of such sequence.


select * from pattern [ 
  every a=TripleEvent -> every b=TripleEvent -> 
  every d=TripleEvent -> every c=TripleEvent -> 
  every e=TripleEvent -> every f=TripleEvent 
where timer:within(10 min)] 
where a.subject!=b.subject and a.object=b.object and 
      c.subject!=d.subject and c.object=d.object and 
      e.subject!=f.subject and e.object=f.object and 
      a.subject=d.subject  and b.subject=c.subject and 
      d.subject=e.subject and c.subject=f.subject and 
      a.predicate='isIn' and b.predicate='isIn' and 
      c.predicate='isIn' and d.predicate='isIn' and 
      e.predicate='isIn' and f.predicate='isIn'
          

Other Operators

For completeness we also include other operators that may be useful in specific use cases. They are of particular interest, especially considering that many use cases require integration with stored data and even temporally valid stored data.

Refreshing stored data

Combining streaming and stored data is supported in existing RSP engines. However, in many cases the stored data can change during the query lifetime, so it might be important to refresh or update the stored contents at a certain point of the evaluation process timeline. An RSP should consider that different elements in background data change over time at different rates, e.g., the twitter followers of a famous person changes more often than those of a regular one. Also, rate of changes may vary over time, i.e., dynamic change rates [[DEHGHANZADEH2015]]. Existing RSP languages do not impose or propose any way of explicitly performing these updates. In fact, in some RSP engines the stored data is assumed to be static during query evaluation. An extension to the core functionality of the query language would be to let the user provide hints as to how often the stored data is updated. These may be interpreted by the query processing engine to indicate how often to refresh the stored data. We will need to think about the granularity of these hints, e.g., by dataset or class. An example of this, implemented as a query operator, can be found in SNEEql, using the RESCAN keyword:


SELECT * FROM locations [RESCAN 20 SECONDS];
          

Although SPARQLStream has been used to rewrite to SNEEql, the RESCAN operator has not been mapped to an equivalent in SPARQLStream.

Fact

Fact is a Complex Event Processing operator, which maintains temporal states (the Facts) of a system. It differentiates Events, i.e., things that happen(ed) and Facts, i.e., things that are true for a specified amount of time. A more detailed description can be found at TEF-SPARQL [[GAO2015]].


Axel enter RoomA, [2]
Darko enter RoomA, [3]
Axel leave RoomA, [6]
Axel enter RoomB, [6]
Darko leave RoomA, [8]
Darko enter RoomB, [8]
          

Each data entry in this stream is an Event: The event “Axel enters RoomA at time 2” is always true, as it actually happened. The Fact that “Axel isIn RoomA” is a temporal state in the system, which is only true for a restricted period of time. Axel is in roomA, only SINCE time 2, UNTIL he leaves the room at time 6. The FACT operator maintains such temporal states, together with operations such as SINCE (set the beginning time of a valid fact) and TILL (set the ending time of a fact).


CONSTRUCT FACT UserFact  {?user isIn ?room}
(UNION 
         (SINCE   ?user :enter ?room)
         (TILL  ?user :leave ?room) 
)
          

The benefits of using the Fact operator:

  • It simplifies stream reasoning by creating/updating Facts.
  • It saves the cost of maintaining events between consecutive time windows.
  • Query: Return the current number of people in each room, every three seconds.

    
    // creating facts
    CONSTRUCT FACT UserFact  {?user isIn ?room}
    (UNION 
             (SINCE   ?user :enter ?room)
             (TILL  ?user :leave ?room) 
    ) 
    // counting facts 
    SELECT ?room AS ROOM, ?user as USER (AGGREGATE ? room, COUNT ?user
             WHERE (CURRENT ?user isIn ? room)
             GROUP BY ? room
             EVERY  "P3SEC"^^xsd:Duration) 
              

    We can store and count the number of Facts (e.g., ). The Facts can be created by using the “SINCE” clause (when arrives, the fact is created with empty “endTime”. ) and terminated with the “TILL” clause (when arrives, the fact is terminated by updating “endTime”. )

    Esper with fact We use the operator "Named window" in Esper to manage Facts.

    
    create window factUser.win:keepall() as (userName String, room String, startTime Long, endTime Long)
    // create facts
    on UserEvent merge factUser where factUser.room = UserEvent.room and factUser.userName = UserEvent.userName
    when matched  and UserEvent.userAction = ''  
    // update the ending time of the fact, and delete
    then delete where factUser.userName = UserEvent.userName  
    when not matched
    // insert a fact with an opening end time
    then insert  
    select UserEvent.userName as userName, UserEvent.userRoom as room, UserEvent.timeStamp as startTime, 0L as endTime    
        
    // counting the facts
    select count(*), room from factUser group by room output all every 3 sec
              

    SPARQL without Fact Without the Fact operator, the system needs to create an additional stream to maintain system states. In this example, the UseRoomCounts stream is created for two kinds of counting events of each room: enterCount and leftCount.

    
    REGISTER STREAM UserEnterLeftCounts AS
    CONSTRUCT { ?room uc:enterCount ?enterCount ; uc:leftCount ?leftCount . }
    FROM STREAM  [RANGE 3 sec TUMBLING]
    WHERE {
              { SELECT ( COUNT(?userEnter) as ?enterCount ) ?room
                        WHERE { ?userEnter enter ?room } GROUP BY ?room }
              { SELECT ( COUNT(?userLeft) as ?leftCount ) ?room
                        WHERE { ?userLeft left ?room } GROUP BY ?room } }
    
    REGISTER STREAM UseRoomCounts AS
    CONSTRUCT {?room uc:currentCount ?newCount }
    FROM STREAM  [RANGE 3 sec TUMBLING]
    WHERE {
              { SELECT ( ?currentCount + ?enterCount - ?leftCount as ?newCount ) ?room
                        WHERE {?room uc:enterCount ?enterCount ; uc:leftCount ?leftCount ; uc:currentCount ?currentCount . } } }
    
              

    Esper without Fact Without using the "named window" operator, the system needs to copy the count events (Part 2 below) between windows, in order to maintain the count events. create schema countEventType (timeStamp Long , userCount Long, room String)

    
    // part1: create count events
    insert into countEventType 
    select  
               (select count(*)  
               from UserEvent.win:time_batch( 3 sec)  as ue 
               where ce.userRoom = ue.userRoom and ue.userAction = ''  ) 
               (select count(*)  
               from UserEvent.win:time_batch( 3 sec) as ue 
               where ce.userRoom = ue.userRoom and ue.userAction = '' ) 
    as userCount , ce.userRoom as room ,  (current_timestamp + 3000L) as timeStamp 
    from UserEvent.win:time_batch( 3 sec) as ce
    group by ce.userRoom  
    
    // part2:  copy count events between each window
    insert into countEventType 
    select sum(userCount) as userCount, room as room , (current_timestamp + 3000L) as timeStamp 
    from countEventType.win:time_batch( 3 sec) group by room  
        
    // part3:  aggregate count events
    select sum(userCount) as counter, ce.room as room, current_timestamp as currentTime, ce.room 
    from countEventType.win:time_batch( 3 sec) as ce 
    group by ce.room  
              

    If there are multiple streams involved, the cost of maintaining events could be very high. Facts provide a solution for modelling and maintaining temporal states.

Serialisation

The abstract model can be implemented in different concrete formats or serialisations. The question is, how can the model be serialised? Following our requirements, we shall attempt to remain as compatible as possible with existing RDF serialisations. In general, the RDF stream data model is defined independently of the various possible serialisations.

The W3C RSP Group has started to address this sub-topic in a dedicated thread. This initiative already explored the current standard formats for RDF, including RDF/XML, Turtle, N-Quads, N-Triples, JSON-LD and TriG. The binary representations that exist have also been explored, including HDT, SHDT, ERI, RDSZ and EXI. The evaluation and analysis of serialisation formats will continue during the Group life span, and final results go beyond the scope of this document.