Source code for schrodinger.application.bioluminate.epitope.psc_workflow

# Written by:   Xiaohu Hu (xiaohu.hu@schrodinger.com)
# Date:         Jan 2022

import os
import pickle
import subprocess
import sys
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Requires Biophython modules for sequence data manipulation
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from scipy.sparse import triu

# ---------------------------------------------------------------------------- #


[docs]def save_mkdir(odir, overwrite): if not os.path.isdir(odir): os.system('mkdir %s' % odir) else: if overwrite: print("Overwriting content in '%s'" % odir) #os.system('rm -r %s' % output_dir) #os.system('mkdir %s' % output_dir) else: print( "'%s' already exists, use '--overwrite' to continue if you wish to overwrite the existing files " "with the same names." % odir) sys.exit(2)
# ---------------------------------------------------------------------------- #
[docs]def gen_fastas(spreadsheet_file=None, output_dir=None, overwrite=False): """ Read an Excel spreadsheet that contains at least 3 columns with the column names, 'ID', 'VH' and 'VL', indicating the names/labels of the antibodies, heavy chain and light chain sequences, respectively. """ if spreadsheet_file.split('.')[-1] == 'xlsx': print('Input file is an Microsoft Excel spreadsheet file.') input_df = pd.read_excel(spreadsheet_file) elif spreadsheet_file.split('.')[-1] == 'csv': print('Input file is a csv spreadsheet file.') input_df = pd.read_csv(spreadsheet_file) else: print("Unknown file type: %s" % spreadsheet_file) print("Only 'xlsx' or 'csv' are allowed.") sys.exit(1) save_mkdir(output_dir, overwrite=overwrite) for index, row in input_df.iterrows(): ab_name = row['ID'] vh_seq = SeqRecord(Seq(row['VH']), id=ab_name + ':H', description='VH domain') vl_seq = SeqRecord(Seq(row['VL']), id=ab_name + ':L', description='VL domain') # print(vh_seq) fastas_name = u'%s/%s.fasta' % (output_dir, ab_name) fo = open(fastas_name, 'wt') SeqIO.write(vh_seq, fo, "fasta") SeqIO.write(vl_seq, fo, "fasta") fo.close() print("fastas file '%s' generated." % fastas_name)
# print("Fasta sequence files saved in '%s'" % output_dir) # ---------------------------------------------------------------------------- #
[docs]class psc_workflow: """ Class that creates an paratope similarity clustering workflow object """ # ---------------------------------------------------------------------------- # # Location of the local utility scripts util_scripts_dir = './utils/' # User inputs fastas_dir = None project_name = None # Data dirs data_dir = None fab_dir = None build_ab_log_dir = None faux_epi_dir = None mif_dir = None ### BioLuminate antibody builder input parameters ### # Specify antibody residue numbering scheme, using BioLuminate default numbering_scheme = 'EnhancedChothia' nmodels = 1 # Number of models to be built for each antibody sequence overwrite_data = None # Flag for if overwrite data when using the same file directory names # Class data S = None # Similarity matrix D = None # Distance matrix D1d = None # 1D condensed distance matrix L = None # Linkage matrix, needed for scipy dendrogram plotting function N_fab = None # Number of antibody for the clustering ID_fab = None # Labels of the antibodies used in the plots # Default plot settings plot_width = 7 plot_height = 6 dpi = 300 # ---------------------------------------------------------------------------- #
[docs] def __init__(self, project_name, fastas_dir, overwrite_data=False): print('\nInitiating workflow object...\n') self.fastas_dir = fastas_dir self.project_name = project_name self.overwrite_data = overwrite_data ## Location of the intermediate output files self.data_dir = 'project_data_' + self.project_name.replace(' ', '_') + '/' self.safe_mkdir(self.data_dir) # Location of the Fab maestro files self.fab_dir = self.data_dir + 'fab_structures/' self.build_ab_log_dir = self.data_dir + 'build_ab_log/' self.safe_mkdir(self.fab_dir) self.safe_mkdir(self.build_ab_log_dir) # Location of the faux epitopes self.faux_epi_dir = self.data_dir + 'faux_epitopes/' self.safe_mkdir(self.faux_epi_dir) # Location of the molecular interaction fields self.mif_dir = self.data_dir + 'mifs/' self.safe_mkdir(self.mif_dir)
# ---------------------------------------------------------------------------- #
[docs] def safe_mkdir(self, dir): if not os.path.isdir(dir): os.system("mkdir %s" % dir) else: if self.overwrite_data: print( "Warning: '%s' already exists, will overwrite its content" % dir) else: print( "'%s' already exists, set 'overwrite_data=True' to overwrite" % dir) sys.exit()
# -------------------------------------------------------------------------- #
[docs] def gen_file_list(self, dir): return [dir + item for item in os.listdir(dir)]
# -------------------------------------------------------------------------- #
[docs] def gen_names(self, dir): return [item.split('.')[0] for item in os.listdir(dir)]
# -------------------------------------------------------------------------- #
[docs] def gen_jobname(self, filename): """ Remove '.fasta' from the fastas file name and use the base for the jobname. """ return filename.split('.')[0]
# -------------------------------------------------------------------------- #
[docs] def save_mat_csv(self, mat, IDs, mat_name): df = pd.DataFrame(mat, columns=IDs) df.insert(0, '', IDs, True) n = self.N_fab csv_file = self.data_dir + '%s_%dx%d.csv' % (mat_name, n, n) df.to_csv(csv_file, index=False) print("Matrix saved as %s" % csv_file)
# -------------------------------------------------------------------------- #
[docs] def build_ab(self, fasta_file, jobname): """ Given a fasta file containing the sequences of Vh and Vl domains, create an antibody model. """ # First build an antibody model from the sequence in the fastas file using # the templates from the data base. build_antibody_command = "$SCHRODINGER/run -WAIT -FROM bioluminate build_antibody.py %s -scheme %s -nreport 10 -nframe %d -jobname %s" % ( fasta_file, self.numbering_scheme, self.nmodels, jobname) #print("Executing: ",build_antibody_command) os.system(build_antibody_command) os.system('mv %s-out.maegz %s%s.maegz' % (jobname, self.fab_dir, jobname)) os.system('mv %s.log %s' % (jobname, self.build_ab_log_dir))
# ---------------------------------------------------------------------------- #
[docs] def gen_faux_epitopes(self): """ Generate faux epitopes from Fabs Maestro files in the fab_dir """ faux_epitope_list = [] ts = time.time() fab_mae_list = self.gen_file_list(self.fab_dir) fab_names = self.gen_names(self.fab_dir) N = len(fab_names) for i in range(N): print("\rGenerating faux epitopes %d/%d" % (i + 1, N), end="\r\r") faux_epit_mae = "faux_epitope_%s.mae" % fab_names[i] run_faux_epitope = "$SCHRODINGER/run %s/faux-epitope.py %s %s/%s" % ( self.util_scripts_dir, fab_mae_list[i], self.faux_epi_dir, faux_epit_mae) os.system(run_faux_epitope) #print(run_faux_epitope) faux_epitope_list.append(faux_epit_mae) te = time.time() duration = (te - ts) / 60 print("\n>>> Faux epitope generation took %.2f minutes\n" % duration)
# -------------------------------------------------------------------------- #
[docs] def gen_mifs(self): """ Generate MIFs """ mif_list = [] ts = time.time() fab_mae_list = self.gen_file_list(self.fab_dir) fab_names = self.gen_names(self.fab_dir) # It's important to make sure the faux epitope is matched with the right Fab, otherwise # this step will result in traceback, so generate faux epitope file list in the exact # order as the Fab file list. faux_epi_list = [ self.faux_epi_dir + "/faux_epitope_%s.mae" % item for item in fab_names ] duration = 0 N = len(fab_names) for i in range(N): te = time.time() duration = (te - ts) / 60 print( "\rGenerating MIF %d/%d, current running time ~ %.2f min. (~ %.2f min. per calculation)" % (i + 1, N, duration, duration / float(i + 1)), end="\r\r") mif_name = "if_%s.json" % fab_names[i] run_extract_bsite = "$SCHRODINGER/run %s/bindingsite_ab_extract_19-3.py -L %s %s -i %s/%s" % ( self.util_scripts_dir, faux_epi_list[i], fab_mae_list[i], self.mif_dir, mif_name) #print(run_extract_bsite) os.system(run_extract_bsite) mif_list.append(mif_name) te = time.time() duration = (te - ts) / 60 print("\n>>> MIFs generation took %.2f minutes\n" % duration)
# -------------------------------------------------------------------------- #
[docs] def gen_sim_mat(self): """ Compute the pairwise similarity matrix between binding sites using Phase Shape approach. Compare all i,j pairs of MIFs in the mif_dir. """ ts = time.time() schro = os.environ['SCHRODINGER'] if '2021-1' in schro: mmshare = '5.3' elif '2021-2' in schro: mmshare = '5.4' elif '2021-3' in schro: mmshare = '5.5' elif '2021-4' in schro: mmshare = '5.6' else: print( 'Error: The current version of the workflow only works with a 2021-X schrodinger release.' ) sys.exit(3) mif_list = self.gen_file_list(self.mif_dir) mif_names = self.gen_names(self.mif_dir) N = len(mif_list) sim_mat = np.empty((N, N)) sim_mat[:] = -1.0 Ntot = N * N - N counter = 1 IDs = [name.split('_')[1] for name in mif_names] for i in range(N): for j in range(N): if (i != j): te = time.time() duration = (te - ts) / 60 print( "\rComparing pair: %d-%d (%d/%d pairs), current running time ~ %.2f min. (~ %.2f min. per pair)" % (i + 1, j + 1, counter, Ntot, duration, duration / counter), end="\r\r") result = subprocess.run( [ '%s/run' % schro, '%s/mmshare-v%s/python/scripts/bindingsite_compare.py' % (schro, mmshare), mif_list[i], mif_list[j], #'--force-gpu', #'--maxMapPerAtom','4', #'--maxAtomsToMap','32' ], stdout=subprocess.PIPE) sim_mat[i, j] = float( str(result).split(' ')[-1].split('\\')[0]) counter += 1 else: sim_mat[i, j] = 1.0 te = time.time() duration = (te - ts) / 60 print("\n>>> Similarity matrix generation took %.2f minutes\n" % duration) # update the class data self.S = np.copy(sim_mat) self.ID_fab = np.copy(IDs) self.N_fab = N # Pickle the results pickle.dump([sim_mat, IDs], open( self.data_dir + 'raw_sim_matrix_data_%dx%d.pck' % (self.N_fab, self.N_fab), 'wb'))
# -------------------------------------------------------------------------- #
[docs] def sim2dist(self): """ Convert the simularity matrix S in to a symmetric, normalizd distance matrix D """ n, m = self.S.shape Su = np.copy(self.S) Sl = np.copy(self.S) # Create the transposed upper triangle ind_tril = np.tril_indices(n, -1) Sl[ind_tril] = 0 Sut = Sl.transpose() # Create the transposed lower triangle ind_triu = np.triu_indices(n, 1) Su[ind_triu] = 0 Slt = Su.transpose() # "Symmetrize" the raw similarity matrix S_sym = (self.S + Slt + Sut) / 2 S_sym /= np.amax(S_sym) # Convert the symmetrized similarity matrix to a "distance" matrix D = 1 - S_sym # Normalize D D = D / np.amax(D) self.D = D
# -------------------------------------------------------------------------- #
[docs] def cond_1d_dist_mat(self, triag='upper'): """ Convert the fully symmetric distance matrix into a condensed 1D distance matrix triag = 'upper' or 'lower', specifying whether the upper or lower triangle to use. It shouldn't matter since D is fully symmetric and both should yield identical results. """ Dt = triu(self.D, k=0).toarray() D_cond = [] for item in Dt: trimmed = np.delete(item, np.where(item == 0.0)) if trimmed.size != 0: D_cond.append(trimmed) self.D1d = np.hstack(D_cond)
# -------------------------------------------------------------------------- #
[docs] def plot_mat(self, data, plot_title, cmap): fig, ax = plt.subplots(figsize=(self.plot_height, self.plot_height), dpi=150) nx, ny = data.shape ax.set_xticks(np.arange(0, nx + 1, 1), minor=False) ax.set_yticks(np.arange(0, ny + 1, 1), minor=False) ax.set_xticklabels(self.ID_fab) ax.set_yticklabels(self.ID_fab) color_map = ax.imshow( data, cmap=cmap, aspect='equal', #interpolation='bicubic',alpha=1.0, vmin=0, vmax=1) ax.xaxis.set_ticks_position('bottom') plt.colorbar(color_map, fraction=0.046, pad=0.04) color_map.cmap.set_under('snow') plt.xticks(rotation=90) plt.title('%s' % plot_title) plt.tight_layout() plt.savefig(self.data_dir + '%s.png' % plot_title.replace(' ', '_'), dpi=self.dpi)
#plt.show() # -------------------------------------------------------------------------- #
[docs] def generate_dendrogram(self, cutoff): """ Perform clustering using the distance matrix """ from scipy.cluster.hierarchy import dendrogram from scipy.cluster.hierarchy import linkage # Convert D to a 1D condensed distance matrix as required by the scipy # clusrering function we are using self.cond_1d_dist_mat() # Hierarchical clustering using single linkage self.L = linkage(self.D1d, 'single') # Generate the dendrogram plot plt.subplots(figsize=(self.plot_width, self.plot_height), dpi=150) dendrogram(self.L, labels=self.ID_fab, color_threshold=cutoff, distance_sort=True, show_leaf_counts=False, orientation='left') plt.xlabel('Distance (arbitrary unit)') dmax = np.amax(self.D[np.where(self.D != 1)]) dmin = np.amin(self.D[np.where(self.D != 0)]) dbuff = (dmax - dmin) * 0.05 plt.xlim(dmax + dbuff, dmin - dbuff) plt.plot([cutoff, cutoff], [0, 1e5], 'k--', label='Cut-off = %.2f' % cutoff) plt.legend(loc='upper left') plt.savefig(self.data_dir + 'dendrogram_%s_%dFabs.png' % (self.project_name, self.N_fab), dpi=self.dpi)
#plt.show() # -------------------------------------------------------------------------- #
[docs] def build_antibody_models(self, debug): """ Build antibody models from the fasta sequences """ # Get the list of fastas files in the input directory fastas_list = None if os.path.isdir(self.fastas_dir): if not os.listdir(self.fastas_dir): print("Error: '%s' is empty\n" % self.fastas_dir) sys.exit() else: fastas_list = os.listdir(self.fastas_dir) else: print("Error: '%s' not found\n" % self.fastas_dir) sys.exit() fab_names = [] results = [] # The full outout of the BioSQPR aggre_scores = [] # The overall aggregation propensity score nfastas = len(fastas_list) # Loop through all the fastas sequences in the input folder and build antibody models from the sequences if debug: Nbuild = 3 print("\nDebug mode - only build the first 3 models for testing\n") else: Nbuild = nfastas ts = time.time() for i in range(Nbuild): f = fastas_list[i] jobname = self.gen_jobname(f) fab_names.append(jobname) te = time.time() duration = (te - ts) / 60 print("\rProcessing antibody %d/%d (~ %.2f min. per calculation)" % (i + 1, Nbuild, duration / float(i + 1)), end="\r\r") fasta_file = os.path.join(self.fastas_dir, f) self.build_ab(fasta_file, jobname) te = time.time() duration = (te - ts) / 60 print("\n>>> Building structural models took about %.1f minutes\n" % duration)
# -------------------------------------------------------------------------- #
[docs] def run_full_workflow(self, cutoff, debug=False): """ Main driver that executes the entire workflow """ starting_time = time.time() print( '\n-----------------------------------------------------------------' ) print(' Paratope Similarity Clustering workflow for: %s' % self.project_name) print( '-----------------------------------------------------------------') self.build_antibody_models(debug=debug) # step 1 self.gen_faux_epitopes() # step 2 self.gen_mifs() # step 3 self.gen_sim_mat() #step 4 self.sim2dist() #step 5 self.generate_dendrogram(cutoff) # step 6 # Write out the raw similartiy matrix and the normalized distance matrix as CSV files self.save_mat_csv(self.S, self.ID_fab, 'raw_similarity_matrix') self.save_mat_csv(self.D, self.ID_fab, 'normalized_distance_matrix') # Generate heat map plots of these matrices self.plot_mat(self.S, 'Raw Similarity Matrix', 'seismic') self.plot_mat(self.D, 'Normalized Distance Matrix', None) ending_time = time.time() time_elapsed = (ending_time - starting_time) / 60 print("\n>>> The workflow took %.1f minutes\n" % time_elapsed)
# -------------------------------------------------------------------------- #
[docs] def clean(self): os.system('rm -r %s' % self.data_dir)
# -------------------------------------------------------------------------- # ### END of the CLASS ###