-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathconfig.go
221 lines (186 loc) · 6.28 KB
/
config.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
package rabbids
import (
	"fmt"
	"io"
	"io/ioutil"
	"os"
	"path/filepath"
	"time"
	"github.com/a8m/envsubst"
	"github.com/mitchellh/mapstructure"
	"github.com/streadway/amqp"
	yaml "gopkg.in/yaml.v3"
)
// Package version and the fallback values used for connection settings.
const (
	// Version is the version string of this package.
	Version = "0.0.1"
	// DefaultTimeout replaces a Connection.Timeout of zero in setConfigDefaults.
	DefaultTimeout = 2 * time.Second
	// DefaultSleep replaces a Connection.Sleep of zero in setConfigDefaults.
	DefaultSleep = 500 * time.Millisecond
	// DefaultRetries replaces a Connection.Retries of zero in setConfigDefaults.
	DefaultRetries = 5
)
// File represents the file operations needed to work with the config loader:
// reading the content and retrieving the file information.
type File interface {
	io.Reader
	// Stat returns the file information; ConfigFromFile uses its Name to
	// select the decoder from the file extension.
	Stat() (os.FileInfo, error)
}
// Config describes all the available options used to declare the components
// needed by rabbids Consumers and Producers.
type Config struct {
	// Connections describes the connections used by consumers.
	Connections map[string]Connection `mapstructure:"connections"`
	// Exchanges holds all the exchanges used by consumers.
	// These exchanges are declared on startup of the rabbids client.
	Exchanges map[string]ExchangeConfig `mapstructure:"exchanges"`
	// DeadLetters holds all the dead-letter queues used internally by other queues.
	// They are declared at startup of the rabbids client.
	DeadLetters map[string]DeadLetter `mapstructure:"dead_letters"`
	// Consumers describes the configuration of each consumer.
	Consumers map[string]ConsumerConfig `mapstructure:"consumers"`
	// Handlers holds the registered message handlers used by consumers,
	// keyed by consumer name. Entries are added with RegisterHandler.
	Handlers map[string]MessageHandler
}
// Connection describes the configuration of a single connection.
type Connection struct {
	// DSN is the connection string read from the "dsn" key.
	DSN string `mapstructure:"dsn"`
	// Timeout falls back to DefaultTimeout in setConfigDefaults when it is zero.
	Timeout time.Duration `mapstructure:"timeout"`
	// Sleep falls back to DefaultSleep in setConfigDefaults when it is zero.
	// NOTE(review): presumably the pause between connection attempts — confirm
	// against the code that consumes it.
	Sleep time.Duration `mapstructure:"sleep"`
	// Retries falls back to DefaultRetries in setConfigDefaults when it is zero.
	Retries int `mapstructure:"retries"`
}
// ConsumerConfig describes a consumer's configuration.
type ConsumerConfig struct {
	// Connection is read from the "connection" key.
	// NOTE(review): presumably a key of Config.Connections — confirm against the caller.
	Connection string `mapstructure:"connection"`
	// Workers is raised to 1 by setConfigDefaults when it is zero or negative.
	Workers int `mapstructure:"workers"`
	// PrefetchCount is set to Workers + 2 by setConfigDefaults when it is zero
	// or negative.
	PrefetchCount int `mapstructure:"prefetch_count"`
	// DeadLetter is read from the "dead_letter" key.
	// NOTE(review): presumably a key of Config.DeadLetters — confirm against the caller.
	DeadLetter string `mapstructure:"dead_letter"`
	// Queue is the configuration of the queue used by this consumer.
	Queue QueueConfig `mapstructure:"queue"`
	// Options holds the optional flags and arguments for this consumer.
	Options Options `mapstructure:"options"`
}
// ExchangeConfig describes an exchange's configuration.
type ExchangeConfig struct {
	// Type is the exchange type, read from the "type" key.
	Type string `mapstructure:"type"`
	// Options holds the optional flags and arguments for this exchange.
	Options Options `mapstructure:"options"`
}
// DeadLetter describes a dead-letter queue that is declared before the other
// queues are declared.
type DeadLetter struct {
	// Queue is the configuration of the dead-letter queue itself.
	Queue QueueConfig `mapstructure:"queue"`
}
// QueueConfig describes a queue's configuration.
type QueueConfig struct {
	// Name is the queue name, read from the "name" key.
	Name string `mapstructure:"name"`
	// Bindings lists how the queue is bound to exchanges.
	Bindings []Binding `mapstructure:"bindings"`
	// Options holds the optional flags and arguments for this queue.
	Options Options `mapstructure:"options"`
}
// Binding describes how a queue connects to an exchange.
type Binding struct {
	// Exchange is read from the "exchange" key.
	// NOTE(review): presumably a key of Config.Exchanges — confirm against the caller.
	Exchange string `mapstructure:"exchange"`
	// RoutingKeys lists the routing keys of the binding. ConfigFromFile installs
	// a string-to-slice decode hook that splits a plain string on ",".
	RoutingKeys []string `mapstructure:"routing_keys"`
	// Options holds the optional flags and arguments for this binding.
	Options Options `mapstructure:"options"`
}
// Options describes the optional configuration shared by consumer, queue,
// binding and exchange declarations. Each field is read from the configuration
// key named in its mapstructure tag; which fields apply to which declaration is
// decided by the code that consumes them, not by this type.
type Options struct {
	Durable    bool `mapstructure:"durable"`
	Internal   bool `mapstructure:"internal"`
	AutoDelete bool `mapstructure:"auto_delete"`
	Exclusive  bool `mapstructure:"exclusive"`
	NoWait     bool `mapstructure:"no_wait"`
	NoLocal    bool `mapstructure:"no_local"`
	AutoAck    bool `mapstructure:"auto_ack"`
	// Args carries additional arguments as an amqp.Table.
	Args amqp.Table `mapstructure:"args"`
}
// setConfigDefaults fills in the unset tuning fields of every connection and
// consumer in config and stores the adjusted entries back into their maps.
//
// Connections: a zero Timeout, Sleep or Retries is replaced by DefaultTimeout,
// DefaultSleep or DefaultRetries respectively.
// Consumers: a non-positive Workers becomes 1 and a non-positive PrefetchCount
// becomes Workers + 2.
func setConfigDefaults(config *Config) {
	for name, conn := range config.Connections {
		if conn.Timeout == 0 {
			conn.Timeout = DefaultTimeout
		}
		if conn.Sleep == 0 {
			conn.Sleep = DefaultSleep
		}
		if conn.Retries == 0 {
			conn.Retries = DefaultRetries
		}
		// Map values are copies, so the updated entry must be written back.
		config.Connections[name] = conn
	}

	for name, consumer := range config.Consumers {
		if consumer.Workers < 1 {
			consumer.Workers = 1
		}
		if consumer.PrefetchCount < 1 {
			// Prefetch at least two messages more than the number of workers so
			// that blocked workers can be observed.
			consumer.PrefetchCount = consumer.Workers + 2
		}
		config.Consumers[name] = consumer
	}
}
// RegisterHandler sets the MessageHandler used by one Consumer.
// The consumerName MUST be the same name used by the Consumer, i.e. its key
// inside the map of consumers. The Handlers map is created on first use, and
// registering the same name again replaces the previous handler.
func (c *Config) RegisterHandler(consumerName string, h MessageHandler) {
	if c.Handlers != nil {
		c.Handlers[consumerName] = h
		return
	}

	c.Handlers = map[string]MessageHandler{consumerName: h}
}
// ConfigFromFilename opens the file at filename and hands it to ConfigFromFile.
// It returns a wrapped error when the file cannot be opened; otherwise it
// returns whatever ConfigFromFile returns. The file is closed before returning.
func ConfigFromFilename(filename string) (*Config, error) {
	f, openErr := os.Open(filename)
	if openErr != nil {
		return nil, fmt.Errorf("failed to open %s: %w", filename, openErr)
	}
	defer f.Close()

	cfg, err := ConfigFromFile(f)

	return cfg, err
}
// ConfigFromFile reads a YAML file and converts it into a Config struct
// with all the configuration needed to build the Consumers and Producers.
//
// Environment variables can be used inside the YAML file, with a syntax like
// the one used by docker-compose files. To use a required variable write
// ${ENV_NAME}; to provide a default value write ${ENV_NAME:=some-value} inside
// any value. If a required variable does not exist, an error is returned.
//
// The decoder is selected from the extension of the name reported by
// file.Stat(); only "yaml" and "yml" are supported.
//
// On any failure the returned *Config is nil.
func ConfigFromFile(file File) (*Config, error) {
	body, err := ioutil.ReadAll(file)
	if err != nil {
		return nil, fmt.Errorf("failed to read the file: %w", err)
	}

	// The flags are (noUnset, noEmpty) per the envsubst API: referencing an
	// unset variable is an error, while an empty value is accepted.
	in, err := envsubst.BytesRestricted(body, true, false)
	if err != nil {
		return nil, fmt.Errorf("failed to parse some environment variables: %w", err)
	}

	stat, err := file.Stat()
	if err != nil {
		return nil, fmt.Errorf("failed to get the file stats: %w", err)
	}

	input := map[string]interface{}{}

	switch ext := getConfigType(stat.Name()); ext {
	case "yaml", "yml":
		if err := yaml.Unmarshal(in, &input); err != nil {
			return nil, fmt.Errorf("failed to decode the yaml configuration. %w", err)
		}
	default:
		return nil, fmt.Errorf("file extension %s not supported", ext)
	}

	output := &Config{}

	decoder, err := mapstructure.NewDecoder(&mapstructure.DecoderConfig{
		Metadata:         nil,
		Result:           output,
		WeaklyTypedInput: true,
		DecodeHook: mapstructure.ComposeDecodeHookFunc(
			// Allows durations such as "2s" and comma-separated lists such as
			// "a,b" to be written as plain strings in the file.
			mapstructure.StringToTimeDurationHookFunc(),
			mapstructure.StringToSliceHookFunc(","),
		),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create the configuration decoder: %w", err)
	}

	// The decode error is returned as-is so callers that inspect its concrete
	// type keep working; the partially filled Config is not exposed.
	if err := decoder.Decode(input); err != nil {
		return nil, err
	}

	return output, nil
}
// getConfigType returns the extension of file without the leading dot, or an
// empty string when the name has no extension or ends with a bare dot.
func getConfigType(file string) string {
	suffix := filepath.Ext(file)
	if len(suffix) <= 1 {
		return ""
	}

	return suffix[1:]
}