-
Notifications
You must be signed in to change notification settings - Fork 3
/
listener.go
145 lines (116 loc) · 3.09 KB
/
listener.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package quic
import (
"crypto/tls"
"net"
"sync/atomic"
"github.com/SentimensRG/ctx"
quic "github.com/lucas-clemente/quic-go"
"github.com/nanomsg/mangos"
"github.com/pkg/errors"
)
type lstnFactory func(string, *tls.Config, *quic.Config) (quic.Listener, error)
type listenDeleter interface {
DelListener(netlocator)
}
type refcntListener struct {
ctx.Doner
gc func()
refcnt int32
quic.Listener
}
func newRefCntListener(n netlocator, l quic.Listener, d listenDeleter) *refcntListener {
cq := make(chan struct{})
return &refcntListener{
Listener: l,
Doner: ctx.C(cq),
gc: func() {
close(cq)
d.DelListener(n)
},
}
}
func (r *refcntListener) Incr() *refcntListener {
atomic.AddInt32(&r.refcnt, 1)
return r
}
func (r *refcntListener) DecrAndClose() (err error) {
if i := atomic.AddInt32(&r.refcnt, -1); i == 0 {
err = r.Close()
r.gc()
} else if i < 0 {
panic("already closed")
}
return
}
// listenMux implements muxListener
type listenMux struct {
mux *multiplexer
factory lstnFactory
l *refcntListener
}
func newListenMux(m *multiplexer, fn lstnFactory) *listenMux {
return &listenMux{mux: m, factory: fn}
}
func (lm *listenMux) LoadListener(n netlocator, tc *tls.Config, qc *quic.Config) error {
lm.mux.Lock()
defer lm.mux.Unlock()
var ok bool
if lm.l, ok = lm.mux.GetListener(n); !ok {
// We don't have a listener for this netloc yet, so create it.
ql, err := lm.factory(n.Netloc(), tc, qc)
if err != nil {
return err
}
// Init refcnt to track the Listener's usage and clean up when we're done
lm.l = newRefCntListener(n, ql, lm.mux)
lm.mux.AddListener(n, lm.l)
}
lm.l.Incr()
return nil
}
func (lm listenMux) Accept(path string) (conn net.Conn, err error) {
chConn := make(chan net.Conn)
if err = lm.mux.RegisterPath(path, chConn); err != nil {
err = errors.Wrapf(err, "register path %s", path)
return
}
// Start the listen loop, which will produce sessions, accept their
// streams, and route them to the appropriate endpoint.
go ctx.FTick(lm.l, func() {
if sess, err := lm.l.Accept(); err == nil {
lm.mux.Lock()
defer lm.mux.Unlock()
sess := newRefCntSession(sess, lm.mux)
lm.mux.AddSession(sess.RemoteAddr(), sess.Incr())
go lm.mux.Serve(sess)
}
})
return <-chConn, nil
}
func (lm listenMux) Close(path string) error {
lm.mux.UnregisterPath(path)
return lm.l.DecrAndClose()
}
type listener struct {
netloc
*listenMux
opt *options
sock mangos.Socket
}
func (l *listener) Listen() error {
tc, qc := getQUICCfg(l.opt)
return errors.Wrap(l.LoadListener(l.netloc, tc, qc), "listen quic")
}
func (l listener) Accept() (mangos.Pipe, error) {
conn, err := l.listenMux.Accept(l.Path)
if err != nil {
return nil, errors.Wrap(err, "mux accept")
}
return mangos.NewConnPipe(conn, l.sock)
}
func (l listener) Close() error {
return l.listenMux.Close(l.Path)
}
func (l listener) GetOption(name string) (v interface{}, err error) { return l.opt.get(name) }
func (l listener) SetOption(name string, v interface{}) (err error) { return l.opt.set(name, v) }
func (l listener) Address() string { return l.URL.String() }