From 88a15d7dfc406ff6967a0a43f7e9f735f25d6a51 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 11 Mar 2025 13:21:19 -0700 Subject: [PATCH 1/8] HTTP/1: Support chunked encoding for streaming request body --- include/aws/http/private/h1_encoder.h | 11 +- include/aws/http/request_response.h | 3 +- source/h1_encoder.c | 152 ++++++++++++++-- tests/CMakeLists.txt | 4 +- tests/test_h1_client.c | 241 +++++++++++++++++++++++++- tests/test_h1_encoder.c | 48 ----- 6 files changed, 389 insertions(+), 70 deletions(-) diff --git a/include/aws/http/private/h1_encoder.h b/include/aws/http/private/h1_encoder.h index 11b4965c0..8f82b23d2 100644 --- a/include/aws/http/private/h1_encoder.h +++ b/include/aws/http/private/h1_encoder.h @@ -53,7 +53,12 @@ struct aws_h1_encoder_message { enum aws_h1_encoder_state { AWS_H1_ENCODER_STATE_INIT, AWS_H1_ENCODER_STATE_HEAD, - AWS_H1_ENCODER_STATE_UNCHUNKED_BODY, + /* Write streaming body, without chunked encoding, because Content-Length is known */ + AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM, + /* Write streaming body, with chunked encoding, because Content-Length is unknown */ + AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM, + AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK, + /* The rest of the _CHUNK_ states support the write_chunk() API (body stream not provided up front) */ AWS_H1_ENCODER_STATE_CHUNK_NEXT, AWS_H1_ENCODER_STATE_CHUNK_LINE, AWS_H1_ENCODER_STATE_CHUNK_BODY, @@ -73,7 +78,7 @@ struct aws_h1_encoder { /* Current chunk */ struct aws_h1_chunk *current_chunk; /* Number of chunks sent, just used for logging */ - size_t chunk_count; + uint64_t chunk_count; /* Encoder logs with this stream ptr as the ID, and passes this ptr to the chunk_complete callback */ struct aws_http_stream *current_stream; }; @@ -91,8 +96,6 @@ void aws_h1_chunk_destroy(struct aws_h1_chunk *chunk); /* Destroy chunk and fire its completion callback */ void aws_h1_chunk_complete_and_destroy(struct aws_h1_chunk *chunk, struct aws_http_stream *http_stream, int 
error_code); -int aws_chunk_line_from_options(struct aws_http1_chunk_options *options, struct aws_byte_buf *chunk_line); - AWS_EXTERN_C_BEGIN /* Validate request and cache any info the encoder will need later in the "encoder message". */ diff --git a/include/aws/http/request_response.h b/include/aws/http/request_response.h index d0d297724..7cd365da4 100644 --- a/include/aws/http/request_response.h +++ b/include/aws/http/request_response.h @@ -881,7 +881,8 @@ AWS_FUTURE_T_POINTER_WITH_RELEASE_DECLARATION(aws_future_http_message, struct aw /** * Submit a chunk of data to be sent on an HTTP/1.1 stream. - * The stream must have specified "chunked" in a "transfer-encoding" header. + * The stream must have specified "chunked" in a "transfer-encoding" header, + * and the aws_http_message must NOT have any body stream set. * For client streams, activate() must be called before any chunks are submitted. * For server streams, the response must be submitted before any chunks. * A final chunk with size 0 must be submitted to successfully complete the HTTP-stream. diff --git a/source/h1_encoder.c b/source/h1_encoder.c index 5374de120..c4c5b4063 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -127,14 +127,6 @@ static int s_scan_outgoing_headers( return aws_raise_error(AWS_ERROR_HTTP_INVALID_HEADER_FIELD); } - if (encoder_message->has_chunked_encoding_header && has_body_stream) { - AWS_LOGF_ERROR( - AWS_LS_HTTP_STREAM, - "id=static: Both Transfer-Encoding chunked header and body stream is set. 
" - "chunked data must use the chunk API to write the body stream."); - return aws_raise_error(AWS_ERROR_HTTP_INVALID_BODY_STREAM); - } - if (body_headers_forbidden && (encoder_message->content_length > 0 || has_transfer_encoding_header)) { AWS_LOGF_ERROR( AWS_LS_HTTP_STREAM, @@ -663,7 +655,7 @@ static int s_encode_stream( err = aws_input_stream_get_status(stream, &status); if (err) { ENCODER_LOGF( - TRACE, + ERROR, encoder, "Failed to query body stream status, error %d (%s)", aws_last_error(), @@ -729,7 +721,10 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf * /* Pick next state */ if (encoder->message->body && encoder->message->content_length) { - return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY); + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM); + + } else if (encoder->message->body && encoder->message->has_chunked_encoding_header) { + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM); } else if (encoder->message->has_chunked_encoding_header) { return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_NEXT); @@ -739,8 +734,8 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf * } } -/* Write out body (not using chunked encoding). */ -static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { +/* Write out body (not using chunked encoding) with known Content-Length. */ +static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { bool done; if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) { return AWS_OP_ERR; @@ -755,6 +750,132 @@ static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_ return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE); } +/* Write out body (of unknown Content-Length) using chunked encoding. 
+ * Each pass through this state writes out 1 chunk of body data (or nothing at all). */ +static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { + + /* Each chunk is prefixed with: CHUNK-LENGTH-IN-HEX CRLF + * and suffixed with: CRLF + * + * When reading from the stream, we don't know how much data we'll get. + * But the length needs to go in the prefix, before the data! + * Therefore, leave space at start of dst buffer for the prefix + * we'll go back and write it AFTER streaming the body data. + * Leave space at the end for the suffix too. + * + * Use a predictable length for the prefix. + * 8 hex chars (i.e. "000000F7") seems reasonable (4 is too small, 16 is ridiculous, dynamic is complicated). */ + const size_t padded_hex_len = 8; + const char *padded_hex_fmt = "%08zX"; + const size_t max_hex_value_given_padding = UINT32_MAX; /* fits in 8 chars */ + const size_t chunk_prefix_len = padded_hex_len + CRLF_SIZE; + const size_t chunk_suffix_len = CRLF_SIZE; + + /* If dst buffer nearly full, don't bother reading from stream. + * Remain in this state and we'll get a fresh buffer next tick. */ + const size_t dont_bother_if_space_less_than = 128; /* magic number, seems reasonable */ + AWS_ASSERT(chunk_prefix_len + chunk_suffix_len < dont_bother_if_space_less_than); + if (dst->capacity - dst->len < dont_bother_if_space_less_than) { + /* If this buffer is empty, and still not big enough, just give up. + * Probably never happens, but g_aws_channel_max_fragment_size can theoretically be tweaked by user. */ + if (dst->len == 0) { + AWS_LOGF_ERROR( + AWS_LS_HTTP_STREAM, "id=%p Channel max fragment size is too small.", (void *)encoder->current_stream); + return aws_raise_error(AWS_ERROR_INVALID_STATE); + } + + /* Remain in this state and we'll get a fresh buffer next tick */ + return AWS_OP_SUCCESS; + } + + /* Use a sub-buffer to limit where body can go. 
+ * Body will go after chunk-prefix, and needs to leave enough space for chunk-suffix afterwards. */ + uint8_t *body_sub_buf_start = dst->buffer + dst->len + chunk_prefix_len; + uint8_t *body_sub_buf_end = dst->buffer + dst->capacity - chunk_suffix_len; + struct aws_byte_buf body_sub_buf = + aws_byte_buf_from_empty_array(body_sub_buf_start, body_sub_buf_end - body_sub_buf_start); + /* We set aside a fixed number of bytes to encode the length, don't read more than that */ + body_sub_buf.capacity = aws_min_size(body_sub_buf.capacity, max_hex_value_given_padding); + + /* Stream body into sub-buffer */ + ENCODER_LOG(TRACE, encoder, "Reading from body stream."); + if (aws_input_stream_read(encoder->message->body, &body_sub_buf) != AWS_OP_SUCCESS) { + ENCODER_LOGF( + ERROR, + encoder, + "Failed to read body stream, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + return AWS_OP_ERR; + } + + /* If ANY body data was streamed, then write in chunk prefix and suffix. + * + * If no body data streamed, dst remains untouched: maybe we've reached end of stream, + * maybe user just doesn't have data yet to send */ + if (body_sub_buf.len > 0) { + encoder->chunk_count++; + ENCODER_LOGF( + TRACE, encoder, "Sending chunk #%" PRIu64 " with size %zu", encoder->chunk_count, body_sub_buf.len); + + /* Write chunk-prefix: LENGTH-IN-HEX CRLF */ + char hexbuf[padded_hex_len + 1] = {0}; + snprintf(hexbuf, sizeof(hexbuf), padded_hex_fmt, body_sub_buf.len); + aws_byte_buf_write_from_whole_cursor(dst, aws_byte_cursor_from_c_str(hexbuf)); + s_write_crlf(dst); + + /* Increment dst->len, since we already copied body in there via sub-buffer */ + + dst->len += body_sub_buf.len; + AWS_ASSERT(dst->len <= dst->capacity); + + /* Write chunk-suffix: CRLF */ + s_write_crlf(dst); + } + + /* If body stream has ended: switch states. 
+ * As an optimization, we only do this check when the stream didn't 100% fill the buffer */ + if (body_sub_buf.len < body_sub_buf.capacity) { + struct aws_stream_status stream_status; + if (aws_input_stream_get_status(encoder->message->body, &stream_status) != AWS_OP_SUCCESS) { + ENCODER_LOGF( + ERROR, + encoder, + "Failed to query body stream status, error %d (%s)", + aws_last_error(), + aws_error_name(aws_last_error())); + return AWS_OP_ERR; + } + + if (stream_status.is_end_of_stream) { + encoder->chunk_count++; + ENCODER_LOGF(TRACE, encoder, "Sending last chunk #%" PRIu64, encoder->chunk_count); + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK); + } + } + + /* Remain in state until done streaming body */ + return AWS_OP_SUCCESS; +} + +static int s_state_fn_chunked_body_stream_last_chunk(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { + struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n"); + + const size_t space_available = dst->capacity - dst->len; + if (space_available < last_chunk.len) { + /* Remain in state until there's enough space to write */ + return AWS_OP_SUCCESS; + } + + if (aws_byte_buf_write_from_whole_cursor(dst, last_chunk) == true) { + ENCODER_LOG(TRACE, encoder, "Last chunk complete"); + return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_TRAILER); + } else { + /* Remain in state until there's enough space to write */ + return AWS_OP_SUCCESS; + } +} + /* Select next chunk to work on. * Encoder is essentially "paused" here if no chunks are available. 
*/ static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { @@ -773,7 +894,7 @@ static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte ENCODER_LOGF( TRACE, encoder, - "Begin sending chunk %zu with size %" PRIu64, + "Begin sending chunk #%" PRIu64 " with size %" PRIu64, encoder->chunk_count, encoder->current_chunk->data_size); @@ -871,7 +992,10 @@ struct encoder_state_def { static struct encoder_state_def s_encoder_states[] = { [AWS_H1_ENCODER_STATE_INIT] = {.fn = s_state_fn_init, .name = "INIT"}, [AWS_H1_ENCODER_STATE_HEAD] = {.fn = s_state_fn_head, .name = "HEAD"}, - [AWS_H1_ENCODER_STATE_UNCHUNKED_BODY] = {.fn = s_state_fn_unchunked_body, .name = "BODY"}, + [AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM] = {.fn = s_state_fn_unchunked_body_stream, .name = "BODY"}, + [AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM] = {.fn = s_state_fn_chunked_body_stream, .name = "CHUNKED_BODY_STREAM"}, + [AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK] = + {.fn = s_state_fn_chunked_body_stream_last_chunk, .name = "LAST_CHUNK"}, [AWS_H1_ENCODER_STATE_CHUNK_NEXT] = {.fn = s_state_fn_chunk_next, .name = "CHUNK_NEXT"}, [AWS_H1_ENCODER_STATE_CHUNK_LINE] = {.fn = s_state_fn_chunk_line, .name = "CHUNK_LINE"}, [AWS_H1_ENCODER_STATE_CHUNK_BODY] = {.fn = s_state_fn_chunk_body, .name = "CHUNK_BODY"}, diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index d05bff540..2e8c42a65 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -61,7 +61,6 @@ add_test_case(h1_encoder_transfer_encoding_chunked_across_multiple_headers) add_test_case(h1_encoder_case_insensitive_header_names) add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_content_length) add_test_case(h1_encoder_rejects_transfer_encoding_header_without_chunked) -add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_body_stream) add_test_case(h1_encoder_transfer_encoding_chunked_header_with_multiple_encodings) 
add_test_case(h1_encoder_rejects_transfer_encoding_header_when_chunked_not_final_encoding) add_test_case(h1_encoder_rejects_transfer_encoding_header_not_ending_in_chunked) @@ -76,13 +75,16 @@ add_test_case(h1_client_sanity_check) add_test_case(h1_client_request_send_1liner) add_test_case(h1_client_request_send_headers) add_test_case(h1_client_request_send_body) +add_test_case(h1_client_request_send_body_chunked_and_streaming) add_test_case(h1_client_request_send_body_chunked) add_test_case(h1_client_request_send_chunked_trailer) add_test_case(h1_client_request_forbidden_trailer) add_test_case(h1_client_request_send_empty_chunked_trailer) add_test_case(h1_client_request_send_large_body) add_test_case(h1_client_request_send_large_body_chunked) +add_test_case(h1_client_request_send_large_body_chunked_and_streaming) add_test_case(h1_client_request_send_large_head) +add_test_case(h1_client_request_send_empty_body_chunked_and_streaming) add_test_case(h1_client_request_content_length_0_ok) add_test_case(h1_client_request_waits_for_chunks) add_test_case(h1_client_request_send_chunk_from_chunk_complete_callback) diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index 9cbe7f798..4016fee5c 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -326,6 +326,121 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_body_chunked) { return AWS_OP_SUCCESS; } +H1_CLIENT_TEST_CASE(h1_client_request_send_body_chunked_and_streaming) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* send request */ + static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests"); + struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body); + + struct aws_http_header headers[] = { + { + .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"), + }, + { + .name = 
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"), + }, + }; + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)); + aws_http_message_set_body_stream(request, body_stream); + + struct aws_http_make_request_options opt = { + .self_size = sizeof(opt), + .request = request, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); + ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); + + testing_channel_drain_queued_tasks(&tester.testing_channel); + + /* check result */ + const char *expected = "PUT /plan.txt HTTP/1.1\r\n" + "Host: amazon.com\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "00000010\r\n" /* implementation currently pads chunk-size */ + "write more tests" + "\r\n" + "0\r\n" + "\r\n"; + + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected)); + + /* clean up */ + aws_input_stream_release(body_stream); + aws_http_message_destroy(request); + aws_http_stream_release(stream); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + +H1_CLIENT_TEST_CASE(h1_client_request_send_empty_body_chunked_and_streaming) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* send request */ + static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL(""); + struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body); + + struct aws_http_header headers[] = { + { + .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"), + .value = 
AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"), + }, + { + .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"), + }, + }; + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)); + aws_http_message_set_body_stream(request, body_stream); + + struct aws_http_make_request_options opt = { + .self_size = sizeof(opt), + .request = request, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); + ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); + + testing_channel_drain_queued_tasks(&tester.testing_channel); + + /* check result */ + const char *expected = "PUT /plan.txt HTTP/1.1\r\n" + "Host: amazon.com\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n" + "0\r\n" + "\r\n"; + + ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected)); + + /* clean up */ + aws_input_stream_release(body_stream); + aws_http_message_destroy(request); + aws_http_stream_release(stream); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + return AWS_OP_SUCCESS; +} + int chunked_test_helper( const struct aws_byte_cursor *body, struct aws_http_headers *trailers, @@ -1177,7 +1292,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_chunk_size_0_with_extensions_ok) { return AWS_OP_SUCCESS; } -/* Send a request whose body doesn't fit in a single aws_io_message using content length*/ +/* Send a request (with streaming body and defined Content-Length) whose body doesn't fit in a single aws_io_message */ H1_CLIENT_TEST_CASE(h1_client_request_send_large_body) { (void)ctx; struct tester tester; @@ -1247,6 
+1362,128 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_large_body) { return AWS_OP_SUCCESS; } +static int s_check_chunked_streaming_request( + struct aws_byte_cursor actual_full_request_message, + struct aws_byte_cursor expected_head, + struct aws_byte_cursor expected_body_data) { + + /* check request head */ + ASSERT_TRUE(actual_full_request_message.len >= expected_head.len); + struct aws_byte_cursor actual_chunked_body = actual_full_request_message; + struct aws_byte_cursor actual_next = aws_byte_cursor_advance(&actual_chunked_body, expected_head.len); + ASSERT_BIN_ARRAYS_EQUALS(expected_head.ptr, expected_head.len, actual_next.ptr, actual_next.len); + + /* parse body chunks. + * (currently, when doing streaming body chunks, we never use chunk extensions) */ + struct aws_byte_cursor crlf_cursor = aws_byte_cursor_from_c_str("\r\n"); + while (true) { + /* Each chunk is prefixed with: CHUNK-LENGTH-IN-HEX CRLF */ + struct aws_byte_cursor next_crlf; + ASSERT_SUCCESS(aws_byte_cursor_find_exact(&actual_chunked_body, &crlf_cursor, &next_crlf)); + size_t hex_len = next_crlf.ptr - actual_chunked_body.ptr; + actual_next = aws_byte_cursor_advance(&actual_chunked_body, hex_len); + uint64_t chunk_len; + ASSERT_SUCCESS(aws_byte_cursor_utf8_parse_u64_hex(actual_next, &chunk_len)); + actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2); + ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n"); + + /* length of 0 indicates last chunk*/ + if (chunk_len == 0) { + ASSERT_UINT_EQUALS(0, expected_body_data.len, "message ended early"); + break; + } + + /* check that chunk's data matches what's next in expected_body_data */ + ASSERT_TRUE(chunk_len <= actual_chunked_body.len); + ASSERT_TRUE(chunk_len <= expected_body_data.len); + actual_next = aws_byte_cursor_advance(&actual_chunked_body, chunk_len); + ASSERT_BIN_ARRAYS_EQUALS(expected_body_data.ptr, chunk_len, actual_next.ptr, actual_next.len); + aws_byte_cursor_advance(&expected_body_data, chunk_len); + + /* CRLF at end of 
each chunk */ + actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2); + ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n", "expected CRLF at end of chunk"); + } + + /* parse trailer + * (currently, when doing streaming body chunks, trailer is always empty) */ + actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2); + ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n", "expected CRLF after empty chunk trailer"); + + /* assert there's nothing left over */ + ASSERT_UINT_EQUALS(0, actual_chunked_body.len, "extra bytes at end of message"); + return AWS_OP_SUCCESS; +} + +/* Send a request (with streaming body and chunked encoding) whose body doesn't fit in a single aws_io_message */ +H1_CLIENT_TEST_CASE(h1_client_request_send_large_body_chunked_and_streaming) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* send request with large body full of random data */ + size_t body_len = 1024 * 1024 * 1; /* 1MB */ + struct aws_byte_buf body_buf; + ASSERT_SUCCESS(aws_byte_buf_init(&body_buf, allocator, body_len)); + while (body_buf.len < body_len) { + int r = rand(); + aws_byte_buf_write_be32(&body_buf, (uint32_t)r); + } + + const struct aws_byte_cursor body = aws_byte_cursor_from_buf(&body_buf); + struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body); + + struct aws_http_header headers[] = { + { + .name = aws_byte_cursor_from_c_str("Transfer-Encoding"), + .value = aws_byte_cursor_from_c_str("chunked"), + }, + }; + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/large.txt"))); + aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)); + aws_http_message_set_body_stream(request, body_stream); + + struct 
aws_http_make_request_options opt = { + .self_size = sizeof(opt), + .request = request, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); + ASSERT_NOT_NULL(stream); + ASSERT_SUCCESS(aws_http_stream_activate(stream)); + + /* check result */ + const char *expected_head = "PUT /large.txt HTTP/1.1\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n"; + + testing_channel_drain_queued_tasks(&tester.testing_channel); + + struct aws_byte_buf written_buf; + ASSERT_SUCCESS(aws_byte_buf_init(&written_buf, allocator, body_len * 2)); + ASSERT_SUCCESS(testing_channel_drain_written_messages(&tester.testing_channel, &written_buf)); + + ASSERT_SUCCESS(s_check_chunked_streaming_request( + aws_byte_cursor_from_buf(&written_buf) /*actual_full_request_message*/, + aws_byte_cursor_from_c_str(expected_head), + body /*expected_body_data*/)); + + /* clean up */ + aws_input_stream_release(body_stream); + aws_http_message_destroy(request); + aws_http_stream_release(stream); + aws_byte_buf_clean_up(&body_buf); + aws_byte_buf_clean_up(&written_buf); + + ASSERT_SUCCESS(s_tester_clean_up(&tester)); + + aws_thread_current_sleep(100); + return AWS_OP_SUCCESS; +} + static int s_parse_chunked_extensions( const char *extensions, struct aws_http1_chunk_extension *expected_extensions, @@ -1345,7 +1582,7 @@ static int s_can_parse_as_chunked_encoding( return AWS_OP_SUCCESS; } -/* Send a request whose body doesn't fit in a single aws_io_message using chunked transfer encoding*/ +/* Send a request (using the write_chunk() API) whose body doesn't fit in a single aws_io_message */ H1_CLIENT_TEST_CASE(h1_client_request_send_large_body_chunked) { (void)ctx; struct tester tester; diff --git a/tests/test_h1_encoder.c b/tests/test_h1_encoder.c index 0fa89d5cd..30ef0f3bc 100644 --- a/tests/test_h1_encoder.c +++ b/tests/test_h1_encoder.c @@ -246,54 +246,6 @@ H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_header_without_chunked 
AWS_ERROR_HTTP_INVALID_HEADER_VALUE /*expected_error*/); } -H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_body_stream) { - (void)ctx; - s_test_init(allocator); - struct aws_h1_encoder encoder; - aws_h1_encoder_init(&encoder, allocator); - - /* request to send - we won't actually send it, we want to validate headers are set correctly. */ - static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests"); - struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body); - - struct aws_http_header headers[] = { - { - .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"), - .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"), - }, - { - .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), - .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"), - }, - }; - - struct aws_http_message *request = aws_http_message_new_request(allocator); - ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT"))); - ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/"))); - aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)); - /* Setting the body stream should cause an error */ - aws_http_message_set_body_stream(request, body_stream); - - struct aws_linked_list chunk_list; - aws_linked_list_init(&chunk_list); - - struct aws_h1_encoder_message encoder_message; - ASSERT_ERROR( - AWS_ERROR_HTTP_INVALID_BODY_STREAM, - aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list)); - - ASSERT_FALSE(encoder_message.has_chunked_encoding_header); - ASSERT_FALSE(encoder_message.has_connection_close_header); - ASSERT_UINT_EQUALS(0, encoder_message.content_length); - - aws_input_stream_release(body_stream); - aws_http_message_destroy(request); - aws_h1_encoder_message_clean_up(&encoder_message); - aws_h1_encoder_clean_up(&encoder); - 
s_test_clean_up(); - return AWS_OP_SUCCESS; -} - H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_header_not_ending_in_chunked) { (void)ctx; struct aws_http_header headers[] = { From 3f7b4ce12d3e83146aeb2ada4582773e4dafae82 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 11 Mar 2025 13:35:11 -0700 Subject: [PATCH 2/8] trivial comment tweaks --- source/h1_encoder.c | 16 +++++++++------- 1 file changed, 9 insertions(+), 7 deletions(-) diff --git a/source/h1_encoder.c b/source/h1_encoder.c index c4c5b4063..183f9c911 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -734,7 +734,7 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf * } } -/* Write out body (not using chunked encoding) with known Content-Length. */ +/* Write out body with known Content-Length (not using chunked encoding). */ static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { bool done; if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) { @@ -754,12 +754,12 @@ static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, stru * Each pass through this state writes out 1 chunk of body data (or nothing at all). */ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { - /* Each chunk is prefixed with: CHUNK-LENGTH-IN-HEX CRLF + /* Each chunk is prefixed with: CHUNK-LENGTH-IN-ASCII-HEX CRLF * and suffixed with: CRLF * - * When reading from the stream, we don't know how much data we'll get. - * But the length needs to go in the prefix, before the data! - * Therefore, leave space at start of dst buffer for the prefix + * When reading from the stream, we don't know how much data we'll get, + * but the length needs to go in the prefix, before the data! + * Therefore, leave space at start of dst buffer for the prefix, * we'll go back and write it AFTER streaming the body data. 
* Leave space at the end for the suffix too. * @@ -811,8 +811,8 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct /* If ANY body data was streamed, then write in chunk prefix and suffix. * - * If no body data streamed, dst remains untouched: maybe we've reached end of stream, - * maybe user just doesn't have data yet to send */ + * (else no body data streamed, so dst remains untouched. Maybe we've + * reached end of stream, maybe user just doesn't have data yet to send) */ if (body_sub_buf.len > 0) { encoder->chunk_count++; ENCODER_LOGF( @@ -858,6 +858,8 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct return AWS_OP_SUCCESS; } +/* Note: this state is ONLY used when streaming a body of unknown Content-Length. + * It is NOT used when the write_chunk() API is being used. */ static int s_state_fn_chunked_body_stream_last_chunk(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n"); From fc400ae43436c7fbfc3b9b7d730100a5c1ed551d Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 11 Mar 2025 20:55:04 +0000 Subject: [PATCH 3/8] make compilers happy --- source/h1_encoder.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/h1_encoder.c b/source/h1_encoder.c index 183f9c911..0a8f5b1f1 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -765,7 +765,7 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct * * Use a predictable length for the prefix. * 8 hex chars (i.e. "000000F7") seems reasonable (4 is too small, 16 is ridiculous, dynamic is complicated). 
*/ - const size_t padded_hex_len = 8; + enum { padded_hex_len = 8 }; /* enum so we can use a length of stack-array */ const char *padded_hex_fmt = "%08zX"; const size_t max_hex_value_given_padding = UINT32_MAX; /* fits in 8 chars */ const size_t chunk_prefix_len = padded_hex_len + CRLF_SIZE; From e4ac8c8a656eae1a681e71b1cc4eafd4a8230a74 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Tue, 11 Mar 2025 14:11:26 -0700 Subject: [PATCH 4/8] make compilers happy --- tests/test_h1_client.c | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index 4016fee5c..9e9ccdbc3 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -1396,9 +1396,9 @@ static int s_check_chunked_streaming_request( /* check that chunk's data matches what's next in expected_body_data */ ASSERT_TRUE(chunk_len <= actual_chunked_body.len); ASSERT_TRUE(chunk_len <= expected_body_data.len); - actual_next = aws_byte_cursor_advance(&actual_chunked_body, chunk_len); - ASSERT_BIN_ARRAYS_EQUALS(expected_body_data.ptr, chunk_len, actual_next.ptr, actual_next.len); - aws_byte_cursor_advance(&expected_body_data, chunk_len); + actual_next = aws_byte_cursor_advance(&actual_chunked_body, (size_t)chunk_len); + ASSERT_BIN_ARRAYS_EQUALS(expected_body_data.ptr, (size_t)chunk_len, actual_next.ptr, actual_next.len); + aws_byte_cursor_advance(&expected_body_data, (size_t)chunk_len); /* CRLF at end of each chunk */ actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2); From d1b0d70d8b070e3be030a7548fb64cb5dd9c3f26 Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Wed, 12 Mar 2025 16:26:03 -0700 Subject: [PATCH 5/8] comment typo --- source/h1_encoder.c | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/source/h1_encoder.c b/source/h1_encoder.c index 0a8f5b1f1..a20abbe92 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -765,7 +765,7 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, 
struct * * Use a predictable length for the prefix. * 8 hex chars (i.e. "000000F7") seems reasonable (4 is too small, 16 is ridiculous, dynamic is complicated). */ - enum { padded_hex_len = 8 }; /* enum so we can use a length of stack-array */ + enum { padded_hex_len = 8 }; /* enum, because it's used as size for stack array */ const char *padded_hex_fmt = "%08zX"; const size_t max_hex_value_given_padding = UINT32_MAX; /* fits in 8 chars */ const size_t chunk_prefix_len = padded_hex_len + CRLF_SIZE; From e0f45989d4c5b9ac08fb6b3fe95fbc2605ec197e Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Thu, 13 Mar 2025 17:02:47 -0700 Subject: [PATCH 6/8] respond to feedback --- source/h1_encoder.c | 27 +++++++++++++-------------- 1 file changed, 13 insertions(+), 14 deletions(-) diff --git a/source/h1_encoder.c b/source/h1_encoder.c index a20abbe92..aecc01977 100644 --- a/source/h1_encoder.c +++ b/source/h1_encoder.c @@ -774,7 +774,7 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct /* If dst buffer nearly full, don't bother reading from stream. * Remain in this state and we'll get a fresh buffer next tick. */ const size_t dont_bother_if_space_less_than = 128; /* magic number, seems reasonable */ - AWS_ASSERT(chunk_prefix_len + chunk_suffix_len < dont_bother_if_space_less_than); + AWS_ASSERT(dont_bother_if_space_less_than > chunk_prefix_len + chunk_suffix_len); if (dst->capacity - dst->len < dont_bother_if_space_less_than) { /* If this buffer is empty, and still not big enough, just give up. * Probably never happens, but g_aws_channel_max_fragment_size can theoretically be tweaked by user. 
*/ @@ -817,20 +817,25 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct encoder->chunk_count++; ENCODER_LOGF( TRACE, encoder, "Sending chunk #%" PRIu64 " with size %zu", encoder->chunk_count, body_sub_buf.len); + bool wrote_all = true; /* Write chunk-prefix: LENGTH-IN-HEX CRLF */ char hexbuf[padded_hex_len + 1] = {0}; + AWS_ASSERT(body_sub_buf.len <= max_hex_value_given_padding); /* guaranteed, b/c we clamped .capacity earlier */ snprintf(hexbuf, sizeof(hexbuf), padded_hex_fmt, body_sub_buf.len); - aws_byte_buf_write_from_whole_cursor(dst, aws_byte_cursor_from_c_str(hexbuf)); - s_write_crlf(dst); - /* Increment dst->len, since we already copied body in there via sub-buffer */ + wrote_all &= aws_byte_buf_write_from_whole_cursor(dst, aws_byte_cursor_from_c_str(hexbuf)); + wrote_all &= s_write_crlf(dst); - dst->len += body_sub_buf.len; - AWS_ASSERT(dst->len <= dst->capacity); + /* Increment dst->len, since we already copied body in there via sub-buffer */ + AWS_ASSERT(dst->buffer + dst->len == body_sub_buf_start); /* written chunk-prefix should end at body start */ + dst->len += body_sub_buf.len; /* safe b/c we clamped body_sub_buf.capacity earlier */ /* Write chunk-suffix: CRLF */ - s_write_crlf(dst); + wrote_all &= s_write_crlf(dst); + + AWS_ASSERT(wrote_all); /* everything should have fit, we did a lot of math and clamping to guarantee it */ + (void)wrote_all; } /* If body stream has ended: switch states. @@ -861,14 +866,8 @@ static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct /* Note: this state is ONLY used when streaming a body of unknown Content-Length. * It is NOT used when the write_chunk() API is being used. 
*/ static int s_state_fn_chunked_body_stream_last_chunk(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) { - struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n"); - - const size_t space_available = dst->capacity - dst->len; - if (space_available < last_chunk.len) { - /* Remain in state until there's enough space to write */ - return AWS_OP_SUCCESS; - } + struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n"); if (aws_byte_buf_write_from_whole_cursor(dst, last_chunk) == true) { ENCODER_LOG(TRACE, encoder, "Last chunk complete"); return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_TRAILER); From 6f8221ae952d797a7999a5268f5185258ccdfb3d Mon Sep 17 00:00:00 2001 From: Michael Graeb Date: Fri, 14 Mar 2025 12:25:29 -0700 Subject: [PATCH 7/8] add test for stalled streams --- tests/CMakeLists.txt | 1 + tests/test_h1_client.c | 146 +++++++++++++++++++++++++++++++++++++++++ 2 files changed, 147 insertions(+) diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 2e8c42a65..dbb2b7b7f 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -85,6 +85,7 @@ add_test_case(h1_client_request_send_large_body_chunked) add_test_case(h1_client_request_send_large_body_chunked_and_streaming) add_test_case(h1_client_request_send_large_head) add_test_case(h1_client_request_send_empty_body_chunked_and_streaming) +add_test_case(h1_client_request_send_stalled_body_chunked_and_streaming) add_test_case(h1_client_request_content_length_0_ok) add_test_case(h1_client_request_waits_for_chunks) add_test_case(h1_client_request_send_chunk_from_chunk_complete_callback) diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c index 9e9ccdbc3..f383f7533 100644 --- a/tests/test_h1_client.c +++ b/tests/test_h1_client.c @@ -441,6 +441,152 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_empty_body_chunked_and_streaming) { return AWS_OP_SUCCESS; } +/** + * An input stream that can stall (provide 0 bytes from read()) for 
a while. + * Not thread safe. Set current_cursor to un-stall it. Set is_eof to end it. + */ +struct stalling_input_stream { + struct aws_input_stream base; + struct aws_allocator *allocator; + struct aws_byte_cursor current_cursor; + bool is_eof; +}; + +static int s_stalling_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) { + struct stalling_input_stream *impl = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base); + + aws_byte_buf_write_to_capacity(dest, &impl->current_cursor); + + return AWS_OP_SUCCESS; +} + +static int s_stalling_input_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) { + struct stalling_input_stream *impl = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base); + + status->is_end_of_stream = impl->is_eof && impl->current_cursor.len == 0; + status->is_valid = true; + + return AWS_OP_SUCCESS; +} + +static void s_stalling_input_stream_add_data(struct aws_input_stream *stream, const char *data) { + struct stalling_input_stream *impl = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base); + AWS_FATAL_ASSERT(impl->current_cursor.len == 0); + impl->current_cursor = aws_byte_cursor_from_c_str(data); +} + +static void s_stalling_input_stream_set_eof(struct aws_input_stream *stream) { + struct stalling_input_stream *impl = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base); + impl->is_eof = true; +} + +static struct aws_input_stream_vtable s_stalling_input_stream_vtable = { + .read = s_stalling_input_stream_read, + .get_status = s_stalling_input_stream_get_status, +}; + +static void s_stalling_input_stream_destroy(struct stalling_input_stream *impl) { + aws_mem_release(impl->allocator, impl); +} + +static struct aws_input_stream *s_stalling_input_stream_new(struct aws_allocator *allocator) { + struct stalling_input_stream *impl = aws_mem_calloc(allocator, 1, sizeof(struct stalling_input_stream)); + impl->allocator = allocator; + impl->base.impl = impl; + 
impl->base.vtable = &s_stalling_input_stream_vtable; + aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_stalling_input_stream_destroy); + return &impl->base; +} + +/* Send request, with chunked and streaming body, where the body stream "stalls" + * and doesn't fill the available space */ +H1_CLIENT_TEST_CASE(h1_client_request_send_stalled_body_chunked_and_streaming) { + (void)ctx; + struct tester tester; + ASSERT_SUCCESS(s_tester_init(&tester, allocator)); + + /* send request */ + struct aws_input_stream *stalling_stream = s_stalling_input_stream_new(allocator); + + struct aws_http_header headers[] = { + { + .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"), + }, + { + .name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"), + .value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"), + }, + }; + + struct aws_http_message *request = aws_http_message_new_request(allocator); + ASSERT_NOT_NULL(request); + ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT"))); + ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt"))); + aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers)); + aws_http_message_set_body_stream(request, stalling_stream); + + struct aws_http_make_request_options opt = { + .self_size = sizeof(opt), + .request = request, + }; + struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt); + ASSERT_NOT_NULL(stream); + aws_http_stream_activate(stream); + + /* Stream is currently stalled, only the request head should have be written. 
+ * The client should be stuck, repeatedly polling the input-stream once per event-loop tick */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &tester.testing_channel, + allocator, + /*expected*/ + "PUT /plan.txt HTTP/1.1\r\n" + "Host: amazon.com\r\n" + "Transfer-Encoding: chunked\r\n" + "\r\n")); + + /* now stream a bit of data.. */ + s_stalling_input_stream_add_data(stalling_stream, "baby's first chunk"); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &tester.testing_channel, + allocator, + /*expected*/ + "00000012\r\n" + "baby's first chunk" + "\r\n")); + + /* now stream a bit more.. */ + s_stalling_input_stream_add_data(stalling_stream, "the second chunk"); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + ASSERT_SUCCESS(testing_channel_check_written_messages_str( + &tester.testing_channel, + allocator, + /*expected*/ + "00000010\r\n" + "the second chunk" + "\r\n")); + + /* and now end the stream, the request should finish sending... 
*/
+    s_stalling_input_stream_set_eof(stalling_stream);
+    testing_channel_drain_queued_tasks(&tester.testing_channel);
+    ASSERT_SUCCESS(testing_channel_check_written_messages_str(
+        &tester.testing_channel,
+        allocator,
+        /*expected*/
+        "0\r\n"
+        "\r\n"));
+
+    /* clean up */
+    aws_input_stream_release(stalling_stream);
+    aws_http_message_destroy(request);
+    aws_http_stream_release(stream);
+
+    ASSERT_SUCCESS(s_tester_clean_up(&tester));
+    return AWS_OP_SUCCESS;
+}
+
 int chunked_test_helper(
     const struct aws_byte_cursor *body,
     struct aws_http_headers *trailers,

From 599c9b38770305bc20c3ccb256fc97b2bb9fff74 Mon Sep 17 00:00:00 2001
From: Michael Graeb
Date: Fri, 14 Mar 2025 12:35:52 -0700
Subject: [PATCH 8/8] A few tweaks to make this more like a similar h2 test
 (h2_client_stream_send_stalled_data)

---
 tests/test_h1_client.c | 27 +++++++++++++++++++--------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/tests/test_h1_client.c b/tests/test_h1_client.c
index f383f7533..f645f8d0b 100644
--- a/tests/test_h1_client.c
+++ b/tests/test_h1_client.c
@@ -534,8 +534,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_stalled_body_chunked_and_streaming) {
     ASSERT_NOT_NULL(stream);
     aws_http_stream_activate(stream);
 
-    /* Stream is currently stalled, only the request head should have be written.
-     * The client should be stuck, repeatedly polling the input-stream once per event-loop tick */
+    /* Stream is currently stalled. After 1 event-loop tick, only the request head should have been written. */
     testing_channel_run_currently_queued_tasks(&tester.testing_channel);
     ASSERT_SUCCESS(testing_channel_check_written_messages_str(
         &tester.testing_channel,
@@ -546,7 +545,13 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_stalled_body_chunked_and_streaming) {
         "Transfer-Encoding: chunked\r\n"
         "\r\n"));
 
-    /* now stream a bit of data.. */
+    /* Execute a few event-loop ticks. 
No more data should be written */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + ASSERT_TRUE(aws_linked_list_empty(testing_channel_get_written_message_queue(&tester.testing_channel))); + + /* Now stream a bit of data... */ s_stalling_input_stream_add_data(stalling_stream, "baby's first chunk"); testing_channel_run_currently_queued_tasks(&tester.testing_channel); ASSERT_SUCCESS(testing_channel_check_written_messages_str( @@ -557,18 +562,24 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_stalled_body_chunked_and_streaming) { "baby's first chunk" "\r\n")); - /* now stream a bit more.. */ - s_stalling_input_stream_add_data(stalling_stream, "the second chunk"); + /* Execute a few event-loop ticks. No more data should be written */ + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + testing_channel_run_currently_queued_tasks(&tester.testing_channel); + ASSERT_TRUE(aws_linked_list_empty(testing_channel_get_written_message_queue(&tester.testing_channel))); + + /* Now stream exactly 1 byte more... */ + s_stalling_input_stream_add_data(stalling_stream, "z"); testing_channel_run_currently_queued_tasks(&tester.testing_channel); ASSERT_SUCCESS(testing_channel_check_written_messages_str( &tester.testing_channel, allocator, /*expected*/ - "00000010\r\n" - "the second chunk" + "00000001\r\n" + "z" "\r\n")); - /* and now end the stream, the request should finish sending... */ + /* Finally, end the stream, the request should finish sending... */ s_stalling_input_stream_set_eof(stalling_stream); testing_channel_drain_queued_tasks(&tester.testing_channel); ASSERT_SUCCESS(testing_channel_check_written_messages_str(