-
Notifications
You must be signed in to change notification settings - Fork 266
Initial implementation of checkpoint state #315
Conversation
"work with CheckpointState" in { | ||
// TODO complete this | ||
implicit val fixedBatcher : Batcher = new MillisecondBatcher(30*60*1000L) | ||
val path:String = "/Users/amokashi/work/apache" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Get a temp path here than /Users....
case Some(b) => { | ||
// checkpoint batches until b (exclusive) | ||
BatchID.range(startBatch, b.prev) | ||
.foreach(t => versionedStore.succeedVersion(fixedBatcher.earliestTimeOf(t).as[Long])); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
shouldn't this be done in the succeed method below? We have not actually suceeded yet.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We do this here to mark it as running. This avoids multiple runs of same batch at the same time.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The issue is the whole process can crash on mesos, and then you leave behind the claim that it was finished. This actually happens, so it is not failsafe.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
With the change we discussed yesterday, the new run of the same thing would prefer startdate over hdfs checkpoint.
Initial implementation of checkpoint state
Clean up comments
Still need to complete tests