Skip to content

Commit

Permalink
Merge pull request #505 from lf-lang/refactor-only-comm-type
Browse files Browse the repository at this point in the history
Refactoring of socket related functions to a separate socket_common.c
  • Loading branch information
edwardalee authored Dec 27, 2024
2 parents 4cad9fb + 106c827 commit 03137a0
Show file tree
Hide file tree
Showing 11 changed files with 740 additions and 749 deletions.
1 change: 1 addition & 0 deletions core/federated/RTI/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_library(${RTI_LIB} STATIC
${CoreLib}/tag.c
${CoreLib}/clock.c
${CoreLib}/federated/network/net_util.c
${CoreLib}/federated/network/socket_common.c
${CoreLib}/utils/pqueue_base.c
${CoreLib}/utils/pqueue_tag.c
${CoreLib}/utils/pqueue.c
Expand Down
182 changes: 21 additions & 161 deletions core/federated/RTI/rti_remote.c
Original file line number Diff line number Diff line change
Expand Up @@ -52,132 +52,6 @@ extern int lf_critical_section_enter(environment_t* env) { return lf_mutex_lock(

extern int lf_critical_section_exit(environment_t* env) { return lf_mutex_unlock(&rti_mutex); }

/**
* Create a server and enable listening for socket connections.
* If the specified port if it is non-zero, it will attempt to acquire that port.
* If it fails, it will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times with
* a delay of PORT_BIND_RETRY_INTERVAL in between. If the specified port is
* zero, then it will attempt to acquire DEFAULT_PORT first. If this fails, then it
* will repeatedly attempt up to PORT_BIND_RETRY_LIMIT times, incrementing the port
* number between attempts, with no delay between attempts. Once it has incremented
* the port number MAX_NUM_PORT_ADDRESSES times, it will cycle around and begin again
* with DEFAULT_PORT.
*
* @param port The port number to use or 0 to start trying at DEFAULT_PORT.
* @param socket_type The type of the socket for the server (TCP or UDP).
* @return The socket descriptor on which to accept connections.
*/
static int create_rti_server(uint16_t port, socket_type_t socket_type) {
// Timeout time for the communications of the server
struct timeval timeout_time = {.tv_sec = TCP_TIMEOUT_TIME / BILLION, .tv_usec = (TCP_TIMEOUT_TIME % BILLION) / 1000};
// Create an IPv4 socket for TCP (not UDP) communication over IP (0).
int socket_descriptor = -1;
if (socket_type == TCP) {
socket_descriptor = create_real_time_tcp_socket_errexit();
} else if (socket_type == UDP) {
socket_descriptor = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP);
// Set the appropriate timeout time
timeout_time =
(struct timeval){.tv_sec = UDP_TIMEOUT_TIME / BILLION, .tv_usec = (UDP_TIMEOUT_TIME % BILLION) / 1000};
}
if (socket_descriptor < 0) {
lf_print_error_system_failure("Failed to create RTI socket.");
}

// Set the option for this socket to reuse the same address
int true_variable = 1; // setsockopt() requires a reference to the value assigned to an option
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEADDR, &true_variable, sizeof(int32_t)) < 0) {
lf_print_error("RTI failed to set SO_REUSEADDR option on the socket: %s.", strerror(errno));
}
// Set the timeout on the socket so that read and write operations don't block for too long
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_RCVTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_RCVTIMEO option on the socket: %s.", strerror(errno));
}
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_SNDTIMEO, (const char*)&timeout_time, sizeof(timeout_time)) < 0) {
lf_print_error("RTI failed to set SO_SNDTIMEO option on the socket: %s.", strerror(errno));
}

/*
* The following used to permit reuse of a port that an RTI has previously
* used that has not been released. We no longer do this, and instead retry
* some number of times after waiting.
// SO_REUSEPORT (since Linux 3.9)
// Permits multiple AF_INET or AF_INET6 sockets to be bound to an
// identical socket address. This option must be set on each
// socket (including the first socket) prior to calling bind(2)
// on the socket. To prevent port hijacking, all of the
// processes binding to the same address must have the same
// effective UID. This option can be employed with both TCP and
// UDP sockets.
int reuse = 1;
#ifdef SO_REUSEPORT
if (setsockopt(socket_descriptor, SOL_SOCKET, SO_REUSEPORT,
(const char*)&reuse, sizeof(reuse)) < 0) {
perror("setsockopt(SO_REUSEPORT) failed");
}
#endif
*/

