-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathintegration_producer_test.go
123 lines (98 loc) · 3.12 KB
/
integration_producer_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
package rabbids_test
import (
"sync"
"sync/atomic"
"testing"
"time"
"github.com/leveeml/rabbids"
"github.com/streadway/amqp"
"github.com/stretchr/testify/require"
"gopkg.in/ory-am/dockertest.v3"
)
func TestBasicIntegrationProducer(t *testing.T) {
integrationTest(t)
tests := []struct {
scenario string
method func(*testing.T, *dockertest.Resource)
}{
{
scenario: "test producer with connection problems",
method: testProducerWithReconnect,
},
{
scenario: "test send delay messages",
method: testPublishWithDelay,
},
}
// -> Setup
dockerPool, err := dockertest.NewPool("")
require.NoError(t, err, "Coud not connect to docker")
resource, err := dockerPool.Run("rabbitmq", "3.6.12-management", []string{})
require.NoError(t, err, "Could not start resource")
// -> TearDown
t.Cleanup(func() {
if err := dockerPool.Purge(resource); err != nil {
t.Errorf("Could not purge resource: %s", err)
}
})
// -> Run!
for _, test := range tests {
tt := test
t.Run(test.scenario, func(st *testing.T) {
tt.method(st, resource)
})
}
}
func testProducerWithReconnect(t *testing.T, resource *dockertest.Resource) {
t.Parallel()
var wg sync.WaitGroup
adminClient := getRabbitClient(t, resource)
producer, err := rabbids.NewProducer(getDSN(resource), rabbids.WithCustomName("test-reconnect"))
require.NoError(t, err, "could not connect to: ", getDSN(resource))
ch := producer.GetAMQPChannel()
_, err = ch.QueueDeclare("testProducerWithReconnect", true, false, false, false, amqp.Table{})
require.NoError(t, err)
var emitWithErrors int64
wg.Add(1)
go func() {
defer wg.Done()
for pErr := range producer.EmitErr() {
t.Logf("received a emitErr: %v", pErr)
atomic.AddInt64(&emitWithErrors, 1)
}
}()
for i := 1; i <= 1000; i++ {
if i%100 == 0 {
closeRabbitMQConnections(t, adminClient, "test-reconnect")
}
producer.Emit() <- rabbids.NewPublishing("", "testProducerWithReconnect",
map[string]int{"test": i},
)
time.Sleep(time.Millisecond)
}
err = producer.Close()
require.NoError(t, err, "error closing the connection")
wg.Wait()
count := getQueueLength(t, adminClient, "testProducerWithReconnect", 40*time.Second)
t.Logf("Finished published with %d messages inside the queue and %d messages with error", count, emitWithErrors)
}
func testPublishWithDelay(t *testing.T, resource *dockertest.Resource) {
t.Parallel()
adminClient := getRabbitClient(t, resource)
producer, err := rabbids.NewProducer(getDSN(resource))
require.NoError(t, err, "could not connect to: ", getDSN(resource))
ch := producer.GetAMQPChannel()
_, err = ch.QueueDeclare("testPublishWithDelay", true, false, false, false, amqp.Table{})
require.NoError(t, err)
err = producer.Send(rabbids.NewDelayedPublishing(
"testPublishWithDelay",
10*time.Second,
map[string]string{"test": "fooo"},
))
require.NoError(t, err, "error on rab.Send")
time.Sleep(15 * time.Second)
err = producer.Close()
require.NoError(t, err, "error closing the connection")
count := getQueueLength(t, adminClient, "testPublishWithDelay", 10*time.Second)
require.Equal(t, 1, count, "expecting the message inside the queue")
}