Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
21c4a6e
feat: add common ancestor to notif
LeoPatOZ Dec 4, 2025
5ef900b
fix: update test for new reorg notif
LeoPatOZ Dec 4, 2025
c956b43
feat: update common fn with new ancestor block notif
LeoPatOZ Dec 4, 2025
4bc5b1f
feat: add reorg streaming and log removal logic
LeoPatOZ Dec 4, 2025
567ddda
ref: move reorg check to internal
LeoPatOZ Dec 4, 2025
31c0e5a
feat: finalized tip reorg opt
LeoPatOZ Dec 4, 2025
5af945c
format: common rs
LeoPatOZ Dec 5, 2025
4737724
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 5, 2025
e0286f2
ref: simplify if else
LeoPatOZ Dec 5, 2025
8f204d5
test: latest roerg logic
LeoPatOZ Dec 5, 2025
637fa87
doc: add note about possible opt
LeoPatOZ Dec 5, 2025
3633605
doc: fix comment
LeoPatOZ Dec 5, 2025
11c71aa
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 5, 2025
aa6c884
ref: ignore reorg tests for now (add comment why)
LeoPatOZ Dec 8, 2025
c27b70d
ref: remove from in handle reorg
LeoPatOZ Dec 8, 2025
c74e6d5
fix: only send reorg range but continue rewind from previous batch to
LeoPatOZ Dec 8, 2025
e357412
ref: remove retain
LeoPatOZ Dec 8, 2025
b0eaf4b
ref: better reorg block included test + naming
LeoPatOZ Dec 8, 2025
87ad933
ref: use skip_while
LeoPatOZ Dec 8, 2025
c7584ae
Merge branch 'main' into reorg-log-opt
LeoPatOZ Dec 8, 2025
81875e4
doc: add comment about reorg handling
LeoPatOZ Dec 8, 2025
b216fa7
comment: explain ignored test in comment
LeoPatOZ Dec 9, 2025
ffe122e
test: add BRS tests (ignored until ack channel)
LeoPatOZ Dec 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 95 additions & 35 deletions src/block_range_scanner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ use crate::{

use alloy::{
consensus::BlockHeader,
eips::BlockId,
eips::{BlockId, BlockNumberOrTag},
network::{BlockResponse, Network, primitives::HeaderResponse},
primitives::BlockNumber,
};
Expand Down Expand Up @@ -445,6 +445,11 @@ impl<N: Network> Service<N> {
///
/// The `from` block is assumed to be greater than or equal to the `to` block.
///
/// # Reorg Handling
///
/// Reorg checks are only performed when the tip is above the current finalized
/// block height.
///
/// # Errors
///
/// Returns an error if the stream fails
Expand All @@ -464,8 +469,22 @@ impl<N: Network> Service<N> {
let from = tip.header().number();
let to = to.header().number();

let finalized_block = match provider.get_block_by_number(BlockNumberOrTag::Finalized).await
{
Ok(block) => block,
Err(e) => {
error!(error = %e, "Failed to get finalized block");
_ = sender.try_stream(e).await;
return;
}
};

// we're iterating in reverse
let mut batch_from = from;
let finalized_number = finalized_block.header().number();

// only check reorg if our tip is after the finalized block
let check_reorg = tip.header().number() > finalized_number;

while batch_from >= to {
let batch_to = batch_from.saturating_sub(max_block_range - 1).max(to);
Expand All @@ -486,51 +505,92 @@ impl<N: Network> Service<N> {
break;
}

let reorged_opt = match reorg_handler.check(&tip).await {
Ok(opt) => {
info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected");
opt
}
Err(e) => {
error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e).await;
return;
}
};

// For now we only care if a reorg occurred, not which block it was.
// Once we optimize 'latest' mode to update only the reorged logs, we will need the
// exact common ancestor.
if reorged_opt.is_some() {
info!(block_number = %from, hash = %tip.header().hash(), "Reorg detected");

if !sender.try_stream(Notification::ReorgDetected).await {
break;
}

// restart rewind
batch_from = from;
// store the updated end block hash
tip = match provider.get_block_by_number(from.into()).await {
Ok(block) => block,
Err(RobustProviderError::BlockNotFound(_)) => {
panic!("Block with number '{from}' should exist post-reorg");
}
if check_reorg {
let reorg = match reorg_handler.check(&tip).await {
Ok(opt) => opt,
Err(e) => {
error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e).await;
return;
}
};
} else {
// `batch_to` is always greater than `to`, so `batch_to - 1` is always a valid
// unsigned integer
batch_from = batch_to - 1;

if let Some(common_ancestor) = reorg &&
!Self::handle_reorg_rescan(
&mut tip,
common_ancestor,
max_block_range,
sender,
provider,
)
.await
{
return;
}
}
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since BRS is a public type, make sure to document this behavior. I will do the same for historic mode.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Related #235

Copy link
Collaborator Author

@LeoPatOZ LeoPatOZ Dec 8, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added an inline comment + something in the doc comment. Is that what you had in mind?


batch_from = batch_to - 1;
}

info!(batch_count = batch_count, "Rewind completed");
}

/// Handles re-scanning of reorged blocks.
///
/// Returns `true` on success, `false` if stream closed or terminal error occurred.
async fn handle_reorg_rescan(
tip: &mut N::BlockResponse,
common_ancestor: N::BlockResponse,
max_block_range: u64,
sender: &mpsc::Sender<BlockScannerResult>,
provider: &RobustProvider<N>,
) -> bool {
let tip_number = tip.header().number();
let common_ancestor_block = common_ancestor.header().number();
info!(
block_number = %tip_number,
hash = %tip.header().hash(),
common_ancestor_block = %common_ancestor_block,
"Reorg detected"
);

if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block }).await {
return false;
}

// Get the new tip block (same height as original tip, but new hash)
*tip = match provider.get_block_by_number(tip_number.into()).await {
Ok(block) => block,
Err(RobustProviderError::BlockNotFound(_)) => {
panic!("Block with number '{tip_number}' should exist post-reorg");
}
Err(e) => {
error!(error = %e, "Terminal RPC call error, shutting down");
_ = sender.try_stream(e).await;
return false;
}
};

// Re-scan only the affected range (from tip down to common_ancestor + 1)
let rescan_to = common_ancestor_block + 1;

let mut rescan_batch_from = tip_number;
while rescan_batch_from >= rescan_to {
let rescan_batch_to =
rescan_batch_from.saturating_sub(max_block_range - 1).max(rescan_to);

if !sender.try_stream(rescan_batch_to..=rescan_batch_from).await {
return false;
}

if rescan_batch_to == rescan_to {
break;
}
rescan_batch_from = rescan_batch_to - 1;
}

true
}
}

