-
Notifications
You must be signed in to change notification settings - Fork 0
/
main.go
120 lines (106 loc) · 2.76 KB
/
main.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"
"github.com/joho/godotenv"
"go.mongodb.org/mongo-driver/bson"
"go.mongodb.org/mongo-driver/bson/primitive"
"go.mongodb.org/mongo-driver/mongo"
"go.mongodb.org/mongo-driver/mongo/options"
)
// Fruit data type for inserting into collection
type Fruit struct {
ID primitive.ObjectID
Name string
Quantity int
}
// `json:"id" bson:"_id,omitempty"`
// `json:"name" bson:"name,omitempty"`
// `json:"quantity" bson:"quantity,omitempty"`
// type ReceiveDocument struct{
// }
func main() {
err := godotenv.Load()
if err != nil {
log.Fatal("Error loading .env file")
}
// URI for connect
user := os.Getenv("MONGO_USER")
pwd := os.Getenv("MONGO_USER_PASSWORD")
uri := fmt.Sprintf("mongodb+srv://%s:%s@cluster0-yyofy.gcp.mongodb.net/test?retryWrites=true&w=majority", user, pwd)
// Connect to MongoDB
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
client, err := mongo.Connect(ctx, options.Client().ApplyURI(uri))
if err != nil {
log.Fatal(err)
}
fmt.Println("Connected to MongoDB!")
cancel()
db := "demo"
collec := "stock"
stock := client.Database(db).Collection(collec)
// Context will expire in 6 seconds. The change and receive will expire accordingly
chgCtx, chgCancel := context.WithTimeout(context.Background(), 6*time.Second)
// chgCancel will cancel the context before the application goes out of scope.
// This will be activated only when the application doesn't timeout in 6 sec
defer chgCancel()
matchPipeline := bson.D{
{
"$match", bson.D{
{"operationType", "insert"},
{"fullDocument.quantity", bson.D{
{"$gt", 15},
}},
},
},
}
chgStream, err := stock.Watch(chgCtx, mongo.Pipeline{matchPipeline})
if err != nil {
log.Fatal(err)
}
defer chgStream.Close(chgCtx)
var wg sync.WaitGroup
wg.Add(1)
// Receive changes
// go createChange(stock)
go receiveChange(chgCtx, &wg, chgStream)
wg.Wait()
}
func receiveChange(routineCtx context.Context, waitGroup *sync.WaitGroup, stream *mongo.ChangeStream) {
defer stream.Close(routineCtx)
defer waitGroup.Done()
for stream.Next(routineCtx) {
var data bson.M
if err := stream.Decode(&data); err != nil {
log.Fatal(err)
}
document := data["fullDocument"].(bson.M)
var fruit Fruit
bsonBytes, err := bson.Marshal(document)
if err != nil {
log.Fatal(err)
}
if err := bson.Unmarshal(bsonBytes, &fruit); err != nil {
log.Fatal(err)
}
fmt.Println(fruit.Quantity)
}
select {
case <-routineCtx.Done():
fmt.Println("Operation timed out")
return
}
}
func createChange(stock *mongo.Collection) {
fruit := Fruit{Name: "Pineapple"}
for true {
fruit.Quantity = rand.Intn(99-10) + 10
fmt.Println("Inserting Value", fruit)
time.Sleep(2 * time.Second)
}
}