Skip to content

Commit

Permalink
[C] Add support for loading human readable params for the driver conf…
Browse files Browse the repository at this point in the history
…ig from environment. Issue #603.
  • Loading branch information
mjpt777 committed Jan 13, 2019
1 parent 7737345 commit 0309b09
Showing 1 changed file with 94 additions and 25 deletions.
119 changes: 94 additions & 25 deletions aeron-driver/src/main/c/aeron_driver_context.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@

#include "util/aeron_error.h"
#include "protocol/aeron_udp_protocol.h"
#include "util/aeron_prop_util.h"
#include "util/aeron_fileutil.h"
#include "aeron_driver_context.h"
#include "aeron_alloc.h"
Expand Down Expand Up @@ -140,6 +141,52 @@ uint64_t aeron_config_parse_uint64(const char *str, uint64_t def, uint64_t min,
return result;
}

uint64_t aeron_config_parse_size64(const char *name, const char *str, uint64_t def, uint64_t min, uint64_t max)
{
uint64_t result = def;

if (NULL != str)
{
uint64_t value = 0;

if (-1 == aeron_parse_size64(str, &value))
{
printf("WARNING: %s=%s is invalid, using default\n", name, str);
}
else
{
result = value;
result = result > max ? max : result;
result = result < min ? min : result;
}
}

return result;
}

uint64_t aeron_config_parse_duration_ns(const char *name, const char *str, uint64_t def, uint64_t min, uint64_t max)
{
uint64_t result = def;

if (NULL != str)
{
uint64_t value = 0;

if (-1 == aeron_parse_duration_ns(str, &value))
{
printf("WARNING: %s=%s is invalid, using default\n", name, str);
}
else
{
result = value;
result = result > max ? max : result;
result = result < min ? min : result;
}
}

return result;
}

#define AERON_CONFIG_GETENV_OR_DEFAULT(e, d) ((NULL == getenv(e)) ? (d) : getenv(e))

static void aeron_driver_conductor_to_driver_interceptor_null(
Expand Down Expand Up @@ -331,19 +378,22 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
getenv(AERON_SPIES_SIMULATE_CONNECTION_ENV_VAR),
_context->spies_simulate_connection);

_context->to_driver_buffer_length = aeron_config_parse_uint64(
_context->to_driver_buffer_length = aeron_config_parse_size64(
AERON_TO_CONDUCTOR_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_TO_CONDUCTOR_BUFFER_LENGTH_ENV_VAR),
_context->to_driver_buffer_length,
1024 + AERON_RB_TRAILER_LENGTH,
INT32_MAX);

_context->to_clients_buffer_length = aeron_config_parse_uint64(
_context->to_clients_buffer_length = aeron_config_parse_size64(
AERON_TO_CLIENTS_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_TO_CLIENTS_BUFFER_LENGTH_ENV_VAR),
_context->to_clients_buffer_length,
1024 + AERON_BROADCAST_BUFFER_TRAILER_LENGTH,
INT32_MAX);

