Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

HTTP/1: Support streaming requests of unknown length #506

Merged
merged 9 commits into from
Mar 14, 2025
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 7 additions & 4 deletions include/aws/http/private/h1_encoder.h
Original file line number Diff line number Diff line change
@@ -53,7 +53,12 @@ struct aws_h1_encoder_message {
enum aws_h1_encoder_state {
AWS_H1_ENCODER_STATE_INIT,
AWS_H1_ENCODER_STATE_HEAD,
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY,
/* Write streaming body, without chunked encoding, because Content-Length is known */
AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM,
/* Write streaming body, with chunked encoding, because Content-Length is unknown */
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM,
AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK,
/* The rest of the _CHUNK_ states support the write_chunk() API (body stream not provided up front) */
AWS_H1_ENCODER_STATE_CHUNK_NEXT,
AWS_H1_ENCODER_STATE_CHUNK_LINE,
AWS_H1_ENCODER_STATE_CHUNK_BODY,
@@ -73,7 +78,7 @@ struct aws_h1_encoder {
/* Current chunk */
struct aws_h1_chunk *current_chunk;
/* Number of chunks sent, just used for logging */
size_t chunk_count;
uint64_t chunk_count;
/* Encoder logs with this stream ptr as the ID, and passes this ptr to the chunk_complete callback */
struct aws_http_stream *current_stream;
};
@@ -91,8 +96,6 @@ void aws_h1_chunk_destroy(struct aws_h1_chunk *chunk);
/* Destroy chunk and fire its completion callback */
void aws_h1_chunk_complete_and_destroy(struct aws_h1_chunk *chunk, struct aws_http_stream *http_stream, int error_code);

int aws_chunk_line_from_options(struct aws_http1_chunk_options *options, struct aws_byte_buf *chunk_line);

AWS_EXTERN_C_BEGIN

/* Validate request and cache any info the encoder will need later in the "encoder message". */
3 changes: 2 additions & 1 deletion include/aws/http/request_response.h
Original file line number Diff line number Diff line change
@@ -881,7 +881,8 @@ AWS_FUTURE_T_POINTER_WITH_RELEASE_DECLARATION(aws_future_http_message, struct aw

/**
* Submit a chunk of data to be sent on an HTTP/1.1 stream.
* The stream must have specified "chunked" in a "transfer-encoding" header.
* The stream must have specified "chunked" in a "transfer-encoding" header,
* and the aws_http_message must NOT have any body stream set.
* For client streams, activate() must be called before any chunks are submitted.
* For server streams, the response must be submitted before any chunks.
* A final chunk with size 0 must be submitted to successfully complete the HTTP-stream.
153 changes: 139 additions & 14 deletions source/h1_encoder.c
Original file line number Diff line number Diff line change
@@ -127,14 +127,6 @@ static int s_scan_outgoing_headers(
return aws_raise_error(AWS_ERROR_HTTP_INVALID_HEADER_FIELD);
}

if (encoder_message->has_chunked_encoding_header && has_body_stream) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_STREAM,
"id=static: Both Transfer-Encoding chunked header and body stream is set. "
"chunked data must use the chunk API to write the body stream.");
return aws_raise_error(AWS_ERROR_HTTP_INVALID_BODY_STREAM);
}

if (body_headers_forbidden && (encoder_message->content_length > 0 || has_transfer_encoding_header)) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_STREAM,
@@ -663,7 +655,7 @@ static int s_encode_stream(
err = aws_input_stream_get_status(stream, &status);
if (err) {
ENCODER_LOGF(
TRACE,
ERROR,
encoder,
"Failed to query body stream status, error %d (%s)",
aws_last_error(),
@@ -729,7 +721,10 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *

/* Pick next state */
if (encoder->message->body && encoder->message->content_length) {
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY);
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM);

} else if (encoder->message->body && encoder->message->has_chunked_encoding_header) {
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM);

} else if (encoder->message->has_chunked_encoding_header) {
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_NEXT);
@@ -739,8 +734,8 @@ static int s_state_fn_head(struct aws_h1_encoder *encoder, struct aws_byte_buf *
}
}

/* Write out body (not using chunked encoding). */
static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
/* Write out body with known Content-Length (not using chunked encoding). */
static int s_state_fn_unchunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
bool done;
if (s_encode_stream(encoder, dst, encoder->message->body, encoder->message->content_length, &done)) {
return AWS_OP_ERR;
@@ -755,6 +750,133 @@ static int s_state_fn_unchunked_body(struct aws_h1_encoder *encoder, struct aws_
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_DONE);
}

/* Write out body (of unknown Content-Length) using chunked encoding.
 * Each pass through this state writes out 1 chunk of body data (or nothing at all).
 * Returns AWS_OP_SUCCESS both when progress was made and when we must wait
 * (dst nearly full, or the body stream had no data yet); returns AWS_OP_ERR
 * only if reading the stream or querying its status fails. */
static int s_state_fn_chunked_body_stream(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {

/* Each chunk is prefixed with: CHUNK-LENGTH-IN-ASCII-HEX CRLF
 * and suffixed with: CRLF
 *
 * When reading from the stream, we don't know how much data we'll get,
 * but the length needs to go in the prefix, before the data!
 * Therefore, leave space at start of dst buffer for the prefix,
 * we'll go back and write it AFTER streaming the body data.
 * Leave space at the end for the suffix too.
 *
 * Use a predictable length for the prefix.
 * 8 hex chars (i.e. "000000F7") seems reasonable (4 is too small, 16 is ridiculous, dynamic is complicated). */
enum { padded_hex_len = 8 }; /* enum, because it's used as size for stack array */
const char *padded_hex_fmt = "%08zX"; /* zero-padded, so the prefix is always exactly 8 chars wide */
const size_t max_hex_value_given_padding = UINT32_MAX; /* fits in 8 chars */
const size_t chunk_prefix_len = padded_hex_len + CRLF_SIZE;
const size_t chunk_suffix_len = CRLF_SIZE;

/* If dst buffer nearly full, don't bother reading from stream.
 * Remain in this state and we'll get a fresh buffer next tick. */
const size_t dont_bother_if_space_less_than = 128; /* magic number, seems reasonable */
AWS_ASSERT(dont_bother_if_space_less_than > chunk_prefix_len + chunk_suffix_len);
if (dst->capacity - dst->len < dont_bother_if_space_less_than) {
/* If this buffer is empty, and still not big enough, just give up.
 * Probably never happens, but g_aws_channel_max_fragment_size can theoretically be tweaked by user. */
if (dst->len == 0) {
AWS_LOGF_ERROR(
AWS_LS_HTTP_STREAM, "id=%p Channel max fragment size is too small.", (void *)encoder->current_stream);
return aws_raise_error(AWS_ERROR_INVALID_STATE);
}

/* Remain in this state and we'll get a fresh buffer next tick */
return AWS_OP_SUCCESS;
}

/* Use a sub-buffer to limit where body can go.
 * Body will go after chunk-prefix, and needs to leave enough space for chunk-suffix afterwards. */
uint8_t *body_sub_buf_start = dst->buffer + dst->len + chunk_prefix_len;
uint8_t *body_sub_buf_end = dst->buffer + dst->capacity - chunk_suffix_len;
struct aws_byte_buf body_sub_buf =
aws_byte_buf_from_empty_array(body_sub_buf_start, body_sub_buf_end - body_sub_buf_start);
/* We set aside a fixed number of bytes to encode the length, don't read more than that */
body_sub_buf.capacity = aws_min_size(body_sub_buf.capacity, max_hex_value_given_padding);

/* Stream body into sub-buffer */
ENCODER_LOG(TRACE, encoder, "Reading from body stream.");
if (aws_input_stream_read(encoder->message->body, &body_sub_buf) != AWS_OP_SUCCESS) {
ENCODER_LOGF(
ERROR,
encoder,
"Failed to read body stream, error %d (%s)",
aws_last_error(),
aws_error_name(aws_last_error()));
return AWS_OP_ERR;
}

/* If ANY body data was streamed, then write in chunk prefix and suffix.
 *
 * (else no body data streamed, so dst remains untouched. Maybe we've
 * reached end of stream, maybe user just doesn't have data yet to send) */
if (body_sub_buf.len > 0) {
encoder->chunk_count++;
ENCODER_LOGF(
TRACE, encoder, "Sending chunk #%" PRIu64 " with size %zu", encoder->chunk_count, body_sub_buf.len);
bool wrote_all = true;

/* Write chunk-prefix: LENGTH-IN-HEX CRLF (backfilled into the space reserved above) */
char hexbuf[padded_hex_len + 1] = {0};
AWS_ASSERT(body_sub_buf.len <= max_hex_value_given_padding); /* guaranteed, b/c we clamped .capacity earlier */
snprintf(hexbuf, sizeof(hexbuf), padded_hex_fmt, body_sub_buf.len);

wrote_all &= aws_byte_buf_write_from_whole_cursor(dst, aws_byte_cursor_from_c_str(hexbuf));
wrote_all &= s_write_crlf(dst);

/* Increment dst->len, since we already copied body in there via sub-buffer */
AWS_ASSERT(dst->buffer + dst->len == body_sub_buf_start); /* written chunk-prefix should end at body start */
dst->len += body_sub_buf.len; /* safe b/c we clamped body_sub_buf.capacity earlier */

/* Write chunk-suffix: CRLF */
wrote_all &= s_write_crlf(dst);

AWS_ASSERT(wrote_all); /* everything should have fit, we did a lot of math and clamping to guarantee it */
(void)wrote_all;
}

/* If body stream has ended: switch states.
 * As an optimization, we only do this check when the stream didn't 100% fill the buffer */
if (body_sub_buf.len < body_sub_buf.capacity) {
struct aws_stream_status stream_status;
if (aws_input_stream_get_status(encoder->message->body, &stream_status) != AWS_OP_SUCCESS) {
ENCODER_LOGF(
ERROR,
encoder,
"Failed to query body stream status, error %d (%s)",
aws_last_error(),
aws_error_name(aws_last_error()));
return AWS_OP_ERR;
}

if (stream_status.is_end_of_stream) {
encoder->chunk_count++;
ENCODER_LOGF(TRACE, encoder, "Sending last chunk #%" PRIu64, encoder->chunk_count);
return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK);
}
}

/* Remain in state until done streaming body */
return AWS_OP_SUCCESS;
}

