-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
144 lines (120 loc) · 3.57 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Time : 2016/12/8 16:58
# @Author : Runsheng
# @File : utils.py
import subprocess
import sys
import signal
import os
import fnmatch
import multiprocessing
import unittest
from Bio import SeqIO
def myexe(cmd, timeout=0):
"""
a simple wrap of the shell
mainly used to run the bwa mem mapping and samtool orders
"""
def setupAlarm():
signal.signal(signal.SIGALRM, alarmHandler)
signal.alarm(timeout)
def alarmHandler(signum, frame):
sys.exit(1)
proc=subprocess.Popen(cmd, shell=True, preexec_fn=setupAlarm,
stdout=subprocess.PIPE, stderr=subprocess.PIPE,cwd=os.getcwd())
out, err=proc.communicate()
print (err, "Run finished with return code:", proc.returncode)
return out
def fasta2dic(fastafile):
"""
Give a fasta file name, return a dict contains the name and seq
Require Biopython SeqIO medule to parse the sequence into dict, a large genome may take a lot of RAM
"""
if ".gz" in fastafile:
handle=gzip.open(fastafile, "rU")
else:
handle=open(fastafile, "rU")
record_dict=SeqIO.to_dict(SeqIO.parse(handle,"fasta"))
handle.close()
return record_dict
def chr_select(record_dict, chro, start,end):
"""
Note the start and end is 0 based
give the name of refdic, and the chr, start and end to be used
return the name and sequence (both as str)
for example, chrcut(record_dict, "I", 100,109) returns
("I:100_109","AAAAAAAAAA")
"""
name=chro+ ":"+str(start)+"_"+str(end)
seq=str(record_dict[chro][start:end].seq)
return name,seq
def dic2fasta(record_dict,out="record_dict.fasta"):
"""
Write a record_dict of SeqIO sequence dict back to fasta file
:param record_dict:
:param out:
:return:
"""
with open(out,"w") as f:
for record in sorted(record_dict.keys()):
name=record
seq=str(record_dict[name].seq)
f.write(">")
f.write(name)
f.write("\n")
f.write(seq)
f.write("\n")
def myglob(seqdir, word):
"""
to write a glob for python2 for res-glob
"""
matches=[]
for root, dirnames, filenames in os.walk(seqdir):
for filename in fnmatch.filter(filenames, word):
matches.append(os.path.join(root, filename))
return matches
def parmap(f, X, nprocs=multiprocessing.cpu_count()):
"""
a function to use mutip map inside a function
modified from stackoverflow, 3288595
:param f:
:param X:
:param nprocs: core, if not given, use all core
:return:
"""
q_in = multiprocessing.Queue(1)
q_out = multiprocessing.Queue()
proc = [multiprocessing.Process(target=fun, args=(f, q_in, q_out))
for _ in range(nprocs)]
for p in proc:
p.daemon = True
p.start()
sent = [q_in.put((i, x)) for i, x in enumerate(X)]
[q_in.put((None, None)) for _ in range(nprocs)]
res = [q_out.get() for _ in range(len(sent))]
[p.join() for p in proc]
return [x for i, x in sorted(res)]
def fun(f, q_in, q_out):
"""
for parmap
:param f:
:param q_in:
:param q_out:
:return:
"""
while True:
i, x = q_in.get()
if i is None:
break
q_out.put((i, f(x)))
class TestUtils(unittest.TestCase):
def SetUp(self):
pass
def TearDown(self):
pass
def test_myexe(self):
out = myexe("ls")
self.assertIn("utils.py", out)
if __name__=="__main__":
unittest.main()