# Written by: Xiaohu Hu (xiaohu.hu@schrodinger.com)
# Date: Jan 2022
import os
import pickle
import subprocess
import sys
import time
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
# Requires Biophython modules for sequence data manipulation
from Bio import SeqIO
from Bio.Seq import Seq
from Bio.SeqRecord import SeqRecord
from scipy.sparse import triu
# ---------------------------------------------------------------------------- #
[docs]def save_mkdir(odir, overwrite):
if not os.path.isdir(odir):
os.system('mkdir %s' % odir)
else:
if overwrite:
print("Overwriting content in '%s'" % odir)
#os.system('rm -r %s' % output_dir)
#os.system('mkdir %s' % output_dir)
else:
print(
"'%s' already exists, use '--overwrite' to continue if you wish to overwrite the existing files "
"with the same names." % odir)
sys.exit(2)
# ---------------------------------------------------------------------------- #
[docs]def gen_fastas(spreadsheet_file=None, output_dir=None, overwrite=False):
"""
Read an Excel spreadsheet that contains at least 3 columns with the column names, 'ID', 'VH' and 'VL', indicating
the names/labels of the antibodies, heavy chain and light chain sequences, respectively.
"""
if spreadsheet_file.split('.')[-1] == 'xlsx':
print('Input file is an Microsoft Excel spreadsheet file.')
input_df = pd.read_excel(spreadsheet_file)
elif spreadsheet_file.split('.')[-1] == 'csv':
print('Input file is a csv spreadsheet file.')
input_df = pd.read_csv(spreadsheet_file)
else:
print("Unknown file type: %s" % spreadsheet_file)
print("Only 'xlsx' or 'csv' are allowed.")
sys.exit(1)
save_mkdir(output_dir, overwrite=overwrite)
for index, row in input_df.iterrows():
ab_name = row['ID']
vh_seq = SeqRecord(Seq(row['VH']),
id=ab_name + ':H',
description='VH domain')
vl_seq = SeqRecord(Seq(row['VL']),
id=ab_name + ':L',
description='VL domain')
# print(vh_seq)
fastas_name = u'%s/%s.fasta' % (output_dir, ab_name)
fo = open(fastas_name, 'wt')
SeqIO.write(vh_seq, fo, "fasta")
SeqIO.write(vl_seq, fo, "fasta")
fo.close()
print("fastas file '%s' generated." % fastas_name)
# print("Fasta sequence files saved in '%s'" % output_dir)
# ---------------------------------------------------------------------------- #
[docs]class psc_workflow:
"""
Class that creates an paratope similarity clustering workflow object
"""
# ---------------------------------------------------------------------------- #
# Location of the local utility scripts
util_scripts_dir = './utils/'
# User inputs
fastas_dir = None
project_name = None
# Data dirs
data_dir = None
fab_dir = None
build_ab_log_dir = None
faux_epi_dir = None
mif_dir = None
### BioLuminate antibody builder input parameters ###
# Specify antibody residue numbering scheme, using BioLuminate default
numbering_scheme = 'EnhancedChothia'
nmodels = 1 # Number of models to be built for each antibody sequence
overwrite_data = None # Flag for if overwrite data when using the same file directory names
# Class data
S = None # Similarity matrix
D = None # Distance matrix
D1d = None # 1D condensed distance matrix
L = None # Linkage matrix, needed for scipy dendrogram plotting function
N_fab = None # Number of antibody for the clustering
ID_fab = None # Labels of the antibodies used in the plots
# Default plot settings
plot_width = 7
plot_height = 6
dpi = 300
# ---------------------------------------------------------------------------- #
[docs] def __init__(self, project_name, fastas_dir, overwrite_data=False):
print('\nInitiating workflow object...\n')
self.fastas_dir = fastas_dir
self.project_name = project_name
self.overwrite_data = overwrite_data
## Location of the intermediate output files
self.data_dir = 'project_data_' + self.project_name.replace(' ',
'_') + '/'
self.safe_mkdir(self.data_dir)
# Location of the Fab maestro files
self.fab_dir = self.data_dir + 'fab_structures/'
self.build_ab_log_dir = self.data_dir + 'build_ab_log/'
self.safe_mkdir(self.fab_dir)
self.safe_mkdir(self.build_ab_log_dir)
# Location of the faux epitopes
self.faux_epi_dir = self.data_dir + 'faux_epitopes/'
self.safe_mkdir(self.faux_epi_dir)
# Location of the molecular interaction fields
self.mif_dir = self.data_dir + 'mifs/'
self.safe_mkdir(self.mif_dir)
# ---------------------------------------------------------------------------- #
[docs] def safe_mkdir(self, dir):
if not os.path.isdir(dir):
os.system("mkdir %s" % dir)
else:
if self.overwrite_data:
print(
"Warning: '%s' already exists, will overwrite its content" %
dir)
else:
print(
"'%s' already exists, set 'overwrite_data=True' to overwrite"
% dir)
sys.exit()
# -------------------------------------------------------------------------- #
[docs] def gen_file_list(self, dir):
return [dir + item for item in os.listdir(dir)]
# -------------------------------------------------------------------------- #
[docs] def gen_names(self, dir):
return [item.split('.')[0] for item in os.listdir(dir)]
# -------------------------------------------------------------------------- #
[docs] def gen_jobname(self, filename):
"""
Remove '.fasta' from the fastas file name and use the base for
the jobname.
"""
return filename.split('.')[0]
# -------------------------------------------------------------------------- #
[docs] def save_mat_csv(self, mat, IDs, mat_name):
df = pd.DataFrame(mat, columns=IDs)
df.insert(0, '', IDs, True)
n = self.N_fab
csv_file = self.data_dir + '%s_%dx%d.csv' % (mat_name, n, n)
df.to_csv(csv_file, index=False)
print("Matrix saved as %s" % csv_file)
# -------------------------------------------------------------------------- #
[docs] def build_ab(self, fasta_file, jobname):
"""
Given a fasta file containing the sequences of Vh and Vl domains, create
an antibody model.
"""
# First build an antibody model from the sequence in the fastas file using
# the templates from the data base.
build_antibody_command = "$SCHRODINGER/run -WAIT -FROM bioluminate build_antibody.py %s -scheme %s -nreport 10 -nframe %d -jobname %s" % (
fasta_file, self.numbering_scheme, self.nmodels, jobname)
#print("Executing: ",build_antibody_command)
os.system(build_antibody_command)
os.system('mv %s-out.maegz %s%s.maegz' %
(jobname, self.fab_dir, jobname))
os.system('mv %s.log %s' % (jobname, self.build_ab_log_dir))
# ---------------------------------------------------------------------------- #
[docs] def gen_faux_epitopes(self):
"""
Generate faux epitopes from Fabs Maestro files in the fab_dir
"""
faux_epitope_list = []
ts = time.time()
fab_mae_list = self.gen_file_list(self.fab_dir)
fab_names = self.gen_names(self.fab_dir)
N = len(fab_names)
for i in range(N):
print("\rGenerating faux epitopes %d/%d" % (i + 1, N), end="\r\r")
faux_epit_mae = "faux_epitope_%s.mae" % fab_names[i]
run_faux_epitope = "$SCHRODINGER/run %s/faux-epitope.py %s %s/%s" % (
self.util_scripts_dir, fab_mae_list[i], self.faux_epi_dir,
faux_epit_mae)
os.system(run_faux_epitope)
#print(run_faux_epitope)
faux_epitope_list.append(faux_epit_mae)
te = time.time()
duration = (te - ts) / 60
print("\n>>> Faux epitope generation took %.2f minutes\n" % duration)
# -------------------------------------------------------------------------- #
[docs] def gen_mifs(self):
"""
Generate MIFs
"""
mif_list = []
ts = time.time()
fab_mae_list = self.gen_file_list(self.fab_dir)
fab_names = self.gen_names(self.fab_dir)
# It's important to make sure the faux epitope is matched with the right Fab, otherwise
# this step will result in traceback, so generate faux epitope file list in the exact
# order as the Fab file list.
faux_epi_list = [
self.faux_epi_dir + "/faux_epitope_%s.mae" % item
for item in fab_names
]
duration = 0
N = len(fab_names)
for i in range(N):
te = time.time()
duration = (te - ts) / 60
print(
"\rGenerating MIF %d/%d, current running time ~ %.2f min. (~ %.2f min. per calculation)"
% (i + 1, N, duration, duration / float(i + 1)),
end="\r\r")
mif_name = "if_%s.json" % fab_names[i]
run_extract_bsite = "$SCHRODINGER/run %s/bindingsite_ab_extract_19-3.py -L %s %s -i %s/%s" % (
self.util_scripts_dir, faux_epi_list[i], fab_mae_list[i],
self.mif_dir, mif_name)
#print(run_extract_bsite)
os.system(run_extract_bsite)
mif_list.append(mif_name)
te = time.time()
duration = (te - ts) / 60
print("\n>>> MIFs generation took %.2f minutes\n" % duration)
# -------------------------------------------------------------------------- #
[docs] def gen_sim_mat(self):
"""
Compute the pairwise similarity matrix between binding sites using Phase Shape approach.
Compare all i,j pairs of MIFs in the mif_dir.
"""
ts = time.time()
schro = os.environ['SCHRODINGER']
if '2021-1' in schro:
mmshare = '5.3'
elif '2021-2' in schro:
mmshare = '5.4'
elif '2021-3' in schro:
mmshare = '5.5'
elif '2021-4' in schro:
mmshare = '5.6'
else:
print(
'Error: The current version of the workflow only works with a 2021-X schrodinger release.'
)
sys.exit(3)
mif_list = self.gen_file_list(self.mif_dir)
mif_names = self.gen_names(self.mif_dir)
N = len(mif_list)
sim_mat = np.empty((N, N))
sim_mat[:] = -1.0
Ntot = N * N - N
counter = 1
IDs = [name.split('_')[1] for name in mif_names]
for i in range(N):
for j in range(N):
if (i != j):
te = time.time()
duration = (te - ts) / 60
print(
"\rComparing pair: %d-%d (%d/%d pairs), current running time ~ %.2f min. (~ %.2f min. per pair)"
% (i + 1, j + 1, counter, Ntot, duration,
duration / counter),
end="\r\r")
result = subprocess.run(
[
'%s/run' % schro,
'%s/mmshare-v%s/python/scripts/bindingsite_compare.py'
% (schro, mmshare),
mif_list[i],
mif_list[j],
#'--force-gpu',
#'--maxMapPerAtom','4',
#'--maxAtomsToMap','32'
],
stdout=subprocess.PIPE)
sim_mat[i, j] = float(
str(result).split(' ')[-1].split('\\')[0])
counter += 1
else:
sim_mat[i, j] = 1.0
te = time.time()
duration = (te - ts) / 60
print("\n>>> Similarity matrix generation took %.2f minutes\n" %
duration)
# update the class data
self.S = np.copy(sim_mat)
self.ID_fab = np.copy(IDs)
self.N_fab = N
# Pickle the results
pickle.dump([sim_mat, IDs],
open(
self.data_dir + 'raw_sim_matrix_data_%dx%d.pck' %
(self.N_fab, self.N_fab), 'wb'))
# -------------------------------------------------------------------------- #
[docs] def sim2dist(self):
"""
Convert the simularity matrix S in to a symmetric, normalizd distance matrix D
"""
n, m = self.S.shape
Su = np.copy(self.S)
Sl = np.copy(self.S)
# Create the transposed upper triangle
ind_tril = np.tril_indices(n, -1)
Sl[ind_tril] = 0
Sut = Sl.transpose()
# Create the transposed lower triangle
ind_triu = np.triu_indices(n, 1)
Su[ind_triu] = 0
Slt = Su.transpose()
# "Symmetrize" the raw similarity matrix
S_sym = (self.S + Slt + Sut) / 2
S_sym /= np.amax(S_sym)
# Convert the symmetrized similarity matrix to a "distance" matrix
D = 1 - S_sym
# Normalize D
D = D / np.amax(D)
self.D = D
# -------------------------------------------------------------------------- #
[docs] def cond_1d_dist_mat(self, triag='upper'):
"""
Convert the fully symmetric distance matrix into a condensed 1D distance
matrix triag = 'upper' or 'lower', specifying whether the upper or lower
triangle to use. It shouldn't matter since D is fully symmetric and both
should yield identical results.
"""
Dt = triu(self.D, k=0).toarray()
D_cond = []
for item in Dt:
trimmed = np.delete(item, np.where(item == 0.0))
if trimmed.size != 0:
D_cond.append(trimmed)
self.D1d = np.hstack(D_cond)
# -------------------------------------------------------------------------- #
[docs] def plot_mat(self, data, plot_title, cmap):
fig, ax = plt.subplots(figsize=(self.plot_height, self.plot_height),
dpi=150)
nx, ny = data.shape
ax.set_xticks(np.arange(0, nx + 1, 1), minor=False)
ax.set_yticks(np.arange(0, ny + 1, 1), minor=False)
ax.set_xticklabels(self.ID_fab)
ax.set_yticklabels(self.ID_fab)
color_map = ax.imshow(
data,
cmap=cmap,
aspect='equal',
#interpolation='bicubic',alpha=1.0,
vmin=0,
vmax=1)
ax.xaxis.set_ticks_position('bottom')
plt.colorbar(color_map, fraction=0.046, pad=0.04)
color_map.cmap.set_under('snow')
plt.xticks(rotation=90)
plt.title('%s' % plot_title)
plt.tight_layout()
plt.savefig(self.data_dir + '%s.png' % plot_title.replace(' ', '_'),
dpi=self.dpi)
#plt.show()
# -------------------------------------------------------------------------- #
[docs] def generate_dendrogram(self, cutoff):
"""
Perform clustering using the distance matrix
"""
from scipy.cluster.hierarchy import dendrogram
from scipy.cluster.hierarchy import linkage
# Convert D to a 1D condensed distance matrix as required by the scipy
# clusrering function we are using
self.cond_1d_dist_mat()
# Hierarchical clustering using single linkage
self.L = linkage(self.D1d, 'single')
# Generate the dendrogram plot
plt.subplots(figsize=(self.plot_width, self.plot_height), dpi=150)
dendrogram(self.L,
labels=self.ID_fab,
color_threshold=cutoff,
distance_sort=True,
show_leaf_counts=False,
orientation='left')
plt.xlabel('Distance (arbitrary unit)')
dmax = np.amax(self.D[np.where(self.D != 1)])
dmin = np.amin(self.D[np.where(self.D != 0)])
dbuff = (dmax - dmin) * 0.05
plt.xlim(dmax + dbuff, dmin - dbuff)
plt.plot([cutoff, cutoff], [0, 1e5],
'k--',
label='Cut-off = %.2f' % cutoff)
plt.legend(loc='upper left')
plt.savefig(self.data_dir + 'dendrogram_%s_%dFabs.png' %
(self.project_name, self.N_fab),
dpi=self.dpi)
#plt.show()
# -------------------------------------------------------------------------- #
[docs] def build_antibody_models(self, debug):
"""
Build antibody models from the fasta sequences
"""
# Get the list of fastas files in the input directory
fastas_list = None
if os.path.isdir(self.fastas_dir):
if not os.listdir(self.fastas_dir):
print("Error: '%s' is empty\n" % self.fastas_dir)
sys.exit()
else:
fastas_list = os.listdir(self.fastas_dir)
else:
print("Error: '%s' not found\n" % self.fastas_dir)
sys.exit()
fab_names = []
results = [] # The full outout of the BioSQPR
aggre_scores = [] # The overall aggregation propensity score
nfastas = len(fastas_list)
# Loop through all the fastas sequences in the input folder and build antibody models from the sequences
if debug:
Nbuild = 3
print("\nDebug mode - only build the first 3 models for testing\n")
else:
Nbuild = nfastas
ts = time.time()
for i in range(Nbuild):
f = fastas_list[i]
jobname = self.gen_jobname(f)
fab_names.append(jobname)
te = time.time()
duration = (te - ts) / 60
print("\rProcessing antibody %d/%d (~ %.2f min. per calculation)" %
(i + 1, Nbuild, duration / float(i + 1)),
end="\r\r")
fasta_file = os.path.join(self.fastas_dir, f)
self.build_ab(fasta_file, jobname)
te = time.time()
duration = (te - ts) / 60
print("\n>>> Building structural models took about %.1f minutes\n" %
duration)
# -------------------------------------------------------------------------- #
[docs] def run_full_workflow(self, cutoff, debug=False):
"""
Main driver that executes the entire workflow
"""
starting_time = time.time()
print(
'\n-----------------------------------------------------------------'
)
print(' Paratope Similarity Clustering workflow for: %s' %
self.project_name)
print(
'-----------------------------------------------------------------')
self.build_antibody_models(debug=debug) # step 1
self.gen_faux_epitopes() # step 2
self.gen_mifs() # step 3
self.gen_sim_mat() #step 4
self.sim2dist() #step 5
self.generate_dendrogram(cutoff) # step 6
# Write out the raw similartiy matrix and the normalized distance matrix as CSV files
self.save_mat_csv(self.S, self.ID_fab, 'raw_similarity_matrix')
self.save_mat_csv(self.D, self.ID_fab, 'normalized_distance_matrix')
# Generate heat map plots of these matrices
self.plot_mat(self.S, 'Raw Similarity Matrix', 'seismic')
self.plot_mat(self.D, 'Normalized Distance Matrix', None)
ending_time = time.time()
time_elapsed = (ending_time - starting_time) / 60
print("\n>>> The workflow took %.1f minutes\n" % time_elapsed)
# -------------------------------------------------------------------------- #
[docs] def clean(self):
os.system('rm -r %s' % self.data_dir)
# -------------------------------------------------------------------------- #
### END of the CLASS ###