/* Emit the terminating zero-length chunk ("0\r\n") of a streamed chunked body.
 * Note: this state is ONLY used when streaming a body of unknown Content-Length.
 * It is NOT used when the write_chunk() API is being used. */
static int s_state_fn_chunked_body_stream_last_chunk(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
    const struct aws_byte_cursor last_chunk = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("0\r\n");

    if (!aws_byte_buf_write_from_whole_cursor(dst, last_chunk)) {
        /* Not enough room in dst; remain in this state until a buffer with space arrives. */
        return AWS_OP_SUCCESS;
    }

    ENCODER_LOG(TRACE, encoder, "Last chunk complete");
    return s_switch_state(encoder, AWS_H1_ENCODER_STATE_CHUNK_TRAILER);
}

/* Select next chunk to work on.
* Encoder is essentially "paused" here if no chunks are available. */
static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte_buf *dst) {
@@ -773,7 +895,7 @@ static int s_state_fn_chunk_next(struct aws_h1_encoder *encoder, struct aws_byte
ENCODER_LOGF(
TRACE,
encoder,
"Begin sending chunk %zu with size %" PRIu64,
"Begin sending chunk #%" PRIu64 " with size %" PRIu64,
encoder->chunk_count,
encoder->current_chunk->data_size);

@@ -871,7 +993,10 @@ struct encoder_state_def {
static struct encoder_state_def s_encoder_states[] = {
[AWS_H1_ENCODER_STATE_INIT] = {.fn = s_state_fn_init, .name = "INIT"},
[AWS_H1_ENCODER_STATE_HEAD] = {.fn = s_state_fn_head, .name = "HEAD"},
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY] = {.fn = s_state_fn_unchunked_body, .name = "BODY"},
[AWS_H1_ENCODER_STATE_UNCHUNKED_BODY_STREAM] = {.fn = s_state_fn_unchunked_body_stream, .name = "BODY"},
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM] = {.fn = s_state_fn_chunked_body_stream, .name = "CHUNKED_BODY_STREAM"},
[AWS_H1_ENCODER_STATE_CHUNKED_BODY_STREAM_LAST_CHUNK] =
{.fn = s_state_fn_chunked_body_stream_last_chunk, .name = "LAST_CHUNK"},
[AWS_H1_ENCODER_STATE_CHUNK_NEXT] = {.fn = s_state_fn_chunk_next, .name = "CHUNK_NEXT"},
[AWS_H1_ENCODER_STATE_CHUNK_LINE] = {.fn = s_state_fn_chunk_line, .name = "CHUNK_LINE"},
[AWS_H1_ENCODER_STATE_CHUNK_BODY] = {.fn = s_state_fn_chunk_body, .name = "CHUNK_BODY"},
5 changes: 4 additions & 1 deletion tests/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -61,7 +61,6 @@ add_test_case(h1_encoder_transfer_encoding_chunked_across_multiple_headers)
add_test_case(h1_encoder_case_insensitive_header_names)
add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_content_length)
add_test_case(h1_encoder_rejects_transfer_encoding_header_without_chunked)
add_test_case(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_body_stream)
add_test_case(h1_encoder_transfer_encoding_chunked_header_with_multiple_encodings)
add_test_case(h1_encoder_rejects_transfer_encoding_header_when_chunked_not_final_encoding)
add_test_case(h1_encoder_rejects_transfer_encoding_header_not_ending_in_chunked)
@@ -76,13 +75,17 @@ add_test_case(h1_client_sanity_check)
add_test_case(h1_client_request_send_1liner)
add_test_case(h1_client_request_send_headers)
add_test_case(h1_client_request_send_body)
add_test_case(h1_client_request_send_body_chunked_and_streaming)
add_test_case(h1_client_request_send_body_chunked)
add_test_case(h1_client_request_send_chunked_trailer)
add_test_case(h1_client_request_forbidden_trailer)
add_test_case(h1_client_request_send_empty_chunked_trailer)
add_test_case(h1_client_request_send_large_body)
add_test_case(h1_client_request_send_large_body_chunked)
add_test_case(h1_client_request_send_large_body_chunked_and_streaming)
add_test_case(h1_client_request_send_large_head)
add_test_case(h1_client_request_send_empty_body_chunked_and_streaming)
add_test_case(h1_client_request_send_stalled_body_chunked_and_streaming)
add_test_case(h1_client_request_content_length_0_ok)
add_test_case(h1_client_request_waits_for_chunks)
add_test_case(h1_client_request_send_chunk_from_chunk_complete_callback)
398 changes: 396 additions & 2 deletions tests/test_h1_client.c
Original file line number Diff line number Diff line change
@@ -326,6 +326,278 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_body_chunked) {
return AWS_OP_SUCCESS;
}

