-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathtcp_connection.go
145 lines (127 loc) · 2.63 KB
/
tcp_connection.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
package kissnet
import (
"bytes"
"encoding/binary"
"io"
"net"
"sync"
"sync/atomic"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
)
const MsgHeaderMaxSize = 2
type Connection struct {
id int64
conn *net.TCPConn
exitSync sync.WaitGroup
cb *CallBack
isClose int32
sendCh chan *bytes.Buffer
lastPingTime int64
}
func NewConnection(conn *net.TCPConn, cb *CallBack) IConnection {
c := &Connection{
conn: conn,
isClose: 0,
cb: cb,
sendCh: make(chan *bytes.Buffer, 1024),
}
return c
}
func (c *Connection) getID() int64 {
return c.id
}
func (c *Connection) setID(id int64) {
c.id = id
}
func (c *Connection) IsClose() bool { return atomic.LoadInt32(&c.isClose) > 0 }
func (c *Connection) Close() {
c.cb.ConnectionCB(c, nil)
c.SendMsg(nil)
c.exitSync.Wait()
atomic.StoreInt32(&c.isClose, 1)
close(c.sendCh)
if c.conn != nil {
c.conn.Close()
c.conn = nil
}
logrus.WithFields(logrus.Fields{"id:": c.id}).Info("TcpConnection Close")
}
func (c *Connection) start() {
c.conn.SetNoDelay(true)
c.conn.SetKeepAlive(true)
//同步退出 goroutine
c.exitSync.Add(2)
//开启读写 goroutine
go c.recvMsgLoop()
go c.sendMsgLoop()
}
func (c *Connection) SendMsg(msg *bytes.Buffer) error {
if c.IsClose() {
//关闭不能发送消息
return nil
}
//推入发送循环
c.sendCh <- msg
return nil
}
func (c *Connection) sendMsgLoop() {
defer func() {
if err := recover(); err != nil {
c.exitSync.Done()
if e, ok := err.(error); ok {
logrus.WithError(errors.WithStack(e)).Error("sendMsgLoop error")
}
}
}()
for msg := range c.sendCh {
if msg == nil || c.conn == nil {
break
}
msgLen := uint16(msg.Len())
buf := make([]byte, MsgHeaderMaxSize)
binary.LittleEndian.PutUint16(buf, msgLen)
_, err := c.conn.Write(buf)
if err != nil {
break
}
_, err = c.conn.Write(msg.Bytes())
if err != nil {
break
}
}
//关闭socket 从读操作退出
c.exitSync.Done()
}
func (c *Connection) recvMsgLoop() {
defer func() {
if err := recover(); err != nil {
c.exitSync.Done()
// 退出处理
c.Close()
//打印堆栈
if e, ok := err.(error); ok {
logrus.WithError(errors.WithStack(e)).Error("recvMsgLoop error")
}
}
}()
var err error
var msgLen int
msgHeader := make([]byte, MsgHeaderMaxSize)
for {
_, err = io.ReadFull(c.conn, msgHeader)
if err != nil {
break
}
msgLen = int(binary.LittleEndian.Uint16(msgHeader))
if msgLen <= 0 || msgLen > 65535 {
break
}
msgBody := make([]byte, msgLen)
_, err = io.ReadFull(c.conn, msgBody)
if err != nil {
break
}
c.cb.ConnectionCB(c, msgBody)
}
}