Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

[WIP] Hack Base Indexing #37

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@ clap = { version = "4", features = ["derive", "env"] }
rusqlite = { version = "0.30.0", features = ["extra_check"] }
# Waiting on https://github.com/nlordell/ethrpc-rs/pull/9
#ethrpc = { version = "0.0.8", features = ["http"] }
ethrpc = { git = "https://github.com/bh2smith/ethrpc-rs", rev = "4eb7466", features = [
ethrpc = { git = "https://github.com/bh2smith/ethrpc-rs", rev = "f463da4", features = [
"http",
] }
serde = { version = "1", features = ["derive"] }
1 change: 1 addition & 0 deletions src/database/mod.rs
Original file line number Diff line number Diff line change
@@ -50,6 +50,7 @@ pub struct Uncle<'a> {
/// An emitted event log.
#[derive(Debug, Default)]
pub struct Log<'a> {
// pub transaction_hash: Digest,
pub event: &'a str,
pub block_number: u64,
pub log_index: u64,
18 changes: 14 additions & 4 deletions src/indexer/chain.rs
Original file line number Diff line number Diff line change
@@ -42,10 +42,20 @@ impl Chain {

/// Updates the finalized block. Returns the previous finalized block.
pub fn finalize(&mut self, finalized: U256) -> Result<U256> {
anyhow::ensure!(
(self.finalized..self.next()).contains(&finalized),
"invalid finalized block"
);
// If the finalized block is ahead of our chain, just return current finalized
if finalized >= self.next() {
return Ok(self.finalized);
}

// If the finalized block is behind our current finalized, log warning and return
if finalized < self.finalized {
tracing::warn!(
current = %self.finalized,
attempted = %finalized,
"attempted to finalize block before current finalized block"
);
return Ok(self.finalized);
}

let keep = self.next() - finalized;
let old = self.finalized;
55 changes: 52 additions & 3 deletions src/indexer/mod.rs
Original file line number Diff line number Diff line change
@@ -14,9 +14,10 @@ use {
eth,
types::{Block, BlockSpec, BlockTag, BlockTransactions, Hydrated, LogBlocks},
},
solabi::U256,
solabi::{Address, U256},
std::{
cmp,
str::FromStr,
time::{Duration, SystemTime},
},
tokio::time,
@@ -130,6 +131,17 @@ where
let to = cmp::min(finalized.number.as_u64(), earliest + config.page_size - 1);
tracing::debug!(from =% earliest, %to, "indexing blocks");

// Add detailed logging for block ranges
tracing::debug!(
"page_size={}, finalized={}, earliest={}",
config.page_size,
finalized.number.as_u64(),
earliest
);

let to = cmp::min(finalized.number.as_u64(), earliest + config.page_size - 1);
tracing::debug!("block_range_size={}", to.saturating_sub(earliest) + 1);

// Fetch Blocks and Transaction Data
// TODO(bh2smith) - When a new event is introduced with start earlier than previously indexed events,
// we still need to pick up the block-data that we don't already have.
@@ -138,7 +150,7 @@ where
.map(|block: u64| {
(
eth::GetBlockByNumber,
(BlockSpec::Number(U256::from(block)), Hydrated::Yes),
(BlockSpec::Number(U256::from(block)), Hydrated::No),
)
})
.collect();
@@ -188,6 +200,41 @@ where
.flat_map(|(adapter, logs)| database_logs(adapter, logs))
.collect::<Vec<_>>();

// Filter logs by transaction sender
let filtered_logs = futures::future::join_all(logs.into_iter().map(|log| async {
let tx = self
.eth
.call(
eth::GetTransactionByBlockNumberAndIndex,
(
BlockSpec::Number(U256::from(log.block_number)),
U256::from(log.transaction_index),
),
)
.await
.ok()
.flatten();

match tx {
Some(tx)
if tx.from()
== Address::from_str("0x72469d86a92f5a9e975fe371a66015e667ab288f")
.unwrap() =>
{
Some(log)
}
_ => None,
}
}))
.await;

let filtered_logs: Vec<_> = filtered_logs.into_iter().flatten().collect();
if !filtered_logs.is_empty() {
tracing::debug!(
logs = %filtered_logs.len(),
"keep relevant logs"
);
}
// Add blocks and transactions.
blocks.extend([
database::EventBlock {
@@ -207,7 +254,7 @@ where
]);
let (block_times, transactions) = database_block_data(block_tx_data);
self.database
.update(&blocks, &logs, &block_times, &transactions)
.update(&blocks, &filtered_logs, &block_times, &transactions)
.await?;
}
}
@@ -233,6 +280,7 @@ where
block = %next.number, hash = %next.hash,
"found new block"
);
time::sleep(Duration::from_secs(1)).await;
}
chain::Append::Reorg => {
let block = next.number - 1;
@@ -397,6 +445,7 @@ fn database_logs(

Some(database::Log {
event: adapter.name(),
// transaction_hash: log.transaction_hash,
block_number: log.block_number.as_u64(),
log_index: log.log_index.as_u64(),
transaction_index: log.transaction_index.as_u64(),
Loading