-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathxmfa_process.py
123 lines (106 loc) · 3.87 KB
/
xmfa_process.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
from Bio import SeqIO
import tempfile
import argparse
import json
import os
from BCBio import GFF
def parse_xmfa(xmfa):
"""Simple XMFA parser until https://github.com/biopython/biopython/pull/544
"""
current_lcb = []
current_seq = {}
for line in xmfa.readlines():
if line.startswith('#'):
continue
if line.strip() == '=':
if 'id' in current_seq:
current_lcb.append(current_seq)
current_seq = {}
yield current_lcb
current_lcb = []
else:
line = line.strip()
if line.startswith('>'):
if 'id' in current_seq:
current_lcb.append(current_seq)
current_seq = {}
data = line.strip().split()
# 0 1 2 3 4 5
# > 1:5986-6406 + CbK.fa # CbK_gp011
id, loc = data[1].split(':')
start, end = loc.split('-')
current_seq = {
'rid': '_'.join(data[1:]),
'id': id,
'start': int(start),
'end': int(end),
'strand': 1 if data[2] == '+' else -1,
'seq': '',
'comment': '',
}
if len(data) > 5:
current_seq['comment'] = ' '.join(data[5:])
# else:
# current_seq['seq'] += line.strip()
def percent_identity(a, b):
"""Calculate % identity, ignoring gaps in the host sequence
"""
match = 0
mismatch = 0
for char_a, char_b in zip(list(a), list(b)):
if char_a == '-':
continue
if char_a == char_b:
match += 1
else:
mismatch += 1
if match + mismatch == 0:
return 0.0
return 100 * float(match) / (match + mismatch)
def get_fasta_ids(sequences):
"""Returns a list of fasta records in the order they appear
"""
ids = []
for seq in SeqIO.parse(sequences, 'fasta'):
ids.append(seq.id)
return ids
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='parse xmfa file')
parser.add_argument('gff3', type=argparse.FileType('r'), help='Multi-GFF3 File')
parser.add_argument('fasta', type=argparse.FileType('r'), help='Multi-FA file')
parser.add_argument('xmfa', type=argparse.FileType('r'), help='XMFA File')
parser.add_argument('output_dir', type=str, help="output directory")
args = parser.parse_args()
fasta_list = get_fasta_ids(args.fasta)
lcbs = parse_xmfa(args.xmfa)
if not os.path.exists(args.output_dir):
os.makedirs(args.output_dir)
output = {
'fasta': [],
'gff3': [],
'xmfa': None,
}
processed_xmfa = os.path.join(args.output_dir, 'regions.json')
with open(processed_xmfa, 'w') as handle:
json.dump([lcb for lcb in lcbs if len(lcb) > 1], handle, sort_keys=True)
output['xmfa'] = processed_xmfa
# Have to seek because we already access args.fasta once in id_tn_dict
args.fasta.seek(0)
# Load up sequence(s) for GFF3 data
seq_dict = SeqIO.to_dict(SeqIO.parse(args.fasta, "fasta"))
# Parse GFF3 records
gffs = GFF.parse(args.gff3, base_dict=seq_dict)
for record in sorted(gffs, key=lambda rec: fasta_list.index(rec.id)):
gff_output = os.path.join(args.output_dir, record.id + '.gff')
with open(gff_output, 'w') as handle:
GFF.write([record], handle)
output['gff3'].append(gff_output)
fa_output = os.path.join(args.output_dir, record.id + '.txt')
with open(fa_output, 'w') as handle:
handle.write(str(record.seq))
output['fasta'].append({
'path': fa_output,
'length': len(record.seq),
'name': record.id
})
print(json.dumps(output, sort_keys=True))