Source code for schrodinger.protein.getpdb

"""
Module for downloading PDB files from the web.

The data is retrieved from the RCSB. Current download URLs are documented
at http://www.rcsb.org/pdb/static.do?p=download/http/index.html

Running this module is no different from using a web-browser to access
the site - it's just a different type of web client. Therefore this should
cause no problems for the maintainers of that site and be within the
terms and conditions of use.

Note that certain assumptions are made about the layout of the web site -
changes there in future may make this script stop working.

Copyright Schrodinger, LLC. All rights reserved.

"""

import gzip
import os
import shutil
import sys
import tempfile

import requests
import requests.packages.urllib3
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util import retry

from schrodinger.utils import fileutils
from schrodinger.utils import log
from schrodinger.utils import subprocess

# Silence urllib3 warnings for the whole process.
# NOTE(review): presumably needed because the downloads in this module pass
# verify=False to requests - confirm before removing.
requests.packages.urllib3.disable_warnings()

# Allowed values for the ``source`` argument of get_pdb() and get_ent():
AUTO, DATABASE, WEB = range(3)

# Base URL for RCSB file downloads. FASTA files are downloaded differently.
_RCSB_URL = 'https://files.rcsb.org/download/'

# Base URL for EMDB structure downloads.
_EMDB_URL = 'https://ftp.rcsb.org/pub/emdb/structures'

# HTTP status codes for which a request is retried.
RETRY_HTTP_CODES = (500, 502, 503, 504)

# Module logger; INFO level so that download progress messages are emitted.
logger = log.get_output_logger("getpdb")
logger.setLevel(log.INFO)


def _download_file_from_url(url, dest_file):
    """
    Fetch the content at the given URL and stream it into a local file.

    The output file is opened via `open_filename`, so when `dest_file`
    cannot be opened for writing the data is written under the temporary
    directory instead, and that path is the one returned.

    :param url: URL to download the file from.
    :type url: str

    :param dest_file: Path where to write the file to.
    :type dest_file: str

    :return: Path to the file that was actually written.
    :rtype: str

    :raises requests.HTTPError: if the server answers with an error status.
    """

    chunk_size = 8192  # 8 KiB per read
    with requests_retry_session() as session:
        # NOTE: certificate verification is disabled for this request.
        response = session.get(url, stream=True, verify=False)
        response.raise_for_status()
        with open_filename(dest_file, 'wb') as out_fh:
            # May point into the temporary directory instead of dest_file:
            written_path = out_fh.name
            for block in response.iter_content(chunk_size):
                out_fh.write(block)
    return written_path


def download_file(filename):
    """
    Fetch the given file from the RCSB download area and store it locally
    under the same name, either in the CWD or in the temporary directory.

    :param filename: File to download from the RCSB web site.
    :type filename: str

    :return: Path to the written file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB.
    """
    return _download_file_from_url(_RCSB_URL + filename, filename)
def _decompress_gz_file(compressed_file, dest_file):
    """
    Expand a `*.gz` file holding text into the given destination path, then
    delete the compressed original.

    Every line is decoded and written out as UTF-8 text. The output is
    opened via `open_filename`, so if the destination is not writable the
    file ends up in the temporary directory and that path is returned.

    :param compressed_file: Path to the file to decompress.
    :type compressed_file: str

    :param dest_file: Path to the file to write.
    :type dest_file: str

    :return: Path to the written file, which will be different from
        dest_file if CWD is not writable.
    :rtype: str
    """
    with gzip.open(compressed_file, 'rb') as gz_fh, \
            open_filename(dest_file, 'w', encoding="utf-8") as text_fh:
        # File path may be different from dest_file:
        written_path = text_fh.name
        text_fh.writelines(raw_line.decode() for raw_line in gz_fh)
    os.remove(compressed_file)
    return written_path


