diff --git a/tests/AlignedSegment_test.py b/tests/AlignedSegment_test.py index ea7886fa..ea390aaa 100644 --- a/tests/AlignedSegment_test.py +++ b/tests/AlignedSegment_test.py @@ -360,6 +360,55 @@ def testPositions(self): ], ) + # cigar operator should be added to tuple + # ((0, 10), (2, 1), (0, 9), (1, 1), (0, 20)) + self.assertEqual( + a.get_aligned_pairs(with_cigar=True), + [ + (0, 20, 0), + (1, 21, 0), + (2, 22, 0), + (3, 23, 0), + (4, 24, 0), + (5, 25, 0), + (6, 26, 0), + (7, 27, 0), + (8, 28, 0), + (9, 29, 0), + (None, 30, 2), + (10, 31, 0), + (11, 32, 0), + (12, 33, 0), + (13, 34, 0), + (14, 35, 0), + (15, 36, 0), + (16, 37, 0), + (17, 38, 0), + (18, 39, 0), + (19, None, 1), + (20, 40, 0), + (21, 41, 0), + (22, 42, 0), + (23, 43, 0), + (24, 44, 0), + (25, 45, 0), + (26, 46, 0), + (27, 47, 0), + (28, 48, 0), + (29, 49, 0), + (30, 50, 0), + (31, 51, 0), + (32, 52, 0), + (33, 53, 0), + (34, 54, 0), + (35, 55, 0), + (36, 56, 0), + (37, 57, 0), + (38, 58, 0), + (39, 59, 0), + ], + ) + self.assertEqual( a.get_reference_positions(), [ @@ -444,6 +493,15 @@ def test_get_aligned_pairs_soft_clipping(self): ] + [(37, None), (38, None), (39, None)], ) + self.assertEqual( + a.get_aligned_pairs(with_cigar=True), + [(0, None, 4), (1, None, 4)] + + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip(range(2, 2 + 35), range(20, 20 + 35)) + ] + + [(37, None, 4), (38, None, 4), (39, None, 4)], + ) self.assertEqual( a.get_aligned_pairs(True), # [(0, None), (1, None)] + @@ -453,6 +511,15 @@ def test_get_aligned_pairs_soft_clipping(self): ] # [(37, None), (38, None), (39, None)] ) + self.assertEqual( + a.get_aligned_pairs(True, with_cigar=True), + # [(0, None, 4), (1, None, 4)] + + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip(range(2, 2 + 35), range(20, 20 + 35)) + ] + # [(37, None, 4), (38, None, 4), (39, None, 4)] + ) def test_get_aligned_pairs_hard_clipping(self): a = self.build_read() @@ -472,6 +539,21 @@ def test_get_aligned_pairs_hard_clipping(self): for (qpos, refpos) in zip(range(0, 0 + 35), range(20, 20 + 35)) ], ) + self.assertEqual( + a.get_aligned_pairs(with_cigar=True), + # No seq, no seq pos + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip(range(0, 0 + 35), range(20, 20 + 35)) + ], + ) + self.assertEqual( + a.get_aligned_pairs(True, with_cigar=True), + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip(range(0, 0 + 35), range(20, 20 + 35)) + ], + ) def test_get_aligned_pairs_skip(self): a = self.build_read() @@ -487,6 +569,17 @@ def test_get_aligned_pairs_skip(self): ) ], ) + self.assertEqual( + a.get_aligned_pairs(with_cigar=True), + [(0, 20, 0), (1, 21, 0)] + + [(None, refpos, 2) for refpos in range(22, 22 + 100)] + + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip( + range(2, 2 + 38), range(20 + 2 + 100, 20 + 2 + 100 + 38) + ) + ], + ) self.assertEqual( a.get_aligned_pairs(True), [(0, 20), (1, 21)] + @@ -498,6 +591,17 @@ def test_get_aligned_pairs_skip(self): ) ], ) + self.assertEqual( + a.get_aligned_pairs(True, with_cigar=True), + [(0, 20, 0), (1, 21, 0)] + + # [(None, refpos, 2) for refpos in range(21, 21+100)] + + [ + (qpos, refpos, 0) + for (qpos, refpos) in zip( + range(2, 2 + 38), range(20 + 2 + 100, 20 + 2 + 100 + 38) + ) + ], + ) def test_get_aligned_pairs_match_mismatch(self): a = self.build_read() @@ -516,6 +620,28 @@ def test_get_aligned_pairs_match_mismatch(self): for (qpos, refpos) in zip(range(0, 0 + 40), range(20, 20 + 40)) ], ) + self.assertEqual( + a.get_aligned_pairs(with_cigar=True), + [ + (qpos, refpos, 7) + for (qpos, refpos) in zip(range(0, 0 + 20), range(20, 20 + 20)) + ] + + [ + (qpos, refpos, 8) + for (qpos, refpos) in zip(range(0 + 20, 0 + 40), range(20 + 20, 20 + 40)) + ], + ) + self.assertEqual( + a.get_aligned_pairs(True, with_cigar=True), + [ + (qpos, refpos, 7) + for (qpos, refpos) in zip(range(0, 0 + 20), range(20, 20 + 20)) + ] + + [ + (qpos, refpos, 8) + for (qpos, refpos) in zip(range(0 + 20, 0 + 40), range(20 + 20, 20 + 40)) + ], + ) def test_get_aligned_pairs_padding(self): a = self.build_read() @@ -524,6 +650,8 @@ def test_get_aligned_pairs_padding(self): # See comment in test_get_aligned_pairs_padding_with_seq (below). self.assertEqual(a.get_aligned_pairs(), [(0, 20), (1, None), (2, 21)]) + self.assertEqual(a.get_aligned_pairs(with_cigar=True), + [(0, 20, 0), (1, None, 6), (2, 21, 0)]) def test_get_aligned_pairs_padding_with_seq(self): a = self.build_read() @@ -552,6 +680,8 @@ def test_get_aligned_pairs_padding_with_seq(self): # sequences inserted relative to the reference." self.assertEqual(a.get_aligned_pairs(with_seq=True), [(0, 20, 'A'), (1, None, None), (2, 21, 'T')]) + self.assertEqual(a.get_aligned_pairs(with_seq=True, with_cigar=True), + [(0, 20, 'A', 0), (1, None, None, 6), (2, 21, 'T', 0)]) def test_get_aligned_pairs(self): a = self.build_read() @@ -572,6 +702,20 @@ def test_get_aligned_pairs(self): (8, 28, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "A", 0), + (5, 25, "A", 0), + (6, 26, "A", 0), + (7, 27, "A", 0), + (8, 28, "A", 0), + ], + ) a.set_tag("MD", "4C4") self.assertEqual( @@ -588,6 +732,20 @@ def test_get_aligned_pairs(self): (8, 28, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "c", 0), + (5, 25, "A", 0), + (6, 26, "A", 0), + (7, 27, "A", 0), + (8, 28, "A", 0), + ], + ) a.cigarstring = "5M2D4M" a.set_tag("MD", "4C^TT4") @@ -607,6 +765,22 @@ def test_get_aligned_pairs(self): (8, 30, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "c", 0), + (None, 25, "T", 2), + (None, 26, "T", 2), + (5, 27, "A", 0), + (6, 28, "A", 0), + (7, 29, "A", 0), + (8, 30, "A", 0), + ], + ) a.cigarstring = "5M2D2I2M" a.set_tag("MD", "4C^TT2") @@ -626,6 +800,22 @@ def test_get_aligned_pairs(self): (8, 28, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "c", 0), + (None, 25, "T", 2), + (None, 26, "T", 2), + (5, None, None, 1), + (6, None, None, 1), + (7, 27, "A", 0), + (8, 28, "A", 0), + ], + ) def test_get_aligned_pairs_with_malformed_MD_tag(self): @@ -636,6 +826,7 @@ def test_get_aligned_pairs_with_malformed_MD_tag(self): a.cigarstring = "64M2D85M2S" a.set_tag("MD", "64^TG86A0") self.assertRaises(AssertionError, a.get_aligned_pairs, with_seq=True) + self.assertRaises(AssertionError, a.get_aligned_pairs, with_seq=True, with_cigar=True) def test_get_aligned_pairs_skip_reference(self): a = self.build_read() @@ -660,6 +851,23 @@ def test_get_aligned_pairs_skip_reference(self): ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "A", 0), + (None, 25, None, 3), + (5, 26, "A", 0), + (6, 27, "A", 0), + (7, 28, "A", 0), + (8, 29, "A", 0), + (9, 30, "A", 0), + ], + ) + self.assertEqual( a.get_aligned_pairs(with_seq=False), [ @@ -677,6 +885,23 @@ def test_get_aligned_pairs_skip_reference(self): ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=False, with_cigar=True), + [ + (0, 20, 0), + (1, 21, 0), + (2, 22, 0), + (3, 23, 0), + (4, 24, 0), + (None, 25, 3), + (5, 26, 0), + (6, 27, 0), + (7, 28, 0), + (8, 29, 0), + (9, 30, 0), + ], + ) + self.assertEqual( a.get_aligned_pairs(matches_only=True, with_seq=False), [ @@ -693,38 +918,66 @@ def test_get_aligned_pairs_skip_reference(self): ], ) + self.assertEqual( + a.get_aligned_pairs(matches_only=True, with_seq=False, with_cigar=True), + [ + (0, 20, 0), + (1, 21, 0), + (2, 22, 0), + (3, 23, 0), + (4, 24, 0), + (5, 26, 0), + (6, 27, 0), + (7, 28, 0), + (8, 29, 0), + (9, 30, 0), + ], + ) + def test_equivalence_matches_only_and_with_seq(self): a = self.build_read() a.query_sequence = "ACGT" * 2 a.cigarstring = "4M1D4M" a.set_tag("MD", "4^x4") - full = ( - list(zip(range(0, 4), range(20, 24), "ACGT")) - + [(None, 24, "x")] - + list(zip(range(4, 8), range(25, 29), "ACGT")) + full_with_op = ( + list(zip(range(0, 4), range(20, 24), "ACGT", [0] * 4)) + + [(None, 24, "x", 2)] + + list(zip(range(4, 8), range(25, 29), "ACGT", [0] * 4)) ) + full = [(qpos, refpos, ref) for (qpos, refpos, ref, op) in full_with_op] self.assertEqual(a.get_aligned_pairs(matches_only=False, with_seq=True), full) + self.assertEqual(a.get_aligned_pairs(matches_only=False, with_seq=True, with_cigar=True), full_with_op) self.assertEqual( a.get_aligned_pairs(matches_only=True, with_seq=True), [x for x in full if x[0] is not None and x[1] is not None], ) + self.assertEqual( + a.get_aligned_pairs(matches_only=True, with_seq=True, with_cigar=True), + [x for x in full_with_op if x[0] is not None and x[1] is not None], + ) a = self.build_read() a.query_sequence = "ACGT" * 2 a.cigarstring = "4M1N4M" a.set_tag("MD", "8") - full = ( - list(zip(range(0, 4), range(20, 24), "ACGT")) - + [(None, 24, None)] - + list(zip(range(4, 8), range(25, 29), "ACGT")) + full_with_op = ( + list(zip(range(0, 4), range(20, 24), "ACGT", [0] * 4)) + + [(None, 24, None, 3)] + + list(zip(range(4, 8), range(25, 29), "ACGT", [0] * 4)) ) + full = [(qpos, refpos, ref) for (qpos, refpos, ref, op) in full_with_op] self.assertEqual(a.get_aligned_pairs(matches_only=False, with_seq=True), full) + self.assertEqual(a.get_aligned_pairs(matches_only=False, with_seq=True, with_cigar=True), full_with_op) self.assertEqual( a.get_aligned_pairs(matches_only=True, with_seq=True), [x for x in full if x[0] is not None and x[1] is not None], ) + self.assertEqual( + a.get_aligned_pairs(matches_only=True, with_seq=True, with_cigar=True), + [x for x in full_with_op if x[0] is not None and x[1] is not None], + ) def test_get_aligned_pairs_lowercase_md(self): a = self.build_read() @@ -746,6 +999,21 @@ def test_get_aligned_pairs_lowercase_md(self): (9, 29, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "A", 0), + (5, 25, "g", 0), + (6, 26, "A", 0), + (7, 27, "A", 0), + (8, 28, "A", 0), + (9, 29, "A", 0), + ], + ) def test_get_aligned_pairs_uppercase_md(self): a = self.build_read() @@ -767,6 +1035,21 @@ def test_get_aligned_pairs_uppercase_md(self): (9, 29, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "A", 0), + (5, 25, "g", 0), + (6, 26, "A", 0), + (7, 27, "A", 0), + (8, 28, "A", 0), + (9, 29, "A", 0), + ], + ) def test_get_aligned_pairs_1character_md(self): a = self.build_read() @@ -785,6 +1068,18 @@ def test_get_aligned_pairs_1character_md(self): (6, 26, "A"), ], ) + self.assertEqual( + a.get_aligned_pairs(with_seq=True, with_cigar=True), + [ + (0, 20, "A", 0), + (1, 21, "A", 0), + (2, 22, "A", 0), + (3, 23, "A", 0), + (4, 24, "A", 0), + (5, 25, "A", 0), + (6, 26, "A", 0), + ], + ) def test_get_aligned_pairs_bad_type_md(self): a = self.build_read() @@ -793,6 +1088,8 @@ def test_get_aligned_pairs_bad_type_md(self): a.set_tag("MD", 7) with self.assertRaises(TypeError): a.get_aligned_pairs(with_seq=True) + with self.assertRaises(TypeError): + a.get_aligned_pairs(with_seq=True, with_cigar=True) def testNoSequence(self): """issue 176: retrieving length without query sequence @@ -1060,6 +1357,17 @@ def testReferenceBases(self): for x, y in reference_bases.items(): self.assertEqual(len(set(y)), 1) + reference_bases = collections.defaultdict(list) + with pysam.AlignmentFile(self.filename) as inf: + for c in inf.pileup(): + for r in c.pileups: + for read, ref, base, op in r.alignment.get_aligned_pairs(with_seq=True, with_cigar=True): + if ref is None: + continue + reference_bases[ref].append(base.upper()) + + for x, y in reference_bases.items(): + self.assertEqual(len(set(y)), 1) class TestBaseModifications(unittest.TestCase): def testChebi(self):