Real-time Subscriptions
Setup
import sh.brane.rpc.Brane;
// Create a WebSocket-based client using the builder
Brane client = Brane.builder()
.wsUrl("wss://ethereum.publicnode.com")
.build();Subscribe to New Blocks
Get notified when new blocks are mined:
import sh.brane.rpc.Subscription;
import sh.brane.core.model.BlockHeader;
Subscription subscription = client.onNewHeads(header -> {
System.out.println("New block #" + header.number());
System.out.println(" Hash: " + header.hash());
System.out.println(" Timestamp: " + header.timestamp());
if (header.baseFeePerGas() != null) {
System.out.println(" Base Fee: " + header.baseFeePerGas().value() + " wei");
}
});
// Keep running...
Thread.sleep(60_000);
// Unsubscribe when done
subscription.unsubscribe();BlockHeader Fields
| Field | Type | Description |
|---|---|---|
hash() | Hash | Block hash |
number() | Long | Block number |
parentHash() | Hash | Parent block hash |
timestamp() | Long | Unix timestamp |
baseFeePerGas() | Wei | EIP-1559 base fee (null for pre-London) |
Subscribe to Logs
Monitor contract events in real-time:
import sh.brane.rpc.LogFilter;
import sh.brane.rpc.Subscription;
import sh.brane.core.model.LogEntry;
import sh.brane.core.types.Address;
import sh.brane.core.types.Hash;
import java.util.List;
// USDC contract on Ethereum mainnet
Address usdc = new Address("0xA0b86991c6218b36c1d19D4a2e9Eb0cE3606eB48");
// Transfer(address,address,uint256) event signature
Hash transferTopic = new Hash("0xddf252ad1be2c89b69c2b068fc378daa952ba7f163c4a11628f55a4df523b3ef");
// Use the byContract factory method
LogFilter filter = LogFilter.byContract(usdc, List.of(transferTopic));
Subscription subscription = client.onLogs(filter, log -> {
System.out.println("Transfer detected!");
System.out.println(" Tx: " + log.transactionHash());
System.out.println(" From: " + log.topics().get(1)); // indexed param
System.out.println(" To: " + log.topics().get(2)); // indexed param
System.out.println(" Data: " + log.data()); // amount (encoded)
});LogEntry Fields
| Field | Type | Description |
|---|---|---|
address() | Address | Contract address |
data() | HexData | Non-indexed event data |
topics() | List<Hash> | Event signature + indexed parameters |
transactionHash() | Hash | Transaction that emitted the log |
blockHash() | Hash | Block containing the log |
logIndex() | int | Log index in the block |
removed() | boolean | True if log was reverted (reorg) |
Subscribe to Pending Transactions
Monitor the mempool for new pending transactions:
// Use the low-level provider API for pending transactions
String subscriptionId = provider.subscribe("newPendingTransactions", null, event -> {
String txHash = event.toString();
System.out.println("Pending tx: " + txHash);
});
// Later, unsubscribe
provider.unsubscribe(subscriptionId);Managing Subscriptions
Unsubscribe
Always unsubscribe when done to free resources:
Subscription sub = client.onNewHeads(header -> { ... });
// When done
sub.unsubscribe();Multiple Subscriptions
You can have multiple subscriptions on the same connection:
var blockSub = client.onNewHeads(header -> { ... });
var logSub = client.onLogs(filter, log -> { ... });
// Both run concurrently on the same WebSocket connectionCallback Threading
By default, callbacks run on virtual threads to prevent blocking the Netty I/O thread.
Custom Executor
For CPU-intensive callbacks or custom threading:
import java.util.concurrent.Executors;
// Run callbacks on a fixed thread pool
provider.setSubscriptionExecutor(
Executors.newFixedThreadPool(4)
);Error Handling
Reorgs (Chain Reorganization)
When a reorg occurs, logs may be marked as removed:
client.onLogs(filter, log -> {
if (log.removed()) {
System.out.println("Log reverted due to reorg: " + log.transactionHash());
// Handle revert - e.g., undo state change
return;
}
// Process normally
});Connection Loss
If the WebSocket connection drops, subscriptions are lost. Re-subscribe after reconnection:
// The provider handles reconnection automatically,
// but you need to re-subscribe
Consumer<BlockHeader> callback = header -> {
System.out.println("New block: " + header.number());
};
try {
var sub = client.onNewHeads(callback);
} catch (RpcException e) {
// Handle error, implement retry logic
System.err.println("Subscription failed: " + e.getMessage());
}Best Practices
-
Always unsubscribe — Prevents resource leaks on both client and server.
-
Handle reorgs — Check
log.removed()for critical applications. -
Don't block callbacks — Use the subscription executor for heavy work.
-
Use try-with-resources for client — Ensures cleanup:
try (Brane client = Brane.builder().wsUrl("wss://...").build()) { var sub = client.onNewHeads(callback); // ... } // Client closes, subscriptions cleaned up -
Filter precisely — The more specific your
LogFilter, the less data transferred.