Skip to content

Commit

Permalink
Detach a connection but not close it.
Browse files Browse the repository at this point in the history
  • Loading branch information
liukun committed Jan 26, 2021
1 parent 97daad4 commit 0a9a8f6
Show file tree
Hide file tree
Showing 6 changed files with 34 additions and 10 deletions.
2 changes: 2 additions & 0 deletions aio_generic.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ const (
OpRead OpType = iota
// OpWrite means the aiocb is a write operation
OpWrite
// OpDetach detach the connection from a Watcher
OpDetach
// internal operation to delete an related resource
opDelete
)
Expand Down
9 changes: 9 additions & 0 deletions aio_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -571,6 +571,9 @@ func Test4kTiny(t *testing.T) {
}

func testParallel(t *testing.T, par int, msgsize int) {
if par > 1024 {
par = 1024
}
t.Log("testing concurrent:", par, "connections")
ln := echoServer(t, msgsize)
defer ln.Close()
Expand Down Expand Up @@ -645,6 +648,9 @@ func Test10kCompleteSwapBuffer(t *testing.T) {
}

func testParallelRandomInternal(t *testing.T, par int, msgsize int, allswap bool) {
if par > 1024 {
par = 1024
}
t.Log("testing concurrent:", par, "connections")
ln := echoServer(t, msgsize)
defer ln.Close()
Expand Down Expand Up @@ -735,6 +741,9 @@ func TestDeadline8k(t *testing.T) {
}

func testDeadline(t *testing.T, par int) {
if par > 1024 {
par = 1024
}
t.Log("testing concurrent:", par, "unresponsive connections")
ln := echoServer(t, 128)
defer ln.Close()
Expand Down
2 changes: 1 addition & 1 deletion examples/echo-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"log"
"net"

"github.com/xtaci/gaio"
"github.com/liukun/gaio"
)

// this goroutine will wait for all io events, and sents back everything it received
Expand Down
2 changes: 1 addition & 1 deletion examples/push-server/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import (
"net"
"time"

"github.com/xtaci/gaio"
"github.com/liukun/gaio"
)

func main() {
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
@@ -1,3 +1,3 @@
module github.com/xtaci/gaio
module github.com/liukun/gaio

go 1.13
27 changes: 20 additions & 7 deletions watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,14 @@ func (w *watcher) WriteTimeout(ctx interface{}, conn net.Conn, buf []byte, deadl
return w.aioCreate(ctx, OpWrite, conn, buf, deadline, false)
}

// Detach let the watcher to release resources related to this conn immediately, but not close the conn.
// Then deliver the duplicated conn's `fd` in `size` field as a event.
func (w *watcher) Detach(conn net.Conn) error {
return w.aioCreate(nil, OpDetach, conn, nil, zeroTime, false)
}

// Free let the watcher to release resources related to this conn immediately,
// like socket file descriptors.
// like socket file descriptors, and close the connection.
func (w *watcher) Free(conn net.Conn) error {
return w.aioCreate(nil, opDelete, conn, nil, zeroTime, false)
}
Expand Down Expand Up @@ -336,7 +342,7 @@ func (w *watcher) tryWrite(fd int, pcb *aiocb) bool {
}

// release connection related resources
func (w *watcher) releaseConn(ident int) {
func (w *watcher) releaseConn(ident int, close bool) {
if desc, ok := w.descs[ident]; ok {
// delete from heap
for e := desc.readers.Front(); e != nil; e = e.Next() {
Expand All @@ -356,7 +362,9 @@ func (w *watcher) releaseConn(ident int) {
delete(w.descs, ident)
delete(w.connIdents, desc.ptr)
// close socket file descriptor duplicated from net.Conn
syscall.Close(ident)
if close {
_ = syscall.Close(ident)
}
}
}

Expand Down Expand Up @@ -393,7 +401,7 @@ func (w *watcher) loop() {
// defer function to release all resources
defer func() {
for ident := range w.descs {
w.releaseConn(ident)
w.releaseConn(ident, true)
}
}()

Expand Down Expand Up @@ -439,7 +447,7 @@ func (w *watcher) loop() {
if ident, ok := w.connIdents[ptr]; ok {
// since it's gc-ed, queue is impossible to hold net.Conn
// we don't have to send to chIOCompletion,just release here
w.releaseConn(ident)
w.releaseConn(ident, true)
}
w.gc[i] = nil
}
Expand All @@ -457,8 +465,13 @@ func (w *watcher) handlePending(pending []*aiocb) {
for _, pcb := range pending {
ident, ok := w.connIdents[pcb.ptr]
// resource releasing operation
if pcb.op == opDelete && ok {
w.releaseConn(ident)
if (pcb.op == opDelete || pcb.op == OpDetach) && ok {
w.releaseConn(ident, pcb.op == opDelete)
if pcb.op == OpDetach {
// pass the `fd` in `size` field
pcb.size = ident
w.deliver(pcb)
}
continue
}

Expand Down

0 comments on commit 0a9a8f6

Please # to comment.