diff --git a/docs/build_and_load.md b/docs/build_and_load.md index c19a769a..6ec0a434 100644 --- a/docs/build_and_load.md +++ b/docs/build_and_load.md @@ -31,10 +31,15 @@ In this document, we will discuss the steps needed to: --gff anchors.gff \ --reference-file /my/ref.fasta \ --assemblies assemblies_list.txt \ - --total-threads 20 \ - --in-parallel 4 \ -o /path/for/generated_files ``` +* Update FASTA headers with sample information: + ```shell + phg annotate-fastas \ + --keyfile /path/to/keyfile \ + --output-dir /path/to/annotated/fastas \ + --threads 10 + ``` * Compress FASTA files ```shell phg agc-compress \ @@ -325,6 +330,12 @@ This command uses several parameters: `LineB`. Since these are located in a subdirectory called `data` relative to my working directory, I will also add that to the path. +* `-o` - The name of the directory for the alignment outputs. + + +In addition to the above parameters, there are two optional parameters. When values are not specified for these parameters, +default values are calculated by the software based on the system processor and memory configuration. + * `--total-threads` - How many threads would you like to allocate for the alignment step? More information about this step and the `--in-parallel` step can be found in the following **Details - @@ -333,7 +344,7 @@ This command uses several parameters: More information about this step and the `--total-threads` step can be found in the following **Details - threads and parallelization** section. -* `-o` - The name of the directory for the alignment outputs. + > [!WARNING] > The directory that you specify in the output (`-o`) section must @@ -407,7 +418,52 @@ or review the following code blocks: > prior `-R` and `-Q` parameters. -[//]: # (#### Details - threads and parallelization) +#### Details - threads and parallelization +Aligning with anchorwave is memory intensive and can be slow. Processing speed may be increased by +using multiple threads for each alignment, and/or by running multiple alignments in parallel. The amount of memory +each thread takes is dependent on the processor type. The table below shows the memory usage for a single +alignment based on processor type: + +| Processor | peak memory (GB) | wall time | +|-----------|------------------|-----------| +| SSE2 | 20.1 | 26:47:17 | +| SSE4.1 | 20.6 | 24:05:07 | +| AVX2 | 20.1 | 21:40:00 | +| AVX512 | 20.1 | 18:31:39 | +| ARM | 34.2 | 18:08:57 | + +The `--total-threads` parameter indicates the total number of threads available for system use. The `--in-parallel` +parameter controls the number of alignments run in parallel. When these values are not specified, the software will +compute the optimal values based on the system processor and memory configuration. + +The number of threads that may be run in parallel is limited by the amount of memory available. The +system is queried for memory and processor information. The number of threads that may be run in parallel +is determined by "system memory" / "peak memory" from the table above. To generalize the calculation, we divide +memory available (GiB) by 21 and round down to the nearest integer. + +For example, if the system has 512 GB of memory, 80 processors and 5 assemblies that need aligning, +the maximum number of threads that could be run in parallel is 24 (512/21). The number of potential parallel +alignments with the threads allocated for each is shown in the table below + + +| Alignments in parallel | Threads per alignment | +|------------------------|-----------------------| +| 5 | 4 | +| 4 | 6 | +| 3 | 8 | +| 2 | 12 | +| 1 | 24 | + +The software will select a pairing that maximizes the number of alignments run in parallel while utilizing multiple threads, +opting for a value in the middle. In the case above with 5 assemblies and a possibility of 24 concurrent threads, +the system will choose to run 3 alignments in parallel with 8 threads each. The total number of threads used +will be 24 (3 * 8). + +User defined values for in-parallel and total-threads are considered along with the number of assemblies to +align and system capacities, when determining the anchorwave setup. + + + [//]: # () [//]: # (***WIP*** - revisit once we can agree on naming and parameter conventions) @@ -442,64 +498,14 @@ or review the following code blocks: [//]: # (| ARM | 34.2 | 18:08:57 |) - -### Compress FASTA files -For optimal storage of sequence information, we can convert our -"plain-text" FASTA files into a more compact representation using -compression. For PHGv2, we can use a command called `agc-compress`, -which is a wrapper for the -[Assembled Genomes Compressor](https://github.com/refresh-bio/agc) -(AGC). AGC provides performant and efficient compression ratios for -our assembly genomes. Like AnchorWave, AGC is also installed during -the Conda environment setup phase, so there is no need to install -this manually. - -To run the compression step, we can call the `align-assemblies` -command: - -```shell -./phg agc-compress \ - --db-path vcf_dbs \ - --fasta-list data/assemblies_list.txt \ - --reference-file data/Ref.fa -``` - -This command takes in 3 parameters: -* `--db-path` - path to directory storing the TileDB instances. The - AGC compressed genomes will be placed here on completion. - -> [!NOTE] -> The directory specified here should be the same directory used to -> initialize the TileDB instances in the database initialization -> (`initdb`) step. - -* `--fasta-list` - List of assembly FASTA genomes to compress. - -> [!NOTE] -> The list specified in `--fasta-list` _can_ be the same list used -> in the alignment (`align-assemblies`) step, but make sure header -> information in each FASTA contains sample IDs. See the -> [**"Important information regarding `agc` compression"**](#warning-important-information-regarding-agc-compression-warning) -> section for further information and how to remedy this. - -* `--reference-file` - Reference FASTA genome. - -After compression is finished, we can navigate to the directory -containing the TileDB instances. In my case, this would be the -subdirectory, `vcf_dbs`. Here, you will see a new file created: -`assemblies.agc`. This is the compressed AGC file containing our -assemblies. This file will be used later to query for haplotype -sequence regions and composite genome creation. - -#### :warning: Important information regarding `agc` compression :warning: - +### Annotate Fastas As of the current date of this document, the return methods of -AGC will not keep track of sample IDs when returning -sequence information from the compressed file **unless you explicitly -state the sample information in the header lines of the FASTA files -you wish to compress for the PHGv2 databases**. To explain this -further, let's imagine that we have two FASTA files: `LineA.fa` and -`LineB.fa`. These files contain information for only chromosome 1 and +AGC will not keep track of sample IDs when returning +sequence information from the compressed file **unless you explicitly +state the sample information in the header lines of the FASTA files +you wish to compress for the PHGv2 databases**. To explain this +further, let's imagine that we have two FASTA files: `LineA.fa` and +`LineB.fa`. These files contain information for only chromosome 1 and have a simple header that denotes this sequence name (e.g. `chr1`): ``` @@ -512,12 +518,12 @@ $ head LineB.fa ATGCGTTCGCCTTCCG ``` -After we compress this information using the `agc-compress` command, +After we compress this information using the `agc-compress` command, we will have a file called `assemblies.agc`. PHGv2 leverages this compressed sequence file when creating VCF data (_see -the [**"Create VCF files"**](#create-vcf-files) section for further +the [**"Create VCF files"**](#create-vcf-files) section for further details_). The issue arrives when we query the compressed data using -[AGC's `getctg`](https://github.com/refresh-bio/agc#extract-contigs-from-the-archive) +[AGC's `getctg`](https://github.com/refresh-bio/agc#extract-contigs-from-the-archive) command. When we query this hypothetical archive, we will get the following information: @@ -531,7 +537,7 @@ ATGCGTACGCGCACCG ATGCGTTCGCCTTCCG ``` -As you can see from the above hypothetical output, we now have no +As you can see from the above hypothetical output, we now have no means to efficiently track where each sequence is coming from due to the lack of header information from our prior FASTA files. We can remedy this by adding sample information to the headers: @@ -559,7 +565,7 @@ ATGCGTACGCGCACCG ATGCGTTCGCCTTCCG ``` -While we can either manually modify the header lines of our FASTA +While we can either manually modify the header lines of our FASTA file, this can become tedious and prone to a new set of downstream errors. To automate this, PHGv2 provides an optional command to append sample information to the headers of each FASTA file called @@ -574,27 +580,29 @@ phg annotate-fastas \ This command takes 3 parameters: -* `--keyfile` - A [tab-delimited](https://en.wikipedia.org/wiki/Tab-separated_values) +* `--keyfile` - A [tab-delimited](https://en.wikipedia.org/wiki/Tab-separated_values) keyfile containing two columns: - | Column | Value | - |--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| - | 1 | Path to FASTA file you would like annotated (this is similar to the text files used to point to the FASTA file paths in the [`agc-compress`](#compress-fasta-files) and [`align-assemblies`](#align-assemblies) commands). | -* | 2 | Name of the sample that will be appended to each header line | +| Column | Value | +|--------|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| +| 1 | Path to FASTA file you would like annotated (this is similar to the text files used to point to the FASTA file paths in the [`agc-compress`](#compress-fasta-files) and [`align-assemblies`](#align-assemblies) commands). | +| 2 | Name of the sample that will be appended to each header line | + + My example `annotation_keyfile.txt` file would look like this: + ``` data/LineA.fa LineA data/LineB.fa LineB ``` * `--threads` - Optional number of threads to annotate multiple FASTA files in parallel. _Defaults to `1`_. -* `-o` - Output directory for the newly annotated FASTA files +* `-o` - Output directory for the newly annotated FASTA files > [!WARNING] > This step must be performed before the `agc-compress` step. > [!WARNING] -> Sample IDs in the keyfile must match with what is found in the +> Sample IDs in the keyfile must match with what is found in the > TileDB instances. > [!NOTE] @@ -602,7 +610,7 @@ This command takes 3 parameters: > compressed, the extension should be `*.gz`. Once finished, this command will produce FASTA files with the name -of the sample from the keyfile appended to each header line. For +of the sample from the keyfile appended to each header line. For example, in our hypothetical FASTA files, our headers go from this: ``` @@ -635,6 +643,60 @@ ATGCGTACGCGCACCG >ATGCGTACGCGCACCG >``` + + + + +### Compress FASTA files +For optimal storage of sequence information, we can convert our +"plain-text" FASTA files into a more compact representation using +compression. For PHGv2, we can use a command called `agc-compress`, +which is a wrapper for the +[Assembled Genomes Compressor](https://github.com/refresh-bio/agc) +(AGC). AGC provides performant and efficient compression ratios for +our assembly genomes. Like AnchorWave, AGC is also installed during +the Conda environment setup phase, so there is no need to install +this manually. + +To run the compression step, we can call the `align-assemblies` +command: + +```shell +./phg agc-compress \ + --db-path vcf_dbs \ + --fasta-list data/assemblies_list.txt \ + --reference-file data/Ref.fa +``` + +This command takes in 3 parameters: +* `--db-path` - path to directory storing the TileDB instances. The + AGC compressed genomes will be placed here on completion. + +> [!NOTE] +> The directory specified here should be the same directory used to +> initialize the TileDB instances in the database initialization +> (`initdb`) step. + +* `--fasta-list` - List of assembly FASTA genomes to compress. + +> [!NOTE] +> The list specified in `--fasta-list` _can_ be the same list used +> in the alignment (`align-assemblies`) step, but make sure header +> information in each FASTA contains sample IDs. See the +> [**"Important information regarding `agc` compression"**](#warning-important-information-regarding-agc-compression-warning) +> section for further information and how to remedy this. + +* `--reference-file` - Reference FASTA genome. + +After compression is finished, we can navigate to the directory +containing the TileDB instances. In my case, this would be the +subdirectory, `vcf_dbs`. Here, you will see a new file created: +`assemblies.agc`. This is the compressed AGC file containing our +assemblies. This file will be used later to query for haplotype +sequence regions and composite genome creation. + + + ### Create VCF files Now that we have (1) created alignments of our assemblies against a single reference genome and (2) created compressed representations diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt index 91c13842..33ef60c0 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/AlignAssemblies.kt @@ -12,6 +12,9 @@ import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withContext import org.apache.logging.log4j.LogManager import java.io.File +import java.lang.management.ManagementFactory +import javax.management.MBeanServer +import javax.management.ObjectName /** * This will align assemblies to a reference genome. @@ -88,9 +91,9 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { } } - val totalThreads by option(help = "Number of threads available. These will be split among the alginments that are run in parallel") + val totalThreads by option(help = "Number of threads available. These will be split among the alignments that are run in parallel") .int() - .default(1) + .default(0) val inParallel by option( help = "Number of assemblies to simultaneously process. " + @@ -99,7 +102,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { "Consider this memory factor when providing values for the total-threads and in-parallel." ) .int() - .default(1) + .default(0) val refMaxAlignCov by option(help = "Anchorwave proali parameter R, indicating reference genome maximum alignment coverage.") .int() @@ -114,11 +117,16 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val asmFasta: String, val outputDir: String, val gffFile: String, - val refSamOutFile: String + val refSamOutFile: String, + val maxRuns:Int, + val threadsPerRun:Int ) override fun run() { + // Returns Pair where the first value is the number of parallel alignments, the second is threadsPerAlignment + val runsAndThreads = calculatedNumThreadsAndRuns(totalThreads, inParallel, assemblies) + // create CDS fasta from reference and gff3 file val cdsFasta = "$outputDir/ref.cds.fasta" @@ -132,8 +140,9 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { val samOutFile = "${justNameRef}.sam" val refSamOutFile = "${outputDir}/${samOutFile}" + // For minimap2, we will use the number of processors available as the number of threads. val builder = ProcessBuilder( - "conda", "run", "-n", "phgv2-conda", "minimap2", "-x", "splice", "-t", totalThreads.toString(), "-k", "12", + "conda", "run", "-n", "phgv2-conda", "minimap2", "-x", "splice", "-t", runsAndThreads.second.toString(), "-k", "12", "-a", "-p", "0.4", "-N20", referenceFile, cdsFasta, "-o", refSamOutFile ) val redirectError = "$outputDir/minimap2Ref_error.log" @@ -150,10 +159,203 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { throw IllegalStateException("Error running minimap2 for reference: $error") } - runAnchorWaveMultiThread(referenceFile, assembliesList, cdsFasta, gff, refSamOutFile) + runAnchorWaveMultiThread(referenceFile, assembliesList, cdsFasta, gff, refSamOutFile,runsAndThreads) } + /** + * This method uses a java management factory bean to determine the + * amount of system memory on the hosting machine. + */ + fun getSystemMemory():Double { + val mBeanServer: MBeanServer = ManagementFactory.getPlatformMBeanServer() + // attribute is in bytes + val attribute = + mBeanServer.getAttribute(ObjectName("java.lang", "type", "OperatingSystem"), "TotalPhysicalMemorySize") + // translate bytes to gigabytes + val memoryGibi = attribute as Long / 1e9 + // and now to gibibytes, as anchorwave uses GiB + val memoryGigi = memoryGibi * 0.93 + myLogger.info("getSystemMemory: Total system memory: $attribute Bytes, $memoryGibi GB, $memoryGigi GiB") + return memoryGigi + } + + /** + * This function calculates the number of available processors, and the maxium + * system memory available for this machine. It returns a Pair where + * the first value is the number of alignments to run in parallel, and the second + * value is the number of threads to use for each alignment. + * + * An algorithm is used that selects a middle value between the number of threads + * and the number of alignments to run in parallel. + */ + fun calculatedNumThreadsAndRuns(totalThreads:Int, inParallel:Int, assemblies:String): Pair { + // If totalThreads or inParallel are 0, it means the user did not specify them + // In that case we calculate these values based on the number of processors available + // and the amount of memory available. + val processors = if (Runtime.getRuntime().availableProcessors() - 2 > 0) Runtime.getRuntime().availableProcessors() - 2 else 1 // leave 2 processors for the OS + val systemMemory = getSystemMemory() + + myLogger.info("calculateNumThreadsAndRuns: systemMemory: $systemMemory, processors: $processors") + + // If the user did not specify the number of threads to use, we will use all available + // processors. If the user did specify the number of threads, we will use that number + // of threads, but we will not exceed the number of processors available. + val totalThreadsToUse = if (totalThreads > 0) { + if (totalThreads > processors) { + myLogger.warn("The number of threads specified ($totalThreads) exceeds the number of processors available ($processors). Using $processors threads.") + processors + } else { + totalThreads + } + } else { + processors + } + + myLogger.info("calculateNumThreadsAndRuns: totalThreadsToUse: $totalThreadsToUse") + // Per Baoxing's chart, it takes just over 20G/thread to run anchorwave on a sample maize genome. + // The number of threads that can be run simultaneously is limited by the amount of + // memory available. We will use 21G/thread as the memory requirement, and + // calculate the number of threads that can be run simultaneously based on the + // amount of system memory available. This calculation assumes the user has access to + // all memory and processors on the machine. + + val concurrentThreads = (systemMemory/21).toInt() + myLogger.info("calculateNumThreadsAndRuns: max concurrent threads: $concurrentThreads") + if (concurrentThreads < 1) { + // do we still need this code? + // systemMemory may be low when running CI or junit tests. + // The test files are also very small. + // This clause allows junits to run. + myLogger.warn("There is not enough systemMemory to run anchorwave. Free memory: $systemMemory") + myLogger.warn("will attempt to run one alignment at a time, using just a single thread.") + return(Pair(1,1)) + } + + // Calculate how many threads we can run at the same time (based on memory availability) + val totalConcurrentThreads = if (concurrentThreads > totalThreadsToUse) { + totalThreadsToUse + } else { + concurrentThreads + } + + // Now that we know how many threads can be run concurrently, we need to + // determine how many parallel alignments to run, and how many threads each + // one gets. If the user specified the number of parallel alignments to run, + // we will use that number, or the number of assemblies in the list, whichever + // is smaller. + + val numAssemblies = File(assemblies).readLines().filter { it.isNotBlank() }.size + // This needs to return a Pair where the first value is the number of alignments, the seconds is threadsPerAlignment + val runsAndThreads = if (inParallel > 0) { + if (inParallel > totalConcurrentThreads) { + + myLogger.warn("The number of parallel alignments specified ($inParallel) exceeds the number of threads that can be run concurrently ($totalConcurrentThreads). Will run $totalConcurrentThreads concurrent alignments with 1 thread each.") + if (numAssemblies < totalConcurrentThreads){ + // there are fewer assemblies than threads available, so split the number of threads + // evenly among the assemblies. This may leave some threads unused. + Pair(numAssemblies, totalConcurrentThreads/numAssemblies) + } else { + // we have as many concurrent aligments as there are threads available, so each alignment gets 1 thread + Pair(totalConcurrentThreads, 1) + } + + } else { + // InParallel was defined by the user and is <= totalConcurrentThreads + // THis is adjusted lower if there are fewer assemblies to align than the number of parallel alignments + // Number of concurrent alignments is "inParallel", and number of threads-per-alignment becomes + // totalConcurrentThreads/inParallel + val concurrentAlignments = if (numAssemblies < inParallel) numAssemblies else inParallel + Pair(concurrentAlignments, totalConcurrentThreads / concurrentAlignments) + } + } else { + // User did not specify the number of alignments they want run in parallel (the inParallel param) + // Need to do calculations here to determine the best values for + // concurrent alignments and number of threads per alignment + maximizeRunsAndThreads(totalConcurrentThreads, numAssemblies) + } + myLogger.info("calculatedNumThreadsAndRuns: returning runsAndThreads values: $runsAndThreads") + return runsAndThreads + } + + /** + * This considers the number of threads on the machine, the number of threads user may have + * specified, and the number of assemblies to align. It calculates a middle value balancing + * number of threads and number of concurrent alignments. + */ + fun maximizeRunsAndThreads(totalConcurrentThreads:Int, totalAssemblies:Int): Pair { + // This returns a Pair where the first value is the number of runs, the seconds is threadsPerRun + // The maximum number of current runs is the total number of threads available, + // which would be each assembly getting a single thread. + // We believe it is more efficient to run fewer assemblies at a time, each with more threads. + // What this code does is calculate the middle of this. For example: If there are 20 assemblies + // and 10 threads, our options are: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + // 6 or higher runs: 1 thread + // This code will return 3 runs, 3 threads each. + myLogger.info("maximizeRunsAndThreads: totalConcurrentThreads: $totalConcurrentThreads, totalAssemblies: $totalAssemblies") + val assembliesToThreads = mutableMapOf() + + // This loop asks if each assembly gets "numThreads", how many concurrent runs can we do? + for (numThreads in 1..totalConcurrentThreads) { + + val numRuns =totalConcurrentThreads/numThreads + if (numRuns > totalAssemblies) continue // invalid, number of runs is greater than the number of assemblies + val currentThreads = assembliesToThreads[numRuns] + // if currentThreads is not null and is > than numThreads, ignore. + // otherwise, replace this entry + if (currentThreads == null || currentThreads < numThreads) { + assembliesToThreads[numRuns] = numThreads + } + } + // we should now have a map with the highest number of threads for each number of runs + // We choose the best option based on: + // 1. if only 1 entry, use that + // 2. if only 2 entries, use the one with the highest number of threads + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat until there are 2 or fewer entries left. + + + myLogger.info("maximizeRunsAndThreads: potential run/thread combinations:") + myLogger.info("numAlignments\tthreadsPerAlignments") + assembliesToThreads.forEach { (numRuns, threadsPerRun) -> + myLogger.info("$numRuns\t$threadsPerRun") + } + + // 1. if only 1 entry, use that + if (assembliesToThreads.size == 1) { + val entry = assembliesToThreads.entries.first() + myLogger.info("Running ${entry!!.key} runs with ${entry.value} threads per runs") + return Pair(entry.key, entry.value) + } else if (assembliesToThreads.size == 2) { + // 2. if are only 2 entries, use the one with the highest number of concurrent assemblies + val entry = assembliesToThreads.entries.maxByOrNull { it.key } + myLogger.info("Running ${entry!!.key} runs with ${entry.value} threads per runs") + return Pair(entry.key, entry.value) + } else { + // 3. if there are > 3 entries, drop the one with the lowest number of runs and the one with the highest number of runs. + // Repeat until there are 2 entries or fewer entries left. + while (assembliesToThreads.size > 2) { + val minEntry = assembliesToThreads.entries.minByOrNull { it.key } + val maxEntry = assembliesToThreads.entries.maxByOrNull { it.key } + assembliesToThreads.remove(minEntry!!.key) + assembliesToThreads.remove(maxEntry!!.key) + } + // 2. if only 2 entries, use the one with the highest number of concurrent alignments + // This may mean we are not using all the threads available, but it avoids the case of choosing + // to run 1 alignment with all threads, rather than 2 alignments with half the threads when + // these are our only choices. + + val entry = assembliesToThreads.entries.maxByOrNull { it.key } + myLogger.info("Running ${entry!!.key} concurrent alignments with ${entry!!.value} threads for each run") + return Pair(entry.key, entry.value) + } + } + private fun createCDSfromRefData(refFasta: String, gffFile: String, cdsFasta: String, outputDir: String): Boolean { // val command = "anchorwave gff2seq -r ${refFasta} -i ${gffFile} -o ${cdsFasta} " @@ -203,7 +405,8 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { assemblies: List, cdsFasta: String, gffFile: String, - refSamOutFile: String + refSamOutFile: String, + runsAndThreads: Pair // Pair(concurrentRuns, threadsPerRun) ) { runBlocking { // Setup @@ -215,9 +418,8 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { assemblies.forEach { asmFile -> // Column names were checked for validity above - myLogger.info("Adding: $asmFile for processing") - inputChannel.send(InputChannelData(refFasta, asmFile, outputDir, gffFile, refSamOutFile)) + inputChannel.send(InputChannelData(refFasta, asmFile, outputDir, gffFile, refSamOutFile, runsAndThreads.first, runsAndThreads.second)) } myLogger.info("Done Adding data to the inputChannel:") inputChannel.close() // Need to close this here to show the workers that it is done adding more data @@ -227,7 +429,10 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { // terminate above when there is no more data on the input channel // This calls anchorwave's proali, and minimap2 scripts to process the alignments - val workerThreads = (1..inParallel).map { run -> + // The number of worker threads should equal the number of concurrent runs + // Inside alignAssembly(), each alignment will get threadsAndRuns.second, which is the number + // of threads allocated for each alignment. This data is included in the InputChannelData record + val workerThreads = (1..runsAndThreads.first).map { run -> launch { alignAssembly(inputChannel, cdsFasta, gffFile) } } @@ -238,16 +443,14 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { withContext( Dispatchers.Default ) { - val threadsPerRun = totalThreads / inParallel - println("alignAssembly: totalThreads: $totalThreads, inParallel: $inParallel, threadsPerRun: $threadsPerRun") + for (assemblyEntry in inputChannel) { // Column names were checked for validity above val justName = File(assemblyEntry.asmFasta).nameWithoutExtension val samShort = "${justName}.sam" val asmSamFile = "${assemblyEntry.outputDir}/${samShort}" - myLogger.info("alignAssembly: asmFileFull: ${assemblyEntry.asmFasta}, outputFile: $asmSamFile") - + myLogger.info("alignAssembly: asmFileFull: ${assemblyEntry.asmFasta}, outputFile: $asmSamFile , threadsPerRun: ${assemblyEntry.threadsPerRun}") val builder = ProcessBuilder( "conda", @@ -258,7 +461,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { "-x", "splice", "-t", - threadsPerRun.toString(), + assemblyEntry.threadsPerRun.toString(), "-k", "12", "-a", @@ -292,7 +495,7 @@ class AlignAssemblies : CliktCommand(help="Align assemblies using anchorwave") { cdsFasta, assemblyEntry.refSamOutFile, asmSamFile, - threadsPerRun.toString() + assemblyEntry.threadsPerRun.toString() ) } } diff --git a/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt b/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt index 4d8e0b7d..82a0d744 100644 --- a/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt +++ b/src/main/kotlin/net/maizegenetics/phgv2/cli/LoadVcf.kt @@ -27,7 +27,7 @@ class LoadVcf : CliktCommand(help="load g.vcf and h.vcf files into tiledb datase private val myLogger = LogManager.getLogger(LoadVcf::class.java) - val vcfDir by option(help = "VCF file directory") + val vcfDir by option(help = "Full path to VCF file directory") .default("") .validate { require(it.isNotBlank()) { diff --git a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt index 011f2856..9a3b6a0b 100644 --- a/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt +++ b/src/test/kotlin/net/maizegenetics/phgv2/cli/AlignAssembliesTest.kt @@ -12,6 +12,76 @@ import kotlin.test.assertTrue @ExtendWith(TestExtension::class) class AlignAssembliesTest { + @Test + fun testSystemMemory() { + val availMemory = AlignAssemblies().getSystemMemory() + // Verify that memory is not 0. It is difficult to know precisely + // what should be returned as we do not know the system running the test. + // This has been tested manually on multiple systems. + assertTrue(availMemory > 0, "System memory is 0") + } + + @Test + fun testNumThreadsAndRuns() { + // Test more assemblies than threads, it will pick the + // middle option that has the highest number of parallel alignments + // The numbers with 10 threads and 20 total assemblies are: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + // 6 or higher runs: 1 thread + + var totalConcurrentThreads = 10 + var totalAssemblies = 20 + + var alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals(alignmentsToThreads, Pair(3, 3)) + + totalAssemblies = 5 + // This picks 3/3 + // Options will be: + // 1 run: 10 threads + // 2 runs: 5 threads + // 3 runs: 3 threads + // 4 runs: 2 threads + // 5 runs: 2 threads + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals(alignmentsToThreads, Pair(3, 3)) + + // test higher number of threads: + // This picks 6/7 + // Options will be: (only highest number of runs with the same thread count is kept) + // 1 run: 45 threads + // 2 runs: 22 threads + // 3 runs: 15 threads + // 4 runs: 11 threads + // 5 runs: 9 threads + // 6 runs: 7 threads + // 7 runs: 6 threads + // 9 runs: 5 threads + // 11 runs: 4 threads + // 15 runs: 3 threads + // 22 runs: 2 threads + totalConcurrentThreads = 45 + totalAssemblies = 25 + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals( Pair(6,7), alignmentsToThreads) + + // test with just 1 thread and 1 run. This is to hit + // code coverage for the case where there is only 1 option + // for the number of runs and threads. + totalConcurrentThreads = 1 + totalAssemblies = 1 + alignmentsToThreads = AlignAssemblies().maximizeRunsAndThreads(totalConcurrentThreads, totalAssemblies) + println("\nAlignAssembliesTest: alignmentsToThreads: $alignmentsToThreads") + assertEquals( Pair(1,1), alignmentsToThreads) + } + @Test fun testCliktParams() { val alignAssemblies = AlignAssemblies() @@ -73,7 +143,7 @@ class AlignAssembliesTest { val result = alignAssemblies.test( "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + - "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir}" + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 1 --in-parallel 1" ) println("testRunningAlignAssemblies: result output: ${result.output}") @@ -114,4 +184,81 @@ class AlignAssembliesTest { } + @Test + fun testSystemDefinedThreadsAndRuns() { + + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} " + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + // Checksums of files are not verified here. Running in parallel means + // the entries in the maf files are in unspecified order. The files, while containing + // the same data, will have different checksums. This is not a problem, as the + // order of the entries in the maf files is not important. + + + } + + @Test + fun testRequestedThreadsGreaterThanAvail() { + // I'm assuming all of the machines which will run this test will + // have less than 300 threads available for processing. The code + // should default to the maximum number of threads available (minus 2 for IO) + // This test ups our code coverage. + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 300 --in-parallel 1" + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + } + + @Test + fun testInParallelGreaterThanNumberOfAssemblies() { + // There are only 2 assemblies that we are aligning, will give it + // in-parallel of 4. It should adjust to 2. + // This test ups our code coverage. + val alignAssemblies = AlignAssemblies() + + val result = alignAssemblies.test( + "--gff ${TestExtension.smallseqAnchorsGffFile} --reference-file ${TestExtension.smallseqRefFile} " + + "-a ${TestExtension.smallseqAssembliesListFile} -o ${TestExtension.tempDir} --total-threads 4 --in-parallel 4" + ) + + println("testRunningAlignAssemblies: result output: ${result.output}") + + assertEquals(result.statusCode, 0, "status code not 0: ${result.statusCode}") + + val lineAMAF = TestExtension.tempDir + "LineA.maf" + assertTrue(File(lineAMAF).exists(), "File $lineAMAF does not exist") + + val lineBMAF = TestExtension.tempDir + "LineB.maf" + assertTrue(File(lineBMAF).exists(), "File $lineBMAF does not exist") + + } + } \ No newline at end of file