diff --git a/engine/engine.go b/engine/engine.go index b2844519..05fc6192 100644 --- a/engine/engine.go +++ b/engine/engine.go @@ -14,7 +14,9 @@ import ( "sync/atomic" "go.lsp.dev/uri" + "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/propagation" "github.com/cbroglie/mustache" "github.com/go-logr/logr" @@ -31,11 +33,12 @@ type RuleEngine interface { } type ruleMessage struct { - rule Rule - ruleSetName string - ctx ConditionContext - scope Scope - returnChan chan response + rule Rule + ruleSetName string + conditionContext ConditionContext + scope Scope + returnChan chan response + carrier propagation.TextMapCarrier } type response struct { @@ -125,18 +128,21 @@ func (r *ruleEngine) Stop() { } func processRuleWorker(ctx context.Context, ruleMessages chan ruleMessage, logger logr.Logger, wg *sync.WaitGroup) { + prop := otel.GetTextMapPropagator() for { select { case m := <-ruleMessages: logger.V(5).Info("taking rule", "ruleset", m.ruleSetName, "rule", m.rule.RuleID) newLogger := logger.WithValues("ruleID", m.rule.RuleID) //We createa new rule context for a every rule run, here we need to apply the scope - m.ctx.Template = make(map[string]ChainTemplate) + m.conditionContext.Template = make(map[string]ChainTemplate) if m.scope != nil { - m.scope.AddToContext(&m.ctx) + m.scope.AddToContext(&m.conditionContext) } + logger.Info("Adding Carrier span info to context") + ctx = prop.Extract(ctx, m.carrier) - bo, err := processRule(ctx, m.rule, m.ctx, newLogger) + bo, err := processRule(ctx, m.rule, m.conditionContext, newLogger) logger.V(5).Info("finished rule", "found", len(bo.Incidents), "error", err, "rule", m.rule.RuleID) m.returnChan <- response{ ConditionResponse: bo, @@ -190,6 +196,9 @@ func (r *ruleEngine) RunRulesScoped(ctx context.Context, ruleSets []RuleSet, sco } r.logger.Info("added scopes to condition context", "scopes", scopes, "conditionContext", conditionContext) } + carrier := propagation.MapCarrier{} + otel.GetTextMapPropagator().Inject(ctx, carrier) + r.logger.Info("inject span info", "carrier", carrier) ctx, cancelFunc := context.WithCancel(ctx) taggingRules, otherRules, mapRuleSets := r.filterRules(ruleSets, selectors...) @@ -264,10 +273,13 @@ func (r *ruleEngine) RunRulesScoped(ctx context.Context, ruleSets []RuleSet, sco }() for _, rule := range otherRules { + newContext := ruleContext.Copy() + newContext.RuleID = rule.rule.RuleID wg.Add(1) rule.returnChan = ret - rule.ctx = ruleContext + rule.conditionContext = newContext rule.scope = scopes + rule.carrier = carrier r.ruleProcessing <- rule } r.logger.V(5).Info("All rules added buffer, waiting for engine to complete", "size", len(otherRules)) @@ -450,11 +462,9 @@ func processRule(ctx context.Context, rule Rule, ruleCtx ConditionContext, log l ctx, span := tracing.StartNewSpan( ctx, "process-rule", attribute.Key("rule").String(rule.RuleID)) defer span.End() - newContext := ruleCtx.Copy() - newContext.RuleID = rule.RuleID // Here is what a worker should run when getting a rule. // For now, lets not fan out the running of conditions. - return rule.When.Evaluate(ctx, log, newContext) + return rule.When.Evaluate(ctx, log, ruleCtx) }