
Scale Sense — AWS Infrastructure

CDK structure

Language: TypeScript
Entry: infrastructure/bin/scale-sense.ts

infrastructure/
  bin/
    scale-sense.ts          CDK app entry point
  lib/
    stacks/
      iot-stack.ts          IoT Core, Greengrass, device registry
      data-stack.ts         DynamoDB, Timestream
      api-stack.ts          API Gateway REST + WebSocket, Lambda, Cognito
      alert-stack.ts        EventBridge rules, SNS, SES
      storage-stack.ts      S3 (firmware, exports)
      network-stack.ts      VPC, WireGuard EC2, ECR
    constructs/
      scale-lambda.ts       Shared Lambda config (layers, env, VPC)
      iot-rule.ts           IoT Core rule helper
  lambda/
    handlers/
      ingest-reading.ts     IoT rule → DynamoDB + Timestream + alerts
      check-offline.ts      EventBridge 5min → scan for offline scales
      api/
        locations.ts
        taps.ts
        beers.ts
        devices.ts
        commands.ts
        inventory-export.ts
      websocket/
        connect.ts
        disconnect.ts
        message.ts
    shared/
      dynamo.ts             DynamoDB client + helpers
      timestream.ts         Timestream write + query helpers
      iot.ts                IoT Core publish helper
      alerts.ts             Alert state machine
      calculations.ts       Weight → gallons conversion
  test/
    ...

IoT Core setup

Device registry

  • Thing type: ScaleSenseScale
  • Thing group: scales-{location_id} (one per bar location)
  • Certificates: X.509, provisioned via fleet provisioning or manual per device
  • Policy: ScaleSenseDevicePolicy
      • Allow: iot:Connect (client ID must match thing name)
      • Allow: iot:Publish on keg/readings/*, keg/status/*, keg/response/*
      • Allow: iot:Subscribe on keg/command/*, keg/ota/*
      • Allow: iot:Receive on keg/command/*, keg/ota/*

IoT rules

Rule: ScaleSenseReadingRule
  SQL: SELECT *, topic(3) as device_id FROM 'keg/readings/+'
  Actions:
    → Lambda: ingest-reading (fan-out to DynamoDB, Timestream, alerts, WebSocket)

Rule: ScaleSenseStatusRule
  SQL: SELECT *, topic(3) as device_id FROM 'keg/status/+'
  Actions:
    → Lambda: ingest-reading (handle status changes)
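In IoT Core SQL, topic(n) returns the n-th segment of the topic, counting from 1, so topic(3) on keg/readings/SCALE-0042 yields the device ID. A minimal sketch of the same lookup, useful for building test events locally; the helper name `topicSegment` is hypothetical and not part of the codebase described here.

```typescript
// Hypothetical helper mirroring IoT Core SQL's topic(n): a 1-based topic segment lookup.
function topicSegment(topic: string, n: number): string | undefined {
  if (!Number.isInteger(n) || n < 1) return undefined;
  // Segments are separated by "/"; index n-1 because topic(n) is 1-based.
  return topic.split("/")[n - 1];
}

// For "keg/readings/SCALE-0042", segment 3 is the device ID.
const deviceId = topicSegment("keg/readings/SCALE-0042", 3);
```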

Greengrass

  • Core devices: one per Pi (registered as Greengrass core)
  • Leaf devices: ESP32 scales (registered as Greengrass leaf devices behind Pi)
  • Component: com.scalesense.mosquitto — Mosquitto config + bridge
  • Component: com.scalesense.ota-server — serves .bin files locally
  • Component: com.scalesense.monitor — Pi health reporting

DynamoDB

Table name: scale-sense
Billing: PAY_PER_REQUEST (on-demand)
Point-in-time recovery: enabled

Key schema (single-table design)

| PK | SK | Entity | Key attributes |
|---|---|---|---|
| LOCATION#loc-001 | META | Location | name, address, timezone, settings |
| LOCATION#loc-001 | TAP#07 | Tap config | scale_id, beer_id, keg_id, keg_type, full_lbs, empty_lbs |
| LOCATION#loc-001 | TAP#07#STATE | Tap state | level_pct, gallons, weight_lbs, updated_at, tare_status |
| BEER#guinness-draught | META | Beer | name, brewery, style, abv, specific_gravity |
| BEER#guinness-draught | KEG#half_barrel | Keg weights | full_lbs, empty_lbs |
| KEG#keg-20260318-007 | META | Keg lifecycle | beer_id, keg_type, tapped_at, loc_id |
| KEG#keg-20260318-007 | TARE | Tare record | empty_offset, full_ref, tared_by, tared_at |
| DEVICE#SCALE-0042 | META | Device | loc_id, tap_id, fw_version, status |
| DEVICE#SCALE-0042 | TELEMETRY | Device health | battery_v, battery_pct, rssi, updated_at |
| DEVICE#SCALE-0042 | CALIBRATION | Current cal | scale_factor, zero_offset, method, drift_pct, calibrated_at |
| DEVICE#SCALE-0042 | CAL#<timestamp> | Cal history | scale_factor, known_weight, by, method, drift_pct |
| ALERT#loc-001 | ACTIVE#low_keg#TAP#07 | Active alert | type, severity, state, fired_at, context |
| ALERT#loc-001 | HIST#<timestamp> | Alert log | full alert record |
| FIRING#TAP#07 | low_keg | Grace period | started_at, severity; TTL = grace_period_secs |
| WS_CONN#<conn_id> | META | WS connection | loc_id, user_id, expires_at; TTL = 2h |
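The key patterns above can be sketched as a small set of builder functions, so handlers never assemble PK/SK strings by hand. This is an illustrative sketch only: the `keys` object, its method names, and the two-digit zero padding of tap numbers (inferred from TAP#07) are assumptions, not the contents of shared/dynamo.ts.

```typescript
// Hypothetical key builders for the single-table layout; names are illustrative.
type Key = { PK: string; SK: string };

// Assumption: tap numbers are zero-padded to two digits, as in "TAP#07".
const pad2 = (n: number): string => (n < 10 ? "0" + n : String(n));

const keys = {
  location: (locId: string): Key => ({ PK: `LOCATION#${locId}`, SK: "META" }),
  tapConfig: (locId: string, tap: number): Key => ({ PK: `LOCATION#${locId}`, SK: `TAP#${pad2(tap)}` }),
  tapState: (locId: string, tap: number): Key => ({ PK: `LOCATION#${locId}`, SK: `TAP#${pad2(tap)}#STATE` }),
  deviceMeta: (deviceId: string): Key => ({ PK: `DEVICE#${deviceId}`, SK: "META" }),
  activeAlert: (locId: string, alertType: string, tap: number): Key => ({
    PK: `ALERT#${locId}`,
    SK: `ACTIVE#${alertType}#TAP#${pad2(tap)}`,
  }),
};

const stateKey = keys.tapState("loc-001", 7);
```

Keeping config (TAP#07) and state (TAP#07#STATE) under the same partition key means one Query on LOCATION#loc-001 with a TAP# sort-key prefix can return both for every tap at a location.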

GSIs

GSI: device-index
  PK: device_id (maps DEVICE#xxx → loc_id, tap_id)

GSI: location-battery-index
  PK: loc_id
  SK: battery_pct
  (query low battery devices at a location)

DynamoDB TTL

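  • FIRING# rows: TTL attribute = started_at + grace_period_secs, stored as an epoch-seconds timestamp (auto-cleanup of stale grace periods)
  • WS_CONN# rows: TTL attribute = connection_expires_at, stored as an epoch-seconds timestamp (auto-cleanup of stale WS registrations)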
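DynamoDB expects the TTL attribute to hold an absolute expiry time in epoch seconds rather than a duration. A minimal sketch of computing it; the function and constant names are hypothetical, and the two lifetimes are taken from the 2-minute grace period and 2h connection TTL stated in this document.

```typescript
// Hypothetical helper: convert "now + lifetime" into an epoch-seconds TTL value.
function ttlEpochSeconds(nowMs: number, lifetimeSecs: number): number {
  return Math.floor(nowMs / 1000) + lifetimeSecs;
}

const GRACE_PERIOD_SECS = 120;      // 2-minute alert grace period
const WS_CONN_LIFETIME_SECS = 7200; // 2h WebSocket registration lifetime

// Hypothetical predicate: treat a row as expired even if DynamoDB has not deleted it yet.
function isExpired(ttl: number, nowMs: number): boolean {
  return ttl <= Math.floor(nowMs / 1000);
}
```

TTL deletion is a background process and can lag well behind the expiry time, so code that reads FIRING# or WS_CONN# rows should probably compare the stored timestamp itself (as `isExpired` does) rather than assume an expired row is already gone.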

Timestream

Database: scale-sense
Table: keg-readings

Memory store retention: 24 hours (fast queries)
Magnetic store retention: 1 year (historical, configurable per location)

Record format (multi-measure — always use this)

{
  Time: timestamp,
  Dimensions: [
    { Name: "device_id", Value: "SCALE-0042" },
    { Name: "tap_id",    Value: "loc-001#TAP#07" },
    { Name: "loc_id",    Value: "loc-001" }
  ],
  MeasureName: "scale_reading",
  MeasureValueType: "MULTI",
  MeasureValues: [
    { Name: "weight_lbs",  Value: "87.3",  Type: "DOUBLE" },
    { Name: "level_pct",   Value: "54.1",  Type: "DOUBLE" },
    { Name: "gallons",     Value: "8.4",   Type: "DOUBLE" },
    { Name: "battery_v",   Value: "3.94",  Type: "DOUBLE" },
    { Name: "battery_pct", Value: "78",    Type: "DOUBLE" }
  ]
}
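As a rough sketch, a reading can be mapped to this record shape with a plain function and then passed to the Timestream write client. The `Reading` interface and `toTimestreamRecord` name are assumptions made for illustration; only the output shape follows the format above. Timestream takes every measure value as a string, hence the `String(...)` conversions.

```typescript
// Hypothetical input shape; field names are illustrative, not from the codebase.
interface Reading {
  deviceId: string;
  locId: string;
  tap: string; // e.g. "TAP#07"
  timestampMs: number;
  weightLbs: number;
  levelPct: number;
  gallons: number;
  batteryV: number;
  batteryPct: number;
}

// Build one multi-measure record matching the format shown above.
function toTimestreamRecord(r: Reading) {
  const measure = (Name: string, value: number) =>
    ({ Name, Value: String(value), Type: "DOUBLE" as const });
  return {
    Time: String(r.timestampMs),
    TimeUnit: "MILLISECONDS" as const,
    Dimensions: [
      { Name: "device_id", Value: r.deviceId },
      { Name: "tap_id", Value: `${r.locId}#${r.tap}` },
      { Name: "loc_id", Value: r.locId },
    ],
    MeasureName: "scale_reading",
    MeasureValueType: "MULTI" as const,
    MeasureValues: [
      measure("weight_lbs", r.weightLbs),
      measure("level_pct", r.levelPct),
      measure("gallons", r.gallons),
      measure("battery_v", r.batteryV),
      measure("battery_pct", r.batteryPct),
    ],
  };
}
```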

Key Timestream queries

History chart (bin size depends on range):

SELECT BIN(time, 1h) AS t, AVG(level_pct) AS level_pct
FROM "scale-sense"."keg-readings"
WHERE measure_name = 'scale_reading'
  AND tap_id = 'loc-001#TAP#07'
  AND time >= ago(7d)
GROUP BY BIN(time, 1h)
ORDER BY t ASC

Pour rate projection:

WITH recent AS (
  SELECT time, gallons
  FROM "scale-sense"."keg-readings"
  WHERE measure_name = 'scale_reading'
    AND tap_id = 'loc-001#TAP#07'
    AND time >= ago(6h)
)
SELECT
  (MAX(gallons) - MIN(gallons)) / 6.0 AS gallons_per_hour,
  MIN(gallons) / ((MAX(gallons) - MIN(gallons)) / 6.0) AS hours_remaining
FROM recent

Bin size selection (Lambda logic):

  • ≤ 2h range → 5m bins
  • ≤ 24h range → 15m bins
  • ≤ 7d range → 1h bins
  • > 7d range → 6h bins
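The bin size rule can be sketched as a single function over the requested range in milliseconds. The names `selectBinSize` and `HOUR_MS` are hypothetical; the thresholds are the ones listed here.

```typescript
type BinSize = "5m" | "15m" | "1h" | "6h";

const HOUR_MS = 3600000;

// Pick the Timestream BIN() interval for a history query from the range length.
function selectBinSize(rangeMs: number): BinSize {
  if (rangeMs <= 2 * HOUR_MS) return "5m";
  if (rangeMs <= 24 * HOUR_MS) return "15m";
  if (rangeMs <= 7 * 24 * HOUR_MS) return "1h";
  return "6h";
}
```

With these thresholds a chart returns at most 24 to 168 points for ranges up to 7 days, which keeps payloads small for the dashboard.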

Lambda functions

ingest-reading (IoT Core trigger)

  1. Parse reading from IoT rule event
  2. Update LOCATION#loc-001 / TAP#07#STATE in DynamoDB
  3. Write multi-measure record to Timestream
  4. Run alert checks (low_keg, low_battery, stale_reading)
  5. Fan out to WebSocket connections via API Gateway Management API
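Step 2 needs level_pct and gallons derived from the raw weight. One plausible way to compute them from the tap config (full_lbs, empty_lbs) and the beer's specific_gravity is sketched below. The formulas, the function names, and the constant of roughly 8.345 lb per US gallon of water are assumptions for illustration; the actual logic in shared/calculations.ts is not shown in this document.

```typescript
// Assumption: approximate weight of one US gallon of water, in pounds.
const WATER_LBS_PER_GALLON = 8.345;

interface KegWeights { fullLbs: number; emptyLbs: number }

// Fill level as a percentage of the full-to-empty weight span, clamped to 0..100.
function levelPct(weightLbs: number, keg: KegWeights): number {
  const span = keg.fullLbs - keg.emptyLbs;
  if (span <= 0) return 0; // guard against bad or missing keg config
  const pct = ((weightLbs - keg.emptyLbs) / span) * 100;
  return Math.min(100, Math.max(0, pct));
}

// Remaining volume from net liquid weight and the beer's specific gravity.
function gallonsRemaining(weightLbs: number, emptyLbs: number, specificGravity: number): number {
  const netLbs = Math.max(0, weightLbs - emptyLbs);
  return netLbs / (WATER_LBS_PER_GALLON * specificGravity);
}
```

Clamping matters in practice because a scale that drifts or is bumped can report a weight slightly below the empty keg or above the full one.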

check-offline (EventBridge, every 5 min)

  1. Scan all DEVICE# META records
  2. For each, check the updated_at age of its TAP#<nn>#STATE row
  3. If > 5 min: fire/update scale_offline alert
  4. If > 30 min: escalate to critical
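The age check in steps 3 and 4 can be sketched as a pure function, which keeps it easy to unit test apart from the DynamoDB scan. The names are hypothetical; the default thresholds are the 5-minute and 30-minute values given here.

```typescript
type OfflineSeverity = "ok" | "warning" | "critical";

// Classify a scale by how long ago its tap state was last updated.
function classifyOffline(
  updatedAtMs: number,
  nowMs: number,
  warningSecs: number = 300,   // 5 min
  criticalSecs: number = 1800, // 30 min
): OfflineSeverity {
  const ageSecs = (nowMs - updatedAtMs) / 1000;
  if (ageSecs > criticalSecs) return "critical";
  if (ageSecs > warningSecs) return "warning";
  return "ok";
}
```

Because the rule itself runs every 5 minutes, a scale may be silent for up to roughly 10 minutes before the warning is first raised.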

API handlers (API Gateway REST)

  • All handlers share Lambda layer: aws-sdk, jsonwebtoken, zod
  • Auth: Cognito JWT verified by API Gateway authorizer (not in handler)
  • Role checked in handler for write operations

WebSocket handlers

  • connect: write WS_CONN# to DynamoDB with TTL
  • disconnect: delete WS_CONN# from DynamoDB
  • message: handle ping/pong keepalive

Alert system

Alert states: none → firing → active → escalated → acknowledged → resolved

Grace period (FIRING# TTL): 2 minutes — condition must persist before alert fires. Prevents false positives from transient sensor glitches.
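A minimal sketch of the state chain and the grace-period promotion follows. It assumes alerts only move forward along the chain listed above and that a firing alert becomes active once the condition has held for the full grace period; the real state machine in shared/alerts.ts may allow other transitions (for example, clearing a firing alert back to none). All names are hypothetical.

```typescript
type AlertState = "none" | "firing" | "active" | "escalated" | "acknowledged" | "resolved";

const CHAIN: AlertState[] = ["none", "firing", "active", "escalated", "acknowledged", "resolved"];

// Assumption: the only transition modelled here is one step forward along the chain.
function nextState(state: AlertState): AlertState | undefined {
  const i = CHAIN.indexOf(state);
  return i >= 0 && i < CHAIN.length - 1 ? CHAIN[i + 1] : undefined;
}

// Promote firing -> active once the condition has persisted for the grace period.
function promoteIfGraceElapsed(
  state: AlertState,
  startedAtMs: number,
  nowMs: number,
  graceSecs: number = 120,
): AlertState {
  if (state === "firing" && nowMs - startedAtMs >= graceSecs * 1000) return "active";
  return state;
}
```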

Thresholds (configurable per location in LOCATION#META settings):

low_keg_warning_pct:    20%
low_keg_critical_pct:   10%
low_keg_warning_hours:  8h
low_keg_critical_hours: 2h
offline_warning_secs:   300  (5 min)
offline_critical_secs:  1800 (30 min)
battery_warning_pct:    20%
battery_critical_pct:   10%
calibration_drift_warn: 2%
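How the low_keg thresholds might combine is sketched below. It assumes the percentage and hours-remaining checks are OR-ed together and that the comparisons are inclusive; neither detail is stated in this document, and the type and function names are hypothetical.

```typescript
type Severity = "none" | "warning" | "critical";

interface LowKegThresholds {
  warningPct: number;
  criticalPct: number;
  warningHours: number;
  criticalHours: number;
}

// Defaults from the threshold list above.
const DEFAULT_LOW_KEG: LowKegThresholds = {
  warningPct: 20, criticalPct: 10, warningHours: 8, criticalHours: 2,
};

// hoursRemaining is null when no pour-rate projection is available.
function lowKegSeverity(
  levelPct: number,
  hoursRemaining: number | null,
  t: LowKegThresholds = DEFAULT_LOW_KEG,
): Severity {
  const hours = hoursRemaining === null ? Infinity : hoursRemaining;
  if (levelPct <= t.criticalPct || hours <= t.criticalHours) return "critical";
  if (levelPct <= t.warningPct || hours <= t.warningHours) return "warning";
  return "none";
}
```

Passing the per-location settings as the third argument lets each location override the defaults without changing the check itself.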

Quiet hours: push/email suppressed outside operating hours. WebSocket alerts fire immediately regardless of quiet hours. Overnight digest sent at open time.

API Gateway

REST API (api.scalesense.com/v1)

Cognito authorizer on all routes.
Stage variables: dev / staging / prod

Routes summary:

GET    /locations
GET    /locations/{id}
GET    /locations/{id}/taps
GET    /locations/{id}/alerts
GET    /taps/{id}
PATCH  /taps/{id}
POST   /taps/{id}/command        { command, ...params }
GET    /taps/{id}/history        ?range=7d
GET    /taps/{id}/report
GET    /beers?q=search
GET    /beers/{id}
POST   /beers
GET    /kegs/{id}
POST   /kegs
PATCH  /kegs/{id}
GET    /kegs/{id}/readings
GET    /devices/{id}
POST   /devices/{id}/ota         admin only
GET    /devices/{id}/telemetry
PATCH  /devices/{id}
GET    /devices/{id}/calibration
POST   /devices/{id}/calibration manager + admin
GET    /locations/{id}/inventory/export?format=pdf|csv

WebSocket API (ws.scalesense.com)

Routes: $connect, $disconnect, message
Auth: Cognito JWT sent in first message (action: "auth")

Events pushed to dashboard clients:

type WSEvent =
  | { type: "reading";             tap_id: string; state: TapState }
  | { type: "alert_fired";         alert: Alert }
  | { type: "alert_cleared";       alert_id: string }
  | { type: "device_status";       device_id: string; status: string }
  | { type: "maintenance_entered"; tap_id: string; source: string }
  | { type: "maintenance_exited";  tap_id: string }
  | { type: "tare_updated";        tap_id: string; tare: TareRecord }
  | { type: "calibration_updated"; device_id: string; cal: CalSummary }
  | { type: "tap_updated";         tap_id: string; tap: Partial<Tap> }
  | { type: "ping" }

Cognito

User pool per customer account (not per location).
Groups: Staff, Manager, Admin

Permission matrix:

| Action | Staff | Manager | Admin |
|---|---|---|---|
| View tap levels | ✓ | ✓ | ✓ |
| Tare / maintenance | ✓ | ✓ | ✓ |
| Assign beer/keg | — | ✓ | ✓ |
| View reports | — | ✓ | ✓ |
| Manage settings | — | ✓ | ✓ |
| Calibrate scales | — | ✓ | ✓ |
| OTA firmware | — | — | ✓ |
| Manage users | — | — | ✓ |
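Because the matrix is strictly hierarchical (every Manager permission is also an Admin permission, and so on), the in-handler role check can be sketched as a minimum-role lookup. The action identifiers and function name are hypothetical; the group list is assumed to come from the `cognito:groups` claim of the verified JWT.

```typescript
type Role = "Staff" | "Manager" | "Admin";
type Action =
  | "view_tap_levels" | "tare_maintenance" | "assign_beer_keg" | "view_reports"
  | "manage_settings" | "calibrate_scales" | "ota_firmware" | "manage_users";

// Roles ordered from least to most privileged; the index doubles as the rank.
const ROLES: Role[] = ["Staff", "Manager", "Admin"];

// Lowest role allowed to perform each action, per the permission matrix.
const MIN_ROLE: Record<Action, Role> = {
  view_tap_levels: "Staff",
  tare_maintenance: "Staff",
  assign_beer_keg: "Manager",
  view_reports: "Manager",
  manage_settings: "Manager",
  calibrate_scales: "Manager",
  ota_firmware: "Admin",
  manage_users: "Admin",
};

// True if any of the caller's groups meets the minimum role; unknown groups are ignored.
function isAllowed(groups: string[], action: Action): boolean {
  const required = ROLES.indexOf(MIN_ROLE[action]);
  return groups.some((g) => ROLES.indexOf(g as Role) >= required);
}
```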

S3 buckets

| Bucket | Purpose | Lifecycle |
|---|---|---|
| scale-sense-firmware-{env} | Versioned .bin files | Keep all versions |
| scale-sense-exports-{env} | PDF/CSV inventory exports | Delete after 24h |

Export flow:

  1. Lambda generates PDF (ReportLab) or CSV
  2. Upload to the exports bucket (the bucket lifecycle rule deletes objects after 24h)
  3. Generate presigned URL (900s expiry, i.e. 15 minutes)
  4. Return URL to client; the browser opens it and the download starts

VPC / networking

  • WireGuard EC2 instance (t3.micro) in public subnet
  • Pi connects to WireGuard endpoint for SSH access
  • All Lambda functions in private subnet
  • DynamoDB, Timestream, IoT Core via VPC endpoints (no internet egress)

ECR

Repo: scale-sense/pi-gateway
Tags: semver (v1.4.2), latest

Watchtower on the Pi polls ECR every hour and pulls new tags automatically.

Pi Docker stack: Mosquitto + OTA server + monitor agent

EventBridge rules

Rule: CheckOfflineScales
  Schedule: rate(5 minutes)
  Target: Lambda check-offline

Rule: DailyInventoryExport
  Schedule: cron(0 10 * * ? *)  (10:00 UTC; configurable per location)
  Target: Lambda inventory-export
  Input: { "loc_id": "loc-001", "format": "pdf" }

Environment variables (Lambda)

TABLE_NAME=scale-sense
TIMESTREAM_DB=scale-sense
TIMESTREAM_TABLE=keg-readings
IOT_ENDPOINT=xxxxx.iot.us-west-2.amazonaws.com
WS_ENDPOINT=xxxxx.execute-api.us-west-2.amazonaws.com/prod
FIRMWARE_BUCKET=scale-sense-firmware-prod
EXPORTS_BUCKET=scale-sense-exports-prod
USER_POOL_ID=us-west-2_xxxxx

CDK deployment

cd infrastructure
npm run build                    # tsc compile
npx cdk diff                     # preview changes
npx cdk deploy --all             # deploy all stacks
npx cdk deploy ScaleSenseApiStack  # deploy one stack

Stack dependency order:

  1. NetworkStack (VPC, ECR)
  2. StorageStack (S3) and DataStack (DynamoDB, Timestream)
  3. IoTStack (IoT Core, Greengrass)
  4. AlertStack (SNS, SES, EventBridge)
  5. ApiStack (API Gateway, Lambda, Cognito), depends on all of the above