def _decompress_binary_gz_file(compressed_file, dest_file):
    """
    Expand a binary `*.gz` file into the given destination path, then delete
    the compressed original.

    The bytes are copied unchanged. The output is opened via
    `open_filename`, so if the destination is not writable the file ends up
    in the temporary directory and that path is returned.

    :param compressed_file: Path to the file to decompress.
    :type compressed_file: str

    :param dest_file: Path to the file to write.
    :type dest_file: str

    :return: Path to the written file, which will be different from
        dest_file if CWD is not writable.
    :rtype: str
    """
    with gzip.open(compressed_file, 'rb') as gz_fh, \
            open_filename(dest_file, 'wb') as raw_fh:
        # File path may be different from dest_file:
        written_path = raw_fh.name
        shutil.copyfileobj(gz_fh, raw_fh)
    os.remove(compressed_file)
    return written_path
def download_sf(pdb_code):
    """
    Download the structure factor (ENT) file for the given PDB ID, convert
    it to CNS format with `refconvert`, and return the converted file name.

    Not every PDB entry has structure factors deposited, and not every
    structure factor file converts cleanly.

    :param pdb_code: PDB ID to fetch the structure factors for.
    :type pdb_code: str

    :return: Path to the converted file.
    :rtype: str

    :raises RuntimeError: if no structure factors are deposited (the
        download error mentions "404"), or if the conversion fails.
    :raises requests.HTTPError: for any other download error.
    """
    try:
        ent_file = download_ent(pdb_code)
    except (RuntimeError, requests.HTTPError) as err:
        if "404" not in str(err):
            raise
        raise RuntimeError("No Structure factors deposited for %s" % pdb_code)

    refconvert = os.path.join(os.environ['SCHRODINGER'], 'utilities',
                              'refconvert')
    # NOTE(review): download_reflection_data() writes a '.cv' file while
    # this function uses '.sv' - confirm which extension is intended.
    base_name, _ = os.path.splitext(ent_file)
    cv_file = base_name + '.sv'

    status = subprocess.call(
        [refconvert, '-icif', ent_file, '-ocns', cv_file])
    converted = status == 0 and os.path.isfile(cv_file)
    if not converted:
        raise RuntimeError("Failed to convert ENT file: %s" % ent_file)
    # TODO: Remove the ent_file
    return cv_file
def download_fasta(pdb_code):
    """
    Attempts to download the FASTA file for the given PDB ID.

    The file is saved as `<pdb_code>.fasta` in the CWD, or in the temporary
    directory if the CWD is not writable.

    :type pdb_code: str
    :param pdb_code: PDB ID of the file to download

    :rtype: str
    :return: Path to the file that was actually written.

    :raises requests.HTTPError: if the server answers with an error status.
    """
    # Currently, the RCSB website downloads from the http address, but the
    # https address also exists.
    url = f'https://www.rcsb.org/fasta/entry/{pdb_code}/download'
    fasta_file = f"{pdb_code}.fasta"
    # Return the path reported by the downloader, not the requested name:
    # they differ when the file had to be written to the temporary directory.
    return _download_file_from_url(url, fasta_file)
def download_em_map(emdb_code):
    """
    Attempts to download the EM map file for the given EMDB ID.

    The compressed map is downloaded, expanded to `emd_<emdb_code>.map` and
    the compressed download is then deleted. Files are written to the CWD,
    or to the temporary directory if the CWD is not writable.

    :type emdb_code: str
    :param emdb_code: EMDB ID of the map file to download

    :rtype: str
    :return: Path to the decompressed map file that was written.

    :raises requests.HTTPError: if the server answers with an error status.
    """
    compressed_em_file = f"emd_{emdb_code}.map.gz"
    em_file = "emd_%s.map" % emdb_code
    url = f'{_EMDB_URL}/EMD-{emdb_code}/map/{compressed_em_file}'
    # Use the path reported by the downloader: when the CWD is not writable
    # the archive lives in the temporary directory, not under the name that
    # was requested.
    compressed_path = _download_file_from_url(url, compressed_em_file)
    return _decompress_binary_gz_file(compressed_path, em_file)