/* Request with a "Transfer-Encoding: chunked" header AND a body stream set up-front:
 * the encoder should stream the body as chunked data without any write_chunk() calls. */
H1_CLIENT_TEST_CASE(h1_client_request_send_body_chunked_and_streaming) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* send request */
static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests");
struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body);

struct aws_http_header headers[] = {
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"),
},
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_NOT_NULL(request);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers));
aws_http_message_set_body_stream(request, body_stream);

struct aws_http_make_request_options opt = {
.self_size = sizeof(opt),
.request = request,
};
struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt);
ASSERT_NOT_NULL(stream);
aws_http_stream_activate(stream);

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result:
 * 0x10 == 16 bytes ("write more tests"); chunk-size is zero-padded to 8 hex chars */
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
"Host: amazon.com\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"00000010\r\n" /* implementation currently pads chunk-size */
"write more tests"
"\r\n"
"0\r\n"
"\r\n";

ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected));

/* clean up */
aws_input_stream_release(body_stream);
aws_http_message_destroy(request);
aws_http_stream_release(stream);

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

/* Streaming chunked request whose body stream is empty:
 * no data chunks should be written, only the terminating 0-chunk and final CRLF. */
H1_CLIENT_TEST_CASE(h1_client_request_send_empty_body_chunked_and_streaming) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* send request */
static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("");
struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body);

struct aws_http_header headers[] = {
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"),
},
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_NOT_NULL(request);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers));
aws_http_message_set_body_stream(request, body_stream);

struct aws_http_make_request_options opt = {
.self_size = sizeof(opt),
.request = request,
};
struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt);
ASSERT_NOT_NULL(stream);
aws_http_stream_activate(stream);

testing_channel_drain_queued_tasks(&tester.testing_channel);

/* check result: empty body produces just the last-chunk + empty trailer */
const char *expected = "PUT /plan.txt HTTP/1.1\r\n"
"Host: amazon.com\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"
"0\r\n"
"\r\n";

ASSERT_SUCCESS(testing_channel_check_written_messages_str(&tester.testing_channel, allocator, expected));

