EnqueueIn and EnqueueWithOptions with At #84

Open
samacs opened this issue Mar 21, 2018 · 0 comments

samacs commented Mar 21, 2018

Can somebody please explain to me how these functions work for scheduling a job?

Sorry if this doesn't qualify as an issue, but I couldn't find any steps describing this kind of operation.

I have a list of job handlers that implement jobFunc, so initially I just enqueue different tasks to run periodically:

// Start jobs 
jobs := buildJobsList(c.String("jobs"))
if len(jobs) == 0 {
	jobs = Jobs
}
for _, jobName := range jobs {
	var job Consumer
	var params interface{}
	var operation string
	switch jobName {
	case "amazon-inventory":
		job = NewAmazonInventory()
		consumers["list-inventory-supply-by-next-token"] = job
	}
	if err := job.Initialize(errors); err != nil {
		return err
	}
	consumers[operation] = job
	workers.Enqueue(redisConfig.Queue, operation, params)
}
workers.Process(redisConfig.Queue, handleJob, redisConfig.Concurrency)
go func() {
	for err := range errors {
		message.Error("an error occurred: %v", err)
	}
}()
go workers.StatsServer(8080)
workers.Run()

The handleJob function just calls the Handle function in the Job interface with the corresponding params:

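// handle job function
args, err := msg.Args().Map()
if err != nil {
	message.Error(err.Error())
	return // without args there is nothing to dispatch
}
// Checked assertion and lookup, so a malformed payload cannot panic the worker.
operation, _ := args["Operation"].(string)
job, ok := consumers[operation]
if !ok {
	message.Error("no consumer for operation %q", operation)
	return
}
job.Handle(msg)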

Now, different jobs need to run periodically at different intervals, so each job has to schedule its next run at some point in the future: a couple of seconds, five minutes, and so on. For instance, the job configured above could have this implementation in its Handle function:

params := &AmazonInventoryParams{}
if err := json.Unmarshal([]byte(msg.Args().ToJson()), &params); err != nil {
	j.errors <- err
	return
}
nextPerform := time.Now().UTC().Add(time.Second * 2)
workers.EnqueueAt(redisConfig.Queue, params.Operation, nextPerform, params)

The problem is that I don't know exactly how to schedule the next run time using either the EnqueueAt or the EnqueueIn function.

This is the output after running the code above. As you can see, the jobs run fifteen seconds apart rather than the two seconds I asked for:

workers: 2018/03/20 18:24:40.093782 processing queue osom with 10 workers.
workers: 2018/03/20 18:24:40.105344 osom JID-e00d508285d6241a2173fa79 start
workers: 2018/03/20 18:24:40.105413 osom JID-e00d508285d6241a2173fa79 args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:24:40.106765 osom JID-e00d508285d6241a2173fa79 done: 1.417078ms
workers: 2018/03/20 18:24:55.106479 osom JID-37e2b86789ba8734ab3073bc start
workers: 2018/03/20 18:24:55.106531 osom JID-37e2b86789ba8734ab3073bc args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:24:55.113703 osom JID-37e2b86789ba8734ab3073bc done: 7.221384ms
workers: 2018/03/20 18:25:10.114261 osom JID-b1b8c1d52badaad8ede83e0e start
workers: 2018/03/20 18:25:10.114299 osom JID-b1b8c1d52badaad8ede83e0e args: {"MarketplaceID":"A1AM78C64UM0Y8","NextToken":"","Operation":"list-inventory-supply","QueryStartDateTime":"2018-03-21T00:24:40.086486644Z"}
workers: 2018/03/20 18:25:10.116502 osom JID-b1b8c1d52badaad8ede83e0e done: 2.238734ms
^Cworkers: 2018/03/20 18:25:21.669964 quitting queue osom (waiting for 0 / 10 workers).

I already tried scheduling the jobs with the EnqueueIn function, using something like float64((time.Second * 2).Nanoseconds()), but I think EnqueueAt would be more convenient and easier to figure out.

Thanks in advance. I think it would be really nice to have a Wiki page describing these kinds of scenarios.
