Skip to content

Commit

Permalink
Port udp support to macOS
Browse files Browse the repository at this point in the history
  • Loading branch information
Alex Hultman authored and Alex Hultman committed Jan 16, 2022
1 parent 39d2191 commit eab7df5
Show file tree
Hide file tree
Showing 4 changed files with 120 additions and 30 deletions.
23 changes: 13 additions & 10 deletions examples/udp_benchmark.c
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
* We should remove this and replace it with bsd_addr_t and a builder function */
#include <netinet/in.h>

struct us_udp_packet_buffer_t *send_buf;

void on_wakeup(struct us_loop_t *loop) {

Expand All @@ -18,9 +19,6 @@ void on_pre(struct us_loop_t *loop) {

}

struct us_udp_packet_buffer_t *buf;
struct us_udp_packet_buffer_t *send_buf;

void on_post(struct us_loop_t *loop) {
// send whatever in buffer here

Expand All @@ -34,11 +32,16 @@ void timer_cb(struct us_timer_t *timer) {
messages = 0;
}

void on_server_read(struct us_udp_socket_t *s) {
/* Called whenever you can write more datagrams after a failure to write */
void on_server_drain(struct us_udp_socket_t *s) {

int packets = us_udp_socket_receive(s, buf);
//printf("Packets: %d\n", packets);
}

/* Called whenever there are received datagrams for the app to consume */
void on_server_data(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int packets) {

// you could theoretically modify the receive buffer and just pass it to send

for (int i = 0; i < packets; i++) {
// payload, length, peer addr (behöver inte veta längd bara void), local addr (vet redan), cong
char *payload = us_udp_packet_buffer_payload(buf, i);
Expand All @@ -56,17 +59,18 @@ void on_server_read(struct us_udp_socket_t *s) {

int main() {
/* Allocate per thread, UDP packet buffers */
buf = us_create_udp_packet_buffer();
struct us_udp_packet_buffer_t *receive_buf = us_create_udp_packet_buffer();
/* We also want a send buffer we can assemble while iterating the read buffer */
send_buf = us_create_udp_packet_buffer();

/* Create the event loop */
struct us_loop_t *loop = us_create_loop(0, on_wakeup, on_pre, on_post, 0);

/* Create two UDP sockets and bind them to their respective ports */
struct us_udp_socket_t *server = us_create_udp_socket(loop, on_server_read, 5678);
struct us_udp_socket_t *server = us_create_udp_socket(loop, receive_buf, on_server_data, on_server_drain, "127.0.0.1", 5678);
printf("Server socket: %p\n", server);

struct us_udp_socket_t *client = us_create_udp_socket(loop, on_server_read, 5679);
struct us_udp_socket_t *client = us_create_udp_socket(loop, receive_buf, on_server_data, on_server_drain, "127.0.0.1", 5679);

/* Send first packet from client to server */
struct sockaddr_storage storage;
Expand All @@ -80,7 +84,6 @@ int main() {
us_udp_buffer_set_packet_payload(send_buf, i, 0, "Hello UDP!", 10, &storage);
}


int sent = us_udp_socket_send(client, send_buf, 100); // buffer should know how many it holds!
printf("Sent: %d\n", sent);

Expand Down
63 changes: 58 additions & 5 deletions src/bsd.c
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,9 @@
/* Internal structure of packet buffer */
struct us_internal_udp_packet_buffer {
#if defined(_WIN32) || defined(__APPLE__)

char *buf[LIBUS_UDP_MAX_NUM];
size_t len[LIBUS_UDP_MAX_NUM];
struct sockaddr_storage addr[LIBUS_UDP_MAX_NUM];
#else
struct mmsghdr msgvec[LIBUS_UDP_MAX_NUM];
struct iovec iov[LIBUS_UDP_MAX_NUM];
Expand All @@ -51,46 +53,91 @@ struct us_internal_udp_packet_buffer {
int bsd_sendmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags) {
#if defined(_WIN32) || defined(__APPLE__)

struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec;

/* Let's just use sendto here */
/* Winsock does not have sendmsg, while macOS has, however, we simply use sendto since both macOS and Winsock has it.
* Besides, you should use Linux either way to get best performance with the sendmmsg */


// while we do not get error, send next

for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) {
// need to support ipv6 addresses also!
int ret = sendto(fd, packet_buffer->buf[i], packet_buffer->len[i], flags, (struct sockaddr *)&packet_buffer->addr[i], sizeof(struct sockaddr_in));

if (ret == -1) {
// if we fail then we need to buffer up, no that's not our problem
// we do need to register poll out though and have a callback for it
return i;
}

//printf("sendto: %d\n", ret);
}

return LIBUS_UDP_MAX_NUM; // one message
#else
return sendmmsg(fd, (struct mmsghdr *)msgvec, vlen, flags);
return sendmmsg(fd, (struct mmsghdr *)msgvec, vlen, flags | MSG_NOSIGNAL);
#endif
}

int bsd_recvmmsg(LIBUS_SOCKET_DESCRIPTOR fd, void *msgvec, unsigned int vlen, int flags, void *timeout) {
#if defined(_WIN32) || defined(__APPLE__)
struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec;


for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) {
socklen_t addr_len = sizeof(struct sockaddr_storage);
int ret = recvfrom(fd, packet_buffer->buf[i], LIBUS_UDP_MAX_SIZE, flags, (struct sockaddr *)&packet_buffer->addr[i], &addr_len);

if (ret == -1) {
return i;
}

packet_buffer->len[i] = ret;
}

return LIBUS_UDP_MAX_NUM;
#else
return recvmmsg(fd, (struct mmsghdr *)msgvec, vlen, flags, 0);
#endif
}

char *bsd_udp_packet_buffer_peer(void *msgvec, int index) {
#if defined(_WIN32) || defined(__APPLE__)

struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec;
return (char *)&packet_buffer->addr[index];
#else
return ((struct mmsghdr *) msgvec)[index].msg_hdr.msg_name;
#endif
}

char *bsd_udp_packet_buffer_payload(void *msgvec, int index) {
#if defined(_WIN32) || defined(__APPLE__)

struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec;
return packet_buffer->buf[index];
#else
return ((struct mmsghdr *) msgvec)[index].msg_hdr.msg_iov[0].iov_base;
#endif
}

int bsd_udp_packet_buffer_payload_length(void *msgvec, int index) {
#if defined(_WIN32) || defined(__APPLE__)

struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) msgvec;
return packet_buffer->len[index];
#else
return ((struct mmsghdr *) msgvec)[index].msg_len;
#endif
}

void bsd_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf, int index, int offset, void *payload, int length, void *peer_addr) {
#if defined(_WIN32) || defined(__APPLE__)
struct us_internal_udp_packet_buffer *packet_buffer = (struct us_internal_udp_packet_buffer *) send_buf;

memcpy(packet_buffer->buf[index], payload, length);
memcpy(&packet_buffer->addr[index], peer_addr, sizeof(struct sockaddr_storage));

packet_buffer->len[index] = length;
#else
//printf("length: %d, offset: %d\n", length, offset);

Expand All @@ -114,7 +161,13 @@ void bsd_udp_buffer_set_packet_payload(struct us_udp_packet_buffer_t *send_buf,
* Therefore a udp_packet_buffer_t will be 64 MB in size (64kb * 1024). */
void *bsd_create_udp_packet_buffer() {
#if defined(_WIN32) || defined(__APPLE__)
struct us_internal_udp_packet_buffer *b = malloc(sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * LIBUS_UDP_MAX_NUM);

for (int i = 0; i < LIBUS_UDP_MAX_NUM; i++) {
b->buf[i] = ((char *) b) + sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * i;
}

return (struct us_udp_packet_buffer_t *) b;
#else
/* Allocate 64kb times 1024 */
struct us_internal_udp_packet_buffer *b = malloc(sizeof(struct us_internal_udp_packet_buffer) + LIBUS_UDP_MAX_SIZE * LIBUS_UDP_MAX_NUM);
Expand Down
6 changes: 5 additions & 1 deletion src/libusockets.h
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ WIN32_EXPORT struct us_udp_packet_buffer_t *us_create_udp_packet_buffer();
/* Creates a (heavy-weight) UDP socket with a user space ring buffer. Again, this one is heavy weight and
* shoud be reused. One entire QUIC server can be implemented using only one single UDP socket so weight
* is not a concern as is the case for TCP sockets which are 1-to-1 with TCP connections. */
WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*read_cb)(struct us_udp_socket_t *), unsigned short port);
//WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*read_cb)(struct us_udp_socket_t *), unsigned short port);

//WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), char *host, unsigned short port);

WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, struct us_udp_packet_buffer_t *buf, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), char *host, unsigned short port);

/* Binds the UDP socket to an interface and port */
WIN32_EXPORT int us_udp_socket_bind(struct us_udp_socket_t *s, const char *hostname, unsigned int port);
Expand Down
58 changes: 44 additions & 14 deletions src/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,12 @@ WIN32_EXPORT int us_udp_packet_buffer_payload_length(struct us_udp_packet_buffer
return bsd_udp_packet_buffer_payload_length(buf, index);
}

// what should we return? number of sent datagrams?
WIN32_EXPORT int us_udp_socket_send(struct us_udp_socket_t *s, struct us_udp_packet_buffer_t *buf, int num) {
int fd = us_poll_fd((struct us_poll_t *) s);

// we need to poll out if we failed

return bsd_sendmmsg(fd, buf, num, 0);
}

Expand All @@ -56,32 +60,58 @@ WIN32_EXPORT struct us_udp_packet_buffer_t *us_create_udp_packet_buffer() {
return (struct us_udp_packet_buffer_t *) bsd_create_udp_packet_buffer();
}

WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, void (*read_cb)(struct us_udp_socket_t *), unsigned short port) {
struct us_internal_udp_t {
struct us_internal_callback_t cb;
struct us_udp_packet_buffer_t *receive_buf;
void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int);
void (*drain_cb)(struct us_udp_socket_t *);
};

/* Internal wrapper, move from here */
void internal_on_udp_read(struct us_udp_socket_t *s) {

// lookup receive buffer and callback here
struct us_internal_udp_t *udp = (struct us_internal_udp_t *) s;

int packets = us_udp_socket_receive(s, udp->receive_buf);
//printf("Packets: %d\n", packets);

// we need to get the socket data and lookup its callback here


udp->data_cb(s, udp->receive_buf, packets);
}

WIN32_EXPORT struct us_udp_socket_t *us_create_udp_socket(struct us_loop_t *loop, struct us_udp_packet_buffer_t *buf, void (*data_cb)(struct us_udp_socket_t *, struct us_udp_packet_buffer_t *, int), void (*drain_cb)(struct us_udp_socket_t *), char *host, unsigned short port) {

LIBUS_SOCKET_DESCRIPTOR fd = bsd_create_udp_socket("127.0.0.1", port);
LIBUS_SOCKET_DESCRIPTOR fd = bsd_create_udp_socket(host, port);
if (fd == LIBUS_SOCKET_ERROR) {
return 0;
}

/* If buf is 0 then create one here */
if (!buf) {
buf = us_create_udp_packet_buffer();
}

int ext_size = 0;
int fallthrough = 0;

struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_callback_t) + ext_size);
struct us_poll_t *p = us_create_poll(loop, fallthrough, sizeof(struct us_internal_udp_t) + ext_size);
us_poll_init(p, fd, POLL_TYPE_CALLBACK);

struct us_internal_callback_t *cb = (struct us_internal_callback_t *) p;
cb->loop = loop;
cb->cb_expects_the_loop = 0;
cb->leave_poll_ready = 1;
struct us_internal_udp_t *cb = (struct us_internal_udp_t *) p;
cb->cb.loop = loop;
cb->cb.cb_expects_the_loop = 0;
cb->cb.leave_poll_ready = 1;

cb->cb = (void (*)(struct us_internal_callback_t *)) read_cb;
cb->data_cb = data_cb;
cb->receive_buf = buf;
cb->drain_cb = drain_cb;

us_poll_start((struct us_poll_t *) cb, cb->loop, LIBUS_SOCKET_READABLE);
cb->cb.cb = (void (*)(struct us_internal_callback_t *)) internal_on_udp_read;

us_poll_start((struct us_poll_t *) cb, cb->cb.loop, LIBUS_SOCKET_READABLE);

return (struct us_udp_socket_t *) cb;
}

// not in use?
WIN32_EXPORT int us_udp_socket_bind(struct us_udp_socket_t *s, const char *hostname, unsigned int port) {

}

0 comments on commit eab7df5

Please # to comment.