
Metrics & Observability

BraneMetrics Interface

Implement the BraneMetrics interface to receive callbacks on RPC events:

import sh.brane.rpc.BraneMetrics;
import java.time.Duration;
 
public class MyMetrics implements BraneMetrics {
 
    @Override
    public void onRequestCompleted(String method, Duration latency) {
        // Record latency per method
    }
 
    @Override
    public void onRequestTimeout(String method, long requestId) {
        // Increment timeout counter, correlate with logs using requestId
    }
 
    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        // Alert: queue at capacity (pendingCount / maxPendingRequests)
    }
 
    @Override
    public void onOrphanedResponse(String reason) {
        // Track responses that arrived after timeout
    }
}

Wiring Up Metrics

Attach your metrics implementation to a WebSocket provider:

import sh.brane.rpc.WebSocketProvider;
 
var provider = WebSocketProvider.create("wss://...");
provider.setMetrics(new MyMetrics());

Available Hooks

Method                                   | When Called                       | Use Case
onRequestStarted(method)                 | Request begins                    | Track in-flight requests
onRequestCompleted(method, latency)      | Request succeeds                  | Latency histograms
onRequestTimeout(method, requestId)      | Request times out                 | Timeout counters, log correlation
onRequestFailed(method, error)           | Request fails                     | Error rates
onBackpressure(pendingCount, maxPending) | Too many pending requests         | Backpressure alerts, capacity tuning
onConnectionLost()                       | WebSocket disconnects             | Connection health
onReconnect()                            | WebSocket reconnects              | Reconnection tracking
onSubscriptionNotification(id)           | Subscription event received       | Subscription throughput
onRingBufferSaturation(remaining, size)  | Buffer nearing capacity           | Early warning
onOrphanedResponse(reason)               | Response with no matching request | Detect timeouts, network issues
onSubscriptionCallbackError(id, error)   | Subscription callback throws      | Debug misbehaving callbacks
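
As an illustration of the "track in-flight requests" use case, the sketch below pairs onRequestStarted with the three terminal request hooks to maintain a live count. To compile without the library on the classpath it declares a small stand-in interface, RequestHooks, which is hypothetical; real code would implement BraneMetrics instead. The parameter types of onRequestStarted and onRequestFailed are assumptions, as is the premise that every started request ends in exactly one of completed, timeout, or failed.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for the four request hooks in the table, declared
// only so this sketch compiles on its own. Real code would implement
// sh.brane.rpc.BraneMetrics. Parameter types of onRequestStarted and
// onRequestFailed are assumptions.
interface RequestHooks {
    default void onRequestStarted(String method) {}
    default void onRequestCompleted(String method, Duration latency) {}
    default void onRequestTimeout(String method, long requestId) {}
    default void onRequestFailed(String method, Throwable error) {}
}

// Counts requests that have started but not yet reached a terminal hook.
class InFlightTracker implements RequestHooks {
    private final AtomicInteger inFlight = new AtomicInteger();

    @Override
    public void onRequestStarted(String method) {
        inFlight.incrementAndGet();
    }

    @Override
    public void onRequestCompleted(String method, Duration latency) {
        inFlight.decrementAndGet();
    }

    @Override
    public void onRequestTimeout(String method, long requestId) {
        inFlight.decrementAndGet();
    }

    @Override
    public void onRequestFailed(String method, Throwable error) {
        inFlight.decrementAndGet();
    }

    // Current number of requests awaiting a terminal hook.
    int inFlight() {
        return inFlight.get();
    }
}
```

The AtomicInteger keeps the count consistent when hooks fire from several threads at once. If the assumption about terminal hooks does not hold for your version of the library, the count will drift and should be treated as approximate.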

Micrometer Integration

Here's a complete example integrating with Micrometer:

import sh.brane.rpc.BraneMetrics;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
 
public class MicrometerMetrics implements BraneMetrics {
 
    private final MeterRegistry registry;
    private final ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>();
    private final Counter backpressureCounter;
    private final Counter reconnectCounter;
    // Gauges sample a state object at scrape time and hold it only weakly,
    // so keep strong references to mutable holders (doubles stored as long bits).
    private final AtomicLong pendingUtilization = new AtomicLong();
    private final AtomicLong ringBufferSaturation = new AtomicLong();
 
    public MicrometerMetrics(MeterRegistry registry) {
        this.registry = registry;
        this.backpressureCounter = registry.counter("brane.rpc.backpressure");
        this.reconnectCounter = registry.counter("brane.rpc.reconnect");
        // Register each gauge once; callbacks only update the holders.
        registry.gauge("brane.rpc.pending.utilization", pendingUtilization,
            bits -> Double.longBitsToDouble(bits.get()));
        registry.gauge("brane.ringbuffer.saturation", ringBufferSaturation,
            bits -> Double.longBitsToDouble(bits.get()));
    }
 
    @Override
    public void onRequestCompleted(String method, Duration latency) {
        Timer timer = timers.computeIfAbsent(method, m ->
            Timer.builder("brane.rpc.latency")
                .tag("method", m)
                .publishPercentileHistogram() // publish histogram buckets
                .register(registry)
        );
        timer.record(latency);
    }
 
    @Override
    public void onRequestTimeout(String method, long requestId) {
        // Tagged counter only: the Prometheus registry expects meters that
        // share a name to share the same tag keys.
        registry.counter("brane.rpc.timeout", "method", method).increment();
    }
 
    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        backpressureCounter.increment();
        // Track utilization when backpressure occurs
        double utilization = (double) pendingCount / maxPendingRequests;
        pendingUtilization.set(Double.doubleToLongBits(utilization));
    }
 
    @Override
    public void onReconnect() {
        reconnectCounter.increment();
    }
 
    @Override
    public void onRingBufferSaturation(long remaining, int size) {
        double pctFull = 1.0 - ((double) remaining / size);
        ringBufferSaturation.set(Double.doubleToLongBits(pctFull));
    }
 
    @Override
    public void onOrphanedResponse(String reason) {
        // Tagged counter only, for the same reason as the timeout counter.
        registry.counter("brane.rpc.orphaned", "reason", reason).increment();
    }
}

Usage

import sh.brane.rpc.WebSocketProvider;
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
 
var registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
var provider = WebSocketProvider.create("wss://...");
provider.setMetrics(new MicrometerMetrics(registry));
 
// Expose metrics at /metrics endpoint
// registry.scrape() returns Prometheus format

Prometheus Metrics Example

With the Micrometer integration above, you'll get metrics like:

# TYPE brane_rpc_latency_seconds histogram
brane_rpc_latency_seconds_bucket{method="eth_blockNumber",le="0.01"} 1523
brane_rpc_latency_seconds_bucket{method="eth_blockNumber",le="0.05"} 2341
brane_rpc_latency_seconds_count{method="eth_blockNumber"} 2500

# TYPE brane_rpc_backpressure_total counter
brane_rpc_backpressure_total 3

# TYPE brane_rpc_timeout_total counter
brane_rpc_timeout_total{method="eth_getLogs"} 2

Ring Buffer Saturation

The onRingBufferSaturation hook fires when less than 10% of the Disruptor ring buffer's slots remain free, that is, when the buffer is more than 90% full. This is an early warning of backpressure.

@Override
public void onRingBufferSaturation(long remaining, int size) {
    double pctFull = 1.0 - ((double) remaining / size);
    // Assumes an SLF4J-style logger field named `log` in the enclosing class
    log.warn("Ring buffer {}% full ({} slots remaining)",
        (int)(pctFull * 100), remaining);
 
    // Consider: slow down request rate, alert on-call, etc.
}
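
To make the arithmetic concrete, here is a small worked sketch of the fill calculation and the 10% threshold described above. RingBufferMath and its method names are hypothetical helpers, not part of the library, and the exact comparison the library uses to decide when to fire the hook is an assumption. With a 1024-slot buffer and 64 slots remaining, the buffer is 93.75% full, and 64 is below the 10% mark of 102.4 slots.

```java
// Hypothetical helper for illustration; not part of the Brane API.
final class RingBufferMath {
    private RingBufferMath() {}

    // Fraction of the buffer in use, given free slots and total size.
    static double fractionFull(long remaining, int size) {
        return 1.0 - ((double) remaining / size);
    }

    // True when fewer than 10% of slots remain free. Uses integer math
    // (remaining * 10 < size) to avoid floating-point rounding at the edge.
    // The strict "less than" comparison is an assumption about the library.
    static boolean isSaturated(long remaining, int size) {
        return remaining * 10 < size;
    }
}
```

For a 1024-slot buffer, isSaturated is true at 102 remaining slots and false at 103, which can help when choosing alert thresholds slightly above the point where the hook starts firing.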

Simple Logging Implementation

For development or debugging, a simple logging implementation is often enough:

import sh.brane.rpc.BraneMetrics;
import java.time.Duration;
 
public class LoggingMetrics implements BraneMetrics {
 
    @Override
    public void onRequestCompleted(String method, Duration latency) {
        System.out.printf("[METRICS] %s completed in %dms%n",
            method, latency.toMillis());
    }
 
    @Override
    public void onRequestTimeout(String method, long requestId) {
        System.err.printf("[METRICS] %s timed out (requestId=%d)%n", method, requestId);
    }
 
    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        System.err.printf("[METRICS] Backpressure! %d/%d pending%n",
            pendingCount, maxPendingRequests);
    }
 
    @Override
    public void onConnectionLost() {
        System.err.println("[METRICS] Connection lost");
    }
 
    @Override
    public void onReconnect() {
        System.out.println("[METRICS] Reconnected");
    }
 
    @Override
    public void onOrphanedResponse(String reason) {
        System.err.printf("[METRICS] Orphaned response: %s%n", reason);
    }
}

Thread Safety

All BraneMetrics methods may be called from multiple threads concurrently. Ensure your implementation is thread-safe:

  • Use atomic counters or thread-safe collections
  • Avoid blocking in callbacks (they're called from I/O threads)
  • Micrometer's MeterRegistry is thread-safe by design
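
The first two points can be sketched with standard library classes alone. TimeoutRecorder below is a hypothetical class that mirrors the shape of onRequestTimeout without implementing BraneMetrics, so that it compiles on its own. It counts timeouts per method with a LongAdder and hands anything slow to a separate executor so the calling I/O thread returns immediately. The notifyOnCall placeholder stands in for whatever slow work you might do.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical recorder: thread-safe per-method timeout counts, with slow
// work (alerting, remote writes) handed off to a separate executor.
class TimeoutRecorder {
    private final ConcurrentHashMap<String, LongAdder> timeouts = new ConcurrentHashMap<>();
    private final ExecutorService slowWork;

    TimeoutRecorder(ExecutorService slowWork) {
        this.slowWork = slowWork;
    }

    // Same shape as the onRequestTimeout hook; safe to call concurrently.
    public void onRequestTimeout(String method, long requestId) {
        // computeIfAbsent is atomic, and LongAdder tolerates contended increments.
        timeouts.computeIfAbsent(method, m -> new LongAdder()).increment();
        // Return quickly; the potentially blocking part runs on another thread.
        slowWork.execute(() -> notifyOnCall(method, requestId));
    }

    // Number of timeouts recorded so far for the given method.
    long timeoutCount(String method) {
        LongAdder adder = timeouts.get(method);
        return adder == null ? 0 : adder.sum();
    }

    // Placeholder for a slow operation such as paging or an HTTP call.
    private void notifyOnCall(String method, long requestId) {
        // intentionally empty in this sketch
    }
}
```

In a real deployment the executor would typically use a bounded queue, so that a burst of timeouts cannot grow memory without limit.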

Disabling Metrics

By default, a no-op implementation is used, so there is nothing to disable explicitly. If you don't want metrics, simply don't call setMetrics().

// No metrics collection (default)
var provider = WebSocketProvider.create("wss://...");
 
// With metrics
provider.setMetrics(new MyMetrics());