
Overview

Lasso’s WebSocket subscription system provides continuity-preserving event delivery with automatic failover and gap-filling. The architecture multiplexes client subscriptions to minimize upstream connections while ensuring no events are lost during provider failures.

Architecture

Subscription Flow

Client (Viem/Wagmi)
  ↓
RPCSocket (Phoenix Channel)
  ↓
SubscriptionRouter
  ↓
UpstreamSubscriptionPool (multiplexing)
  ↓
WSConnection (upstream provider)
  ↓
StreamCoordinator (per-subscription key)
  ├─→ GapFiller (HTTP backfill)
  └─→ ClientSubscriptionRegistry (fan-out)

Key Components

UpstreamSubscriptionPool
  • Per-(profile, chain) GenServer multiplexing client subscriptions
  • Resolves provider_id → instance_id via Catalog
  • Calls InstanceSubscriptionManager.ensure_subscription to share upstream subscriptions across profiles
  • Registers in InstanceSubscriptionRegistry to receive events
InstanceSubscriptionManager
  • Per-instance GenServer managing upstream WebSocket subscriptions
  • Handles subscription lifecycle (eth_subscribe, eth_unsubscribe) with the upstream provider
  • Receives events from WSConnection via PubSub topic ws:subs:instance:{instance_id}
  • Dispatches events to consumers via InstanceSubscriptionRegistry (duplicate-key Registry)
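A duplicate-key Registry allows many pool processes to register under the same instance key and receive the same event. A minimal runnable sketch of that fan-out pattern (the registry name, key shape, and message format here are illustrative assumptions, not the actual Lasso API):

```elixir
# Sketch of duplicate-key Registry fan-out (names are illustrative).
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoInstanceRegistry)

instance_id = "provider-instance-1"
parent = self()

# Two "pool" processes register interest in the same instance key.
for profile <- [:profile_a, :profile_b] do
  spawn(fn ->
    {:ok, _} = Registry.register(DemoInstanceRegistry, {:instance, instance_id}, profile)
    send(parent, {:registered, profile})

    receive do
      {:subscription_event, event} -> send(parent, {profile, event})
    end
  end)
end

# Wait for both registrations before dispatching.
for _ <- 1..2 do
  receive do
    {:registered, _} -> :ok
  end
end

# The manager dispatches one upstream event to every registered consumer.
Registry.dispatch(DemoInstanceRegistry, {:instance, instance_id}, fn entries ->
  for {pid, _profile} <- entries, do: send(pid, {:subscription_event, %{"number" => "0x10"}})
end)

received =
  for _ <- 1..2 do
    receive do
      {profile, event} -> {profile, event}
    end
  end
```

Because the Registry holds duplicate keys, one `Registry.dispatch/3` call reaches every profile's pool without the manager tracking consumers itself.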
StreamCoordinator
  • Per-subscription-key GenServer managing continuity and gap-filling
  • Orchestrates failover with synchronous state transitions
  • Ensures gap-free, duplicate-free event delivery
  • Location: lib/lasso/core/streaming/stream_coordinator.ex
WSConnection
  • GenServer managing persistent WebSocket connection
  • Started via start_shared_link/1 under InstanceSupervisor
  • Shared across all profiles using the same upstream

Multiplexing

N:1 Client-to-Upstream Ratio

100 clients subscribing to eth_subscribe("newHeads") share a single upstream subscription. Benefits:
  • Reduced provider API usage
  • Lower latency (no connection setup per client)
  • Efficient resource utilization
Implementation: The UpstreamSubscriptionPool tracks subscription reference counts:
# First client subscribes
eth_subscribe("newHeads") → upstream provider

# Clients 2-100 subscribe
eth_subscribe("newHeads") → reuse existing upstream subscription

# Last client unsubscribes
eth_unsubscribe(subscription_id) → cleanup upstream subscription
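The refcounting above can be modeled as a pure state update. This is a simplified sketch (the real pool is a GenServer; the data shape and function names are assumptions):

```elixir
defmodule RefCountSketch do
  @moduledoc "Illustrative refcounting for multiplexed subscriptions: key => {upstream_sub_id, count}."

  def subscribe(subs, key, start_upstream) do
    case Map.fetch(subs, key) do
      {:ok, {sub_id, n}} ->
        # Clients 2..N reuse the existing upstream subscription.
        {sub_id, Map.put(subs, key, {sub_id, n + 1})}

      :error ->
        # First client triggers a real eth_subscribe upstream.
        sub_id = start_upstream.()
        {sub_id, Map.put(subs, key, {sub_id, 1})}
    end
  end

  def unsubscribe(subs, key, stop_upstream) do
    case Map.fetch(subs, key) do
      {:ok, {sub_id, 1}} ->
        # Last client: tear down the upstream subscription.
        stop_upstream.(sub_id)
        Map.delete(subs, key)

      {:ok, {sub_id, n}} ->
        Map.put(subs, key, {sub_id, n - 1})

      :error ->
        subs
    end
  end
end
```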

Failover with Gap-Filling

Failover State Machine

StreamCoordinator implements a synchronous state machine.

States:
  • :active - Normal event processing
  • :backfilling - Fetching missed blocks/logs via HTTP
  • :switching - Resubscribing to new provider
  • :degraded - Circuit breaker triggered (max failover attempts exceeded)
Transitions:
:active → :backfilling

:backfilling → :switching

:switching → :active (success)
:switching → :backfilling (cascade to next provider)
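The legal moves can be encoded as a small transition table. This sketch only covers the transitions listed above plus entry to and exit from :degraded; the event names are assumptions, not the coordinator's actual messages:

```elixir
defmodule FailoverFsmSketch do
  # Illustrative transition table for the failover state machine.
  @transitions %{
    {:active, :provider_unhealthy} => :backfilling,
    {:backfilling, :backfill_done} => :switching,
    {:switching, :resubscribed} => :active,
    {:switching, :resubscribe_failed} => :backfilling,
    {:switching, :circuit_open} => :degraded,
    {:degraded, :retry_after_cooldown} => :backfilling
  }

  def step(state, event) do
    case Map.fetch(@transitions, {state, event}) do
      {:ok, next} -> {:ok, next}
      :error -> {:error, {:invalid_transition, state, event}}
    end
  end
end
```

Keeping transitions in one table makes illegal moves (e.g. resubscribing while :active) fail loudly instead of silently corrupting state.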

Gap-Filling Process

On provider failure mid-stream:
  1. Detect Failure: StreamCoordinator receives {:provider_unhealthy, failed_id, proposed_new_id}
  2. Compute Gap: Calculate the range from last_seen_block to the current head using the cached consensus height (<1ms, versus 200-500ms for a blocking head request to the provider)
  3. Backfill via HTTP: GapFiller fetches missed blocks/logs from independent HTTP provider (decoupled from WebSocket provider selection)
  4. Buffer Incoming Events: Events from new provider are buffered during backfill
  5. Resubscribe: Request new WebSocket subscription to proposed provider
  6. Drain Buffer: Deduplicate and emit buffered events after transition
Result: Clients receive continuous event stream without gaps or duplicates.
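Step 2 reduces to a pure computation, which is why it is fast: the consensus head is already in memory. A sketch (function and argument names are assumptions):

```elixir
defmodule GapSketch do
  # Blocks missed between the last delivered block and the consensus head.
  def gap_range(last_seen_block, consensus_head) when consensus_head > last_seen_block do
    {last_seen_block + 1, consensus_head}
  end

  def gap_range(_last_seen_block, _consensus_head), do: :no_gap
end
```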

Continuity Policy

:best_effort (default):
  • Backfills up to max_backfill_blocks (default: 32)
  • If gap exceeds limit, fills what it can and continues
:strict_abort:
  • Fails if gap exceeds max_backfill_blocks
  • Guarantees complete continuity or explicit failure
Configuration:
# StreamCoordinator initialization
max_backfill_blocks: 32,
backfill_timeout: 30_000,
continuity_policy: :best_effort
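How the two policies treat an oversized gap can be sketched as follows (a simplified model under the semantics described above, not the actual implementation):

```elixir
defmodule ContinuityPolicySketch do
  # Decide which block range to backfill given the configured policy.
  def plan_backfill(last_seen, head, max_blocks, policy) do
    gap = head - last_seen

    cond do
      gap <= 0 ->
        {:ok, :no_gap}

      gap <= max_blocks ->
        {:ok, {last_seen + 1, head}}

      policy == :best_effort ->
        # Fill the most recent max_blocks and accept the older loss.
        {:ok, {head - max_blocks + 1, head}}

      true ->
        # :strict_abort - complete continuity or explicit failure.
        {:error, {:gap_exceeds_limit, gap, max_blocks}}
    end
  end
end
```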

Deduplication

StreamCoordinator maintains a dedupe cache to filter duplicate events during failover.

newHeads Deduplication:
  • Key: Block hash
  • Max items: 256 (configurable)
  • Max age: 30 seconds
logs Deduplication:
  • Key: {blockNumber, transactionIndex, logIndex}
  • Same cache limits
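A bounded, age-limited dedupe cache can be sketched as a pure function over a list of {key, inserted_at} pairs (a simplified model; the actual cache structure is an implementation detail of StreamCoordinator):

```elixir
defmodule DedupeSketch do
  @max_items 256
  @max_age_ms 30_000

  # Cache is a list of {key, inserted_at_ms}, newest first.
  # Returns {:duplicate, cache} or {:new, cache_with_key}.
  def seen?(cache, key, now_ms) do
    cache = prune(cache, now_ms)

    if List.keymember?(cache, key, 0) do
      {:duplicate, cache}
    else
      {:new, [{key, now_ms} | cache]}
    end
  end

  defp prune(cache, now_ms) do
    cache
    |> Enum.filter(fn {_key, t} -> now_ms - t <= @max_age_ms end)
    |> Enum.take(@max_items)
  end
end
```

For newHeads the key would be the block hash; for logs, the {blockNumber, transactionIndex, logIndex} tuple.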
Buffer Ordering: Before deduplication, buffered events are sorted deterministically:
# newHeads: sort by block number
ordered_buffer = Enum.sort_by(buffer, fn payload ->
  decode_hex(Map.get(payload, "number", "0x0"))
end)

# logs: sort by (blockNumber, transactionIndex, logIndex)
ordered_buffer = Enum.sort_by(buffer, fn log ->
  {decode_hex(Map.get(log, "blockNumber", "0x0")),
   decode_hex(Map.get(log, "transactionIndex", "0x0")),
   decode_hex(Map.get(log, "logIndex", "0x0"))}
end)

Circuit Breaker Protection

Failover Rate Limiting

StreamCoordinator rate-limits rapid failover attempts and trips a circuit breaker when they cluster.

Thresholds:
  • max_failover_attempts: 3 (default)
  • failover_cooldown_ms: 5,000ms window
Behavior: If 3 failover attempts occur within 5 seconds:
  1. Transition to :degraded state
  2. Drop incoming events (emit telemetry for monitoring)
  3. Schedule retry after 30 seconds
  4. Clear failure history and attempt recovery
Event Buffering During Failover:
  • Max buffer size: 100 events (configurable)
  • Overflow strategy: Drop oldest, keep newest
  • Buffer preserved during cascade (multi-provider failover)
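The drop-oldest overflow strategy can be sketched with a newest-first list, where trimming the tail discards the oldest events (an illustrative model; the buffer's real representation is internal to StreamCoordinator):

```elixir
defmodule EventBufferSketch do
  @max_buffer 100

  # Buffer is held newest-first; Enum.take/2 trims the oldest entries.
  # Returns {new_buffer, dropped_count}.
  def push(buffer, event) do
    trimmed = Enum.take([event | buffer], @max_buffer)
    dropped = length(buffer) + 1 - length(trimmed)
    {trimmed, dropped}
  end
end
```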

Provider Selection for Failover

Decoupled HTTP Backfill

Gap-filling uses independent HTTP provider selection:
# WebSocket failover: priority strategy (configured order)
pick_next_provider(excluded_providers, protocol: :ws, strategy: :priority)

# HTTP backfill: fastest strategy (lowest latency)
pick_best_http_provider(excluded_providers, protocol: :http, strategy: :fastest)
Rationale: The best WebSocket provider for subscriptions may not be the fastest for bulk historical queries.

Cascade Failover

If resubscription to the proposed provider fails:
  1. Check circuit breaker (recent failures < max attempts?)
  2. Select next provider from priority list (excluding all previously failed)
  3. Preserve event buffer from failed attempt
  4. Initiate new failover cycle
  5. If no providers available → enter degraded mode
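Step 2 and step 5 amount to scanning the priority list while excluding every provider that has already failed this cycle (a sketch with assumed names; the real selection also consults health state):

```elixir
defmodule CascadeSketch do
  # Pick the first provider in priority order that has not failed yet.
  def next_provider(priority_list, failed) do
    case Enum.find(priority_list, fn provider -> provider not in failed end) do
      nil -> :degraded
      provider -> {:try, provider}
    end
  end
end
```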

Telemetry Events

StreamCoordinator emits detailed telemetry:
[:lasso, :subs, :failover, :initiated]
# Metadata: chain, key, old_provider, new_provider

[:lasso, :subs, :failover, :backfill_started]
# Measurements: count (blocks to backfill)
# Metadata: chain, provider_id, from_block, to_block

[:lasso, :subs, :failover, :backfill_completed]
# Measurements: count (blocks fetched)
# Metadata: chain, from_block, to_block

[:lasso, :subs, :failover, :resubscribe_initiated]
# Metadata: chain, key, provider_id

[:lasso, :subs, :failover, :completed]
# Measurements: duration_ms
# Metadata: chain, key

[:lasso, :subs, :failover, :degraded]
# Metadata: chain, key

[:lasso, :stream, :dropped_event]
# Measurements: count
# Metadata: chain, reason (:degraded_mode)
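These events can be consumed with a standard :telemetry handler (assuming the :telemetry package is available, as the event names imply; the handler id and log message here are illustrative):

```elixir
require Logger

:telemetry.attach(
  "lasso-failover-degraded-logger",
  [:lasso, :subs, :failover, :degraded],
  fn _event_name, _measurements, %{chain: chain, key: key}, _config ->
    Logger.warning("subscription degraded chain=#{chain} key=#{inspect(key)}")
  end,
  nil
)
```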

Supported Subscription Types

newHeads

await provider.send('eth_subscribe', ['newHeads'])
Continuity Tracking:
  • Last seen: block number
  • Gap calculation: current head - last seen
  • Backfill: eth_getBlockByNumber for each missing block

logs

await provider.send('eth_subscribe', ['logs', {
  address: '0x....',
  topics: ['0x....']
}])
Continuity Tracking:
  • Last seen: highest block number in logs or last newHeads block
  • Gap calculation: current head - last log block
  • Backfill: eth_getLogs with same filter + block range
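The backfill request merges the client's original filter with the missed block range. The JSON-RPC shape below is the standard eth_getLogs envelope; the `http_post` callback is a hypothetical stand-in for whatever HTTP client the gap filler uses:

```elixir
defmodule BackfillSketch do
  # Build and send an eth_getLogs request covering the missed range,
  # reusing the client's original filter (address/topics).
  def backfill_logs(http_post, filter, from_block, to_block) do
    params =
      Map.merge(filter, %{
        "fromBlock" => to_hex(from_block),
        "toBlock" => to_hex(to_block)
      })

    http_post.(%{
      "jsonrpc" => "2.0",
      "id" => 1,
      "method" => "eth_getLogs",
      "params" => [params]
    })
  end

  defp to_hex(n), do: "0x" <> String.downcase(Integer.to_string(n, 16))
end
```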

Performance Characteristics

Overhead:
  • Deduplication check: <0.1ms (in-memory cache lookup)
  • Gap calculation: <1ms (consensus height from ETS)
  • Failover latency: 200-500ms (backfill + resubscribe)
Scalability:
  • Subscriptions per upstream: 1,000+ clients per upstream subscription
  • Memory per StreamCoordinator: <10KB (state + dedupe cache)
  • Concurrent failovers: Independent per subscription key

Configuration

# Per-chain WebSocket subscription settings
chains:
  ethereum:
    subscription:
      max_backfill_blocks: 32
      backfill_timeout: 30000
      continuity_policy: best_effort  # or strict_abort
      dedupe_max_items: 256
      dedupe_max_age_ms: 30000
      max_failover_attempts: 3
      failover_cooldown_ms: 5000
      max_event_buffer: 100

Best Practices

For Production Deployments

  1. Monitor failover metrics: Track [:lasso, :subs, :failover, :degraded] events
  2. Configure provider priorities: Order providers by subscription reliability
  3. Set appropriate backfill limits: Balance continuity vs latency (32 blocks ≈ 6 minutes on Ethereum)
  4. Use best_effort for high-throughput chains: Arbitrum’s fast blocks can create large gaps

For Development

  1. Surface gaps early: Set continuity_policy: :strict_abort so oversized gaps fail explicitly instead of being partially filled
  2. Test failover scenarios: Manually trigger provider failures to verify gap-filling
  3. Monitor telemetry: Use LiveView dashboard to observe failover state transitions