Skip to content

Commit e3a9cab

Browse files
Authored: Mar 14, 2025
HTTP/1: Support streaming requests of unknown length (#506)
**Issue:** aws-sdk-cpp needs to send streaming requests of unknown content length. This isn't currently supported in our HTTP/1 client. (our HTTP/2 client *does* already support this) **Background:** The HTTP/1 client already supported these use cases: - Send streaming request body, but declare Content-Length up front - Send chunked request body, but user must declare length of each chunk via `write_chunk()` API. But it wasn't possible to send a stream of unknown length without doing extra copies (read to intermediate buffer, then send that buffer via write_chunk() API). **Description of changes:** Support input streams of unknown length via chunked encoding. When reading from the stream, leave a bit of space at the start and end of the aws_io_message buffer for the chunk prefix and suffix. Then go back and write in the prefix and suffix afterwards. No unnecessary copies are involved.
1 parent 60c43f8 commit e3a9cab

File tree

6 files changed

+548
-70
lines changed

6 files changed

+548
-70
lines changed
 

‎include/aws/http/private/h1_encoder.h

+7-4
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,12 @@ struct aws_h1_encoder_message {
5353
enum aws_h1_encoder_state {
5454
AWS_H1_ENCODER_STATE_INIT,
5555
AWS_H1_ENCODER_STATE_HEAD,
56-
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY,
56+
/* Write streaming body, without chunked encoding, because Content-Length is known */
57+
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM,
58+
/* Write streaming body, with chunked encoding, because Content-Length is unknown */
59+
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM,
60+
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK,
61+
/* The rest of the _CHUNK_ states support the write_chunk() API (body stream not provided up front) */
5762
AWS_H1_ENCODER_STATE_CHUNK_NEXT,
5863
AWS_H1_ENCODER_STATE_CHUNK_LINE,
5964
AWS_H1_ENCODER_STATE_CHUNK_BODY,
@@ -73,7 +78,7 @@ struct aws_h1_encoder {
7378
/* Current chunk */
7479
struct aws_h1_chunk *current_chunk;
7580
/* Number of chunks sent, just used for logging */
76-
size_t chunk_count;
81+
uint64_t chunk_count;
7782
/* Encoder logs with this stream ptr as the ID, and passes this ptr to the chunk_complete callback */
7883
struct aws_http_stream *current_stream;
7984
};
@@ -91,8 +96,6 @@ void aws_h1_chunk_destroy(struct aws_h1_chunk *chunk);
9196
/* Destroy chunk and fire its completion callback */
9297
void aws_h1_chunk_complete_and_destroy(struct aws_h1_chunk *chunk, struct aws_http_stream *http_stream, int error_code);
9398

94-
int aws_chunk_line_from_options(struct aws_http1_chunk_options *options, struct aws_byte_buf *chunk_line);
95-
9699
AWS_EXTERN_C_BEGIN
97100

98101
/* Validate request and cache any info the encoder will need later in the "encoder message". */

‎include/aws/http/request_response.h

+2-1
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,8 @@ AWS_FUTURE_T_POINTER_WITH_RELEASE_DECLARATION(aws_future_http_message, struct aw
881881

882882
/**
883883
* Submit a chunk of data to be sent on an HTTP/1.1 stream.
884-
* The stream must have specified "chunked" in a "transfer-encoding" header.
884+
* The stream must have specified "chunked" in a "transfer-encoding" header,
885+
* and the aws_http_message must NOT have any body stream set.
885886
* For client streams, activate() must be called before any chunks are submitted.
886887
* For server streams, the response must be submitted before any chunks.
887888
* A final chunk with size 0 must be submitted to successfully complete the HTTP-stream.

‎source/h1_encoder.c

+139-14
Original file line numberDiff line numberDiff line change
@@ -127,14 +127,6 @@ static int s_scan_outgoing_headers(
127127
return aws_raise_error(AWS_ERROR_HTTP_INVALID_HEADER_FIELD);
128128
}
129129

130-
if (encoder_message->has_chunked_encoding_header && has_body_stream) {
131-
AWS_LOGF_ERROR(
132-
AWS_LS_HTTP_STREAM,
133-
"id=static: Both Transfer-Encoding chunked header and body stream is set. "
134-
"chunked data must use the chunk API to write the body stream.");
135-
return aws_raise_error(AWS_ERROR_HTTP_INVALID_BODY_STREAM);
136-
}
137-
138130
if (body_headers_forbidden && (encoder_message->content_length > 0 || has_transfer_encoding_header)) {
139131
AWS_LOGF_ERROR(
140132
AWS_LS_HTTP_STREAM,
@@ -663,7 +655,7 @@ static int s_encode_stream(
663655
err = aws_input_stream_get_status(stream, &status);
664656
if (err) {
665657
ENCODER_LOGF(
666-
TRACE,
658+
ERROR,
667659
encoder,
668660
"Failed to query body stream status, error %d (%s)",
669661
aws_last_error(),
@@ -729,7 +721,10 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
729721

730722
/* Pick next state */
731723
if (encoder->message->body && encoder->message->content_length) {
732-
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY);
724+
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);
725+
726+
} else if (encoder->message->body && encoder->message->has_chunked_encoding_header) {
727+
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM);
733728

734729
} else if (encoder->message->has_chunked_encoding_header) {
735730
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_NEXT);
@@ -739,8 +734,8 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
739734
}
740735
}
741736

