
Make FastaDir thread safe #112

Closed
theferrit32 opened this issue May 31, 2023 · 9 comments

@theferrit32
Contributor

The _open_for_reading function puts opened FastaFile objects in a memoize cache. These FastaFile objects have open file handles that are not thread safe. This causes issues in applications that use SeqRepo and may attempt to fetch a sequence from the same bgz file from two threads in the same process, via the same in-process SeqRepo object. This synchronization bug, and the exception it can lead to, are particularly difficult to debug in Python web servers. [deleted rant about python threading]

@functools.lru_cache()
def _open_for_reading(self, path):
    _logger.debug("Opening for reading: " + path)
    return FabgzReader(path)

So FastaFile objects are inherently not thread safe, because each one logically represents (and contains) a file descriptor opened with C open() and is seeked on directly by the htslib functions called by FastaFile and elsewhere in pysam.

The file handle struct in htslib that each FastaFile has an instantiation of is here:

https://github.com/samtools/htslib/blob/6143086502567c5c4bb5cacb2951f664ba28ed6e/hfile.c#L524-L528

We do not want to eliminate the open-FastaFile cache in seqrepo because of the overhead involved in opening and closing files. We could instead make these FastaFile objects lockable with a context manager and require FastaDir to acquire the lock before calling the .fetch method, e.g. at this call site:

fabgz = self._open_for_reading(path)
return fabgz.fetch(seq_id, start, end)

The downside to this is that it introduces overhead for acquiring and releasing the mutex even in single-threaded programs. The question then is whether this overhead is lower than the overhead of opening/closing files (it almost certainly is). If using the context manager does introduce problematic overhead, we could add something like an environment variable to disable thread safety, and applications would then need to ensure they use one SeqRepo object per thread.

It also means we have to remember to always acquire the mutex on the object returned from _open_for_reading if we want to do something stateful with it. As far as I can tell, though, this function is only used internally in fastadir.py, so that's easy. If we go this route we should explicitly document that this function should not be used externally, and that if it is, thread safety needs to be considered.
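
For illustration, the lockable-reader idea might look roughly like the sketch below (class and method names are hypothetical, not part of seqrepo): the wrapper owns a mutex and exposes the underlying reader only through a context manager, so FastaDir must hold the lock while it touches the handle.

import threading
from contextlib import contextmanager

class LockableFabgzReader:
    """Hypothetical wrapper that serializes access to one open reader."""

    def __init__(self, reader):
        self._reader = reader  # e.g. a FabgzReader / pysam.FastaFile
        self._lock = threading.Lock()

    @contextmanager
    def acquired(self):
        # Hold the mutex for the duration of the caller's with-block.
        with self._lock:
            yield self._reader

_open_for_reading would then return a LockableFabgzReader, and the fetch call above becomes:

    with self._open_for_reading(path).acquired() as fabgz:
        return fabgz.fetch(seq_id, start, end)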

@theferrit32
Contributor Author

I have a work in progress that appears to be working well to eliminate these issues. The pysam and sqlite3 C libraries really do not like having the same file open multiple times. There's some race condition between when the file handle object gets destructed and when the file is next opened, so this only manifests when there is a large quantity of requests that access the same file via pysam/htslib and sqlite3. I've added a fabgz file lock in seqrepo, and explicit destructors that close open handles.

@reece
Member

reece commented Sep 12, 2023

I have a different view of what's happening here, and getting to the bottom of that is essential for solving the issue.

These FastaFile objects have open file handles that are not thread safe.

By file handles, I assume you mean file descriptors. In what sense do you think that the fds are not thread safe? Threads that have fds open to the same file are thread safe (thanks to modern Unix/Linux and C libs). There should be no contention at that level.

@reece
Member

reece commented Sep 12, 2023

I now have a script that reliably demonstrates that the problem is with lru_cache in a threaded context, essentially as @theferrit32 conjectured. I'll post the tests later today, but wanted to at least share the results now.

I assembled a list of 239,980 unique NM accessions and wrote a script that lets me easily control the # of threads and # of accessions. For some tests, I commented out lru_cache at fastadir.py:212 to disable fd caching in order to assess its benefit and the threading issues.
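
For context, such a harness boils down to something like the sketch below (illustrative only, not the actual misc/threading-test script): N worker threads pull accessions from a shared queue and fetch each one through a single shared SeqRepo instance.

import queue
import threading
import time

from biocommons.seqrepo import SeqRepo

def run_benchmark(seqrepo_path, accessions, n_threads):
    sr = SeqRepo(seqrepo_path)  # one SeqRepo instance shared by all threads
    q = queue.Queue()
    for ac in accessions:
        q.put(ac)

    def worker():
        while True:
            try:
                ac = q.get_nowait()
            except queue.Empty:
                return
            sr.fetch(ac)  # fetch the full sequence for this accession

    threads = [threading.Thread(target=worker) for _ in range(n_threads)]
    t0 = time.time()
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    elapsed = time.time() - t0
    print(f"Fetched {len(accessions)} sequences in {elapsed:.2f} s "
          f"with {n_threads} threads; {len(accessions) / elapsed:.0f} seq/sec")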

one thread, lru_cache enabled (current state)

snafu$ ./misc/threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a accessions -m 10000 -n 1
2023-09-12 07:45:50 snafu root[1955285] INFO Queued 10000 accessions from accessions
2023-09-12 07:45:50 snafu root[1955285] INFO Starting run with 1 threads
2023-09-12 07:45:52 snafu root[1955285] INFO <Worker(Thread-1, started 139940591564480)>: Done; processed 10000 accessions
2023-09-12 07:45:52 snafu root[1955285] INFO Fetched 10000 sequences in 1.34815899 s with 1 threads; 7418 seq/sec

one thread, lru_cache disabled

The upshot is that caching is worth a lot (7418 seq/sec with the cache → 232 seq/sec without).

snafu$ ./misc/threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a accessions -m 10000 -n 1
Disabled lru_cache
2023-09-12 07:44:16 snafu root[1955067] INFO Queued 10000 accessions from accessions
2023-09-12 07:44:16 snafu root[1955067] INFO Starting run with 1 threads
2023-09-12 07:44:59 snafu root[1955067] INFO <Worker(Thread-1, started 140101585729216)>: Done; processed 10000 accessions
2023-09-12 07:44:59 snafu root[1955067] INFO Fetched 10000 sequences in 43.024929758 s with 1 threads; 232 seq/sec

two threads, lru_cache enabled

snafu$ ./misc/threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a accessions -m 10000 -n 2
2023-09-12 07:47:49 snafu root[1955573] INFO Queued 10000 accessions from accessions
2023-09-12 07:47:49 snafu root[1955573] INFO Starting run with 2 threads
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
[E::bgzf_uncompress] Inflate operation failed: invalid distance too far back
⋮
FileNotFoundError: [Errno 2] b'No such file or directory'
FileNotFoundError: [Errno 2] b'No such file or directory'

We get two errors because both threads run out of fds at essentially the same time.

two threads, lru_cache disabled

snafu$ ./misc/threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a accessions -m 10000 -n 2
Disabled lru_cache
2023-09-12 07:54:22 snafu root[1956250] INFO Queued 10000 accessions from accessions
2023-09-12 07:54:22 snafu root[1956250] INFO Starting run with 2 threads
2023-09-12 07:54:52 snafu root[1956250] INFO <Worker(Thread-1, started 139696936056512)>: Done; processed 5000 accessions
2023-09-12 07:54:52 snafu root[1956250] INFO <Worker(Thread-2, started 139696927663808)>: Done; processed 5000 accessions
2023-09-12 07:54:52 snafu root[1956250] INFO Fetched 10000 sequences in 45.292377406 s with 2 threads; 221 seq/sec

five threads, lru_cache disabled

snafu$ ./misc/threading-test -s /usr/local/share/seqrepo/2021-01-29/ -a accessions -m 10000 -n 5
Disabled lru_cache
2023-09-12 07:45:03 snafu root[1955144] INFO Queued 10000 accessions from accessions
2023-09-12 07:45:03 snafu root[1955144] INFO Starting run with 5 threads
2023-09-12 07:45:30 snafu root[1955144] INFO <Worker(Thread-4, started 140123433854656)>: Done; processed 1985 accessions
2023-09-12 07:45:30 snafu root[1955144] INFO <Worker(Thread-1, started 140123530327744)>: Done; processed 1926 accessions
2023-09-12 07:45:30 snafu root[1955144] INFO <Worker(Thread-2, started 140123521935040)>: Done; processed 2021 accessions
2023-09-12 07:45:30 snafu root[1955144] INFO <Worker(Thread-3, started 140123442247360)>: Done; processed 2063 accessions
2023-09-12 07:45:30 snafu root[1955144] INFO <Worker(Thread-5, started 140123425461952)>: Done; processed 2005 accessions
2023-09-12 07:45:30 snafu root[1955144] INFO Fetched 10000 sequences in 61.054243067 s with 5 threads; 164 seq/sec

My conclusion from all of this is that we must directly address thread-safe caching. To be clear, this isn't a problem with lru_cache per se, or with the inability to use the same fds in multiple threads, but instead that the lru_cache is per-thread, and that this leads us to exhaust fds. This issue is purely a problem with resource allocation.

Options I see:

  • Cut the lru_cache size to something small (like 10). That's a hack, but it would likely work in practice and keep some of the benefit.
  • Switch to an fd pool
  • Catch errors, close fds explicitly, and retry on failure (a rough sketch of this option follows below)
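
As a rough, hypothetical sketch of that third option (not seqrepo code): swap the lru_cache for a plain dict so the cached handles can be closed explicitly, and use pysam.FastaFile directly rather than seqrepo's FabgzReader wrapper.

import pysam

class RetryingFastaCache:
    """Sketch of "catch, close fds, retry": cache open handles, rebuild on failure."""

    def __init__(self):
        self._readers = {}  # path -> open pysam.FastaFile

    def _open_for_reading(self, path):
        if path not in self._readers:
            self._readers[path] = pysam.FastaFile(path)
        return self._readers[path]

    def _close_all(self):
        # Explicitly close every cached handle so the fds are released now,
        # not whenever the garbage collector gets around to it.
        for reader in self._readers.values():
            reader.close()
        self._readers.clear()

    def fetch(self, path, seq_id, start=None, end=None):
        try:
            return self._open_for_reading(path).fetch(seq_id, start, end)
        except OSError:
            # Likely fd exhaustion or a handle left in a bad state: close everything,
            # then retry once with a freshly opened handle.
            self._close_all()
            return self._open_for_reading(path).fetch(seq_id, start, end)

A real version would still need locking around the dict and the retry, but the close-and-retry shape is the point here.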

@theferrit32
Contributor Author

theferrit32 commented Oct 6, 2023

@reece thanks for adding those scripts; they make it easy to evaluate different conditions. I've made a couple of modifications and added another README(2) here: https://github.com/theferrit32/biocommons.seqrepo/blob/kf/112-make-fastadir-thread-safe/misc/threading-tests/README2.md

To be clear, this isn't a problem with lru_cache per se, or with the inability to use the same fds in multiple threads, but instead that the lru_cache is per-thread, and that this leads us to exhaust fds

On this point, there are two distinct file descriptor issues that have arisen under load testing.

  1. Opening a lot of SeqRepo objects creates a lot of open resource handles, like sqlite objects and file descriptors (under FastaFile), that just sit around in memory for a while if not explicitly closed and exhaust system resources. This issue was seen in seqrepo-rest-service; "Limit number of SeqRepo instances" (seqrepo-rest-service#15) resolves it.
  2. File descriptors themselves are not thread safe, because each one carries a specific read offset (this is where the "invalid distance…" errors come from). Having these in an lru_cache (with bounded size) is not an issue unless there are concurrent accesses to the SeqRepo (and then to the FastaDir) object, which is not usually the case in simple single-threaded Python code but is the case in any async code that references a SeqRepo object. The async code (for example in FastAPI or another async HTTP server) can run multiple coroutines concurrently, and these will break if they access the same FastaFile (which is common). I don't know whether there are any other thread safety issues in the FastaDir or SeqRepo classes, but I have not encountered any.
     We have seen this in a FastAPI codebase; my initial quick solution was "Queryhandler concurrent access fix" (cancervariants/variation-normalization#388), but we decided that mutex was at too high a level and that it would be better to put the lock lower, around the actual objects at issue: the FastaFile objects in the cache.
     We are currently running the revision in "Thread safety for FastaDir file handle cache" (#117) on our server here: https://normalization.clingen.app/variation and it handles concurrency fine.

The performance penalty from disabling the lru_cache is substantial, so I don't think that would be a great option. I added some additional options to the FastaDir class in that branch in order to also look at the alternative: adding threading.Lock protection around the individual FastaFile objects in the cache.

I also added threading-test2 and some evaluations to README2.md; when single-threaded, there isn't much penalty from adding the threading.Lock protection around the individual FastaFile objects inside the cache (see: https://github.com/theferrit32/biocommons.seqrepo/blob/kf/112-make-fastadir-thread-safe/misc/threading-tests/README2.md#cache-enabled-with-and-without-locks).
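
For readers following along, the per-file locking idea boils down to something like the sketch below (illustrative only; the actual change is in #117). The cache stores a (lock, reader) pair per path, and fetch holds that path's lock while reading; pysam.FastaFile stands in for seqrepo's FabgzReader wrapper here.

import functools
import threading

import pysam

class LockedFastaCache:
    """Illustrative sketch: guard each cached handle with its own threading.Lock."""

    @functools.lru_cache(maxsize=128)
    def _open_for_reading(self, path):
        # Cache the open handle together with the lock that guards all reads on it.
        return threading.Lock(), pysam.FastaFile(path)

    def fetch(self, path, seq_id, start=None, end=None):
        lock, fafile = self._open_for_reading(path)
        with lock:
            # Only one thread at a time may seek/read this handle; threads reading
            # different files proceed in parallel under their own locks.
            return fafile.fetch(seq_id, start, end)

In the single-threaded case the with-lock is just an uncontended acquire/release, which matches the small penalty measured in README2.md.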


github-actions bot commented Nov 6, 2023

This issue is stale because it has been open 30 days with no activity. Remove stale label or comment or this will be closed in 7 days.


@quinnwai

quinnwai commented Mar 4, 2024

Hi there! I saw that this was closed, but I have experienced a similar issue on 0.6.9. Anecdotally, when using multiprocessing to parallelize translation code from gnomAD to VRS IDs in vrs-python, I seem to run into file IO problems, even when specifying only 1 worker:

[E::fai_load3_core] Failed to open FASTA file /Users/quinnwaiwong/seqrepo/latest/sequences/2016/0827/211653/1472332613.04.fa.bgz
Exception ignored in: <function FabgzReader.__del__ at 0x10691c1f0>
Traceback (most recent call last):
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/fastadir/fabgz.py", line 83, in __del__
self._fh.close()
AttributeError: 'FabgzReader' object has no attribute '_fh'
...
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/ga4gh/vrs/dataproxy.py", line 125, in _get_sequence
return self.sr.fetch_uri(coerce_namespace(identifier), start, end)
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/seqrepo.py", line 195, in fetch_uri
return self.fetch(alias=alias, namespace=namespace, start=start, end=end)
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/seqrepo.py", line 186, in fetch
return self.sequences.fetch(seq_id, start, end)
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/fastadir/fastadir.py", line 149, in fetch
with self._open_for_reading(path) as fabgz:
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/fastadir/fastadir.py", line 94, in _open_for_reading
return FabgzReader(path)
File "/Users/quinnwaiwong/projects/ohsu/vrs-python-testing/venv3.10/lib/python3.10/site-packages/biocommons/seqrepo/fastadir/fabgz.py", line 80, in init
self._fh = FastaFile(filename)
File "pysam/libcfaidx.pyx", line 121, in pysam.libcfaidx.FastaFile.cinit
File "pysam/libcfaidx.pyx", line 181, in pysam.libcfaidx.FastaFile._open
OSError: error when opening file /Users/quinnwaiwong/seqrepo/latest/sequences/2016/0827/211653/1472332613.04.fa.bgz

Here's the associated code:

from biocommons.seqrepo import SeqRepo
from ga4gh.vrs.dataproxy import SeqRepoDataProxy
from ga4gh.vrs.extras.translator import AlleleTranslator

from datetime import datetime
import multiprocessing
import subprocess

# get vrs ids
def translate(gnomad_expr):
    data_proxy = SeqRepoDataProxy(SeqRepo("/Users/quinnwaiwong/seqrepo/latest"))
    translator = AlleleTranslator(data_proxy)
    allele = translator._from_gnomad(gnomad_expr, require_validation=False)
    if allele is not None:
        return (gnomad_expr, dict(allele))

def calculate_gnomad_expressions(input_vcf, alt=True):
    if alt: command = f"bcftools query -f '%CHROM-%POS-%REF-%ALT\n' {input_vcf}"
    else: command = f"bcftools query -f '%CHROM-%POS-%REF-%REF\n' {input_vcf}"
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True)

    # Iterate over the output of bcftools and yield each gnomAD expression
    for line in process.stdout:
        yield line.strip()

if __name__ == "__main__":
    input_vcf = "vcf/NA12878.chr1.100.000.vcf"

    gnomad_generator = calculate_gnomad_expressions(input_vcf)

    worker_count = 1 # os.cpu_count()
    progress_interval = 10

    manager = multiprocessing.Manager()
    allele_dict = manager.dict()

    with multiprocessing.Pool(worker_count) as pool:
        # call the function for each item in parallel
        c = 0
        print(datetime.now().isoformat(), c)

        for result in pool.imap(translate, gnomad_generator):
            c += 1
            if result:
                allele_dict[result[0]] = result[1]
            elif c % progress_interval == 0:
                print(datetime.now().isoformat(), c)

Do y'all know if this is a related issue? Let me know if I can provide more info.

@theferrit32
Contributor Author

@quinnwai I believe you are hitting the same issue as issue 1 listed above.

Each task being executed on your multiprocessing Pool constructs a new SeqRepo object, which opens new file descriptors without explicitly closing them. They only get closed implicitly when the objects are finally removed from memory by Python's runtime, which can be much later, so you eventually hit an OS error caused by the open file limit being reached.

You can use the initializer argument to multiprocessing.Pool to give each worker in the pool a single SeqRepo object that gets reused across tasks. The initializer runs once when the pool creates a new process, and you can add objects to that worker's globals using the global statement.

Something like the modification below works for me. I just moved the data_proxy and translator into an initializer function passed to multiprocessing.Pool.

from biocommons.seqrepo import SeqRepo
from ga4gh.vrs.dataproxy import SeqRepoDataProxy
from ga4gh.vrs.extras.translator import AlleleTranslator

from datetime import datetime
import multiprocessing
import subprocess


# get vrs ids
def translate(gnomad_expr):
    # data_proxy = SeqRepoDataProxy(SeqRepo(seqrepo_path))
    # translator = AlleleTranslator(data_proxy)
    allele = translator._from_gnomad(gnomad_expr, require_validation=False)
    if allele is not None:
        return (gnomad_expr, dict(allele))


def calculate_gnomad_expressions(input_vcf, alt=True):
    if alt:
        command = f"bcftools query -f '%CHROM-%POS-%REF-%ALT\n' {input_vcf}"
    else:
        command = f"bcftools query -f '%CHROM-%POS-%REF-%REF\n' {input_vcf}"
    process = subprocess.Popen(command, shell=True, stdout=subprocess.PIPE,
                               stderr=subprocess.PIPE, universal_newlines=True)

    # Iterate over the output of bcftools and yield each gnomAD expression
    for line in process.stdout:
        yield line.strip()


def worker_initializer(seqrepo_path):
    global data_proxy
    global translator
    data_proxy = SeqRepoDataProxy(SeqRepo(seqrepo_path))
    translator = AlleleTranslator(data_proxy)


if __name__ == "__main__":
    input_vcf = "/Users/kferrite/dev/data/clinvar-20240305.vcf.gz"
    seqrepo_path = "/Users/kferrite/dev/biocommons.seqrepo/seqrepo/2024-02-20"

    gnomad_generator = calculate_gnomad_expressions(input_vcf)

    worker_count = 1  # os.cpu_count()
    progress_interval = 10

    manager = multiprocessing.Manager()
    allele_dict = manager.dict()

    with multiprocessing.Pool(
        worker_count,
        initializer=worker_initializer,
        initargs=(seqrepo_path,),
    ) as pool:
        # call the function for each item in parallel
        c = 0
        print(datetime.now().isoformat(), c)

        for result in pool.imap(translate, gnomad_generator):
            c += 1
            if result:
                allele_dict[result[0]] = result[1]
            elif c % progress_interval == 0:
                print(datetime.now().isoformat(), c)

@quinnwai

quinnwai commented Mar 7, 2024

Awesome, thanks for the help! This is working well; I'm excited to use this to process some large VCFs.
