forked from Illumina/GTCtoVCF
-
Notifications
You must be signed in to change notification settings - Fork 1
/
BPMRecord.py
334 lines (271 loc) · 12.5 KB
/
BPMRecord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
from IlluminaBeadArrayFiles import RefStrand
COMPLEMENT_MAP = dict(zip("ABCDGHKMRTVYNID", "TVGHCDMKYABRNID"))
REQUIRED_INDEL_CONTEXT_LENGTH = 3
def reverse(sequence):
"""
Reverse a string
Args:
sequence (string): Sequence to reverse
Returns:
string: The reversed sequence
"""
return sequence[::-1]
def complement(sequence):
"""
Complement a nucleotide sequence. Note that complement of D and I are D and I,
respectively. This is intended to be called on the "SNP" portion of a source sequence.
Args:
sequence (string): The input sequence
Returns:
string: The complemented sequence
"""
return "".join(COMPLEMENT_MAP[x] for x in sequence)
def determine_left_shift(five_prime, indel, three_prime):
"""
Adjust 5' and 3' context of indel such that
indel is fully shifted to 5'
Args:
five_prime (string) : Five prime sequence
indel (string) : Sequence of indel
three_prime (string) : Three prime sequence
Returns:
(string, string) : New sequence of 5' and 3' sequences
"""
while five_prime.endswith(indel):
five_prime = five_prime[:-len(indel)]
three_prime = indel + three_prime
# may have not fully shifted homopolymer
while len(indel) * five_prime[-1] == indel:
three_prime = five_prime[-1] + three_prime
five_prime = five_prime[:-1]
return (five_prime, three_prime)
def reverse_complement(sequence):
"""
Reverse complement a sequence
Args:
sequence (string): The input sequence
Returns:
string: The reverse-complement of the input sequence
"""
return reverse(complement(sequence))
class BPMRecord(object):
"""
Represents entry from a manifest.abs
Attributes:
name (string): Entry name (unique)
address_a (string): Address of probe A
probe_a (string): Sequence of probe A
chromosome (string): Chromsome name
pos (int): Mapping position
snp (string): SNP variation (e.g., [A/C])
ref_strand (RefStrand): Reference strand of snp
assay_type (int): 0 for Inf II, 1 for Inf I
indel_source_sequence (IndelSourceSequence): Sequence of indel (on design strand), None for SNV
index_num (int): Index in original manifest
is_deletion (bool): Whether indel record represents deletion, None for SNV
"""
def __init__(self, name, address_a, probe_a, chromosome, pos, snp, ref_strand, assay_type, indel_source_sequence, source_strand, ilmn_strand, genome_reader, index, logger):
"""
Create a new BPM record
Args:
name (string) : Name field from manifest
name (string) : AddressA_ID field from manifest
probe_a (string) : AlleleA_ProbeSeq field from manifest
chromosome (string) : Chr field from manifest
pos (string, int) : MapInfo field from manifest
ref_strand (RefStrand) : RefStrand from manifest
assay_type (int) : 0 for Inf II, 1 for Inf I
indel_source_sequence (IndelSourceSequence) : Source sequence for indel, may be None for SNV
source_strand (string) : SourceStrand field from manifest
ilmn_strand (strinng) : IlmnStrand field from manifest
genome_reader (ReferenceGenome,CachedReferenceGenome) : Allows query of genomic sequence, may be None for SNV
index (int) : Index of entry within manifest/GTC files
logger (Logger) : A logger
"""
self.name = name
self.address_a = address_a
self.probe_a = probe_a
self.chromosome = chromosome
self.pos = int(pos)
self.snp = snp
self.ref_strand = ref_strand
self.assay_type = assay_type
self.indel_source_sequence = indel_source_sequence
self._genome_reader = genome_reader
self.index_num = index
self._logger = logger
self.plus_strand_alleles = self._determine_plus_strand_alleles(snp, ref_strand)
if self.indel_source_sequence:
source_strand = source_strand[0].upper()
ilmn_strand = ilmn_strand[0].upper()
if source_strand == "U" or ilmn_strand == "U":
raise ValueError("Unable to process indel with customer or ILMN strand value of \"U\"")
if source_strand == "P" or source_strand == "M":
assert ilmn_strand == "P" or ilmn_strand == "M"
else:
assert ilmn_strand == "T" or ilmn_strand == "B"
self.is_source_on_design_strand = source_strand == ilmn_strand
self.is_deletion = self._calculate_is_deletion()
else:
self.is_source_on_design_strand = None
self.is_deletion = None
def is_indel(self):
"""
Check whether a BPM record represents an indel
Args:
None
Returns:
bool: True if record is indel
"""
return "D" in self.snp
def get_indel_source_sequences(self, ref_strand):
return self.indel_source_sequence.get_split_sequence(self.is_source_on_design_strand != (self.ref_strand == ref_strand), True)
def _calculate_is_deletion(self):
if self.chromosome == "0" or self.pos == 0:
return None
start_index = self.pos - 1
chromosome = "X" if self.chromosome == "XY" else self.chromosome
# get indel sequence on the plus strand
(five_prime, indel_sequence, three_prime) = self.get_indel_source_sequences(RefStrand.Plus)
genomic_sequence = self._genome_reader.get_reference_bases(chromosome, start_index, start_index + len(indel_sequence))
indel_sequence_match = indel_sequence == genomic_sequence
genomic_deletion_five_prime = self._genome_reader.get_reference_bases(
chromosome, start_index - len(five_prime), start_index)
genomic_deletion_three_prime = self._genome_reader.get_reference_bases(
chromosome, start_index + len(indel_sequence), start_index + len(indel_sequence) + len(three_prime))
(genomic_deletion_five_prime, genomic_deletion_three_prime) = determine_left_shift(genomic_deletion_five_prime, indel_sequence, genomic_deletion_three_prime)
genomic_insertion_five_prime = self._genome_reader.get_reference_bases(
chromosome, start_index - len(five_prime) + 1, start_index + 1)
genomic_insertion_three_prime = self._genome_reader.get_reference_bases(
chromosome, start_index + 1, start_index + len(three_prime) + 1)
(genomic_insertion_five_prime, genomic_insertion_three_prime) = determine_left_shift(genomic_insertion_five_prime, indel_sequence, genomic_insertion_three_prime)
deletion_context_match_lengths = (max_suffix_match(genomic_deletion_five_prime, five_prime), max_prefix_match(genomic_deletion_three_prime, three_prime))
max_deletion_context = min(len(genomic_deletion_five_prime), len(five_prime)) + min(len(genomic_deletion_three_prime), len(three_prime)) + len(indel_sequence)
deletion_context_score = (sum(deletion_context_match_lengths) + len(indel_sequence) if indel_sequence_match else 0)/float(max_deletion_context)
insertion_context_match_lengths = (max_suffix_match(genomic_insertion_five_prime, five_prime), max_prefix_match(genomic_insertion_three_prime, three_prime))
max_insertion_context = min(len(genomic_insertion_five_prime), len(five_prime)) + min(len(genomic_insertion_three_prime), len(three_prime))
insertion_context_score = sum(insertion_context_match_lengths)/float(max_insertion_context)
is_deletion = indel_sequence_match and deletion_context_score > insertion_context_score and min(deletion_context_match_lengths) >= 1
is_insertion = insertion_context_score > deletion_context_score and min(insertion_context_match_lengths) >= 1
if is_deletion == is_insertion:
raise Exception("Unable to determine reference allele for indel")
if is_deletion:
if deletion_context_score < 1.0:
self._logger.warn("Incomplete match of source sequence to genome for indel " + self.name)
if is_insertion:
if insertion_context_score < 1.0:
self._logger.warn("Incomplete match of source sequence to genome for indel " + self.name)
return is_deletion
def _determine_plus_strand_alleles(self, snp, ref_strand):
"""
Return the nucleotides alleles for the record on the plus strand.
If record is indel, will return alleles in terms of D/I SNP convention
Args:
None
Returns
None
Raises:
Exception - Record does not contains reference strand information
"""
nucleotides = [snp[1], snp[-2]]
if ref_strand == RefStrand.Plus:
return nucleotides
elif ref_strand == RefStrand.Minus:
return [
COMPLEMENT_MAP[nucleotide] for nucleotide in nucleotides]
else:
raise Exception(
"Manifest must contain reference strand information")
class IndelSourceSequence(object):
"""
Represents the source sequence for an indel
Attributes:
five_prime (string) : Sequence 5' of indel (on the design strand)
indel (string) : Indel sequence (on the design strand)
three_prime (string) : Sequence 3' of indel (on the design strand)
"""
def __init__(self, source_sequence):
(self.five_prime, self.indel, self.three_prime) = self.split_source_sequence(source_sequence.upper())
def get_split_sequence(self, generate_reverse_complement, left_shift):
"""
Return the components of the indel source sequence
Args:
generate_reverse_complement (bool) : Return reverse complement of original source sequence
left_shift (bool) : Left shift position of indel on requested strand
Returns:
(five_prime, indel, three_prime) = Sequences of three components of indel source sequence
"""
if generate_reverse_complement:
(five_prime, indel, three_prime) = (reverse_complement(self.three_prime), reverse_complement(self.indel), reverse_complement(self.five_prime))
else:
(five_prime, indel, three_prime) = (self.five_prime, self.indel, self.three_prime)
if left_shift:
(five_prime, three_prime) = determine_left_shift(five_prime, indel, three_prime)
return (five_prime, indel, three_prime)
@staticmethod
def split_source_sequence(source_sequence):
"""
Break source sequence into different piecdes
Args:
source_sequence (string): Source sequence string (e.g., ACGT[-/AGA]ATAT)
Returns:
(string, string, string) : Tuple with 5' sequence, indel sequence, 3' sequence
"""
left_position = source_sequence.find("/")
right_position = source_sequence.find("]")
assert source_sequence[left_position - 1] == "-"
return (source_sequence[:(left_position - 2)], source_sequence[(left_position + 1):right_position], source_sequence[(right_position+1):])
DEGENERACY_MAP = {}
DEGENERACY_MAP["A"] = "A"
DEGENERACY_MAP["C"] = "C"
DEGENERACY_MAP["G"] = "G"
DEGENERACY_MAP["T"] = "T"
DEGENERACY_MAP["R"] = "AG"
DEGENERACY_MAP["Y"] = "CT"
DEGENERACY_MAP["S"] = "GC"
DEGENERACY_MAP["W"] = "AT"
DEGENERACY_MAP["K"] = "GT"
DEGENERACY_MAP["M"] = "AC"
DEGENERACY_MAP["B"] = "CGT"
DEGENERACY_MAP["D"] = "AGT"
DEGENERACY_MAP["H"] = "ACT"
DEGENERACY_MAP["V"] = "ACG"
DEGENERACY_MAP["N"] = "ACGT"
def max_suffix_match(str1, str2):
"""
Determine the maximum length of exact suffix
match between str1 and str2
str2 may contain degenerate IUPAC characters
Args:
str1 (string) : First string
str2 (string) : Second string
Returns:
int : Length of maximum suffix match
"""
result = 0
for (char1, char2) in zip(str1[::-1], str2[::-1]):
assert char1 in "ACGT"
if char1 in DEGENERACY_MAP[char2]:
result += 1
else:
break
return result
def max_prefix_match(str1, str2):
"""
Determine the maximum length of exact prefix
match between str1 and str2
str2 may contain degenerate IUPAC characters
Args:
str1 (string) : First string
str2 (string) : Second string
Returns:
int : Length of maximum prefix match
"""
result = 0
for (char1, char2) in zip(str1, str2):
assert char1 in "ACGT"
if char1 in DEGENERACY_MAP[char2]:
result += 1
else:
break
return result