/* clean up */
aws_input_stream_release(body_stream);
aws_http_message_destroy(request);
aws_http_stream_release(stream);

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

/**
 * An input stream that can stall (provide 0 bytes from read()) for a while.
 * Not thread safe. Set current_cursor to un-stall it. Set is_eof to end it.
 */
struct stalling_input_stream {
/* embedded base; implementation recovered with AWS_CONTAINER_OF in the vtable fns */
struct aws_input_stream base;
/* allocator this struct was allocated from; used by the destroy callback */
struct aws_allocator *allocator;
/* pending bytes handed out by read(); an empty cursor means "stalled" */
struct aws_byte_cursor current_cursor;
/* once true (and current_cursor drained), get_status() reports end-of-stream */
bool is_eof;
};

/* vtable read(): copy out as much pending data as fits in dest (0 bytes when stalled). */
static int s_stalling_input_stream_read(struct aws_input_stream *stream, struct aws_byte_buf *dest) {
    struct stalling_input_stream *stalling = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base);
    /* Advances the cursor past whatever was copied; an empty cursor writes nothing. */
    aws_byte_buf_write_to_capacity(dest, &stalling->current_cursor);
    return AWS_OP_SUCCESS;
}

/* vtable get_status(): EOF is reported only after set_eof() was called
 * AND all previously queued data has been consumed. */
static int s_stalling_input_stream_get_status(struct aws_input_stream *stream, struct aws_stream_status *status) {
    struct stalling_input_stream *stalling = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base);

    status->is_valid = true;
    status->is_end_of_stream = (stalling->current_cursor.len == 0) && stalling->is_eof;
    return AWS_OP_SUCCESS;
}

/* Un-stall the stream: queue a NUL-terminated string as the next read()'s payload.
 * Any previously queued data must already be fully consumed. */
static void s_stalling_input_stream_add_data(struct aws_input_stream *stream, const char *data) {
    struct stalling_input_stream *stalling = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base);

    /* Catch tests that queue new data before the old data was drained. */
    AWS_FATAL_ASSERT(stalling->current_cursor.len == 0);
    stalling->current_cursor = aws_byte_cursor_from_c_str(data);
}

/* Mark the stream as ended; get_status() reports EOF once pending data drains. */
static void s_stalling_input_stream_set_eof(struct aws_input_stream *stream) {
    struct stalling_input_stream *stalling = AWS_CONTAINER_OF(stream, struct stalling_input_stream, base);
    stalling->is_eof = true;
}

/* Only read() and get_status() are provided; the remaining callbacks are
 * zero-initialized (not needed by these tests). */
static struct aws_input_stream_vtable s_stalling_input_stream_vtable = {
.read = s_stalling_input_stream_read,
.get_status = s_stalling_input_stream_get_status,
};

/* Ref-count-zero callback: release the stream's own memory. */
static void s_stalling_input_stream_destroy(struct stalling_input_stream *impl) {
    struct aws_allocator *alloc = impl->allocator;
    aws_mem_release(alloc, impl);
}

/* Create a stalling input stream. It starts stalled (read() yields 0 bytes).
 * The returned stream is ref-counted; release with aws_input_stream_release(). */
static struct aws_input_stream *s_stalling_input_stream_new(struct aws_allocator *allocator) {
struct stalling_input_stream *impl = aws_mem_calloc(allocator, 1, sizeof(struct stalling_input_stream));
impl->allocator = allocator;
impl->base.impl = impl;
impl->base.vtable = &s_stalling_input_stream_vtable;
/* destroy callback fires when the last reference is released */
aws_ref_count_init(&impl->base.ref_count, impl, (aws_simple_completion_callback *)s_stalling_input_stream_destroy);
return &impl->base;
}

/* Send request, with chunked and streaming body, where the body stream "stalls"
 * and doesn't fill the available space.
 * Verifies the encoder writes nothing while stalled, writes one chunk per batch
 * of available data, and finishes with the 0-chunk once the stream reports EOF. */
H1_CLIENT_TEST_CASE(h1_client_request_send_stalled_body_chunked_and_streaming) {
(void)ctx;
struct tester tester;
ASSERT_SUCCESS(s_tester_init(&tester, allocator));

/* send request */
struct aws_input_stream *stalling_stream = s_stalling_input_stream_new(allocator);

struct aws_http_header headers[] = {
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"),
},
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_NOT_NULL(request);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/plan.txt")));
aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers));
aws_http_message_set_body_stream(request, stalling_stream);

