SQL Timeseries Queries
Some queries for working with timeseries data in a standard SQL table. For simplicity I'll be using an integer timestamp and SQLite as the DB. In a real situation a datetime type is probably better, and the SQL queries are portable across most of the major DBs.
Overview of table structures
Structure notes:
- Table is append only
- The table holds multiple series
- All data is versioned
- There can only be a single value for a series at a timestamp for a given version (i.e. the primary key is series_id, timestamp, version)
- Values can be anything, so the application code is responsible for keeping track of text vs. numeric types
- Note: another way to do this would be creating a text value column and a numeric value column; it's a tradeoff. That still requires application code to keep track of which column to insert into, but it wouldn't need to do type conversions or track the type outside the data somehow, at the cost of more storage.
Table Schema:
| series_id TEXT PK | ts INT PK | version_ts INT PK | value TEXT | metadata_json JSON | created_at INT (default now()) |
|---|---|---|---|---|---|
Column Notes:
- series_id: unique identifier for a data series. This should probably have a foreign key relationship to another table detailing series level context info (location, update frequency, units, etc.)
- ts: timestamp representing when the value was recorded. I send this in from the application, rather than having the DB generate a timestamp, so that the timestamp is closer to the recording time
- version_ts: version of the data. This doesn't strictly need to be a timestamp, just something that can be sorted, though having it be a timestamp is nice for future debugging. It could also be generated by the DB rather than passed in by the application, but care has to be taken so that all data collected at the same time gets the same version; an autogenerated timestamp can produce a different value per row depending on the specifics of the DB and transactions, so I tend to make this the application's responsibility as well.
- value: whatever the value is at the timestamp for the version.
- metadata_json: a place to dump extra info at the row level. A JSON column has some nice properties for DBs that support querying it, but utility and implementation will vary depending on DB support
- created_at: A maintenance column, fill it with an autogenerated timestamp. Normally you'd have an updated_at column as well, but timeseries data is append only so that isn't needed here. A DDL sketch of the full schema follows.
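Putting the notes above together, here's a minimal SQLite DDL sketch of the schema. It's just one way to express it; the commented-out foreign key points at a hypothetical series table holding the series-level context info mentioned earlier.
CREATE TABLE IF NOT EXISTS timeseries (
    series_id     TEXT    NOT NULL,
    ts            INTEGER NOT NULL,
    version_ts    INTEGER NOT NULL,
    value         TEXT    NOT NULL,
    metadata_json TEXT,                                              -- SQLite stores JSON as TEXT; other DBs have a real JSON type
    created_at    INTEGER NOT NULL DEFAULT (strftime('%s', 'now')),  -- maintenance column, DB-generated
    PRIMARY KEY (series_id, ts, version_ts)
    -- FOREIGN KEY (series_id) REFERENCES series (series_id)         -- hypothetical series-level context table
);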
A note on timestamp resolution
It's important to get your minimum timestamp resolution correct for your use case. Too precise (i.e. nanosecond for something that happens once an hour) means you waste space and spend effort dealing with jitter in your data collection. For example, is 2022-01-01T00:00:00.01 actually meaningfully different than 2022-01-01T00:00:00.02? Not precise enough, and you lose information to bucketing issues. For example, if I collect data 3 times a minute but only store at minute resolution, there will be 3 values collected at minute X when they should be at seconds X:00, X:20, and X:40. My strategy is to find the most resolution I need, and drop resolution for aggregations. Generally second resolution is enough for physical events. For machine events, I'll skip past millisecond to nanosecond; not that clocks are particularly accurate at nanosecond resolution, but it's much more likely for separate consecutive events to be unique. If you still get consistent collisions at nanosecond resolution, consider moving your version from a timestamp to an incrementing sequence so that you have a guarantee against collisions, and because time clearly isn't that meaningful to you anymore since the odds of devices being accurate to the nanosecond are small. If your devices are actually that accurate and you need to keep the timestamp while dodging collisions, then go figure something else out for your GPS or LHC style project.
When dropping resolution, binning (aka truncating) is generally a more consistent approach than rounding, and easier. Binning means everything that happens between minute 0 and minute 1 gets "binned" into minute 0, even 0:59. You can do this DB side or application side, depending on what's best for your use case. DB side has the benefit of usually filtering out extra data before it leaves the server, so less data transfer.
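For example, with the integer (epoch-second) timestamps used here, binning to the minute is just integer arithmetic. A sketch (the ts_minute alias is mine, and the datetime variant assumes a datetime-typed column instead of the integer one):
-- Bin epoch-second timestamps to the minute: 0-59 -> 0, 60-119 -> 60, ...
SELECT
    ts - (ts % 60) AS ts_minute,
    value
FROM timeseries
-- For a real datetime column the DB's date functions do the same job,
-- e.g. strftime('%Y-%m-%dT%H:%M:00', ts) in SQLite or date_trunc('minute', ts) in Postgres.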
Query 1: Versioned Points
This type of query gets the value of the latest version at each timestep. An example use case here is point observations. Getting the proper resolution (and binning when needed) is important here, otherwise any jitter in the data collection will cause multiple versions to be represented in the final data.
Query
WITH ver AS (
    SELECT
        MAX(version_ts) as max_ver,
        ts
    FROM timeseries
    WHERE 0 <= ts -- Start time filter here
    GROUP BY ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.ts = t.ts
    AND ver.max_ver = t.version_ts
ORDER BY t.ts DESC
Data Example
In this example, version 2 completely replaces version 1, and version 3 is the latest for timestamp 2. So the data returned by the query is -O-Y-O-O.
|
3-| Y
| / \
2-|-O O O-O
|
1-| x x x x
|
0-|________________________
| | | | |
0 1 2 3 4
`x` : Version 1
`O` : Version 2
`Y` : Version 3
`----`: Data returned by query
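One caveat on the Versioned Points query: the CTE groups by series_id but doesn't return it, so once the table actually holds more than one series, the join can match a timestamp/version pair from a different series. In practice you'd carry series_id through the join and usually filter to one series as well. A sketch of that variant (the 'id1' value is just a placeholder):
WITH ver AS (
    SELECT
        MAX(version_ts) as max_ver,
        ts,
        series_id
    FROM timeseries
    WHERE 0 <= ts -- Start time filter here
      AND series_id = 'id1' -- Series filter (placeholder id)
    GROUP BY ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.series_id = t.series_id
    AND ver.ts = t.ts
    AND ver.max_ver = t.version_ts
ORDER BY t.ts DESC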
Jitter example
Here, you can see the issue with timestamp jitter in this query. Because the versions are offset by one (pretend one second), a sawtooth pattern between the versions appears. If this were more realistic, say data came in at either XX:00 or XX:01 because of network conditions, the query would include both versions even though you really just want the latest. This is where you'd need to bin the data in the query; a binned variant is sketched below the diagram.
|
2-|-O O O-
| \ / \ /
1-| x x
|
0-|____________
| | | | | |
0 1 2 3 4 5
`x` : earliest version
`O` : latest version
`----`: Data returned by query
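A sketch of the Versioned Points query with binning applied, assuming the integer timestamps are epoch seconds and that one-minute bins are wide enough to swallow the jitter (the 60-second width is a placeholder):
WITH ver AS (
    SELECT
        MAX(version_ts) as max_ver,
        ts - (ts % 60) as ts_bin -- bin to the minute before picking the latest version
    FROM timeseries
    WHERE 0 <= ts -- Start time filter here
    GROUP BY ts_bin, series_id
)
SELECT
    t.ts - (t.ts % 60) as ts_bin,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.ts_bin = t.ts - (t.ts % 60)
    AND ver.max_ver = t.version_ts
ORDER BY ts_bin DESC
Note that if the latest version recorded more than one point inside a bin, this still returns all of them; collapsing those down to one value per bin is an aggregation decision (MIN, MAX, AVG, last, etc.) rather than a versioning one.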
Query 2: Versioned Scenario
This query gets the latest version for the entire time window that version is valid for, i.e. each version covers the span from its first timestamp until the next version starts. This is best for things like forecasts, where several time steps are predicted at once and overlap with previous predictions. However, if there are lots of holes in a version, the final data will be pretty sparse.
Query
Interestingly, the last time I attempted to solve the versioned scenario query, I didn't figure it out. I resorted to doing some of the work in the application, which meant more data transfer from the DB and loops in the application after the query. Then, several years later, I revisited the problem and solved it. The magic in this query is the LAG function, which gives you the value from the Nth row behind the current row. This lets the CTE build a "valid time window" for each version.
WITH cte AS (
    SELECT
        version_ts,
        MIN(ts) as min_ts,
        LAG(MIN(ts), 1, NULL) OVER (ORDER BY version_ts DESC) as lag_ts -- first ts of the next-newer version
    FROM timeseries
    WHERE 0 <= ts -- Start time filter here
    GROUP BY version_ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries t
INNER JOIN cte
    ON t.version_ts = cte.version_ts
    AND cte.min_ts <= t.ts
    AND t.ts < ifnull(lag_ts, t.ts + 1)
ORDER BY t.ts DESC;
Data Example
Here, we take much the same data that caused issues with the Versioned Points query, adding an extra x and removing an O just to make things more clear. The data returned from this query is x-O-------O; no x's after the first are returned, because the O version is the most up to date from times 1 to 6. However, you can see that the gap between the O's is comparatively large, so if instead of being a few seconds apart they're a few hours apart, the data returned may have large gaps.
|
2-| O-------O-
| /
1-|-x x x
|
0-|____________
| | | | | | |
0 1 2 3 4 5 6
`x` : earliest version
`O` : latest version
`----`: Data returned by query
Thoughts
I haven't done lots of testing on large data sets, or looked at query plans much, but these are a good starting point for using a standard SQL DB as a timeseries DB. This timeseries table definitely prioritizes inserts over reads; assuming lots of rows, the above queries and their variants are never going to be particularly fast on the raw data, except perhaps for getting very recent data. As the table gets larger, getting a data retention and archived aggregate solution in place becomes important for accessing data far in the past; a sketch of one such rollup is included below. A full Python example follows, which will generate a SQLite DB, set up some test data, run the queries, and make some plots. I didn't bother completely encoding the queries into SQLAlchemy Core, but it's all expressible in the Core notation. For larger timeseries applications, I've had some success with Postgres + TimescaleDB scaling up to terabyte levels of data.
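As a sketch of that kind of rollup, assuming a hypothetical timeseries_hourly table and numeric values, an hourly aggregate of the latest versions could look something like this:
-- Hypothetical target table:
-- CREATE TABLE timeseries_hourly (series_id TEXT, ts_hour INTEGER, avg_value REAL, n INTEGER,
--                                 PRIMARY KEY (series_id, ts_hour));
INSERT INTO timeseries_hourly (series_id, ts_hour, avg_value, n)
SELECT
    t.series_id,
    t.ts - (t.ts % 3600) AS ts_hour,          -- bin to the hour
    AVG(CAST(t.value AS REAL)) AS avg_value,  -- value is TEXT in the schema, so cast for numeric series
    COUNT(*) AS n
FROM timeseries AS t
INNER JOIN (
    SELECT MAX(version_ts) AS max_ver, ts, series_id
    FROM timeseries
    GROUP BY ts, series_id
) ver
    ON ver.series_id = t.series_id
    AND ver.ts = t.ts
    AND ver.max_ver = t.version_ts
GROUP BY t.series_id, ts_hour;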
Python script
# pip install sqlalchemy matplotlib
import matplotlib.pyplot as plt
from matplotlib.pyplot import Axes
from sqlalchemy import Column, TEXT, create_engine, INTEGER, text
from sqlalchemy.exc import IntegrityError
from sqlalchemy.orm import declarative_base, Session

# SQLAlchemy Setup
Base = declarative_base()
engine = create_engine("sqlite:///timeseries_test.sqlite", echo=False, future=True)


class Timeseries(Base):
    __tablename__ = "timeseries"
    # Simplified vs the schema above: INTEGER value for easy plotting, metadata_json/created_at omitted
    series_id: str = Column(TEXT, primary_key=True, nullable=False)
    ts: int = Column(INTEGER, primary_key=True, nullable=False)
    version_ts: int = Column(INTEGER, primary_key=True, nullable=False)
    value: int = Column(INTEGER, nullable=False)


# Create table
Base.metadata.create_all(engine)

# Test Data
series_1: list[Timeseries] = []
max_version: int = 3
for version_ts in range(0, max_version):
    ts_0: int = 0 + version_ts  # Comment out addition of version_ts to create Versioned Points dataset
    ts_step: int = 3
    for ts in range(ts_0, 30, ts_step):
        row = Timeseries(series_id="id1", ts=ts, version_ts=version_ts, value=(version_ts + 1))
        series_1.append(row)

# False to skip re-adding data to DB, but the PK will prevent dupes and the try-except skips the dupe errors
if True:
    try:
        with Session(engine) as session:
            session.add_all(series_1)
            session.commit()
    except IntegrityError:
        print("data already loaded")

# Matplotlib setup
obs_axs: list[Axes]
obs_fig, obs_axs = plt.subplots(2)
version_colors = ["ro", "go", "bo"]

# "Normal" point versioned data query: get the latest at each timestamp
versioned_points_sql: str = """
WITH ver AS (
    SELECT
        MAX(version_ts) as max_ver,
        ts
    FROM timeseries
    WHERE {0} <= ts -- Python Variable Substitution Here
    GROUP BY ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries AS t
INNER JOIN ver
    ON ver.ts = t.ts
    AND ver.max_ver = t.version_ts
ORDER BY t.ts DESC
"""
since_ts: int = 0
sql = versioned_points_sql.format(since_ts)
with Session(engine) as session:
    res_proxy = session.execute(text(sql))  # text() is needed for raw SQL strings in 2.0-style SQLAlchemy
    data = [row for row in res_proxy]  # rows of (ts, version_ts, value)

# Build point plot
obs_axs[0].plot([d[0] for d in data], [d[2] for d in data], "k")
for ver in range(0, max_version):
    x = [d[0] for d in data if d[1] == ver]
    y = [d[2] for d in data if d[1] == ver]
    obs_axs[0].plot(x, y, version_colors[ver])
obs_axs[0].set_title("Versioned Points")
obs_axs[0].legend(["data", "version 0", "version 1", "version 2"])

# Scenario versioned query: Consider versions continuous
versioned_scenario_sql: str = """
WITH cte AS (
    SELECT
        version_ts,
        MIN(ts) as min_ts,
        LAG(MIN(ts), 1, NULL) OVER (ORDER BY version_ts DESC) as lag_ts -- first ts of the next-newer version
    FROM timeseries
    WHERE {0} <= ts -- Python Variable Substitution Here
    GROUP BY version_ts, series_id
)
SELECT
    t.ts,
    t.version_ts,
    t.value
FROM timeseries t
INNER JOIN cte
    ON t.version_ts = cte.version_ts
    AND cte.min_ts <= t.ts
    AND t.ts < ifnull(lag_ts, t.ts + 1)
ORDER BY t.ts DESC;
"""
since_ts = 0
sql = versioned_scenario_sql.format(since_ts)
with Session(engine) as session:
    res_proxy = session.execute(text(sql))
    data = [row for row in res_proxy]

# Build scenario plot
obs_axs[1].plot([d[0] for d in data], [d[2] for d in data], "k")
for ver in range(0, max_version):
    x = [d[0] for d in data if d[1] == ver]
    y = [d[2] for d in data if d[1] == ver]
    obs_axs[1].plot(x, y, version_colors[ver])
obs_axs[1].set_title("Versioned Scenario")
obs_axs[1].legend(["data", "version 0", "version 1", "version 2"])

plt.show()