Skip to content
New issue

Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? # to your account

Issue open-horizon#4156 - Bug: On restart, agbot doesn't update a nod… #4157

Merged
merged 1 commit into from
Oct 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 61 additions & 3 deletions agreementbot/agreementbot.go
Original file line number Diff line number Diff line change
Expand Up @@ -1046,6 +1046,8 @@ func (w *AgreementBotWorker) syncOnInit() error {
if agreements, err := w.db.FindAgreements([]persistence.AFilter{persistence.UnarchivedAFilter()}, agp); err == nil {

neededBCInstances := make(map[string]map[string]map[string]bool)
bPolicyCheckingMap := make(map[string]bool)
bPolicyMessageMap := make(map[string]events.Message)

for _, ag := range agreements {

Expand All @@ -1061,11 +1063,13 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
neededBCInstances[bcOrg][bcType][bcName] = true

var pol *policy.Policy

// If the agreement has received a reply then we just need to make sure that the policy manager's agreement counts
// are correct. Even for already timedout agreements, the governance process will cleanup old and outdated agreements,
// so we don't need to do anything here.
if ag.AgreementCreationTime != 0 {
if pol, err := policy.DemarshalPolicy(ag.Policy); err != nil {
if pol, err = policy.DemarshalPolicy(ag.Policy); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("unable to demarshal policy for agreement %v, error %v", ag.CurrentAgreementId, err)))
} else if existingPol := w.pm.GetPolicy(ag.Org, pol.Header.Name); existingPol == nil {
glog.Errorf(AWlogString(fmt.Sprintf("agreement %v has a policy %v that doesn't exist anymore", ag.CurrentAgreementId, pol.Header.Name)))
Expand Down Expand Up @@ -1119,6 +1123,39 @@ func (w *AgreementBotWorker) syncOnInit() error {
glog.V(3).Infof(AWlogString(fmt.Sprintf("added agreement %v to policy agreement counter.", ag.CurrentAgreementId)))
}

// After checking the policy, add it in to a map. In Each for loop which iterate the agreements, checking if current policy inside agreement has been handled or not
if pol != nil && !bPolicyCheckingMap[pol.Header.Name] {
glog.V(3).Infof(AWlogString(fmt.Sprintf("checking policy against exchange for agreement %v.", ag.CurrentAgreementId)))
if exchPols, err := exchange.GetBusinessPolicies(w, exchange.GetOrg(pol.Header.Name), exchange.GetId(pol.Header.Name)); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error getting business policies from exchange for org: %v, policy name: %v error: %v", ag.Org, pol.Header.Name, err)))
} else if len(exchPols) == 0 {
glog.V(3).Infof(AWlogString(fmt.Sprintf("business policy %v from agreement %v is not found from exchange.", pol.Header.Name, ag.CurrentAgreementId)))
// Need to cancel the agreement
policyDeletedMsg := events.NewPolicyDeletedMessage(events.DELETED_POLICY, "", pol.Header.Name, exchange.GetOrg(pol.Header.Name), ag.Policy)
bPolicyMessageMap[pol.Header.Name] = policyDeletedMsg
} else {
for polId, exchPol := range exchPols {
bPol := exchPol.GetBusinessPolicy()
if exPolicy, err := bPol.GenPolicyFromBusinessPolicy(polId); err != nil {
glog.Errorf(AWlogString(fmt.Sprintf("error generating internal business policies for org: %v, policy name: %v from %v, error: %v", ag.Org, pol.Header.Name, exchPol.String(), err)))
} else if exPolicy == nil {
glog.Errorf(AWlogString(fmt.Sprintf("the generated internal business policies is nil for org: %v, policy name: %v from %v", ag.Org, pol.Header.Name, exchPol.String())))
} else if exPolicyString, err := policy.MarshalPolicy(exPolicy); err != nil {
glog.Errorf(fmt.Sprintf("Error trying to marshal internal business policy %v error: %v", exPolicy, err))
} else {
// If business policy has been changed during a restart, handle it
glog.V(3).Infof(AWlogString(fmt.Sprintf("need re-evaluate the agreement %v for policy %v", ag.CurrentAgreementId, pol.Header.Name)))

policyChangedMsg := events.NewPolicyChangedMessage(events.CHANGED_POLICY, "", pol.Header.Name, ag.Org, exPolicyString, pol)
bPolicyMessageMap[pol.Header.Name] = policyChangedMsg
}
}
}
bPolicyCheckingMap[pol.Header.Name] = true
} else {
glog.V(3).Infof(AWlogString(fmt.Sprintf("skip checking policy %v for agreement %v", pol, ag.CurrentAgreementId)))
}

// This state should never occur, but could if there was an error along the way. It means that a DB record
// was created for this agreement but the record was never updated with the creation time, which is supposed to occur
// immediately following creation of the record. Further, if this were to occur, then the exchange should not have been
Expand All @@ -1142,6 +1179,11 @@ func (w *AgreementBotWorker) syncOnInit() error {
}
}

glog.V(3).Infof(AWlogString(fmt.Sprintf("policies that might have changed: %v", bPolicyMessageMap)))
for _, msg := range bPolicyMessageMap {
w.queuePolicyCommand(msg)
}

} else {
return errors.New(AWlogString(fmt.Sprintf("error searching database: %v", err)))
}
Expand All @@ -1151,6 +1193,23 @@ func (w *AgreementBotWorker) syncOnInit() error {
return nil
}

