-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathsipengine.go
127 lines (110 loc) · 3.18 KB
/
sipengine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
package sipengine
import (
"bytes"
"context"
"github.com/pkg/errors"
"net"
)
// Engine bundles everything ListenAndServe needs: where to listen, the
// context that governs shutdown, the processing steps, and the channels used
// to move messages and errors.
type Engine struct {
	// address is passed to net.ListenPacket as the UDP address to listen on.
	address string
	// ctx is used for managing shutdown operations.
	ctx context.Context
	// steps are applied, in order, to each message taken off the ingress channel.
	steps []SIPStep
	// channels holds the caller-supplied ingress, egress and error channels.
	channels ChannelMap
}
// ChannelMap groups the channels an Engine communicates over. The caller
// creates them, and therefore decides whether and how they are buffered.
type ChannelMap struct {
	// Ingress carries messages read from the listening socket to the
	// processing pipeline.
	Ingress chan *Message
	// Egress is not referenced elsewhere in this file; presumably it carries
	// outbound messages. TODO: confirm against the callers.
	Egress chan *Message
	// Error carries errors raised while reading from the socket or while
	// running pipeline steps.
	Error chan error
}
// NewEngine returns the engine used for processing.
//
// Callers supply their own channels through channelmap so that they decide
// how (or whether) those channels are buffered; this keeps the engine free of
// configuration or assumptions about the caller's traffic preferences.
//
// address is the UDP address ListenAndServe will listen on, ctx governs
// shutdown, and steps are the pipeline stages applied to every message in the
// order given.
func NewEngine(address string, ctx context.Context, channelmap ChannelMap, steps ...SIPStep) Engine {
	var engine Engine
	engine.address = address
	engine.ctx = ctx
	engine.steps = steps
	engine.channels = channelmap
	return engine
}
// ListenAndServe opens a UDP packet listener on the engine's address and
// funnels incoming requests through the pipeline until the engine's context
// is cancelled.
//
// Two goroutines are started:
//   - a reader that pulls datagrams off the socket, parses each one with
//     NewMessage, records the sender address and pushes the message onto the
//     Ingress channel;
//   - a dispatcher that takes messages off the Ingress channel and runs the
//     configured steps against each one in its own goroutine.
//
// Errors from reading, parsing and step processing are delivered on the Error
// channel. Once the context is cancelled, pending deliveries are abandoned
// instead of blocking forever.
//
// It returns a wrapped error if the listener cannot be opened. Otherwise it
// blocks until the context is done and returns the value produced by
// NewShutDownSignalError.
func (e Engine) ListenAndServe() error {
	pc, err := net.ListenPacket("udp", e.address)
	if err != nil {
		return errors.Wrap(err, "failure during startup")
	}
	// Closing the packet "connection" cleans up the data structures allocated
	// for the listening socket. It also unblocks the reader's pending
	// ReadFrom so that goroutine can exit.
	defer pc.Close()

	go e.dispatchIngress()
	go e.readPackets(pc)

	// Block until shutdown is requested.
	<-e.ctx.Done()

	//TODO: How to track active dialogs and messages without state
	return NewShutDownSignalError("Context has been cancelled. Shutdown initiating")
}

// dispatchIngress hands every message arriving on the Ingress channel to its
// own goroutine, and stops accepting new messages once the context is done.
func (e Engine) dispatchIngress() {
	for {
		// No default branch: the loop must block here rather than spin.
		select {
		case m := <-e.channels.Ingress:
			// Spin off a separate routine for each message so that a slow
			// pipeline does not hold up the messages behind it.
			go e.runSteps(m)
		case <-e.ctx.Done():
			return
		}
	}
}

// runSteps applies the configured steps to m in order. It stops early when
// the context is cancelled or a step returns a *MessageTerminationError; any
// other step error is reported and processing continues with the next step.
func (e Engine) runSteps(m *Message) {
	mctx, cancel := context.WithCancel(e.ctx)
	// Release the derived context as soon as this message is finished.
	defer cancel()

	for _, step := range e.steps {
		// Don't work on this message if the context is already cancelled.
		if mctx.Err() != nil {
			return
		}
		err := step(m)
		if err == nil {
			continue
		}
		// A termination error stops all further processing for this message.
		if _, ok := errors.Cause(err).(*MessageTerminationError); ok {
			return
		}
		e.reportError(errors.Wrap(err, "error during pipeline step processing"))
	}
}

// readPackets reads datagrams from pc, turns each into a Message and forwards
// it on the Ingress channel. It returns when the socket can no longer be read
// or the context is cancelled.
func (e Engine) readPackets(pc net.PacketConn) {
	for {
		// A fresh buffer per datagram, since NewMessage may keep reading from
		// it after this iteration.
		//We should think about having the buffer size value provided somehow
		buf := make([]byte, 2048)
		n, addr, err := pc.ReadFrom(buf)
		if err != nil {
			// A read failure after cancellation is just the socket being
			// closed during shutdown; it is not worth reporting.
			if e.ctx.Err() != nil {
				return
			}
			// We can't listen on the network any more, so stop reading.
			e.reportError(errors.Wrap(err, "unable to read from listening socket"))
			return
		}

		// Parse only the bytes actually received, not the zeroed tail of buf.
		message, err := NewMessage(bytes.NewReader(buf[:n]), e.ctx)
		if err != nil {
			// One malformed datagram must not take the listener down, and
			// message must not be touched when parsing failed.
			e.reportError(err)
			continue
		}
		message.Detail.From = addr.String()

		select {
		case e.channels.Ingress <- message:
		case <-e.ctx.Done():
			return
		}
	}
}

// reportError delivers err on the Error channel, giving up if the context is
// cancelled first so that shutdown never leaves a goroutine blocked on a
// channel nobody reads.
func (e Engine) reportError(err error) {
	select {
	case e.channels.Error <- err:
	case <-e.ctx.Done():
	}
}