Skip to content

Commit

Permalink
Merge pull request #2033 from heuermh/storage-level
Browse files Browse the repository at this point in the history
[ADAM-2032] Add StorageLevel as an optional parameter to loadPairedFastq
  • Loading branch information
pauldwolfe authored Sep 6, 2018
2 parents 3542342 + 57916da commit f1cc2cf
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 3 deletions.
15 changes: 12 additions & 3 deletions adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ import org.apache.spark.SparkContext
import org.apache.spark.rdd.MetricsContext._
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext }
import org.apache.spark.storage.StorageLevel
import org.bdgenomics.adam.converters._
import org.bdgenomics.adam.instrumentation.Timers._
import org.bdgenomics.adam.io._
Expand Down Expand Up @@ -2018,6 +2019,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported.
* @param optRecordGroup The optional record group name to associate to the unaligned alignment
* records. Defaults to None.
* @param persistLevel An optional persistence level to set. If this level is
* set, then reads will be cached (at the given persistence level) as part of
* validation. Defaults to StorageLevel.MEMORY_ONLY.
* @param stringency The validation stringency to use when validating paired FASTQ format.
* Defaults to ValidationStringency.STRICT.
* @return Returns an unaligned AlignmentRecordRDD.
Expand All @@ -2026,6 +2030,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
pathName1: String,
pathName2: String,
optRecordGroup: Option[String] = None,
persistLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadPairedFastq.time {

val reads1 = loadUnpairedFastq(
Expand All @@ -2043,8 +2048,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log

stringency match {
case ValidationStringency.STRICT | ValidationStringency.LENIENT =>
val count1 = reads1.rdd.cache.count
val count2 = reads2.rdd.cache.count
val count1 = persistLevel.fold(reads1.rdd.count)(reads1.rdd.persist(_).count)
val count2 = persistLevel.fold(reads2.rdd.count)(reads2.rdd.persist(_).count)

if (count1 != count2) {
val msg = s"Fastq 1 ($pathName1) has $count1 reads, fastq 2 ($pathName2) has $count2 reads"
Expand Down Expand Up @@ -2568,6 +2573,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
* Globs/directories are supported.
* @param optRecordGroup The optional record group name to associate to the unaligned alignment
* records. Defaults to None.
* @param persistLevel An optional persistence level to set. If this level is
* set, then reads will be cached (at the given persistence level) as part of
* validation. Defaults to StorageLevel.MEMORY_ONLY.
* @param stringency The validation stringency to use when validating paired FASTQ format.
* Defaults to ValidationStringency.STRICT.
* @return Returns a FragmentRDD containing the paired reads grouped by
Expand All @@ -2577,9 +2585,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log
pathName1: String,
pathName2: String,
optRecordGroup: Option[String] = None,
persistLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY),
stringency: ValidationStringency = ValidationStringency.STRICT): FragmentRDD = LoadPairedFastqFragments.time {

loadPairedFastq(pathName1, pathName2, optRecordGroup, stringency).toFragments()
loadPairedFastq(pathName1, pathName2, optRecordGroup, persistLevel, stringency).toFragments()
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -639,13 +639,34 @@ class ADAMContextSuite extends ADAMFunSuite {
assert(reads.rdd.count === 6)
}

// Loads a pair of proper-pairs FASTQ files with the default persist level;
// three read pairs produce six unaligned reads.
sparkTest("load paired fastq") {
  val reads = sc.loadPairedFastq(
    testFile("proper_pairs_1.fq"),
    testFile("proper_pairs_2.fq"))
  assert(reads.rdd.count === 6)
}

// Same pair of FASTQ files, but with persistLevel = None so the reads are
// counted during validation without being persisted.
sparkTest("load paired fastq without cache") {
  val reads = sc.loadPairedFastq(
    testFile("proper_pairs_1.fq"),
    testFile("proper_pairs_2.fq"),
    persistLevel = None)
  assert(reads.rdd.count === 6)
}

// Loads the paired FASTQ files as fragments with the default persist level;
// six reads pair up into three fragments.
sparkTest("load paired fastq as fragments") {
  val fragments = sc.loadPairedFastqAsFragments(
    testFile("proper_pairs_1.fq"),
    testFile("proper_pairs_2.fq"))
  assert(fragments.rdd.count === 3)
}

// Fragment loading with persistLevel = None, exercising the uncached
// validation path; still yields three fragments.
sparkTest("load paired fastq as fragments without cache") {
  val fragments = sc.loadPairedFastqAsFragments(
    testFile("proper_pairs_1.fq"),
    testFile("proper_pairs_2.fq"),
    persistLevel = None)
  assert(fragments.rdd.count === 3)
}

sparkTest("load HTSJDK sequence dictionary") {
val path = testFile("hs37d5.dict")
val sequences = sc.loadSequenceDictionary(path)
Expand Down

0 comments on commit f1cc2cf

Please sign in to comment.