Skip to content

Commit

Permalink
Merge pull request #117
Browse files Browse the repository at this point in the history
Improved `pdb_selaltloc`
  • Loading branch information
joaomcteixeira authored Feb 8, 2022
2 parents 881d4ff + bf1e254 commit 26f25a0
Show file tree
Hide file tree
Showing 2 changed files with 428 additions and 109 deletions.
354 changes: 246 additions & 108 deletions pdbtools/pdb_selaltloc.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
data to another. They are based on old FORTRAN77 code that was taking too much
effort to maintain and compile. RIP.
"""

import operator
import os
import sys
Expand Down Expand Up @@ -113,8 +112,21 @@ def check_input(args):


def select_by_occupancy(fhandle):
return select_altloc(fhandle, selloc=None, byocc=True)


def select_by_altloc(fhandle, selloc):
return select_altloc(fhandle, selloc, byocc=False)


def select_altloc(fhandle, selloc=None, byocc=False):
"""
Pick the altloc with the highest occupancy.
Pick one altloc when atoms have more than one.
If the specified altloc (selloc) is not present for this particular
atom, outputs all altlocs. For instance, if atom X has altlocs A and
B but the user picked C, we return A and B anyway. If atom Y has
altlocs A, B, and C, then we only return C.
This function is a generator.
Expand All @@ -125,133 +137,259 @@ def select_by_occupancy(fhandle):
Yields
------
str (line-by-line)
The PDB file with altlocs of highest occupancy only.
The PDB file with altlocs according to selection.
"""
atom_prop = {}
atom_prop_setd = atom_prop.setdefault
atom_data = []
atom_data_append = atom_data.append
anisou_lines = {} # map atom_uid to lineno
ignored = set()
ignored_add = ignored.add
ignored_discard = ignored.discard
ignored_update = ignored.update

# Iterate over file and store atom_uid
records = ('ATOM', 'HETATM', 'ANISOU')
for lineno, line in enumerate(fhandle):
if selloc is None and not byocc:
raise ValueError('Provide either `selloc` or `byocc`.')

atom_data_append(line)
altloc_lines = {} # dict to capture the lines from a altloc group
res_per_loc = {} # dict to capture the residues per altloc group

if line.startswith(records):
# Sometimes altlocs are used between different residue names.
# See 3u7t (residue 22 of chain A). So we ignore the resname below.
atom_uid = (line[12:16], line[20:26])
prev_altloc = ''
prev_resname = ''
prev_resnum = ''

# ANISOU records do not have occupancy values.
# To keep things simple, we map ANISOU to ATOM/HETATM records
if line.startswith('ANISOU'):
anisou_lines[lineno - 1] = lineno
ignored_add(lineno) # we will fix this below
else:
occ = float(line[54:60])
atom_prop_l = atom_prop_setd(atom_uid, [])
atom_prop_l.append((lineno, occ))
flush_func_multi_residues = flush_resloc_occ if byocc else flush_resloc
flush_func_single_residues = \
flush_resloc_occ_same_residue if byocc else flush_resloc_id_same_residue

# Iterate and pick highest occupancy for each atom.
for atom_uid, prop_list in atom_prop.items():
prop_list.sort(key=operator.itemgetter(1), reverse=True)
records = ('ATOM', 'HETATM', 'ANISOU')
terminators = ('TER', 'END', 'CONECT', 'END', 'ENDMDL')

lineno = prop_list[0][0]
for line in fhandle:

# Edit altloc field(s)
line = atom_data[lineno]
atom_data[lineno] = line[:16] + ' ' + line[17:]
if line.startswith(records):
# captures the relevant parameters
altloc = line[16]
resname = line[17:20]
resnum = line[22:26].strip()

if is_another_altloc_group(
altloc, prev_altloc, resnum, prev_resnum,
resname, prev_resname, altloc_lines, res_per_loc):
# if we see the altloc group has changed, we should flush
# the lines observed for the previous altloc group

# uses for loop instead of "yield from" to maintain compatibility
# with older python version
if partial_altloc(altloc_lines):
flush_func = flush_func_single_residues
else:
flush_func = flush_func_multi_residues

for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc):
yield __line

# saves the line per altloc identifier
current_loc = altloc_lines.setdefault(altloc, [])
current_loc.append(line)

# registers which residues are seen for each identifier
rploc = res_per_loc.setdefault(altloc, set())
rploc.add((resname, resnum))

prev_altloc = altloc
prev_resnum = resnum
prev_resname = resname

elif line.startswith(terminators):
# before flushing the terminator line
# we should flush the previous altloc group
if altloc_lines:
if partial_altloc(altloc_lines):
flush_func = flush_func_single_residues
else:
flush_func = flush_func_multi_residues
for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc):
yield __line

prev_altloc = ''
prev_resname = ''
prev_resnum = ''

yield line # the terminator line

if lineno in anisou_lines:
anisou_lineno = anisou_lines[lineno]
line = atom_data[anisou_lineno]
atom_data[anisou_lineno] = line[:16] + ' ' + line[17:]
ignored_discard(anisou_lineno)
else:
prev_altloc = ''
prev_resname = ''
prev_resnum = ''
yield line

ignored_update(p[0] for p in prop_list[1:])
# end of for loop
# flush altloc residues in case the last residue was an altloc
if altloc_lines:

# Now yield
for lineno, line in enumerate(atom_data):
if lineno in ignored:
continue
if partial_altloc(altloc_lines):
flush_func = flush_func_single_residues
else:
flush_func = flush_func_multi_residues

for __line in flush_func(selloc=selloc, altloc_lines=altloc_lines, res_per_loc=res_per_loc):
yield __line


def is_another_altloc_group(
altloc,
prev_altloc,
resnum,
prev_resnum,
resname,
prev_resname,
altloc_lines,
rploc,
):
"""Detect if current line because to another altloc group."""
a0 = prev_altloc
a1 = altloc
ra0 = prev_resname
ra1 = resname
ru0 = prev_resnum
ru1 = resnum
rl = altloc_lines
rv = list(rploc.values())

is_another = (
all((a0, ra0, ru0)) and (
(a0 != a1 and a1 == ' ' and ru1 > ru0)
or (a0 == ' ' and a1 == ' ' and (ru1 != ru0 or ra1 != ra0))
or (
a0 == a1
and a0 != ' '
and a1 in rl
and ru1 > ru0
and len(rl) > 1
and all(len(v) == len(rv[0]) for v in rv[1:])
)
)
)

return is_another


def flush_resloc(selloc, altloc_lines, res_per_loc):
"""Flush the captured altloc lines."""
# only the selected altloc is yieled
if selloc in altloc_lines:
for line2flush in altloc_lines[selloc]:
yield line2flush[:16] + ' ' + line2flush[17:]

# the altloc group does not contain the selected altloc
# therefore, all members should be yielded
else:
for key, lines2flush in altloc_lines.items():
for line2flush in lines2flush:
yield line2flush

# clears the altloc group dictionary. Ready for the next one!
altloc_lines.clear()
res_per_loc.clear()


def flush_resloc_occ(altloc_lines, res_per_loc, **kw):
"""Flush the captured altloc lines by highest occupancy."""
# only the selected altloc is yieled
highest = 0.00
altloc = ' '

# detects which altloc identifier has the highest occupancy
for key, lines2flush in altloc_lines.items():
# we check only the first line because all atoms in one identifier
# should have the same occupancy value
occ = float(lines2flush[0][54:60])
if occ > highest:
altloc = key
highest = occ

for line2flush in altloc_lines[altloc]:
yield line2flush[:16] + ' ' + line2flush[17:]

# clears the altloc group dictionary. Ready for the next one!
altloc_lines.clear()
res_per_loc.clear()


def flush_resloc_id_same_residue(selloc, altloc_lines, res_per_loc):
"""Flush altloc if altloc are atoms in the same residue - by ID."""
# places all lines in a single list
all_lines = []
for altloc, lines in altloc_lines.items():
all_lines.extend(lines)

# organize by atoms
atoms = {}
for line in all_lines:
atom_number = int(line[6:11])
atom = line[12:16]
alist = atoms.setdefault((atom_number, atom), [])
alist.append(line)

sorted_atoms = sorted(list(atoms.items()), key=lambda x: x[0][0])

to_yield = []
for atom, lines in sorted_atoms:
for line in lines:
if line[16] == selloc:
to_yield.append(line)

if to_yield:
for line in to_yield:
yield line[:16] + ' ' + line[17:]
else:
for line in lines:
yield line

yield line
altloc_lines.clear()
res_per_loc.clear()


def select_by_altloc(fhandle, selloc):
"""
Pick one altloc when atoms have more than one.
def flush_resloc_occ_same_residue(altloc_lines, res_per_loc, **kw):
"""Flush altloc if altloc are atoms in the same residue - by occ."""
# places all lines in a single list
all_lines = []
for altloc, lines in altloc_lines.items():
all_lines.extend(lines)

