Metrics & Observability
BraneMetrics Interface
Implement the BraneMetrics interface to receive callbacks on RPC events:
import sh.brane.rpc.BraneMetrics;
import java.time.Duration;

public class MyMetrics implements BraneMetrics {
    @Override
    public void onRequestCompleted(String method, Duration latency) {
        // Record latency per method
    }

    @Override
    public void onRequestTimeout(String method, long requestId) {
        // Increment timeout counter, correlate with logs using requestId
    }

    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        // Alert: queue at capacity (pendingCount / maxPendingRequests)
    }

    @Override
    public void onOrphanedResponse(String reason) {
        // Track responses that arrived after timeout
    }
}

Wiring Up Metrics
Attach your metrics implementation to a WebSocket provider:
import sh.brane.rpc.WebSocketProvider;

var provider = WebSocketProvider.create("wss://...");
provider.setMetrics(new MyMetrics());

Available Hooks
| Method | When Called | Use Case |
|---|---|---|
| onRequestStarted(method) | Request begins | Track in-flight requests |
| onRequestCompleted(method, latency) | Request succeeds | Latency histograms |
| onRequestTimeout(method, requestId) | Request times out | Timeout counters, log correlation |
| onRequestFailed(method, error) | Request fails | Error rates |
| onBackpressure(pendingCount, maxPending) | Too many pending requests | Backpressure alerts, capacity tuning |
| onConnectionLost() | WebSocket disconnects | Connection health |
| onReconnect() | WebSocket reconnects | Reconnection tracking |
| onSubscriptionNotification(id) | Subscription event received | Subscription throughput |
| onRingBufferSaturation(remaining, size) | Buffer nearing capacity | Early warning |
| onOrphanedResponse(reason) | Response with no matching request | Detect timeouts, network issues |
| onSubscriptionCallbackError(id, error) | Subscription callback throws | Debug misbehaving callbacks |
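
The hooks in the table can be combined to derive values that no single callback reports, such as the number of requests currently in flight. The sketch below is a standalone illustration: InFlightTracker is a hypothetical class that mirrors the hook names rather than implementing BraneMetrics, and the parameter types for onRequestStarted and onRequestFailed (String, Throwable) are assumptions, since the table lists only parameter names. It also assumes that every started request ends in exactly one of completed, failed, or timed out.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper; method names mirror the BraneMetrics hooks,
// but the parameter types for started/failed are assumed.
final class InFlightTracker {
    private final AtomicInteger inFlight = new AtomicInteger();
    private final LongAdder failures = new LongAdder();
    private final LongAdder timeouts = new LongAdder();

    public void onRequestStarted(String method) {
        inFlight.incrementAndGet();
    }

    public void onRequestCompleted(String method, Duration latency) {
        inFlight.decrementAndGet();
    }

    public void onRequestFailed(String method, Throwable error) {
        failures.increment();
        inFlight.decrementAndGet();
    }

    public void onRequestTimeout(String method, long requestId) {
        timeouts.increment();
        inFlight.decrementAndGet();
    }

    // Requests started but not yet completed, failed, or timed out.
    public int inFlight() { return inFlight.get(); }
    public long failures() { return failures.sum(); }
    public long timeouts() { return timeouts.sum(); }
}
```

The same bookkeeping could live inside a BraneMetrics implementation, with inFlight() exposed as a gauge.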
Micrometer Integration
Here's a complete example integrating with Micrometer:
import sh.brane.rpc.BraneMetrics;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class MicrometerMetrics implements BraneMetrics {
    private final MeterRegistry registry;
    private final ConcurrentHashMap<String, Timer> timers = new ConcurrentHashMap<>();
    private final Counter backpressureCounter;
    private final Counter reconnectCounter;
    // Gauges sample a live object, so the latest values are kept in fields
    // (stored as raw double bits so that updates are atomic).
    private final AtomicLong pendingUtilization = new AtomicLong();
    private final AtomicLong ringBufferSaturation = new AtomicLong();

    public MicrometerMetrics(MeterRegistry registry) {
        this.registry = registry;
        this.backpressureCounter = registry.counter("brane.rpc.backpressure");
        this.reconnectCounter = registry.counter("brane.rpc.reconnect");
        // Register each gauge once; Micrometer reads the holder on every scrape.
        registry.gauge("brane.rpc.pending.utilization", pendingUtilization,
            bits -> Double.longBitsToDouble(bits.get()));
        registry.gauge("brane.ringbuffer.saturation", ringBufferSaturation,
            bits -> Double.longBitsToDouble(bits.get()));
    }

    @Override
    public void onRequestCompleted(String method, Duration latency) {
        Timer timer = timers.computeIfAbsent(method, m ->
            Timer.builder("brane.rpc.latency")
                .tag("method", m)
                .publishPercentileHistogram() // emit histogram buckets
                .register(registry)
        );
        timer.record(latency);
    }

    @Override
    public void onRequestTimeout(String method, long requestId) {
        // One counter per method. Avoid registering an untagged meter under the
        // same name: Prometheus expects one set of tag keys per metric name.
        registry.counter("brane.rpc.timeout", "method", method).increment();
    }

    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        backpressureCounter.increment();
        // Track utilization when backpressure occurs
        double utilization = (double) pendingCount / maxPendingRequests;
        pendingUtilization.set(Double.doubleToLongBits(utilization));
    }

    @Override
    public void onReconnect() {
        reconnectCounter.increment();
    }

    @Override
    public void onRingBufferSaturation(long remaining, int size) {
        double pctFull = 1.0 - ((double) remaining / size);
        ringBufferSaturation.set(Double.doubleToLongBits(pctFull));
    }

    @Override
    public void onOrphanedResponse(String reason) {
        // Tagged by reason; assumes reasons form a small, fixed set.
        registry.counter("brane.rpc.orphaned", "reason", reason).increment();
    }
}

Usage
import io.micrometer.prometheus.PrometheusConfig;
import io.micrometer.prometheus.PrometheusMeterRegistry;
import sh.brane.rpc.WebSocketProvider;

var registry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);
var provider = WebSocketProvider.create("wss://...");
provider.setMetrics(new MicrometerMetrics(registry));

// Expose metrics at /metrics endpoint
// registry.scrape() returns the metrics in Prometheus text format

Prometheus Metrics Example
With the Micrometer integration above, you'll get metrics like:
# TYPE brane_rpc_latency_seconds histogram
brane_rpc_latency_seconds_bucket{method="eth_blockNumber",le="0.01"} 1523
brane_rpc_latency_seconds_bucket{method="eth_blockNumber",le="0.05"} 2341
brane_rpc_latency_seconds_count{method="eth_blockNumber"} 2500
# TYPE brane_rpc_backpressure_total counter
brane_rpc_backpressure_total 3
# TYPE brane_rpc_timeout_total counter
brane_rpc_timeout_total{method="eth_getLogs"} 2
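
For Prometheus to collect output like this, the text has to be served over HTTP. The following is a minimal sketch using the JDK's built-in com.sun.net.httpserver package. MetricsEndpoint and its start method are hypothetical names, and the scrape source is passed in as a Supplier<String> so that the example has no dependencies beyond the JDK; with the Micrometer setup, registry::scrape would be that supplier.

```java
import com.sun.net.httpserver.HttpServer;
import java.io.IOException;
import java.io.OutputStream;
import java.net.InetSocketAddress;
import java.nio.charset.StandardCharsets;
import java.util.function.Supplier;

// Hypothetical helper: serves whatever the supplier returns at /metrics.
final class MetricsEndpoint {
    private MetricsEndpoint() {}

    static HttpServer start(int port, Supplier<String> scrape) throws IOException {
        // Port 0 asks the OS for any free port.
        HttpServer server = HttpServer.create(new InetSocketAddress(port), 0);
        server.createContext("/metrics", exchange -> {
            byte[] body = scrape.get().getBytes(StandardCharsets.UTF_8);
            exchange.getResponseHeaders()
                .set("Content-Type", "text/plain; version=0.0.4; charset=utf-8");
            exchange.sendResponseHeaders(200, body.length);
            // Closing the response body completes the exchange.
            try (OutputStream out = exchange.getResponseBody()) {
                out.write(body);
            }
        });
        server.start();
        return server;
    }
}
```

A call such as MetricsEndpoint.start(9400, registry::scrape) would then expose the registry; the port number here is arbitrary. In an application that already runs a web framework, its own routing is usually the better place for this endpoint.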
Ring Buffer Saturation
The onRingBufferSaturation hook fires when fewer than 10% of the Disruptor ring buffer's slots remain free. It is an early warning that backpressure is building.
// log is assumed to be an SLF4J-style logger field on the enclosing class
@Override
public void onRingBufferSaturation(long remaining, int size) {
    double pctFull = 1.0 - ((double) remaining / size);
    log.warn("Ring buffer {}% full ({} slots remaining)",
        (int) (pctFull * 100), remaining);
    // Consider: slow down request rate, alert on-call, etc.
}

Simple Logging Implementation
For development or debugging, a simple logging implementation is often enough:
import sh.brane.rpc.BraneMetrics;
import java.time.Duration;

public class LoggingMetrics implements BraneMetrics {
    @Override
    public void onRequestCompleted(String method, Duration latency) {
        System.out.printf("[METRICS] %s completed in %dms%n",
            method, latency.toMillis());
    }

    @Override
    public void onRequestTimeout(String method, long requestId) {
        System.err.printf("[METRICS] %s timed out (requestId=%d)%n", method, requestId);
    }

    @Override
    public void onBackpressure(int pendingCount, int maxPendingRequests) {
        System.err.printf("[METRICS] Backpressure! %d/%d pending%n",
            pendingCount, maxPendingRequests);
    }

    @Override
    public void onConnectionLost() {
        System.err.println("[METRICS] Connection lost");
    }

    @Override
    public void onReconnect() {
        System.out.println("[METRICS] Reconnected");
    }

    @Override
    public void onOrphanedResponse(String reason) {
        System.err.printf("[METRICS] Orphaned response: %s%n", reason);
    }
}

Thread Safety
All BraneMetrics methods may be called from multiple threads concurrently. Ensure your implementation is thread-safe:
- Use atomic counters or thread-safe collections
- Avoid blocking in callbacks (they're called from I/O threads)
- Micrometer's MeterRegistry is thread-safe by design
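
The first two points can be sketched with JDK classes alone. MethodCounters below is a hypothetical helper, not part of Brane: it counts events per RPC method using ConcurrentHashMap and LongAdder, so a callback can record an event from any thread without a shared lock or any I/O.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.LongAdder;

// Hypothetical helper: per-method event counts that are safe to update concurrently.
final class MethodCounters {
    private final ConcurrentHashMap<String, LongAdder> counts = new ConcurrentHashMap<>();

    // Safe to call from any thread; creates the counter on first use.
    void increment(String method) {
        counts.computeIfAbsent(method, m -> new LongAdder()).increment();
    }

    // Returns the current total for a method, or 0 if it was never seen.
    long get(String method) {
        LongAdder adder = counts.get(method);
        return adder == null ? 0L : adder.sum();
    }
}
```

From a hook such as onRequestTimeout, the call would simply be counters.increment(method). Slower work, such as writing to a file or a remote service, is better handed off to another thread than done inside the callback.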
Disabling Metrics
By default, a no-op implementation is used. You don't need to explicitly disable metrics - just don't call setMetrics().
// No metrics collection (default)
var provider = WebSocketProvider.create("wss://...");
// With metrics
provider.setMetrics(new MyMetrics());
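
The no-op default is an instance of the null-object pattern. The sketch below shows the idea with hypothetical stand-ins (Hooks, Client), since Brane's internals are not shown here: the client always holds a non-null hooks object, so it can invoke callbacks without checking whether metrics were configured. Treating a null argument as "no metrics" is an assumption made for this sketch, not documented Brane behavior.

```java
// Hypothetical stand-in for a metrics interface whose hooks default to no-ops.
interface Hooks {
    Hooks NOOP = new Hooks() {};

    default void onConnectionLost() {}
    default void onReconnect() {}
}

// Hypothetical stand-in for a provider; not Brane's actual implementation.
final class Client {
    private volatile Hooks hooks = Hooks.NOOP;

    void setMetrics(Hooks hooks) {
        // Treating null as "no metrics" is a choice made for this sketch.
        this.hooks = (hooks == null) ? Hooks.NOOP : hooks;
    }

    void simulateReconnect() {
        hooks.onReconnect(); // always safe: hooks is never null
    }
}
```

Because every hook has an empty default body, an implementation only needs to override the callbacks it cares about.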