Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

feeder: handle race condition #646

Merged
merged 1 commit into from
Mar 10, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 21 additions & 9 deletions KubeArmor/feeder/feeder.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,7 @@ func (ls *LogService) removeMsgStruct(uid string) {
func (ls *LogService) WatchMessages(req *pb.RequestMessage, svr pb.LogService_WatchMessagesServer) error {
uid := uuid.Must(uuid.NewRandom()).String()
conn := make(chan *pb.Message)
defer close(conn)
ls.addMsgStruct(uid, conn, req.Filter)
defer ls.removeMsgStruct(uid)

Expand Down Expand Up @@ -173,6 +174,7 @@ func (ls *LogService) WatchAlerts(req *pb.RequestMessage, svr pb.LogService_Watc
return nil
}
conn := make(chan *pb.Alert)
defer close(conn)
ls.addAlertStruct(uid, conn, req.Filter)
defer ls.removeAlertStruct(uid)

Expand Down Expand Up @@ -229,6 +231,7 @@ func (ls *LogService) WatchLogs(req *pb.RequestMessage, svr pb.LogService_WatchL
return nil
}
conn := make(chan *pb.Log)
defer close(conn)
ls.addLogStruct(uid, conn, req.Filter)
defer ls.removeLogStruct(uid)

Expand Down Expand Up @@ -497,11 +500,14 @@ func (fd *Feeder) PushMessage(level, message string) {
pbMsg.Level = level
pbMsg.Message = message

// MsgLock.Lock()
// defer MsgLock.Unlock()
MsgLock.Lock()
defer MsgLock.Unlock()

for uid := range MsgStructs {
MsgStructs[uid].Broadcast <- &pbMsg
select {
case MsgStructs[uid].Broadcast <- &pbMsg:
default:
}
}
}

Expand Down Expand Up @@ -590,11 +596,14 @@ func (fd *Feeder) PushLog(log tp.Log) {

pbAlert.Result = log.Result

// AlertLock.Lock()
// defer AlertLock.Unlock()
AlertLock.Lock()
defer AlertLock.Unlock()

for uid := range AlertStructs {
AlertStructs[uid].Broadcast <- &pbAlert
select {
case AlertStructs[uid].Broadcast <- &pbAlert:
default:
}
}
} else { // ContainerLog
pbLog := pb.Log{}
Expand Down Expand Up @@ -630,11 +639,14 @@ func (fd *Feeder) PushLog(log tp.Log) {

pbLog.Result = log.Result

// LogLock.Lock()
// defer LogLock.Unlock()
LogLock.Lock()
defer LogLock.Unlock()

for uid := range LogStructs {
LogStructs[uid].Broadcast <- &pbLog
select {
case LogStructs[uid].Broadcast <- &pbLog:
default:
}
}
}
}