From 1c596739df45492edcf4261331541af58bbdfb6d Mon Sep 17 00:00:00 2001 From: lynnjo Date: Wed, 6 Dec 2023 09:39:05 -0500 Subject: [PATCH 01/14] intial changes, not at all finished --- .../phgv2/cli/AlignAssemblies.kt | 66 ++++++++++++++++++- 1 file changed, 64 insertions(+), 2 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 91c13842..6b21c2ca 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -90,7 +90,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val totalThreads by option(help = "Number of threads available. These will be split among the alginments that are run in parallel") .int() - .default(1) + .default(0) val inParallel by option( help = "Number of assemblies to simultaneously process. " + @@ -99,7 +99,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { "Consider this memory factor when providing values for the total-threads and in-parallel." ) .int() - .default(1) + .default(0) val refMaxAlignCov by option(help = "Anchorwave proali parameter R, indicating reference genome maximum alignment coverage.") .int() @@ -119,6 +119,12 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { override fun run() { + val threadsAndRuns = calculatedNumThreadsAndRuns(totalThreads, inParallel, assemblies) + val freeMemory = Runtime.getRuntime().freeMemory() + val processors = Runtime.getRuntime().availableProcessors() + + println("freeMemory: $freeMemory, processors: $processors") + // create CDS fasta from reference and gff3 file val cdsFasta = "$outputDir/ref.cds.fasta" @@ -154,6 +160,62 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } + fun calculatedNumThreadsAndRuns(totalThreads:Int, inParallel:Int, assemblies:String): Pair { + // If totalThreads or inParallel are 0, it means the user did not specify them + // In that case we calculate these values based on the number of processors available + // and the amount of free memory available. + val freeMemory = Runtime.getRuntime().freeMemory() + val processors = if (totalThreads > 0) totalThreads else Runtime.getRuntime().availableProcessors() + + // If the user did not specify the number of threads to use, we will use all available + // processors. If the user did specify the number of threads, we will use that number + // of threads, but we will not exceed the number of processors available. + val totalThreadsToUse = if (totalThreads > 0) { + if (totalThreads > processors) { + myLogger.warn("The number of threads specified ($totalThreads) exceeds the number of processors available ($processors). Using $processors threads.") + processors + } else { + totalThreads + } + } else { + processors + } + + // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave + // THe number of threads that can be run simultaneously is limited by the amount of + // free memory available. We will use 21G/thread as the memory requirement, and + // calculate the number of threads that can be run simultaneously based on the + // amount of free memory available. + + val totalConcurrentThreads = (freeMemory / 21e9).toInt() + + // Now that we know how many threads can be run concurrently, we need to + // determine how many parallel alignments to do, and how many threads each + // one gets. If the user specified the number of parallel alignments to do, + // we will use that number. + + val numAssemblies = File(assemblies).readLines().filter { it.isNotBlank() }.size + // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun + val runsAndThreads = if (inParallel > 0) { + if (inParallel > totalConcurrentThreads) { + myLogger.warn("The number of parallel alignments specified ($inParallel) exceeds the number of threads that can be run concurrently ($totalConcurrentThreads). Using $totalConcurrentThreads threads.") + Pair(totalConcurrentThreads, 1) + } else { + Pair(inParallel, totalConcurrentThreads / inParallel) + } + } else { + // Need to do calcuations here to determine the best value + maximizeRunsAndThreads(totalConcurrentThreads, numAssemblies) + } + return runsAndThreads + } + + fun maximizeRunsAndThreads(totalConcurrentThreads:Int, numAssemblies:Int): Pair { + // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun + + // TODO - determine how this works! + return Pair(1, 1) // this is fake! + } private fun createCDSfromRefData(refFasta: String, gffFile: String, cdsFasta: String, outputDir: String): Boolean { // val command = "anchorwave gff2seq -r ${refFasta} -i ${gffFile} -o ${cdsFasta} " From 6e11f9c19a38faaffed4d3e21d569e296fe8db71 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 7 Dec 2023 07:45:29 -0500 Subject: [PATCH 02/14] code addded that calculated number of threads/runs --- .../phgv2/cli/AlignAssemblies.kt | 63 +++++++++++++++++-- .../phgv2/cli/AlignAssembliesTest.kt | 56 +++++++++++++++++ 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 6b21c2ca..6b03b120 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -210,12 +210,67 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { return runsAndThreads } - fun maximizeRunsAndThreads(totalConcurrentThreads:Int, numAssemblies:Int): Pair { + fun maximizeRunsAndThreads(totalConcurrentThreads:Int, totalAssemblies:Int): Pair { // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun - - // TODO - determine how this works! - return Pair(1, 1) // this is fake! + // THe maximum number of current runs is the total number of threads available, + // which would be each assembly getting a single thread. + // We believe it is more efficient to run fewer assemblies at a time, each with more threads. + // What this code does is calculate the middle of this. For example: If there are 20 assemblies + // and 10 threads, our options are: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + // 6 or higher runs: 1 thread + // This code will return 3 runs, 3 threads each. + val threadsToAssembliesMap = mutableMapOf() + + // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? + for (numThreads in 1..totalConcurrentThreads) { + + val numRuns = totalConcurrentThreads/numThreads + val currentThreads = threadsToAssembliesMap[numRuns] + // if currentThreads is not null and is > than numThreads, ignore. + // otherwise, replace this entry + if (currentThreads == null || currentThreads < numThreads) { + threadsToAssembliesMap[numRuns] = numThreads + } + } + // we should now have a map with the highest number of threads for each number of runs + //At this point, we pick + // 1. if only 1 entry, use that + // 2. if are only 2 entries, use the one with the highest number of threads + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat until there are 2 entries or fewer entries left. + + + // 1. if only 1 entry, use that + if (threadsToAssembliesMap.size == 1) { + val entry = threadsToAssembliesMap.entries.first() + println("Using ${entry.value} threads for ${entry.key} runs") + return Pair(entry.key, entry.value) + } else if (threadsToAssembliesMap.size == 2) { + // 2. if are only 2 entries, use the one with the highest number of threads + val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } + println("Using ${entry!!.value} threads for ${entry.key} runs") + return Pair(entry.key, entry.value) + } else { + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat then there are 2 entries or fewer entries left. + while (threadsToAssembliesMap.size > 2) { + val minEntry = threadsToAssembliesMap.entries.minByOrNull { it.key } + val maxEntry = threadsToAssembliesMap.entries.maxByOrNull { it.key } + threadsToAssembliesMap.remove(minEntry!!.key) + threadsToAssembliesMap.remove(maxEntry!!.key) + } + // 2. if are only 2 entries, use the one with the highest number of threads + val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } + println("Using ${entry!!.value} threads for ${entry.key} runs") + return Pair(entry.key, entry.value) + } } + private fun createCDSfromRefData(refFasta: String, gffFile: String, cdsFasta: String, outputDir: String): Boolean { // val command = "anchorwave gff2seq -r ${refFasta} -i ${gffFile} -o ${cdsFasta} " diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 011f2856..f66795e1 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -12,6 +12,62 @@ import kotlin.test.assertTrue @ExtendWith(TestExtension::class) class AlignAssembliesTest { + @Test + fun testNumThreadsAndRuns() { + // ok - this is now working. It is giving me 3/3 + // The numbers with 10 threads and 20 total assemblies are: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + // 6 or higher runs: 1 thread + val threadsToAssembliesMap = mutableMapOf() + val totalConcurrentThreads = 10 + // val totalAssemblies = 20 + // val assembliesMax = 10 + // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? + for (numThreads in 1..totalConcurrentThreads) { + //val numRuns = assembliesMax / numThreads + val numRuns = totalConcurrentThreads/numThreads + val currentThreads = threadsToAssembliesMap[numRuns] + // if currentThreads is not null and is > than numThreads, ignore. + // otherwise, replace this entry + if (currentThreads == null || currentThreads < numThreads) { + threadsToAssembliesMap[numRuns] = numThreads + } + } + // we should now have a map with the highest number of threads for each number of runs + //At this point, we pick + // 1. if only 1 entry, use that + // 2. if are only 2 entries, use the one with the highest number of threads + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat then there are 2 entries or fewer entries left. + + // 1. if only 1 entry, use that + if (threadsToAssembliesMap.size == 1) { + val entry = threadsToAssembliesMap.entries.first() + println("Using ${entry.value} threads for ${entry.key} runs") + } else if (threadsToAssembliesMap.size == 2) { + // 2. if are only 2 entries, use the one with the highest number of threads + val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } + println("Using ${entry!!.value} threads for ${entry.key} runs") + } else { + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat then there are 2 entries or fewer entries left. + while (threadsToAssembliesMap.size > 2) { + val minEntry = threadsToAssembliesMap.entries.minByOrNull { it.key } + val maxEntry = threadsToAssembliesMap.entries.maxByOrNull { it.key } + threadsToAssembliesMap.remove(minEntry!!.key) + threadsToAssembliesMap.remove(maxEntry!!.key) + } + // 2. if are only 2 entries, use the one with the highest number of threads + val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } + println("Using ${entry!!.value} threads for ${entry.key} runs") + } + + + } @Test fun testCliktParams() { val alignAssemblies = AlignAssemblies() From 0dbf8ff4eaba091d159c07ec1e935be85c7d980e Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 7 Dec 2023 08:47:52 -0500 Subject: [PATCH 03/14] more details to the calculate threads/run function --- .../phgv2/cli/AlignAssemblies.kt | 27 +++++++++++++++---- .../phgv2/cli/AlignAssembliesTest.kt | 4 +++ 2 files changed, 26 insertions(+), 5 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 6b03b120..c8f29a40 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -187,31 +187,48 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // calculate the number of threads that can be run simultaneously based on the // amount of free memory available. - val totalConcurrentThreads = (freeMemory / 21e9).toInt() + val concurrentThreads = (freeMemory / 21e9).toInt() + val totalConcurrentThreads = if (concurrentThreads > totalThreadsToUse) { + totalThreadsToUse + } else { + concurrentThreads + } + // Now that we know how many threads can be run concurrently, we need to // determine how many parallel alignments to do, and how many threads each - // one gets. If the user specified the number of parallel alignments to do, + // one gets. If the user specified the number of parallel alignments to run, // we will use that number. val numAssemblies = File(assemblies).readLines().filter { it.isNotBlank() }.size // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun + //TODO this is not considering the actual number of assemblies in the calculations val runsAndThreads = if (inParallel > 0) { if (inParallel > totalConcurrentThreads) { - myLogger.warn("The number of parallel alignments specified ($inParallel) exceeds the number of threads that can be run concurrently ($totalConcurrentThreads). Using $totalConcurrentThreads threads.") + myLogger.warn("The number of parallel alignments specified ($inParallel) exceeds the number of threads that can be run concurrently ($totalConcurrentThreads). Will run $totalConcurrentThreads concurrent alignments with 1 thread each.") Pair(totalConcurrentThreads, 1) } else { + // InParallel was defined by the user and is <= totalConcurrentThreads + // Number of concurrent alignments is "inParallel", and number of threads-per-alignment becomes + // totalConcurrentThreads/inParallel Pair(inParallel, totalConcurrentThreads / inParallel) } } else { - // Need to do calcuations here to determine the best value + // Need to do calculations here to determine the best values for + // concurrent runs and number of threads per run maximizeRunsAndThreads(totalConcurrentThreads, numAssemblies) } return runsAndThreads } + /** + * This method should only be called if the user did not specify the number of parallel alignments + * + * TODO - this needs to consider the total assemblies in the calculations + * as that makes a different as to how many threads to give it. + */ fun maximizeRunsAndThreads(totalConcurrentThreads:Int, totalAssemblies:Int): Pair { - // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun + // This returns a Pair where the first value is the number of runs, the seconds is threadsPerRun // THe maximum number of current runs is the total number of threads available, // which would be each assembly getting a single thread. // We believe it is more efficient to run fewer assemblies at a time, each with more threads. diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index f66795e1..42860c05 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -22,6 +22,10 @@ class AlignAssembliesTest { // 4 runs: 2 threads // 5 runs: 2 threads // 6 or higher runs: 1 thread + + // BUT .. the calculations do NOT take into account the number of assemblies + // the user wants to align. In AlignAssemblies, this is called whenever the user + // has not specified an inParallel amount. val threadsToAssembliesMap = mutableMapOf() val totalConcurrentThreads = 10 // val totalAssemblies = 20 From c701ef10848e9185e26a20af06d58a5ca7ab3aa0 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Fri, 8 Dec 2023 07:58:22 -0500 Subject: [PATCH 04/14] futher attempts at determining available mem, plus debug --- .../phgv2/cli/AlignAssemblies.kt | 145 ++++++++++++------ .../phgv2/cli/AlignAssembliesTest.kt | 98 ++++++------ 2 files changed, 147 insertions(+), 96 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index c8f29a40..275f0c5d 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -114,16 +114,17 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val asmFasta: String, val outputDir: String, val gffFile: String, - val refSamOutFile: String + val refSamOutFile: String, + val maxRuns:Int, + val threadsPerRun:Int ) override fun run() { - val threadsAndRuns = calculatedNumThreadsAndRuns(totalThreads, inParallel, assemblies) - val freeMemory = Runtime.getRuntime().freeMemory() - val processors = Runtime.getRuntime().availableProcessors() - - println("freeMemory: $freeMemory, processors: $processors") + // Returns Pair where the first value is the number of parallel alignments, the second is threadsPerAlignment + val runsAndThreads = calculatedNumThreadsAndRuns(totalThreads, inParallel, assemblies) +// val freeMemory = Runtime.getRuntime().freeMemory() +// val processors = Runtime.getRuntime().availableProcessors() // create CDS fasta from reference and gff3 file val cdsFasta = "$outputDir/ref.cds.fasta" @@ -138,8 +139,9 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val samOutFile = "${justNameRef}.sam" val refSamOutFile = "${outputDir}/${samOutFile}" + // For minimap2, we will use the number of processors available as the number of threads. val builder = ProcessBuilder( - "conda", "run", "-n", "phgv2-conda", "minimap2", "-x", "splice", "-t", totalThreads.toString(), "-k", "12", + "conda", "run", "-n", "phgv2-conda", "minimap2", "-x", "splice", "-t", runsAndThreads.second.toString(), "-k", "12", "-a", "-p", "0.4", "-N20", referenceFile, cdsFasta, "-o", refSamOutFile ) val redirectError = "$outputDir/minimap2Ref_error.log" @@ -156,7 +158,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { throw IllegalStateException("Error running minimap2 for reference: $error") } - runAnchorWaveMultiThread(referenceFile, assembliesList, cdsFasta, gff, refSamOutFile) + runAnchorWaveMultiThread(referenceFile, assembliesList, cdsFasta, gff, refSamOutFile,runsAndThreads) } @@ -164,8 +166,11 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // If totalThreads or inParallel are 0, it means the user did not specify them // In that case we calculate these values based on the number of processors available // and the amount of free memory available. - val freeMemory = Runtime.getRuntime().freeMemory() - val processors = if (totalThreads > 0) totalThreads else Runtime.getRuntime().availableProcessors() + //val freeMemory = Runtime.getRuntime().freeMemory() + val freeMemory = Runtime.getRuntime().maxMemory() / 1e9 + val processors = Runtime.getRuntime().availableProcessors() - 2 // leave 2 processors for the OS + + myLogger.info("\nLCJ: freeMemory: $freeMemory, processors: $processors") // If the user did not specify the number of threads to use, we will use all available // processors. If the user did specify the number of threads, we will use that number @@ -181,51 +186,75 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { processors } - // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave + println("LCJ: totalThreadsToUse: $totalThreadsToUse") + // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave on a sample maize genome. // THe number of threads that can be run simultaneously is limited by the amount of // free memory available. We will use 21G/thread as the memory requirement, and // calculate the number of threads that can be run simultaneously based on the // amount of free memory available. - val concurrentThreads = (freeMemory / 21e9).toInt() + val concurrentThreads = (freeMemory/21).toInt() + println("LCJ: concurrentThreads: $concurrentThreads") + if (concurrentThreads < 1) { + // FreeMemory is often low when running CI or junit tests. This allows them to pass. + myLogger.warn("There is not enough free memory to run anchorwave. Free memory: $freeMemory") + myLogger.warn("will attempt to run one alignment at a time, using just a single thread.") + return(Pair(1,1)) + } + + // THis is sometimes + // Calculate how many threads we can run at the same time (based on memory availability) val totalConcurrentThreads = if (concurrentThreads > totalThreadsToUse) { totalThreadsToUse } else { concurrentThreads } - // Now that we know how many threads can be run concurrently, we need to // determine how many parallel alignments to do, and how many threads each // one gets. If the user specified the number of parallel alignments to run, - // we will use that number. + // we will use that number, or the number of assemblies in the list, whichever + // is smaller. val numAssemblies = File(assemblies).readLines().filter { it.isNotBlank() }.size - // This needs to return a Pair where the first value is the number of runs, the seconds is threadsPerRun - //TODO this is not considering the actual number of assemblies in the calculations + // This needs to return a Pair where the first value is the number of alignments, the seconds is threadsPerAlignment val runsAndThreads = if (inParallel > 0) { if (inParallel > totalConcurrentThreads) { + myLogger.warn("The number of parallel alignments specified ($inParallel) exceeds the number of threads that can be run concurrently ($totalConcurrentThreads). Will run $totalConcurrentThreads concurrent alignments with 1 thread each.") - Pair(totalConcurrentThreads, 1) + if (numAssemblies < totalConcurrentThreads){ + // there are fewer assemblies than threads available, so split the number of threads + // evenly among the assemblies. This may leave some threads unused. + Pair(numAssemblies, totalConcurrentThreads/numAssemblies) + } else { + // we have as many concurrent aligments as there are threads available, so each alignment gets 1 thread + Pair(totalConcurrentThreads, 1) + } + } else { // InParallel was defined by the user and is <= totalConcurrentThreads + // THis is adjusted lower if there are fewer assemblies to align than the number of parallel alignments // Number of concurrent alignments is "inParallel", and number of threads-per-alignment becomes // totalConcurrentThreads/inParallel - Pair(inParallel, totalConcurrentThreads / inParallel) + val concurrentAlignments = if (numAssemblies < inParallel) numAssemblies else inParallel + Pair(concurrentAlignments, totalConcurrentThreads / concurrentAlignments) } } else { + // User did not specify the number of alignments they want run in parallel (the inParallel param) // Need to do calculations here to determine the best values for - // concurrent runs and number of threads per run + // concurrent alignments and number of threads per alignment maximizeRunsAndThreads(totalConcurrentThreads, numAssemblies) } + println("LCJ: calculatedNumThreadsAndRuns: returning runsAndThreads values: $runsAndThreads") return runsAndThreads } /** - * This method should only be called if the user did not specify the number of parallel alignments + * This considers the number of threads on the machine, the number of threads user may have + * specified, and the number of assemblies to align. It calculates the a middle value balancing + * number of threads and number of concurrent alignments. * - * TODO - this needs to consider the total assemblies in the calculations - * as that makes a different as to how many threads to give it. + * If the */ fun maximizeRunsAndThreads(totalConcurrentThreads:Int, totalAssemblies:Int): Pair { // This returns a Pair where the first value is the number of runs, the seconds is threadsPerRun @@ -241,17 +270,19 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // 5 runs: 2 threads // 6 or higher runs: 1 thread // This code will return 3 runs, 3 threads each. - val threadsToAssembliesMap = mutableMapOf() + println("LCJ: maximizeRunsAndThreads: totalConcurrentThreads: $totalConcurrentThreads, totalAssemblies: $totalAssemblies") + val assembliesToThreads = mutableMapOf() // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? for (numThreads in 1..totalConcurrentThreads) { - val numRuns = totalConcurrentThreads/numThreads - val currentThreads = threadsToAssembliesMap[numRuns] + val numRuns =totalConcurrentThreads/numThreads + if (numRuns > totalAssemblies) continue // invalid, number of runs is greater than the number of assemblies + val currentThreads = assembliesToThreads[numRuns] // if currentThreads is not null and is > than numThreads, ignore. // otherwise, replace this entry if (currentThreads == null || currentThreads < numThreads) { - threadsToAssembliesMap[numRuns] = numThreads + assembliesToThreads[numRuns] = numThreads } } // we should now have a map with the highest number of threads for each number of runs @@ -262,28 +293,41 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // Repeat until there are 2 entries or fewer entries left. + println("LCJ: maximizeRunsAndThreads: we had the chart, it looks like:") + println("numRuns\tthreadsPerRun") + assembliesToThreads.forEach { (numRuns, threadsPerRun) -> + println("$numRuns\t$threadsPerRun") + } + // 1. if only 1 entry, use that - if (threadsToAssembliesMap.size == 1) { - val entry = threadsToAssembliesMap.entries.first() + if (assembliesToThreads.size == 1) { + val entry = assembliesToThreads.entries.first() println("Using ${entry.value} threads for ${entry.key} runs") return Pair(entry.key, entry.value) - } else if (threadsToAssembliesMap.size == 2) { - // 2. if are only 2 entries, use the one with the highest number of threads - val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } + } else if (assembliesToThreads.size == 2) { + // 2. if are only 2 entries, use the one with the highest number of concurrent assemblies + val entry = assembliesToThreads.entries.maxByOrNull { it.key } println("Using ${entry!!.value} threads for ${entry.key} runs") return Pair(entry.key, entry.value) } else { // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. // Repeat then there are 2 entries or fewer entries left. - while (threadsToAssembliesMap.size > 2) { - val minEntry = threadsToAssembliesMap.entries.minByOrNull { it.key } - val maxEntry = threadsToAssembliesMap.entries.maxByOrNull { it.key } - threadsToAssembliesMap.remove(minEntry!!.key) - threadsToAssembliesMap.remove(maxEntry!!.key) + while (assembliesToThreads.size > 2) { + val minEntry = assembliesToThreads.entries.minByOrNull { it.key } + val maxEntry = assembliesToThreads.entries.maxByOrNull { it.key } + assembliesToThreads.remove(minEntry!!.key) + assembliesToThreads.remove(maxEntry!!.key) } - // 2. if are only 2 entries, use the one with the highest number of threads - val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } - println("Using ${entry!!.value} threads for ${entry.key} runs") + // 2. if are only 2 entries, use the one with the highest number of concurrent alignments + // But this may mean we are not using all the threads available. If we pick the one with + // the highest number of threads, that could be better utilization of the threads. + // If there are 10 threads and 5 alignments, this will pick 2 alignments with 5 threads each. + + // If we choose based on remaining enty with the largest number of concurrent alignments, + // we will pick 3 alignments with 3 threads each. Because we already dropped 5 alignments + // and 2 threads. + val entry = assembliesToThreads.entries.maxByOrNull { it.key } + println("Running ${entry!!.key} concurrent alignments with ${entry!!.value} threads for each run") return Pair(entry.key, entry.value) } } @@ -337,7 +381,8 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { assemblies: List, cdsFasta: String, gffFile: String, - refSamOutFile: String + refSamOutFile: String, + runsAndThreads: Pair // Pair(concurrentRuns, threadsPerRun) ) { runBlocking { // Setup @@ -349,9 +394,8 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { assemblies.forEach { asmFile -> // Column names were checked for validity above - myLogger.info("Adding: $asmFile for processing") - inputChannel.send(InputChannelData(refFasta, asmFile, outputDir, gffFile, refSamOutFile)) + inputChannel.send(InputChannelData(refFasta, asmFile, outputDir, gffFile, refSamOutFile, runsAndThreads.first, runsAndThreads.second)) } myLogger.info("Done Adding data to the inputChannel:") inputChannel.close() // Need to close this here to show the workers that it is done adding more data @@ -361,7 +405,10 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // terminate above when there is no more data on the input channel // This calls anchorwave's proali, and minimap2 scripts to process the alignments - val workerThreads = (1..inParallel).map { run -> + // The number of worker threads should equal the number of concurrent runs + // Inside alignAssembly(), each alignment will get threadsAndRuns.second, which is the number + // of threads allocated for each alignment. This data is included in the InputChannelData record + val workerThreads = (1..runsAndThreads.first).map { run -> launch { alignAssembly(inputChannel, cdsFasta, gffFile) } } @@ -372,16 +419,16 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { withContext( Dispatchers.Default ) { - val threadsPerRun = totalThreads / inParallel - println("alignAssembly: totalThreads: $totalThreads, inParallel: $inParallel, threadsPerRun: $threadsPerRun") + //val threadsPerRun = totalThreads / inParallel + //println("alignAssembly: totalThreads: $totalThreads, inParallel: $inParallel, threadsPerRun: $threadsPerRun") + for (assemblyEntry in inputChannel) { // Column names were checked for validity above val justName = File(assemblyEntry.asmFasta).nameWithoutExtension val samShort = "${justName}.sam" val asmSamFile = "${assemblyEntry.outputDir}/${samShort}" - myLogger.info("alignAssembly: asmFileFull: ${assemblyEntry.asmFasta}, outputFile: $asmSamFile") - + myLogger.info("alignAssembly: asmFileFull: ${assemblyEntry.asmFasta}, outputFile: $asmSamFile , threadsPerRun: ${assemblyEntry.threadsPerRun}") val builder = ProcessBuilder( "conda", @@ -392,7 +439,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { "-x", "splice", "-t", - threadsPerRun.toString(), + assemblyEntry.threadsPerRun.toString(), "-k", "12", "-a", @@ -426,7 +473,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { cdsFasta, assemblyEntry.refSamOutFile, asmSamFile, - threadsPerRun.toString() + assemblyEntry.threadsPerRun.toString() ) } } diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 42860c05..14abdfd6 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -12,9 +12,21 @@ import kotlin.test.assertTrue @ExtendWith(TestExtension::class) class AlignAssembliesTest { + @Test + fun testSystemMemory() { + val s_runtime = Runtime.getRuntime() + val s_totalMemory = s_runtime.totalMemory() + val s_freeMemory = s_runtime.freeMemory() + val s_maxMemory = s_runtime.maxMemory() + val fromTerry = Runtime.getRuntime().maxMemory() / 1048576L + val freeMemGigs = Runtime.getRuntime().maxMemory() / 1e-9 + println("fromTerry: $fromTerry , freeMemGigs: $freeMemGigs") + println("System memory: total: $s_totalMemory, free: $s_freeMemory, max: $s_maxMemory") + } @Test fun testNumThreadsAndRuns() { - // ok - this is now working. It is giving me 3/3 + // Test more assemblies than threads, it will pick the + // middle option that has the highest number of parallel alignments // The numbers with 10 threads and 20 total assemblies are: // 1 run: 10 threads // 2 runs: 5 threads @@ -23,52 +35,44 @@ class AlignAssembliesTest { // 5 runs: 2 threads // 6 or higher runs: 1 thread - // BUT .. the calculations do NOT take into account the number of assemblies - // the user wants to align. In AlignAssemblies, this is called whenever the user - // has not specified an inParallel amount. - val threadsToAssembliesMap = mutableMapOf() - val totalConcurrentThreads = 10 - // val totalAssemblies = 20 - // val assembliesMax = 10 - // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? - for (numThreads in 1..totalConcurrentThreads) { - //val numRuns = assembliesMax / numThreads - val numRuns = totalConcurrentThreads/numThreads - val currentThreads = threadsToAssembliesMap[numRuns] - // if currentThreads is not null and is > than numThreads, ignore. - // otherwise, replace this entry - if (currentThreads == null || currentThreads < numThreads) { - threadsToAssembliesMap[numRuns] = numThreads - } - } - // we should now have a map with the highest number of threads for each number of runs - //At this point, we pick - // 1. if only 1 entry, use that - // 2. if are only 2 entries, use the one with the highest number of threads - // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. - // Repeat then there are 2 entries or fewer entries left. - - // 1. if only 1 entry, use that - if (threadsToAssembliesMap.size == 1) { - val entry = threadsToAssembliesMap.entries.first() - println("Using ${entry.value} threads for ${entry.key} runs") - } else if (threadsToAssembliesMap.size == 2) { - // 2. if are only 2 entries, use the one with the highest number of threads - val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } - println("Using ${entry!!.value} threads for ${entry.key} runs") - } else { - // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. - // Repeat then there are 2 entries or fewer entries left. - while (threadsToAssembliesMap.size > 2) { - val minEntry = threadsToAssembliesMap.entries.minByOrNull { it.key } - val maxEntry = threadsToAssembliesMap.entries.maxByOrNull { it.key } - threadsToAssembliesMap.remove(minEntry!!.key) - threadsToAssembliesMap.remove(maxEntry!!.key) - } - // 2. if are only 2 entries, use the one with the highest number of threads - val entry = threadsToAssembliesMap.entries.maxByOrNull { it.value } - println("Using ${entry!!.value} threads for ${entry.key} runs") - } + var totalConcurrentThreads = 10 + var totalAssemblies = 20 + + var alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals(alignmentsToThreads, Pair(3, 3)) + + totalAssemblies = 5 + // This picks 3/3 + // Options will be: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals(alignmentsToThreads, Pair(3, 3)) + + // test higher number of threads: + // This picks 6/7 + // Options will be: (only number highest number of runs with the same thread count is kept) + // 1 run: 45 threads + // 2 runs: 22 threads + // 3 runs: 15 threads + // 4 runs: 11 threads + // 5 runs: 9 threads + // 6 runs: 7 threads + // 7 runs: 6 threads + // 9 runs: 5 threads + // 11 runs: 4 threads + // 15 runs: 3 threads + // 22 runs: 2 threads + totalConcurrentThreads = 45 + totalAssemblies = 25 + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals( Pair(6,7), alignmentsToThreads) } From d85eff68447bbfa19fbd732ef8ddbe9a0fac59a8 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Wed, 13 Dec 2023 13:40:40 -0500 Subject: [PATCH 05/14] add correct call to get system memory --- .../phgv2/cli/AlignAssemblies.kt | 83 ++++++++++++------- .../phgv2/cli/AlignAssembliesTest.kt | 16 ++-- 2 files changed, 61 insertions(+), 38 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 275f0c5d..457e6db5 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -12,6 +12,9 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import java.io.File +import java.lang.management.ManagementFactory +import javax.management.MBeanServer +import javax.management.ObjectName /** * This will align assemblies to a reference genome. @@ -88,7 +91,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } } - val totalThreads by option(help = "Number of threads available. These will be split among the alginments that are run in parallel") + val totalThreads by option(help = "Number of threads available. These will be split among the alignments that are run in parallel") .int() .default(0) @@ -123,8 +126,6 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // Returns Pair where the first value is the number of parallel alignments, the second is threadsPerAlignment val runsAndThreads = calculatedNumThreadsAndRuns(totalThreads, inParallel, assemblies) -// val freeMemory = Runtime.getRuntime().freeMemory() -// val processors = Runtime.getRuntime().availableProcessors() // create CDS fasta from reference and gff3 file val cdsFasta = "$outputDir/ref.cds.fasta" @@ -162,15 +163,41 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } + /** + * This method uses a java managment facotry bean to determine the + * amount of system memory on the hosting machine. + */ + fun getSystemMemory():Double { + val mBeanServer: MBeanServer = ManagementFactory.getPlatformMBeanServer() + // attribute is in bytes + val attribute = + mBeanServer.getAttribute(ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize") + // translate bytes to gigabutes + val memoryGibi = attribute as Long / 1e9 + // and now to gibibytes, as anchorwave uses GiB + val memoryGigi = memoryGibi * 0.93 + myLogger.info("getSystemMemory: Total system memory: $attribute Bytes, $memoryGibi GB, $memoryGigi GiB") + return memoryGigi + } + + /** + * This function calculates the number of available processors, and the maxium + * system memory avilable for this machine. It returns a Pair where + * the first value is the number of alignments to run in parallel, and the second + * value is the number of threads to use for each alignment. + * + * An algorithm is used that selects a middle value between the number of threads + * and the number of alignments to run in parallel. + * + */ fun calculatedNumThreadsAndRuns(totalThreads:Int, inParallel:Int, assemblies:String): Pair { // If totalThreads or inParallel are 0, it means the user did not specify them // In that case we calculate these values based on the number of processors available // and the amount of free memory available. - //val freeMemory = Runtime.getRuntime().freeMemory() - val freeMemory = Runtime.getRuntime().maxMemory() / 1e9 val processors = Runtime.getRuntime().availableProcessors() - 2 // leave 2 processors for the OS + val systemMemory = getSystemMemory() - myLogger.info("\nLCJ: freeMemory: $freeMemory, processors: $processors") + myLogger.info("calculateNumThreadsAndRuns: systemMemory: $systemMemory, processors: $processors") // If the user did not specify the number of threads to use, we will use all available // processors. If the user did specify the number of threads, we will use that number @@ -186,23 +213,26 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { processors } - println("LCJ: totalThreadsToUse: $totalThreadsToUse") + myLogger.info("calculateNumThreadsAndRuns: totalThreadsToUse: $totalThreadsToUse") // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave on a sample maize genome. // THe number of threads that can be run simultaneously is limited by the amount of // free memory available. We will use 21G/thread as the memory requirement, and // calculate the number of threads that can be run simultaneously based on the - // amount of free memory available. + // amount of system memory available. This calculation assumes the user has access to + // all memory and processors on the machine. - val concurrentThreads = (freeMemory/21).toInt() - println("LCJ: concurrentThreads: $concurrentThreads") + val concurrentThreads = (systemMemory/21).toInt() + myLogger.info("calculateNumThreadsAndRuns: max concurrent threads: $concurrentThreads") if (concurrentThreads < 1) { - // FreeMemory is often low when running CI or junit tests. This allows them to pass. - myLogger.warn("There is not enough free memory to run anchorwave. Free memory: $freeMemory") + //TODO: Lcj - do we still need this code? + // systemMemory may be low when running CI or junit tests. + // The test files are also very small. + // This clause allows junits to run. + myLogger.warn("There is not enough systemMemory to run anchorwave. Free memory: $systemMemory") myLogger.warn("will attempt to run one alignment at a time, using just a single thread.") return(Pair(1,1)) } - // THis is sometimes // Calculate how many threads we can run at the same time (based on memory availability) val totalConcurrentThreads = if (concurrentThreads > totalThreadsToUse) { totalThreadsToUse @@ -211,7 +241,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } // Now that we know how many threads can be run concurrently, we need to - // determine how many parallel alignments to do, and how many threads each + // determine how many parallel alignments to run, and how many threads each // one gets. If the user specified the number of parallel alignments to run, // we will use that number, or the number of assemblies in the list, whichever // is smaller. @@ -245,7 +275,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // concurrent alignments and number of threads per alignment maximizeRunsAndThreads(totalConcurrentThreads, numAssemblies) } - println("LCJ: calculatedNumThreadsAndRuns: returning runsAndThreads values: $runsAndThreads") + myLogger.info("calculatedNumThreadsAndRuns: returning runsAndThreads values: $runsAndThreads") return runsAndThreads } @@ -270,7 +300,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // 5 runs: 2 threads // 6 or higher runs: 1 thread // This code will return 3 runs, 3 threads each. - println("LCJ: maximizeRunsAndThreads: totalConcurrentThreads: $totalConcurrentThreads, totalAssemblies: $totalAssemblies") + myLogger.info("maximizeRunsAndThreads: totalConcurrentThreads: $totalConcurrentThreads, totalAssemblies: $totalAssemblies") val assembliesToThreads = mutableMapOf() // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? @@ -293,21 +323,21 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // Repeat until there are 2 entries or fewer entries left. - println("LCJ: maximizeRunsAndThreads: we had the chart, it looks like:") - println("numRuns\tthreadsPerRun") + myLogger.info("maximizeRunsAndThreads: potential run/thread combinations:") + myLogger.info("numAlignments\tthreadsPerAlignments") assembliesToThreads.forEach { (numRuns, threadsPerRun) -> - println("$numRuns\t$threadsPerRun") + myLogger.info("$numRuns\t$threadsPerRun") } // 1. if only 1 entry, use that if (assembliesToThreads.size == 1) { val entry = assembliesToThreads.entries.first() - println("Using ${entry.value} threads for ${entry.key} runs") + myLogger.info("Running ${entry!!.key} runs with ${entry.value} threads per runs") return Pair(entry.key, entry.value) } else if (assembliesToThreads.size == 2) { // 2. if are only 2 entries, use the one with the highest number of concurrent assemblies val entry = assembliesToThreads.entries.maxByOrNull { it.key } - println("Using ${entry!!.value} threads for ${entry.key} runs") + myLogger.info("Running ${entry!!.key} runs with ${entry.value} threads per runs") return Pair(entry.key, entry.value) } else { // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. @@ -319,15 +349,12 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { assembliesToThreads.remove(maxEntry!!.key) } // 2. if are only 2 entries, use the one with the highest number of concurrent alignments - // But this may mean we are not using all the threads available. If we pick the one with - // the highest number of threads, that could be better utilization of the threads. - // If there are 10 threads and 5 alignments, this will pick 2 alignments with 5 threads each. + // This may mean we are not using all the threads available, but it avoids the case of choosing + // to run 1 alignment with all threads, rather than 2 alignments with hald the threads when + // these are our only choices. - // If we choose based on remaining enty with the largest number of concurrent alignments, - // we will pick 3 alignments with 3 threads each. Because we already dropped 5 alignments - // and 2 threads. val entry = assembliesToThreads.entries.maxByOrNull { it.key } - println("Running ${entry!!.key} concurrent alignments with ${entry!!.value} threads for each run") + myLogger.info("Running ${entry!!.key} concurrent alignments with ${entry!!.value} threads for each run") return Pair(entry.key, entry.value) } } diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 14abdfd6..1a6abf1e 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -14,15 +14,12 @@ class AlignAssembliesTest { @Test fun testSystemMemory() { - val s_runtime = Runtime.getRuntime() - val s_totalMemory = s_runtime.totalMemory() - val s_freeMemory = s_runtime.freeMemory() - val s_maxMemory = s_runtime.maxMemory() - val fromTerry = Runtime.getRuntime().maxMemory() / 1048576L - val freeMemGigs = Runtime.getRuntime().maxMemory() / 1e-9 - println("fromTerry: $fromTerry , freeMemGigs: $freeMemGigs") - println("System memory: total: $s_totalMemory, free: $s_freeMemory, max: $s_maxMemory") + val availMemory = AlignAssemblies().getSystemMemory() + // What can we test here? Verify that memory is not 0, as we don't + // know how much memory will be on the system running the test. + assertTrue(availMemory > 0, "System memory is 0") } + @Test fun testNumThreadsAndRuns() { // Test more assemblies than threads, it will pick the @@ -73,9 +70,8 @@ class AlignAssembliesTest { alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") assertEquals( Pair(6,7), alignmentsToThreads) - - } + @Test fun testCliktParams() { val alignAssemblies = AlignAssemblies() From a83b7ef54b272ea0a73106d76d7c67653321dfcc Mon Sep 17 00:00:00 2001 From: lynnjo Date: Wed, 13 Dec 2023 15:12:53 -0500 Subject: [PATCH 06/14] udpate junit tests --- .../phgv2/cli/AlignAssembliesTest.kt | 35 ++++++++++++++++++- 1 file changed, 34 insertions(+), 1 deletion(-) diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 1a6abf1e..116e7787 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -133,7 +133,7 @@ class AlignAssembliesTest { val result = alignAssemblies.test( "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + - "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir}" + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 1 --in-parallel 1" ) println("testRunningAlignAssemblies: result output: ${result.output}") @@ -174,4 +174,37 @@ class AlignAssembliesTest { } + @Test + fun testSystemDefinedThreadsAndRuns() { + + // phg align-assemblies --gff /workdir/tmc46/AlignAssemblies/smallSeq_data/anchors.gff + // --ref /workdir/tmc46/AlignAssemblies/smallSeq_data/Ref.fa + // -a /workdir/tmc46/AlignAssemblies/assembliesList.txt + // -o /workdir/tmc46/AlignAssemblies/temp + + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} " + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + // Checksums of files are not verified here. Because running in parallel means + // the entries in the maf files are in unspecified order, the files, while containing + // the same data, will have different checksums. This is not a problem, as the + // order of the entries in the maf files is not important. + + + } + } \ No newline at end of file From 90a03deeac057b990ca088fc13fdbb7f01c6d757 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 08:06:29 -0500 Subject: [PATCH 07/14] cleanup before pull request --- .../phgv2/cli/AlignAssemblies.kt | 32 ++++++++----------- .../phgv2/cli/AlignAssembliesTest.kt | 16 ++++------ 2 files changed, 20 insertions(+), 28 deletions(-) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 457e6db5..c9708c9b 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -164,7 +164,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } /** - * This method uses a java managment facotry bean to determine the + * This method uses a java management factory bean to determine the * amount of system memory on the hosting machine. */ fun getSystemMemory():Double { @@ -194,7 +194,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // If totalThreads or inParallel are 0, it means the user did not specify them // In that case we calculate these values based on the number of processors available // and the amount of free memory available. - val processors = Runtime.getRuntime().availableProcessors() - 2 // leave 2 processors for the OS + val processors = if (Runtime.getRuntime().availableProcessors() - 2 > 0) Runtime.getRuntime().availableProcessors() - 2 else 1 // leave 2 processors for the OS val systemMemory = getSystemMemory() myLogger.info("calculateNumThreadsAndRuns: systemMemory: $systemMemory, processors: $processors") @@ -215,8 +215,8 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { myLogger.info("calculateNumThreadsAndRuns: totalThreadsToUse: $totalThreadsToUse") // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave on a sample maize genome. - // THe number of threads that can be run simultaneously is limited by the amount of - // free memory available. We will use 21G/thread as the memory requirement, and + // The number of threads that can be run simultaneously is limited by the amount of + // memory available. We will use 21G/thread as the memory requirement, and // calculate the number of threads that can be run simultaneously based on the // amount of system memory available. This calculation assumes the user has access to // all memory and processors on the machine. @@ -224,7 +224,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val concurrentThreads = (systemMemory/21).toInt() myLogger.info("calculateNumThreadsAndRuns: max concurrent threads: $concurrentThreads") if (concurrentThreads < 1) { - //TODO: Lcj - do we still need this code? + // do we still need this code? // systemMemory may be low when running CI or junit tests. // The test files are also very small. // This clause allows junits to run. @@ -281,14 +281,12 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { /** * This considers the number of threads on the machine, the number of threads user may have - * specified, and the number of assemblies to align. It calculates the a middle value balancing + * specified, and the number of assemblies to align. It calculates a middle value balancing * number of threads and number of concurrent alignments. - * - * If the */ fun maximizeRunsAndThreads(totalConcurrentThreads:Int, totalAssemblies:Int): Pair { // This returns a Pair where the first value is the number of runs, the seconds is threadsPerRun - // THe maximum number of current runs is the total number of threads available, + // The maximum number of current runs is the total number of threads available, // which would be each assembly getting a single thread. // We believe it is more efficient to run fewer assemblies at a time, each with more threads. // What this code does is calculate the middle of this. For example: If there are 20 assemblies @@ -303,7 +301,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { myLogger.info("maximizeRunsAndThreads: totalConcurrentThreads: $totalConcurrentThreads, totalAssemblies: $totalAssemblies") val assembliesToThreads = mutableMapOf() - // This loop says if each assembly gets "numThreads", how many concurrent runs can we do? + // This loop asks if each assembly gets "numThreads", how many concurrent runs can we do? for (numThreads in 1..totalConcurrentThreads) { val numRuns =totalConcurrentThreads/numThreads @@ -316,11 +314,11 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } } // we should now have a map with the highest number of threads for each number of runs - //At this point, we pick + // We choose the best option based on: // 1. if only 1 entry, use that - // 2. if are only 2 entries, use the one with the highest number of threads + // 2. if only 2 entries, use the one with the highest number of threads // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. - // Repeat until there are 2 entries or fewer entries left. + // Repeat until there are 2 or fewer entries left. myLogger.info("maximizeRunsAndThreads: potential run/thread combinations:") @@ -341,16 +339,16 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { return Pair(entry.key, entry.value) } else { // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. - // Repeat then there are 2 entries or fewer entries left. + // Repeat until there are 2 entries or fewer entries left. while (assembliesToThreads.size > 2) { val minEntry = assembliesToThreads.entries.minByOrNull { it.key } val maxEntry = assembliesToThreads.entries.maxByOrNull { it.key } assembliesToThreads.remove(minEntry!!.key) assembliesToThreads.remove(maxEntry!!.key) } - // 2. if are only 2 entries, use the one with the highest number of concurrent alignments + // 2. if only 2 entries, use the one with the highest number of concurrent alignments // This may mean we are not using all the threads available, but it avoids the case of choosing - // to run 1 alignment with all threads, rather than 2 alignments with hald the threads when + // to run 1 alignment with all threads, rather than 2 alignments with half the threads when // these are our only choices. val entry = assembliesToThreads.entries.maxByOrNull { it.key } @@ -446,8 +444,6 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { withContext( Dispatchers.Default ) { - //val threadsPerRun = totalThreads / inParallel - //println("alignAssembly: totalThreads: $totalThreads, inParallel: $inParallel, threadsPerRun: $threadsPerRun") for (assemblyEntry in inputChannel) { // Column names were checked for validity above diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 116e7787..0b908268 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -15,8 +15,9 @@ class AlignAssembliesTest { @Test fun testSystemMemory() { val availMemory = AlignAssemblies().getSystemMemory() - // What can we test here? Verify that memory is not 0, as we don't - // know how much memory will be on the system running the test. + // Verify that memory is not 0. It is difficult to know precisely + // what should be returned as we do not know the system running the test. + // This has been tested manually on multiple systems. assertTrue(availMemory > 0, "System memory is 0") } @@ -53,7 +54,7 @@ class AlignAssembliesTest { // test higher number of threads: // This picks 6/7 - // Options will be: (only number highest number of runs with the same thread count is kept) + // Options will be: (only highest number of runs with the same thread count is kept) // 1 run: 45 threads // 2 runs: 22 threads // 3 runs: 15 threads @@ -177,11 +178,6 @@ class AlignAssembliesTest { @Test fun testSystemDefinedThreadsAndRuns() { - // phg align-assemblies --gff /workdir/tmc46/AlignAssemblies/smallSeq_data/anchors.gff - // --ref /workdir/tmc46/AlignAssemblies/smallSeq_data/Ref.fa - // -a /workdir/tmc46/AlignAssemblies/assembliesList.txt - // -o /workdir/tmc46/AlignAssemblies/temp - val alignAssemblies = AlignAssemblies() val result = alignAssemblies.test( @@ -199,8 +195,8 @@ class AlignAssembliesTest { val lineBMAF = TestExtension.tempDir + "LineB.maf" assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") - // Checksums of files are not verified here. Because running in parallel means - // the entries in the maf files are in unspecified order, the files, while containing + // Checksums of files are not verified here. Running in parallel means + // the entries in the maf files are in unspecified order. The files, while containing // the same data, will have different checksums. This is not a problem, as the // order of the entries in the maf files is not important. From 157a96b0910a9ed8b33c10c246f838a37dcd3dfc Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 09:40:42 -0500 Subject: [PATCH 08/14] added edge case junits to up code coverage --- .../phgv2/cli/AlignAssembliesTest.kt | 58 +++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 0b908268..9a3b6a0b 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -71,6 +71,15 @@ class AlignAssembliesTest { alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") assertEquals( Pair(6,7), alignmentsToThreads) + + // test with just 1 thread and 1 run. This is to hit + // code coverage for the case where there is only 1 option + // for the number of runs and threads. + totalConcurrentThreads = 1 + totalAssemblies = 1 + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals( Pair(1,1), alignmentsToThreads) } @Test @@ -203,4 +212,53 @@ class AlignAssembliesTest { } + @Test + fun testRequestedThreadsGreaterThanAvail() { + // I'm assuming all of the machines which will run this test will + // have less than 300 threads available for processing. The code + // should default to the maximum number of threads available (minus 2 for IO) + // This test ups our code coverage. + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 300 --in-parallel 1" + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + } + + @Test + fun testInParallelGreaterThanNumberOfAssemblies() { + // There are only 2 assemblies that we are aligning, will give it + // in-parallel of 4. It should adjust to 2. + // This test ups our code coverage. + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 4 --in-parallel 4" + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + } + } \ No newline at end of file From 8bb6e2c2825c07c675793ad1bcda02b4c1166f46 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 13:08:04 -0500 Subject: [PATCH 09/14] adding documentation updates --- docs/build_and_load.md | 57 +++++++++++++++++-- .../net/maizegenetics/phgv2/cli/LoadVcf.kt | 2 +- 2 files changed, 54 insertions(+), 5 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index c19a769a..d393ca08 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -31,8 +31,6 @@ In this document, we will discuss the steps needed to: --gff anchors.gff \ --reference-file /my/ref.fasta \ --assemblies assemblies_list.txt \ - --total-threads 20 \ - --in-parallel 4 \ -o /path/for/generated_files ``` * Compress FASTA files @@ -325,6 +323,12 @@ This command uses several parameters: `LineB`. Since these are located in a subdirectory called `data` relative to my working directory, I will also add that to the path. +* `-o` - The name of the directory for the alignment outputs. + + +In addition to the above parameters, there are two optional parameters. When values are not specified for these parameters, +default values are calculated by the software based on the system processor and memory configuration. + * `--total-threads` - How many threads would you like to allocate for the alignment step? More information about this step and the `--in-parallel` step can be found in the following **Details - @@ -333,7 +337,7 @@ This command uses several parameters: More information about this step and the `--total-threads` step can be found in the following **Details - threads and parallelization** section. -* `-o` - The name of the directory for the alignment outputs. + > [!WARNING] > The directory that you specify in the output (`-o`) section must @@ -407,7 +411,52 @@ or review the following code blocks: > prior `-R` and `-Q` parameters. -[//]: # (#### Details - threads and parallelization) +# (#### Details - threads and parallelization) +Aligning with anchorwave is memory intensive and can be slow. Processing speed may be increased by +using multiple threads for each alignment, and/or by running multiple alignments in parallel. The amount of memory +each thread takes is dependent on the processor type. The table below shows the memory usage for a single +alignment based on processor type: + +| Processor | peak memory (Gb) | wall time | +|-----------|------------------|-----------| +| SSE2 | 20.1 | 26:47:17 | +| SSE4.1 | 20.6 | 24:05:07 | +| AVX2 | 20.1 | 21:40:00 | +| AVX512 | 20.1 | 18:31:39 | +| ARM | 34.2 | 18:08:57 | + +The `--total-threads` parameter indicates the total number of threads available for system use. The `--in-parallel` +parameter controls the number of alignments run in parallel. When these values are not specified, the software will +compute the optimal values based on the system processor and memory configuration. + +The number of threads that may be run in parallel is limited by the amount of memory available. The +system is queried for memory and processor information. The number of threads that may be run in parallel +is determined by "system memory" / "peak memory" from the table above. To generalize the calculation, we divide +memory available (Gib) by 21 and round down to the nearest integer. + +For example, if the system has 512 Gb of memory, 80 processors and 5 assemblies that need aligning, +the maximum number of threads that could be run in parallel is 24 (512/21). The number of potential parallel +alignments with the threads allocated for each is shown in the table below + + +| Alignments in parallel | Threads per alignment | +|------------------------|-----------------------| +| 5 | 4 | +| 4 | 6 | +| 3 | 8 | +| 2 | 12 | +| 1 | 24 | + +The software will select a pairing that maximizes the number of alignments run in parallel while utilizing multiple threads, +opting for a value in the middle. In the case above with 5 assemblies and a possibility of 24 concurrent threads, +the system will choose to run 3 alignments in parallel with 8 threads each. The total number of threads used +will be 24 (3 * 8). + +User defined values for in-parallel and total-threads are considered along with the number of assemblies to +align and system capacities, when determining the anchorwave setup. + + + [//]: # () [//]: # (***WIP*** - revisit once we can agree on naming and parameter conventions) diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt index 4d8e0b7d..82a0d744 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt @@ -27,7 +27,7 @@ class LoadVcf : CliktCommand(help="load g.vcf and h.vcf files into tiledb datase private val myLogger = LogManager.getLogger(LoadVcf::class.java) - val vcfDir by option(help = "VCF file directory") + val vcfDir by option(help = "Full path to VCF file directory") .default("") .validate { require(it.isNotBlank()) { From 6222348a87190a0a813a9132c323f595d88599ca Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 15:05:55 -0500 Subject: [PATCH 10/14] adding detailed info on annotate-fastas, some typo corrections elsewhere --- docs/build_and_load.md | 38 +++++++++++++++++++ .../phgv2/cli/AlignAssemblies.kt | 7 ++-- 2 files changed, 41 insertions(+), 4 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index d393ca08..f59e78d3 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -33,6 +33,13 @@ In this document, we will discuss the steps needed to: --assemblies assemblies_list.txt \ -o /path/for/generated_files ``` +* Update FASTA headers with sample information: + ```shell + phg annotate-fastas \ + --keyfile /path/to/keyfile \ + --output-dir /path/to/annotated/fastas \ + --threads 10 + ``` * Compress FASTA files ```shell phg agc-compress \ @@ -491,6 +498,37 @@ align and system capacities, when determining the anchorwave setup. [//]: # (| ARM | 34.2 | 18:08:57 |) +### Annotate Fastas +Next, we must annotate our FASTA files with sample information. AGC, when queried for sequence +belonging to a specific sample, does not maintain the sample name when returning sequence information. +For example: If I user requested the same chromosome and position range from two different samples with +a command that included: + ``` +chr1@LineA:1-100,chr1@LineB:1-100 + ``` +The returned data would have identical idlines for both samples' fasta sequences and would look like: +```agsl +>chr1:1-100 +``` + +To avoid this, we will append the sample name to the idline of each fasta sequence with the text "sampleName=". +In additiona, the updated fasta is renamed to be .fa. This is done with the annotate-fastas command: +```shell +phg annotate-fastas \ + --keyfile data/annotation_keyfile.txt \ + --threads 10 \ + --o output/annotated_assemblies +``` +The keyfile is a tab-delimited file with two columns: + 1. Path to fasta file + 2. Sample name + +The output directory will contain the updated fasta files with the sample name appended to the idline, and the fastas +renamed to .fa + + + + ### Compress FASTA files For optimal storage of sequence information, we can convert our diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index c9708c9b..33ef60c0 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -172,7 +172,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // attribute is in bytes val attribute = mBeanServer.getAttribute(ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize") - // translate bytes to gigabutes + // translate bytes to gigabytes val memoryGibi = attribute as Long / 1e9 // and now to gibibytes, as anchorwave uses GiB val memoryGigi = memoryGibi * 0.93 @@ -182,18 +182,17 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { /** * This function calculates the number of available processors, and the maxium - * system memory avilable for this machine. It returns a Pair where + * system memory available for this machine. It returns a Pair where * the first value is the number of alignments to run in parallel, and the second * value is the number of threads to use for each alignment. * * An algorithm is used that selects a middle value between the number of threads * and the number of alignments to run in parallel. - * */ fun calculatedNumThreadsAndRuns(totalThreads:Int, inParallel:Int, assemblies:String): Pair { // If totalThreads or inParallel are 0, it means the user did not specify them // In that case we calculate these values based on the number of processors available - // and the amount of free memory available. + // and the amount of memory available. val processors = if (Runtime.getRuntime().availableProcessors() - 2 > 0) Runtime.getRuntime().availableProcessors() - 2 else 1 // leave 2 processors for the OS val systemMemory = getSystemMemory() From b2f92a7500eb8e3d67ee7a073250b5e453b2ee86 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 15:09:46 -0500 Subject: [PATCH 11/14] additional update to build_and_load.md --- docs/build_and_load.md | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index f59e78d3..ade7c8bf 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -512,22 +512,23 @@ The returned data would have identical idlines for both samples' fasta sequences ``` To avoid this, we will append the sample name to the idline of each fasta sequence with the text "sampleName=". -In additiona, the updated fasta is renamed to be .fa. This is done with the annotate-fastas command: +In addition, the updated fasta is renamed to be .fa. This is done with the annotate-fastas command: ```shell phg annotate-fastas \ --keyfile data/annotation_keyfile.txt \ --threads 10 \ --o output/annotated_assemblies ``` +This command takes 3 parameters: The keyfile is a tab-delimited file with two columns: 1. Path to fasta file 2. Sample name The output directory will contain the updated fasta files with the sample name appended to the idline, and the fastas -renamed to .fa - - +renamed to .fa. These are the files you want to load to the AGC database in the step below. +The threads parameter is optional and defaults to 1. If you have multiple fasta files to annotate, you can use this +to run the annotation in parallel. ### Compress FASTA files From 672574cd0e201c72d2c00ee6e18a32dab085e16b Mon Sep 17 00:00:00 2001 From: lynnjo Date: Thu, 14 Dec 2023 15:40:55 -0500 Subject: [PATCH 12/14] additional updates to annotate-fastas documentation. --- docs/build_and_load.md | 13 ++++++++----- 1 file changed, 8 insertions(+), 5 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index ade7c8bf..2290b11d 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -501,18 +501,21 @@ align and system capacities, when determining the anchorwave setup. ### Annotate Fastas Next, we must annotate our FASTA files with sample information. AGC, when queried for sequence belonging to a specific sample, does not maintain the sample name when returning sequence information. -For example: If I user requested the same chromosome and position range from two different samples with +For example: If a user requested the same chromosome and position range from two different samples with a command that included: ``` chr1@LineA:1-100,chr1@LineB:1-100 ``` The returned data would have identical idlines for both samples' fasta sequences and would look like: -```agsl +``` >chr1:1-100 ``` -To avoid this, we will append the sample name to the idline of each fasta sequence with the text "sampleName=". -In addition, the updated fasta is renamed to be .fa. This is done with the annotate-fastas command: +The user has no way to determine which chr1:1-100 entry belongs to LineA and which belongs to LineB. + +To avoid this confusion, the annotate-fastas command appends the sample name to the idline of each fasta sequence with the text "sampleName=". +Because AGC maintains the fasta comments, the sample name is now available for each sequence returned from AGC. +In addition, the updated fasta is renamed to .fa. To run annotate-fastas, use the following command ```shell phg annotate-fastas \ --keyfile data/annotation_keyfile.txt \ @@ -528,7 +531,7 @@ The output directory will contain the updated fasta files with the sample name a renamed to .fa. These are the files you want to load to the AGC database in the step below. The threads parameter is optional and defaults to 1. If you have multiple fasta files to annotate, you can use this -to run the annotation in parallel. +to run annotations in parallel. ### Compress FASTA files From 8292319006fc630c9037ed1ec31d5ee7a60aa1f4 Mon Sep 17 00:00:00 2001 From: lynnjo Date: Fri, 15 Dec 2023 07:42:48 -0500 Subject: [PATCH 13/14] minor edits per pull request comments on the documentation --- docs/build_and_load.md | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index 2290b11d..ce8fdc50 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -418,13 +418,13 @@ or review the following code blocks: > prior `-R` and `-Q` parameters. -# (#### Details - threads and parallelization) +#### Details - threads and parallelization Aligning with anchorwave is memory intensive and can be slow. Processing speed may be increased by using multiple threads for each alignment, and/or by running multiple alignments in parallel. The amount of memory each thread takes is dependent on the processor type. The table below shows the memory usage for a single alignment based on processor type: -| Processor | peak memory (Gb) | wall time | +| Processor | peak memory (GB) | wall time | |-----------|------------------|-----------| | SSE2 | 20.1 | 26:47:17 | | SSE4.1 | 20.6 | 24:05:07 | @@ -439,9 +439,9 @@ compute the optimal values based on the system processor and memory configuratio The number of threads that may be run in parallel is limited by the amount of memory available. The system is queried for memory and processor information. The number of threads that may be run in parallel is determined by "system memory" / "peak memory" from the table above. To generalize the calculation, we divide -memory available (Gib) by 21 and round down to the nearest integer. +memory available (GiB) by 21 and round down to the nearest integer. -For example, if the system has 512 Gb of memory, 80 processors and 5 assemblies that need aligning, +For example, if the system has 512 GB of memory, 80 processors and 5 assemblies that need aligning, the maximum number of threads that could be run in parallel is 24 (512/21). The number of potential parallel alignments with the threads allocated for each is shown in the table below From c4d1e264667e599a22b7c8ffd5c41513e8e3911d Mon Sep 17 00:00:00 2001 From: lynnjo Date: Fri, 15 Dec 2023 08:09:44 -0500 Subject: [PATCH 14/14] fixed annotate fasta in documentation --- docs/build_and_load.md | 179 +++++++++++++++++------------------------ 1 file changed, 75 insertions(+), 104 deletions(-) diff --git a/docs/build_and_load.md b/docs/build_and_load.md index ce8fdc50..6ec0a434 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -499,98 +499,13 @@ align and system capacities, when determining the anchorwave setup. [//]: # (| ARM | 34.2 | 18:08:57 |) ### Annotate Fastas -Next, we must annotate our FASTA files with sample information. AGC, when queried for sequence -belonging to a specific sample, does not maintain the sample name when returning sequence information. -For example: If a user requested the same chromosome and position range from two different samples with -a command that included: - ``` -chr1@LineA:1-100,chr1@LineB:1-100 - ``` -The returned data would have identical idlines for both samples' fasta sequences and would look like: -``` ->chr1:1-100 -``` - -The user has no way to determine which chr1:1-100 entry belongs to LineA and which belongs to LineB. - -To avoid this confusion, the annotate-fastas command appends the sample name to the idline of each fasta sequence with the text "sampleName=". -Because AGC maintains the fasta comments, the sample name is now available for each sequence returned from AGC. -In addition, the updated fasta is renamed to .fa. To run annotate-fastas, use the following command -```shell -phg annotate-fastas \ - --keyfile data/annotation_keyfile.txt \ - --threads 10 \ - --o output/annotated_assemblies -``` -This command takes 3 parameters: -The keyfile is a tab-delimited file with two columns: - 1. Path to fasta file - 2. Sample name - -The output directory will contain the updated fasta files with the sample name appended to the idline, and the fastas -renamed to .fa. These are the files you want to load to the AGC database in the step below. - -The threads parameter is optional and defaults to 1. If you have multiple fasta files to annotate, you can use this -to run annotations in parallel. - - -### Compress FASTA files -For optimal storage of sequence information, we can convert our -"plain-text" FASTA files into a more compact representation using -compression. For PHGv2, we can use a command called `agc-compress`, -which is a wrapper for the -[Assembled Genomes Compressor](https://github.com/refresh-bio/agc) -(AGC). AGC provides performant and efficient compression ratios for -our assembly genomes. Like AnchorWave, AGC is also installed during -the Conda environment setup phase, so there is no need to install -this manually. - -To run the compression step, we can call the `align-assemblies` -command: - -```shell -./phg agc-compress \ - --db-path vcf_dbs \ - --fasta-list data/assemblies_list.txt \ - --reference-file data/Ref.fa -``` - -This command takes in 3 parameters: -* `--db-path` - path to directory storing the TileDB instances. The - AGC compressed genomes will be placed here on completion. - -> [!NOTE] -> The directory specified here should be the same directory used to -> initialize the TileDB instances in the database initialization -> (`initdb`) step. - -* `--fasta-list` - List of assembly FASTA genomes to compress. - -> [!NOTE] -> The list specified in `--fasta-list` _can_ be the same list used -> in the alignment (`align-assemblies`) step, but make sure header -> information in each FASTA contains sample IDs. See the -> [**"Important information regarding `agc` compression"**](#warning-important-information-regarding-agc-compression-warning) -> section for further information and how to remedy this. - -* `--reference-file` - Reference FASTA genome. - -After compression is finished, we can navigate to the directory -containing the TileDB instances. In my case, this would be the -subdirectory, `vcf_dbs`. Here, you will see a new file created: -`assemblies.agc`. This is the compressed AGC file containing our -assemblies. This file will be used later to query for haplotype -sequence regions and composite genome creation. - -#### :warning: Important information regarding `agc` compression :warning: - As of the current date of this document, the return methods of -AGC will not keep track of sample IDs when returning -sequence information from the compressed file **unless you explicitly -state the sample information in the header lines of the FASTA files -you wish to compress for the PHGv2 databases**. To explain this -further, let's imagine that we have two FASTA files: `LineA.fa` and -`LineB.fa`. These files contain information for only chromosome 1 and +AGC will not keep track of sample IDs when returning +sequence information from the compressed file **unless you explicitly +state the sample information in the header lines of the FASTA files +you wish to compress for the PHGv2 databases**. To explain this +further, let's imagine that we have two FASTA files: `LineA.fa` and +`LineB.fa`. These files contain information for only chromosome 1 and have a simple header that denotes this sequence name (e.g. `chr1`): ``` @@ -603,12 +518,12 @@ $ head LineB.fa ATGCGTTCGCCTTCCG ``` -After we compress this information using the `agc-compress` command, +After we compress this information using the `agc-compress` command, we will have a file called `assemblies.agc`. PHGv2 leverages this compressed sequence file when creating VCF data (_see -the [**"Create VCF files"**](#create-vcf-files) section for further +the [**"Create VCF files"**](#create-vcf-files) section for further details_). The issue arrives when we query the compressed data using -[AGC's `getctg`](https://github.com/refresh-bio/agc#extract-contigs-from-the-archive) +[AGC's `getctg`](https://github.com/refresh-bio/agc#extract-contigs-from-the-archive) command. When we query this hypothetical archive, we will get the following information: @@ -622,7 +537,7 @@ ATGCGTACGCGCACCG ATGCGTTCGCCTTCCG ``` -As you can see from the above hypothetical output, we now have no +As you can see from the above hypothetical output, we now have no means to efficiently track where each sequence is coming from due to the lack of header information from our prior FASTA files. We can remedy this by adding sample information to the headers: @@ -650,7 +565,7 @@ ATGCGTACGCGCACCG ATGCGTTCGCCTTCCG ``` -While we can either manually modify the header lines of our FASTA +While we can either manually modify the header lines of our FASTA file, this can become tedious and prone to a new set of downstream errors. To automate this, PHGv2 provides an optional command to append sample information to the headers of each FASTA file called @@ -665,27 +580,29 @@ phg annotate-fastas \ This command takes 3 parameters: -* `--keyfile` - A [tab-delimited](https://en.wikipedia.org/wiki/Tab-separated_values) +* `--keyfile` - A [tab-delimited](https://en.wikipedia.org/wiki/Tab-separated_values) keyfile containing two columns: - | Column | Value | - |--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| - | 1 | Path to FASTA file you would like annotated (this is similar to the text files used to point to the FASTA file paths in the [`agc-compress`](#compress-fasta-files) and [`align-assemblies`](#align-assemblies) commands). | -* | 2 | Name of the sample that will be appended to each header line | +| Column | Value | +|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1 | Path to FASTA file you would like annotated (this is similar to the text files used to point to the FASTA file paths in the [`agc-compress`](#compress-fasta-files) and [`align-assemblies`](#align-assemblies) commands). | +| 2 | Name of the sample that will be appended to each header line | + + My example `annotation_keyfile.txt` file would look like this: + ``` data/LineA.fa LineA data/LineB.fa LineB ``` * `--threads` - Optional number of threads to annotate multiple FASTA files in parallel. _Defaults to `1`_. -* `-o` - Output directory for the newly annotated FASTA files +* `-o` - Output directory for the newly annotated FASTA files > [!WARNING] > This step must be performed before the `agc-compress` step. > [!WARNING] -> Sample IDs in the keyfile must match with what is found in the +> Sample IDs in the keyfile must match with what is found in the > TileDB instances. > [!NOTE] @@ -693,7 +610,7 @@ This command takes 3 parameters: > compressed, the extension should be `*.gz`. Once finished, this command will produce FASTA files with the name -of the sample from the keyfile appended to each header line. For +of the sample from the keyfile appended to each header line. For example, in our hypothetical FASTA files, our headers go from this: ``` @@ -726,6 +643,60 @@ ATGCGTACGCGCACCG >ATGCGTACGCGCACCG >``` + + + + +### Compress FASTA files +For optimal storage of sequence information, we can convert our +"plain-text" FASTA files into a more compact representation using +compression. For PHGv2, we can use a command called `agc-compress`, +which is a wrapper for the +[Assembled Genomes Compressor](https://github.com/refresh-bio/agc) +(AGC). AGC provides performant and efficient compression ratios for +our assembly genomes. Like AnchorWave, AGC is also installed during +the Conda environment setup phase, so there is no need to install +this manually. + +To run the compression step, we can call the `align-assemblies` +command: + +```shell +./phg agc-compress \ + --db-path vcf_dbs \ + --fasta-list data/assemblies_list.txt \ + --reference-file data/Ref.fa +``` + +This command takes in 3 parameters: +* `--db-path` - path to directory storing the TileDB instances. The + AGC compressed genomes will be placed here on completion. + +> [!NOTE] +> The directory specified here should be the same directory used to +> initialize the TileDB instances in the database initialization +> (`initdb`) step. + +* `--fasta-list` - List of assembly FASTA genomes to compress. + +> [!NOTE] +> The list specified in `--fasta-list` _can_ be the same list used +> in the alignment (`align-assemblies`) step, but make sure header +> information in each FASTA contains sample IDs. See the +> [**"Important information regarding `agc` compression"**](#warning-important-information-regarding-agc-compression-warning) +> section for further information and how to remedy this. + +* `--reference-file` - Reference FASTA genome. + +After compression is finished, we can navigate to the directory +containing the TileDB instances. In my case, this would be the +subdirectory, `vcf_dbs`. Here, you will see a new file created: +`assemblies.agc`. This is the compressed AGC file containing our +assemblies. This file will be used later to query for haplotype +sequence regions and composite genome creation. + + + ### Create VCF files Now that we have (1) created alignments of our assemblies against a single reference genome and (2) created compressed representations