[ADAM-2023] Implemented Duplicate Marking algorithm in Spark SQL #2045
Conversation
With these changes, I see as much as a 30% speedup for large datasets. Fixes #2023
Can one of the admins verify this patch?
Jenkins, test this please
Jenkins, add to whitelist
 * @param alignmentRecords GenomicRDD of alignment records
 * @return RDD of alignment records with the "duplicateRead" field marked appropriately
 */
def apply(alignmentRecords: AlignmentRecordRDD): RDD[AlignmentRecord] = {
We should refactor the caller here so that we don't force the conversions between RDD and Dataset. In other words, only perform the conversions if alignmentRecords has been realized as RDDBoundAlignmentRecordRDD.
I see, so there should be another apply method which takes Dataset[AlignmentRecord], and this apply method's signature should be changed to take RDDBoundAlignmentRecordRDD?
Or do you mean that this method should just be changed to take Dataset[AlignmentRecord]? In that case, would it also make sense to refactor so that the other apply method also just takes Dataset[Fragment], and to do a similar refactoring of the caller?
Does this look right? We would want the caller to be
abstract class AlignmentRecordRDD ... {
  def markDuplicates(): AlignmentRecordRDD = {
    replaceRdd(MarkDuplicates(this.rdd, this.recordGroups))
  }
}

case class DatasetBoundAlignmentRecordRDD ... {
  override def markDuplicates(): AlignmentRecordRDD = {
    replaceDataset(MarkDuplicates(this.dataset, this.recordGroups))
  }
}

abstract class FragmentRDD ... {
  def markDuplicates(): FragmentRDD = {
    replaceRdd(MarkDuplicates(this.rdd, this.recordGroups))
  }
}

case class DatasetBoundFragmentRDD ... {
  override def markDuplicates(): FragmentRDD = {
    replaceDataset(MarkDuplicates(this.dataset, this.recordGroups))
  }
}
so then the apply methods might be
import org.bdgenomics.formats.avro.{
  AlignmentRecord,
  Fragment
}
import org.bdgenomics.adam.sql.{
  AlignmentRecord => AlignmentRecordProduct,
  Fragment => FragmentProduct
}

object MarkDuplicates {
  def apply(rdd: RDD[AlignmentRecord], rgd: RecordGroupDictionary): RDD[AlignmentRecord] = { ??? }
  def apply(rdd: RDD[Fragment], rgd: RecordGroupDictionary): RDD[Fragment] = { ??? }
  def apply(ds: Dataset[AlignmentRecordProduct], rgd: RecordGroupDictionary): Dataset[AlignmentRecordProduct] = { ??? }
  def apply(ds: Dataset[FragmentProduct], rgd: RecordGroupDictionary): Dataset[FragmentProduct] = { ??? }
}
Alright, sounds good. Because of type erasure there will need to be a single apply[T] for RDD and for Dataset for fragments/alignment-records.
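The erasure clash can be reproduced in miniature with plain collections. This is an illustrative sketch only, not ADAM code: ErasureDemo is a made-up name, and the DummyImplicit parameter shown is one standard workaround for disambiguating overloads that erase to the same JVM signature (the alternative being a single generic apply[T] that dispatches internally, as suggested above).

```scala
object ErasureDemo {
  // These two overloads both erase to apply(List): Int, so defining them
  // side by side would not compile:
  //   def apply(xs: List[Int]): Int = ...
  //   def apply(xs: List[String]): Int = ...
  // Adding an extra DummyImplicit parameter (provided automatically by
  // scala.Predef) changes the erased signature; call sites are unchanged.
  def apply(xs: List[Int]): Int = xs.sum
  def apply(xs: List[String])(implicit d: DummyImplicit): Int =
    xs.map(_.length).sum
}
```

Callers simply write ErasureDemo(List(1, 2, 3)) or ErasureDemo(List("ab", "c")) and overload resolution picks the right method before erasure applies.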
Ah, good point. Is RecordGroupDictionary actually required for the fragment cases? If not, that could be the discriminator.
I believe that it is necessary for both, so it can't be used as a distinguisher.
Would it be better to take RDDBoundAlignmentRecordRDD and DatasetBoundAlignmentRecordRDD so that the conversion from RDD to Dataset that is already implemented in RDDBoundAlignmentRecordRDD is not duplicated within MarkDuplicates?
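The shape of that dispatch can be modeled in a few lines of plain Scala. This is a simplified stand-in, not ADAM code: RddBound/DatasetBound play the role of RDDBoundAlignmentRecordRDD/DatasetBoundAlignmentRecordRDD, and Seq[String] stands in for the real record types, so the only point made is that the RDD-to-Dataset conversion lives in one place and runs at most once.

```scala
// Simplified model of the bound-representation hierarchy.
sealed trait BoundRecords
case class DatasetBound(records: Seq[String]) extends BoundRecords
case class RddBound(records: Seq[String]) extends BoundRecords {
  // The single place the RDD -> Dataset conversion is implemented.
  def toDatasetBound: DatasetBound = DatasetBound(records)
}

object MarkDuplicatesSketch {
  // Dispatch on the bound type: Dataset-bound input is used directly;
  // RDD-bound input is converted exactly once via its own conversion.
  def apply(input: BoundRecords): DatasetBound = input match {
    case ds: DatasetBound => ds
    case rdd: RddBound    => rdd.toDatasetBound
  }
}
```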
 * Case class which merely extends the Fragment schema by a single column "duplicateFragment" so that
 * a DataFrame with fragments having been marked as duplicates can be cast back into a Dataset
 */
private case class FragmentDuplicateSchema(readName: Option[String] = None,
Do you think it would be useful to add duplicateFragment or a similarly named field to the Avro schema definition for Fragment, or is this flag only useful in a temporary context?
I'm not sure. I was considering it while developing, but then I realized that it really only seems useful in a temporary context: each of the alignment records contained within the fragment already carries a duplicate flag, so the schema would have redundancies.
Test PASSed.
… for Dataset duplicate marking path
Test PASSed.
Because the Spark SQL implementation of duplicate marking was not scaling well to cluster runs, this version converts many of the groupBy-followed-by-join operations back to the original Dataset operations with window functions. This should reduce the amount of data that has to be shuffled when running on a cluster and make the performance benefits scale.
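As a local-collection analogy of that rewrite (not Spark code; ScoredRead and both helper methods are illustrative assumptions), the groupBy-plus-join shape computes a per-group summary and then joins every row against it, while the window-style shape ranks rows within each partition in a single pass and flags everything after the top-ranked row:

```scala
case class ScoredRead(position: Int, score: Int)

object DupFlagSketch {
  // groupBy-then-join style: build a best-score-per-position summary,
  // then look each read up against that summary (the "join back").
  def viaGroupByJoin(reads: Seq[ScoredRead]): Seq[Boolean] = {
    val best = reads.groupBy(_.position).map {
      case (pos, rs) => pos -> rs.map(_.score).max
    }
    reads.map(r => r.score < best(r.position))
  }

  // window style: sort within each position partition and flag every
  // read ranked after the first, in one pass over each partition.
  def viaWindow(reads: Seq[ScoredRead]): Seq[Boolean] = {
    val flagByIndex = reads.zipWithIndex
      .groupBy { case (r, _) => r.position }
      .values
      .flatMap { partition =>
        val sorted = partition.sortBy { case (r, _) => -r.score }
        sorted.zipWithIndex.map { case ((_, idx), rank) => idx -> (rank > 0) }
      }
      .toMap
    reads.indices.map(flagByIndex)
  }
}
```

In Spark the difference matters because the join-back variant shuffles the data an extra time, whereas a window over a partition key reuses a single shuffle; the local versions here only show that the two shapes compute the same flags (ties aside).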
Test PASSed.
Closing as WontFix. Performance testing of these changes was inconclusive. Feel free to create a new PR after rebasing against git head.