742-
/* Write out body (not using chunked encoding). */
743-
static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
737+
/* Write out body with known Content-Length (not using chunked encoding). */
738+
static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
744739
bool done;
745740
if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) {
746741
return AWS_OP_ERR;
@@ -755,6 +750,133 @@ static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_
755750
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
756751
}
757752

753+
/* Write out body (of unknown Content-Length) using chunked encoding.
754+
* Each pass through this state writes out 1 chunk of body data (or nothing at all). */
755+
static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
756+
757+
/* Each chunk is prefixed with: CHUNK-LENGTH-IN-ASCII-HEX CRLF
758+
* and suffixed with: CRLF
759+
*
760+
* When reading from the stream, we don't know how much data we'll get,
761+
* but the length needs to go in the prefix, before the data!
762+
* Therefore, leave space at start of dst buffer for the prefix,
763+
* we'll go back and write it AFTER streaming the body data.
764+
* Leave space at the end for the suffix too.
765+
*
766+
* Use a predictable length for the prefix.
767+
* 8 hex chars (i.e. "000000F7") seems reasonable (4 is too small, 16 is ridiculous, dynamic is complicated). */
768+
enum { padded_hex_len = 8 }; /* enum, because it's used as size for stack array */
769+
const char *padded_hex_fmt = "%08zX";
770+
const size_t max_hex_value_given_padding = UINT32_MAX; /* fits in 8 chars */
771+
const size_t chunk_prefix_len = padded_hex_len + CRLF_SIZE;
772+
const size_t chunk_suffix_len = CRLF_SIZE;
773+
774+
/* If dst buffer nearly full, don't bother reading from stream.
775+
* Remain in this state and we'll get a fresh buffer next tick. */
776+
const size_t dont_bother_if_space_less_than = 128; /* magic number, seems reasonable */
777+
AWS_ASSERT(dont_bother_if_space_less_than > chunk_prefix_len + chunk_suffix_len);
778+
if (dst->capacity - dst->len < dont_bother_if_space_less_than) {
779+
/* If this buffer is empty, and still not big enough, just give up.
780+
* Probably never happens, but g_aws_channel_max_fragment_size can theoretically be tweaked by user. */
781+
if (dst->len == 0) {
782+
AWS_LOGF_ERROR(
783+
AWS_LS_HTTP_STREAM, "id=%p Channel max fragment size is too small.", (void *)encoder->current_stream);
784+
return aws_raise_error(AWS_ERROR_INVALID_STATE);
785+
}
786+
787+
/* Remain in this state and we'll get a fresh buffer next tick */
788+
return AWS_OP_SUCCESS;
789+
}
790+
791+
/* Use a sub-buffer to limit where body can go.
792+
* Body will go after chunk-prefix, and needs to leave enough space for chunk-suffix afterwards. */
793+
uint8_t *body_sub_buf_start = dst->buffer + dst->len + chunk_prefix_len;
794+
uint8_t *body_sub_buf_end = dst->buffer + dst->capacity - chunk_suffix_len;
795+
struct aws_byte_buf body_sub_buf =
796+
aws_byte_buf_from_empty_array(body_sub_buf_start, body_sub_buf_end - body_sub_buf_start);
797+
/* We set aside a fixed number of bytes to encode the length, don't read more than that */
798+
body_sub_buf.capacity = aws_min_size(body_sub_buf.capacity, max_hex_value_given_padding);
799+
800+
/* Stream body into sub-buffer */
801+
ENCODER_LOG(TRACE, encoder, "Reading from body stream.");
802+
if (aws_input_stream_read(encoder->message->body, &body_sub_buf) != AWS_OP_SUCCESS) {
803+
ENCODER_LOGF(
804+
ERROR,
805+
encoder,
806+
"Failed to read body stream, error %d (%s)",
807+
aws_last_error(),
808+
aws_error_name(aws_last_error()));
809+
return AWS_OP_ERR;
810+
}
811+
812+
/* If ANY body data was streamed, then write in chunk prefix and suffix.
813+
*
814+
* (else no body data streamed, so dst remains untouched. Maybe we've
815+
* reached end of stream, maybe user just doesn't have data yet to send) */
816+
if (body_sub_buf.len > 0) {
817+
encoder->chunk_count++;
818+
ENCODER_LOGF(
819+
TRACE, encoder, "Sending chunk #%" PRIu64 " with size %zu", encoder->chunk_count, body_sub_buf.len);
820+
bool wrote_all = true;
821+
822+
/* Write chunk-prefix: LENGTH-IN-HEX CRLF */
823+
char hexbuf[padded_hex_len + 1] = {0};
824+
AWS_ASSERT(body_sub_buf.len <= max_hex_value_given_padding); /* guaranteed, b/c we clamped .capacity earlier */
825+
snprintf(hexbuf, sizeof(hexbuf), padded_hex_fmt, body_sub_buf.len);
826+
827+
wrote_all &= aws_byte_buf_write_from_whole_cursor(dst, aws_byte_cursor_from_c_str(hexbuf));
828+
wrote_all &= s_write_crlf(dst);
829+
830+
/* Increment dst->len, since we already copied body in there via sub-buffer */
831+
AWS_ASSERT(dst->buffer + dst->len == body_sub_buf_start); /* written chunk-prefix should end at body start */
832+
dst->len += body_sub_buf.len; /* safe b/c we clamped body_sub_buf.capacity earlier */
833+
834+
/* Write chunk-suffix: CRLF */
835+
wrote_all &= s_write_crlf(dst);
836+
837+
AWS_ASSERT(wrote_all); /* everything should have fit, we did a lot of math and clamping to guarantee it */
838+
(void)wrote_all;
839+
}
840+
841+
/* If body stream has ended: switch states.
842+
* As an optimization, we only do this check when the stream didn't 100% fill the buffer */
843+
if (body_sub_buf.len < body_sub_buf.capacity) {
844+
struct aws_stream_status stream_status;
845+
if (aws_input_stream_get_status(encoder->message->body, &stream_status) != AWS_OP_SUCCESS) {
846+
ENCODER_LOGF(
847+
ERROR,
848+
encoder,
849+
"Failed to query body stream status, error %d (%s)",
850+
aws_last_error(),
851+
aws_error_name(aws_last_error()));
852+
return AWS_OP_ERR;
853+
}
854+
855+
if (stream_status.is_end_of_stream) {
856+
encoder->chunk_count++;
857+
ENCODER_LOGF(TRACE, encoder, "Sending last chunk #%" PRIu64, encoder->chunk_count);
858+
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK);
859+
}
860+
}
861+
862+
/* Remain in state until done streaming body */
863+
return AWS_OP_SUCCESS;
864+
}
865+
866+
/* Note: this state is ONLY used when streaming a body of unknown Content-Length.
867+
* It is NOT used when the write_chunk() API is being used. */
868+
static int s_state_fn_chunked_body_stream_last_chunk(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
869+
870+
struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n");
871+
if (aws_byte_buf_write_from_whole_cursor(dst, last_chunk) == true) {
872+
ENCODER_LOG(TRACE, encoder, "Last chunk complete");
873+
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_TRAILER);
874+
} else {
875+
/* Remain in state until there's enough space to write */
876+
return AWS_OP_SUCCESS;
877+
}
878+
}
879+
758880
/* Select next chunk to work on.
759881
* Encoder is essentially "paused" here if no chunks are available. */
760882
static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
@@ -773,7 +895,7 @@ static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte
773895
ENCODER_LOGF(
774896
TRACE,
775897
encoder,
776-
"Begin sending chunk %zu with size %" PRIu64,
898+
"Begin sending chunk #%" PRIu64 " with size %" PRIu64,
777899
encoder->chunk_count,
778900
encoder->current_chunk->data_size);
779901

