@@ -5,16 +5,14 @@ const {
5
5
Readable,
6
6
} = require ( 'stream' ) ;
7
7
const assert = require ( 'assert' ) ;
8
+ const { once } = require ( 'events' ) ;
8
9
const { setTimeout } = require ( 'timers/promises' ) ;
9
10
10
11
{
11
12
// Map works on synchronous streams with a synchronous mapper
12
13
const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => x + x ) ;
13
- const result = [ 2 , 4 , 6 , 8 , 10 ] ;
14
14
( async ( ) => {
15
- for await ( const item of stream ) {
16
- assert . strictEqual ( item , result . shift ( ) ) ;
17
- }
15
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 2 , 4 , 6 , 8 , 10 ] ) ;
18
16
} ) ( ) . then ( common . mustCall ( ) ) ;
19
17
}
20
18
@@ -24,7 +22,49 @@ const { setTimeout } = require('timers/promises');
24
22
await Promise . resolve ( ) ;
25
23
return x + x ;
26
24
} ) ;
27
- const result = [ 2 , 4 , 6 , 8 , 10 ] ;
25
+ ( async ( ) => {
26
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 2 , 4 , 6 , 8 , 10 ] ) ;
27
+ } ) ( ) . then ( common . mustCall ( ) ) ;
28
+ }
29
+
30
+ {
31
+ // Map works on asynchronous streams with a asynchronous mapper
32
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
33
+ return x + x ;
34
+ } ) . map ( ( x ) => x + x ) ;
35
+ ( async ( ) => {
36
+ assert . deepStrictEqual ( await stream . toArray ( ) , [ 4 , 8 , 12 , 16 , 20 ] ) ;
37
+ } ) ( ) . then ( common . mustCall ( ) ) ;
38
+ }
39
+
40
+ {
41
+ // Map works on an infinite stream
42
+ const stream = Readable . from ( async function * ( ) {
43
+ while ( true ) yield 1 ;
44
+ } ( ) ) . map ( common . mustCall ( async ( x ) => {
45
+ return x + x ;
46
+ } , 5 ) ) ;
47
+ ( async ( ) => {
48
+ let i = 1 ;
49
+ for await ( const item of stream ) {
50
+ assert . strictEqual ( item , 2 ) ;
51
+ if ( ++ i === 5 ) break ;
52
+ }
53
+ } ) ( ) . then ( common . mustCall ( ) ) ;
54
+ }
55
+
56
+ {
57
+ // Map works on non-objectMode streams
58
+ const stream = new Readable ( {
59
+ read ( ) {
60
+ this . push ( Uint8Array . from ( [ 1 ] ) ) ;
61
+ this . push ( Uint8Array . from ( [ 2 ] ) ) ;
62
+ this . push ( null ) ;
63
+ }
64
+ } ) . map ( async ( [ x ] ) => {
65
+ return x + x ;
66
+ } ) . map ( ( x ) => x + x ) ;
67
+ const result = [ 4 , 8 ] ;
28
68
( async ( ) => {
29
69
for await ( const item of stream ) {
30
70
assert . strictEqual ( item , result . shift ( ) ) ;
@@ -33,39 +73,88 @@ const { setTimeout } = require('timers/promises');
33
73
}
34
74
35
75
{
36
- // Map works on asynchronous streams with a asynchronous mapper
37
- const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
76
+ // Does not care about data events
77
+ const source = new Readable ( {
78
+ read ( ) {
79
+ this . push ( Uint8Array . from ( [ 1 ] ) ) ;
80
+ this . push ( Uint8Array . from ( [ 2 ] ) ) ;
81
+ this . push ( null ) ;
82
+ }
83
+ } ) ;
84
+ setImmediate ( ( ) => stream . emit ( 'data' , Uint8Array . from ( [ 1 ] ) ) ) ;
85
+ const stream = source . map ( async ( [ x ] ) => {
38
86
return x + x ;
39
87
} ) . map ( ( x ) => x + x ) ;
40
- const result = [ 4 , 8 , 12 , 16 , 20 ] ;
88
+ const result = [ 4 , 8 ] ;
41
89
( async ( ) => {
42
90
for await ( const item of stream ) {
43
91
assert . strictEqual ( item , result . shift ( ) ) ;
44
92
}
45
93
} ) ( ) . then ( common . mustCall ( ) ) ;
46
94
}
47
95
96
+ {
97
+ // Emitting an error during `map`
98
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
99
+ if ( x === 3 ) {
100
+ stream . emit ( 'error' , new Error ( 'boom' ) ) ;
101
+ }
102
+ return x + x ;
103
+ } ) ;
104
+ assert . rejects (
105
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
106
+ / b o o m / ,
107
+ ) . then ( common . mustCall ( ) ) ;
108
+ }
109
+
110
+ {
111
+ // Throwing an error during `map` (sync)
112
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( ( x ) => {
113
+ if ( x === 3 ) {
114
+ throw new Error ( 'boom' ) ;
115
+ }
116
+ return x + x ;
117
+ } ) ;
118
+ assert . rejects (
119
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
120
+ / b o o m / ,
121
+ ) . then ( common . mustCall ( ) ) ;
122
+ }
123
+
124
+
125
+ {
126
+ // Throwing an error during `map` (async)
127
+ const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( x ) => {
128
+ if ( x === 3 ) {
129
+ throw new Error ( 'boom' ) ;
130
+ }
131
+ return x + x ;
132
+ } ) ;
133
+ assert . rejects (
134
+ stream . map ( ( x ) => x + x ) . toArray ( ) ,
135
+ / b o o m / ,
136
+ ) . then ( common . mustCall ( ) ) ;
137
+ }
138
+
48
139
{
49
140
// Concurrency + AbortSignal
50
141
const ac = new AbortController ( ) ;
51
- let calls = 0 ;
52
- const stream = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) . map ( async ( _ , { signal } ) => {
53
- calls ++ ;
54
- await setTimeout ( 100 , { signal } ) ;
55
- } , { signal : ac . signal , concurrency : 2 } ) ;
142
+ const range = Readable . from ( [ 1 , 2 , 3 , 4 , 5 ] ) ;
143
+ const stream = range . map ( common . mustCall ( async ( _ , { signal } ) => {
144
+ await once ( signal , 'abort' ) ;
145
+ throw signal . reason ;
146
+ } , 2 ) , { signal : ac . signal , concurrency : 2 } ) ;
56
147
// pump
57
148
assert . rejects ( async ( ) => {
58
149
for await ( const item of stream ) {
59
- // nope
60
- console . log ( item ) ;
150
+ assert . fail ( 'should not reach here, got ' + item ) ;
61
151
}
62
152
} , {
63
153
name : 'AbortError' ,
64
154
} ) . then ( common . mustCall ( ) ) ;
65
155
66
156
setImmediate ( ( ) => {
67
157
ac . abort ( ) ;
68
- assert . strictEqual ( calls , 2 ) ;
69
158
} ) ;
70
159
}
71
160
0 commit comments