
Overview

Provider selection in Lasso operates as a multi-stage pipeline that transforms a pool of candidate providers into an ordered execution list. The pipeline balances performance, reliability, and load distribution based on real-time health metrics and historical performance data.

Pipeline Architecture

Candidate Pool → 7-Stage Filters → Strategy Ranking → Health Tiering → Execution

High-Level Flow

  1. Candidate Pool: All providers configured in the profile for the chain
  2. 7-Stage Filters: Exclude ineligible providers (see below)
  3. Strategy Ranking: Order by strategy (fastest, latency_weighted, etc.)
  4. Health Tiering: Reorder into 4 tiers by circuit breaker and rate limit state
  5. Execution: Sequential attempts with automatic failover

7-Stage Filter Pipeline

Implemented in Lasso.Providers.CandidateListing.list_candidates/3:
profile_providers
|> Enum.map(&build_candidate/1)
|> Enum.filter(fn c ->
  transport_available?(c, protocol, profile, chain) and
    circuit_breaker_ready?(c, protocol, include_half_open) and
    rate_limit_ok?(c, protocol, filters)
end)
|> filter_by_lag(profile, chain, max_lag_blocks)
|> filter_by_archival(requires_archival)
|> filter_excluded(filters)

Stage 1: Transport Availability

Purpose: Filter providers based on the required transport (HTTP/WebSocket)
Logic:
case protocol do
  :http ->
    is_binary(config.url)
  
  :ws ->
    is_binary(config.ws_url) and ws_channel_live?(profile, chain, candidate.id)
  
  :both ->
    is_binary(config.url) or
      (is_binary(config.ws_url) and ws_channel_live?(profile, chain, candidate.id))
  
  nil ->
    is_binary(config.url) or is_binary(config.ws_url)
end
Exclusions:
  • HTTP requests exclude providers without url
  • WebSocket requests exclude providers without active WebSocket connection
  • Checks :transport_channel_cache ETS table for WebSocket liveness

Stage 2: WebSocket Liveness

Purpose: Verify WebSocket channels are actively connected
Logic:
case :ets.lookup(:transport_channel_cache, {profile, chain, provider_id, :ws}) do
  [{_, _channel}] -> true
  [] -> false
end
Exclusions:
  • Providers with ws_url configured but no active connection
  • Prevents routing to providers mid-reconnection

Stage 3: Circuit Breaker State

Purpose: Exclude providers with open circuit breakers
Logic:
case protocol do
  :http ->
    cb_ready?(cs.http, include_half_open)
  
  :ws ->
    cb_ready?(cs.ws, include_half_open)
  
  :both ->
    (has_http and cs.http != :open) or (has_ws and cs.ws != :open)
end

def cb_ready?(cb_state, include_half_open) do
  if include_half_open, do: cb_state != :open, else: cb_state == :closed
end
Exclusions:
  • Providers with :open circuit breakers are always excluded
  • Providers with :half_open circuit breakers excluded unless include_half_open: true
Circuit Breaker States:
  • :closed - Healthy, provider is eligible
  • :half_open - Recovering, excluded by default (configurable)
  • :open - Failing, always excluded
See Circuit Breakers for state machine details.

Stage 4: Rate Limit State

Purpose: Optionally exclude rate-limited providers
Logic:
if Map.get(filters, :exclude_rate_limited, false) do
  rl = candidate.rate_limited
  
  case protocol do
    :http -> not rl.http
    :ws -> not rl.ws
    :both -> not rl.http and not rl.ws
    nil -> not rl.http or not rl.ws
  end
else
  true
end
Exclusions:
  • Only when exclude_rate_limited: true filter is set
  • Checks :lasso_instance_state ETS table for rate limit flags
  • Rate limit state is set by error classification (see error rules in profiles)
By default, rate-limited providers are not excluded, only deprioritized to Tier 2/4 during health tiering.

Stage 5: Lag Filtering

Purpose: Exclude providers that are behind consensus by more than the lag threshold
Logic:
case LagCalculation.calculate_optimistic_lag(chain, candidate.instance_id, block_time_ms) do
  {:ok, optimistic_lag, _raw_lag} -> optimistic_lag >= -max_lag_blocks
  {:error, _} -> true  # Include if lag calculation fails
end
Optimistic Lag Calculation:
elapsed_ms = now - timestamp
block_time_ms = Registry.get_block_time_ms(chain) || config.block_time_ms
staleness_credit = min(div(elapsed_ms, block_time_ms), div(30_000, block_time_ms))
optimistic_height = height + staleness_credit
optimistic_lag = optimistic_height - consensus_height
Exclusions:
  • Providers with optimistic_lag < -max_lag_blocks
  • Example: max_lag_blocks: 5 excludes providers more than 5 blocks behind
  • Accounts for observation delay using block time (prevents false lag detection)
Configuration:
selection:
  max_lag_blocks: 1  # typical: 1-2 for L1 chains, 3-10 for L2 chains
Example (Arbitrum - 250ms blocks, 2s poll):
reported_height: 421,535,503
consensus_height: 421,535,511
raw_lag: -8 blocks

elapsed: 2000ms → credit: 2000/250 = 8 blocks
optimistic_height: 421,535,503 + 8 = 421,535,511
optimistic_lag: 0 blocks ✓ (passes filter)
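The arithmetic in this example can be reproduced directly. A minimal standalone sketch, using the Arbitrum numbers from the example above (the real logic lives in LagCalculation):

```elixir
# Standalone sketch of the optimistic-lag arithmetic from the example above.
reported_height = 421_535_503
consensus_height = 421_535_511
block_time_ms = 250
elapsed_ms = 2_000
max_lag_blocks = 5

# Staleness credit: blocks the provider has plausibly produced since the
# last observation, capped at 30 seconds' worth of blocks.
staleness_credit = min(div(elapsed_ms, block_time_ms), div(30_000, block_time_ms))
optimistic_height = reported_height + staleness_credit
optimistic_lag = optimistic_height - consensus_height

IO.inspect(optimistic_lag)                      # 0
IO.inspect(optimistic_lag >= -max_lag_blocks)   # true → passes the filter
```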

Stage 6: Archival Filtering

Purpose: Require archival providers for historical queries
Logic:
case requires_archival do
  true -> Enum.filter(candidates, fn c -> c.config.archival != false end)
  _ -> candidates
end
Exclusions:
  • Providers with archival: false when requires_archival: true
  • Typically used for eth_getLogs with historical block ranges
Configuration:
providers:
  - id: "quicknode"
    archival: true  # Has full historical data
  - id: "publicnode"
    archival: false  # Only recent blocks

Stage 7: Exclude List

Purpose: Explicitly exclude specific providers
Logic:
case Map.get(filters, :exclude) do
  exclude_list when is_list(exclude_list) ->
    Enum.filter(candidates, &(&1.id not in exclude_list))
  _ ->
    candidates
end
Exclusions:
  • Providers in the exclude filter list
  • Useful for temporary provider blacklisting
  • Used during failover to avoid retrying failed providers
Example:
filters = %{
  exclude: ["alchemy", "infura"]  # Skip these providers
}

Candidate Structure

Filtered candidates include metadata for downstream ranking:
%{
  id: "ethereum_llamarpc",
  instance_id: "sha256_hash",
  config: %{
    id: "ethereum_llamarpc",
    url: "https://eth.llamarpc.com",
    ws_url: "wss://eth.llamarpc.com",
    priority: 5,
    capabilities: %{...},
    archival: false,
    name: "LlamaRPC Ethereum"
  },
  availability: :available,  # :available | :degraded | :unavailable
  circuit_state: %{http: :closed, ws: :closed},
  rate_limited: %{http: false, ws: false}
}

Strategy Ranking

After filtering, candidates are ranked by the selected strategy:

Fastest

Ranks by measured latency (ascending):
Enum.sort_by(channels, fn channel ->
  case Map.get(metrics_map, {channel.provider_id, method, channel.transport}) do
    %{latency_ms: ms, last_updated_ms: updated} ->
      age_ms = current_time - updated
      if age_ms > freshness_cutoff, do: cold_start_baseline, else: ms
    _ ->
      cold_start_baseline  # Missing metrics
  end
end)

Latency Weighted

Weighted random selection:
weight = (1 / latency^beta) * success_rate * confidence * calls_scale
weight = max(weight, explore_floor)

Enum.sort_by(channels, fn ch -> -(:rand.uniform() * weight_fn.(ch)) end)
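To see how the formula shapes the ordering, here is a hedged sketch with illustrative inputs (beta, explore_floor, and the metric values are assumptions, not Lasso defaults):

```elixir
# Illustrative weight computation; beta, explore_floor, and the metric
# inputs below are assumed values, not Lasso defaults.
beta = 1.0
explore_floor = 0.001

weight = fn latency_ms, success_rate, confidence, calls_scale ->
  w = 1 / :math.pow(latency_ms, beta) * success_rate * confidence * calls_scale
  max(w, explore_floor)
end

fast = weight.(100, 0.99, 1.0, 1.0)  # 0.0099
slow = weight.(400, 0.99, 1.0, 1.0)  # 0.002475
# With beta = 1.0 the 100ms provider carries ~4x the weight of the 400ms
# one, so it is much more likely to rank first in the weighted shuffle.
```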

Load Balanced

Random shuffle:
Enum.shuffle(channels)

Priority

Static priority from configuration:
Enum.sort_by(channels, & &1.config.priority)
See Routing Strategies for detailed strategy behavior.

Health-Based Tiering

After strategy ranking, providers are reordered into 4 tiers:

Tier Definitions

  1. Tier 1: Closed circuit + not rate-limited (preferred)
  2. Tier 2: Closed circuit + rate-limited
  3. Tier 3: Half-open circuit + not rate-limited
  4. Tier 4: Half-open circuit + rate-limited
Excluded: Open circuit providers (already filtered in Stage 3)

Tiering Logic

Tiering preserves strategy ranking within each tier:
Original (Strategy Ranking):
[A (fastest, half-open), B (medium, closed), C (slow, closed)]

After Health Tiering:
Tier 1: [B (closed), C (closed)]  # Preserves B before C
Tier 3: [A (half-open)]           # Deprioritized despite being fastest
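One way to sketch this is a stable sort on a tier key derived from circuit and rate-limit state (a simplification of the real implementation, using the candidate fields shown earlier and only the HTTP transport):

```elixir
# Sketch: assign a tier from circuit/rate-limit state, then rely on the
# stability of Enum.sort_by/2 to preserve strategy order within each tier.
tier = fn %{circuit_state: cs, rate_limited: rl} ->
  case {cs.http, rl.http} do
    {:closed, false}    -> 1
    {:closed, true}     -> 2
    {:half_open, false} -> 3
    {:half_open, true}  -> 4
  end
end

# Strategy-ranked input: A is fastest but half-open.
ranked = [
  %{id: "A", circuit_state: %{http: :half_open}, rate_limited: %{http: false}},
  %{id: "B", circuit_state: %{http: :closed},    rate_limited: %{http: false}},
  %{id: "C", circuit_state: %{http: :closed},    rate_limited: %{http: false}}
]

ordered = ranked |> Enum.sort_by(tier) |> Enum.map(& &1.id)
# ["B", "C", "A"] — B stays ahead of C; A drops to Tier 3
```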

Why Tiering Matters

Tiering ensures healthy providers receive traffic first.
Scenario: 3 providers with the fastest strategy
  • Provider A: 200ms latency, half-open circuit → Tier 3
  • Provider B: 350ms latency, closed circuit → Tier 1
  • Provider C: 500ms latency, closed circuit → Tier 1
Result: Provider B (350ms) receives traffic before Provider A (200ms) because it has a closed circuit.

Execution and Failover

Providers are attempted sequentially until success or exhaustion:

Sequential Execution

Enum.reduce_while(channels, {:error, :all_providers_failed}, fn channel, acc ->
  case execute_request(channel, method, params) do
    {:ok, result} -> {:halt, {:ok, result}}   # success: stop here
    {:error, _reason} -> {:cont, acc}         # failure: try next channel
  end
end)

Success Criteria

  • 2xx HTTP status
  • Valid JSON-RPC structure
  • No RPC error code (unless expected)
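Put together, the checks above amount to something like the following sketch over an HTTP status and a decoded body (a simplification; the real check also honors per-method expected-error rules):

```elixir
# Simplified success check on an HTTP status plus decoded JSON-RPC body.
# The real check also allows RPC errors that a method marks as expected.
success? = fn status, body ->
  status in 200..299 and
    is_map(body) and
    body["jsonrpc"] == "2.0" and
    not Map.has_key?(body, "error")
end

IO.inspect(success?.(200, %{"jsonrpc" => "2.0", "result" => "0x1"}))
# true
IO.inspect(success?.(200, %{"jsonrpc" => "2.0", "error" => %{"code" => -32000}}))
# false
```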

Failure Handling

Retriable Errors (try next provider):
  • :rate_limit - Provider throttling
  • :network_error - Connection failure
  • :server_error - 5xx status
  • :capability_violation - Method not supported
  • :method_not_found - Method not available
Non-Retriable Errors (return immediately):
  • :invalid_params - User error
  • :user_error - Client mistake
  • :client_error - 4xx status
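The split above reduces to a small classifier that drives the failover loop (a sketch using the category atoms listed above):

```elixir
# Retriable errors advance to the next provider; non-retriable errors are
# returned to the caller immediately.
retriable = [:rate_limit, :network_error, :server_error,
             :capability_violation, :method_not_found]

retriable? = fn category -> category in retriable end

IO.inspect(retriable?.(:server_error))    # true  → try next provider
IO.inspect(retriable?.(:invalid_params))  # false → return immediately
```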

All Providers Exhausted

Returns 503 Service Unavailable with details:
{
  "jsonrpc": "2.0",
  "error": {
    "code": -32000,
    "message": "All providers failed",
    "data": {
      "attempts": [
        {"provider": "alchemy", "error": "rate_limit"},
        {"provider": "infura", "error": "circuit_open"},
        {"provider": "quicknode", "error": "network_error"}
      ]
    }
  }
}

Filter Configuration

Via Selection Filters

alias Lasso.RPC.SelectionFilters

filters = %SelectionFilters{
  protocol: :http,              # :http | :ws | :both | nil
  include_half_open: false,     # Include half-open circuit providers
  exclude_rate_limited: false,  # Exclude rate-limited providers
  max_lag_blocks: 5,            # Maximum lag tolerance
  requires_archival: false,     # Require archival data
  exclude: []                   # Explicit provider exclusion list
}

candidates = CandidateListing.list_candidates(profile, chain, filters)

Via Profile Configuration

selection:
  max_lag_blocks: 1  # Stage 5: Lag filtering
  archival_threshold: 128  # Stage 6: Archival requirement

ETS State Management

The filter pipeline reads from three ETS tables:

:lasso_instance_state

Circuit Breaker State:
{:circuit, instance_id, transport} => %{
  state: :closed | :half_open | :open,
  error: %{code: -32000, category: :server_error, message: "..."} | nil,
  recovery_deadline_ms: 1736894871234 | nil
}
Rate Limit State:
{:rate_limit, instance_id, transport} => %{
  rate_limited: true | false,
  retry_after_ms: 1736894871234 | nil
}
Health State:
{:health, instance_id} => %{
  status: :healthy | :degraded | :unavailable,
  http_status: :healthy | :degraded | :unavailable,
  consecutive_failures: 0,
  consecutive_successes: 5,
  last_error: %{...} | nil,
  last_health_check: 1736894871234
}

:transport_channel_cache

WebSocket Channel Liveness:
{profile, chain, provider_id, :ws} => %Channel{
  provider_id: "ethereum_llamarpc",
  transport: :ws,
  pid: #PID<0.1234.0>,
  config: %{...}
}

:lasso_config_store

Provider Configuration:
{:profile, profile_slug, :chains} => %{
  "ethereum" => %{
    chain_id: 1,
    providers: [...],
    monitoring: %{...},
    selection: %{...}
  }
}

Performance Characteristics

Filter Pipeline Latency

| Stage | Latency | Notes |
| --- | --- | --- |
| Transport availability | <0.1ms | Simple field checks |
| WebSocket liveness | <0.1ms | Single ETS lookup |
| Circuit breaker state | <0.1ms | Two ETS lookups (HTTP + WS) |
| Rate limit state | <0.1ms | Two ETS lookups (HTTP + WS) |
| Lag filtering | 0.5-2ms | BlockSync.Registry + consensus calculation |
| Archival filtering | <0.1ms | Simple field check |
| Exclude list filtering | <0.1ms | List membership check |
| Total | ~2-5ms | End-to-end pipeline |

Optimization Techniques

Batch Metrics Fetching:
# Eliminates N sequential GenServer calls
requests = Enum.map(channels, fn ch -> {ch.provider_id, method, ch.transport} end)
metrics_map = Metrics.batch_get_transport_performance(profile, chain, requests)
ETS Read Concurrency:
:ets.new(:lasso_instance_state, [
  :named_table,
  :public,
  :set,
  read_concurrency: true,  # Optimizes concurrent reads
  write_concurrency: true
])
Persistent Term Catalog:
# O(1) provider lookups without ETS overhead
Catalog.get_profile_providers(profile, chain)
# => Reads from :persistent_term (faster than ETS)

Next Steps

Routing Strategies

Understand strategy ranking algorithms

Circuit Breakers

Learn about state machine and recovery

Profiles

Configure provider selection policies

Architecture

Explore the OTP supervision tree