Skip to content

Commit

Permalink
feat: Add global job timeout (#512)
Browse files Browse the repository at this point in the history
* test: Clean up allowed resource providers between test runs

* feat: Add active job offers filter query

* feat: Add job timeout field and options

* feat: Add terminal JobTimedOut agreement state

* feat: Add cancelled job offers filter query

* chore: Rename solver dealStats to jobStats

* feat: Add cancelled job metrics

* feat: Add cancelExpiredJobs

* chore: Update comments

* chore: Rename reportDealMetrics to reportJobMetrics

* feat: Add BoolPointer lang helper

* feat: Combine cancelled filter queries

The cancelled filter query now includes the include cancelled, which was
our mechanism for requesting all job offers. The cancelled filter query
includes all when nil, or cancelled or not when true or false.

* feat: Reject results and file uploads for timed out jobs

* chore: Improve results checking log message
  • Loading branch information
bgins authored Feb 18, 2025
1 parent cd39bfc commit 1c9997b
Show file tree
Hide file tree
Showing 15 changed files with 380 additions and 73 deletions.
3 changes: 3 additions & 0 deletions cmd/lilypad/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,9 @@ func runJob(cmd *cobra.Command, options jobcreator.JobCreatorOptions, network st
case "JobOfferCancelled":
desc = "Job cancelled..."
emoji = "😭"
case "JobTimedOut":
desc = "Job timed out..."
emoji = "⏱️"
default:
desc = st
emoji = "🌟"
Expand Down
7 changes: 6 additions & 1 deletion pkg/data/enums.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ var AgreementState = []string{
"TimeoutJudgeResults",
"TimeoutMediateResults",
"JobOfferCancelled",
"JobTimedOut",
}

// PaymentReason corresponds to PaymentReason in TypeScript
Expand Down Expand Up @@ -84,7 +85,11 @@ func IsActiveAgreementState(itemType uint8) bool {
}

func IsTerminalAgreementState(itemType uint8) bool {
return itemType == GetAgreementStateIndex("JobOfferCancelled") || itemType == GetAgreementStateIndex("ResultsAccepted") || itemType == GetAgreementStateIndex("MediationAccepted") || itemType == GetAgreementStateIndex("MediationRejected")
return itemType == GetAgreementStateIndex("JobOfferCancelled") ||
itemType == GetAgreementStateIndex("JobTimedOut") ||
itemType == GetAgreementStateIndex("ResultsAccepted") ||
itemType == GetAgreementStateIndex("MediationAccepted") ||
itemType == GetAgreementStateIndex("MediationRejected")
}

// GetPaymentReason corresponds to getPaymentReason in TypeScript
Expand Down
7 changes: 7 additions & 0 deletions pkg/jobcreator/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,13 @@ waitloop:
return nil, fmt.Errorf("job was cancelled")
}

// Check if our job timed out
if finalJobOffer.State == data.GetAgreementStateIndex("JobTimedOut") {
span.SetStatus(codes.Error, "job timed out")
span.RecordError(err)
return nil, fmt.Errorf("job timed out")
}

span.AddEvent("get_result.start")
result, err := jobCreatorService.GetResult(finalJobOffer.DealID)
if err != nil {
Expand Down
22 changes: 16 additions & 6 deletions pkg/options/solver.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
package options

import (
"fmt"

"github.com/lilypad-tech/lilypad/pkg/solver"
"github.com/lilypad-tech/lilypad/pkg/system"
"github.com/spf13/cobra"
)

func NewSolverOptions() solver.SolverOptions {
options := solver.SolverOptions{
Server: GetDefaultServerOptions(),
Store: GetDefaultStoreOptions(),
Web3: GetDefaultWeb3Options(),
Services: GetDefaultServicesOptions(),
Telemetry: GetDefaultTelemetryOptions(),
Metrics: GetDefaultMetricsOptions(),
Server: GetDefaultServerOptions(),
Store: GetDefaultStoreOptions(),
Web3: GetDefaultWeb3Options(),
Services: GetDefaultServicesOptions(),
Telemetry: GetDefaultTelemetryOptions(),
Metrics: GetDefaultMetricsOptions(),
JobTimeoutSeconds: GetDefaultServeOptionInt("JOB_TIMEOUT_SECONDS", 600), // 10 minutes
}
options.Web3.Service = system.SolverService
return options
Expand All @@ -26,6 +29,10 @@ func AddSolverCliFlags(cmd *cobra.Command, options *solver.SolverOptions) {
AddServicesCliFlags(cmd, &options.Services)
AddTelemetryCliFlags(cmd, &options.Telemetry)
AddMetricsCliFlags(cmd, &options.Metrics)
cmd.PersistentFlags().IntVar(
&options.JobTimeoutSeconds, "job-timeout-seconds", options.JobTimeoutSeconds,
`The global timeout for jobs in seconds (JOB_TIMEOUT_SECONDS)`,
)
}

func CheckSolverOptions(options solver.SolverOptions) error {
Expand All @@ -49,6 +56,9 @@ func CheckSolverOptions(options solver.SolverOptions) error {
if err != nil {
return err
}
if options.JobTimeoutSeconds <= 0 {
return fmt.Errorf("JOB_TIMEOUT_SECONDS must be greater than zero")
}
return nil
}

Expand Down
80 changes: 77 additions & 3 deletions pkg/solver/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,7 +282,15 @@ func (controller *SolverController) solve(ctx context.Context) error {
ctx, span := controller.tracer.Start(ctx, "solve")
defer span.End()

// find out which deals we can make from matching the offers
// Remove expired job offers
err := controller.cancelExpiredJobs(ctx)
if err != nil {
span.SetStatus(codes.Error, "remove expired offers failed")
span.RecordError(err)
return err
}

// Match job offers with resource offers to make deals
deals, err := matcher.GetMatchingDeals(ctx, controller.store, controller.updateJobOfferState, controller.log, controller.tracer, controller.meter)
if err != nil {
span.SetStatus(codes.Error, "get matching deals failed")
Expand All @@ -294,7 +302,7 @@ func (controller *SolverController) solve(ctx context.Context) error {
Value: attribute.StringSliceValue(data.GetDealIDs(deals)),
})

// loop over each of the deals add add them to the store and emit events
// Add deals to the store, update offer states, and notify network
span.AddEvent("add_deals.start")
for _, deal := range deals {
_, err := controller.addDeal(ctx, deal)
Expand All @@ -304,14 +312,21 @@ func (controller *SolverController) solve(ctx context.Context) error {
}
span.AddEvent("add_deals.done")

// Report deal state metrics
span.AddEvent("report_deal_metrics.start")
storedDeals, err := controller.store.GetDealsAll()
if err != nil {
span.SetStatus(codes.Error, "get all deals failed")
span.RecordError(err)
return err
}
err = reportDealMetrics(ctx, controller.meter, storedDeals)
jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{Cancelled: system.BoolPointer(true)})
if err != nil {
span.SetStatus(codes.Error, "get cancelled job offers failed")
span.RecordError(err)
return err
}
err = reportJobMetrics(ctx, controller.meter, storedDeals, jobOffers)
if err != nil {
span.SetStatus(codes.Error, "report deal metrics failed")
span.RecordError(err)
Expand All @@ -321,6 +336,65 @@ func (controller *SolverController) solve(ctx context.Context) error {
return nil
}

func (controller *SolverController) cancelExpiredJobs(ctx context.Context) error {
ctx, span := controller.tracer.Start(ctx, "cancel_expired_jobs")
defer span.End()

// Get active job offers
span.AddEvent("db.get_job_offers.start")
jobOffers, err := controller.store.GetJobOffers(store.GetJobOffersQuery{
Active: true,
})
if err != nil {
controller.log.Error("get job offers failed", err)
span.SetStatus(codes.Error, "get job offers failed")
span.RecordError(err)
return err
}
span.AddEvent("db.get_job_offers.done")

// Check active job offers, and cancel expired offers
// and associated resource offers and deals
span.AddEvent("expire_jobs.start")
expiredOffers := []string{}
expiredDeals := []string{}
for _, jobOffer := range jobOffers {
now := time.Now().UnixMilli()
if now-int64(jobOffer.JobOffer.CreatedAt) > int64(controller.options.JobTimeoutSeconds*1000) {
if jobOffer.DealID == "" {
// Cancel expired job offers
_, err := controller.updateJobOfferState(jobOffer.ID, jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut"))
if err != nil {
controller.log.Error("update expired job offer state failed", err)
span.SetStatus(codes.Error, "update expired job offer state failed")
span.RecordError(err)
}
} else {
// Cancel expired job offers, resource offers, and deals
_, err := controller.updateDealState(jobOffer.DealID, data.GetAgreementStateIndex("JobTimedOut"))
if err != nil {
controller.log.Error("update expired deal state failed", err)
span.SetStatus(codes.Error, "update expired deal state failed")
span.RecordError(err)
}
expiredDeals = append(expiredDeals, jobOffer.DealID)
}
expiredOffers = append(expiredOffers, jobOffer.ID)
}
}
span.SetAttributes(attribute.KeyValue{
Key: "expired_job_offers",
Value: attribute.StringSliceValue(expiredOffers),
})
span.SetAttributes(attribute.KeyValue{
Key: "expired_deals",
Value: attribute.StringSliceValue(expiredDeals),
})
span.AddEvent("expire_jobs.end")

return nil
}

/*
*
*
Expand Down
1 change: 1 addition & 0 deletions pkg/solver/matcher/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func GetMatchingDeals(
span.AddEvent("db.get_job_offers.start")
jobOffers, err := db.GetJobOffers(store.GetJobOffersQuery{
NotMatched: true,
Cancelled: system.BoolPointer(false),
OrderOldestFirst: true,
})
if err != nil {
Expand Down
Loading

0 comments on commit 1c9997b

Please # to comment.