struct aws_http_make_request_options opt = {
.self_size = sizeof(opt),
.request = request,
};
struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt);
ASSERT_NOT_NULL(stream);
aws_http_stream_activate(stream);

/* Stream is currently stalled. After 1 event-loop tick, only the request head should have been written. */
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
ASSERT_SUCCESS(testing_channel_check_written_messages_str(
&tester.testing_channel,
allocator,
/*expected*/
"PUT /plan.txt HTTP/1.1\r\n"
"Host: amazon.com\r\n"
"Transfer-Encoding: chunked\r\n"
"\r\n"));

/* Execute a few event-loop ticks. No more data should be written */
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
ASSERT_TRUE(aws_linked_list_empty(testing_channel_get_written_message_queue(&tester.testing_channel)));

/* Now stream a bit of data...
 * 0x12 == 18 bytes ("baby's first chunk"), zero-padded to the fixed 8-char chunk-size */
s_stalling_input_stream_add_data(stalling_stream, "baby's first chunk");
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
ASSERT_SUCCESS(testing_channel_check_written_messages_str(
&tester.testing_channel,
allocator,
/*expected*/
"00000012\r\n"
"baby's first chunk"
"\r\n"));

/* Execute a few event-loop ticks. No more data should be written */
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
ASSERT_TRUE(aws_linked_list_empty(testing_channel_get_written_message_queue(&tester.testing_channel)));

/* Now stream exactly 1 byte more... */
s_stalling_input_stream_add_data(stalling_stream, "z");
testing_channel_run_currently_queued_tasks(&tester.testing_channel);
ASSERT_SUCCESS(testing_channel_check_written_messages_str(
&tester.testing_channel,
allocator,
/*expected*/
"00000001\r\n"
"z"
"\r\n"));

/* Finally, end the stream, the request should finish sending... */
s_stalling_input_stream_set_eof(stalling_stream);
testing_channel_drain_queued_tasks(&tester.testing_channel);
ASSERT_SUCCESS(testing_channel_check_written_messages_str(
&tester.testing_channel,
allocator,
/*expected*/
"0\r\n"
"\r\n"));

/* clean up */
aws_input_stream_release(stalling_stream);
aws_http_message_destroy(request);
aws_http_stream_release(stream);

ASSERT_SUCCESS(s_tester_clean_up(&tester));
return AWS_OP_SUCCESS;
}

int chunked_test_helper(
const struct aws_byte_cursor *body,
struct aws_http_headers *trailers,
@@ -1177,7 +1449,7 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_chunk_size_0_with_extensions_ok) {
return AWS_OP_SUCCESS;
}

