Skip to content
This repository has been archived by the owner on May 4, 2018. It is now read-only.

Commit

Permalink
unix: try to write immediately in uv_udp_send
Browse files Browse the repository at this point in the history
  • Loading branch information
saghul committed Jul 8, 2014
1 parent bf6e90f commit 4189122
Showing 1 changed file with 74 additions and 95 deletions.
169 changes: 74 additions & 95 deletions src/unix/udp.c
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,9 @@


static void uv__udp_run_completed(uv_udp_t* handle);
static void uv__udp_run_pending(uv_udp_t* handle);
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_sendmsg(uv_loop_t* loop, uv__io_t* w, unsigned int revents);
static void uv__udp_recvmsg(uv_udp_t* handle);
static void uv__udp_sendmsg(uv_udp_t* handle);
static int uv__udp_maybe_deferred_bind(uv_udp_t* handle,
int domain,
unsigned int flags);
Expand All @@ -65,25 +64,19 @@ void uv__udp_finish_close(uv_udp_t* handle) {
assert(!uv__io_active(&handle->io_watcher, UV__POLLIN | UV__POLLOUT));
assert(handle->io_watcher.fd == -1);

uv__udp_run_completed(handle);

while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
QUEUE_REMOVE(q);

req = QUEUE_DATA(q, uv_udp_send_t, queue);
uv__req_unregister(handle->loop, req);

if (req->bufs != req->bufsml)
free(req->bufs);
req->bufs = NULL;

if (req->send_cb != NULL)
req->send_cb(req, -ECANCELED);
req->status = -ECANCELED;
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
}

handle->send_queue_size = 0;
handle->send_queue_count = 0;
uv__udp_run_completed(handle);

assert(handle->send_queue_size == 0);
assert(handle->send_queue_count == 0);

/* Now tear down the handle. */
handle->recv_cb = NULL;
Expand All @@ -92,52 +85,6 @@ void uv__udp_finish_close(uv_udp_t* handle) {
}


/* Drain the handle's write queue: send each queued datagram with sendmsg(2)
 * until the queue is empty or the socket would block.  Requests that were
 * transmitted (or failed with a hard error) are moved to the
 * write_completed_queue with their status recorded; their callbacks are run
 * later by uv__udp_run_completed().
 */
static void uv__udp_run_pending(uv_udp_t* handle) {
  uv_udp_send_t* req;
  QUEUE* q;
  struct msghdr h;
  ssize_t size;

  while (!QUEUE_EMPTY(&handle->write_queue)) {
    q = QUEUE_HEAD(&handle->write_queue);
    assert(q != NULL);

    req = QUEUE_DATA(q, uv_udp_send_t, queue);
    assert(req != NULL);

    /* Build the msghdr from the request: destination address plus the
     * caller's buffers, passed through as the iovec array.
     */
    memset(&h, 0, sizeof h);
    h.msg_name = &req->addr;
    h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
      sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
    h.msg_iov = (struct iovec*) req->bufs;
    h.msg_iovlen = req->nbufs;

    /* Retry when interrupted by a signal. */
    do {
      size = sendmsg(handle->io_watcher.fd, &h, 0);
    }
    while (size == -1 && errno == EINTR);

    /* TODO try to write once or twice more in the
     * hope that the socket becomes writable again?
     */
    if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
      break;  /* Socket is full; leave the request queued for the next POLLOUT. */

    /* Record bytes sent, or the negated errno on failure. */
    req->status = (size == -1 ? -errno : size);

    /* Sending a datagram is an atomic operation: either all data
     * is written or nothing is (and EMSGSIZE is raised). That is
     * why we don't handle partial writes. Just pop the request
     * off the write queue and onto the completed queue, done.
     */
    handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
    handle->send_queue_count--;
    QUEUE_REMOVE(&req->queue);
    QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
  }
}