@@ -871,7 +993,10 @@ struct encoder_state_def {
871993
static struct encoder_state_def s_encoder_states[] = {
872994
[AWS_H1_ENCODER_STATE_INIT] = {.fn = s_state_fn_init, .name = "INIT"},
873995
[AWS_H1_ENCODER_STATE_HEAD] = {.fn = s_state_fn_head, .name = "HEAD"},
874-
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY] = {.fn = s_state_fn_unchunked_body, .name = "BODY"},
996+
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM] = {.fn = s_state_fn_unchunked_body_stream, .name = "BODY"},
997+
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM] = {.fn = s_state_fn_chunked_body_stream, .name = "CHUNKED_BODY_STREAM"},
998+
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK] =
999+
{.fn = s_state_fn_chunked_body_stream_last_chunk, .name = "LAST_CHUNK"},
8751000
[AWS_H1_ENCODER_STATE_CHUNK_NEXT] = {.fn = s_state_fn_chunk_next, .name = "CHUNK_NEXT"},
8761001
[AWS_H1_ENCODER_STATE_CHUNK_LINE] = {.fn = s_state_fn_chunk_line, .name = "CHUNK_LINE"},
8771002
[AWS_H1_ENCODER_STATE_CHUNK_BODY] = {.fn = s_state_fn_chunk_body, .name = "CHUNK_BODY"},

