Love where you’re taking this. Here’s a clean, code-first plug-in contract for feeds, plus a messaging plan that scales from your current Redis Router up to NATS/Kafka if/when needed—while keeping Orchestra decoupled and future-proof.
1) The universal contract (what every data source must speak)
1.1 Event envelope (bus-friendly, source-agnostic)
Use this wrapper for every message, no matter the feed:
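A minimal sketch (values illustrative; the fields beyond event_type/schema_ref/watermark/payload mirror the idempotency key in section 5, and payload follows 1.2):

```json
{
  "event_type": "quotes",
  "schema_ref": "quotes/v2",
  "source_id": "ibkr",
  "symbol": "SPY",
  "event_time": "2025-01-15T14:30:00.123456Z",
  "ingest_time": "2025-01-15T14:30:00.125001Z",
  "sequence": 184467,
  "watermark": "ibkr:SPY:184467",
  "checksum": "sha256:9f2c…",
  "payload": {}
}
```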
- event_type drives routing and storage.
- schema_ref is the only thing Orchestra/normalizers need to know to validate/transform.
- watermark enables exactly-once or at-least-once dedupe.
- payload follows a canonical schema (see 1.2).
1.2 Canonical payload schemas (small set, versioned)
Keep these tight and versioned (JSON Schema + Pydantic types; Avro/Protobuf optional later):
- quotes/v2: bid/ask/mid, sizes, venue, symbol, tick_type.
- trades/v2: price, size, side, trade_id.
- orderbook/v1: levels, depth, side arrays.
- gex/levels/v1: walls, zero-gamma, zero-delta, timestamps.
- news/v1: headline, source, tickers[], sentiment?, url.
- calendar/v1: event_name, actual/forecast/previous, importance.
Example quotes/v2 payload:
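Illustrative values only; the venue/tick_type vocabularies are assumptions:

```json
{
  "symbol": "SPY",
  "venue": "ARCA",
  "tick_type": "quote",
  "bid": 594.12,
  "ask": 594.14,
  "mid": 594.13,
  "bid_size": 300,
  "ask_size": 500
}
```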
Rule: Connectors map proprietary fields → these canonical payloads before publishing.
1.3 Connector manifest (DI-friendly, zero guessing)
Each connector ships a connector.manifest.yaml:
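A sketch of the shape (key names are assumptions; the contract is: declare the event_types you emit plus the rate limits Orchestra should enforce):

```yaml
connector: ibkr
version: 1.0.0
event_types:
  - quotes/v2
  - trades/v2
publishes_to: utp.prod.chamber_us
rate_limit:
  msgs_per_sec: 500
  burst: 1000
health: /health
```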
Orchestra reads manifests at startup, registers routes, and applies rate limits automatically.
2) Data flow (conditioning & handoff without choking)
Collector → Normalizer → Bus → Persister → Feature/Signals
- Collector (source adapter)
  Pulls raw data; never publishes raw. Hands raw to the Normalizer via an in-proc queue.
- Normalizer (schema compiler)
  - Validates against the canonical schema_ref (JSON Schema).
  - Adds the envelope, computes the checksum, sets event_time/ingest_time, sequence.
  - Publishes to the bus topic (see 3). A minimal code sketch of this step follows below.
- Persister
  - Subscribes by event_type and writes to Timescale tables (quotes, trades, gex_levels, news, calendar).
  - Idempotent on (event_type, source_id, symbol, event_time, sequence, checksum).
- Feature builders / Signals
  - Continuous aggs & materialized views (e.g., ZG/ZD, Markov inputs).
  - Emit derived events (e.g., signals/zgzd/v1) back onto the bus for downstream consumers (alerts, autotrade).
This guarantees Orchestra only deals in well-formed messages. Garbage never hits your core.
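To make the Normalizer's envelope step concrete, here's a minimal Python sketch. It assumes the payload was already mapped to canonical form and validated against its JSON Schema; the function name and watermark format are illustrative, not a fixed utp-normalize API:

```python
import hashlib
import json
from datetime import datetime, timezone

def build_envelope(payload: dict, *, event_type: str, schema_ref: str,
                   source_id: str, event_time: str, sequence: int) -> dict:
    """Wrap a validated canonical payload in the bus envelope (sketch)."""
    body = json.dumps(payload, sort_keys=True).encode()  # stable bytes for the checksum
    return {
        "event_type": event_type,
        "schema_ref": schema_ref,
        "source_id": source_id,
        "symbol": payload["symbol"],
        "event_time": event_time,                               # venue clock
        "ingest_time": datetime.now(timezone.utc).isoformat(),  # our clock
        "sequence": sequence,
        "watermark": f"{source_id}:{payload['symbol']}:{sequence}",
        "checksum": "sha256:" + hashlib.sha256(body).hexdigest(),
        "payload": payload,
    }
```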
3) Topics, routing, retention
Naming:
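A sketch of the convention (segment order is an assumption, inferred from the utp.prod.chamber_us.* examples later in this plan):

```
utp.<env>.<realm_region>.<event_type>        e.g. utp.prod.chamber_us.quotes
utp.<env>.<realm_region>.signals.<name>      e.g. utp.prod.chamber_us.signals.zgzd
utp.<env>.<realm_region>.<event_type>.dlq    e.g. utp.prod.chamber_us.quotes.dlq
```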
Retention:
- Hot streams (quotes/trades): 24–72h on the bus → persisted in Timescale for long history.
- Control/Signals: 7–30d (small volume).
- DLQ per topic: *.dlq keeps failed messages + last error.
4) Bus choice: start simple, swap later without rewrites
- Now (fits your stack): Redis Streams + consumer groups.
  Pros: already running; microsecond latency; easy ops.
  Usage: one stream per topic; MAXLEN ~ caps; DLQs per topic.
- If fan-out and global WAN grow: NATS JetStream.
  Pros: blazing low latency, simple semantics, good clustering, subjects with wildcards.
  Migration: an adapter that reads/writes both for a while.
- If we hit very high throughput & replay needs (TB/day): Kafka.
  Pros: huge throughput, long retention, Connect ecosystem.
  Cons: heavier ops. Only move when metrics say so.
Design hedge: our envelope + topics + schema are bus-agnostic. Only the transport client changes.
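As a sketch of that hedge (assumes redis-py; the Protocol is illustrative, not an existing interface in the codebase):

```python
import json
from typing import Protocol

class BusPublisher(Protocol):
    """Transport-agnostic publisher: business code depends only on this."""
    def publish(self, topic: str, envelope: dict) -> None: ...

class RedisStreamsPublisher:
    """Redis Streams transport; a NATS or Kafka class would implement the same method."""
    def __init__(self, client, maxlen: int = 100_000):
        self.client = client   # a redis.Redis instance
        self.maxlen = maxlen   # per-stream cap, trimmed approximately (MAXLEN ~)

    def publish(self, topic: str, envelope: dict) -> None:
        # One stream per topic; approximate=True gives the cheap MAXLEN ~ trim.
        self.client.xadd(topic, {"envelope": json.dumps(envelope)},
                         maxlen=self.maxlen, approximate=True)
```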
5) Backpressure, DLQ, idempotency
- Backpressure: rate limiter per connector (from manifest). If lag rises, slow publishers; fast-fail to DLQ on hard errors.
- Idempotency: dedupe key = hash of (event_type, source_id, symbol|partition, event_time|sequence, checksum). Timescale writes use ON CONFLICT DO NOTHING.
- DLQ: store the bad envelope + error; an auto-replayer service can fix/re-emit post-patch.
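In SQL terms, a sketch for quotes (the unique index mirrors the dedupe key; the extra sequence/checksum columns are assumptions on top of the table shape in section 7):

```sql
CREATE UNIQUE INDEX IF NOT EXISTS quotes_dedupe
    ON quotes (source_id, symbol, ts, sequence, checksum);

INSERT INTO quotes (ts, symbol, bid, ask, bid_size, ask_size,
                    source_id, sequence, checksum)
VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
ON CONFLICT DO NOTHING;
```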
6) Orchestration interface (so jobs don’t care about sources)
Jobs never call “Barchart” or “IBKR.” They declare needs:
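Something like this (keys follow the event_type/symbols/lookback triple from section 10; exact spelling is a sketch):

```yaml
needs:
  - event_type: quotes/v2
    symbols: [SPY, QQQ]
    lookback: 30m
  - event_type: gex/levels/v1
    symbols: [SPX]
    lookback: 1d
```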
The runtime resolves data from Timescale (primary) and/or recent bus cache. No source names appear here—total decoupling.
7) Storage model in Timescale (simple & fast)
- Raw canonical tables (narrow, NOT JSONB for hot paths):
  - quotes(ts, symbol, bid, ask, bid_size, ask_size, source_id, ...)
  - trades(ts, symbol, price, size, side, source_id, trade_id, ...)
  - gex_levels(asof, symbol, zero_gamma, zero_delta, call_wall, put_wall, source_id, ...)
  - news(ts, ticker, headline, source_id, url, sentiment)
  - calendar(ts, ticker?, event_name, actual, forecast, importance)
- Continuous aggregates for OHLCV, spreads, features.
- Materialized views feeding Markov/alerts.
- Replay: from Timescale or archived Parquet if we add S3 later.
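A minimal TimescaleDB sketch for the quotes path (standard Timescale syntax; the 1-minute spread aggregate is illustrative):

```sql
SELECT create_hypertable('quotes', 'ts', if_not_exists => TRUE);

CREATE MATERIALIZED VIEW quotes_1m
WITH (timescaledb.continuous) AS
SELECT time_bucket('1 minute', ts) AS bucket,
       symbol,
       avg((bid + ask) / 2.0) AS mid_avg,
       avg(ask - bid)         AS spread_avg
FROM quotes
GROUP BY bucket, symbol;
```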
8) Health + governance
- Every connector exposes /health (latency, last_ok, lag).
- Status Gateway scrapes and emits health.json → Joomla/Svelte.
- Schema governance: JSON Schema in utp-schemas/, semver’d, CI-validated.
- Manifests in each connector repo; CI checks they don’t claim unsupported event_types.
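A sketch of the /health body (field names beyond latency/last_ok/lag are assumptions):

```json
{
  "connector": "ibkr",
  "status": "ok",
  "latency_ms": 12,
  "last_ok": "2025-01-15T14:30:00Z",
  "lag": 0
}
```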
9) Edge & regions (US/EU/JP chambers)
- Put Collectors/Normalizers near the venue (latency + regional auth).
- Persist locally → replicate to central Timescale or query federated views.
- Topics include realm & region so cross-region consumers can subscribe precisely.
10) Practical “Day-1” steps (fits what you’ve built)
- Lock the envelope + canonical schemas v1.0 in utp-schemas/ (JSON Schema + Pydantic).
- Add connector manifests to IBKR, Tasty, Barchart.
- Update collectors to publish only canonical envelopes to utp.prod.chamber_us.*.
- Normalizer as a common lib (utp-normalize) each connector imports.
- Persister service writes to Timescale with idempotent UPSERTs.
- Orchestra jobs use needs (event_type/symbols/lookback), not vendor names.
- Keep Redis Streams now; set per-topic MAXLEN + DLQs; measure lag.
- Status Gateway already reads health → add bus depth & persister lag metrics.
- Begin with an alert-only path (Telegram/email/WebSocket) from signals.* events; later flip execution mode per YAML flag + guardrails.
Why this wins
- Pluggable by design (your DI instincts, but with thin manifests + canonical schemas).
- Transport-agnostic: Redis → NATS → Kafka without rewriting business code.
- Operator-friendly: health, DLQs, backpressure, idempotency from day one.
- Trader-first: Chamber can run alone to produce P&L quickly; Brains and new Chambers slot in later.
If you want, I can draft the JSON Schemas for quotes/v2, trades/v2, and gex/levels/v1 and the baseline topic map so you can line up your first Chamber runs against them immediately.