static void uv__udp_run_completed(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
Expand All @@ -149,6 +96,9 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
req = QUEUE_DATA(q, uv_udp_send_t, queue);
uv__req_unregister(handle->loop, req);

handle->send_queue_size -= uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count--;

if (req->bufs != req->bufsml)
free(req->bufs);
req->bufs = NULL;
Expand All @@ -164,33 +114,40 @@ static void uv__udp_run_completed(uv_udp_t* handle) {
else
req->send_cb(req, req->status);
}

if (QUEUE_EMPTY(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__io_stop(handle->loop, &handle->io_watcher, UV__POLLOUT);
if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
uv__handle_stop(handle);
}
}


/* I/O watcher callback shared by every UDP handle: dispatches poll events to
 * the receive and send paths.
 *
 * NOTE(review): the scraped diff interleaved the deleted calls
 * (uv__udp_recvmsg(loop, w, revents) / uv__udp_sendmsg(loop, w, revents))
 * with the new handle-based calls; this is the coherent post-commit version.
 */
static void uv__udp_io(uv_loop_t* loop, uv__io_t* w, unsigned int revents) {
  uv_udp_t* handle;

  /* The watcher is embedded in the handle; recover the handle from it. */
  handle = container_of(w, uv_udp_t, io_watcher);
  assert(handle->type == UV_UDP);

  if (revents & UV__POLLIN)
    uv__udp_recvmsg(handle);

  if (revents & UV__POLLOUT) {
    /* Flush as much of the write queue as the socket will take, then run the
     * completion callbacks of the requests that finished (or failed).
     */
    uv__udp_sendmsg(handle);
    uv__udp_run_completed(handle);
  }
}


static void uv__udp_recvmsg(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents) {
static void uv__udp_recvmsg(uv_udp_t* handle) {
struct sockaddr_storage peer;
struct msghdr h;
uv_udp_t* handle;
ssize_t nread;
uv_buf_t buf;
int flags;
int count;

handle = container_of(w, uv_udp_t, io_watcher);
assert(handle->type == UV_UDP);
assert(revents & UV__POLLIN);

assert(handle->recv_cb != NULL);
assert(handle->alloc_cb != NULL);

Expand Down Expand Up @@ -247,34 +204,46 @@ static void uv__udp_recvmsg(uv_loop_t* loop,
}


static void uv__udp_sendmsg(uv_loop_t* loop,
uv__io_t* w,
unsigned int revents) {
uv_udp_t* handle;

handle = container_of(w, uv_udp_t, io_watcher);
assert(handle->type == UV_UDP);
assert(revents & UV__POLLOUT);
static void uv__udp_sendmsg(uv_udp_t* handle) {
uv_udp_send_t* req;
QUEUE* q;
struct msghdr h;
ssize_t size;

assert(!QUEUE_EMPTY(&handle->write_queue)
|| !QUEUE_EMPTY(&handle->write_completed_queue));

/* Write out pending data first. */
uv__udp_run_pending(handle);
while (!QUEUE_EMPTY(&handle->write_queue)) {
q = QUEUE_HEAD(&handle->write_queue);
assert(q != NULL);

/* Drain 'request completed' queue. */
uv__udp_run_completed(handle);
req = QUEUE_DATA(q, uv_udp_send_t, queue);
assert(req != NULL);

if (!QUEUE_EMPTY(&handle->write_completed_queue)) {
/* Schedule completion callbacks. */
uv__io_feed(handle->loop, &handle->io_watcher);
}
else if (QUEUE_EMPTY(&handle->write_queue)) {
/* Pending queue and completion queue empty, stop watcher. */
uv__io_stop(loop, &handle->io_watcher, UV__POLLOUT);
memset(&h, 0, sizeof h);
h.msg_name = &req->addr;
h.msg_namelen = (req->addr.ss_family == AF_INET6 ?
sizeof(struct sockaddr_in6) : sizeof(struct sockaddr_in));
h.msg_iov = (struct iovec*) req->bufs;
h.msg_iovlen = req->nbufs;

if (!uv__io_active(&handle->io_watcher, UV__POLLIN))
uv__handle_stop(handle);
do {
size = sendmsg(handle->io_watcher.fd, &h, 0);
} while (size == -1 && errno == EINTR);

if (size == -1 && (errno == EAGAIN || errno == EWOULDBLOCK))
break;

req->status = (size == -1 ? -errno : size);

/* Sending a datagram is an atomic operation: either all data
* is written or nothing is (and EMSGSIZE is raised). That is
* why we don't handle partial writes. Just pop the request
* off the write queue and onto the completed queue, done.
*/
QUEUE_REMOVE(&req->queue);
QUEUE_INSERT_TAIL(&handle->write_completed_queue, &req->queue);
uv__io_feed(handle->loop, &handle->io_watcher);
}
}

Expand Down Expand Up @@ -415,15 +384,21 @@ int uv__udp_send(uv_udp_send_t* req,
unsigned int addrlen,
uv_udp_send_cb send_cb) {
int err;
int empty_queue;

assert(nbufs > 0);

err = uv__udp_maybe_deferred_bind(handle, addr->sa_family, 0);
if (err)
return err;

uv__req_init(handle->loop, req, UV_UDP_SEND);
/* It's legal for send_queue_count > 0 even when the write_queue is empty;
* it means there are error-state requests in the write_completed_queue that
* will touch up send_queue_size/count later.
*/
empty_queue = (handle->send_queue_count == 0);

uv__req_init(handle->loop, req, UV_UDP_SEND);
assert(addrlen <= sizeof(req->addr));
memcpy(&req->addr, addr, addrlen);
req->send_cb = send_cb;
Expand All @@ -441,9 +416,13 @@ int uv__udp_send(uv_udp_send_t* req,
handle->send_queue_size += uv__count_bufs(req->bufs, req->nbufs);
handle->send_queue_count++;
QUEUE_INSERT_TAIL(&handle->write_queue, &req->queue);
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);
uv__handle_start(handle);

if (empty_queue)
uv__udp_sendmsg(handle);
else
uv__io_start(handle->loop, &handle->io_watcher, UV__POLLOUT);

return 0;
}

Expand Down

0 comments on commit 4189122

Please sign in to comment.