// Server file descriptor.
struct sockaddr_in server_fd;
// Zero out the server address structure.
bzero((char*)&server_fd, sizeof(server_fd));

uint16_t specified_port = port;
if (specified_port == 0)
port = DEFAULT_PORT;

server_fd.sin_family = AF_INET; // IPv4
server_fd.sin_addr.s_addr = INADDR_ANY; // All interfaces, 0.0.0.0.
// Convert the port number from host byte order to network byte order.
server_fd.sin_port = htons(port);

int result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));

// Try repeatedly to bind to a port. If no specific port is specified, then
// increment the port number each time.

int count = 1;
while (result != 0 && count++ < PORT_BIND_RETRY_LIMIT) {
if (specified_port == 0) {
lf_print_warning("RTI failed to get port %d.", port);
port++;
if (port >= DEFAULT_PORT + MAX_NUM_PORT_ADDRESSES)
port = DEFAULT_PORT;
lf_print_warning("RTI will try again with port %d.", port);
server_fd.sin_port = htons(port);
// Do not sleep.
} else {
lf_print("RTI failed to get port %d. Will try again.", port);
lf_sleep(PORT_BIND_RETRY_INTERVAL);
}
result = bind(socket_descriptor, (struct sockaddr*)&server_fd, sizeof(server_fd));
}
if (result != 0) {
lf_print_error_and_exit("Failed to bind the RTI socket. Port %d is not available. ", port);
}
char* type = "TCP";
if (socket_type == UDP) {
type = "UDP";
}
lf_print("RTI using %s port %d for federation %s.", type, port, rti_remote->federation_id);

if (socket_type == TCP) {
rti_remote->final_port_TCP = port;
// Enable listening for socket connections.
// The second argument is the maximum number of queued socket requests,
// which according to the Mac man page is limited to 128.
listen(socket_descriptor, 128);
} else if (socket_type == UDP) {
rti_remote->final_port_UDP = port;
// No need to listen on the UDP socket
}

return socket_descriptor;
}