def get_pdb(pdbid, source=AUTO, caps_asis=False):
    """
    Obtain the specified PDB file from the local database, from the web, or
    from whichever of the two works first.

    With the default source (AUTO) the local database is tried first and the
    web is used only if nothing was found locally.

    :type pdbid: str
    :param pdbid: string of 4 characters

    :type source: int
    :param source: one of: AUTO, DATABASE, WEB.

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be
        preserved, False (default) if it should be converted to lowercase.
        Only used for the database lookup.

    :return: Path to the PDB file that was written (`*.pdb` or `*.cif`)
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: for other error retrieving file
    :raises ValueError: if source is not one of the supported constants
    """
    if source == WEB:
        return download_pdb(pdbid)
    if source not in (AUTO, DATABASE):
        raise ValueError("Invalid source")

    # Both remaining modes start with a lookup in the local database.
    pdb_file = retrieve_pdb(pdbid, caps_asis=caps_asis)
    if pdb_file:
        return pdb_file
    if source == DATABASE:
        raise RuntimeError(
            "PDB '%s' could not be retrieved from the database" % pdbid)
    # AUTO: fall back to the web.
    return download_pdb(pdbid)
def _write_uncompressed_pdb(in_fh, dest_file):
    """
    Copy the binary stream `in_fh` into `dest_file` and return the path of
    the file that was actually written.
    """
    with open_filename(dest_file, 'wb') as out_fh:
        # NOTE: If CWD is not writable, the file will be written to temp dir.
        shutil.copyfileobj(in_fh, out_fh)
        return out_fh.name


def retrieve_pdb(pdbid, local_repos=None, verbose=False, caps_asis=False):
    """
    Attempt to retrieve the PDB from the local repository and write an
    uncompressed copy named `<pdbid>.pdb`.

    First we look for current files ending in .gz or .Z, then obsolete files
    with the same endings. The file name we search for is:
    pdbXXXX.ent.Y where XXXX is the PDB code and Y is either gz or Z

    :type pdbid: str
    :param pdbid: the PDB code of the desired file

    :type local_repos: list of str
    :param local_repos: the paths to the parent directories of each local
        repository.

    :type verbose: bool
    :param verbose: passed through to `find_local_pdb`

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be
        preserved, False (default) if it should be converted to lowercase.

    :rtype: str
    :return: the name of the pdb file, or None if no local file was found
    """
    local_pdb_file = find_local_pdb(pdbid,
                                    local_repos,
                                    verbose=verbose,
                                    caps_asis=caps_asis)
    if not local_pdb_file:
        return None

    file_id = pdbid if caps_asis else pdbid.lower()
    uncompressed_pdb_file = file_id + '.pdb'

    # The input handles are managed by `with` so that they are closed even
    # if opening or writing the output file fails.
    if local_pdb_file.endswith('.gz'):
        with gzip.open(local_pdb_file, 'rb') as in_fh:
            return _write_uncompressed_pdb(in_fh, uncompressed_pdb_file)

    # A compress .Z file - there is no nice way in Python to handle this, so
    # run the gzip executable and capture what it writes in a scratch file.
    # NOTE(review): stderr is captured into the same file as stdout and the
    # exit status is ignored, so gzip diagnostics can end up in the output
    # file - confirm whether that is intended.
    command = ['gzip', '-c', '-d', local_pdb_file]
    with tempfile.TemporaryFile() as in_fh:
        subprocess.call(command, stdout=in_fh, stderr=in_fh)
        in_fh.seek(0)
        return _write_uncompressed_pdb(in_fh, uncompressed_pdb_file)
