diff --git a/ethhelpers/client_with_handlers.go b/ethhelpers/client_with_handlers.go index 8282b0d..ef5fae4 100644 --- a/ethhelpers/client_with_handlers.go +++ b/ethhelpers/client_with_handlers.go @@ -34,8 +34,10 @@ type clientWithHandlers struct { // NewClientWithHandlers creates a new client with custom handlers. // -// The handlers cannot modify the content of the arguments or results, except by -// overriding the error. +// The handlers cannot modify the content of the arguments or results, except +// for overriding the error returned. +// +// Handlers should return nil if it has not changed the error. func NewClientWithDefaultHandler(defaultHandler func(context.Context, ClientCaller) error) Client { return &clientWithHandlers{ defaultHandler: defaultHandler, diff --git a/ethhelpers/client_with_http_subscriptions.go b/ethhelpers/client_with_http_subscriptions.go index 17f9328..2c66015 100644 --- a/ethhelpers/client_with_http_subscriptions.go +++ b/ethhelpers/client_with_http_subscriptions.go @@ -8,17 +8,18 @@ import ( ) const ( + // Deprecated: maxFilterLogWindow = 1000 ) type clientWithHTTPSubscriptions struct { Client - createTicker func(context.Context, uint64) BlockNumberTicker + createTicker func(context.Context, uint64) (BlockNumberTicker, error) } // TODO: Add options. -func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Context, uint64) BlockNumberTicker) Client { +func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Context, uint64) (BlockNumberTicker, error)) Client { return &clientWithHTTPSubscriptions{ Client: client, createTicker: createTicker, @@ -27,6 +28,11 @@ func NewClientWithHTTPSubscriptions(client Client, createTicker func(context.Con // The context argument cancels the RPC request that sets up the subscription // but has no effect on the subscription after Subscribe has returned. -func (c *clientWithHTTPSubscriptions) SubscribeFilterLogs(ctx context.Context, filterQuery ethereum.FilterQuery, logChan chan<- types.Log) (ethereum.Subscription, error) { - return SubscribeFilterLogsWithHTTP(ctx, c.Client, c.createTicker, filterQuery, logChan) +func (c *clientWithHTTPSubscriptions) SubscribeFilterLogs(ctx context.Context, filterQuery ethereum.FilterQuery, logs chan<- types.Log) (ethereum.Subscription, error) { + return SubscribeFilterLogsWithHTTP(ctx, &HTTPSubscriberOptions{ + Client: c.Client, + CreateTicker: c.createTicker, + FilterQuery: filterQuery, + Logs: logs, + }) } diff --git a/ethhelpers/client_with_http_subscriptions_test.go b/ethhelpers/client_with_http_subscriptions_test.go index 8c6c453..131455a 100644 --- a/ethhelpers/client_with_http_subscriptions_test.go +++ b/ethhelpers/client_with_http_subscriptions_test.go @@ -29,8 +29,8 @@ func TestClientWithHTTPSubscriptions_SubscribeFilterLogs(t *testing.T) { client := ethhelpers.NewClientWithHTTPSubscriptions( ethtesting.NewSimulatedClient(sim.Backend), - func(ctx context.Context, fromBlock uint64) ethhelpers.BlockNumberTicker { - return ethhelpers.NewPeriodicBlockNumberTickerFromBlock(ctx, ethtesting.NewSimulatedClient(sim.Backend), time.Second/4, fromBlock) + func(ctx context.Context, fromBlock uint64) (ethhelpers.BlockNumberTicker, error) { + return ethhelpers.NewPeriodicBlockNumberTickerFromBlock(ctx, ethtesting.NewSimulatedClient(sim.Backend), time.Second/4, fromBlock), nil }, ) diff --git a/ethhelpers/clients.go b/ethhelpers/clients.go index e456183..41d87ed 100644 --- a/ethhelpers/clients.go +++ b/ethhelpers/clients.go @@ -89,15 +89,15 @@ type Client interface { // TODO: Add client without subcription methods. +type BlockNumberReader interface { + BlockNumber(ctx context.Context) (uint64, error) +} + type ChainReaderAndTransactionSender interface { ethereum.TransactionSender ethereum.ChainReader } -type BlockNumberReader interface { - BlockNumber(ctx context.Context) (uint64, error) -} - type FilterLogsReader interface { FilterLogs(ctx context.Context, q ethereum.FilterQuery) ([]types.Log, error) } diff --git a/ethhelpers/http_subscribers.go b/ethhelpers/http_subscribers.go index fb2b46a..b367638 100644 --- a/ethhelpers/http_subscribers.go +++ b/ethhelpers/http_subscribers.go @@ -10,69 +10,149 @@ import ( ) type httpSubscription struct { - subCancel func() - errChan chan error - unsubDone chan struct{} + cancel func() + err chan error + done chan struct{} } func (s *httpSubscription) Unsubscribe() { - s.subCancel() - <-s.unsubDone + s.cancel() + <-s.done - close(s.errChan) + close(s.err) } func (s *httpSubscription) Err() <-chan error { - return s.errChan + return s.err +} + +type HTTPSubscriberClient interface { + BlockNumberReader + FilterLogsReader +} + +type HTTPSubscriberOptions struct { + Client HTTPSubscriberClient + + // CreateContext returns a context that is used for the subscription, or + // context.Background() if nil. + CreateContext func() (context.Context, context.CancelFunc) + + // CreateTicker is a function that creates a block number ticker. + // + // The context is canceled when a subscription is unsubscribed or encounters + // an error, or if the context passed to the function creating the + // HTTPSubscriber function is canceled before returning a valid + // subscription. + // + // The method is called only once per subscription. + CreateTicker func(ctx context.Context, fromBlock uint64) (BlockNumberTicker, error) + + FilterQuery ethereum.FilterQuery + Logs chan<- types.Log } // The context argument cancels the RPC request that sets up the subscription // but has no effect on the subscription after Subscribe has returned. // -// Subscribers should be using the same underlying go-ethereum rpc client as the -// block number ticker to ensure there are no race-conditions. +// Subscribers should be using the same underlying go-ethereum rpc client +// connection as the block number ticker to ensure there are no race-conditions. // -// To conform to the SubscriberFilterLogs api the ticker should only return -// current, and not historic, block numbers. -func SubscribeFilterLogsWithHTTP(ctx context.Context, client FilterLogsReader, createTicker func(context.Context, uint64) BlockNumberTicker, filterQuery ethereum.FilterQuery, logChan chan<- types.Log) (ethereum.Subscription, error) { - if logChan == nil { - panic("channel given to SubscribeFilterLogs must not be nil") +// To conform to the go-ethereum SubscriberFilterLogs api the ticker should only +// return current, and not historic, block numbers. +// +// The current block number is requested before the subscription is returned. +func SubscribeFilterLogsWithHTTP(callerCtx context.Context, opts *HTTPSubscriberOptions) (ethereum.Subscription, error) { + if opts.Client == nil { + return nil, fmt.Errorf("opts.Client must be set") + } + if opts.CreateTicker == nil { + return nil, fmt.Errorf("opts.CreateTicker must be set") } + if opts.Logs == nil { + return nil, fmt.Errorf("opts.Logs must be set") + } + + subscriberCtx, cancel := func() (context.Context, context.CancelFunc) { + if opts.CreateContext == nil { + return context.WithCancel(context.Background()) + } - subCtx, subCancel := context.WithCancel(context.Background()) + return opts.CreateContext() + }() - s := &httpSubscription{ - subCancel: subCancel, - errChan: make(chan error, 1), - unsubDone: make(chan struct{}), - } + ticker, err := func(ctx context.Context, done <-chan struct{}) (BlockNumberTicker, error) { + ch := make(chan struct{}) + defer close(ch) + + go func() { + select { + case <-ch: + case <-done: + cancel() + } + }() + + currentBlock, err := opts.Client.BlockNumber(ctx) + if err != nil { + return nil, fmt.Errorf("failed to get current block number: %w", err) + } - queryFromBlock, ok := BigIntAsUint64OrZeroIfNil(filterQuery.FromBlock) - if !ok { - return nil, fmt.Errorf("invalid FromBlock value") + if opts.FilterQuery.FromBlock == nil { + return opts.CreateTicker(ctx, currentBlock) + } + if !opts.FilterQuery.FromBlock.IsUint64() { + return nil, fmt.Errorf("opts.FilterQuery.FromBlock is too large") + } + + queryFromBlock := opts.FilterQuery.FromBlock.Uint64() + + if currentBlock < queryFromBlock { + return opts.CreateTicker(ctx, queryFromBlock) + } else { + return opts.CreateTicker(ctx, currentBlock) + } + + }(subscriberCtx, callerCtx.Done()) + if err != nil { + cancel() + return nil, fmt.Errorf("failed to create block ticker: %w", err) } - ticker := createTicker(ctx, queryFromBlock) + s := &httpSubscription{ + cancel: cancel, + err: make(chan error, 1), + done: make(chan struct{}), + } go func(ctx context.Context) { - defer close(s.unsubDone) + defer close(s.done) waitFn := func() (uint64, bool) { select { - case bn := <-ticker.Wait(): + case bn, ok := <-ticker.Wait(): + if !ok { + s.err <- fmt.Errorf("block ticker wait channel closed") + return 0, false + } + return bn.BlockNumber, true - case err := <-ticker.Err(): + case err, ok := <-ticker.Err(): + if !ok { + s.err <- fmt.Errorf("block number ticker closed the error channel") + return 0, false + } if err == nil { - s.errChan <- fmt.Errorf("block number ticker returned nil error") + s.err <- fmt.Errorf("block number ticker returned a nil error") return 0, false } - s.errChan <- err + s.err <- err return 0, false case <-ctx.Done(): - s.errChan <- ctx.Err() + s.err <- ctx.Err() return 0, false } } @@ -87,29 +167,33 @@ func SubscribeFilterLogsWithHTTP(ctx context.Context, client FilterLogsReader, c if !ok { return } + if currentBlock < fromBlock { + s.err <- fmt.Errorf("block number ticker returned a block number less than the from block") + return + } - q := filterQuery + q := opts.FilterQuery q.FromBlock = new(big.Int).SetUint64(fromBlock) q.ToBlock = new(big.Int).SetUint64(currentBlock) - logs, err := client.FilterLogs(ctx, q) + logs, err := opts.Client.FilterLogs(ctx, q) if err != nil { - s.errChan <- err + s.err <- err return } for _, log := range logs { select { - case logChan <- log: + case opts.Logs <- log: case <-ctx.Done(): - s.errChan <- err + s.err <- err return } } fromBlock = currentBlock + 1 } - }(subCtx) + }(subscriberCtx) return s, nil } diff --git a/ethhelpers/tickers.go b/ethhelpers/tickers.go index 037419f..ba4ae7e 100644 --- a/ethhelpers/tickers.go +++ b/ethhelpers/tickers.go @@ -63,13 +63,6 @@ func (t *blockNumberTicker) Wait() <-chan BlockNumber { case <-t.request: default: } - - // select { - // case t.request <- blockNumberTickerRequest{ch}: - // return ch - // case <-t.request: - // // Discard previous request if it hasn't been read yet. - // } } }