diff --git a/aio_generic.go b/aio_generic.go index d7f4857..a97beee 100644 --- a/aio_generic.go +++ b/aio_generic.go @@ -45,6 +45,8 @@ const ( OpRead OpType = iota // OpWrite means the aiocb is a write operation OpWrite + // OpDetach detach the connection from a Watcher + OpDetach // internal operation to delete an related resource opDelete ) diff --git a/aio_test.go b/aio_test.go index adeaf34..7c19182 100644 --- a/aio_test.go +++ b/aio_test.go @@ -571,6 +571,9 @@ func Test4kTiny(t *testing.T) { } func testParallel(t *testing.T, par int, msgsize int) { + if par > 1024 { + par = 1024 + } t.Log("testing concurrent:", par, "connections") ln := echoServer(t, msgsize) defer ln.Close() @@ -645,6 +648,9 @@ func Test10kCompleteSwapBuffer(t *testing.T) { } func testParallelRandomInternal(t *testing.T, par int, msgsize int, allswap bool) { + if par > 1024 { + par = 1024 + } t.Log("testing concurrent:", par, "connections") ln := echoServer(t, msgsize) defer ln.Close() @@ -735,6 +741,9 @@ func TestDeadline8k(t *testing.T) { } func testDeadline(t *testing.T, par int) { + if par > 1024 { + par = 1024 + } t.Log("testing concurrent:", par, "unresponsive connections") ln := echoServer(t, 128) defer ln.Close() diff --git a/examples/echo-server/main.go b/examples/echo-server/main.go index 5769ec7..f099143 100644 --- a/examples/echo-server/main.go +++ b/examples/echo-server/main.go @@ -4,7 +4,7 @@ import ( "log" "net" - "github.com/xtaci/gaio" + "github.com/liukun/gaio" ) // this goroutine will wait for all io events, and sents back everything it received diff --git a/examples/push-server/main.go b/examples/push-server/main.go index be99b60..8fbb678 100644 --- a/examples/push-server/main.go +++ b/examples/push-server/main.go @@ -6,7 +6,7 @@ import ( "net" "time" - "github.com/xtaci/gaio" + "github.com/liukun/gaio" ) func main() { diff --git a/go.mod b/go.mod index 03e8063..fef8b58 100644 --- a/go.mod +++ b/go.mod @@ -1,3 +1,3 @@ -module github.com/xtaci/gaio +module github.com/liukun/gaio go 1.13 diff --git a/watcher.go b/watcher.go index 14bef6d..be6883f 100644 --- a/watcher.go +++ b/watcher.go @@ -211,8 +211,14 @@ func (w *watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadl return w.aioCreate(ctx, OpWrite, conn, buf, deadline, false) } +// Detach let the watcher to release resources related to this conn immediately, but not close the conn. +// Then deliver the duplicated conn's `fd` in `size` field as a event. +func (w *watcher) Detach(conn net.Conn) error { + return w.aioCreate(nil, OpDetach, conn, nil, zeroTime, false) +} + // Free let the watcher to release resources related to this conn immediately, -// like socket file descriptors. +// like socket file descriptors, and close the connection. func (w *watcher) Free(conn net.Conn) error { return w.aioCreate(nil, opDelete, conn, nil, zeroTime, false) } @@ -336,7 +342,7 @@ func (w *watcher) tryWrite(fd int, pcb *aiocb) bool { } // release connection related resources -func (w *watcher) releaseConn(ident int) { +func (w *watcher) releaseConn(ident int, close bool) { if desc, ok := w.descs[ident]; ok { // delete from heap for e := desc.readers.Front(); e != nil; e = e.Next() { @@ -356,7 +362,9 @@ func (w *watcher) releaseConn(ident int) { delete(w.descs, ident) delete(w.connIdents, desc.ptr) // close socket file descriptor duplicated from net.Conn - syscall.Close(ident) + if close { + _ = syscall.Close(ident) + } } } @@ -393,7 +401,7 @@ func (w *watcher) loop() { // defer function to release all resources defer func() { for ident := range w.descs { - w.releaseConn(ident) + w.releaseConn(ident, true) } }() @@ -439,7 +447,7 @@ func (w *watcher) loop() { if ident, ok := w.connIdents[ptr]; ok { // since it's gc-ed, queue is impossible to hold net.Conn // we don't have to send to chIOCompletion,just release here - w.releaseConn(ident) + w.releaseConn(ident, true) } w.gc[i] = nil } @@ -457,8 +465,13 @@ func (w *watcher) handlePending(pending []*aiocb) { for _, pcb := range pending { ident, ok := w.connIdents[pcb.ptr] // resource releasing operation - if pcb.op == opDelete && ok { - w.releaseConn(ident) + if (pcb.op == opDelete || pcb.op == OpDetach) && ok { + w.releaseConn(ident, pcb.op == opDelete) + if pcb.op == OpDetach { + // pass the `fd` in `size` field + pcb.size = ident + w.deliver(pcb) + } continue }