void notify_tag_advance_grant(scheduling_node_t* e, tag_t tag) {
if (e->state == NOT_CONNECTED || lf_tag_compare(tag, e->last_granted) <= 0 ||
lf_tag_compare(tag, e->last_provisionally_granted) < 0) {
Expand Down Expand Up @@ -1171,7 +1045,7 @@ void send_reject(int* socket_id, unsigned char error_code) {
* @param client_fd The socket address.
* @return The federate ID for success or -1 for failure.
*/
static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_in* client_fd) {
static int32_t receive_and_check_fed_id_message(int* socket_id) {
// Buffer for message ID, federate ID, and federation ID length.
size_t length = 1 + sizeof(uint16_t) + 1; // Message ID, federate ID, length of fedration ID.
unsigned char buffer[length];
Expand Down Expand Up @@ -1261,13 +1135,14 @@ static int32_t receive_and_check_fed_id_message(int* socket_id, struct sockaddr_
}
federate_info_t* fed = GET_FED_INFO(fed_id);
// The MSG_TYPE_FED_IDS message has the right federation ID.
// Assign the address information for federate.
// The IP address is stored here as an in_addr struct (in .server_ip_addr) that can be useful
// to create sockets and can be efficiently sent over the network.
// First, convert the sockaddr structure into a sockaddr_in that contains an internet address.
struct sockaddr_in* pV4_addr = client_fd;
// Then extract the internet address (which is in IPv4 format) and assign it as the federate's socket server
fed->server_ip_addr = pV4_addr->sin_addr;

// Get the peer address from the connected socket_id. Then assign it as the federate's socket server.
struct sockaddr_in peer_addr;
socklen_t addr_len = sizeof(peer_addr);
if (getpeername(*socket_id, (struct sockaddr*)&peer_addr, &addr_len) != 0) {
lf_print_error("RTI failed to get peer address.");
}
fed->server_ip_addr = peer_addr.sin_addr;

#if LOG_LEVEL >= LOG_LEVEL_DEBUG
// Create the human readable format and copy that into
Expand Down Expand Up @@ -1536,25 +1411,7 @@ static bool authenticate_federate(int* socket) {

void lf_connect_to_federates(int socket_descriptor) {
for (int i = 0; i < rti_remote->base.number_of_scheduling_nodes; i++) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
// The following blocks until a federate connects.
int socket_id = -1;
while (1) {
socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
if (socket_id >= 0) {
// Got a socket
break;
} else if (socket_id < 0 && (errno != EAGAIN || errno != EWOULDBLOCK)) {
lf_print_error_system_failure("RTI failed to accept the socket.");
} else {
// Try again
lf_print_warning("RTI failed to accept the socket. %s. Trying again.", strerror(errno));
continue;
}
}

int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
// Wait for the first message from the federate when RTI -a option is on.
#ifdef __RTI_AUTH__
if (rti_remote->authentication_enabled) {
Expand All @@ -1572,7 +1429,7 @@ void lf_connect_to_federates(int socket_descriptor) {
#endif

// The first message from the federate should contain its ID and the federation ID.
int32_t fed_id = receive_and_check_fed_id_message(&socket_id, (struct sockaddr_in*)&client_fd);
int32_t fed_id = receive_and_check_fed_id_message(&socket_id);
if (fed_id >= 0 && socket_id >= 0 && receive_connection_information(&socket_id, (uint16_t)fed_id) &&
receive_udp_message_and_set_up_clock_sync(&socket_id, (uint16_t)fed_id)) {

Expand Down Expand Up @@ -1612,14 +1469,12 @@ void* respond_to_erroneous_connections(void* nothing) {
initialize_lf_thread_id();
while (true) {
// Wait for an incoming connection request.
struct sockaddr client_fd;
uint32_t client_length = sizeof(client_fd);
// The following will block until either a federate attempts to connect
// or close(rti->socket_descriptor_TCP) is called.
int socket_id = accept(rti_remote->socket_descriptor_TCP, &client_fd, &client_length);
if (socket_id < 0)
int socket_id = accept_socket(rti_remote->socket_descriptor_TCP, -1);
if (socket_id < 0) {
return NULL;

}
if (rti_remote->all_federates_exited) {
return NULL;
}
Expand Down Expand Up @@ -1653,12 +1508,17 @@ void initialize_federate(federate_info_t* fed, uint16_t id) {
int32_t start_rti_server(uint16_t port) {
_lf_initialize_clock();
// Create the TCP socket server
rti_remote->socket_descriptor_TCP = create_rti_server(port, TCP);
if (create_server(port, &rti_remote->socket_descriptor_TCP, &rti_remote->final_port_TCP, TCP, true)) {
lf_print_error_system_failure("RTI failed to create TCP server: %s.", strerror(errno));
};
lf_print("RTI: Listening for federates.");
// Create the UDP socket server
// Try to get the rti_remote->final_port_TCP + 1 port
if (rti_remote->clock_sync_global_status >= clock_sync_on) {
rti_remote->socket_descriptor_UDP = create_rti_server(rti_remote->final_port_TCP + 1, UDP);
if (create_server(rti_remote->final_port_TCP + 1, &rti_remote->socket_descriptor_UDP, &rti_remote->final_port_UDP,
UDP, true)) {
lf_print_error_system_failure("RTI failed to create UDP server: %s.", strerror(errno));
}
}
return rti_remote->socket_descriptor_TCP;
}
Expand Down
5 changes: 2 additions & 3 deletions core/federated/RTI/rti_remote.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,15 +31,14 @@

#include "lf_types.h"
#include "pqueue_tag.h"
#include "socket_common.h"

/** Time allowed for federates to reply to stop request. */
#define MAX_TIME_FOR_REPLY_TO_STOP_REQUEST SEC(30)

/////////////////////////////////////////////
//// Data structures

typedef enum socket_type_t { TCP, UDP } socket_type_t;

/**
* Information about a federate known to the RTI, including its runtime state,
* mode of execution, and connectivity with other federates.
Expand Down Expand Up @@ -413,4 +412,4 @@ int process_args(int argc, const char* argv[]);
void initialize_RTI(rti_remote_t* rti);

#endif // RTI_REMOTE_H
#endif // STANDALONE_RTI
#endif // STANDALONE_RTI
1 change: 1 addition & 0 deletions core/federated/clock-sync.c
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
#include "clock-sync.h"
#include "net_common.h"
#include "net_util.h"
#include "socket_common.h"
#include "util.h"

/** Offset calculated by the clock synchronization algorithm. */
Expand Down
Loading

0 comments on commit 03137a0

Please # to comment.