‎tests/CMakeLists.txt

+4-1
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ add_test_case(h1_encoder_transfer_encoding_chunked_across_multiple_headers)
6161
add_test_case(h1_encoder_case_insensitive_header_names)
6262
add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_content_length)
6363
add_test_case(h1_encoder_rejects_transfer_encoding_header_without_chunked)
64-
add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_body_stream)
6564
add_test_case(h1_encoder_transfer_encoding_chunked_header_with_multiple_encodings)
6665
add_test_case(h1_encoder_rejects_transfer_encoding_header_when_chunked_not_final_encoding)
6766
add_test_case(h1_encoder_rejects_transfer_encoding_header_not_ending_in_chunked)
@@ -76,13 +75,17 @@ add_test_case(h1_client_sanity_check)
7675
add_test_case(h1_client_request_send_1liner)
7776
add_test_case(h1_client_request_send_headers)
7877
add_test_case(h1_client_request_send_body)
78+
add_test_case(h1_client_request_send_body_chunked_and_streaming)
7979
add_test_case(h1_client_request_send_body_chunked)
8080
add_test_case(h1_client_request_send_chunked_trailer)
8181
add_test_case(h1_client_request_forbidden_trailer)
8282
add_test_case(h1_client_request_send_empty_chunked_trailer)
8383
add_test_case(h1_client_request_send_large_body)
8484
add_test_case(h1_client_request_send_large_body_chunked)
85+
add_test_case(h1_client_request_send_large_body_chunked_and_streaming)
8586
add_test_case(h1_client_request_send_large_head)
87+
add_test_case(h1_client_request_send_empty_body_chunked_and_streaming)
88+
add_test_case(h1_client_request_send_stalled_body_chunked_and_streaming)
8689
add_test_case(h1_client_request_content_length_0_ok)
8790
add_test_case(h1_client_request_waits_for_chunks)
8891
add_test_case(h1_client_request_send_chunk_from_chunk_complete_callback)

0 commit comments

Comments (0)