Skip to content

Scale Sense — Data Model

DynamoDB single-table design

Table: scale-sense Always query by PK + SK prefix. Never scan.

Access patterns → queries

Access pattern Query
Dashboard load: all taps at location PK=LOCATION#loc-001, SK begins_with TAP#
Tap current state PK=LOCATION#loc-001, SK=TAP#07#STATE
Tap config (beer, keg, device) PK=LOCATION#loc-001, SK=TAP#07
Beer profile PK=BEER#guinness-draught, SK=META
Beer keg weights PK=BEER#guinness-draught, SK begins_with KEG#
Keg lifecycle PK=KEG#keg-20260318-007, SK=META
Keg tare record PK=KEG#keg-20260318-007, SK=TARE
Device meta + status PK=DEVICE#SCALE-0042, SK=META
Device telemetry PK=DEVICE#SCALE-0042, SK=TELEMETRY
Current calibration PK=DEVICE#SCALE-0042, SK=CALIBRATION
Calibration history PK=DEVICE#SCALE-0042, SK begins_with CAL#
Active alerts at location PK=ALERT#loc-001, SK begins_with ACTIVE#
Alert history PK=ALERT#loc-001, SK begins_with HIST#
Check grace period active PK=FIRING#TAP#07, SK=low_keg
WebSocket connections PK=WS_CONN#<conn_id>, SK=META
All WS connections for location GSI: loc_id-index, PK=loc_id
Device from scale ID GSI: device-index, PK=device_id
Low battery devices at location GSI: location-battery-index

Hot rows

TAP#07#STATE is written every 60s per scale. Keep separate from TAP#07 config. DEVICE#SCALE-0042 / TELEMETRY is written every 5 min. Both use DynamoDB on-demand — no capacity planning needed.

Readings store (DynamoDB fallback while Timestream is pending)

PK = READINGS#keg#<keg_id>    (keyed by keg, not tap — clean break on beer swap)
SK = ISO timestamp (sortable)
TTL = 30 days
Attributes: tap_id, loc_id, device_id, keg_id, level_pct, gallons, weight_lbs, battery_pct, battery_v

Falls back to PK = READINGS#<tap_id> when no keg is assigned (backwards compat for old data). History queries scope to current keg_id from tap config — old keg data not mixed in. Response includes both data (level_pct) and battery_data arrays for dual-series chart.

Tare data — dual write

Tare values are written to TWO places on every tare measurement: - KEG#<id> / TARE — source of truth: empty_offset, full_ref, tared_at, tare_status - DEVICE#<id> / TELEMETRY — diagnostic copy: tare_empty_offset, tare_full_lbs (reflects scale NVS)

The scale computes level_pct itself using NVS-stored tare values. The keg record is the audit trail. When tare_full is recorded, BEER#<id> / KEG#<type> full_lbs is updated via updateItem (preserves empty_lbs).

Beer keg weight source badges

BEER#<id> / KEG#<type> records carry a measured_at field when updated by a real tare: - No measured_atdefault (static reference weights, seeded at half_barrel for all beers) - measured_at present → set (real tare measurement) - measured_count > 1calculated (running average — future, issue #20)

Tap ID format

Tap IDs always include location: loc-001#TAP#07 This is the tap_id used in Timestream dimensions and MQTT topics.

Beer library

Global library = beers with PK BEER#<id> — available to all locations. Custom beers = beers with PK BEER#loc-001#<id> — location-specific. Search combines both when filtering by loc_id.

Specific gravity

Default: 1.0 (water) Stored on BEER#<id> / META as specific_gravity attribute. Used in gallons calculation: gallons = (net_kg / sg) * 0.264172 Common values: - Light lager (Coors Light): 1.007 - Standard lager (Modelo): 1.010 - Pale ale (Sierra Nevada): 1.012 - IPA (Lagunitas): 1.013 - Stout (Guinness): 1.014

Keg type reference data

Type Gallons Full lbs Empty lbs
half_barrel 15.5 161.5 29.0
quarter_barrel 7.75 87.0 22.0
sixth_barrel 5.17 61.0 16.5
slim_quarter 7.75 87.0 20.0

These are stored per beer profile per keg type, not hardcoded. Allows for density variations (Guinness nitrogen vs standard CO2 keg weights differ slightly).

Alert state storage

Active alert key: ALERT#loc-001 / ACTIVE#low_keg#TAP#07 - Only one active alert per (location, type, entity) - Overwritten in place as state transitions

Grace period key: FIRING#TAP#07 / low_keg - Uses DynamoDB TTL = grace_period_secs from location settings - Auto-deleted when grace period expires - If alert fires, delete explicitly

History key: ALERT#loc-001 / HIST#<ISO-timestamp> - Append-only — every alert lifecycle creates a record - Used for alert history log in dashboard

WebSocket connection tracking