/* Send a request whose body doesn't fit in a single aws_io_message using content length*/
/* Send a request (with streaming body and defined Content-Length) whose body doesn't fit in a single aws_io_message */
H1_CLIENT_TEST_CASE(h1_client_request_send_large_body) {
(void)ctx;
struct tester tester;
@@ -1247,6 +1519,128 @@ H1_CLIENT_TEST_CASE(h1_client_request_send_large_body) {
return AWS_OP_SUCCESS;
}

/* Verify that actual_full_request_message consists of: expected_head, then
 * expected_body_data split across any number of well-formed chunks
 * (hex length, CRLF, data, CRLF), a terminating 0-chunk, and an empty trailer.
 * Returns AWS_OP_SUCCESS, or fails an ASSERT_* on any mismatch. */
static int s_check_chunked_streaming_request(
struct aws_byte_cursor actual_full_request_message,
struct aws_byte_cursor expected_head,
struct aws_byte_cursor expected_body_data) {

/* check request head */
ASSERT_TRUE(actual_full_request_message.len >= expected_head.len);
struct aws_byte_cursor actual_chunked_body = actual_full_request_message;
struct aws_byte_cursor actual_next = aws_byte_cursor_advance(&actual_chunked_body, expected_head.len);
ASSERT_BIN_ARRAYS_EQUALS(expected_head.ptr, expected_head.len, actual_next.ptr, actual_next.len);

/* parse body chunks.
 * (currently, when doing streaming body chunks, we never use chunk extensions) */
struct aws_byte_cursor crlf_cursor = aws_byte_cursor_from_c_str("\r\n");
while (true) {
/* Each chunk is prefixed with: CHUNK-LENGTH-IN-HEX CRLF */
struct aws_byte_cursor next_crlf;
ASSERT_SUCCESS(aws_byte_cursor_find_exact(&actual_chunked_body, &crlf_cursor, &next_crlf));
/* hex_len = number of bytes before the CRLF (pointer difference) */
size_t hex_len = next_crlf.ptr - actual_chunked_body.ptr;
actual_next = aws_byte_cursor_advance(&actual_chunked_body, hex_len);
uint64_t chunk_len;
ASSERT_SUCCESS(aws_byte_cursor_utf8_parse_u64_hex(actual_next, &chunk_len));
actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2);
ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n");

/* length of 0 indicates last chunk*/
if (chunk_len == 0) {
ASSERT_UINT_EQUALS(0, expected_body_data.len, "message ended early");
break;
}

/* check that chunk's data matches what's next in expected_body_data */
ASSERT_TRUE(chunk_len <= actual_chunked_body.len);
ASSERT_TRUE(chunk_len <= expected_body_data.len);
actual_next = aws_byte_cursor_advance(&actual_chunked_body, (size_t)chunk_len);
ASSERT_BIN_ARRAYS_EQUALS(expected_body_data.ptr, (size_t)chunk_len, actual_next.ptr, actual_next.len);
aws_byte_cursor_advance(&expected_body_data, (size_t)chunk_len);

/* CRLF at end of each chunk */
actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2);
ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n", "expected CRLF at end of chunk");
}

/* parse trailer
 * (currently, when doing streaming body chunks, trailer is always empty) */
actual_next = aws_byte_cursor_advance(&actual_chunked_body, 2);
ASSERT_CURSOR_VALUE_CSTRING_EQUALS(actual_next, "\r\n", "expected CRLF after empty chunk trailer");

/* assert there's nothing left over */
ASSERT_UINT_EQUALS(0, actual_chunked_body.len, "extra bytes at end of message");
return AWS_OP_SUCCESS;
}

/* Send a request (with streaming body and chunked encoding) whose body doesn't fit in a single aws_io_message.
 * The encoder is free to split the body into any number of chunks, so the result
 * is validated structurally via s_check_chunked_streaming_request(). */
H1_CLIENT_TEST_CASE(h1_client_request_send_large_body_chunked_and_streaming) {
    (void)ctx;
    struct tester tester;
    ASSERT_SUCCESS(s_tester_init(&tester, allocator));

    /* send request with large body full of random data
     * (rand() is never seeded here, so the data — and the test — are deterministic) */
    size_t body_len = 1024 * 1024 * 1; /* 1MB */
    struct aws_byte_buf body_buf;
    ASSERT_SUCCESS(aws_byte_buf_init(&body_buf, allocator, body_len));
    while (body_buf.len < body_len) {
        int r = rand();
        aws_byte_buf_write_be32(&body_buf, (uint32_t)r);
    }

    const struct aws_byte_cursor body = aws_byte_cursor_from_buf(&body_buf);
    struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body);

    struct aws_http_header headers[] = {
        {
            .name = aws_byte_cursor_from_c_str("Transfer-Encoding"),
            .value = aws_byte_cursor_from_c_str("chunked"),
        },
    };

    struct aws_http_message *request = aws_http_message_new_request(allocator);
    ASSERT_NOT_NULL(request);
    ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
    ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/large.txt")));
    aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers));
    aws_http_message_set_body_stream(request, body_stream);

    struct aws_http_make_request_options opt = {
        .self_size = sizeof(opt),
        .request = request,
    };
    struct aws_http_stream *stream = aws_http_connection_make_request(tester.connection, &opt);
    ASSERT_NOT_NULL(stream);
    ASSERT_SUCCESS(aws_http_stream_activate(stream));

    /* check result */
    const char *expected_head = "PUT /large.txt HTTP/1.1\r\n"
                                "Transfer-Encoding: chunked\r\n"
                                "\r\n";

    testing_channel_drain_queued_tasks(&tester.testing_channel);

    /* 2x body size leaves ample room for the head plus per-chunk prefixes/suffixes */
    struct aws_byte_buf written_buf;
    ASSERT_SUCCESS(aws_byte_buf_init(&written_buf, allocator, body_len * 2));
    ASSERT_SUCCESS(testing_channel_drain_written_messages(&tester.testing_channel, &written_buf));

    ASSERT_SUCCESS(s_check_chunked_streaming_request(
        aws_byte_cursor_from_buf(&written_buf) /*actual_full_request_message*/,
        aws_byte_cursor_from_c_str(expected_head),
        body /*expected_body_data*/));

    /* clean up */
    aws_input_stream_release(body_stream);
    aws_http_message_destroy(request);
    aws_http_stream_release(stream);
    aws_byte_buf_clean_up(&body_buf);
    aws_byte_buf_clean_up(&written_buf);

    ASSERT_SUCCESS(s_tester_clean_up(&tester));

    /* NOTE(review): removed stray `aws_thread_current_sleep(100);` that followed
     * cleanup — a 100-nanosecond sleep with no synchronization purpose,
     * apparently leftover from debugging. */
    return AWS_OP_SUCCESS;
}

