Scale Sense — Data Model¶
DynamoDB single-table design¶
Table: scale-sense
Always query by PK + SK prefix. Never scan.
Access patterns → queries¶
| Access pattern | Query |
|---|---|
| Dashboard load: all taps at location | PK=LOCATION#loc-001, SK begins_with TAP# |
| Tap current state | PK=LOCATION#loc-001, SK=TAP#07#STATE |
| Tap config (beer, keg, device) | PK=LOCATION#loc-001, SK=TAP#07 |
| Beer profile | PK=BEER#guinness-draught, SK=META |
| Beer keg weights | PK=BEER#guinness-draught, SK begins_with KEG# |
| Keg lifecycle | PK=KEG#keg-20260318-007, SK=META |
| Keg tare record | PK=KEG#keg-20260318-007, SK=TARE |
| Device meta + status | PK=DEVICE#SCALE-0042, SK=META |
| Device telemetry | PK=DEVICE#SCALE-0042, SK=TELEMETRY |
| Current calibration | PK=DEVICE#SCALE-0042, SK=CALIBRATION |
| Calibration history | PK=DEVICE#SCALE-0042, SK begins_with CAL# |
| Active alerts at location | PK=ALERT#loc-001, SK begins_with ACTIVE# |
| Alert history | PK=ALERT#loc-001, SK begins_with HIST# |
| Check grace period active | PK=FIRING#TAP#07, SK=low_keg |
| WebSocket connections | PK=WS_CONN#<conn_id>, SK=META |
| All WS connections for location | GSI: loc_id-index, PK=loc_id |
| Device from scale ID | GSI: device-index, PK=device_id |
| Low battery devices at location | GSI: location-battery-index |
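As a rough illustration of the "PK + SK prefix" rule, the sketch below builds the Query input for the dashboard-load pattern. The helper name `tapsAtLocationQuery` is hypothetical; the parameter shape follows the DynamoDB document-client Query input, and the table name is the one given above.

```javascript
// Hypothetical helper: builds a Query input for "all taps at location".
// It only constructs the parameters; pass the result to your DynamoDB client.
function tapsAtLocationQuery(locId, tableName = "scale-sense") {
  return {
    TableName: tableName,
    KeyConditionExpression: "PK = :pk AND begins_with(SK, :prefix)",
    ExpressionAttributeValues: {
      ":pk": `LOCATION#${locId}`,
      ":prefix": "TAP#",
    },
  };
}

const params = tapsAtLocationQuery("loc-001");
console.log(params.ExpressionAttributeValues[":pk"]); // LOCATION#loc-001
```

Because TAP#07 and TAP#07#STATE share the TAP# prefix, a query of this shape returns both the config and the state items for every tap.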
Hot rows¶
TAP#07#STATE is written every 60s per scale. Keep separate from TAP#07 config.
DEVICE#SCALE-0042 / TELEMETRY is written every 5 min.
Both use DynamoDB on-demand — no capacity planning needed.
Readings store (DynamoDB fallback while Timestream is pending)¶
PK = READINGS#keg#<keg_id> (keyed by keg, not tap — clean break on beer swap)
SK = ISO timestamp (sortable)
TTL = 30 days
Attributes: tap_id, loc_id, device_id, keg_id, level_pct, gallons, weight_lbs, battery_pct, battery_v
Falls back to PK = READINGS#<tap_id> when no keg is assigned (backwards compat for old data).
History queries scope to current keg_id from tap config — old keg data not mixed in.
Response includes both data (level_pct) and battery_data arrays for dual-series chart.
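The key scheme and fallback described above could be sketched as follows. The helper name `readingItemKey` and the attribute name `ttl` are assumptions for illustration; the only firm requirement is that DynamoDB TTL attributes hold an epoch timestamp in seconds.

```javascript
const THIRTY_DAYS_SECS = 30 * 24 * 60 * 60;

// Hypothetical helper: builds the key and TTL attributes for one reading.
// Falls back to the tap-keyed partition when no keg is assigned.
function readingItemKey({ kegId, tapId, at = new Date() }) {
  const pk = kegId ? `READINGS#keg#${kegId}` : `READINGS#${tapId}`;
  return {
    PK: pk,
    SK: at.toISOString(), // ISO 8601 in UTC sorts chronologically as a string
    ttl: Math.floor(at.getTime() / 1000) + THIRTY_DAYS_SECS, // epoch seconds
  };
}

console.log(readingItemKey({ kegId: "keg-20260318-007", tapId: "loc-001#TAP#07" }).PK);
// READINGS#keg#keg-20260318-007
```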
Tare data — dual write¶
Tare values are written to TWO places on every tare measurement:
- KEG#<id> / TARE — source of truth: empty_offset, full_ref, tared_at, tare_status
- DEVICE#<id> / TELEMETRY — diagnostic copy: tare_empty_offset, tare_full_lbs (reflects scale NVS)
The scale computes level_pct itself using NVS-stored tare values. The keg record is the audit trail.
When tare_full is recorded, BEER#<id> / KEG#<type> full_lbs is updated via updateItem (preserves empty_lbs).
Beer keg weight source badges¶
BEER#<id> / KEG#<type> records carry a measured_at field when updated by a real tare:
- No measured_at → default (static reference weights, seeded at half_barrel for all beers)
- measured_at present → set (real tare measurement)
- measured_count > 1 → calculated (running average — future, issue #20)
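The badge rules above amount to a small decision function. The sketch below is one possible reading of them; the name `weightSourceBadge` is hypothetical, and the check order (calculated before set) is an assumption, since a record with measured_count > 1 would presumably also carry measured_at.

```javascript
// Hypothetical helper mapping a BEER#<id> / KEG#<type> record to its badge.
function weightSourceBadge(record) {
  if ((record.measured_count ?? 0) > 1) return "calculated"; // future, issue #20
  if (record.measured_at) return "set"; // updated by a real tare
  return "default"; // static reference weights
}

console.log(weightSourceBadge({ full_lbs: 161.5, empty_lbs: 29.0 })); // default
```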
Tap ID format¶
Tap IDs always include location: loc-001#TAP#07
This is the tap_id used in Timestream dimensions and MQTT topics.
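A minimal sketch of building and parsing this format is shown below. The helper names are hypothetical, and zero-padding the tap number to two digits is an assumption based on the "07" in the example.

```javascript
const TAP_SEPARATOR = "#TAP#";

// Builds a tap_id such as "loc-001#TAP#07".
function buildTapId(locId, tapNumber) {
  // Two-digit zero padding is assumed from the example, not confirmed.
  return `${locId}${TAP_SEPARATOR}${String(tapNumber).padStart(2, "0")}`;
}

// Splits a tap_id back into its location and tap number parts.
function parseTapId(tapId) {
  const idx = tapId.indexOf(TAP_SEPARATOR);
  if (idx <= 0) throw new Error(`Invalid tap_id: ${tapId}`);
  return {
    locId: tapId.slice(0, idx),
    tapNumber: tapId.slice(idx + TAP_SEPARATOR.length),
  };
}

console.log(buildTapId("loc-001", 7)); // loc-001#TAP#07
```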
Beer library¶
Global library = beers with PK BEER#<id> — available to all locations.
Custom beers = beers with PK BEER#loc-001#<id> — location-specific.
Search combines both when filtering by loc_id.
Specific gravity¶
Default: 1.0 (water)
Stored on BEER#<id> / META as specific_gravity attribute.
Used in gallons calculation: gallons = (net_kg / sg) * 0.264172
Common values:
- Light lager (Coors Light): 1.007
- Standard lager (Modelo): 1.010
- Pale ale (Sierra Nevada): 1.012
- IPA (Lagunitas): 1.013
- Stout (Guinness): 1.014
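The gallons formula can be written out as below. Dividing net_kg by the specific gravity approximates the volume in liters (water is about 1 kg per liter), and 0.264172 converts liters to US gallons. The function names and the pounds-based wrapper are illustrative additions, not part of the documented code.

```javascript
const LITERS_TO_US_GALLONS = 0.264172;
const LBS_TO_KG = 0.45359237;

// gallons = (net_kg / sg) * 0.264172, with sg defaulting to 1.0 (water).
function gallonsFromNetKg(netKg, sg = 1.0) {
  if (!(sg > 0)) throw new RangeError("specific gravity must be positive");
  return (netKg / sg) * LITERS_TO_US_GALLONS;
}

// Convenience wrapper for weights recorded in pounds (e.g. weight_lbs minus tare).
function gallonsFromNetLbs(netLbs, sg = 1.0) {
  return gallonsFromNetKg(netLbs * LBS_TO_KG, sg);
}

// A denser beer yields slightly fewer gallons for the same net weight.
console.log(gallonsFromNetKg(10, 1.014) < gallonsFromNetKg(10, 1.0)); // true
```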
Keg type reference data¶
| Type | Gallons | Full lbs | Empty lbs |
|---|---|---|---|
| half_barrel | 15.5 | 161.5 | 29.0 |
| quarter_barrel | 7.75 | 87.0 | 22.0 |
| sixth_barrel | 5.17 | 61.0 | 16.5 |
| slim_quarter | 7.75 | 87.0 | 20.0 |
These weights are stored per beer profile per keg type rather than hardcoded, which allows for density variations (for example, Guinness nitrogen kegs and standard CO2 kegs differ slightly in weight).
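To show how the full and empty weights relate to a fill level, here is a sketch that assumes a simple linear interpolation between empty_lbs and full_lbs, clamped to 0 to 100. The scale computes level_pct itself, so this is only an illustration of the arithmetic, not the firmware's method; `levelPct` is a hypothetical name.

```javascript
// Linear fill level between the empty and full reference weights.
function levelPct(weightLbs, fullLbs, emptyLbs) {
  const capacity = fullLbs - emptyLbs;
  if (!(capacity > 0)) throw new RangeError("full_lbs must exceed empty_lbs");
  const pct = ((weightLbs - emptyLbs) / capacity) * 100;
  return Math.min(100, Math.max(0, pct)); // clamp noisy readings
}

// half_barrel reference weights from the table: 161.5 full, 29.0 empty.
console.log(levelPct(95.25, 161.5, 29.0)); // 50
```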
Alert state storage¶
Active alert key: ALERT#loc-001 / ACTIVE#low_keg#TAP#07
- Only one active alert per (location, type, entity)
- Overwritten in place as state transitions
Grace period key: FIRING#TAP#07 / low_keg
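- TTL attribute is set to now + grace_period_secs as an epoch timestamp in seconds, with grace_period_secs taken from location settings
- DynamoDB deletes the item after the TTL passes, but deletion can lag behind expiry, so readers should also compare the TTL attribute against the current time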
- If alert fires, delete explicitly
History key: ALERT#loc-001 / HIST#<ISO-timestamp>
- Append-only — every alert lifecycle creates a record
- Used for alert history log in dashboard
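The grace-period marker can be sketched as below. DynamoDB TTL expects an epoch timestamp in seconds and removes expired items in the background rather than at the exact expiry time, so the sketch also checks expiry at read time. The helper names and the attribute name `expires_at` are assumptions for illustration.

```javascript
// Hypothetical helper: builds the grace-period marker item.
function graceMarker(tapKey, alertType, gracePeriodSecs, nowMs = Date.now()) {
  return {
    PK: `FIRING#${tapKey}`,
    SK: alertType,
    // Epoch seconds, as DynamoDB TTL expects.
    expires_at: Math.floor(nowMs / 1000) + gracePeriodSecs,
  };
}

// Treats a missing or expired marker as "no grace period active",
// even if DynamoDB has not deleted the expired item yet.
function isGraceActive(item, nowMs = Date.now()) {
  return Boolean(item) && item.expires_at > Math.floor(nowMs / 1000);
}

const marker = graceMarker("TAP#07", "low_keg", 300);
console.log(marker.PK, marker.SK); // FIRING#TAP#07 low_keg
```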
WebSocket connection tracking¶
Key: WS_CONN#<connection_id> / META
Attributes: { loc_id, user_id, connected_at, expires_at }
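TTL on expires_at (connected_at + 2 hours, as epoch seconds) cleans up abandoned connections automatically. TTL deletion is not immediate, so stale records are also removed during fan-out.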
On $connect: write record
On $disconnect: delete record
In the ingest-reading Lambda: query loc_id-index for all WS_CONN# records at the location, fan out the update, and delete any connection that returns GoneException
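The fan-out step could look roughly like the sketch below. To keep it runnable without AWS, the I/O is injected: `send(connectionId, payload)` and `remove(connectionId)` are hypothetical async callbacks that, in a Lambda, would wrap the API Gateway Management API's postToConnection call and a DynamoDB delete of WS_CONN#<connection_id> / META.

```javascript
// A gone connection surfaces as GoneException (HTTP 410) from API Gateway.
function isGone(err) {
  return err?.name === "GoneException" || err?.$metadata?.httpStatusCode === 410;
}

// Sends the payload to every connection in parallel, then deletes the
// records of connections that turned out to be gone.
async function fanOut(connectionIds, payload, { send, remove }) {
  const results = await Promise.allSettled(
    connectionIds.map((id) => send(id, payload))
  );
  const stale = connectionIds.filter((id, i) => {
    const r = results[i];
    return r.status === "rejected" && isGone(r.reason);
  });
  await Promise.all(stale.map((id) => remove(id)));
  const delivered = results.filter((r) => r.status === "fulfilled").length;
  return { delivered, stale };
}
```

Using Promise.allSettled means one dead connection does not abort delivery to the others; failures other than GoneException are left in place here and would need their own handling.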
Timestream queries reference¶
Dashboard chart — history¶
-- Bin size selected by Lambda based on range parameter
SELECT
BIN(time, {bin}) AS t,
AVG(measure_value::double) AS level_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'level_pct'
AND tap_id = '{tap_id}'
AND time >= ago({range})
GROUP BY BIN(time, {bin})
ORDER BY t ASC
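The comment in the query says the Lambda picks the bin size from the range parameter. One way that selection might look is sketched below; the range-to-bin mapping is entirely hypothetical, since the actual values are not given here, and `buildHistoryQuery` is an illustrative name.

```javascript
// Hypothetical range -> bin mapping; the real values are not documented here.
const BIN_BY_RANGE = { "1h": "1m", "6h": "5m", "24h": "15m", "7d": "1h", "30d": "6h" };

function buildHistoryQuery(tapId, range) {
  const bin = BIN_BY_RANGE[range];
  if (!bin) throw new RangeError(`Unsupported range: ${range}`);
  // tap_id is interpolated into SQL, so allow only the expected characters.
  if (!/^[A-Za-z0-9#_-]+$/.test(tapId)) throw new Error("Invalid tap_id");
  return [
    `SELECT BIN(time, ${bin}) AS t, AVG(measure_value::double) AS level_pct`,
    `FROM "scale-sense"."keg-readings"`,
    `WHERE measure_name = 'level_pct'`,
    `AND tap_id = '${tapId}'`,
    `AND time >= ago(${range})`,
    `GROUP BY BIN(time, ${bin})`,
    `ORDER BY t ASC`,
  ].join("\n");
}

console.log(buildHistoryQuery("loc-001#TAP#07", "7d"));
```

Restricting range to a fixed set of keys and validating tap_id keeps user input from being spliced into the query text unchecked.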
Pour rate + hours remaining¶
WITH recent AS (
SELECT time, measure_value::double AS gallons
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'gallons'
AND tap_id = '{tap_id}'
AND time >= ago({window_hours}h)
)
SELECT
(MAX(gallons) - MIN(gallons)) / {window_hours}.0 AS gallons_per_hour,
-- NULLIF returns NULL for hours_remaining when nothing was poured in the window
MIN(gallons) / NULLIF((MAX(gallons) - MIN(gallons)) / {window_hours}.0, 0) AS hours_remaining
FROM recent
Default window: 6 hours (configurable per location in settings).
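The same arithmetic, written as a small function over the gallons readings in the window, makes the zero-pour case explicit. This is a sketch under the same assumptions as the query (gallons is the volume remaining, and the keg is not swapped inside the window); `pourEstimate` is a hypothetical name.

```javascript
// Mirrors the query: rate = (max - min) / window, remaining = min / rate.
function pourEstimate(gallonsReadings, windowHours = 6) {
  if (gallonsReadings.length === 0) return null;
  const max = Math.max(...gallonsReadings);
  const min = Math.min(...gallonsReadings);
  const gallonsPerHour = (max - min) / windowHours;
  // No pours in the window means no meaningful estimate.
  const hoursRemaining = gallonsPerHour > 0 ? min / gallonsPerHour : null;
  return { gallonsPerHour, hoursRemaining };
}

console.log(pourEstimate([12, 11, 9], 6)); // { gallonsPerHour: 0.5, hoursRemaining: 18 }
```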
Keg lifespan¶
SELECT
tap_id,
MIN(time) AS tapped_at,
MAX(time) AS last_reading,
MAX(measure_value::double) - MIN(measure_value::double) AS total_poured_gallons,
date_diff('hour', MIN(time), MAX(time)) AS lifespan_hours
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'gallons'
AND tap_id = '{tap_id}'
AND time BETWEEN '{start}' AND '{end}'
GROUP BY tap_id
Pour volume by hour of day¶
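-- Note: the gallons measure is the volume remaining in the keg, so this averages keg level per hour of day.
-- True pour volume needs the drop between consecutive readings, computed before averaging.
SELECT
EXTRACT(HOUR FROM time) AS hour_of_day,
AVG(measure_value::double) AS avg_gallons_poured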
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'gallons'
AND loc_id = '{loc_id}'
AND time >= ago(30d)
GROUP BY EXTRACT(HOUR FROM time)
ORDER BY hour_of_day ASC
Device battery trend¶
SELECT
BIN(time, 1d) AS day,
device_id,
AVG(measure_value::double) AS avg_battery_pct,
MIN(measure_value::double) AS min_battery_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'battery_pct'
AND loc_id = '{loc_id}'
AND time >= ago(7d)
GROUP BY BIN(time, 1d), device_id
ORDER BY day ASC, device_id ASC
Timestream cost notes¶
- Memory store (24h): fast, expensive per GB
- Magnetic store (1y): slow, cheap
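- Always bound queries with a time predicate and filter on a dimension (tap_id or loc_id); both reduce the data scanned, and the order of predicates in WHERE does not matter
- Use BIN() on long time ranges to shrink the result set returned to Lambda; it does not by itself reduce the data scanned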
- Multi-measure records cost one write vs N writes for separate measures
- The style-split query joins beer metadata from DynamoDB in Lambda, because Timestream cannot query DynamoDB tables
DynamoDB write patterns¶
Ingest reading (Lambda — hot path)¶
// Three writes per reading, issued in parallel.
// Assumes promise-returning clients (e.g. AWS SDK v3 DynamoDBDocument and TimestreamWrite);
// with the SDK v2 DocumentClient, append .promise() to each call.
await Promise.all([
  // 1. Update tap state
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${loc_id}`, SK: `TAP#${tap_number}#STATE` },
    UpdateExpression: "SET level_pct=:l, gallons=:g, weight_lbs=:w, updated_at=:t, tare_status=:s",
    // Placeholder keys must be quoted strings
    ExpressionAttributeValues: { ":l": level_pct, ":g": gallons, ":w": weight_lbs, ":t": now, ":s": tare_status }
  }),
  // 2. Update device telemetry
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `DEVICE#${device_id}`, SK: "TELEMETRY" },
    UpdateExpression: "SET battery_v=:bv, battery_pct=:bp, updated_at=:t",
    ExpressionAttributeValues: { ":bv": battery_v, ":bp": battery_pct, ":t": now }
  }),
  // 3. Write to Timestream (record payload elided)
  timestream.writeRecords({ ... })
])
Beer assignment (tap + keg creation)¶
// Parallel: create keg record, update tap config, reset tare status on tap state.
// The three writes are independent and not transactional.
const kegId = `keg-${dateStr}-${nanoid(4)}`
await Promise.all([
  // 1. Create the keg record
  dynamodb.put({
    TableName: TABLE_NAME,
    Item: {
      PK: `KEG#${kegId}`,
      SK: "META",
      beer_id: beerId,
      keg_type: kegType,
      tapped_at: now,
      loc_id: locId,
      full_lbs: fullLbs,
      empty_lbs: emptyLbs,
      sg: specificGravity
    }
  }),
  // 2. Point the tap config at the new beer and keg
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${locId}`, SK: `TAP#${tapNumber}` },
    UpdateExpression: "SET beer_id=:b, keg_id=:k, keg_type=:t, full_lbs=:fl, empty_lbs=:el",
    ExpressionAttributeValues: { ":b": beerId, ":k": kegId, ":t": kegType, ":fl": fullLbs, ":el": emptyLbs }
  }),
  // 3. Mark the tap state as awaiting a tare measurement
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${locId}`, SK: `TAP#${tapNumber}#STATE` },
    UpdateExpression: "SET tare_status=:s",
    ExpressionAttributeValues: { ":s": "pending" }
  })
])