diff --git a/agreementbot/agreementbot.go b/agreementbot/agreementbot.go index 41690ca87..bac6cfe0a 100644 --- a/agreementbot/agreementbot.go +++ b/agreementbot/agreementbot.go @@ -1047,6 +1047,7 @@ func (w *AgreementBotWorker) syncOnInit() error { neededBCInstances := make(map[string]map[string]map[string]bool) bPolicyCheckingMap := make(map[string]bool) + bPolicyMessageMap := make(map[string]events.Message) for _, ag := range agreements { @@ -1132,11 +1133,7 @@ func (w *AgreementBotWorker) syncOnInit() error { glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId))) // Need to cancel the agreement policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy) - pdCmd := NewPolicyDeletedCommand(*policyDeletedMsg) - // Queue the command to the relevant protocol handler for further processing. - if w.consumerPH.Get(agp).AcceptCommand(pdCmd) { - w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp)) - } + bPolicyMessageMap[pol.Header.Name] = policyDeletedMsg } else { for polId, exchPol := range exchPols { bPol := exchPol.GetBusinessPolicy() @@ -1148,15 +1145,11 @@ func (w *AgreementBotWorker) syncOnInit() error { glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err)) } else { // If business policy has been changed during a restart, handle it - glog.V(3).Infof(AWlogString(fmt.Sprintf("re-evaluate agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name))) + glog.V(3).Infof(AWlogString(fmt.Sprintf("need re-evaluate the agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name))) // Call HandlePolicyChanged() function directly because at this point, the agbot is not set to ready to accept messages policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol) - pcCmd := NewPolicyChangedCommand(*policyChangedMsg) - // Queue the command to the relevant protocol handler for further processing. - if w.consumerPH.Get(agp).AcceptCommand(pcCmd) { - w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp)) - } + bPolicyMessageMap[pol.Header.Name] = policyChangedMsg } } } @@ -1188,6 +1181,14 @@ func (w *AgreementBotWorker) syncOnInit() error { } } + glog.V(3).Infof(AWlogString(fmt.Sprintf("checking if business policies have be changed: %v", bPolicyMessageMap))) + for policyName, msg := range bPolicyMessageMap { + if glog.V(5) { + glog.Infof(AWlogString(fmt.Sprintf("handling policy message %v for business policy %v", msg, policyName))) + } + w.queuePolicyCommand(msg) + } + } else { return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err))) } @@ -1197,6 +1198,31 @@ func (w *AgreementBotWorker) syncOnInit() error { return nil } +func (w *AgreementBotWorker) queuePolicyCommand(message events.Message) { + switch message.(type) { + case *events.PolicyChangedMessage: + pcm, _ := message.(*events.PolicyChangedMessage) + pcCmd := NewPolicyChangedCommand(*pcm) + w.Commands <- pcCmd + // Queue the command to the relevant protocol handler for further processing. + // if w.consumerPH.Get(agp).AcceptCommand(pcCmd) { + // w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp)) + // } + + case *events.PolicyDeletedMessage: + pdm, _ := message.(*events.PolicyDeletedMessage) + pdCmd := NewPolicyDeletedCommand(*pdm) + w.Commands <- pdCmd + // Queue the command to the relevant protocol handler for further processing. + // if w.consumerPH.Get(agp).AcceptCommand(pdCmd) { + // w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp)) + // } + + default: //nothing + } + +} + func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, pol *policy.Policy, org string, state string) error { workload := pol.Workloads[0].WorkloadURL