static int s_parse_chunked_extensions(
const char *extensions,
struct aws_http1_chunk_extension *expected_extensions,
@@ -1345,7 +1739,7 @@ static int s_can_parse_as_chunked_encoding(
return AWS_OP_SUCCESS;
}

/* Send a request whose body doesn't fit in a single aws_io_message using chunked transfer encoding*/
/* Send a request (using the write_chunk() API) whose body doesn't fit in a single aws_io_message */
H1_CLIENT_TEST_CASE(h1_client_request_send_large_body_chunked) {
(void)ctx;
struct tester tester;
48 changes: 0 additions & 48 deletions tests/test_h1_encoder.c
Original file line number Diff line number Diff line change
@@ -246,54 +246,6 @@ H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_header_without_chunked
AWS_ERROR_HTTP_INVALID_HEADER_VALUE /*expected_error*/);
}

/* Verifies that aws_h1_encoder_message_init_from_request() rejects a request that
 * sets BOTH a "Transfer-Encoding: chunked" header and a body stream, failing with
 * AWS_ERROR_HTTP_INVALID_BODY_STREAM and leaving encoder-message fields zeroed.
 * (This PR removes the test, since that combination is now supported.) */
H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_chunked_header_combined_with_body_stream) {
(void)ctx;
s_test_init(allocator);
struct aws_h1_encoder encoder;
aws_h1_encoder_init(&encoder, allocator);

/* request to send - we won't actually send it, we want to validate headers are set correctly. */
static const struct aws_byte_cursor body = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("write more tests");
struct aws_input_stream *body_stream = aws_input_stream_new_from_cursor(allocator, &body);

struct aws_http_header headers[] = {
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Host"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("amazon.com"),
},
{
.name = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("Transfer-Encoding"),
.value = AWS_BYTE_CUR_INIT_FROM_STRING_LITERAL("chunked"),
},
};

struct aws_http_message *request = aws_http_message_new_request(allocator);
ASSERT_SUCCESS(aws_http_message_set_request_method(request, aws_byte_cursor_from_c_str("PUT")));
ASSERT_SUCCESS(aws_http_message_set_request_path(request, aws_byte_cursor_from_c_str("/")));
aws_http_message_add_header_array(request, headers, AWS_ARRAY_SIZE(headers));
/* Setting the body stream should cause an error */
aws_http_message_set_body_stream(request, body_stream);

struct aws_linked_list chunk_list;
aws_linked_list_init(&chunk_list);

struct aws_h1_encoder_message encoder_message;
ASSERT_ERROR(
AWS_ERROR_HTTP_INVALID_BODY_STREAM,
aws_h1_encoder_message_init_from_request(&encoder_message, allocator, request, &chunk_list));

ASSERT_FALSE(encoder_message.has_chunked_encoding_header);
ASSERT_FALSE(encoder_message.has_connection_close_header);
ASSERT_UINT_EQUALS(0, encoder_message.content_length);

aws_input_stream_release(body_stream);
aws_http_message_destroy(request);
aws_h1_encoder_message_clean_up(&encoder_message);
aws_h1_encoder_clean_up(&encoder);
s_test_clean_up();
return AWS_OP_SUCCESS;
}

H1_ENCODER_TEST_CASE(h1_encoder_rejects_transfer_encoding_header_not_ending_in_chunked) {
(void)ctx;
struct aws_http_header headers[] = {