def find_local_repository(verbose=False):
    """
    Work out which directories act as local PDB repositories.

    Note: the location of the PDB directory can be specified via environment
    variables; the order of precedence is:

        * SCHRODINGER_PDB
        * SCHRODINGER_THIRDPARTY/database/pdb
        * SCHRODINGER/thirdparty/database/pdb (the default)

    :type verbose: bool
    :param verbose: True if debugging messages should be printed to the
        screen (not consulted by this function; messages are always sent to
        the module logger at debug level)

    :rtype: list of str
    :return: the paths to the parent directories of each local repository.
        Returns an empty list if the local repository cannot be determined.
    """
    # Each environment variable implies a different suffix to append to its
    # value.
    env_suffixes = (
        ('SCHRODINGER_PDB', ""),
        ('SCHRODINGER_THIRDPARTY', '/database/pdb'),
        ('SCHRODINGER', '/thirdparty/database/pdb'),
    )

    candidates = []
    for var, pathend in env_suffixes:
        envvar = os.environ.get(var)
        if envvar is None:
            # Variable not defined
            continue
        logger.debug('environment variable {} is set to {}'.format(
            var, envvar))
        if not envvar:
            logger.debug(
                'environment variable {} is set but has no value'.format(var))
            continue
        candidates.append(os.path.normpath(envvar + pathend))

    if not candidates:
        logger.debug('Local database is not found')
        return candidates

    # Keep existing directories only, dropping duplicates but keeping order.
    found = []
    for dir_name in candidates:
        if dir_name in found or not os.path.isdir(dir_name):
            logger.debug('Local database {} is not found'.format(dir_name))
        else:
            logger.debug('Local database found: {}'.format(dir_name))
            found.append(dir_name)
    return found
def find_local_pdb(pdbid, local_repos=None, verbose=False, caps_asis=False):
    """
    Search a series of local directories and file names for the PDB file.

    First we look for current files ending in .gz or .Z, then obsolete files
    with the same endings. The file name we search for is:
    pdbXXXX.ent.Y where XXXX is the PDB code and Y is either gz or Z

    Note: the location of the PDB directory can be specified via environment
    variables; the order of precedence is:

        * SCHRODINGER_PDB
        * SCHRODINGER_THIRDPARTY
        * SCHRODINGER/thirdparty (the default)

    :type pdbid: str
    :param pdbid: the PDB code of the desired file

    :type local_repos: list of str
    :param local_repos: the paths to the parent directories of each local
        repository. If empty or None, `find_local_repository` is consulted.

    :type verbose: bool
    :param verbose: True if debug messages should be printed out

    :type caps_asis: bool
    :param caps_asis: True if the capitalization of pdbid should be
        preserved, False (default) if it should be converted to lowercase.

    :rtype: str
    :return: the path to an existing file with the desired PDB code, or
        None if there is no such file
    """
    repos = local_repos or find_local_repository(verbose=verbose)
    if not repos:
        return None

    # The PDB files are stored with a bit of a mangled name
    file_id = pdbid if caps_asis else pdbid.lower()
    filename = 'pdb' + file_id + '.ent'

    # PDB files are stored in an additional subdirectory under the current
    # or obsolete directory, named after the middle two characters of the
    # PDB code
    div_dir = file_id[1:3]

    # These are the subdirectories of each repository that we search, in
    # order of preference
    template = 'data/structures/%s/pdb/%s'
    sub_dirs = [
        template % ('divided', div_dir),
        template % ('obsolete', div_dir),
        template % ('local', ''),
    ]

    for repo in repos:
        for sub_dir in sub_dirs:
            directory = os.path.join(repo, sub_dir)
            if os.path.exists(directory):
                for extension in ('.gz', '.Z'):
                    candidate = os.path.join(directory, filename + extension)
                    logger.debug('Looking for: {}'.format(candidate))
                    if os.path.exists(candidate):
                        logger.debug('Returning: {}'.format(candidate))
                        return candidate
    return None
