Skip to content

Commit

Permalink
update for testing
Browse files Browse the repository at this point in the history
Signed-off-by: Le Zhang <zhangl@us.ibm.com>
  • Loading branch information
LiilyZhang committed Oct 9, 2024
1 parent d01651a commit 72f0838
Showing 1 changed file with 37 additions and 11 deletions.
48 changes: 37 additions & 11 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1047,6 +1047,7 @@ func (w *AgreementBotWorker) syncOnInit() error {

neededBCInstances := make(map[string]map[string]map[string]bool)
bPolicyCheckingMap := make(map[string]bool)
bPolicyMessageMap := make(map[string]events.Message)

for _, ag := range agreements {

Expand Down Expand Up @@ -1132,11 +1133,7 @@ func (w *AgreementBotWorker) syncOnInit() error {
glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId)))
// Need to cancel the agreement
policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy)
pdCmd := NewPolicyDeletedCommand(*policyDeletedMsg)
// Queue the command to the relevant protocol handler for further processing.
if w.consumerPH.Get(agp).AcceptCommand(pdCmd) {
w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp))
}
bPolicyMessageMap[pol.Header.Name] = policyDeletedMsg
} else {
for polId, exchPol := range exchPols {
bPol := exchPol.GetBusinessPolicy()
Expand All @@ -1148,15 +1145,11 @@ func (w *AgreementBotWorker) syncOnInit() error {
glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err))
} else {
// If business policy has been changed during a restart, handle it
glog.V(3).Infof(AWlogString(fmt.Sprintf("re-evaluate agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name)))
glog.V(3).Infof(AWlogString(fmt.Sprintf("need re-evaluate the agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name)))

// Call HandlePolicyChanged() function directly because at this point, the agbot is not set to ready to accept messages
policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol)
pcCmd := NewPolicyChangedCommand(*policyChangedMsg)
// Queue the command to the relevant protocol handler for further processing.
if w.consumerPH.Get(agp).AcceptCommand(pcCmd) {
w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp))
}
bPolicyMessageMap[pol.Header.Name] = policyChangedMsg
}
}
}
Expand Down Expand Up @@ -1188,6 +1181,14 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
}

glog.V(3).Infof(AWlogString(fmt.Sprintf("checking if business policies have be changed: %v", bPolicyMessageMap)))
for policyName, msg := range bPolicyMessageMap {
if glog.V(5) {
glog.Infof(AWlogString(fmt.Sprintf("handling policy message %v for business policy %v", msg, policyName)))
}
w.queuePolicyCommand(msg)
}

} else {
return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err)))
}
Expand All @@ -1197,6 +1198,31 @@ func (w *AgreementBotWorker) syncOnInit() error {
return nil
}

func (w *AgreementBotWorker) queuePolicyCommand(message events.Message) {
switch message.(type) {
case *events.PolicyChangedMessage:
pcm, _ := message.(*events.PolicyChangedMessage)
pcCmd := NewPolicyChangedCommand(*pcm)
w.Commands <- pcCmd
// Queue the command to the relevant protocol handler for further processing.
// if w.consumerPH.Get(agp).AcceptCommand(pcCmd) {
// w.consumerPH.Get(agp).HandlePolicyChanged(pcCmd, w.consumerPH.Get(agp))
// }

case *events.PolicyDeletedMessage:
pdm, _ := message.(*events.PolicyDeletedMessage)
pdCmd := NewPolicyDeletedCommand(*pdm)
w.Commands <- pdCmd
// Queue the command to the relevant protocol handler for further processing.
// if w.consumerPH.Get(agp).AcceptCommand(pdCmd) {
// w.consumerPH.Get(agp).HandlePolicyDeleted(pdCmd, w.consumerPH.Get(agp))
// }

default: //nothing
}

}

func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, pol *policy.Policy, org string, state string) error {

workload := pol.Workloads[0].WorkloadURL
Expand Down

0 comments on commit 72f0838

Please # to comment.