pub struct BlockRangeScannerClient {
Expand Down
13 changes: 7 additions & 6 deletions src/block_range_scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -251,12 +251,13 @@ async fn handle_reorg_detected<N: Network>(
state: &mut LiveStreamingState<N>,
sender: &mpsc::Sender<BlockScannerResult>,
) -> bool {
if !sender.try_stream(Notification::ReorgDetected).await {
let ancestor_num = common_ancestor.header().number();

if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block: ancestor_num }).await
{
return false;
}

let ancestor_num = common_ancestor.header().number();

// Reset streaming position based on common ancestor
if ancestor_num < stream_start {
// Reorg went before our starting point - restart from stream_start
Expand Down Expand Up @@ -431,11 +432,11 @@ pub(crate) async fn stream_range_with_reorg_handling<N: Network>(
};

next_start_block = if let Some(common_ancestor) = reorged_opt {
if !sender.try_stream(Notification::ReorgDetected).await {
let common_ancestor_block = common_ancestor.header().number();
if !sender.try_stream(Notification::ReorgDetected { common_ancestor_block }).await {
return None;
}

min_common_ancestor.max(common_ancestor.header().number()) + 1
(common_ancestor_block + 1).max(min_common_ancestor)
} else {
batch_end_num + 1
};
Expand Down
29 changes: 29 additions & 0 deletions src/event_scanner/scanner/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,35 @@ pub fn spawn_log_consumers_in_collection_mode<N: Network>(
}
}
}
Ok(ScannerMessage::Notification(Notification::ReorgDetected {
common_ancestor_block,
})) => {
info!(
common_ancestor_block = common_ancestor_block,
"Received ReorgDetected notification"
);

// Invalidate logs from reorged blocks
// Logs are ordered newest -> oldest, so skip logs with
// block_number > common_ancestor at the front
let before_count = collected.len();
collected = collected
.into_iter()
.skip_while(|log| {
log.block_number.is_some_and(|n| n > common_ancestor_block)
})
.collect();
let removed_count = before_count - collected.len();
if removed_count > 0 {
info!(
removed_count = removed_count,
remaining_count = collected.len(),
"Invalidated logs from reorged blocks"
);
}
// Don't forward the notification to the user in CollectLatest mode
// since logs haven't been sent yet
}
Ok(ScannerMessage::Notification(notification)) => {
info!(notification = ?notification, "Received notification");
if !sender.try_stream(notification).await {
Expand Down
7 changes: 4 additions & 3 deletions src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,6 @@ pub enum Notification {
/// in sync scanners.
SwitchingToLive,

/// Emitted when a blockchain reorganization is detected during scanning.
///
/// When a reorg occurs, the scanner adjusts its position to re-stream events from the
/// canonical chain state. The specific behavior depends on the scanning mode (see individual
/// scanner mode documentation for details).
Expand All @@ -49,7 +47,10 @@ pub enum Notification {
/// handle duplicate logs idempotently (e.g., using transaction hashes or log indices as
/// deduplication keys). The scanner prioritizes correctness by ensuring all logs from the
/// canonical chain are delivered, even if it means occasional duplicates during reorgs.
ReorgDetected,
///
/// The `common_ancestor_block` field contains the block number of the last block
/// that is still valid on the canonical chain.
ReorgDetected { common_ancestor_block: u64 },
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we can find a more succinct name


/// Emitted during the latest events phase when no matching logs are found in the
/// scanned range.
Expand Down
Loading