def download_pdb(pdb_code, biological_unit=False, try_as_cif=True):
    """
    Download the PDB record from www.rcsb.org into the CWD. If the PDB is
    too large to be downloaded as `*.pdb` file, it will be saved as `*.cif`.

    :param pdb_code: Four character alphanumeric string for the PDB id.
    :type pdb_code: str

    :param biological_unit: If True, and the file needs to be downloaded,
        then download the file at the biological unit URL, otherwise use the
        typical record URL. Default is False, get the typical record.
        NOTE: This option is no longer used by PrepWizard, but still
        used by getpdb_utility.py ($SCHRODINGER/utilities/getpdb)
    :type biological_unit: bool

    :param try_as_cif: Whether to try downloading the file as CIF format if
        the structure is too large to be represented in PDB format.
    :type try_as_cif: bool

    :return: Path to the downloaded file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB or pdb ID
        does not exist
    :raises RuntimeError: for other error retrieving file
    """
    logger.info("Downloading %s..." % pdb_code)

    if biological_unit:
        pdb_remote, pdb_local = pdb_code + '.pdb1.gz', pdb_code + '_bio1.pdb'
        cif_remote = pdb_code + '-assembly1.cif.gz'
        cif_local = pdb_code + '_bio1.cif'
    else:
        pdb_remote, pdb_local = pdb_code + '.pdb.gz', pdb_code + '.pdb'
        cif_remote, cif_local = pdb_code + '.cif.gz', pdb_code + '.cif'

    try:
        gz_file = download_file(pdb_remote)
        out_file = pdb_local
    except requests.exceptions.HTTPError as err:
        response = getattr(err, 'response', None)
        not_found = ('Not Found for url' in str(err) or
                     getattr(response, 'status_code', None) == 404)
        # Only a missing PDB-format file justifies the CIF fallback. Any
        # other HTTP failure must propagate: there is no downloaded file to
        # decompress, and continuing would hide the real error.
        if not try_as_cif or not not_found:
            raise
        # Structure may be too large, attempt downloading as a CIF format.
        gz_file = download_file(cif_remote)
        out_file = cif_local

    return _decompress_gz_file(gz_file, out_file)
def download_cif(pdb_code):
    """
    Fetch the `*.cif` file for a given PDB code from the web and decompress
    it.

    :param pdb_code: Four character alphanumeric string for the PDB id.
    :type pdb_code: str

    :return: Path to the downloaded file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB or pdb ID
        does not exist
    """
    cif_name = pdb_code + '.cif'
    return _decompress_gz_file(download_file(cif_name + '.gz'), cif_name)
def requests_retry_session(max_retries=3,
                           backoff_factor=0.3,
                           status_forcelist=RETRY_HTTP_CODES,
                           session=None):
    """
    Return a session to connect to a web url. In case of network failures
    the session will retry to connect to the url; the number of re-attempts
    allowed is given by `max_retries`.

    :param max_retries: Total number of retries allowed. The same limit is
        applied to read errors and to connection errors.
    :type max_retries: int

    :param backoff_factor: Backoff factor to apply between attempts after
        the second try. `urllib3` will sleep for:
        {backoff factor} * (2 ** ({number of total retries} - 1))
        seconds before making next attempt.
    :type backoff_factor: float

    :param status_forcelist: Http error status codes for which retry will
        happen
    :type status_forcelist: iterable of int

    :param session: A session object to configure; a new one is created if
        none is given.
    :type session: requests.Session

    :return: A session object
    :rtype: requests.Session
    """
    if not session:
        session = requests.Session()
    retry_policy = retry.Retry(total=max_retries,
                               read=max_retries,
                               connect=max_retries,
                               backoff_factor=backoff_factor,
                               status_forcelist=status_forcelist)
    adapter = HTTPAdapter(max_retries=retry_policy)
    # The same adapter serves both plain and TLS connections.
    for prefix in ('http://', 'https://'):
        session.mount(prefix, adapter)
    return session
def retrieve_ent(pdbid):
    """
    Retrieves the ENT file for the specified PDB ID from the third-party
    database and writes a decompressed copy of it to the CWD (or to the
    temporary directory if the CWD is not writable). The compressed file in
    the database is left untouched.

    The database is looked up under $SCHRODINGER_THIRDPARTY if that variable
    is set, otherwise under $SCHRODINGER/thirdparty.

    :param pdbid: PDB ID of the file to retrieve
    :type pdbid: str

    :return: Path to the decompressed file that was written.
    :rtype: str

    :raises RuntimeError: if the file is not present in the database
    :raises KeyError: if the SCHRODINGER environment variable is not set
    """
    # Ev:96694

    schrodingerpath = os.environ['SCHRODINGER']
    thirdpartypath = os.environ.get('SCHRODINGER_THIRDPARTY')
    if thirdpartypath is None:
        thirdpartypath = os.path.join(schrodingerpath, "thirdparty")

    # Build both names from the PDB ID rather than stripping ".gz" with
    # str.rstrip(), which removes a set of characters, not a suffix.
    ent_file = "pdb%s.ent" % pdbid
    compressed_ent_file = os.path.join(thirdpartypath, "database", "pdb",
                                       "structures", "all", "pdb",
                                       ent_file + ".gz")

    if not os.path.isfile(compressed_ent_file):
        raise RuntimeError("Template file is missing: %s" % compressed_ent_file)

    # Decompress here instead of via _decompress_gz_file(): that helper
    # deletes its input, which would remove the file from the database.
    with gzip.open(compressed_ent_file, 'rb') as in_fh, \
            open_filename(ent_file, 'w', encoding="utf-8") as out_fh:
        out_fh.writelines(line.decode() for line in in_fh)
        # File path may be different from ent_file:
        return out_fh.name
