# Scale Sense — AWS Infrastructure
## CDK structure
Language: TypeScript
Entry: infrastructure/bin/scale-sense.ts
```
infrastructure/
  bin/
    scale-sense.ts            CDK app entry point
  lib/
    stacks/
      iot-stack.ts            IoT Core, Greengrass, device registry
      data-stack.ts           DynamoDB, Timestream
      api-stack.ts            API Gateway REST + WebSocket, Lambda, Cognito
      alert-stack.ts          EventBridge rules, SNS, SES
      storage-stack.ts        S3 (firmware, exports)
      network-stack.ts        VPC, WireGuard EC2, ECR
    constructs/
      scale-lambda.ts         Shared Lambda config (layers, env, VPC)
      iot-rule.ts             IoT Core rule helper
  lambda/
    handlers/
      ingest-reading.ts       IoT rule → DynamoDB + Timestream + alerts
      check-offline.ts        EventBridge every 5 min → scan for offline scales
      api/
        locations.ts
        taps.ts
        beers.ts
        devices.ts
        commands.ts
        inventory-export.ts
      websocket/
        connect.ts
        disconnect.ts
        message.ts
    shared/
      dynamo.ts               DynamoDB client + helpers
      timestream.ts           Timestream write + query helpers
      iot.ts                  IoT Core publish helper
      alerts.ts               Alert state machine
      calculations.ts         Weight → gallons conversion
  test/
    ...
```
## IoT Core setup

### Device registry
- Thing type: `ScaleSenseScale`
- Thing group: `scales-{location_id}` (one per bar location)
- Certificates: X.509, provisioned via fleet provisioning or manually per device
- Policy: `ScaleSenseDevicePolicy`
  - Allow: `iot:Connect` (client ID must match thing name)
  - Allow: `iot:Publish` on `keg/readings/*`, `keg/status/*`, `keg/response/*`
  - Allow: `iot:Subscribe` on `keg/command/*`, `keg/ota/*`
  - Allow: `iot:Receive` on `keg/command/*`, `keg/ota/*`
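A minimal sketch of how `ScaleSenseDevicePolicy` could be expressed as a policy document, assuming the standard AWS IoT policy variable `${iot:Connection.Thing.ThingName}` is what ties the client ID to the thing name. The `buildDevicePolicy` helper and the region/account arguments are hypothetical placeholders, not taken from `iot-stack.ts`. Note that `iot:Subscribe` is authorized against `topicfilter/` ARNs, while `iot:Publish` and `iot:Receive` use `topic/` ARNs.

```typescript
// Hypothetical helper: builds the IoT policy JSON for ScaleSenseDevicePolicy.
// region/account are placeholders supplied by the caller.
interface PolicyStatement {
  Effect: "Allow";
  Action: string[];
  Resource: string[];
}

interface PolicyDocument {
  Version: "2012-10-17";
  Statement: PolicyStatement[];
}

// Plain string (not a template literal) so the policy variable reaches
// AWS IoT unexpanded; IoT substitutes it per connection.
const THING_NAME_VAR = "${iot:Connection.Thing.ThingName}";

function buildDevicePolicy(region: string, account: string): PolicyDocument {
  const arn = (kind: string, name: string): string =>
    `arn:aws:iot:${region}:${account}:${kind}/${name}`;

  return {
    Version: "2012-10-17",
    Statement: [
      {
        // Client ID must equal the thing name attached to the certificate.
        Effect: "Allow",
        Action: ["iot:Connect"],
        Resource: [arn("client", THING_NAME_VAR)],
      },
      {
        Effect: "Allow",
        Action: ["iot:Publish"],
        Resource: ["keg/readings/*", "keg/status/*", "keg/response/*"].map((t) => arn("topic", t)),
      },
      {
        // Subscribe is checked against topic filters, not topics.
        Effect: "Allow",
        Action: ["iot:Subscribe"],
        Resource: ["keg/command/*", "keg/ota/*"].map((t) => arn("topicfilter", t)),
      },
      {
        Effect: "Allow",
        Action: ["iot:Receive"],
        Resource: ["keg/command/*", "keg/ota/*"].map((t) => arn("topic", t)),
      },
    ],
  };
}
```

Scoping topics further to the device's own ID (e.g. `keg/readings/${iot:Connection.Thing.ThingName}`) would stop one scale from publishing as another; whether that fits the firmware's topic layout is an open design choice.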
### IoT rules
```
Rule: ScaleSenseReadingRule
  SQL: SELECT *, topic(3) as device_id FROM 'keg/readings/+'
  Actions:
    → Lambda: ingest-reading (fan-out to DynamoDB, Timestream, alerts, WebSocket)

Rule: ScaleSenseStatusRule
  SQL: SELECT *, topic(3) as device_id FROM 'keg/status/+'
  Actions:
    → Lambda: ingest-reading (handle status changes)
```
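The rule SQL function `topic(3)` returns the third `/`-separated segment of the incoming topic (1-indexed), so a message on `keg/readings/SCALE-0042` reaches the Lambda as the device payload plus `device_id: "SCALE-0042"`. The sketch below mirrors that behavior and shows one way the handler might narrow the event. The payload fields other than `device_id` are assumptions about the firmware, not a documented schema, and a hand-written check stands in for the shared zod layer to keep the example self-contained.

```typescript
// Mirrors IoT SQL topic(n): 1-indexed segment of a '/'-separated topic.
function topicSegment(topic: string, n: number): string | undefined {
  const parts = topic.split("/");
  return n >= 1 && n <= parts.length ? parts[n - 1] : undefined;
}

// Assumed event shape after `SELECT *, topic(3) as device_id`.
interface ReadingEvent {
  device_id: string;
  weight_lbs: number; // hypothetical firmware field
  battery_v: number;  // hypothetical firmware field
  ts: number;         // hypothetical device timestamp (epoch millis)
}

// Minimal runtime validation without external libraries.
function parseReadingEvent(raw: unknown): ReadingEvent {
  if (typeof raw !== "object" || raw === null) throw new Error("event is not an object");
  const e = raw as Record<string, unknown>;
  const deviceId = e["device_id"];
  if (typeof deviceId !== "string" || deviceId === "") throw new Error("missing device_id");
  const num = (k: string): number => {
    const v = e[k];
    if (typeof v !== "number" || !Number.isFinite(v)) throw new Error(`invalid ${k}`);
    return v;
  };
  return {
    device_id: deviceId,
    weight_lbs: num("weight_lbs"),
    battery_v: num("battery_v"),
    ts: num("ts"),
  };
}
```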
### Greengrass
- Core devices: one per Pi (registered as Greengrass core)
- Leaf devices: ESP32 scales (registered as Greengrass leaf devices behind the Pi)
- Component: `com.scalesense.mosquitto` (Mosquitto config + bridge)
- Component: `com.scalesense.ota-server` (serves .bin files locally)
- Component: `com.scalesense.monitor` (Pi health reporting)
## DynamoDB
Table name: scale-sense
Billing: PAY_PER_REQUEST (on-demand)
Point-in-time recovery: enabled
### Key schema (single-table design)
| PK | SK | Entity | Key attributes |
|---|---|---|---|
| `LOCATION#loc-001` | `META` | Location | name, address, timezone, settings |
| `LOCATION#loc-001` | `TAP#07` | Tap config | scale_id, beer_id, keg_id, keg_type, full_lbs, empty_lbs |
| `LOCATION#loc-001` | `TAP#07#STATE` | Tap state | level_pct, gallons, weight_lbs, updated_at, tare_status |
| `BEER#guinness-draught` | `META` | Beer | name, brewery, style, abv, specific_gravity |
| `BEER#guinness-draught` | `KEG#half_barrel` | Keg weights | full_lbs, empty_lbs |
| `KEG#keg-20260318-007` | `META` | Keg lifecycle | beer_id, keg_type, tapped_at, loc_id |
| `KEG#keg-20260318-007` | `TARE` | Tare record | empty_offset, full_ref, tared_by, tared_at |
| `DEVICE#SCALE-0042` | `META` | Device | loc_id, tap_id, fw_version, status |
| `DEVICE#SCALE-0042` | `TELEMETRY` | Device health | battery_v, battery_pct, rssi, updated_at |
| `DEVICE#SCALE-0042` | `CALIBRATION` | Current cal | scale_factor, zero_offset, method, drift_pct, calibrated_at |
| `DEVICE#SCALE-0042` | `CAL#<timestamp>` | Cal history | scale_factor, known_weight, by, method, drift_pct |
| `ALERT#loc-001` | `ACTIVE#low_keg#TAP#07` | Active alert | type, severity, state, fired_at, context |
| `ALERT#loc-001` | `HIST#<timestamp>` | Alert log | full alert record |
| `FIRING#TAP#07` | `low_keg` | Grace period | started_at, severity; TTL derived from grace_period_secs |
| `WS_CONN#<conn_id>` | `META` | WS connection | loc_id, user_id, expires_at; TTL 2h after connect |
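Key strings like these are easy to get subtly wrong (`TAP#7` vs `TAP#07`), so it helps to build them in one place, presumably `shared/dynamo.ts`. A sketch, assuming tap numbers are always zero-padded to two digits as in the examples; the `keys` object and its method names are hypothetical.

```typescript
// Hypothetical key builders for the single-table design.
// Assumes tap numbers are zero-padded to two digits (TAP#07).
const pad2 = (n: number): string => String(n).padStart(2, "0");

const keys = {
  location: (locId: string) => ({ PK: `LOCATION#${locId}`, SK: "META" }),
  tapConfig: (locId: string, tap: number) => ({ PK: `LOCATION#${locId}`, SK: `TAP#${pad2(tap)}` }),
  tapState: (locId: string, tap: number) => ({ PK: `LOCATION#${locId}`, SK: `TAP#${pad2(tap)}#STATE` }),
  device: (deviceId: string) => ({ PK: `DEVICE#${deviceId}`, SK: "META" }),
  // ISO 8601 timestamps sort lexicographically, so CAL# rows come back in time order.
  calHistory: (deviceId: string, isoTs: string) => ({ PK: `DEVICE#${deviceId}`, SK: `CAL#${isoTs}` }),
  activeAlert: (locId: string, type: string, tap: number) => ({
    PK: `ALERT#${locId}`,
    SK: `ACTIVE#${type}#TAP#${pad2(tap)}`,
  }),
};
```

With this layout, a Query on `PK = LOCATION#loc-001` and `begins_with(SK, "TAP#")` returns both the config and `#STATE` rows for every tap at the location in one call.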
### GSIs
```
GSI: device-index
  PK: device_id      (maps DEVICE#xxx → loc_id, tap_id)

GSI: location-battery-index
  PK: loc_id
  SK: battery_pct    (query low-battery devices at a location)
```
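### DynamoDB TTL

- `FIRING#` rows: TTL derived from `grace_period_secs` (auto-cleanup of stale grace periods)
- `WS_CONN#` rows: TTL = connection expiry (`expires_at` in the key schema; auto-cleanup of stale WS registrations)

The TTL attribute must hold an absolute expiry time in epoch seconds, not a duration.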
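DynamoDB deletes expired items in the background, typically within a few days rather than at the expiry second, and expired items can still be returned by reads until then. A grace-period check therefore should not rely on the `FIRING#` row disappearing. The sketch below stores `started_at` and decides from timestamps, using TTL only for cleanup. It assumes the table's single TTL attribute is `expires_at` (DynamoDB allows one TTL attribute per table, so `FIRING#` and `WS_CONN#` rows must share it) and adds a hypothetical margin past the grace period so that a reading arriving just after the boundary still finds the row.

```typescript
// Hypothetical shape of a FIRING# grace-period row.
interface FiringRow {
  started_at: number; // epoch seconds when the condition was first observed
  severity: "warning" | "critical";
  expires_at: number; // epoch seconds; the table's TTL attribute (assumed name)
}

const GRACE_PERIOD_SECS = 120; // 2-minute grace period from the alert system section
const STALE_MARGIN_SECS = 600; // hypothetical slack before a row counts as stale

function newFiringRow(nowSecs: number, severity: FiringRow["severity"]): FiringRow {
  return {
    started_at: nowSecs,
    severity,
    expires_at: nowSecs + GRACE_PERIOD_SECS + STALE_MARGIN_SECS,
  };
}

type GraceDecision = "start" | "wait" | "fire";

function evaluateGrace(row: FiringRow | undefined, nowSecs: number): GraceDecision {
  // A row past its TTL may still be readable; treat it as absent.
  if (row === undefined || nowSecs >= row.expires_at) return "start";
  return nowSecs - row.started_at >= GRACE_PERIOD_SECS ? "fire" : "wait";
}
```

In this design the caller deletes the `FIRING#` row when the condition clears or the alert fires, so TTL only catches rows that were never cleaned up explicitly.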
## Timestream
Database: scale-sense
Table: keg-readings
- Memory store retention: 24 hours (fast queries)
- Magnetic store retention: 1 year (historical, configurable per location)
### Record format (multi-measure; always use this)
```typescript
// One entry in a Timestream WriteRecords call (AWS SDK v3 record shape)
const record = {
  Time: String(Date.now()), // epoch time, passed as a string
  TimeUnit: "MILLISECONDS",
  Dimensions: [
    { Name: "device_id", Value: "SCALE-0042" },
    { Name: "tap_id", Value: "loc-001#TAP#07" },
    { Name: "loc_id", Value: "loc-001" },
  ],
  MeasureName: "scale_reading",
  MeasureValueType: "MULTI",
  MeasureValues: [
    { Name: "weight_lbs", Value: "87.3", Type: "DOUBLE" },
    { Name: "level_pct", Value: "54.1", Type: "DOUBLE" },
    { Name: "gallons", Value: "8.4", Type: "DOUBLE" },
    { Name: "battery_v", Value: "3.94", Type: "DOUBLE" },
    { Name: "battery_pct", Value: "78", Type: "DOUBLE" },
  ],
};
```
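`shared/calculations.ts` derives `level_pct` and `gallons` from `weight_lbs`, but its formula is not given here. The sketch below is one plausible version, assuming the tap config's `full_lbs`/`empty_lbs`, the beer's `specific_gravity`, a nominal keg capacity in gallons, and water at roughly 8.34 lb/gal. `KegProfile`, `capacity_gal` and `deriveLevels` are hypothetical names.

```typescript
// Hypothetical inputs assembled from TAP#, BEER# and KEG# rows.
interface KegProfile {
  full_lbs: number;         // gross weight of a full keg
  empty_lbs: number;        // tare weight of the empty keg
  specific_gravity: number; // beer SG, e.g. ~1.010
  capacity_gal: number;     // nominal volume, e.g. 15.5 for a half barrel
}

const WATER_LBS_PER_GAL = 8.34; // approximate density of water

function deriveLevels(weight_lbs: number, k: KegProfile): { level_pct: number; gallons: number } {
  const span = k.full_lbs - k.empty_lbs;
  if (span <= 0) throw new Error("full_lbs must exceed empty_lbs");

  // Net beer weight, clamped so sensor noise cannot go negative or overfull.
  const net = Math.min(Math.max(weight_lbs - k.empty_lbs, 0), span);

  // Volume from beer density, capped at the keg's nominal capacity.
  const gallons = Math.min(net / (WATER_LBS_PER_GAL * k.specific_gravity), k.capacity_gal);
  const level_pct = (net / span) * 100;
  return { level_pct, gallons };
}
```

Deriving `level_pct` from the weight span and `gallons` from density means the two can disagree slightly when `full_lbs - empty_lbs` does not equal capacity × beer density; which one to treat as authoritative is a design decision.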
### Key Timestream queries
History chart (bin size depends on range):
```sql
-- Multi-measure records: each measure is its own column under measure_name 'scale_reading'
SELECT BIN(time, 1h) AS t, AVG(level_pct) AS level_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'scale_reading'
  AND tap_id = 'loc-001#TAP#07'
  AND time >= ago(7d)
GROUP BY BIN(time, 1h)
ORDER BY t ASC
```
Pour rate projection:
```sql
-- Assumes no keg change inside the 6h window, so MIN(gallons) is the current level
WITH recent AS (
  SELECT time, gallons
  FROM "scale-sense"."keg-readings"
  WHERE measure_name = 'scale_reading'
    AND tap_id = 'loc-001#TAP#07'
    AND time >= ago(6h)
)
SELECT
  (MAX(gallons) - MIN(gallons)) / 6.0 AS gallons_per_hour,
  -- NULLIF avoids division by zero when nothing was poured
  MIN(gallons) / NULLIF((MAX(gallons) - MIN(gallons)) / 6.0, 0) AS hours_remaining
FROM recent
```
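The same projection as plain arithmetic, useful for unit tests or for sanity-checking Timestream output. It assumes samples are sorted by time and that no keg change happened in the window. Unlike the fixed `/ 6.0` in the query, it divides by the time actually covered by the samples and uses the latest reading as the remaining volume; `projectPourRate` is a hypothetical name.

```typescript
interface GallonsSample {
  timeMs: number;  // epoch milliseconds
  gallons: number;
}

const MS_PER_HOUR = 60 * 60 * 1000;

// Returns gallons/hour and hours remaining, or null when there is no measurable draw.
function projectPourRate(
  samples: GallonsSample[],
): { gallonsPerHour: number; hoursRemaining: number } | null {
  if (samples.length < 2) return null;
  const first = samples[0];
  const last = samples[samples.length - 1];
  const hours = (last.timeMs - first.timeMs) / MS_PER_HOUR;
  const drawn = first.gallons - last.gallons; // assumes no keg swap in the window
  if (hours <= 0 || drawn <= 0) return null;
  const gallonsPerHour = drawn / hours;
  return { gallonsPerHour, hoursRemaining: last.gallons / gallonsPerHour };
}
```

For example, 10 gal at the start and 8 gal two hours later gives 1 gal/h and 8 hours remaining.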
Bin size selection (Lambda logic):

- ≤ 2h range → 5m bins
- ≤ 24h range → 15m bins
- ≤ 7d range → 1h bins
- > 7d range → 6h bins
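A sketch of the bin selection, assuming the `?range=` value uses an `<n>h` / `<n>d` format as in `?range=7d`; `parseRange` and `selectBinSize` are hypothetical names.

```typescript
type BinSize = "5m" | "15m" | "1h" | "6h";

const HOUR_MS = 60 * 60 * 1000;

// Parses the ?range= query value, e.g. "2h" or "7d" (format assumed).
function parseRange(range: string): number {
  const m = /^(\d+)([hd])$/.exec(range);
  if (!m) throw new Error(`unsupported range: ${range}`);
  const n = Number(m[1]);
  return m[2] === "h" ? n * HOUR_MS : n * 24 * HOUR_MS;
}

// Picks the BIN() width for the history chart from the requested range.
function selectBinSize(rangeMs: number): BinSize {
  if (rangeMs <= 2 * HOUR_MS) return "5m";
  if (rangeMs <= 24 * HOUR_MS) return "15m";
  if (rangeMs <= 7 * 24 * HOUR_MS) return "1h";
  return "6h";
}
```

Because the result is always one of four fixed literals, it can be placed into the `BIN(time, …)` clause without copying user input into the SQL string.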
## Lambda functions
### ingest-reading (IoT Core trigger)
- Parse reading from IoT rule event
- Update `LOCATION#loc-001` / `TAP#07#STATE` in DynamoDB
- Write multi-measure record to Timestream
- Run alert checks (low_keg, low_battery, stale_reading)
- Fan out to WebSocket connections via API Gateway Management API
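The steps after parsing can be sketched with the AWS calls hidden behind an interface, which keeps the fan-out order explicit and testable with fakes. `IngestDeps` and its methods are hypothetical stand-ins for `shared/dynamo.ts`, `shared/timestream.ts`, `shared/alerts.ts` and the Management API client, not the project's actual signatures; the composite `tap_id` in the WebSocket payload is also an assumption.

```typescript
interface TapStateUpdate {
  loc_id: string;     // e.g. "loc-001"
  tap_id: string;     // e.g. "TAP#07"
  weight_lbs: number;
  level_pct: number;
  gallons: number;
  updated_at: number; // epoch millis (assumed unit)
}

// Hypothetical dependency interface.
interface IngestDeps {
  putTapState(s: TapStateUpdate): Promise<void>;
  writeTimestream(s: TapStateUpdate): Promise<void>;
  runAlertChecks(s: TapStateUpdate): Promise<void>;
  listConnections(locId: string): Promise<string[]>;
  postToConnection(connId: string, payload: string): Promise<void>;
}

async function ingestReading(s: TapStateUpdate, deps: IngestDeps): Promise<void> {
  // Persist current state and history in parallel; both must succeed.
  await Promise.all([deps.putTapState(s), deps.writeTimestream(s)]);
  // Alert checks run only after the new state is stored.
  await deps.runAlertChecks(s);
  // Fan-out is best effort: one dead connection must not fail the reading.
  const conns = await deps.listConnections(s.loc_id);
  const payload = JSON.stringify({ type: "reading", tap_id: `${s.loc_id}#${s.tap_id}`, state: s });
  await Promise.all(conns.map((c) => deps.postToConnection(c, payload).catch(() => undefined)));
}
```

The Management API reports disconnected clients with a `GoneException` (HTTP 410); a real handler would likely delete the matching `WS_CONN#` row there instead of silently ignoring the error.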
### check-offline (EventBridge, every 5 min)
- Scan all DEVICE# META records
- For each, check TAP#STATE updated_at age
- If > 5 min: fire/update scale_offline alert
- If > 30 min: escalate to critical
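The age check reduces to a small pure function. A sketch using the `offline_warning_secs` / `offline_critical_secs` defaults from the alert thresholds and assuming `updated_at` is stored in epoch milliseconds; `classifyOffline` is a hypothetical name.

```typescript
type OfflineSeverity = "ok" | "warning" | "critical";

interface OfflineThresholds {
  offline_warning_secs: number;
  offline_critical_secs: number;
}

// Defaults from the alert system thresholds (overridable per location).
const DEFAULT_OFFLINE: OfflineThresholds = { offline_warning_secs: 300, offline_critical_secs: 1800 };

function classifyOffline(
  updatedAtMs: number,
  nowMs: number,
  t: OfflineThresholds = DEFAULT_OFFLINE,
): OfflineSeverity {
  const ageSecs = (nowMs - updatedAtMs) / 1000;
  if (ageSecs > t.offline_critical_secs) return "critical";
  if (ageSecs > t.offline_warning_secs) return "warning";
  return "ok";
}
```

Because the rule itself runs every 5 minutes, a scale that goes silent just after a check is first flagged up to about 10 minutes later; a tighter schedule is the lever if that matters.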
### API handlers (API Gateway REST)
- All handlers share Lambda layer: aws-sdk, jsonwebtoken, zod
- Auth: Cognito JWT verified by API Gateway authorizer (not in handler)
- Role checked in handler for write operations
### WebSocket handlers
- connect: write WS_CONN# to DynamoDB with TTL
- disconnect: delete WS_CONN# from DynamoDB
- message: handle ping/pong keepalive
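API Gateway closes WebSocket connections after 2 hours regardless of activity and after 10 minutes without traffic, which is why the row TTL is 2h and the `message` route handles ping/pong. A sketch of the item the `connect` handler might write; `buildConnItem` is hypothetical, and it assumes `expires_at` is the table's TTL attribute.

```typescript
// Hypothetical WS_CONN# item written by the connect handler.
interface WsConnItem {
  PK: string;         // WS_CONN#<conn_id>
  SK: "META";
  loc_id: string;
  user_id: string;
  expires_at: number; // epoch seconds; the table's TTL attribute (assumed)
}

const WS_TTL_SECS = 2 * 60 * 60; // 2h, matching the key schema

function buildConnItem(connId: string, locId: string, userId: string, nowMs: number): WsConnItem {
  return {
    PK: `WS_CONN#${connId}`,
    SK: "META",
    loc_id: locId,
    user_id: userId,
    // DynamoDB TTL expects whole epoch seconds, not milliseconds.
    expires_at: Math.floor(nowMs / 1000) + WS_TTL_SECS,
  };
}
```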
## Alert system
Alert states: none → firing → active → escalated → acknowledged → resolved
Grace period (FIRING# TTL): 2 minutes — condition must persist before alert fires. Prevents false positives from transient sensor glitches.
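The states above form a small state machine. The document lists the states but not every allowed edge, so the transition table below is an assumption (for example that a condition clearing during the grace period returns to `none`, and that `active` can skip escalation); `TRANSITIONS` and `transition` are hypothetical names for what `shared/alerts.ts` might contain.

```typescript
type AlertState = "none" | "firing" | "active" | "escalated" | "acknowledged" | "resolved";

// Assumed transition table.
const TRANSITIONS: Record<AlertState, readonly AlertState[]> = {
  none: ["firing"],
  firing: ["active", "none"], // persisted past grace, or cleared during it
  active: ["escalated", "acknowledged", "resolved"],
  escalated: ["acknowledged", "resolved"],
  acknowledged: ["resolved"],
  resolved: ["none"], // ready for the next occurrence
};

// Rejects illegal moves instead of silently overwriting state.
function transition(from: AlertState, to: AlertState): AlertState {
  if (!TRANSITIONS[from].includes(to)) {
    throw new Error(`illegal alert transition ${from} -> ${to}`);
  }
  return to;
}
```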
Thresholds (configurable per location in LOCATION#META settings):
```
low_keg_warning_pct:     20%
low_keg_critical_pct:    10%
low_keg_warning_hours:   8h
low_keg_critical_hours:  2h
offline_warning_secs:    300   (5 min)
offline_critical_secs:   1800  (30 min)
battery_warning_pct:     20%
battery_critical_pct:    10%
calibration_drift_warn:  2%
```
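The low-keg thresholds combine a level check and a time-to-empty check. A sketch that takes the worse of the two, assuming thresholds are inclusive (`<=`) and that `hours_remaining` is `null` when no pour rate can be computed; both the combination rule and the `lowKegSeverity` name are assumptions.

```typescript
type Severity = "ok" | "warning" | "critical";

interface LowKegThresholds {
  low_keg_warning_pct: number;
  low_keg_critical_pct: number;
  low_keg_warning_hours: number;
  low_keg_critical_hours: number;
}

const LOW_KEG_DEFAULTS: LowKegThresholds = {
  low_keg_warning_pct: 20,
  low_keg_critical_pct: 10,
  low_keg_warning_hours: 8,
  low_keg_critical_hours: 2,
};

const RANK: Record<Severity, number> = { ok: 0, warning: 1, critical: 2 };

function lowKegSeverity(
  level_pct: number,
  hours_remaining: number | null,
  t: LowKegThresholds = LOW_KEG_DEFAULTS,
): Severity {
  const byPct: Severity =
    level_pct <= t.low_keg_critical_pct ? "critical"
    : level_pct <= t.low_keg_warning_pct ? "warning"
    : "ok";
  const byHours: Severity =
    hours_remaining === null ? "ok"
    : hours_remaining <= t.low_keg_critical_hours ? "critical"
    : hours_remaining <= t.low_keg_warning_hours ? "warning"
    : "ok";
  // Report whichever check is more severe.
  return RANK[byPct] >= RANK[byHours] ? byPct : byHours;
}
```

Taking the worse signal lets a busy tap alert early on time-to-empty even while its level still looks healthy.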
Quiet hours: push/email suppressed outside operating hours. WebSocket alerts fire immediately regardless of quiet hours. Overnight digest sent at open time.
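Operating hours often cross midnight (e.g. 11:00 to 02:00), which makes the "outside operating hours" test easy to get wrong. A sketch, assuming hours are stored as minutes after local midnight and that the caller has already converted the current time using the location's `timezone`; `OperatingHours`, `isOpen` and `shouldDeliverNow` are hypothetical.

```typescript
// Operating hours in the location's local time, as minutes after midnight.
// closeMin < openMin means the bar stays open past midnight.
interface OperatingHours {
  openMin: number;  // e.g. 11 * 60
  closeMin: number; // e.g. 2 * 60
}

function isOpen(localMin: number, h: OperatingHours): boolean {
  if (h.openMin === h.closeMin) return true; // treated as open 24h (assumption)
  return h.openMin < h.closeMin
    ? localMin >= h.openMin && localMin < h.closeMin
    : localMin >= h.openMin || localMin < h.closeMin;
}

type Channel = "websocket" | "push" | "email";

// WebSocket always delivers; push/email wait for the digest outside hours.
function shouldDeliverNow(channel: Channel, localMin: number, h: OperatingHours): boolean {
  return channel === "websocket" || isOpen(localMin, h);
}
```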
## API Gateway
### REST API (api.scalesense.com/v1)
Cognito authorizer on all routes. Stage variables: dev / staging / prod
Routes summary:
```
GET    /locations
GET    /locations/{id}
GET    /locations/{id}/taps
GET    /locations/{id}/alerts
GET    /taps/{id}
PATCH  /taps/{id}
POST   /taps/{id}/command            { command, ...params }
GET    /taps/{id}/history            ?range=7d
GET    /taps/{id}/report
GET    /beers?q=search
GET    /beers/{id}
POST   /beers
GET    /kegs/{id}
POST   /kegs
PATCH  /kegs/{id}
GET    /kegs/{id}/readings
GET    /devices/{id}
POST   /devices/{id}/ota             admin only
GET    /devices/{id}/telemetry
PATCH  /devices/{id}
GET    /devices/{id}/calibration
POST   /devices/{id}/calibration     manager + admin
GET    /locations/{id}/inventory/export?format=pdf|csv
```
### WebSocket API (ws.scalesense.com)
Routes: `$connect`, `$disconnect`, `message`

Auth: Cognito JWT sent in the first message (`action: "auth"`)
Events pushed to dashboard clients:
```typescript
type WSEvent =
  | { type: "reading"; tap_id: string; state: TapState }
  | { type: "alert_fired"; alert: Alert }
  | { type: "alert_cleared"; alert_id: string }
  | { type: "device_status"; device_id: string; status: string }
  | { type: "maintenance_entered"; tap_id: string; source: string }
  | { type: "maintenance_exited"; tap_id: string }
  | { type: "tare_updated"; tap_id: string; tare: TareRecord }
  | { type: "calibration_updated"; device_id: string; cal: CalSummary }
  | { type: "tap_updated"; tap_id: string; tap: Partial<Tap> }
  | { type: "ping" };
```
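Because `WSEvent` is a discriminated union on `type`, a client can switch over it with a compile-time exhaustiveness check, so adding a new event variant fails the build until every handler covers it. The payload types below are minimal hypothetical placeholders for the real `TapState`, `Alert`, `TareRecord`, `CalSummary` and `Tap`; `describeEvent` is illustrative only.

```typescript
// Placeholder shapes; the real types live elsewhere.
type TapState = { level_pct: number; gallons: number };
type Alert = { alert_id: string; type: string; severity: string };
type TareRecord = { tared_at: string };
type CalSummary = { scale_factor: number };
type Tap = { beer_id: string };

type WSEvent =
  | { type: "reading"; tap_id: string; state: TapState }
  | { type: "alert_fired"; alert: Alert }
  | { type: "alert_cleared"; alert_id: string }
  | { type: "device_status"; device_id: string; status: string }
  | { type: "maintenance_entered"; tap_id: string; source: string }
  | { type: "maintenance_exited"; tap_id: string }
  | { type: "tare_updated"; tap_id: string; tare: TareRecord }
  | { type: "calibration_updated"; device_id: string; cal: CalSummary }
  | { type: "tap_updated"; tap_id: string; tap: Partial<Tap> }
  | { type: "ping" };

// Only reachable if a variant is missing from the switch.
function assertNever(x: never): never {
  throw new Error(`unhandled event: ${JSON.stringify(x)}`);
}

function describeEvent(e: WSEvent): string {
  switch (e.type) {
    case "reading": return `reading ${e.tap_id} ${e.state.level_pct}%`;
    case "alert_fired": return `alert ${e.alert.alert_id}`;
    case "alert_cleared": return `cleared ${e.alert_id}`;
    case "device_status": return `device ${e.device_id} ${e.status}`;
    case "maintenance_entered": return `maintenance on ${e.tap_id}`;
    case "maintenance_exited": return `maintenance off ${e.tap_id}`;
    case "tare_updated": return `tare ${e.tap_id}`;
    case "calibration_updated": return `calibration ${e.device_id}`;
    case "tap_updated": return `tap ${e.tap_id}`;
    case "ping": return "ping";
    default: return assertNever(e);
  }
}
```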
## Cognito
User pool per customer account (not per location).
Groups: Staff, Manager, Admin
Permission matrix:

| Action | Staff | Manager | Admin |
|---|---|---|---|
| View tap levels | ✓ | ✓ | ✓ |
| Tare / maintenance | ✓ | ✓ | ✓ |
| Assign beer/keg | — | ✓ | ✓ |
| View reports | — | ✓ | ✓ |
| Manage settings | — | ✓ | ✓ |
| Calibrate scales | — | ✓ | ✓ |
| OTA firmware | — | — | ✓ |
| Manage users | — | — | ✓ |
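The handler-side role check can encode this matrix as data. A sketch, assuming the caller has already extracted the user's groups from the verified token's `cognito:groups` claim (how that claim is surfaced in the Lambda event is not shown); the `Action` identifiers and `can` are hypothetical.

```typescript
type Group = "Staff" | "Manager" | "Admin";

type Action =
  | "view_tap_levels" | "tare_maintenance" | "assign_beer_keg" | "view_reports"
  | "manage_settings" | "calibrate_scales" | "ota_firmware" | "manage_users";

// Encodes the permission matrix; action identifiers are hypothetical.
const ALLOWED: Record<Action, readonly Group[]> = {
  view_tap_levels: ["Staff", "Manager", "Admin"],
  tare_maintenance: ["Staff", "Manager", "Admin"],
  assign_beer_keg: ["Manager", "Admin"],
  view_reports: ["Manager", "Admin"],
  manage_settings: ["Manager", "Admin"],
  calibrate_scales: ["Manager", "Admin"],
  ota_firmware: ["Admin"],
  manage_users: ["Admin"],
};

// A user may belong to several groups; any allowed group grants access.
function can(groups: readonly string[], action: Action): boolean {
  return ALLOWED[action].some((g) => groups.includes(g));
}
```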
## S3 buckets
| Bucket | Purpose | Lifecycle |
|---|---|---|
| `scale-sense-firmware-{env}` | Versioned .bin files | Keep all versions |
| `scale-sense-exports-{env}` | PDF/CSV inventory exports | Delete after 24h |
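Export flow:

1. Lambda generates PDF (ReportLab) or CSV
2. Upload to the exports bucket (S3 has no per-object TTL; the bucket's 24h lifecycle rule removes the file, and access is limited by the presigned URL)
3. Generate a presigned URL (900 s = 15 min expiry)
4. Return the URL to the client → browser opens it → download is triggered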
## VPC / networking
- WireGuard EC2 instance (t3.micro) in public subnet
- Pi connects to WireGuard endpoint for SSH access
- All Lambda functions in private subnet
- DynamoDB, Timestream, IoT Core via VPC endpoints (no internet egress)
## ECR
Repo: scale-sense/pi-gateway
Tags: semver (v1.4.2), latest
Watchtower on the Pi polls ECR every hour and automatically pulls an updated image when the tag it is running (e.g. `latest`) has been re-pushed.
Pi Docker stack: Mosquitto + OTA server + monitor agent
## EventBridge rules
```
Rule: CheckOfflineScales
  Schedule: rate(5 minutes)
  Target:   Lambda check-offline

Rule: DailyInventoryExport
  Schedule: cron(0 10 * * ? *)    (10:00 UTC in this example; time configurable per location)
  Target:   Lambda inventory-export
  Input:    { "loc_id": "loc-001", "format": "pdf" }
```
## Environment variables (Lambda)
```
TABLE_NAME=scale-sense
TIMESTREAM_DB=scale-sense
TIMESTREAM_TABLE=keg-readings
IOT_ENDPOINT=xxxxx.iot.us-west-2.amazonaws.com
WS_ENDPOINT=xxxxx.execute-api.us-west-2.amazonaws.com/prod
FIRMWARE_BUCKET=scale-sense-firmware-prod
EXPORTS_BUCKET=scale-sense-exports-prod
USER_POOL_ID=us-west-2_xxxxx
```
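Reading these once at cold start and failing fast on a missing value surfaces deployment mistakes immediately instead of on the first request that needs them. A sketch, with hypothetical `loadConfig` and `managementApiUrl` helpers; in a Lambda it would be called as `loadConfig(process.env)`. The API Gateway Management API client is typically configured with a full `https://` URL, while `WS_ENDPOINT` above omits the scheme.

```typescript
interface LambdaConfig {
  TABLE_NAME: string;
  TIMESTREAM_DB: string;
  TIMESTREAM_TABLE: string;
  IOT_ENDPOINT: string;
  WS_ENDPOINT: string;
  FIRMWARE_BUCKET: string;
  EXPORTS_BUCKET: string;
  USER_POOL_ID: string;
}

const REQUIRED: (keyof LambdaConfig)[] = [
  "TABLE_NAME", "TIMESTREAM_DB", "TIMESTREAM_TABLE", "IOT_ENDPOINT",
  "WS_ENDPOINT", "FIRMWARE_BUCKET", "EXPORTS_BUCKET", "USER_POOL_ID",
];

// Throws once, listing every missing variable.
function loadConfig(env: Record<string, string | undefined>): LambdaConfig {
  const missing = REQUIRED.filter((k) => !env[k]);
  if (missing.length > 0) throw new Error(`missing env vars: ${missing.join(", ")}`);
  const cfg = {} as LambdaConfig;
  for (const k of REQUIRED) cfg[k] = env[k] as string;
  return cfg;
}

// Adds the scheme the Management API client expects, if absent.
function managementApiUrl(cfg: LambdaConfig): string {
  return cfg.WS_ENDPOINT.startsWith("https://") ? cfg.WS_ENDPOINT : `https://${cfg.WS_ENDPOINT}`;
}
```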
## CDK deployment
```shell
cd infrastructure
npm run build                       # tsc compile
npx cdk diff                        # preview changes
npx cdk deploy --all                # deploy all stacks
npx cdk deploy ScaleSenseApiStack   # deploy one stack
```
Stack dependency order:

1. NetworkStack (VPC, WireGuard EC2, ECR)
2. DataStack (DynamoDB, Timestream) and StorageStack (S3)
3. IoTStack (IoT Core, Greengrass)
4. AlertStack (SNS, SES, EventBridge)
5. ApiStack (API Gateway, Lambda, Cognito): depends on all of the above