-
Notifications
You must be signed in to change notification settings - Fork 2
/
client.rb
153 lines (139 loc) · 3.67 KB
/
client.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
module WebSocket
def self.create_key
B64.encode(Sysrandom.buf(16)).chomp!
end
class Connection
def initialize(host, port, path, *args)
@host = host
@port = port
@path = path
@tcp_socket = TCPSocket.new host, port
@socket = Tls::Client.new(*args)
@socket.connect_socket @tcp_socket.fileno, host
setup()
end
def recv(timeout)
if @msgs.empty?
if @client.want_read?
@socket_pi.events = Poll::In
@client.recv if @poll.wait(timeout)
end
end
@msgs.shift
ensure
@socket_pi.events = Poll::Out
while @client.want_write?
if @poll.wait(timeout)
@client.send
else
break
end
end
end
def send(msg, opcode, timeout)
if opcode
@client.queue_msg(msg, opcode)
else
@client.queue_msg(msg)
end
@socket_pi.events = Poll::Out
while @client.want_write?
if @poll.wait(timeout)
@client.send
else
break
end
end
self
end
def close(status_code, reason, timeout)
if reason
@client.queue_close(status_code, reason)
else
@client.queue_close(status_code)
end
while @client.want_write?||@client.want_read?
@socket_pi.events = 0
@socket_pi.events |= Poll::In if @client.want_read?
@socket_pi.events |= Poll::Out if @client.want_write?
if @poll.wait(timeout)
@client.send if @socket_pi.writable?
@client.recv if @socket_pi.readable?
else
break
end
end
@msgs.dup
ensure
@msgs.clear
@tcp_socket._setnonblock(false)
@socket.close
@tcp_socket.close
end
private
def setup
http_handshake
make_nonblock
setup_poller
setup_ws
rescue => e
@tcp_socket._setnonblock(false)
@socket.close
@tcp_socket.close
raise e
end
def http_handshake
key = WebSocket.create_key
@socket.write("GET #{@path} HTTP/1.1\r\nHost: #{@host}:#{@port}\r\nConnection: Upgrade\r\nUpgrade: websocket\r\nSec-WebSocket-Version: 13\r\nSec-WebSocket-Key: #{key}\r\n\r\n")
buf = @socket.read
phr = Phr.new
while true
case phr.parse_response(buf)
when Fixnum
break
when :incomplete
buf << @socket.read
when :parser_error
raise Error, "HTTP Parser error"
end
end
header = phr.headers.each do |key, value|
return value if key.downcase == 'sec-websocket-accept'
end
unless WebSocket.create_accept(key).securecmp(header)
raise Error, "Handshake failure"
end
end
def make_nonblock
@tcp_socket._setnonblock(true)
end
def setup_poller
@poll = Poll.new
@socket_pi = @poll.add(@tcp_socket)
end
def setup_ws
@callbacks = Wslay::Event::Callbacks.new
@callbacks.recv_callback {|buf, len| @socket.read_nonblock len}
@callbacks.send_callback {|buf| @socket.write_nonblock buf}
@msgs = []
@callbacks.on_msg_recv_callback {|msg| @msgs << msg}
@client = Wslay::Event::Context::Client.new @callbacks
end
end
class Client
def initialize(host, port, path, *args)
@connection = Connection.new(host, port, path, *args)
end
def recv(timeout = -1)
@connection.recv(timeout)
end
def send(msg, opcode = nil, timeout = -1)
@connection.send(msg, opcode, timeout)
self
end
alias :<< :send
def close(status_code = :normal_closure, reason = nil, timeout = -1)
@connection.close(status_code, reason, timeout)
end
end
end