diff --git a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala index 47f0006c30..533847524c 100644 --- a/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala +++ b/adam-core/src/main/scala/org/bdgenomics/adam/rdd/ADAMContext.scala @@ -43,6 +43,7 @@ import org.apache.spark.SparkContext import org.apache.spark.rdd.MetricsContext._ import org.apache.spark.rdd.RDD import org.apache.spark.sql.{ Dataset, SparkSession, SQLContext } +import org.apache.spark.storage.StorageLevel import org.bdgenomics.adam.converters._ import org.bdgenomics.adam.instrumentation.Timers._ import org.bdgenomics.adam.io._ @@ -2018,6 +2019,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * Globs/directories are supported. * @param optRecordGroup The optional record group name to associate to the unaligned alignment * records. Defaults to None. + * @param persistLevel An optional persistence level to set. If this level is + * set, then reads will be cached (at the given persistence level) as part of + * validation. Defaults to StorageLevel.MEMORY_ONLY. * @param stringency The validation stringency to use when validating paired FASTQ format. * Defaults to ValidationStringency.STRICT. * @return Returns an unaligned AlignmentRecordRDD. 
@@ -2026,6 +2030,7 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log pathName1: String, pathName2: String, optRecordGroup: Option[String] = None, + persistLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), stringency: ValidationStringency = ValidationStringency.STRICT): AlignmentRecordRDD = LoadPairedFastq.time { val reads1 = loadUnpairedFastq( @@ -2043,8 +2048,8 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log stringency match { case ValidationStringency.STRICT | ValidationStringency.LENIENT => - val count1 = reads1.rdd.cache.count - val count2 = reads2.rdd.cache.count + val count1 = persistLevel.fold(reads1.rdd.count)(reads1.rdd.persist(_).count) + val count2 = persistLevel.fold(reads2.rdd.count)(reads2.rdd.persist(_).count) if (count1 != count2) { val msg = s"Fastq 1 ($pathName1) has $count1 reads, fastq 2 ($pathName2) has $count2 reads" @@ -2568,6 +2573,9 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log * Globs/directories are supported. * @param optRecordGroup The optional record group name to associate to the unaligned alignment * records. Defaults to None. + * @param persistLevel An optional persistence level to set. If this level is + * set, then reads will be cached (at the given persistence level) as part of + * validation. Defaults to StorageLevel.MEMORY_ONLY. * @param stringency The validation stringency to use when validating paired FASTQ format. * Defaults to ValidationStringency.STRICT. 
* @return Returns a FragmentRDD containing the paired reads grouped by @@ -2577,9 +2585,10 @@ class ADAMContext(@transient val sc: SparkContext) extends Serializable with Log pathName1: String, pathName2: String, optRecordGroup: Option[String] = None, + persistLevel: Option[StorageLevel] = Some(StorageLevel.MEMORY_ONLY), stringency: ValidationStringency = ValidationStringency.STRICT): FragmentRDD = LoadPairedFastqFragments.time { - loadPairedFastq(pathName1, pathName2, optRecordGroup, stringency).toFragments() + loadPairedFastq(pathName1, pathName2, optRecordGroup, persistLevel, stringency).toFragments() } /** diff --git a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala index 051545c27b..257a2b6cbf 100644 --- a/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala +++ b/adam-core/src/test/scala/org/bdgenomics/adam/rdd/ADAMContextSuite.scala @@ -639,6 +639,20 @@ class ADAMContextSuite extends ADAMFunSuite { assert(reads.rdd.count === 6) } + sparkTest("load paired fastq") { + val pathR1 = testFile("proper_pairs_1.fq") + val pathR2 = testFile("proper_pairs_2.fq") + val reads = sc.loadPairedFastq(pathR1, pathR2) + assert(reads.rdd.count === 6) + } + + sparkTest("load paired fastq without cache") { + val pathR1 = testFile("proper_pairs_1.fq") + val pathR2 = testFile("proper_pairs_2.fq") + val reads = sc.loadPairedFastq(pathR1, pathR2, persistLevel = None) + assert(reads.rdd.count === 6) + } + sparkTest("load paired fastq as fragments") { val pathR1 = testFile("proper_pairs_1.fq") val pathR2 = testFile("proper_pairs_2.fq") @@ -646,6 +660,13 @@ class ADAMContextSuite extends ADAMFunSuite { assert(fragments.rdd.count === 3) } + sparkTest("load paired fastq as fragments without cache") { + val pathR1 = testFile("proper_pairs_1.fq") + val pathR2 = testFile("proper_pairs_2.fq") + val fragments = sc.loadPairedFastqAsFragments(pathR1, pathR2, persistLevel = None) 
+ assert(fragments.rdd.count === 3) + } + sparkTest("load HTSJDK sequence dictionary") { val path = testFile("hs37d5.dict") val sequences = sc.loadSequenceDictionary(path)