Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

fix(decl/cmd/test): wait for goroutines termination upon failures #256

Merged
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 32 additions & 15 deletions cmd/declarative/test/test.go
Original file line number Diff line number Diff line change
@@ -171,52 +171,65 @@ func (cw *CommandWrapper) run(cmd *cobra.Command, _ []string) {

ctx, cancel := context.WithTimeout(ctx, cw.TestsTimeout)
defer cancel()
exitAndCancel := func() {
cancelAndExit := func() {
cancel()
os.Exit(1)
}

labels, err := label.ParseSet(cw.Labels)
if err != nil {
logger.Error(err, "Error parsing labels")
exitAndCancel()
cancelAndExit()
}

logger = enrichLoggerWithLabels(logger, labels)

description, err := loadTestsDescription(logger, cw.TestsDescriptionFile, cw.TestsDescription)
if err != nil {
logger.Error(err, "Error loading tests description")
exitAndCancel()
cancelAndExit()
}

runnerBuilder, err := cw.createRunnerBuilder()
if err != nil {
logger.Error(err, "Error creating runner builder")
exitAndCancel()
cancelAndExit()
}

// Retrieve the already populated test ID. The test ID absence is used to uniquely identify the root process in the
// process chain.
testID := cw.TestID
isRootProcess := testID == ""

// globalWaitGroup accounts for all spawned goroutines.
globalWaitGroup := sync.WaitGroup{}
waitAndExit := func() {
cancel()
globalWaitGroup.Wait()
os.Exit(1)
}

// Initialize tester and Falco alerts collection.
var testr tester.Tester
testerWaitGroup := sync.WaitGroup{}
if isRootProcess && !cw.skipOutcomeVerification {
if testr, err = cw.createTester(logger); err != nil {
logger.Error(err, "Error creating tester")
exitAndCancel()
cancelAndExit()
}

globalWaitGroup.Add(1)
go func() {
defer globalWaitGroup.Done()
defer cancel()
if err := testr.StartAlertsCollection(ctx); err != nil {
logger.Error(err, "Error starting tester execution")
exitAndCancel()
}
}()
}

// testerWaitGroup accounts for all goroutines producing reports.
testerWaitGroup := sync.WaitGroup{}

// Prepare parameters shared by all runners.
runnerEnviron := cw.buildRunnerEnviron(cmd)
var runnerLabels *label.Set
@@ -248,7 +261,7 @@ func (cw *CommandWrapper) run(cmd *cobra.Command, _ []string) {
testUID, err = extractTestUID(testID)
if err != nil {
logger.Error(err, "Error extracting test UID from test ID", "testId", testID)
exitAndCancel()
waitAndExit()
}
}

@@ -267,23 +280,25 @@ func (cw *CommandWrapper) run(cmd *cobra.Command, _ []string) {
runnerInstance, err := runnerBuilder.Build(testDesc.Runner, runnerLogger, runnerDescription)
if err != nil {
logger.Error(err, "Error creating runner")
exitAndCancel()
waitAndExit()
}

logger.Info("Starting test execution...")
if err := runnerInstance.Run(ctx, testID, testIndex, testDesc); err != nil {
logRunnerError(logger, err)
exitAndCancel()
waitAndExit()
}

logger.Info("Test execution completed")

if testr != nil {
produceReport(&testerWaitGroup, testr, &testUID, testDesc, cw.reportFormat)
produceReport(&globalWaitGroup, &testerWaitGroup, testr, &testUID, testDesc, cw.reportFormat)
}
}

testerWaitGroup.Wait()
cancel()
globalWaitGroup.Wait()
}

// enrichLoggerWithLabels creates a new logger, starting from the provided one, with the information extracted from the
@@ -462,11 +477,13 @@ func logRunnerError(logger logr.Logger, err error) {
}

// produceReport produces a report for the given test by using the provided tester.
func produceReport(wg *sync.WaitGroup, testr tester.Tester, testUID *uuid.UUID, testDesc *loader.Test,
reportFmt reportFormat) {
wg.Add(1)
func produceReport(globalWaitGroup, testerWaitGroup *sync.WaitGroup, testr tester.Tester, testUID *uuid.UUID,
testDesc *loader.Test, reportFmt reportFormat) {
globalWaitGroup.Add(1)
testerWaitGroup.Add(1)
go func() {
defer wg.Done()
defer globalWaitGroup.Done()
defer testerWaitGroup.Done()
testName, ruleName := testDesc.Name, testDesc.Rule
report := getReport(testr, testUID, ruleName, &testDesc.ExpectedOutcome)
report.TestName, report.RuleName = testName, ruleName
16 changes: 4 additions & 12 deletions pkg/test/tester/tester/tester.go
Original file line number Diff line number Diff line change
@@ -65,7 +65,7 @@ func (t *testerImpl) StartAlertsCollection(ctx context.Context) error {
}

alertInfoCh := t.filterAlertsWithUID(ctx, alertCh)
t.startAlertsCaching(ctx, alertInfoCh)
t.startAlertsCaching(alertInfoCh)
return nil
}

@@ -142,17 +142,9 @@ func (t *testerImpl) findUID(alrt *alert.Alert) *uuid.UUID {
}

// startAlertsCaching starts caching the alerts received through the provided channel.
func (t *testerImpl) startAlertsCaching(ctx context.Context, alertInfoCh <-chan *alertInfo) {
for {
select {
case <-ctx.Done():
return
case info, ok := <-alertInfoCh:
if !ok {
return
}
t.cacheAlert(info.uid, info.alert)
}
func (t *testerImpl) startAlertsCaching(alertInfoCh <-chan *alertInfo) {
for info := range alertInfoCh {
t.cacheAlert(info.uid, info.alert)
}
}

Loading