
The Data and Orchestra Contract

Love where you’re taking this. Here’s a clean, code-first plug-in contract for feeds, plus a messaging plan that scales from your current Redis Router up to NATS/Kafka if/when needed—while keeping Orchestra decoupled and future-proof.

1) The universal contract (what every data source must speak)

1.1 Event envelope (bus-friendly, source-agnostic)

Use this wrapper for every message, no matter the feed:

 
{ "envelope_version": "1.0", "event_id": "uuid-v7", "event_type": "quotes.tick|trades.tick|orderbook.snapshot|gex.levels|news.item|calendar.item|risk.greeks", "source_id": "ibkr|tasty|barchart|oanda|kraken|coinbase|forexfactory|custom", "realm": "chamber_us|chamber_eu|chamber_tokyo|brain-001", "region": "US|EU|JP", "schema_ref": "utp://schemas/quotes/v2", "sequence": 126733918, // source sequence if available "event_time": "2025-10-28T07:14:29.312Z", // when it happened at venue "ingest_time": "2025-10-28T07:14:29.987Z", // when we saw it "watermark": { "partition": "BTC-USD", "offset": 987654321 }, "checksum": "sha256:…", // of payload "payload": { /* canonical content */ }, "meta": { "trace_id":"...", "retry":0 } }
  • event_type drives routing and storage.

  • schema_ref is the only thing Orchestra/normalizers need to know to validate/transform.

  • watermark enables exactly-once processing, or at-least-once delivery with dedupe.

  • payload follows a canonical schema (see 1.2).
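
To make the contract concrete in code, here is a minimal Pydantic sketch of the envelope, assuming Pydantic v2. Field names mirror the JSON above; the exact types and the Watermark/Meta sub-models are assumptions to confirm when utp-schemas is locked.

```python
# Hedged sketch of the event envelope as Pydantic models (names from the JSON above).
from datetime import datetime
from typing import Any, Optional

from pydantic import BaseModel, Field


class Watermark(BaseModel):
    partition: str
    offset: int


class Meta(BaseModel):
    trace_id: Optional[str] = None
    retry: int = 0


class Envelope(BaseModel):
    envelope_version: str = "1.0"
    event_id: str                           # uuid-v7 string
    event_type: str                         # e.g. "quotes.tick", "gex.levels"
    source_id: str                          # e.g. "ibkr", "barchart"
    realm: str                              # e.g. "chamber_us", "brain-001"
    region: str                             # "US" | "EU" | "JP"
    schema_ref: str                         # e.g. "utp://schemas/quotes/v2"
    sequence: Optional[int] = None          # source sequence if available
    event_time: datetime                    # when it happened at the venue
    ingest_time: datetime                   # when we saw it
    watermark: Optional[Watermark] = None
    checksum: str                           # "sha256:..." of the payload
    payload: dict[str, Any]                 # canonical content per schema_ref
    meta: Meta = Field(default_factory=Meta)
```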

1.2 Canonical payload schemas (small set, versioned)

Keep these tight and versioned (JSON Schema + Pydantic types; Avro/Protobuf optional later):

  • quotes/v2: bid/ask/mid, sizes, venue, symbol, tick_type.

  • trades/v2: price, size, side, trade_id.

  • orderbook/v1: levels, depth, side arrays.

  • gex/levels/v1: walls, zero-gamma, zero-delta, timestamps.

  • news/v1: headline, source, tickers[], sentiment?, url.

  • calendar/v1: event_name, actual/forecast/previous, importance.

Example quotes/v2 payload:

 
{ "symbol": "NQZ5", "group": "FUT_IDX", "venue": "CME", "bid": 6701.25, "ask": 6701.50, "bid_size": 8, "ask_size": 6, "currency": "USD" }

Rule: Connectors map proprietary fields → these canonical payloads before publishing.
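
As a companion sketch, the quotes/v2 payload as a Pydantic type; Decimal for prices and the optional mid/tick_type fields are assumptions to adjust once the schema is locked.

```python
# Hedged sketch of the canonical quotes/v2 payload (names from the example above).
from decimal import Decimal
from typing import Optional

from pydantic import BaseModel


class QuoteV2(BaseModel):
    symbol: str                      # e.g. "NQZ5"
    group: str                       # e.g. "FUT_IDX"
    venue: str                       # e.g. "CME"
    bid: Decimal
    ask: Decimal
    bid_size: int
    ask_size: int
    currency: str = "USD"
    mid: Optional[Decimal] = None    # optional per 1.2
    tick_type: Optional[str] = None  # optional per 1.2


# Validating the example payload above:
quote = QuoteV2.model_validate({
    "symbol": "NQZ5", "group": "FUT_IDX", "venue": "CME",
    "bid": "6701.25", "ask": "6701.50",
    "bid_size": 8, "ask_size": 6, "currency": "USD",
})
```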

1.3 Connector manifest (DI-friendly, zero guessing)

Each connector ships a connector.manifest.yaml:

 
connector_id: barchart
version: 1.2.0
emits:
  - event_type: gex.levels
    schema_ref: utp://schemas/gex/levels/v1
requires:
  secrets: [BARCHART_API_KEY]
  nets: [internet]
  deps: ["utp-core>=0.3.0"]
capabilities:
  batching: true
  backfill: true
rate_limits:
  max_rps: 5
  burst: 10
health:
  endpoint: /health

Orchestra reads manifests at startup, registers routes, and applies rate limits automatically.
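
A rough sketch of that startup pass, assuming PyYAML and plain dict registries; the route/limiter shapes are placeholders, not a settled API.

```python
# Hedged sketch: load connector.manifest.yaml files, register emitted event_types,
# and record each connector's rate limits for the limiter to enforce.
from pathlib import Path

import yaml  # PyYAML


def load_manifests(root: Path) -> list[dict]:
    return [yaml.safe_load(p.read_text()) for p in root.glob("**/connector.manifest.yaml")]


def register(manifests: list[dict]) -> tuple[dict, dict]:
    routes: dict[str, tuple[str, str]] = {}   # event_type -> (connector_id, schema_ref)
    limits: dict[str, dict] = {}              # connector_id -> rate limit settings
    for m in manifests:
        for emit in m.get("emits", []):
            routes[emit["event_type"]] = (m["connector_id"], emit["schema_ref"])
        limits[m["connector_id"]] = m.get("rate_limits", {"max_rps": 1, "burst": 1})
    return routes, limits
```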


2) Data flow (conditioning & handoff without choking)

Collector → Normalizer → Bus → Persister → Feature/Signals

  1. Collector (source adapter)
    Pulls raw data and never publishes it raw; hands it to the Normalizer via an in-process queue.

  2. Normalizer (schema compiler)

    • Validates against the canonical schema_ref (JSON Schema).

    • Adds envelope, computes checksum, sets event_time/ingest_time, sequence.

    • Publishes to the bus topic (see 3).

  3. Persister

    • Subscribes by event_type and writes to Timescale tables (quotes, trades, gex_levels, news, calendar).

    • Idempotent on (event_type, source_id, symbol, event_time, sequence, checksum).

  4. Feature builders / Signals

    • Continuous aggs & materialized views (e.g., ZG/ZD, Markov inputs).

    • Emit derived events (e.g., signals/zgzd/v1) back onto the bus for downstream consumers (alerts, autotrade).

This guarantees Orchestra only deals in well-formed messages. Garbage never hits your core.
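
Roughly, the Normalizer step (2 above) could look like the sketch below, assuming the jsonschema package; load_schema and publish stand in for whatever utp-normalize and the bus client actually expose.

```python
# Hedged sketch of the Normalizer: validate the payload against its canonical
# schema, wrap it in the envelope, checksum it, then publish to the bus.
import hashlib
import json
import uuid
from datetime import datetime, timezone

import jsonschema


def normalize(raw_payload: dict, *, event_type: str, source_id: str, realm: str,
              region: str, schema_ref: str, event_time: str,
              sequence: int | None, load_schema, publish) -> dict:
    # Reject garbage before it ever reaches the bus or Orchestra.
    jsonschema.validate(raw_payload, load_schema(schema_ref))

    body = json.dumps(raw_payload, sort_keys=True, default=str).encode()
    envelope = {
        "envelope_version": "1.0",
        "event_id": str(uuid.uuid4()),           # swap for uuid-v7 when available
        "event_type": event_type,
        "source_id": source_id,
        "realm": realm,
        "region": region,
        "schema_ref": schema_ref,
        "sequence": sequence,
        "event_time": event_time,
        "ingest_time": datetime.now(timezone.utc).isoformat(),
        "checksum": "sha256:" + hashlib.sha256(body).hexdigest(),
        "payload": raw_payload,
        "meta": {"retry": 0},
    }
    publish(event_type, envelope)                # topic routing per section 3
    return envelope
```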


3) Topics, routing, retention

Naming:

 
utp.<env>.<realm>.<event_type>.<partition>

e.g.
utp.prod.chamber_us.quotes.tick.ES
utp.prod.chamber_us.gex.levels.SPX
utp.prod.brain-007.signals.zgzd.NQ
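
A tiny helper keeps those names consistent everywhere; the choice of partition (e.g. the symbol) stays with the caller.

```python
# Sketch: build bus topic names of the form utp.<env>.<realm>.<event_type>.<partition>.
def topic(env: str, realm: str, event_type: str, partition: str) -> str:
    return f"utp.{env}.{realm}.{event_type}.{partition}"


assert topic("prod", "chamber_us", "quotes.tick", "ES") == "utp.prod.chamber_us.quotes.tick.ES"
```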

Retention:

  • Hot streams (quotes/trades): 24–72h on the bus → persisted in Timescale for long history.

  • Control/Signals: 7–30d (small volume).

  • DLQ per topic: *.dlq keeps failed messages + last error.


4) Bus choice: start simple, swap later without rewrites

  • Now (fits your stack): Redis Streams + consumer groups.
    Pros: already running; sub-millisecond latency; easy ops.
    Usage: one stream per topic; approximate MAXLEN (~) caps; a DLQ stream per topic.

  • If fan-out and global WAN grow: NATS JetStream.
    Pros: very low latency, simple semantics, good clustering, wildcard subjects.
    Migration: run an adapter that reads from and writes to both buses during the cutover.

  • If we hit very high throughput & replay needs (TB/day): Kafka.
    Pros: huge throughput, long retention, Connect ecosystem.
    Cons: heavier ops. Only move when metrics say so.

Design hedge: our envelope + topics + schema are bus-agnostic. Only the transport client changes.
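
To make the hedge concrete, one option is a thin publisher interface with a Redis Streams implementation today (redis-py); the NATS/Kafka clients would implement the same method later. Class and method names are illustrative, not a fixed API.

```python
# Hedged sketch of a bus-agnostic publisher: only this transport client changes
# when moving from Redis Streams to NATS JetStream or Kafka.
import json
from typing import Protocol

import redis  # redis-py


class BusPublisher(Protocol):
    def publish(self, topic: str, envelope: dict) -> None: ...


class RedisStreamsPublisher:
    def __init__(self, url: str = "redis://localhost:6379/0", maxlen: int = 1_000_000):
        self.r = redis.Redis.from_url(url)
        self.maxlen = maxlen  # per-topic cap, per the retention notes in section 3

    def publish(self, topic: str, envelope: dict) -> None:
        # One Redis stream per topic; approximate MAXLEN trimming keeps it bounded.
        self.r.xadd(topic, {"envelope": json.dumps(envelope)},
                    maxlen=self.maxlen, approximate=True)
```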


5) Backpressure, DLQ, idempotency

  • Backpressure: rate limiter per connector (from manifest). If lag rises, slow publishers; fast-fail to DLQ on hard errors.

  • Idempotency: dedupe key = hash of (event_type, source_id, symbol|partition, event_time|sequence, checksum).
    Timescale writes use ON CONFLICT DO NOTHING.

  • DLQ: store the bad envelope + error; auto-replayer service can fix/re-emit post-patch.
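
A hedged sketch of the dedupe key plus the idempotent write (psycopg 3); the quotes columns follow section 7, and a matching unique index is assumed to exist.

```python
# Sketch: derive the dedupe key from the envelope, then write idempotently.
import hashlib

import psycopg  # psycopg 3


def dedupe_key(env: dict) -> str:
    partition = env["payload"].get("symbol") or (env.get("watermark") or {}).get("partition", "")
    parts = (env["event_type"], env["source_id"], partition,
             str(env.get("sequence") or env["event_time"]), env["checksum"])
    return hashlib.sha256("|".join(parts).encode()).hexdigest()


def persist_quote(conn: psycopg.Connection, env: dict) -> None:
    p = env["payload"]
    conn.execute(
        """
        INSERT INTO quotes (ts, symbol, bid, ask, bid_size, ask_size, source_id)
        VALUES (%s, %s, %s, %s, %s, %s, %s)
        ON CONFLICT DO NOTHING
        """,
        (env["event_time"], p["symbol"], p["bid"], p["ask"],
         p["bid_size"], p["ask_size"], env["source_id"]),
    )
    conn.commit()
```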


6) Orchestration interface (so jobs don’t care about sources)

Jobs never call “Barchart” or “IBKR.” They declare needs:

 
{ "job_type": "compute_zgzd", "needs": [ {"event_type":"gex.levels", "symbols":["ES","NQ"], "lookback":"1d"}, {"event_type":"quotes.tick", "symbols":["ES","NQ"], "lookback":"1d"} ], "mode": "realtime|backfill", "output": {"event_type":"signals.zgzd", "schema_ref":"utp://schemas/signals/zgzd/v1"} }

The runtime resolves data from Timescale (primary) and/or recent bus cache. No source names appear here—total decoupling.
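
A rough sketch of how the runtime might resolve one needs entry against Timescale; the table mapping and query shape are assumptions, and lookback must be a valid Postgres interval such as '1d'.

```python
# Sketch: resolve a "needs" entry (event_type + symbols + lookback) to rows,
# with no vendor or source name appearing anywhere in the job definition.
import psycopg  # psycopg 3

# Assumed event_type -> canonical table mapping (section 7). gex_levels would
# map the same way but is keyed on asof rather than ts.
TABLE_FOR = {"quotes.tick": "quotes", "trades.tick": "trades"}


def resolve_need(conn: psycopg.Connection, need: dict) -> list[tuple]:
    table = TABLE_FOR[need["event_type"]]
    return conn.execute(
        f"SELECT * FROM {table} "
        "WHERE symbol = ANY(%s) AND ts > now() - %s::interval "
        "ORDER BY ts",
        (need["symbols"], need.get("lookback", "1d")),
    ).fetchall()
```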


7) Storage model in Timescale (simple & fast)

  • Raw canonical tables (narrow, NOT JSONB for hot paths):

    • quotes(ts, symbol, bid, ask, bid_size, ask_size, source_id, ...);

    • trades(ts, symbol, price, size, side, source_id, trade_id, ...);

    • gex_levels(asof, symbol, zero_gamma, zero_delta, call_wall, put_wall, source_id, ...);

    • news(ts, ticker, headline, source_id, url, sentiment);

    • calendar(ts, ticker?, event_name, actual, forecast, previous, importance);

  • Continuous aggregates for OHLCV, spreads, features.

  • Materialized views feeding Markov/alerts.

  • Replay: from Timescale or archived Parquet if we add S3 later.
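
For illustration, the quotes hot path plus a one-minute spread/mid continuous aggregate might look like this (psycopg 3); column list trimmed, bucket width arbitrary, connection string a placeholder.

```python
# Hedged sketch: canonical quotes hypertable and a continuous aggregate.
# Autocommit is needed because continuous aggregates cannot be created
# inside a transaction block.
import psycopg  # psycopg 3

QUOTES_DDL = """
CREATE TABLE IF NOT EXISTS quotes (
    ts        timestamptz NOT NULL,
    symbol    text        NOT NULL,
    bid       numeric,
    ask       numeric,
    bid_size  integer,
    ask_size  integer,
    source_id text
);
SELECT create_hypertable('quotes', 'ts', if_not_exists => TRUE);
"""

QUOTES_1M_CAGG = """
CREATE MATERIALIZED VIEW IF NOT EXISTS quotes_1m
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) AS bucket,
       symbol,
       avg((bid + ask) / 2) AS mid_avg,
       avg(ask - bid)       AS spread_avg,
       last(bid, ts)        AS close_bid
FROM quotes
GROUP BY bucket, symbol;
"""

with psycopg.connect("postgresql://localhost/utp", autocommit=True) as conn:
    conn.execute(QUOTES_DDL)
    conn.execute(QUOTES_1M_CAGG)
```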


8) Health + governance

  • Every connector exposes /health (latency, last_ok, lag).

  • Status Gateway scrapes and emits health.json → Joomla/Svelte.

  • Schema governance: JSON Schema in utp-schemas/, semver’d, CI-validated.

  • Manifests in each connector repo; CI checks they don’t claim unsupported event_types.
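
A minimal sketch of the per-connector endpoint, using FastAPI purely for brevity; the three fields mirror what the Status Gateway scrapes, and how the state dict gets updated is left to each collector loop.

```python
# Hedged sketch of a connector /health endpoint exposing latency, last_ok, lag.
import time

from fastapi import FastAPI

app = FastAPI()

# Updated by the collector loop; a module-level dict keeps the sketch short.
state = {"last_ok": time.time(), "latency_ms": 0.0, "lag": 0}


@app.get("/health")
def health() -> dict:
    return {
        "last_ok": state["last_ok"],        # unix time of the last successful poll/publish
        "latency_ms": state["latency_ms"],  # venue round-trip
        "lag": state["lag"],                # buffered/unpublished events
        "age_s": time.time() - state["last_ok"],
    }
```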


9) Edge & regions (US/EU/JP chambers)

  • Put Collectors/Normalizers near the venue (latency + regional auth).

  • Persist locally → replicate to central Timescale or query federated views.

  • Topics include realm & region so cross-region consumers can subscribe precisely.


10) Practical “Day-1” steps (fits what you’ve built)

  1. Lock the envelope + canonical schemas v1.0 in utp-schemas/ (JSON Schema + Pydantic).

  2. Add connector manifests to IBKR, Tasty, Barchart.

  3. Update collectors to publish only canonical envelopes to utp.prod.chamber_us.*.

  4. Normalizer as a common lib (utp-normalize) each connector imports.

  5. Persister service writes to Timescale with idempotent UPSERTs.

  6. Orchestra jobs use needs (event_type/symbols/lookback), not vendor names.

  7. Keep Redis Streams now; set per-topic MAXLEN + DLQs; measure lag.

  8. Status Gateway already reads health → add bus depth & persister lag metrics.

  9. Begin with alert-only path (Telegram/email/WebSocket) from signals.* events; later flip execution mode per YAML flag + guardrails.


Why this wins

  • Pluggable by design (your DI instincts, but with thin manifests + canonical schemas).

  • Transport-agnostic: Redis→NATS→Kafka without rewriting business code.

  • Operator-friendly: health, DLQs, backpressure, idempotency from day one.

  • Trader-first: Chamber can run alone to produce P&L quickly; Brains and new Chambers slot in later.

If you want, I can draft the JSON Schemas for quotes/v2, trades/v2, and gex/levels/v1 and the baseline topic map so you can line up your first Chamber runs against them immediately.