_context->counters_values_buffer_length = aeron_config_parse_uint64(
_context->counters_values_buffer_length = aeron_config_parse_size64(
AERON_COUNTERS_VALUES_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_COUNTERS_VALUES_BUFFER_LENGTH_ENV_VAR),
_context->counters_values_buffer_length,
1024,
Expand All @@ -353,67 +403,78 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
_context->counters_values_buffer_length *
(AERON_COUNTERS_MANAGER_METADATA_LENGTH / AERON_COUNTERS_MANAGER_VALUE_LENGTH);

_context->error_buffer_length = aeron_config_parse_uint64(
_context->error_buffer_length = aeron_config_parse_size64(
AERON_ERROR_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_ERROR_BUFFER_LENGTH_ENV_VAR),
_context->error_buffer_length,
1024,
INT32_MAX);

_context->client_liveness_timeout_ns = aeron_config_parse_uint64(
_context->client_liveness_timeout_ns = aeron_config_parse_duration_ns(
AERON_CLIENT_LIVENESS_TIMEOUT_ENV_VAR,
getenv(AERON_CLIENT_LIVENESS_TIMEOUT_ENV_VAR),
_context->client_liveness_timeout_ns,
1000,
INT64_MAX);

_context->publication_linger_timeout_ns = aeron_config_parse_uint64(
_context->publication_linger_timeout_ns = aeron_config_parse_duration_ns(
AERON_PUBLICATION_LINGER_TIMEOUT_ENV_VAR,
getenv(AERON_PUBLICATION_LINGER_TIMEOUT_ENV_VAR),
_context->publication_linger_timeout_ns,
1000,
INT64_MAX);

_context->term_buffer_length = aeron_config_parse_uint64(
_context->term_buffer_length = aeron_config_parse_size64(
AERON_TERM_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_TERM_BUFFER_LENGTH_ENV_VAR),
_context->term_buffer_length,
1024,
INT32_MAX);

_context->ipc_term_buffer_length = aeron_config_parse_uint64(
_context->ipc_term_buffer_length = aeron_config_parse_size64(
AERON_IPC_TERM_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_IPC_TERM_BUFFER_LENGTH_ENV_VAR),
_context->ipc_term_buffer_length,
1024,
INT32_MAX);

_context->mtu_length = aeron_config_parse_uint64(
_context->mtu_length = aeron_config_parse_size64(
AERON_MTU_LENGTH_ENV_VAR,
getenv(AERON_MTU_LENGTH_ENV_VAR),
_context->mtu_length,
AERON_DATA_HEADER_LENGTH,
AERON_MAX_UDP_PAYLOAD_LENGTH);

_context->ipc_mtu_length = aeron_config_parse_uint64(
_context->ipc_mtu_length = aeron_config_parse_size64(
AERON_IPC_MTU_LENGTH_ENV_VAR,
getenv(AERON_IPC_MTU_LENGTH_ENV_VAR),
_context->ipc_mtu_length,
AERON_DATA_HEADER_LENGTH,
AERON_MAX_UDP_PAYLOAD_LENGTH);

_context->ipc_publication_window_length = aeron_config_parse_uint64(
_context->ipc_publication_window_length = aeron_config_parse_size64(
AERON_IPC_PUBLICATION_TERM_WINDOW_LENGTH_ENV_VAR,
getenv(AERON_IPC_PUBLICATION_TERM_WINDOW_LENGTH_ENV_VAR),
_context->ipc_publication_window_length,
0,
INT32_MAX);

_context->publication_window_length = aeron_config_parse_uint64(
_context->publication_window_length = aeron_config_parse_size64(
AERON_PUBLICATION_TERM_WINDOW_LENGTH_ENV_VAR,
getenv(AERON_PUBLICATION_TERM_WINDOW_LENGTH_ENV_VAR),
_context->publication_window_length,
0,
INT32_MAX);

_context->socket_rcvbuf = aeron_config_parse_uint64(
_context->socket_rcvbuf = aeron_config_parse_size64(
AERON_SOCKET_SO_RCVBUF_ENV_VAR,
getenv(AERON_SOCKET_SO_RCVBUF_ENV_VAR),
_context->socket_rcvbuf,
0,
INT32_MAX);

_context->socket_sndbuf = aeron_config_parse_uint64(
_context->socket_sndbuf = aeron_config_parse_size64(
AERON_SOCKET_SO_SNDBUF_ENV_VAR,
getenv(AERON_SOCKET_SO_SNDBUF_ENV_VAR),
_context->socket_sndbuf,
0,
Expand All @@ -431,55 +492,64 @@ int aeron_driver_context_init(aeron_driver_context_t **context)
1,
INT32_MAX);

_context->status_message_timeout_ns = aeron_config_parse_uint64(
_context->status_message_timeout_ns = aeron_config_parse_duration_ns(
AERON_RCV_STATUS_MESSAGE_TIMEOUT_ENV_VAR,
getenv(AERON_RCV_STATUS_MESSAGE_TIMEOUT_ENV_VAR),
_context->status_message_timeout_ns,
1000,
INT64_MAX);

_context->image_liveness_timeout_ns = aeron_config_parse_uint64(
_context->image_liveness_timeout_ns = aeron_config_parse_duration_ns(
AERON_IMAGE_LIVENESS_TIMEOUT_ENV_VAR,
getenv(AERON_IMAGE_LIVENESS_TIMEOUT_ENV_VAR),
_context->image_liveness_timeout_ns,
1000,
INT64_MAX);

_context->initial_window_length = aeron_config_parse_uint64(
_context->initial_window_length = aeron_config_parse_size64(
AERON_RCV_INITIAL_WINDOW_LENGTH_ENV_VAR,
getenv(AERON_RCV_INITIAL_WINDOW_LENGTH_ENV_VAR),
_context->initial_window_length,
256,
INT32_MAX);

_context->loss_report_length = aeron_config_parse_uint64(
_context->loss_report_length = aeron_config_parse_size64(
AERON_LOSS_REPORT_BUFFER_LENGTH_ENV_VAR,
getenv(AERON_LOSS_REPORT_BUFFER_LENGTH_ENV_VAR),
_context->loss_report_length,
1024,
INT32_MAX);

_context->file_page_size = aeron_config_parse_uint64(
_context->file_page_size = aeron_config_parse_size64(
AERON_FILE_PAGE_SIZE_ENV_VAR,
getenv(AERON_FILE_PAGE_SIZE_ENV_VAR),
_context->file_page_size,
4 * 1024,
INT32_MAX);

_context->publication_unblock_timeout_ns = aeron_config_parse_uint64(
_context->publication_unblock_timeout_ns = aeron_config_parse_duration_ns(
AERON_PUBLICATION_UNBLOCK_TIMEOUT_ENV_VAR,
getenv(AERON_PUBLICATION_UNBLOCK_TIMEOUT_ENV_VAR),
_context->publication_unblock_timeout_ns,
1000,
INT64_MAX);

_context->publication_connection_timeout_ns = aeron_config_parse_uint64(
_context->publication_connection_timeout_ns = aeron_config_parse_duration_ns(
AERON_PUBLICATION_CONNECTION_TIMEOUT_ENV_VAR,
getenv(AERON_PUBLICATION_CONNECTION_TIMEOUT_ENV_VAR),
_context->publication_connection_timeout_ns,
1000,
INT64_MAX);

_context->timer_interval_ns = aeron_config_parse_uint64(
_context->timer_interval_ns = aeron_config_parse_duration_ns(
AERON_TIMER_INTERVAL_ENV_VAR,
getenv(AERON_TIMER_INTERVAL_ENV_VAR),
_context->timer_interval_ns,
1000,
INT64_MAX);

_context->counter_free_to_reuse_ns = aeron_config_parse_uint64(
_context->counter_free_to_reuse_ns = aeron_config_parse_duration_ns(
AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_ENV_VAR,
getenv(AERON_COUNTERS_FREE_TO_REUSE_TIMEOUT_ENV_VAR),
_context->counter_free_to_reuse_ns,
0,
Expand Down Expand Up @@ -641,7 +711,6 @@ bool aeron_is_driver_active_with_cnc(
else
{
int64_t timestamp = aeron_mpsc_rb_consumer_heartbeat_time_value(&rb);

int64_t diff = now - timestamp;

snprintf(buffer, sizeof(buffer) - 1, "INFO: Aeron toDriver consumer heartbeat is %" PRId64 " ms old", diff);
Expand All @@ -665,7 +734,7 @@ bool aeron_is_driver_active(const char *dirname, int64_t timeout, int64_t now, a

if (stat(dirname, &sb) == 0 && S_ISDIR(sb.st_mode))
{
aeron_mapped_file_t cnc_map = {NULL, 0};
aeron_mapped_file_t cnc_map = { NULL, 0 };

snprintf(buffer, sizeof(buffer) - 1, "INFO: Aeron directory %s exists", dirname);
log_func(buffer);
Expand Down

0 comments on commit 0309b09

Please # to comment.