func (w *AgreementBotWorker) queuePolicyCommand(message events.Message) {
switch message.(type) {
case *events.PolicyChangedMessage:
pcm, _ := message.(*events.PolicyChangedMessage)
pcCmd := NewPolicyChangedCommand(*pcm)
w.Commands <- pcCmd

case *events.PolicyDeletedMessage:
pdm, _ := message.(*events.PolicyDeletedMessage)
pdCmd := NewPolicyDeletedCommand(*pdm)
w.Commands <- pdCmd

default: //nothing
}

}

func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, pol *policy.Policy, org string, state string) error {

workload := pol.Workloads[0].WorkloadURL
Expand All @@ -1159,7 +1218,7 @@ func (w *AgreementBotWorker) recordConsumerAgreementState(agreementId string, po

as := new(exchange.PutAgbotAgreementState)
as.Service = exchange.WorkloadAgreement{
Org: exchange.GetOrg(pol.PatternId),
Org: org,
Pattern: exchange.GetId(pol.PatternId),
URL: workload,
}
Expand Down Expand Up @@ -1478,7 +1537,6 @@ func (w *AgreementBotWorker) databaseHeartBeat() int {

// Ask the database to check for stale partitions and move them into our partition if one is found.
func (w *AgreementBotWorker) stalePartitions() int {

// Dont try to grab a stale partition if we are unable to heartbeat.
now := uint64(time.Now().Unix())
if hb, err := w.db.GetHeartbeat(); err != nil {
Expand Down
4 changes: 3 additions & 1 deletion agreementbot/persistence/postgresql/partition.go
Original file line number Diff line number Diff line change
Expand Up @@ -262,7 +262,9 @@ func (db *AgbotPostgresqlDB) MovePartition(timeout uint64) (bool, error) {
return false, err
} else if _, err := tx.Exec(db.GetWorkloadUsagePartitionMove(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetSecretPartitionMove(fromPartition, db.PrimaryPartition())); err != nil {
} else if _, err := tx.Exec(db.GetSecretPartitionMovePattern(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetSecretPartitionMovePolicy(fromPartition, db.PrimaryPartition())); err != nil {
return false, err
} else if _, err := tx.Exec(db.GetAgreementPartitionTableDrop(fromPartition)); err != nil {
return false, err
Expand Down
25 changes: 20 additions & 5 deletions agreementbot/persistence/postgresql/secrets.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,18 @@ const SECRET_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_che
const SECRET_EXISTS_UPDATE_TIME_PATTERN = `UPDATE "secrets_pattern_ SET last_update_check = $1, secret_exists = false WHERE secret_org = $2 AND secret_name = $3 AND secret_exists = $4;`
const SECRET_DELETE_PATTERN = `DELETE FROM "secrets_pattern_ WHERE secret_org = $1 AND secret_name = $2 AND pattern_org = $3 AND pattern_name = $4;`

const SECRET_MOVE = `WITH moved_rows AS (
const SECRET_MOVE_POLICY = `WITH moved_rows AS (
DELETE FROM "secrets_policy_ a
RETURNING a.secret_org, a.secret_name, a.policy_org, a.policy_name, a.last_update_check, a.secret_exists
)
INSERT INTO "secrets_policy_ (secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, policy_org, policy_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> policy_org ON CONFLICT DO NOTHING;
`

const SECRET_MOVE_PATTERN = `WITH moved_rows AS (
DELETE FROM "secrets_pattern_ a
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check
RETURNING a.secret_org, a.secret_name, a.pattern_org, a.pattern_name, a.last_update_check, a.secret_exists
)
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
INSERT INTO "secrets_pattern_ (secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, partition) SELECT secret_org, secret_name, pattern_org, pattern_name, last_update_check, secret_exists, 'partition_name' FROM moved_rows WHERE secret_org <> pattern_org ON CONFLICT DO NOTHING;
`

const SECRET_DROP_PARTITION_POLICY = `DROP TABLE "secrets_policy_;`
Expand Down Expand Up @@ -224,13 +231,21 @@ func (db *AgbotPostgresqlDB) GetDeleteSecretPattern() string {
}

// The partition table name replacement scheme used in this function is slightly different from the others above.
func (db *AgbotPostgresqlDB) GetSecretPartitionMove(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2)
func (db *AgbotPostgresqlDB) GetSecretPartitionMovePattern(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE_PATTERN, SECRET_TABLE_NAME_ROOT_PATTERN, db.GetSecretPartitionTableNamePattern(toPartition), 2)
sql = strings.Replace(sql, db.GetSecretPartitionTableNamePattern(toPartition), db.GetSecretPartitionTableNamePattern(fromPartition), 1)
sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1)
return sql
}

// The partition table name replacement scheme used in this function is slightly different from the others above.
func (db *AgbotPostgresqlDB) GetSecretPartitionMovePolicy(fromPartition string, toPartition string) string {
sql := strings.Replace(SECRET_MOVE_POLICY, SECRET_TABLE_NAME_ROOT_POLICY, db.GetSecretPartitionTableNamePolicy(toPartition), 2)
sql = strings.Replace(sql, db.GetSecretPartitionTableNamePolicy(toPartition), db.GetSecretPartitionTableNamePolicy(fromPartition), 1)
sql = strings.Replace(sql, SECRET_PARTITION_FILLIN, toPartition, 1)
return sql
}

func (db *AgbotPostgresqlDB) GetManagedPolicySecretNames(policyOrg, policyName string) ([]string, error) {
sql := ""
if policyOrg == "" {
Expand Down
Loading