SQL Timeseries Queries

Some queries for working with timeseries data in a standard SQL table. For these I'll be using an integer timestamp for simplicity and SQLite as the DB. In a real system a proper datetime type is probably better, and the queries themselves are portable across most of the major DBs.

Overview of table structures

Table Schema:

    series_id      TEXT  (PK)
    ts             INT   (PK)
    version_ts     INT   (PK)
    value          TEXT
    metadata_json  JSON
    created_at     INT   (default now())

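As a concrete sketch, the table might be declared in SQLite like this (the exact constraints and the created_at default expression are my assumptions; SQLite stores JSON as TEXT):

CREATE TABLE IF NOT EXISTS timeseries (
    series_id     TEXT    NOT NULL,
    ts            INTEGER NOT NULL,
    version_ts    INTEGER NOT NULL,
    value         TEXT    NOT NULL,
    metadata_json TEXT,  -- SQLite stores JSON as TEXT
    created_at    INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),
    PRIMARY KEY (series_id, ts, version_ts)
);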

A note on timestamp resolution

It's important to get your minimum timestamp resolution correct for your use case. Too precise (e.g. nanoseconds for something that happens once an hour) means you waste space and spend effort dealing with jitter in your data collection. For example, is 2022-01-01T00:00:00.01 actually meaningfully different from 2022-01-01T00:00:00.02? Not precise enough, and you lose information to bucketing issues. For example, if I collect data 3 times a minute but only store at minute resolution, there will be 3 values stored at minute X when they should be at seconds X:00, X:20, and X:40.

My strategy is to find the most resolution I need, and drop resolution for aggregations. Generally second resolution is enough for physical events. For machine events, I'll skip past millisecond to nanosecond; not that clocks are particularly accurate at nanosecond resolution, but it's much more likely for separate consecutive events to be unique. If you still get consistent collisions at nanosecond resolution, consider moving your version from a timestamp to an incrementing sequence so that you have a guarantee against collisions, and because time clearly isn't that meaningful to you anymore: the odds of your devices being accurate to the nanosecond are small. If your devices really are that accurate and you need to keep the timestamp while dodging collisions, then go figure something else out for your GPS or LHC style project.
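If you do switch to a sequence-style version, a minimal sketch in SQLite (which lacks standalone sequences) is to compute the next version at insert time. The literal values here are made up, and this isn't safe under concurrent writers without wrapping it in a transaction:

INSERT INTO timeseries (series_id, ts, version_ts, value)
VALUES (
    'id1',
    1640995200,
    -- next version = current max version for this point, plus one
    (SELECT ifnull(MAX(version_ts), 0) + 1
     FROM timeseries
     WHERE series_id = 'id1' AND ts = 1640995200),
    '42'
);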

When dropping resolution, binning (aka truncating) is generally a more consistent approach than rounding, and easier. Binning means everything that happens between minute 0 and minute 1 gets "binned" into minute 0, even 0:59. You can do this DB side or application side, depending on what's best for your use case. DB side has the benefit of filtering out extra data before it leaves the server, so less data transfer.
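With integer second timestamps, DB-side binning is just modular arithmetic. A minimal sketch, assuming a one-minute bin width of 60 seconds:

SELECT
    ts - (ts % 60) as ts_bin,  -- 0:00 through 0:59 all land in bin 0:00
    value
FROM timeseries;

With a proper datetime column, most DBs provide an equivalent truncation function (date_trunc in Postgres, for example).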

Query 1: Versioned Points

This type of query gets the value of the latest version at each timestep. An example use case is point observations. Getting the proper resolution (and binning when needed) is important here; otherwise, any jitter in the data collection will cause multiple versions to be represented in the final data.

Query

WITH ver AS (
    SELECT
        series_id,
        ts,
        MAX(version_ts) as max_ver
    FROM timeseries
    WHERE 0 <= ts  -- Start time filter here
    GROUP BY series_id, ts
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.series_id = t.series_id  -- join on series too, so versions never cross series
        AND ver.ts = t.ts
        AND ver.max_ver = t.version_ts
ORDER BY t.ts DESC;

Data Example

In this example, version 2 completely replaces version 1, and version 3 is the latest for timestamp 2. So the data returned by the query is -O-Y-O-O.

  |
3-|   Y
  |  / \   
2-|-O O O-O
  |
1-| x x x x
  |
0-|________________________
  | | | | |
  0 1 2 3 4
`x`   : Version 1
`O`   : Version 2
`Y`   : Version 3
`----`: Data returned by query

Jitter example

Here, you can see the issue with timestamp jitter in this query. Because the versions are offset by one (pretend one second), a sawtooth pattern between the versions appears. If this were more realistic, say data came in at either XX:00 or XX:01 depending on network conditions, the query would include both versions even though you really just want the latest. This is where you'd need to bin the data in the query; see the binned sketch after the diagram.

  |
2-|-O   O   O-
  |  \ / \ /
1-|   x   x   
  |
0-|____________
  | | | | | |
  0 1 2 3 4 5

`x`   : earliest version
`O`   : latest version
`----`: Data returned by query
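Here's a sketch of how binning could be folded into the Versioned Points query, assuming second-resolution timestamps and an arbitrary one-minute bin; the max version is picked per bin instead of per raw timestamp:

WITH ver AS (
    SELECT
        series_id,
        ts - (ts % 60) as ts_bin,  -- bin to the minute before picking a version
        MAX(version_ts) as max_ver
    FROM timeseries
    WHERE 0 <= ts  -- Start time filter here
    GROUP BY series_id, ts - (ts % 60)
)
SELECT
    ver.ts_bin,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.series_id = t.series_id
        AND ver.ts_bin = t.ts - (t.ts % 60)
        AND ver.max_ver = t.version_ts
ORDER BY ver.ts_bin DESC;

If the latest version has several rows inside one bin, this returns all of them; aggregate again (e.g. keep only the row at MAX(ts) per bin) if you need exactly one value per bin.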

Versioned Scenario Query

This query treats each version as valid from when it starts until a newer version starts, and returns the latest valid version across the entire time window. This is best for things like forecasts, where several time steps are predicted at once and overlap with previous predictions. However, if a version has lots of holes, the final data will be pretty sparse.

Query

Interestingly, the last time I attempted to solve the versioned scenario query, I didn't figure it out. I resorted to doing some of the work in the application, which meant more data transfer from the DB and loops in the application after the query. Then, several years later, I revisited the problem and solved it. The magic in this query is the LAG window function, which gives you the value from the Nth row behind the current row. This lets the CTE build a "valid time window" for each version: each version is valid from its first timestamp until the first timestamp of the next newer version.

WITH cte AS (
    SELECT
        series_id,
        version_ts,
        MIN(ts) as min_ts,
        -- min_ts of the next newer version; NULL for the newest version
        LAG(MIN(ts), 1, NULL) OVER (PARTITION BY series_id ORDER BY version_ts DESC) as lag_ts
    FROM timeseries
    WHERE 0 <= ts  -- Start time filter here
    GROUP BY series_id, version_ts
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN cte
    ON cte.series_id = t.series_id
        AND t.version_ts = cte.version_ts
        AND cte.min_ts <= t.ts
        AND t.ts < ifnull(cte.lag_ts, t.ts + 1)  -- newest version has no cap; COALESCE is the portable spelling
ORDER BY t.ts DESC;

Data Example

Here, we take much the same data that caused issues with the Versioned Points query, adding an extra x and removing an O just to make things clearer. The data returned from this query is x-O-------O; no x's after the first are returned, because the O version is the most up to date from times 1 to 6. However, you can see that the gap between the O's is comparatively large, so if instead of being a few seconds apart they're a few hours apart, the returned data may have large gaps.

  |
2-|   O-------O-
  |  /          
1-|-x   x   x   
  |
0-|____________
  | | | | | | |
  0 1 2 3 4 5 6

`x`   : earliest version
`O`   : latest version
`----`: Data returned by query

Thoughts

I haven't done lots of testing on large data sets, or looked at query plans much, but these are a good starting point for using a standard SQL DB as a timeseries DB. This timeseries table definitely prioritizes inserts over reads; assuming lots of rows, the above queries and their variants are never going to be particularly fast on the raw data, except perhaps when fetching very recent data. As the table grows, getting a data retention and archived-aggregate solution in place becomes important for accessing data far in the past.

A full Python example is below; it will generate a SQLite DB, set up some test data, run the queries, and make some plots. I didn't bother completely encoding the queries into SQLAlchemy Core, but it's all expressible in Core notation. For larger timeseries applications, I've had some success with Postgres + TimescaleDB scaling up to terabyte levels of data.
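As a starting point for the read side (an educated guess on my part, not benchmarked), an index matched to the scenario query's grouping may help; the composite primary key already serves lookups by series and timestamp:

-- The PK covers (series_id, ts, version_ts); this one targets
-- the versioned scenario query's GROUP BY series_id, version_ts
CREATE INDEX IF NOT EXISTS idx_timeseries_series_version
    ON timeseries (series_id, version_ts, ts);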

Python script

# pip install sqlalchemy matplotlib
import matplotlib.pyplot as plt
from matplotlib.pyplot import Axes

from sqlalchemy import Column, TEXT, create_engine, INTEGER, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, Session

# SQLAlchemy Setup
Base = declarative_base()
engine = create_engine(f"sqlite:///timeseries_test.sqlite", echo=False, future=True)


class Timeseries(Base):
    __tablename__ = "timeseries"

    # Simplified from the schema above: value is INTEGER here for easy plotting,
    # and metadata_json / created_at are omitted
    series_id: str = Column(TEXT, primary_key=True, nullable=False)
    ts: int = Column(INTEGER, primary_key=True, nullable=False)
    version_ts: int = Column(INTEGER, primary_key=True, nullable=False)
    value: int = Column(INTEGER, nullable=False)


# Create table
Base.metadata.create_all(engine)

# Test Data
series_1: list[Timeseries] = []
max_version: int = 3
for version_ts in range(0, max_version):
    ts_0: int = 0 + version_ts  # Comment out addition of version_ts to create Versioned Points dataset
    ts_step: int = 3
    for ts in range(ts_0, 30, ts_step):
        row = Timeseries(series_id="id1", ts=ts, version_ts=version_ts, value=(version_ts + 1))
        series_1.append(row)

# False to skip re-adding data to DB, but PK will prevent dupes and the try-catch skips the dupe errors
if True:
    try:
        with Session(engine) as session:
            session.add_all(series_1)
            session.commit()
    except IntegrityError:
        print("data already loaded")


# Matplotlib setup
obs_axs: list[Axes]
obs_fig, obs_axs = plt.subplots(2)
version_colors = ["ro", "go", "bo"]

# "Normal" point versioned data query: get the latest at each timestamp
versioned_points_sql: str = """
WITH ver AS (
    SELECT
        MAX(version_ts) as max_ver,
        ts
    FROM timeseries
    WHERE {0} <= ts  -- Python Variable Substitution Here
    GROUP BY ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.ts = t.ts
        AND ver.max_ver = t.version_ts
ORDER BY t.ts DESC
"""

since_ts: int = 0
sql = versioned_points_sql.format(since_ts)
with Session(engine) as session:
    res_proxy = session.execute(text(sql))  # future-style sessions require text() for raw SQL
    data = res_proxy.all()  # rows of (ts, version_ts, value)

# Build point plot
obs_axs[0].plot([d[0] for d in data], [d[2] for d in data], "k")
for ver in range(0, max_version):
    x = [d[0] for d in data if d[1] == ver]
    y = [d[2] for d in data if d[1] == ver]
    obs_axs[0].plot(x, y, version_colors[ver])
obs_axs[0].set_title("Versioned Points")
obs_axs[0].legend(["data", "version 0", "version 1", "version 2"])

# Scenario versioned query: Consider versions continuous
versioned_points_sql: str = """
WITH cte AS (
    SELECT
        version_ts,
        MIN(ts) as min_ts,
        LAG(ts, 1, NULL) OVER (ORDER BY version_ts DESC) as lag_ts
    FROM timeseries
    WHERE {0} <= ts  -- Python Variable Substitution Here
    GROUP BY version_ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries t
INNER JOIN cte
    ON t.version_ts = cte.version_ts
        AND cte.min_ts <= t.ts
        AND t.ts < ifnull(lag_ts, t.ts+1)
ORDER BY t.ts DESC;
"""

since_ts: int = 0
sql = versioned_scenario_sql.format(since_ts)
with Session(engine) as session:
    res_proxy = session.execute(text(sql))
    data = res_proxy.all()  # rows of (ts, version_ts, value)


# Build scenario plot
obs_axs[1].plot([d[0] for d in data], [d[2] for d in data], "k")
for ver in range(0, max_version):
    x = [d[0] for d in data if d[1] == ver]
    y = [d[2] for d in data if d[1] == ver]
    obs_axs[1].plot(x, y, version_colors[ver])
obs_axs[1].set_title("Versioned Scenario")
obs_axs[1].legend(["data", "version 0", "version 1", "version 2"])

plt.show()