-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? # for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “#”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? # to your account
JobUpdateTask cleanups #650
Conversation
Optional<Job> getJob = this.jobRepository.findById(request.getId()); | ||
if (getJob.isEmpty()) { | ||
// FIXME: if getJob.isEmpty then constructing this error message will always throw an error... | ||
throw new NoSuchElementException( | ||
"Attempted to stop nonexistent job with id: " + getJob.get().getId()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a bug, I'll raise it in a separate issue/PR, didn't want to change it here because it should probably have a hotfix instead of waiting on this refactoring PR to get through. There are two or three instances of the same problem in this file.
The intention here was probably to use request.getId()
for the exception message. getJob.get()
will throw NoSuchElementException
by definition. @mrzzy
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Good catch. It should be request.getId()
instead.
@@ -73,147 +74,131 @@ public JobUpdateTask( | |||
this.currentJob = currentJob; | |||
this.jobManager = jobManager; | |||
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds; | |||
this.runnerType = jobManager.getRunnerType().toString(); | |||
} | |||
|
|||
@Override | |||
public Job call() { | |||
ExecutorService executorService = Executors.newSingleThreadExecutor(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@zhilingc I'm trying to understand why this is used—JobUpdateTask
is a Callable
and its instances are all dispatched on separate threads by JobCoordinatorService
, it seems redundant to have the tasks then use another thread again where they invoke a job manager (that will [usually] do network I/O). If anything, it would seem that futures should be at the layer of the job manager instead. call()
submits and then blocks on them before returning.
One more question for you is about the parameters of helper methods like startJob
and updateJob
: most of them except the job are instance state of the JobUpdateTask
itself, and they were able to be final
, so it seems like there's no need for passing them around through arguments. I removed them in the process of working out what call()
actually needed to touch. Am I missing anything, or were they just a vestige of some refactoring in your initial work?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
After reading through the code I'm not sure what my past self was thinking tbqh :( You're right, its completely redundant.
As for the second point, you're right. I must've missed it.
public static Collection<JobStatus> getTerminalState() { | ||
return TERMINAL_STATE; | ||
public static Set<JobStatus> getTerminalStates() { | ||
return TERMINAL_STATES; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Got a little carried away, JobStatus
changes are further than I meant to go for now… but I started touching JobCoordinatorService
for filter(job -> !job.hasTerminated())
and I ended up here…
These semantically are/should be Sets I think. I'm not sure I see value in these static getters wrapping constants, after this PR their only remaining use is in a few tests. Considered removing them, but thought I'd leave it to discuss first.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense.
/assign @pradithya |
Thanks for the PR, I'll have a look tomorrow! |
No complaints here, thanks so much for the PR, and sorry for the mess :/ |
/test test-end-to-end |
/test test-end-to-end-batch |
/lgtm |
/test test-end-to-end-batch |
/hold |
Logic is a bit complicated here and trimming excess noise starts to help make it quicker to grok. We were doing the equivalent of Set#equals by hand.
The helper methods like startJob and updateJob had parameters that are instance state, so there seemed to be no reason to pass around arguments for them.
And factor out things that don't differ between its test cases. JobCoordinatorService unmarshals protos to model types eagerly as they come off the wire, so we're left dealing with models everywhere else.
a3b0452
to
ded7ca5
Compare
/test test-end-to-end-batch |
Just FYI we are considering enabling the e2e tests again by default. The added benefit of not having to wait for them in small PRs isn't worth the manual effort for big PRs. |
Okay, so I did the spec renaming for consistency as @mrzzy suggested, and when I stepped back and looked at it, all the instance variables of I tried removing the redundant I'm leaning more toward my instinct in the original comment/question for @zhilingc being the right answer eventually: the asynchrony (and possibility of failure) should be expressed at the |
/hold cancel |
The job manager soon after converts things back to proto again, but that's natural, these are I/O boundaries of the system. The overhead of these conversions is the price we pay if we don't want wire format in business logic layer, which is probably worth it. It ought to be less than the actual de/serialization anyway, and this stuff isn't a hot path/tight loop 🤞 |
Oh, finally, let me know if you'd like me to try to squash a few commits. I think you're typically squashing merges anyway so it'd not be time well spent in that case. |
|
||
log.info("Updating feature set status"); | ||
updateFeatureSetStatuses(jobUpdateTasks); | ||
executorService.shutdown(); | ||
} | ||
|
||
// TODO: make this more efficient | ||
private void updateFeatureSetStatuses(List<JobUpdateTask> jobUpdateTasks) { | ||
Set<FeatureSet> ready = new HashSet<>(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not really related to this PR per say, but would it be possible for us to remove this updateFeatureSetStatuses
call and use the real job state?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think you're asking: rather than cache feature set statuses in bulk, could status be returned on-demand by checking job state for one given feature set, when requested? I guess the team will have more context on the design.
As it stands, I'd have to get my head more into functional consideration of what it's doing beyond the mechanical refactoring I've done to this class, but it seems like there could be more to look at later:
-
In the
Poll()
loop we are:- At the beginning, doing
getJob
for the source/store combo of all feature sets that are subscribed to - Then, doing the
startOrUpdate
of tasks for all those, and waiting on all those tasks - Throwing away the results of waiting (updated
Job
references), because thestartOrUpdate
returnsvoid
- Doing
getJob
again for each of the tasks, in theupdateFeatureSetStatuses
It seems like this second round of DB lookups could be avoided by keeping and using the
Job
s we just got back. Unless I'm missing something, if it's trying to avoid a race condition or something. - At the beginning, doing
-
getJob
doesjobRepository.findBySourceIdAndStoreNameOrderByLastUpdatedDesc
returning all matching records, and then takes the first (most recent) one client-side instead of having SQL do it. And this happens in loops, 2x per polling loop as noted in 1.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Tracking your comments here: #664
and then takes the first (most recent) one client-side instead of having SQL do it.
That's a very low hanging fruit.
this.store = store; | ||
this.currentJob = currentJob; | ||
this.jobManager = jobManager; | ||
this.jobUpdateTimeoutSeconds = jobUpdateTimeoutSeconds; | ||
this.runnerName = jobManager.getRunnerType().toString(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The runnerName
as we have it above refers to the runnerType
right? Should we deduplicate that from the configuration (we could do that at the configuration level as well)?
feast:
jobs:
polling_interval_milliseconds: 60000
job_update_timeout_seconds: 240
active_runner: my_direct_runner
runners:
- name: my_direct_runner
type: DirectRunner
options: {}
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The idea with the configuration was to provide a more forward compatible configuration schema, but we could just as easily use type there instead of name.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, from your example config, jobManager.getRunnerType().toString()
will be "DirectRunner"
(whereas jobManager.getRunnerType().name()
would be "DIRECT"
).
In JobUpdateTask
this value is being used only for log messages.
Is there anything further I should update on this one? |
/lgtm |
/approve |
[APPROVALNOTIFIER] This PR is APPROVED This pull-request has been approved by: ches, woop The full list of commands accepted by this bot can be found here. The pull request process is described here
Needs approval from an approver in each of these files:
Approvers can indicate their approval by writing |
This seems reasonable, although I have only superficially looked at job management. I do think we need to look at this in a bit more detail soon though, especially if folks are bringing their own runners. |
Clears a FIXME left from #650, get() uses that will always fail. Also fixes a bunch of == comparisons on strings, reference equality is not what you want there.
* Fix Optional#get() and string comparison bugs in JobService Clears a FIXME left from #650, get() uses that will always fail. Also fixes a bunch of == comparisons on strings, reference equality is not what you want there. * Hide Maven transfer progress, for CI It makes trying to read output in GitHub Actions horrible. Seeing failures in red is kind of nice, so `--no-transfer-progress` keeps colors unlike `--batch-mode`.
What this PR does / why we need it:
Cleanup, this is purely refactoring aiming to make code easier to read, understand, and modify.
I've been doing some work—and reviews of work—around this code, and found it a bit hard to follow and work with. This is my attempt at improving that.
This PR is meant to be reviewed commit-by-commit, it should hopefully make sense quickly that way. Tests are untouched until the final commit and should pass at every step.
I have a question or two though that I'll ask inline, so I'm marking this WIP, I may update a bit more based on the answers, especially docstrings where warranted.
Which issue(s) this PR fixes:
No issue
Does this PR introduce a user-facing change?: