-
Notifications
You must be signed in to change notification settings - Fork 980
/
Copy pathtopic_test.go
151 lines (134 loc) · 4.86 KB
/
topic_test.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package test
import (
"context"
"strconv"
"sync"
"testing"
"github.com/dtm-labs/dtm/client/dtmcli"
"github.com/dtm-labs/dtm/client/dtmcli/dtmimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgimp"
"github.com/dtm-labs/dtm/client/dtmgrpc/dtmgpb"
"github.com/dtm-labs/dtm/dtmutil"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
const (
testTopicTestTopicNormal = "test_topic_TestTopicNormal"
testTopicTestConcurrentUpdateTopic = "concurrent_topic_TestConcurrentUpdateTopic"
)
func TestTopicNormal(t *testing.T) {
testSubscribe(t, httpSubscribe)
testUnsubscribe(t, httpUnsubscribe)
testDeleteTopic(t, httpDeleteTopic)
testSubscribe(t, grpcSubscribe)
testUnsubscribe(t, grpcUnsubscribe)
testDeleteTopic(t, grpcDeleteTopic)
}
func TestConcurrentUpdateTopic(t *testing.T) {
var wg sync.WaitGroup
var urls []string
var errNum int
concurrentTimes := 20
// concurrently updates the topic, part of them succeed
for i := 0; i < concurrentTimes; i++ {
wg.Add(1)
go func(i int) {
url := "http://dtm/test" + strconv.Itoa(i)
err := httpSubscribe(testTopicTestConcurrentUpdateTopic, url)
if err == nil {
urls = append(urls, url)
} else {
errNum++
}
wg.Done()
}(i)
}
wg.Wait()
assert.True(t, len(urls) > 0)
// delete successfully subscribed urls above, all of them should succeed
for _, url := range urls {
assert.Nil(t, httpUnsubscribe(testTopicTestConcurrentUpdateTopic, url))
}
// finally, the topic version should be correct
m := map[string]interface{}{}
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"cat": "topics",
"key": testTopicTestConcurrentUpdateTopic,
}).Get(dtmutil.DefaultHTTPServer + "/queryKV")
assert.Nil(t, err)
dtmimp.MustUnmarshalString(resp.String(), &m)
dtmimp.MustRemarshal(m["kv"].([]interface{})[0], &m)
assert.Equal(t, float64((concurrentTimes-errNum)*2), m["version"])
}
func testSubscribe(t *testing.T, subscribe func(topic, url string) error) {
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1"))
assert.Error(t, subscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat subscription
assert.Error(t, subscribe("", "http://dtm/test1")) // error:empty topic
assert.Error(t, subscribe(testTopicTestTopicNormal, "")) // error:empty url
assert.Nil(t, subscribe(testTopicTestTopicNormal, "http://dtm/test2"))
}
func testUnsubscribe(t *testing.T, unsubscribe func(topic, url string) error) {
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1"))
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test1")) // error:repeat unsubscription
assert.Error(t, unsubscribe("", "http://dtm/test1")) // error:empty topic
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "")) // error:empty url
assert.Error(t, unsubscribe("non_existent_topic", "http://dtm/test1")) // error:unsubscribe a non-existent topic
assert.Nil(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
assert.Error(t, unsubscribe(testTopicTestTopicNormal, "http://dtm/test2"))
}
func testDeleteTopic(t *testing.T, deleteTopic func(topic string) error) {
assert.Error(t, deleteTopic("non_existent_testDeleteTopic"))
assert.Nil(t, deleteTopic(testTopicTestTopicNormal))
}
func httpSubscribe(topic, url string) error {
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"topic": topic,
"url": url,
"remark": "for test",
}).Get(dtmutil.DefaultHTTPServer + "/subscribe")
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%v", resp.String())
}
return err
}
func httpUnsubscribe(topic, url string) error {
resp, err := dtmcli.GetRestyClient().R().SetQueryParams(map[string]string{
"topic": topic,
"url": url,
}).Get(dtmutil.DefaultHTTPServer + "/unsubscribe")
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
}
return err
}
func httpDeleteTopic(topic string) error {
resp, err := dtmcli.GetRestyClient().R().Delete(dtmutil.DefaultHTTPServer + "/topic/" + topic)
e2p(err)
if resp.StatusCode() != 200 {
err = errors.Errorf("Http Request Error. Resp:%+v", resp.String())
}
return err
}
func grpcSubscribe(topic, url string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Subscribe(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic,
URL: url,
Remark: "for test"})
return err
}
func grpcUnsubscribe(topic, url string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).Unsubscribe(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic,
URL: url})
return err
}
func grpcDeleteTopic(topic string) error {
_, err := dtmgimp.MustGetDtmClient(dtmutil.DefaultGrpcServer).DeleteTopic(context.Background(),
&dtmgpb.DtmTopicRequest{
Topic: topic})
return err
}