Overview
Lasso’s WebSocket subscription system provides continuity-preserving event delivery with automatic failover and gap-filling. The architecture multiplexes client subscriptions to minimize upstream connections while ensuring no events are lost during provider failures.
Architecture
Subscription Flow
Key Components
UpstreamSubscriptionPool
- Per-(profile, chain) GenServer multiplexing client subscriptions
- Resolves provider_id → instance_id via the Catalog
- Calls InstanceSubscriptionManager.ensure_subscription to share upstream subscriptions across profiles
- Registers in InstanceSubscriptionRegistry to receive events
InstanceSubscriptionManager
- Per-instance GenServer managing upstream WebSocket subscriptions
- Handles the subscription lifecycle (eth_subscribe, eth_unsubscribe) with the upstream provider
- Receives events from WSConnection via the PubSub topic ws:subs:instance:{instance_id}
- Dispatches events to consumers via InstanceSubscriptionRegistry (a duplicate-key Registry)
StreamCoordinator
- Per-subscription-key GenServer managing continuity and gap-filling
- Orchestrates failover with synchronous state transitions
- Ensures gap-free, duplicate-free event delivery
- Location: lib/lasso/core/streaming/stream_coordinator.ex
WSConnection
- GenServer managing a persistent WebSocket connection
- Started via start_shared_link/1 under InstanceSupervisor
- Shared across all profiles using the same upstream
Multiplexing
N:1 Client-to-Upstream Ratio
100 clients subscribing via eth_subscribe("newHeads") share a single upstream subscription.
Benefits:
- Reduced provider API usage
- Lower latency (no connection setup per client)
- Efficient resource utilization
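The N:1 fan-out leans on Elixir's duplicate-key Registry (the role InstanceSubscriptionRegistry plays above). A minimal, self-contained sketch — the registry name, key shape, and sync delay are illustrative, not Lasso's actual modules:

```elixir
# Sketch: N clients register under one subscription key; a single
# upstream event is dispatched to all of them.
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoRegistry)

for client <- 1..3 do
  spawn(fn ->
    # Registry.register/3 must run in the process being registered.
    {:ok, _} = Registry.register(DemoRegistry, {:new_heads, "eth"}, client)

    receive do
      {:event, block} -> IO.puts("client #{client} got block #{block}")
    end
  end)
end

# Crude synchronization for the sketch: let registrations land first.
Process.sleep(50)

# One upstream event fans out to every registered client.
Registry.dispatch(DemoRegistry, {:new_heads, "eth"}, fn entries ->
  for {pid, _client} <- entries, do: send(pid, {:event, 19_000_000})
end)
```

Because the Registry is started with `keys: :duplicate`, many processes can hold the same key, which is exactly the property that lets one upstream subscription serve many clients.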
Failover with Gap-Filling
Failover State Machine
StreamCoordinator implements a synchronous state machine with four states:
- :active - normal event processing
- :backfilling - fetching missed blocks/logs via HTTP
- :switching - resubscribing to a new provider
- :degraded - circuit breaker triggered (max failover attempts exceeded)
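A sketch of how such synchronous transitions might look inside the coordinating GenServer — the helper functions (`backfill_gap/1`, `resubscribe/2`) and state fields are hypothetical, not Lasso's real API:

```elixir
# Sketch of the synchronous failover state machine.
# Each transition completes before the next message is processed,
# so no events are handled in an inconsistent intermediate state.
def handle_info({:provider_unhealthy, _failed_id, proposed_id}, %{status: :active} = state) do
  state = %{state | status: :backfilling}
  {:ok, state} = backfill_gap(state)              # :backfilling -> fetch missed blocks via HTTP
  state = %{state | status: :switching}
  {:ok, state} = resubscribe(state, proposed_id)  # :switching -> new WS subscription
  {:noreply, %{state | status: :active}}
end

def handle_info({:provider_unhealthy, _failed_id, _proposed_id}, %{status: :degraded} = state) do
  # Circuit breaker tripped: ignore failovers until the scheduled recovery retry.
  {:noreply, state}
end
```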
Gap-Filling Process
On provider failure mid-stream:
- Detect failure: StreamCoordinator receives {:provider_unhealthy, failed_id, proposed_new_id}
- Compute gap: calculate the range from last_seen_block to the current head using consensus height (<1 ms, vs. a 200-500 ms blocking request)
- Backfill via HTTP: GapFiller fetches the missed blocks/logs from an independent HTTP provider (decoupled from WebSocket provider selection)
- Buffer incoming events: events from the new provider are buffered during backfill
- Resubscribe: request a new WebSocket subscription from the proposed provider
- Drain buffer: deduplicate and emit buffered events after the transition
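The gap-computation step is cheap because the consensus head is already tracked locally (per the text, from ETS) rather than fetched over the network. A minimal sketch with illustrative names:

```elixir
# Sketch of the gap computation: the consensus head comes from block
# sync's local state, so no blocking RPC round-trip is needed.
defmodule GapCalc do
  # Returns the inclusive range of blocks to backfill, or :no_gap.
  def missing_range(last_seen_block, consensus_head) when consensus_head > last_seen_block do
    {:gap, (last_seen_block + 1)..consensus_head}
  end

  def missing_range(_last_seen_block, _consensus_head), do: :no_gap
end

GapCalc.missing_range(100, 104)  # {:gap, 101..104}
GapCalc.missing_range(104, 104)  # :no_gap
```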
Continuity Policy
:best_effort (default):
- Backfills up to max_backfill_blocks (default: 32)
- If the gap exceeds the limit, fills what it can and continues

:strict_abort:
- Fails if the gap exceeds max_backfill_blocks
- Guarantees complete continuity or an explicit failure
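The two policies differ only in how they handle an oversized gap. A sketch of the decision, with an illustrative function name:

```elixir
# Sketch of the continuity-policy decision for a detected gap.
def plan_backfill(gap_size, max_backfill_blocks, policy) do
  cond do
    gap_size <= max_backfill_blocks ->
      {:backfill, gap_size}

    policy == :best_effort ->
      # Fill what we can, then continue streaming.
      {:backfill, max_backfill_blocks}

    policy == :strict_abort ->
      # Guarantee continuity or fail explicitly.
      {:error, :gap_exceeds_limit}
  end
end
```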
Deduplication
StreamCoordinator maintains a dedupe cache to filter duplicate events during failover.

newHeads deduplication:
- Key: block hash
- Max items: 256 (configurable)
- Max age: 30 seconds

logs deduplication:
- Key: {blockNumber, transactionIndex, logIndex}
- Same cache limits
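A bounded, age-limited dedupe cache with those semantics can be sketched as follows — module and field names are illustrative, not Lasso's implementation:

```elixir
# Sketch of a dedupe cache keyed by block hash (newHeads) or
# {block_number, tx_index, log_index} (logs), bounded by item count and age.
defmodule DedupeCache do
  defstruct entries: %{}, max_items: 256, max_age_ms: 30_000

  # An entry counts as "seen" only while it is younger than max_age_ms.
  def seen?(%__MODULE__{} = cache, key, now_ms) do
    case Map.fetch(cache.entries, key) do
      {:ok, inserted_at} -> now_ms - inserted_at <= cache.max_age_ms
      :error -> false
    end
  end

  def put(%__MODULE__{} = cache, key, now_ms) do
    entries =
      cache.entries
      |> Map.put(key, now_ms)
      |> prune(cache.max_items)

    %{cache | entries: entries}
  end

  # Evict the oldest entry once capacity is exceeded.
  defp prune(entries, max) when map_size(entries) > max do
    {oldest_key, _} = Enum.min_by(entries, fn {_key, inserted_at} -> inserted_at end)
    Map.delete(entries, oldest_key)
  end

  defp prune(entries, _max), do: entries
end
```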
Circuit Breaker Protection
Failover Rate Limiting
StreamCoordinator applies exponential backoff to rapid failover attempts.

Thresholds:
- max_failover_attempts: 3 (default)
- failover_cooldown_ms: 5,000 ms window

When the threshold is exceeded:
- Transition to the :degraded state
- Drop incoming events (emitting telemetry for monitoring)
- Schedule a retry after 30 seconds
- On retry, clear the failure history and attempt recovery
Event Buffering
- Max buffer size: 100 events (configurable)
- Overflow strategy: drop oldest, keep newest
- Buffer preserved during cascade (multi-provider) failover
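The drop-oldest overflow strategy can be sketched in a few lines — the function name is illustrative:

```elixir
# Sketch of the bounded failover buffer: append the new event,
# then drop the oldest entry if the cap (default 100) is exceeded.
def buffer_event(buffer, event, max_size \\ 100) do
  buffer = buffer ++ [event]
  if length(buffer) > max_size, do: tl(buffer), else: buffer
end
```

Dropping the oldest entries biases the buffer toward the most recent chain state, which is what the drain step needs after resubscription.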
Provider Selection for Failover
Decoupled HTTP Backfill
Gap-filling uses independent HTTP provider selection, decoupled from the failing WebSocket provider.
Cascade Failover
If resubscription to the proposed provider fails:
- Check the circuit breaker (are recent failures below the max attempts?)
- Select the next provider from the priority list (excluding all previously failed providers)
- Preserve the event buffer from the failed attempt
- Initiate a new failover cycle
- If no providers remain, enter degraded mode
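The provider-selection step of the cascade can be sketched as a scan of the priority list — function name and the example provider IDs are hypothetical:

```elixir
# Sketch of cascade provider selection: pick the first provider in
# priority order that has not already failed in this failover cycle.
def next_provider(priority_list, failed_ids) do
  case Enum.find(priority_list, fn id -> id not in failed_ids end) do
    nil -> {:error, :degraded}  # no providers left -> degraded mode
    id -> {:ok, id}
  end
end

next_provider([:provider_a, :provider_b, :provider_c], [:provider_a])  # {:ok, :provider_b}
next_provider([:provider_a], [:provider_a])                            # {:error, :degraded}
```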
Telemetry Events
StreamCoordinator emits detailed telemetry events, such as [:lasso, :subs, :failover, :degraded].
Supported Subscription Types
newHeads
- Last seen: block number
- Gap calculation: current head minus last seen
- Backfill: eth_getBlockByNumber for each missing block
logs
- Last seen: highest block number in received logs, or the last newHeads block
- Gap calculation: current head minus last log block
- Backfill: eth_getLogs with the same filter plus the missing block range
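For the logs case, the backfill request is the original filter with an explicit block range added. A sketch of the JSON-RPC request shape — the helper name is illustrative, and only the `eth_getLogs` semantics come from the Ethereum JSON-RPC spec:

```elixir
# Sketch: build an eth_getLogs backfill request by merging the
# subscription's original filter with the missing block range.
def backfill_logs_request(filter, from_block, to_block) do
  %{
    "jsonrpc" => "2.0",
    "id" => 1,
    "method" => "eth_getLogs",
    "params" => [
      Map.merge(filter, %{
        "fromBlock" => "0x" <> String.downcase(Integer.to_string(from_block, 16)),
        "toBlock" => "0x" <> String.downcase(Integer.to_string(to_block, 16))
      })
    ]
  }
end
```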
Performance Characteristics
Overhead:
- Deduplication check: <0.1 ms (in-memory cache lookup)
- Gap calculation: <1 ms (consensus height from ETS)
- Failover latency: 200-500 ms (backfill + resubscribe)

Scale:
- Subscriptions per upstream: 1,000+ clients per upstream subscription
- Memory per StreamCoordinator: <10 KB (state + dedupe cache)
- Concurrent failovers: independent per subscription key
Configuration
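A hypothetical config fragment pulling together the tunables described in this document. The option keys mirror names used above (continuity_policy, max_backfill_blocks, max_failover_attempts, failover_cooldown_ms, cache and buffer limits), but the exact application key and nesting are illustrative:

```elixir
# Hypothetical config/runtime.exs fragment; verify the actual key
# structure against the application's config schema.
import Config

config :lasso, :subscriptions,
  continuity_policy: :best_effort,   # or :strict_abort
  max_backfill_blocks: 32,
  max_failover_attempts: 3,
  failover_cooldown_ms: 5_000,
  dedupe_max_items: 256,
  dedupe_max_age_ms: 30_000,
  max_buffer_size: 100
```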
Best Practices
For Production Deployments
- Monitor failover metrics: track [:lasso, :subs, :failover, :degraded] events
- Configure provider priorities: order providers by subscription reliability
- Set appropriate backfill limits: balance continuity against latency (32 blocks ≈ 6 minutes on Ethereum)
- Use :best_effort for high-throughput chains: Arbitrum’s fast blocks can create large gaps
For Development
- Surface gap issues early: set continuity_policy: :strict_abort so gaps fail explicitly instead of being partially filled
- Test failover scenarios: manually trigger provider failures to verify gap-filling
- Monitor telemetry: use the LiveView dashboard to observe failover state transitions
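To watch for degraded subscriptions, a handler can be attached to the telemetry event named above. The event name comes from this document; the handler ID and the measurement/metadata shapes are assumptions:

```elixir
# Sketch: log every transition into degraded mode. The metadata shape
# emitted by StreamCoordinator is not specified here, so it is simply
# inspected rather than pattern-matched.
:telemetry.attach(
  "log-degraded-subscriptions",
  [:lasso, :subs, :failover, :degraded],
  fn _event_name, _measurements, metadata, _config ->
    require Logger
    Logger.warning("subscription entered degraded mode: #{inspect(metadata)}")
  end,
  nil
)
```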
Related Documentation
- Block Sync - Block height monitoring for gap calculation
- Error Classification - Provider failure detection
- Benchmarking - Provider selection for HTTP backfill