If the specified altloc (selloc) is not present for this particular
atom, outputs all altlocs. For instance, if atom X has altlocs A and
B but the user picked C, we return A and B anyway. If atom Y has
altlocs A, B, and C, then we only return C.
# organize by atoms
atoms = {}
for line in all_lines:
atom_number = int(line[6:11])
atom = line[12:16]
alist = atoms.setdefault((atom_number, atom), [])
alist.append(line)

This function is a generator.
sorted_atoms = sorted(list(atoms.items()), key=lambda x: x[0][0])

Parameters
----------
fhandle : an iterable giving the PDB file line-by-line.
A = {
'ATOM': 1,
'HETA': 1,
'ANIS': 0,
}

Yields
------
str (line-by-line)
The PDB file with altlocs according to selection.
"""
# We have to iterate multiple times
atom_prop = {}
atom_prop_setd = atom_prop.setdefault
atom_data = []
atom_data_append = atom_data.append
for atom, lines in sorted_atoms:
lines.sort(key=lambda x: (A[x[:4]], float(x[54:60])), reverse=True)
yield lines[0][:16] + ' ' + lines[0][17:]
if lines[1:] and lines[1].startswith('ANISOU'):
yield lines[1][:16] + ' ' + lines[1][17:]

# Iterate over file and store atom_uid
records = ('ATOM', 'HETATM', 'ANISOU')
editable = set()
editable_add = editable.add
for lineno, line in enumerate(fhandle):
altloc_lines.clear()
res_per_loc.clear()

atom_data_append(line)

if line.startswith(records):
# Sometimes altlocs are used between different residue names.
# See 3u7t (residue 22 of chain A). So we ignore the resname below.
atom_uid = (line[12:16], line[20:26])
def all_same_residue(altloc_lines):
"""Assert all lines are from same residue."""
residues = set()
for key, val in altloc_lines.items():
for line in val:
resname = line[17:20]
resnum = line[22:26].strip()
residues.add((resname, resnum))

altloc = line[16]
atom_prop_l = atom_prop_setd(atom_uid, [])
atom_prop_l.append((altloc, lineno))

if altloc == selloc: # flag as editable
editable_add(lineno)

# Reduce editable indexes to atom_uid entries
editable = {
(atom_data[i][12:16], atom_data[i][20:26]) for i in editable
}

# Now define lines to ignore in the output
ignored = set()
for atom_uid in editable:
for altloc, lineno in atom_prop[atom_uid]:
if altloc != selloc:
ignored.add(lineno)
else:
# Edit altloc field
line = atom_data[lineno]
atom_data[lineno] = line[:16] + ' ' + line[17:]

# Iterate again and yield the correct lines.
for lineno, line in enumerate(atom_data):
if lineno in ignored:
continue

yield line
return len(residues) == 1


def partial_altloc(altloc_lines):
"""Detect if the altloc positions are atoms in a single residue."""
return ' ' in altloc_lines and all_same_residue(altloc_lines)


def run(fhandle, option=None):
Expand Down
Loading

0 comments on commit 26f25a0

Please sign in to comment.