def download_ent(pdbid):
    """
    Fetch the structure factor file for the specified PDB ID from the RCSB
    web site and save it, decompressed, as `<pdbid>sf.ent` in the CWD.

    :param pdbid: PDB ID of the entry
    :type pdbid: str

    :return: Path to the decompressed file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: if the downloaded data could not be decompressed
        or saved
    """
    # Ev:96694

    gz_path = download_file(f'{pdbid}-sf.cif.gz')
    try:
        return _decompress_gz_file(gz_path, "%ssf.ent" % pdbid)
    except Exception as err:
        # Ev:71880
        raise RuntimeError("Failed to save downloded data.\nERROR: %s" % err)
def get_ent(pdbid, source=AUTO):
    """
    Obtain the specified ENT file from the local database, from the web, or
    from whichever of the two works first.

    With the default source (AUTO) the database is tried first, and the web
    is used if the database lookup raises RuntimeError.

    :type pdbid: str
    :param pdbid: string of 4 characters

    :type source: int
    :param source: one of: AUTO, DATABASE, WEB.

    :return: Path to the ENT file.
    :rtype: str

    :raises requests.HTTPError: if error in connection to RCSB
    :raises RuntimeError: for other error retrieving file
    :raises ValueError: if source is not one of the supported constants
    """
    # Ev:96694

    if source == WEB:
        return download_ent(pdbid)
    if source == DATABASE:
        return retrieve_ent(pdbid)
    if source != AUTO:
        raise ValueError("Invalid source")

    try:
        return retrieve_ent(pdbid)
    except RuntimeError:
        return download_ent(pdbid)
def open_filename(filename, mode, encoding=None):
    """
    Open the given file; if that fails with an I/O error, open a file of the
    same name under the temporary directory instead.

    The path that was actually opened may therefore differ from `filename`;
    it is available via the `name` attribute of the returned file object.

    :param filename: Path of the file to open.
    :type filename: str

    :param mode: Mode string, as accepted by the built-in `open`.
    :type mode: str

    :param encoding: Text encoding, as accepted by the built-in `open`.
    :type encoding: str or None

    :return: The opened file object.
    """
    try:
        handle = open(filename, mode, encoding=encoding)
    except OSError:
        fallback = os.path.join(
            fileutils.get_directory_path(fileutils.TEMP), filename)
        handle = open(fallback, mode, encoding=encoding)
    return handle
def download_reflection_data(pdbid):
    """
    Attempt to obtain the reflection data for a PDB ID and convert it to a
    `<pdbid>.cv` file using `refconvert`.

    On success the intermediate ENT file is removed.

    :type pdbid: str
    :param pdbid: PDB ID

    :rtype: str
    :return: Name of the converted file.

    :raises FileNotFoundError: if the converted file was not produced
    """
    ent_file = get_ent(pdbid)
    cv_file = "%s.cv" % pdbid
    subprocess.call(['refconvert', '-icif', ent_file, '-ocns', cv_file])

    if not os.path.isfile(cv_file):
        msg = """Downloaded reflection data to: %s. Failed to convert to CV format using default refconvert options. For help, run: $SCHRODINGER/utilities/refconvert -help.""" % ent_file
        raise FileNotFoundError(msg)

    # Ev:71921:
    os.remove(ent_file)
    return cv_file
if __name__ == '__main__':
    # Script mode: download the biological unit of every PDB code given on
    # the command line.
    for pdb_code in sys.argv[1:]:
        download_pdb(pdb_code, biological_unit=True)