-
Notifications
You must be signed in to change notification settings - Fork 5
/
rtm_broker.go
100 lines (79 loc) · 1.87 KB
/
rtm_broker.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
package slacker
import (
"encoding/json"
"time"
"github.com/gorilla/websocket"
)
// RTMBroker handles incoming and outgoing messages to a Slack RTM Websocket.
//
// A broker is built by NewRTMBroker and only holds the target URL at that
// point; the connection and the channels are set up by Connect.
type RTMBroker struct {
// url is the websocket endpoint that Connect dials.
url string
// incoming carries the raw text frames read from the websocket to the
// goroutine that decodes them into events. It is created by Connect.
incoming chan []byte
// outgoing is not referenced by any code visible in this file.
outgoing chan []byte
// events carries decoded events to consumers; it is exposed read-only
// through Events and is created by Connect.
events chan RTMEvent
// conn is the websocket connection established by Connect. It is nil
// until Connect succeeds.
conn *websocket.Conn
// closed is set to true by Close.
closed bool
}
// RTMEvent represents a simple event received from Slack.
type RTMEvent struct {
// Type is decoded from the "type" key of the event's JSON payload.
Type string `json:"type"`
// RawMessage holds the complete JSON payload of the frame the event was
// decoded from, so consumers can decode the event-specific fields
// themselves.
// NOTE(review): the field carries no json tag, so encoding/json also maps
// a payload key named "RawMessage" onto it; confirm that is intended.
RawMessage json.RawMessage
}
// NewRTMBroker builds a broker that targets the websocket URL found in the
// given rtm.start result.
//
// The returned broker is not connected yet: no connection is opened and no
// channels are allocated until Connect is called. s must be non-nil.
func NewRTMBroker(s *RTMStartResult) *RTMBroker {
	return &RTMBroker{url: s.URL}
}
// Connect dials the broker's RTM websocket URL with the default gorilla
// dialer and, on success, starts the two background goroutines that read
// frames from the socket and turn them into events.
//
// It returns the dial error unchanged when the connection cannot be
// established; in that case the broker's state is left untouched.
func (b *RTMBroker) Connect() error {
	ws, _, dialErr := websocket.DefaultDialer.Dial(b.url, nil)
	if dialErr != nil {
		return dialErr
	}

	// Both channels are unbuffered: each hand-off blocks until the next
	// stage is ready to take the value.
	b.conn = ws
	b.incoming = make(chan []byte)
	b.events = make(chan RTMEvent)

	go b.startRecv()
	go b.handleEvents()

	return nil
}
// Close marks the broker as closed and closes the connection to Slack RTM.
//
// It returns the error reported by the underlying websocket connection.
// Calling Close on a broker that was never successfully connected is a
// no-op that returns nil.
func (b *RTMBroker) Close() error {
	b.closed = true
	if b.conn == nil {
		// conn is only assigned by a successful Connect, so there is
		// nothing to close here; guard against a nil dereference.
		return nil
	}
	return b.conn.Close()
}
// Events exposes the stream of events pushed by the RTM API as a
// receive-only channel.
//
// The channel is allocated by Connect, so the value returned before Connect
// has been called is nil.
func (b *RTMBroker) Events() <-chan RTMEvent {
	var stream <-chan RTMEvent = b.events
	return stream
}
// Publish encodes e through its Publishable method and writes the resulting
// bytes to the RTM websocket as a single text message.
//
// It returns the encoding error if e cannot be encoded, otherwise whatever
// error the websocket write reports. The broker must be connected first.
// NOTE: gorilla/websocket connections support only one concurrent writer, so
// callers should not invoke Publish from several goroutines at once.
func (b *RTMBroker) Publish(e Publishable) error {
	payload, err := e.Publishable()
	if err == nil {
		err = b.conn.WriteMessage(websocket.TextMessage, payload)
	}
	return err
}
// startRecv reads messages from the websocket and forwards every text
// message to the incoming channel. It runs in its own goroutine, started by
// Connect.
//
// The loop ends on the first read error, which is also what a read returns
// once Close has closed the connection. gorilla/websocket read errors are
// permanent, and reading a failed connection over and over eventually
// panics, so retrying is never correct. On exit the incoming channel is
// closed so that handleEvents terminates as well.
func (b *RTMBroker) startRecv() {
	// Capture the connection and channel once so this goroutine keeps
	// working on the pair it was started for, even if Connect runs again.
	conn, incoming := b.conn, b.incoming

	// The sender owns the channel; closing it is the shutdown signal for
	// the decoding goroutine.
	defer close(incoming)

	for {
		msgType, message, err := conn.ReadMessage()
		if err != nil {
			return
		}
		if msgType == websocket.TextMessage {
			incoming <- message
		}
		// Pacing between reads, preserved from the existing behaviour.
		time.Sleep(25 * time.Millisecond)
	}
}

// handleEvents decodes every raw frame received on the incoming channel into
// an RTMEvent and delivers it on the events channel. It runs in its own
// goroutine, started by Connect, and returns once the incoming channel has
// been closed by startRecv.
//
// A frame that cannot be decoded is skipped: a single malformed payload from
// the network must not crash the process that embeds this library. The
// events channel is deliberately left open on exit so that existing
// consumers never observe zero-value events.
func (b *RTMBroker) handleEvents() {
	events := b.events

	for message := range b.incoming {
		raw := json.RawMessage(message)
		// Keep the full payload alongside the decoded type so consumers
		// can decode event-specific fields themselves.
		rtmEvent := RTMEvent{RawMessage: raw}
		if err := json.Unmarshal(raw, &rtmEvent); err != nil {
			continue
		}
		events <- rtmEvent
	}
}