Key: WS_CONN#<connection_id> / META Attributes: { loc_id, user_id, connected_at, expires_at } TTL on expires_at (2 hours) — auto-cleanup abandoned connections.

On $connect: write record On $disconnect: delete record On ingest-reading Lambda: query all WS_CONN# for loc_id, fan out, delete GoneException connections

Timestream queries reference

Dashboard chart — history

-- Bin size selected by Lambda based on range parameter
SELECT
  BIN(time, {bin}) AS t,
  AVG(measure_value::double) AS level_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'level_pct'
  AND tap_id = '{tap_id}'
  AND time >= ago({range})
GROUP BY BIN(time, {bin})
ORDER BY t ASC

Pour rate + hours remaining

WITH recent AS (
  SELECT time, measure_value::double AS gallons
  FROM "scale-sense"."keg-readings"
  WHERE measure_name = 'gallons'
    AND tap_id = '{tap_id}'
    AND time >= ago({window_hours}h)
)
SELECT
  (MAX(gallons) - MIN(gallons)) / {window_hours}.0 AS gallons_per_hour,
  MIN(gallons) / ((MAX(gallons) - MIN(gallons)) / {window_hours}.0) AS hours_remaining
FROM recent

Default window: 6 hours (configurable per location in settings).

Keg lifespan

SELECT
  tap_id,
  MIN(time) AS tapped_at,
  MAX(time) AS last_reading,
  MAX(measure_value::double) - MIN(measure_value::double) AS total_poured_gallons,
  date_diff('hour', MIN(time), MAX(time)) AS lifespan_hours
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'gallons'
  AND tap_id = '{tap_id}'
  AND time BETWEEN '{start}' AND '{end}'
GROUP BY tap_id

Pour volume by hour of day

SELECT
  EXTRACT(HOUR FROM time) AS hour_of_day,
  AVG(measure_value::double) AS avg_gallons_poured
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'gallons'
  AND loc_id = '{loc_id}'
  AND time >= ago(30d)
GROUP BY EXTRACT(HOUR FROM time)
ORDER BY hour_of_day ASC

Device battery trend

SELECT
  BIN(time, 1d) AS day,
  device_id,
  AVG(measure_value::double) AS avg_battery_pct,
  MIN(measure_value::double) AS min_battery_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'battery_pct'
  AND loc_id = '{loc_id}'
  AND time >= ago(7d)
GROUP BY BIN(time, 1d), device_id
ORDER BY day ASC, device_id ASC

Timestream cost notes

  • Memory store (24h): fast, expensive per GB
  • Magnetic store (1y): slow, cheap
  • Always filter on a dimension (tap_id or loc_id) before time — reduces scan cost
  • Use BIN() aggressively on long time ranges — reduce scanned rows
  • Multi-measure records cost one write vs N writes for separate measures
  • Style split query joins beer table from DynamoDB in Lambda (Timestream can't join)

DynamoDB write patterns

Ingest reading (Lambda — hot path)

// Three writes per reading — all parallel
await Promise.all([
  // 1. Update tap state
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${loc_id}`, SK: `TAP#${tap_number}#STATE` },
    UpdateExpression: "SET level_pct=:l, gallons=:g, weight_lbs=:w, updated_at=:t, tare_status=:s",
    ExpressionAttributeValues: { :l: level_pct, :g: gallons, :w: weight_lbs, :t: now, :s: tare_status }
  }),

  // 2. Update device telemetry
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `DEVICE#${device_id}`, SK: "TELEMETRY" },
    UpdateExpression: "SET battery_v=:bv, battery_pct=:bp, updated_at=:t",
    ExpressionAttributeValues: { :bv: battery_v, :bp: battery_pct, :t: now }
  }),

  // 3. Write to Timestream
  timestream.writeRecords({ ... })
])

Beer assignment (tap + keg creation)

// Parallel: create keg record + update tap config
const kegId = `keg-${dateStr}-${nanoid(4)}`

await Promise.all([
  dynamodb.put({
    TableName: TABLE_NAME,
    Item: {
      PK: `KEG#${kegId}`,
      SK: "META",
      beer_id: beerId,
      keg_type: kegType,
      tapped_at: now,
      loc_id: locId,
      full_lbs: fullLbs,
      empty_lbs: emptyLbs,
      sg: specificGravity
    }
  }),
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${locId}`, SK: `TAP#${tapNumber}` },
    UpdateExpression: "SET beer_id=:b, keg_id=:k, keg_type=:t, full_lbs=:fl, empty_lbs=:el",
    ExpressionAttributeValues: { :b: beerId, :k: kegId, :t: kegType, :fl: fullLbs, :el: emptyLbs }
  }),
  dynamodb.update({
    TableName: TABLE_NAME,
    Key: { PK: `LOCATION#${locId}`, SK: `TAP#${tapNumber}#STATE` },
    UpdateExpression: "SET tare_status=:s",
    ExpressionAttributeValues: { :s: "pending" }
  })
])