Skip to content

Move h1_stream variables, to make thread usage more explicit #504

New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Merged
merged 2 commits into from
Mar 10, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 17 additions & 15 deletions include/aws/http/private/h1_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,23 +40,21 @@ struct aws_h1_stream {
*/
struct aws_channel_task cross_thread_work_task;

/* Message (derived from outgoing request or response) to be submitted to encoder */
struct aws_h1_encoder_message encoder_message;

bool is_outgoing_message_done;
struct {
/* Message (derived from outgoing request or response) to be submitted to encoder */
struct aws_h1_encoder_message encoder_message;

bool is_incoming_message_done;
bool is_incoming_head_done;
bool is_outgoing_message_done;

/* If true, this is the last stream the connection should process.
* See RFC-7230 Section 6: Connection Management. */
bool is_final_stream;
bool is_incoming_message_done;
bool is_incoming_head_done;

/* Buffer for incoming data that needs to stick around. */
struct aws_byte_buf incoming_storage_buf;
/* If true, this is the last stream the connection should process.
* See RFC-7230 Section 6: Connection Management. */
bool is_final_stream;

struct {
/* TODO: move most other members in here */
/* Buffer for incoming data that needs to stick around. */
struct aws_byte_buf incoming_storage_buf;

/* List of `struct aws_h1_chunk`, used for chunked encoding.
* Encoder completes/frees/pops front chunk when it's done sending. */
Expand All @@ -77,12 +75,16 @@ struct aws_h1_stream {
* Sharing a lock is fine because it's rare for an HTTP/1 connection
* to have more than one stream at a time. */
struct {
/* Outgoing response on "request handler" stream which has been submitted by user,
* but hasn't yet moved to thread_data.encoder_message. */
struct aws_h1_encoder_message pending_outgoing_response;

/* List of `struct aws_h1_chunk` which have been submitted by user,
* but haven't yet moved to encoder_message.pending_chunk_list where the encoder will find them. */
* but haven't yet moved to thread_data.encoder_message.pending_chunk_list where the encoder will find them. */
struct aws_linked_list pending_chunk_list;

/* trailing headers which have been submitted by user,
* but haven't yet moved to encoder_message where the encoder will find them. */
* but haven't yet moved to thread_data.encoder_message where the encoder will find them. */
struct aws_h1_trailer *pending_trailer;

enum aws_h1_stream_api_state api_state;
Expand Down
36 changes: 18 additions & 18 deletions source/h1_connection.c
Original file line number Diff line number Diff line change
Expand Up @@ -637,7 +637,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
}

if (error_code != AWS_ERROR_SUCCESS) {
if (stream->base.client_data && stream->is_incoming_message_done) {
if (stream->base.client_data && stream->thread_data.is_incoming_message_done) {
/* As a request that finished receiving the response, we ignore error and
* consider it finished successfully */
AWS_LOGF_DEBUG(
Expand All @@ -649,7 +649,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {
aws_error_name(error_code));
error_code = AWS_ERROR_SUCCESS;
}
if (stream->base.server_data && stream->is_outgoing_message_done) {
if (stream->base.server_data && stream->thread_data.is_outgoing_message_done) {
/* As a server finished sending the response, but still failed with the request was not finished receiving.
* We ignore error and consider it finished successfully */
AWS_LOGF_DEBUG(
Expand Down Expand Up @@ -693,7 +693,7 @@ static void s_stream_complete(struct aws_h1_stream *stream, int error_code) {

/* If connection must shut down, do it BEFORE invoking stream-complete callback.
* That way, if aws_http_connection_is_open() is called from stream-complete callback, it returns false. */
if (stream->is_final_stream) {
if (stream->thread_data.is_final_stream) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Closing connection due to completion of final stream.",
Expand Down Expand Up @@ -845,12 +845,12 @@ static void s_set_outgoing_message_done(struct aws_h1_stream *stream) {
struct aws_channel *channel = aws_http_connection_get_channel(connection);
AWS_ASSERT(aws_channel_thread_is_callers_thread(channel));

if (stream->is_outgoing_message_done) {
if (stream->thread_data.is_outgoing_message_done) {
/* Already did the job */
return;
}

stream->is_outgoing_message_done = true;
stream->thread_data.is_outgoing_message_done = true;
AWS_ASSERT(stream->base.metrics.send_end_timestamp_ns == -1);
aws_high_res_clock_get_ticks((uint64_t *)&stream->base.metrics.send_end_timestamp_ns);
AWS_ASSERT(stream->base.metrics.send_start_timestamp_ns != -1);
Expand Down Expand Up @@ -904,7 +904,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti

/* RFC-7230 section 6.6: Tear-down.
* If this was the final stream, don't allows any further streams to be sent */
if (current->is_final_stream) {
if (current->thread_data.is_final_stream) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Done sending final stream, no further streams will be sent.",
Expand All @@ -919,7 +919,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
}

/* If it's also done receiving data, then it's complete! */
if (current->is_incoming_message_done) {
if (current->thread_data.is_incoming_message_done) {
/* Only 1st stream in list could finish receiving before it finished sending */
AWS_ASSERT(&current->node == aws_linked_list_begin(&connection->thread_data.stream_list));

Expand All @@ -942,7 +942,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
struct aws_h1_stream *stream = AWS_CONTAINER_OF(node, struct aws_h1_stream, node);

/* If we already sent this stream's data, keep looking... */
if (stream->is_outgoing_message_done) {
if (stream->thread_data.is_outgoing_message_done) {
continue;
}

Expand Down Expand Up @@ -975,7 +975,7 @@ static struct aws_h1_stream *s_update_outgoing_stream_ptr(struct aws_h1_connecti
aws_high_res_clock_get_ticks((uint64_t *)&current->base.metrics.send_start_timestamp_ns);

err = aws_h1_encoder_start_message(
&connection->thread_data.encoder, &current->encoder_message, &current->base);
&connection->thread_data.encoder, &current->thread_data.encoder_message, &current->base);
(void)err;
AWS_ASSERT(connection->thread_data.encoder.state == AWS_H1_ENCODER_STATE_INIT);
AWS_ASSERT(!err);
Expand Down Expand Up @@ -1177,7 +1177,7 @@ static int s_decoder_on_request(
AWS_BYTE_CURSOR_PRI(*uri));

/* Copy strings to internal buffer */
struct aws_byte_buf *storage_buf = &incoming_stream->incoming_storage_buf;
struct aws_byte_buf *storage_buf = &incoming_stream->thread_data.incoming_storage_buf;
AWS_ASSERT(storage_buf->capacity == 0);

size_t storage_size = 0;
Expand Down Expand Up @@ -1261,7 +1261,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
"id=%p: Received 'Connection: close' header. This will be the final stream on this connection.",
(void *)&incoming_stream->base);

incoming_stream->is_final_stream = true;
incoming_stream->thread_data.is_final_stream = true;
{ /* BEGIN CRITICAL SECTION */
aws_h1_connection_lock_synced_data(connection);
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
Expand All @@ -1278,7 +1278,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void
* Mark the stream's outgoing message as complete,
* so that we stop sending, and stop waiting for it to finish sending.
**/
if (!incoming_stream->is_outgoing_message_done) {
if (!incoming_stream->thread_data.is_outgoing_message_done) {
AWS_LOGF_DEBUG(
AWS_LS_HTTP_STREAM,
"id=%p: Received 'Connection: close' header, no more request data will be sent.",
Expand Down Expand Up @@ -1323,7 +1323,7 @@ static int s_decoder_on_header(const struct aws_h1_decoded_header *header, void

static int s_mark_head_done(struct aws_h1_stream *incoming_stream) {
/* Bail out if we've already done this */
if (incoming_stream->is_incoming_head_done) {
if (incoming_stream->thread_data.is_incoming_head_done) {
return AWS_OP_SUCCESS;
}

Expand All @@ -1335,7 +1335,7 @@ static int s_mark_head_done(struct aws_h1_stream *incoming_stream) {

if (header_block == AWS_HTTP_HEADER_BLOCK_MAIN) {
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Main header block done.", (void *)&incoming_stream->base);
incoming_stream->is_incoming_head_done = true;
incoming_stream->thread_data.is_incoming_head_done = true;

} else if (header_block == AWS_HTTP_HEADER_BLOCK_INFORMATIONAL) {
AWS_LOGF_TRACE(AWS_LS_HTTP_STREAM, "id=%p: Informational header block done.", (void *)&incoming_stream->base);
Expand Down Expand Up @@ -1443,7 +1443,7 @@ static int s_decoder_on_done(void *user_data) {
}

/* Otherwise the incoming stream is finished decoding and we will update it if needed */
incoming_stream->is_incoming_message_done = true;
incoming_stream->thread_data.is_incoming_message_done = true;
aws_high_res_clock_get_ticks((uint64_t *)&incoming_stream->base.metrics.receive_end_timestamp_ns);
AWS_ASSERT(incoming_stream->base.metrics.receive_start_timestamp_ns != -1);
AWS_ASSERT(
Expand All @@ -1454,7 +1454,7 @@ static int s_decoder_on_done(void *user_data) {

/* RFC-7230 section 6.6
* After reading the final message, the connection must not read any more */
if (incoming_stream->is_final_stream) {
if (incoming_stream->thread_data.is_final_stream) {
AWS_LOGF_TRACE(
AWS_LS_HTTP_CONNECTION,
"id=%p: Done reading final stream, no further streams will be read.",
Expand All @@ -1479,13 +1479,13 @@ static int s_decoder_on_done(void *user_data) {
return AWS_OP_ERR;
}
}
if (incoming_stream->is_outgoing_message_done) {
if (incoming_stream->thread_data.is_outgoing_message_done) {
AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));
s_stream_complete(incoming_stream, AWS_ERROR_SUCCESS);
}
s_set_incoming_stream_ptr(connection, NULL);

} else if (incoming_stream->is_outgoing_message_done) {
} else if (incoming_stream->thread_data.is_outgoing_message_done) {
/* Client side */
AWS_ASSERT(&incoming_stream->node == aws_linked_list_begin(&connection->thread_data.stream_list));

Expand Down
48 changes: 26 additions & 22 deletions source/h1_stream.c
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,9 @@ static void s_stream_destroy(struct aws_http_stream *stream_base) {
aws_linked_list_empty(&stream->synced_data.pending_chunk_list) &&
"Chunks should be marked complete before stream destroyed");

aws_h1_encoder_message_clean_up(&stream->encoder_message);
aws_byte_buf_clean_up(&stream->incoming_storage_buf);
aws_h1_encoder_message_clean_up(&stream->thread_data.encoder_message);
aws_h1_encoder_message_clean_up(&stream->synced_data.pending_outgoing_response);
aws_byte_buf_clean_up(&stream->thread_data.incoming_storage_buf);
aws_mem_release(stream->base.alloc, stream);
}

Expand Down Expand Up @@ -58,29 +59,33 @@ static void s_stream_cross_thread_work_task(struct aws_channel_task *task, void

int api_state = stream->synced_data.api_state;

bool found_chunks = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list);
/* If we have any new outgoing data, prompt the connection to try and send it. */
bool new_outgoing_data = !aws_linked_list_empty(&stream->synced_data.pending_chunk_list);
aws_linked_list_move_all_back(&stream->thread_data.pending_chunk_list, &stream->synced_data.pending_chunk_list);

stream->encoder_message.trailer = stream->synced_data.pending_trailer;
stream->synced_data.pending_trailer = NULL;
/* If we JUST learned about having an outgoing response, that's a reason to try sending data */
if (stream->synced_data.has_outgoing_response && !stream->thread_data.has_outgoing_response) {
stream->thread_data.has_outgoing_response = true;
new_outgoing_data = true;

bool has_outgoing_response = stream->synced_data.has_outgoing_response;
stream->thread_data.encoder_message = stream->synced_data.pending_outgoing_response;
AWS_ZERO_STRUCT(stream->synced_data.pending_outgoing_response);

if (stream->thread_data.encoder_message.has_connection_close_header) {
/* This will be the last stream connection will process */
stream->thread_data.is_final_stream = true;
}
}

stream->thread_data.encoder_message.trailer = stream->synced_data.pending_trailer;
stream->synced_data.pending_trailer = NULL;

uint64_t pending_window_update = stream->synced_data.pending_window_update;
stream->synced_data.pending_window_update = 0;

s_stream_unlock_synced_data(stream);
/* END CRITICAL SECTION */

/* If we have any new outgoing data, prompt the connection to try and send it. */
bool new_outgoing_data = found_chunks;

/* If we JUST learned about having an outgoing response, that's a reason to try sending data */
if (has_outgoing_response && !stream->thread_data.has_outgoing_response) {
stream->thread_data.has_outgoing_response = true;
new_outgoing_data = true;
}

if (new_outgoing_data && (api_state == AWS_H1_STREAM_API_STATE_ACTIVE)) {
aws_h1_connection_try_write_outgoing_stream(connection);
}
Expand Down Expand Up @@ -413,7 +418,7 @@ struct aws_h1_stream *aws_h1_stream_new_request(

/* Validate request and cache info that the encoder will eventually need */
if (aws_h1_encoder_message_init_from_request(
&stream->encoder_message,
&stream->thread_data.encoder_message,
client_connection->alloc,
options->request,
&stream->thread_data.pending_chunk_list)) {
Expand All @@ -422,11 +427,11 @@ struct aws_h1_stream *aws_h1_stream_new_request(

/* RFC-7230 Section 6.3: The "close" connection option is used to signal
* that a connection will not persist after the current request/response*/
if (stream->encoder_message.has_connection_close_header) {
stream->is_final_stream = true;
if (stream->thread_data.encoder_message.has_connection_close_header) {
stream->thread_data.is_final_stream = true;
}

stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
stream->synced_data.using_chunked_encoding = stream->thread_data.encoder_message.has_chunked_encoding_header;

return stream;

Expand Down Expand Up @@ -493,16 +498,15 @@ int aws_h1_stream_send_response(struct aws_h1_stream *stream, struct aws_http_me
error_code = AWS_ERROR_INVALID_STATE;
} else {
stream->synced_data.has_outgoing_response = true;
stream->encoder_message = encoder_message;
stream->synced_data.pending_outgoing_response = encoder_message;
if (encoder_message.has_connection_close_header) {
/* This will be the last stream connection will process, new streams will be rejected */
stream->is_final_stream = true;

/* Note: We're touching the connection's synced_data, which is OK
* because an h1_connection and all its h1_streams share a single lock. */
connection->synced_data.new_stream_error_code = AWS_ERROR_HTTP_CONNECTION_CLOSED;
}
stream->synced_data.using_chunked_encoding = stream->encoder_message.has_chunked_encoding_header;
stream->synced_data.using_chunked_encoding = encoder_message.has_chunked_encoding_header;

should_schedule_task = !stream->synced_data.is_cross_thread_work_task_scheduled;
stream->synced_data.is_cross_thread_work_task_scheduled = true;
Expand Down
4 changes: 2 additions & 2 deletions tests/test_connection_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -1564,7 +1564,7 @@ static int s_aws_http_on_incoming_header_block_done_proxy_test(
struct cm_tester *tester = &s_tester;
if (aws_http_stream_get_incoming_response_status(stream, &s_response_status_code) == AWS_OP_SUCCESS) {
aws_mutex_lock(&tester->lock);
tester->proxy_request_successful = s_response_status_code == 200;
tester->proxy_request_successful = s_response_status_code / 100 == 2;
aws_mutex_unlock(&tester->lock);
}

Expand Down Expand Up @@ -1771,7 +1771,7 @@ static int s_proxy_integration_test_helper_general(
aws_http_stream_activate(stream);

ASSERT_SUCCESS(s_wait_on_proxy_request_complete());
ASSERT_TRUE(s_response_status_code == 200);
ASSERT_TRUE(s_response_status_code / 100 == 2);

aws_http_stream_release(stream);
aws_http_message_destroy(request);
Expand Down
4 changes: 1 addition & 3 deletions tests/test_localhost_integ.c
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,6 @@ struct tester {
size_t stream_completed_count;
size_t stream_complete_errors;
size_t stream_200_count;
size_t stream_4xx_count;
size_t stream_status_not_200_count;

uint64_t num_sen_received;
Expand Down Expand Up @@ -187,10 +186,9 @@ static void s_tester_on_stream_completed(struct aws_http_stream *stream, int err
++s_tester.stream_complete_errors;
s_tester.stream_completed_error_code = aws_last_error();
} else {
if (status == 200) {
if (status / 100 == 2) {
s_tester.stream_completed_with_200 = true;
++s_tester.stream_200_count;
} else if (status / 100 == 4) {
} else {
++s_tester.stream_status_not_200_count;
}
Expand Down
2 changes: 1 addition & 1 deletion tests/test_stream_manager.c
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ static void s_sm_tester_on_stream_complete(struct aws_http_stream *stream, int e
++s_tester.stream_complete_errors;
s_tester.stream_completed_error_code = aws_last_error();
} else {
if (status == 200) {
if (status / 100 == 2) {
++s_tester.stream_200_count;
} else {
++s_tester.stream_status_not_200_count;
Expand Down