4
4
"context"
5
5
"crypto/rand"
6
6
"testing"
7
+ "time"
7
8
8
9
"github.com/stretchr/testify/assert"
9
10
"github.com/stretchr/testify/mock"
@@ -12,7 +13,9 @@ import (
12
13
abci "github.com/tendermint/tendermint/abci/types"
13
14
cfg "github.com/tendermint/tendermint/config"
14
15
"github.com/tendermint/tendermint/libs/log"
16
+ "github.com/tendermint/tendermint/libs/pubsub/query"
15
17
"github.com/tendermint/tendermint/proxy"
18
+ tmtypes "github.com/tendermint/tendermint/types"
16
19
17
20
"github.com/celestiaorg/optimint/mempool"
18
21
"github.com/celestiaorg/optimint/mocks"
@@ -35,7 +38,7 @@ func TestCreateBlock(t *testing.T) {
35
38
nsID := [8 ]byte {1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 }
36
39
37
40
mpool := mempool .NewCListMempool (cfg .DefaultMempoolConfig (), proxy .NewAppConnMempool (client ), 0 )
38
- executor := NewBlockExecutor ([]byte ("test address" ), nsID , "test" , mpool , proxy .NewAppConnConsensus (client ), logger )
41
+ executor := NewBlockExecutor ([]byte ("test address" ), nsID , "test" , mpool , proxy .NewAppConnConsensus (client ), nil , logger )
39
42
40
43
state := State {}
41
44
state .ConsensusParams .Block .MaxBytes = 100
@@ -91,7 +94,21 @@ func TestApplyBlock(t *testing.T) {
91
94
chainID := "test"
92
95
93
96
mpool := mempool .NewCListMempool (cfg .DefaultMempoolConfig (), proxy .NewAppConnMempool (client ), 0 )
94
- executor := NewBlockExecutor ([]byte ("test address" ), nsID , chainID , mpool , proxy .NewAppConnConsensus (client ), logger )
97
+ eventBus := tmtypes .NewEventBus ()
98
+ require .NoError (eventBus .Start ())
99
+ executor := NewBlockExecutor ([]byte ("test address" ), nsID , chainID , mpool , proxy .NewAppConnConsensus (client ), eventBus , logger )
100
+
101
+ txQuery , err := query .New ("tm.event='Tx'" )
102
+ require .NoError (err )
103
+ txSub , err := eventBus .Subscribe (context .Background (), "test" , txQuery , 1000 )
104
+ require .NoError (err )
105
+ require .NotNil (txSub )
106
+
107
+ headerQuery , err := query .New ("tm.event='NewBlockHeader'" )
108
+ require .NoError (err )
109
+ headerSub , err := eventBus .Subscribe (context .Background (), "test" , headerQuery , 100 )
110
+ require .NoError (err )
111
+ require .NotNil (headerSub )
95
112
96
113
state := State {}
97
114
state .InitialHeight = 1
@@ -125,4 +142,35 @@ func TestApplyBlock(t *testing.T) {
125
142
require .NoError (err )
126
143
require .NotNil (newState )
127
144
assert .Equal (int64 (2 ), newState .LastBlockHeight )
145
+
146
+ // wait for at least 4 Tx events, for up to 3 second.
147
+ // 3 seconds is a fail-scenario only
148
+ timer := time .NewTimer (3 * time .Second )
149
+ txs := make (map [int64 ]int )
150
+ cnt := 0
151
+ for cnt != 4 {
152
+ select {
153
+ case evt := <- txSub .Out ():
154
+ cnt ++
155
+ data , ok := evt .Data ().(tmtypes.EventDataTx )
156
+ assert .True (ok )
157
+ assert .NotEmpty (data .Tx )
158
+ txs [data .Height ]++
159
+ case <- timer .C :
160
+ t .FailNow ()
161
+ }
162
+ }
163
+ assert .Zero (len (txSub .Out ())) // expected exactly 4 Txs - channel should be empty
164
+ assert .EqualValues (1 , txs [1 ])
165
+ assert .EqualValues (3 , txs [2 ])
166
+
167
+ require .EqualValues (2 , len (headerSub .Out ()))
168
+ for h := 1 ; h <= 2 ; h ++ {
169
+ evt := <- headerSub .Out ()
170
+ data , ok := evt .Data ().(tmtypes.EventDataNewBlockHeader )
171
+ assert .True (ok )
172
+ if data .Header .Height == 2 {
173
+ assert .EqualValues (3 , data .NumTxs )
174
+ }
175
+ }
128
176
}
0 commit comments