Skip to content

Commit

Permalink
Use watched addresses from direct indexing params by default while se…
Browse files Browse the repository at this point in the history
…rving statediff APIs (#262)

* Use watched addresses from direct indexing params in statediff APIs by default

* Avoid using indexer object when direct indexing is off

* Add nil check before accessing watched addresses from direct indexing params
  • Loading branch information
prathamesh0 authored Jul 25, 2022
1 parent 7b4ef34 commit 5083ee5
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 24 deletions.
87 changes: 66 additions & 21 deletions statediff/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,7 +170,8 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
var db sql.Database
var err error
quitCh := make(chan bool)
if params.IndexerConfig != nil {
indexerConfigAvailable := params.IndexerConfig != nil
if indexerConfigAvailable {
info := nodeinfo.Info{
GenesisBlock: blockChain.Genesis().Hash().Hex(),
NetworkID: strconv.FormatUint(cfg.NetworkId, 10),
Expand Down Expand Up @@ -201,11 +202,13 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
statediffMetrics: statediffMetrics,
sqlFileWaitingForWrite: false,
}
if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true
} else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false
if indexerConfigAvailable {
if params.IndexerConfig.Type() == shared.POSTGRES {
knownGaps.checkForGaps = true
} else {
log.Info("We are not going to check for gaps on start up since we are not connected to Postgres!")
knownGaps.checkForGaps = false
}
}
sds := &Service{
Mutex: sync.Mutex{},
Expand All @@ -226,9 +229,11 @@ func New(stack *node.Node, ethServ *eth.Ethereum, cfg *ethconfig.Config, params
stack.RegisterLifecycle(sds)
stack.RegisterAPIs(sds.APIs())

err = loadWatchedAddresses(indexer)
if err != nil {
return err
if indexerConfigAvailable {
err = loadWatchedAddresses(indexer)
if err != nil {
return err
}
}

return nil
Expand Down Expand Up @@ -477,6 +482,13 @@ func (sds *Service) StateDiffAt(blockNumber uint64, params Params) (*Payload, er
currentBlock := sds.BlockChain.GetBlockByNumber(blockNumber)
log.Info("sending state diff", "block height", blockNumber)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand All @@ -493,6 +505,13 @@ func (sds *Service) StateDiffFor(blockHash common.Hash, params Params) (*Payload
currentBlock := sds.BlockChain.GetBlockByHash(blockHash)
log.Info("sending state diff", "block hash", blockHash)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand Down Expand Up @@ -777,6 +796,15 @@ func (sds *Service) StreamCodeAndCodeHash(blockNumber uint64, outChan chan<- typ
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
log.Info("writing state diff at", "block height", blockNumber)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand All @@ -793,6 +821,15 @@ func (sds *Service) WriteStateDiffAt(blockNumber uint64, params Params) error {
// This operation cannot be performed back past the point of db pruning; it requires an archival node
// for historical data
func (sds *Service) WriteStateDiffFor(blockHash common.Hash, params Params) error {
log.Info("writing state diff for", "block hash", blockHash)

// use watched addresses from statediffing write loop if not provided
if params.WatchedAddresses == nil && writeLoopParams.WatchedAddresses != nil {
writeLoopParams.RLock()
params.WatchedAddresses = make([]common.Address, len(writeLoopParams.WatchedAddresses))
copy(params.WatchedAddresses, writeLoopParams.WatchedAddresses)
writeLoopParams.RUnlock()
}
// compute leaf paths of watched addresses in the params
params.ComputeWatchedAddressesLeafPaths()

Expand Down Expand Up @@ -895,9 +932,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.InsertWatchedAddresses(filteredArgs, currentBlockNumber)
if err != nil {
return err
}
}

// update in-memory params
Expand All @@ -917,9 +956,11 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.RemoveWatchedAddresses(args)
if err != nil {
return err
}
}

// update in-memory params
Expand All @@ -933,19 +974,23 @@ func (sds *Service) WatchAddress(operation types2.OperationType, args []types2.W
}

// update the db
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
if sds.indexer != nil {
err = sds.indexer.SetWatchedAddresses(args, currentBlockNumber)
if err != nil {
return err
}
}

// update in-memory params
writeLoopParams.WatchedAddresses = argAddresses
writeLoopParams.ComputeWatchedAddressesLeafPaths()
case types2.Clear:
// update the db
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
if sds.indexer != nil {
err := sds.indexer.ClearWatchedAddresses()
if err != nil {
return err
}
}

// update in-memory params
Expand Down
7 changes: 4 additions & 3 deletions statediff/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,10 @@ var (
event3 = core.ChainEvent{Block: testBlock3}

defaultParams = statediff.Params{
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
IncludeBlock: true,
IncludeReceipts: true,
IncludeTD: true,
WatchedAddresses: []common.Address{},
}
)

Expand Down

0 comments on commit 5083ee5

Please # to comment.