diff --git a/pdbtools/pdb_selaltloc.py b/pdbtools/pdb_selaltloc.py index 46adf68e..6f437c33 100644 --- a/pdbtools/pdb_selaltloc.py +++ b/pdbtools/pdb_selaltloc.py @@ -39,7 +39,6 @@ data to another. They are based on old FORTRAN77 code that was taking too much effort to maintain and compile. RIP. """ - import operator import os import sys @@ -113,8 +112,21 @@ def check_input(args): def select_by_occupancy(fhandle): + return select_altloc(fhandle, selloc=None, byocc=True) + + +def select_by_altloc(fhandle, selloc): + return select_altloc(fhandle, selloc, byocc=False) + + +def select_altloc(fhandle, selloc=None, byocc=False): """ - Pick the altloc with the highest occupancy. + Pick one altloc when atoms have more than one. + + If the specified altloc (selloc) is not present for this particular + atom, outputs all altlocs. For instance, if atom X has altlocs A and + B but the user picked C, we return A and B anyway. If atom Y has + altlocs A, B, and C, then we only return C. This function is a generator. @@ -125,133 +137,259 @@ def select_by_occupancy(fhandle): Yields ------ str (line-by-line) - The PDB file with altlocs of highest occupancy only. + The PDB file with altlocs according to selection. """ - atom_prop = {} - atom_prop_setd = atom_prop.setdefault - atom_data = [] - atom_data_append = atom_data.append - anisou_lines = {} # map atom_uid to lineno - ignored = set() - ignored_add = ignored.add - ignored_discard = ignored.discard - ignored_update = ignored.update - - # Iterate over file and store atom_uid - records = ('ATOM', 'HETATM', 'ANISOU') - for lineno, line in enumerate(fhandle): + if selloc is None and not byocc: + raise ValueError('Provide either `selloc` or `byocc`.') - atom_data_append(line) + altloc_lines = {} # dict to capture the lines from a altloc group + res_per_loc = {} # dict to capture the residues per altloc group - if line.startswith(records): - # Sometimes altlocs are used between different residue names. - # See 3u7t (residue 22 of chain A). So we ignore the resname below. - atom_uid = (line[12:16], line[20:26]) + prev_altloc = '' + prev_resname = '' + prev_resnum = '' - # ANISOU records do not have occupancy values. - # To keep things simple, we map ANISOU to ATOM/HETATM records - if line.startswith('ANISOU'): - anisou_lines[lineno - 1] = lineno - ignored_add(lineno) # we will fix this below - else: - occ = float(line[54:60]) - atom_prop_l = atom_prop_setd(atom_uid, []) - atom_prop_l.append((lineno, occ)) + flush_func_multi_residues = flush_resloc_occ if byocc else flush_resloc + flush_func_single_residues = \ + flush_resloc_occ_same_residue if byocc else flush_resloc_id_same_residue - # Iterate and pick highest occupancy for each atom. - for atom_uid, prop_list in atom_prop.items(): - prop_list.sort(key=operator.itemgetter(1), reverse=True) + records = ('ATOM', 'HETATM', 'ANISOU') + terminators = ('TER', 'END', 'CONECT', 'END', 'ENDMDL') - lineno = prop_list[0][0] + for line in fhandle: - # Edit altloc field(s) - line = atom_data[lineno] - atom_data[lineno] = line[:16] + ' ' + line[17:] + if line.startswith(records): + # captures the relevant parameters + altloc = line[16] + resname = line[17:20] + resnum = line[22:26].strip() + + if is_another_altloc_group( + altloc, prev_altloc, resnum, prev_resnum, + resname, prev_resname, altloc_lines, res_per_loc): + # if we see the altloc group has changed, we should flush + # the lines observed for the previous altloc group + + # uses for loop instead of "yield from" to maintain compatibility + # with older python version + if partial_altloc(altloc_lines): + flush_func = flush_func_single_residues + else: + flush_func = flush_func_multi_residues + + for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc): + yield __line + + # saves the line per altloc identifier + current_loc = altloc_lines.setdefault(altloc, []) + current_loc.append(line) + + # registers which residues are seen for each identifier + rploc = res_per_loc.setdefault(altloc, set()) + rploc.add((resname, resnum)) + + prev_altloc = altloc + prev_resnum = resnum + prev_resname = resname + + elif line.startswith(terminators): + # before flushing the terminator line + # we should flush the previous altloc group + if altloc_lines: + if partial_altloc(altloc_lines): + flush_func = flush_func_single_residues + else: + flush_func = flush_func_multi_residues + for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc): + yield __line + + prev_altloc = '' + prev_resname = '' + prev_resnum = '' + + yield line # the terminator line - if lineno in anisou_lines: - anisou_lineno = anisou_lines[lineno] - line = atom_data[anisou_lineno] - atom_data[anisou_lineno] = line[:16] + ' ' + line[17:] - ignored_discard(anisou_lineno) + else: + prev_altloc = '' + prev_resname = '' + prev_resnum = '' + yield line - ignored_update(p[0] for p in prop_list[1:]) + # end of for loop + # flush altloc residues in case the last residue was an altloc + if altloc_lines: - # Now yield - for lineno, line in enumerate(atom_data): - if lineno in ignored: - continue + if partial_altloc(altloc_lines): + flush_func = flush_func_single_residues + else: + flush_func = flush_func_multi_residues + + for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc): + yield __line + + +def is_another_altloc_group( + altloc, + prev_altloc, + resnum, + prev_resnum, + resname, + prev_resname, + altloc_lines, + rploc, + ): + """Detect if current line because to another altloc group.""" + a0 = prev_altloc + a1 = altloc + ra0 = prev_resname + ra1 = resname + ru0 = prev_resnum + ru1 = resnum + rl = altloc_lines + rv = list(rploc.values()) + + is_another = ( + all((a0, ra0, ru0)) and ( + (a0 != a1 and a1 == ' ' and ru1 > ru0) + or (a0 == ' ' and a1 == ' ' and (ru1 != ru0 or ra1 != ra0)) + or ( + a0 == a1 + and a0 != ' ' + and a1 in rl + and ru1 > ru0 + and len(rl) > 1 + and all(len(v) == len(rv[0]) for v in rv[1:]) + ) + ) + ) + + return is_another + + +def flush_resloc(selloc, altloc_lines, res_per_loc): + """Flush the captured altloc lines.""" + # only the selected altloc is yieled + if selloc in altloc_lines: + for line2flush in altloc_lines[selloc]: + yield line2flush[:16] + ' ' + line2flush[17:] + + # the altloc group does not contain the selected altloc + # therefore, all members should be yielded + else: + for key, lines2flush in altloc_lines.items(): + for line2flush in lines2flush: + yield line2flush + + # clears the altloc group dictionary. Ready for the next one! + altloc_lines.clear() + res_per_loc.clear() + + +def flush_resloc_occ(altloc_lines, res_per_loc, **kw): + """Flush the captured altloc lines by highest occupancy.""" + # only the selected altloc is yieled + highest = 0.00 + altloc = ' ' + + # detects which altloc identifier has the highest occupancy + for key, lines2flush in altloc_lines.items(): + # we check only the first line because all atoms in one identifier + # should have the same occupancy value + occ = float(lines2flush[0][54:60]) + if occ > highest: + altloc = key + highest = occ + + for line2flush in altloc_lines[altloc]: + yield line2flush[:16] + ' ' + line2flush[17:] + + # clears the altloc group dictionary. Ready for the next one! + altloc_lines.clear() + res_per_loc.clear() + + +def flush_resloc_id_same_residue(selloc, altloc_lines, res_per_loc): + """Flush altloc if altloc are atoms in the same residue - by ID.""" + # places all lines in a single list + all_lines = [] + for altloc, lines in altloc_lines.items(): + all_lines.extend(lines) + + # organize by atoms + atoms = {} + for line in all_lines: + atom_number = int(line[6:11]) + atom = line[12:16] + alist = atoms.setdefault((atom_number, atom), []) + alist.append(line) + + sorted_atoms = sorted(list(atoms.items()), key=lambda x: x[0][0]) + + to_yield = [] + for atom, lines in sorted_atoms: + for line in lines: + if line[16] == selloc: + to_yield.append(line) + + if to_yield: + for line in to_yield: + yield line[:16] + ' ' + line[17:] + else: + for line in lines: + yield line - yield line + altloc_lines.clear() + res_per_loc.clear() -def select_by_altloc(fhandle, selloc): - """ - Pick one altloc when atoms have more than one. +def flush_resloc_occ_same_residue(altloc_lines, res_per_loc, **kw): + """Flush altloc if altloc are atoms in the same residue - by occ.""" + # places all lines in a single list + all_lines = [] + for altloc, lines in altloc_lines.items(): + all_lines.extend(lines) - If the specified altloc (selloc) is not present for this particular - atom, outputs all altlocs. For instance, if atom X has altlocs A and - B but the user picked C, we return A and B anyway. If atom Y has - altlocs A, B, and C, then we only return C. + # organize by atoms + atoms = {} + for line in all_lines: + atom_number = int(line[6:11]) + atom = line[12:16] + alist = atoms.setdefault((atom_number, atom), []) + alist.append(line) - This function is a generator. + sorted_atoms = sorted(list(atoms.items()), key=lambda x: x[0][0]) - Parameters - ---------- - fhandle : an iterable giving the PDB file line-by-line. + A = { + 'ATOM': 1, + 'HETA': 1, + 'ANIS': 0, + } - Yields - ------ - str (line-by-line) - The PDB file with altlocs according to selection. - """ - # We have to iterate multiple times - atom_prop = {} - atom_prop_setd = atom_prop.setdefault - atom_data = [] - atom_data_append = atom_data.append + for atom, lines in sorted_atoms: + lines.sort(key=lambda x: (A[x[:4]], float(x[54:60])), reverse=True) + yield lines[0][:16] + ' ' + lines[0][17:] + if lines[1:] and lines[1].startswith('ANISOU'): + yield lines[1][:16] + ' ' + lines[1][17:] - # Iterate over file and store atom_uid - records = ('ATOM', 'HETATM', 'ANISOU') - editable = set() - editable_add = editable.add - for lineno, line in enumerate(fhandle): + altloc_lines.clear() + res_per_loc.clear() - atom_data_append(line) - if line.startswith(records): - # Sometimes altlocs are used between different residue names. - # See 3u7t (residue 22 of chain A). So we ignore the resname below. - atom_uid = (line[12:16], line[20:26]) +def all_same_residue(altloc_lines): + """Assert all lines are from same residue.""" + residues = set() + for key, val in altloc_lines.items(): + for line in val: + resname = line[17:20] + resnum = line[22:26].strip() + residues.add((resname, resnum)) - altloc = line[16] - atom_prop_l = atom_prop_setd(atom_uid, []) - atom_prop_l.append((altloc, lineno)) - - if altloc == selloc: # flag as editable - editable_add(lineno) - - # Reduce editable indexes to atom_uid entries - editable = { - (atom_data[i][12:16], atom_data[i][20:26]) for i in editable - } - - # Now define lines to ignore in the output - ignored = set() - for atom_uid in editable: - for altloc, lineno in atom_prop[atom_uid]: - if altloc != selloc: - ignored.add(lineno) - else: - # Edit altloc field - line = atom_data[lineno] - atom_data[lineno] = line[:16] + ' ' + line[17:] - - # Iterate again and yield the correct lines. - for lineno, line in enumerate(atom_data): - if lineno in ignored: - continue - - yield line + return len(residues) == 1 + + +def partial_altloc(altloc_lines): + """Detect if the altloc positions are atoms in a single residue.""" + return ' ' in altloc_lines and all_same_residue(altloc_lines) def run(fhandle, option=None): diff --git a/tests/test_pdb_selaltloc.py b/tests/test_pdb_selaltloc.py index 8d6ade2b..b751d838 100644 --- a/tests/test_pdb_selaltloc.py +++ b/tests/test_pdb_selaltloc.py @@ -18,7 +18,6 @@ """ Unit Tests for `pdb_selaltloc`. """ - import os import sys import unittest @@ -53,6 +52,153 @@ def exec_module(self): return + def test_is_same_group_1(self): + """ + Test if from the same group. + + Exemple of a starting line with empty strings. + """ + result = self.module.is_another_altloc_group( + ' ', '', '1', '', 'ALA', '', {}, {}) + + self.assertFalse(result) + + def test_is_same_group_2(self): + """ + Test if from the same group. + + Example of a starting line with Nones. + """ + result = self.module.is_another_altloc_group( + ' ', None, None, None, None, None, {}, {}) + + self.assertFalse(result) + + def test_is_same_group_3(self): + """ + Test if from the same group. + + Example of all parameters are the same as previous line. + """ + result = self.module.is_another_altloc_group( + 'B', 'B', '12', '12', 'ALA', 'ALA', {'B': None}, + {'B': {('ALA', '12')}} + ) + self.assertFalse(result) + + + def test_is_same_group_4(self): + """ + Test if line is from another group. + + Multiple residue altloc. + + This considers altloc spanning several residues. See example + dummy_altloc2.pdb. + """ + result = self.module.is_another_altloc_group( + 'A', 'A', '26', '25', 'LEU', 'GLU', {'A': ['lines']}, + {'A': {('GLU', '25')}} + ) + self.assertFalse(result) + + def test_is_same_group_5(self): + """ + Test if line is from another group. + + Multiple residue altloc. + + This considers altloc spanning several residues. See example + dummy_altloc2.pdb. + """ + result = self.module.is_another_altloc_group( + 'A', ' ', '26', '25', 'GLU', 'GLU', {' ': ['lines']}, + {' ': {('GLU', '25')}} + ) + self.assertFalse(result) + + def test_is_same_group_6(self): + """ + Test if line is from another group. + + Multiple residue altloc. + + This considers altloc spanning several residues. See example + dummy_altloc2.pdb. + """ + result = self.module.is_another_altloc_group( + 'A', ' ', '25', '25', 'ALA', 'GLU', {' ': ['lines']}, + {' ': {('GLU', '25')}} + ) + self.assertFalse(result) + + def test_is_another_group_1(self): + result = self.module.is_another_altloc_group( + ' ', 'B', '2', '1', 'ALA', 'PRO', {'B': ['lines']}, + {'B': {('PRO', '1')}} + ) + self.assertTrue(result) + + def test_is_another_group_2(self): + result = self.module.is_another_altloc_group( + ' ', ' ', '2', '1', 'ALA', 'ALA', {' ': ['lines']}, + {' ': {('ALA', '1')}} + ) + self.assertTrue(result) + + def test_is_another_group_3(self): + result = self.module.is_another_altloc_group( + ' ', ' ', '1', '1', 'ALA', 'GLU', {' ': ['lines']}, + {' ': {('GLU', '1')}} + ) + self.assertTrue(result) + + def test_is_another_group_4(self): + result = self.module.is_another_altloc_group( + 'A', 'A', '26', '25', 'LEU', 'GLU', {' ': ['lines'], 'A': ['lines']}, + {' ': {('LEU', '25')}, 'A': {('GLU', '26')}} + ) + self.assertTrue(result) + + def test_all_same_residue(self): + """Test all same residue.""" + inp = { + ' ': [ + "ATOM 3 N ASN A 1 22.066 40.557 0.420 1.00 0.00 N ", + "ATOM 3 H ASN A 1 21.629 41.305 -0.098 1.00 0.00 H ", + "ATOM 3 H2 ASN A 1 23.236 40.798 0.369 1.00 0.00 H ", + "ATOM 3 H3 ASN A 1 21.866 40.736 1.590 1.00 0.00 H ", + ], + 'B': ["ATOM 3 CA BASN A 1 20.000 30.000 0.005 0.60 0.00 C "], + 'A': ["ATOM 3 CA AASN A 1 21.411 39.311 0.054 0.40 0.00 C "], + } + + result = self.module.all_same_residue(inp) + self.assertTrue(result) + + def test_all_same_residue_false(self): + """Test all same residue.""" + inp = { + 'B': ["ATOM 3 CA BSER A 2 20.000 30.000 0.005 0.60 0.00 C "], 'A': ["ATOM 3 CA AASN A 1 21.411 39.311 0.054 0.40 0.00 C "], } + result = self.module.all_same_residue(inp) + self.assertFalse(result) + + def test_partial_altloc(self): + inp = { + 'A': [ + "ATOM 333 CA AGLU A 26 -10.000 -3.000 -12.000 0.50 4.89 C ", + "ANISOU 333 CA AGLU A 26 576 620 663 31 42 45 C ", + ], + 'B':[ + "ATOM 333 CA CGLU A 26 -10.679 -3.437 -12.387 1.00 4.89 C ", + "ANISOU 333 CA CGLU A 26 576 620 663 31 42 45 C ", + ], + } + + result = self.module.partial_altloc(inp) + self.assertFalse(result) + + def test_default(self): """$ pdb_selaltloc data/dummy_altloc.pdb""" @@ -280,6 +426,41 @@ def test_select_loc_C_2(self): self.assertEqual(observed, expected) + def test_gives_same_dummy_A(self): + """Test dummy.pdb is not altered because there are not altlocs.""" + sys.argv = ['', '-A', os.path.join(data_dir, 'dummy.pdb')] + self.exec_module() + self.assertEqual(self.retcode, 0) + self.assertEqual(len(self.stdout), 203) + self.assertEqual(len(self.stderr), 0) + self.assertEqual( + self.stdout[80], + "ATOM 3 CA ASN A 1 21.411 39.311 0.054 0.40 0.00 C ") + + def test_gives_same_dummy_B(self): + """Test dummy.pdb is not altered because there are not altlocs.""" + sys.argv = ['', '-B', os.path.join(data_dir, 'dummy.pdb')] + self.exec_module() + self.assertEqual(self.retcode, 0) + self.assertEqual(len(self.stdout), 203) + self.assertEqual(len(self.stderr), 0) + self.assertEqual( + self.stdout[80], + "ATOM 3 CA ASN A 1 20.000 30.000 0.005 0.60 0.00 C " + ) + + def test_gives_same_dummy_maxocc(self): + """Test dummy.pdb is not altered because there are not altlocs.""" + sys.argv = ['', os.path.join(data_dir, 'dummy.pdb')] + self.exec_module() + self.assertEqual(self.retcode, 0) + self.assertEqual(len(self.stdout), 203) + self.assertEqual(len(self.stderr), 0) + self.assertEqual( + self.stdout[80], + "ATOM 3 CA ASN A 1 20.000 30.000 0.005 0.60 0.00 C " + ) + def test_file_not_found(self): """$ pdb_selaltloc not_existing.pdb"""