30
30
S : Payload ,
31
31
{
32
32
body_tx : SendStream < SendBuf < S :: Data > > ,
33
+ data_done : bool ,
33
34
stream : S ,
34
35
}
35
36
40
41
fn new ( stream : S , tx : SendStream < SendBuf < S :: Data > > ) -> PipeToSendStream < S > {
41
42
PipeToSendStream {
42
43
body_tx : tx,
44
+ data_done : false ,
43
45
stream : stream,
44
46
}
45
47
}
48
+
49
+ fn on_err ( & mut self , err : S :: Error ) -> :: Error {
50
+ let err = :: Error :: new_user_body ( err) ;
51
+ trace ! ( "send body user stream error: {}" , err) ;
52
+ self . body_tx . send_reset ( Reason :: INTERNAL_ERROR ) ;
53
+ err
54
+ }
55
+
56
+ fn send_eos_frame ( & mut self ) -> :: Result < ( ) > {
57
+ trace ! ( "send body eos" ) ;
58
+ self . body_tx . send_data ( SendBuf ( None ) , true )
59
+ . map_err ( :: Error :: new_body_write)
60
+ }
46
61
}
47
62
48
63
impl < S > Future for PipeToSendStream < S >
@@ -54,49 +69,59 @@ where
54
69
55
70
fn poll ( & mut self ) -> Poll < Self :: Item , Self :: Error > {
56
71
loop {
57
- // TODO: make use of flow control on SendStream
58
- // If you're looking at this and thinking of trying to fix this TODO,
59
- // you may want to look at:
60
- // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html
61
- //
62
- // With that doc open, we'd want to do these things:
63
- // - check self.body_tx.capacity() to see if we can send *any* data
64
- // - if > 0:
65
- // - poll self.stream
66
- // - reserve chunk.len() more capacity (because its about to be used)?
67
- // - send the chunk
68
- // - else:
69
- // - try reserve a smallish amount of capacity
70
- // - call self.body_tx.poll_capacity(), return if NotReady
71
- match self . stream . poll_data ( ) {
72
- Ok ( Async :: Ready ( Some ( chunk) ) ) => {
73
- let is_eos = self . stream . is_end_stream ( ) ;
74
- trace ! (
75
- "send body chunk: {} bytes, eos={}" ,
76
- chunk. remaining( ) ,
77
- is_eos,
78
- ) ;
79
-
80
- let buf = SendBuf ( Some ( chunk) ) ;
81
- self . body_tx . send_data ( buf, is_eos)
82
- . map_err ( :: Error :: new_body_write) ?;
83
-
84
- if is_eos {
85
- return Ok ( Async :: Ready ( ( ) ) )
86
- }
87
- } ,
88
- Ok ( Async :: Ready ( None ) ) => {
89
- trace ! ( "send body eos" ) ;
90
- self . body_tx . send_data ( SendBuf ( None ) , true )
91
- . map_err ( :: Error :: new_body_write) ?;
92
- return Ok ( Async :: Ready ( ( ) ) ) ;
93
- } ,
94
- Ok ( Async :: NotReady ) => return Ok ( Async :: NotReady ) ,
95
- Err ( err) => {
96
- let err = :: Error :: new_user_body ( err) ;
97
- trace ! ( "send body user stream error: {}" , err) ;
98
- self . body_tx . send_reset ( Reason :: INTERNAL_ERROR ) ;
99
- return Err ( err) ;
72
+ if !self . data_done {
73
+ // TODO: make use of flow control on SendStream
74
+ // If you're looking at this and thinking of trying to fix this TODO,
75
+ // you may want to look at:
76
+ // https://docs.rs/h2/0.1.*/h2/struct.SendStream.html
77
+ //
78
+ // With that doc open, we'd want to do these things:
79
+ // - check self.body_tx.capacity() to see if we can send *any* data
80
+ // - if > 0:
81
+ // - poll self.stream
82
+ // - reserve chunk.len() more capacity (because its about to be used)?
83
+ // - send the chunk
84
+ // - else:
85
+ // - try reserve a smallish amount of capacity
86
+ // - call self.body_tx.poll_capacity(), return if NotReady
87
+ match try_ready ! ( self . stream. poll_data( ) . map_err( |e| self . on_err( e) ) ) {
88
+ Some ( chunk) => {
89
+ let is_eos = self . stream . is_end_stream ( ) ;
90
+ trace ! (
91
+ "send body chunk: {} bytes, eos={}" ,
92
+ chunk. remaining( ) ,
93
+ is_eos,
94
+ ) ;
95
+
96
+ let buf = SendBuf ( Some ( chunk) ) ;
97
+ self . body_tx . send_data ( buf, is_eos)
98
+ . map_err ( :: Error :: new_body_write) ?;
99
+
100
+ if is_eos {
101
+ return Ok ( Async :: Ready ( ( ) ) )
102
+ }
103
+ } ,
104
+ None => {
105
+ let is_eos = self . stream . is_end_stream ( ) ;
106
+ if is_eos {
107
+ return self . send_eos_frame ( ) . map ( Async :: Ready ) ;
108
+ } else {
109
+ self . data_done = true ;
110
+ // loop again to poll_trailers
111
+ }
112
+ } ,
113
+ }
114
+ } else {
115
+ match try_ready ! ( self . stream. poll_trailers( ) . map_err( |e| self . on_err( e) ) ) {
116
+ Some ( trailers) => {
117
+ self . body_tx . send_trailers ( trailers)
118
+ . map_err ( :: Error :: new_body_write) ?;
119
+ return Ok ( Async :: Ready ( ( ) ) ) ;
120
+ } ,
121
+ None => {
122
+ // There were no trailers, so send an empty DATA frame...
123
+ return self . send_eos_frame ( ) . map ( Async :: Ready ) ;
124
+ } ,
100
125
}
101
126
}
102
127
}
0 commit comments