Source code for schrodinger.utils.sshconfig

#!/usr/bin/env python
"""
The sshconfig module contains code to configure Passwordless SSH.

Example:

A) Look up the key pair to use with find_key_pair().
B) Verify that the key pair is valid with check_key_pair(..) before setting
   up SSH on the local host.
C) If the key pair is not valid, generate a new one on the local system with
   configure_local_ssh_setup(..).
D) Set up passwordless SSH to the desired host with install_remote_pubkey(..).

::

    from schrodinger.utils import sshconfig

    (priv_file, pub_file) = sshconfig.find_key_pair()
    pub_key = sshconfig.check_key_pair(priv_file, pub_file)
    if not pub_key:
        pub_key = sshconfig.configure_local_ssh_setup(priv_file, pub_file)
    sshconfig.install_remote_pubkey("bobio", "rajagopa", "blah-blah", pub_key)
"""
import base64
import getpass
import os
import re
import shlex
import shutil
import stat
import sys
import warnings

from schrodinger.utils import fileutils

from . import subprocess

# paramiko is imported inside ``warnings.catch_warnings()`` so that the
# "ignore" filter installed below is dropped again when the ``with`` block
# exits.
# Remove warning filter after BLDMGR-2748
with warnings.catch_warnings():
    try:
        # When pycrypto is importable, silence its PowmInsecureWarning before
        # paramiko is imported (presumably it would otherwise be emitted
        # during that import -- confirm against BLDMGR-2748).
        import Crypto.pct_warnings
        warnings.simplefilter("ignore", Crypto.pct_warnings.PowmInsecureWarning)
    except ImportError:  # once paramiko is updated, it doesn't depend on pycrypto
        pass
    import paramiko

# RSA key size in bits: passed to paramiko.RSAKey.generate() and as the "-b"
# argument of plink-keygen in this module.
GENERATE_KEY_BITS = 4096

##########################################################################


def _get_pub_key(pub_file):
    """
    Read the public half of the local SSH key pair from disk.

    On Windows the file is read line by line: a line containing "BEGIN "
    (re)starts the key with an "ssh-rsa " prefix, lines containing "END " or
    starting with "Comment:" are skipped, and every other line is appended
    with its trailing whitespace removed. On other platforms the whole file
    content is returned with trailing whitespace removed.

    :type pub_file: string
    :param pub_file:
            Public key file to read.

    :returntype: string
    :return:
            Public key text. Empty string if the file could not be opened;
            whatever was collected so far if reading fails part-way.
    """
    pieces = []
    try:
        with open(pub_file) as key_fh:
            if sys.platform != "win32":
                pieces.append(key_fh.read().rstrip())
            else:
                for raw_line in key_fh:
                    stripped = raw_line.rstrip()
                    if "BEGIN " in stripped:
                        # A begin marker discards anything gathered before it.
                        pieces = ["ssh-rsa "]
                    elif "END " in stripped:
                        continue
                    elif stripped.startswith("Comment:"):
                        continue
                    else:
                        pieces.append(stripped)
    except OSError:
        # Best effort: an unreadable file simply yields what we have.
        pass

    return "".join(pieces)


def _convert_ppk_openssh(ppk_file, priv_file):
    """
    Translate a ppk private key file into an OpenSSH private key file.

    The conversion is delegated to ``utilities\\plink-keygen.exe`` under the
    directory named by the ``SCHRODINGER`` environment variable.

    :type ppk_file: string
    :param ppk_file:
         ppk file to read.

    :type priv_file: string
    :param priv_file:
            file that receives the key in OpenSSH format.

    :raise OSError:
            if plink_keygen binary to convert ppk to OpenSSH doesn't exist

    :raise RuntimeError:
            if the command to convert ppk to OpenSSH failed
    """
    keygen_exe = os.path.join(os.environ["SCHRODINGER"],
                              r"utilities\plink-keygen.exe")
    command = [
        keygen_exe,
        "-b", "%d" % GENERATE_KEY_BITS,
        ppk_file,
        "-O", "private-openssh",
        "-o", priv_file,
    ]
    proc = subprocess.Popen(command, stderr=subprocess.PIPE)
    err_output = proc.communicate()[1]
    if not proc.returncode:
        return

    # Non-zero exit status: surface it together with the captured stderr.
    message = ("Command to convert ppk to OpenSSH failed "
               "with exit code - {}. Error is - {}")
    raise RuntimeError(message.format(proc.returncode, err_output))


def _convert_openssh_ppk(priv_file, ppk_file):
    """
    Translate an OpenSSH private key file into a ppk private key file.

    The conversion is delegated to ``utilities\\plink-keygen.exe`` under the
    directory named by the ``SCHRODINGER`` environment variable.

    :type priv_file: string
    :param priv_file:
            OpenSSH private key file to read.

    :type ppk_file: string
    :param ppk_file:
         file that receives the key in ppk format.

    :raise OSError:
            if plink_keygen binary to convert OpenSSH to ppk doesn't exist

    :raise RuntimeError:
            if the command to convert OpenSSH to ppk failed
    """
    keygen_exe = os.path.join(os.environ["SCHRODINGER"],
                              r"utilities\plink-keygen.exe")
    command = [
        keygen_exe,
        "-b", "%d" % GENERATE_KEY_BITS,
        priv_file,
        "-O", "private",
        "-o", ppk_file,
    ]
    proc = subprocess.Popen(command, stderr=subprocess.PIPE)
    err_output = proc.communicate()[1]
    if not proc.returncode:
        return

    # Non-zero exit status: surface it together with the captured stderr.
    message = ("Command to convert OpenSSH to ppk failed "
               "with exit code - {}. Error is - {}")
    raise RuntimeError(message.format(proc.returncode, err_output))


def _grep(filename, search_string):
    """
    Tell whether any line of a file matches a regular expression.

    :type filename: string
    :param filename:
            file whose lines are searched.

    :type search_string: string
    :param search_string:
            regular expression applied to each line with ``re.search``.

    :returntype: bool
    :return:
            True if at least one line matches.
            False if no line matches or filename is not an existing file.
    """
    if os.path.isfile(filename):
        matcher = re.compile(search_string).search
        with open(filename) as handle:
            # any() stops reading at the first matching line.
            return any(matcher(row) for row in handle)

    return False


############################################################################


def find_key_pair():
    """
    Work out which private and public key files should be used.

    A non-empty ``SCHRODINGER_SSH_IDENTITY`` environment variable wins: its
    value is the private key file and the same value plus ".pub" is the
    public key file. Otherwise the files are ``<user>.ppk`` / ``<user>.pub``
    in the home directory on Windows, and ``.ssh/id_rsa`` /
    ``.ssh/id_rsa.pub`` below the home directory elsewhere.

    :returntype: tuple
    :return:
            A tuple containing private and public key filename.
    """
    home_dir = fileutils.get_directory_path(fileutils.HOME)

    identity = os.environ.get("SCHRODINGER_SSH_IDENTITY", "")
    if identity:
        return (identity, identity + ".pub")

    if sys.platform != 'win32':
        return (os.path.join(home_dir, ".ssh", "id_rsa"),
                os.path.join(home_dir, ".ssh", "id_rsa.pub"))

    user_name = getpass.getuser()
    return (os.path.join(home_dir, user_name + ".ppk"),
            os.path.join(home_dir, user_name + ".pub"))
def check_key_pair(priv_file, pub_file):
    """
    Check the given private and public key file to make sure they match.

    For Windows, the private key file is assumed to be in ppk understandable
    format; it is converted to a temporary OpenSSH file for the comparison
    and that temporary file is removed again before returning.

    :type priv_file: string
    :param priv_file:
            Private key file.

    :type pub_file: string
    :param pub_file:
            Public key file.

    :returntype: string
    :return:
            base64 string containing public part of the key pair on success,
            empty string otherwise (including when either file is missing
            or cannot be parsed).
    """
    pub_key = ""
    tmp_openssh = None
    try:
        if sys.platform == 'win32':
            # pid-tagged scratch file next to the ppk file
            tmp_openssh = priv_file + "%s.openssh" % os.getpid()
            _convert_ppk_openssh(priv_file, tmp_openssh)
            priv_file = tmp_openssh

        # Compare public key from private key file and public key file
        # to make sure they match.
        k_priv = paramiko.RSAKey.from_private_key_file(priv_file)
        orig_pub_key = _get_pub_key(pub_file)
        pub_key_data = orig_pub_key.replace("ssh-rsa ", "")
        pub_key_data = pub_key_data.split()[0]
        k_pub = paramiko.RSAKey(data=base64.b64decode(pub_key_data))
        if k_priv == k_pub:
            pub_key = orig_pub_key
    except Exception:
        # Deliberately ignore cases where the key files don't exist (and
        # possibly other cases). See PYTHON-3036. ``Exception`` rather than
        # a bare ``except`` so that KeyboardInterrupt and SystemExit are
        # not swallowed.
        pass
    finally:
        if tmp_openssh is not None:
            try:
                os.unlink(tmp_openssh)
            except OSError:
                pass

    return pub_key
def configure_local_ssh_setup(priv_file, pub_file):
    """
    Generate a new RSA key pair on the local machine and register it.

    The private key is written to ``priv_file``: in plink (ppk) format on
    Windows, in OpenSSH format elsewhere. The public key is written to
    ``pub_file`` as ``<key type> <base64 data>``. On non-Windows systems the
    public key is also appended to ``~/.ssh/authorized_keys``,
    "StrictHostKeyChecking no" is appended to ``~/.ssh/config`` unless a
    StrictHostKeyChecking line is already present, ``~/.ssh`` is set to mode
    0700 and group/other write permission is removed from the home directory.

    :type priv_file: string
    :param priv_file:
            Private key file to write.

    :type pub_file: string
    :param pub_file:
            Public key file to write.

    :returntype: string
    :return:
            base64 string containing public part of the key pair

    :raise OSError:
            if plink_keygen binary to convert OpenSSH to ppk doesn't exist

    :raise RuntimeError:
            if the command to convert OpenSSH to ppk failed

    :raise IOError:
            if there was an error writing private/public key file

    :raise SSHException:
            if the private key file was invalid to get the public key.
    """
    on_windows = sys.platform == "win32"

    home_dir = os.path.expanduser("~")
    dot_ssh = os.path.join(home_dir, ".ssh")
    if not os.path.exists(dot_ssh):
        os.makedirs(dot_ssh)

    if on_windows:
        # The caller's path receives the ppk file; the OpenSSH form goes to
        # a pid-tagged scratch file that is removed further down.
        ppk_file = priv_file
        priv_file = ppk_file + "%s.priv" % os.getpid()

    new_key = paramiko.RSAKey.generate(bits=GENERATE_KEY_BITS)
    new_key.write_private_key_file(priv_file)

    try:
        # Convert the openssh private key to putty's ppk format
        if on_windows:
            _convert_openssh_ppk(priv_file, ppk_file)

        loaded_key = paramiko.RSAKey(filename=priv_file)
        pub_key = "{} {}".format(loaded_key.get_name(),
                                 loaded_key.get_base64())
        with open(pub_file, "w") as pub_fh:
            pub_fh.write(pub_key)
    finally:
        if on_windows:
            try:
                os.unlink(priv_file)
            except OSError:
                pass

    # NOTE(review): everything below is assumed to be POSIX-only, i.e. to
    # have lived under the original ``if sys.platform != 'win32':`` branch
    # -- confirm against the original layout.
    if on_windows:
        return pub_key

    authorized_keys = os.path.join(dot_ssh, "authorized_keys")
    with open(pub_file) as src, open(authorized_keys, "a") as dst:
        # Start on a fresh line when the file already has content.
        if os.fstat(dst.fileno()).st_size:
            dst.write(os.linesep)
        shutil.copyfileobj(src, dst)

    config_file = os.path.join(dot_ssh, "config")
    if not _grep(config_file, r"^\s*StrictHostKeyChecking "):
        with open(config_file, "a") as cfg_fh:
            cfg_fh.write("StrictHostKeyChecking no")
            cfg_fh.write(os.linesep)

    os.chmod(dot_ssh, 0o700)
    # Remove write permission for group and others.
    home_mode = stat.S_IMODE(os.lstat(home_dir).st_mode)
    os.chmod(home_dir, home_mode & ~(stat.S_IWGRP | stat.S_IWOTH))

    return pub_key
def install_remote_pubkey(hostname, user, password, pubkey):
    """
    Setup passwordless ssh to the given remote host.

    Logs in with the given password, appends ``pubkey`` to
    ``~/.ssh/authorized_keys`` on the remote machine (creating ``~/.ssh`` if
    needed and setting modes 700/600), then calls ``cache_hostname_plink``
    for the host.

    :type hostname: string
    :param hostname:
            Setup passwordless ssh to this remote host.

    :type user: string
    :param user:
            Specify the user to log in as on the remote machine.

    :type password: string
    :param password:
            Password to use for logging into the remote machine.

    :type pubkey: string
    :param pubkey:
            Public key to cache in 'authorized_keys' of the remote machine.

    :raise paramiko.AuthenticationException:
            if the authentication failed.

    :raise paramiko.SSHException:
            if there was an error connecting or establishing an SSH session.

    :raise RuntimeError:
            if the command to cache the public key at the remote host fails,
            or if the command to cache the remote host's fingerprint in the
            registry fails (Windows).
    """
    # The key text (including its free-form comment field) ends up inside a
    # remote shell command, so quote it: with plain double quotes any `"`,
    # `$` or backtick in it would be interpreted by the remote shell.
    config_cmd = 'mkdir -p ~/.ssh && ' \
                 'echo %s >> ~/.ssh/authorized_keys && ' \
                 'chmod 600 ~/.ssh/authorized_keys && ' \
                 'chmod 700 ~/.ssh/' % shlex.quote(pubkey)

    error_text = ""
    ssh = paramiko.SSHClient()
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    try:
        ssh.connect(hostname, username=user, password=password)
        stdin, stdout, stderr = ssh.exec_command(config_cmd)
        exit_status = stdout.channel.recv_exit_status()
        stdin.flush()
        stdin.channel.shutdown_write()
        if exit_status > 0:
            # Collect the remote error output while the connection is
            # still open.
            error_text = '\n'.join(stderr.readlines())
    finally:
        # Always release the connection, also when connecting or running
        # the command raised.
        ssh.close()

    if exit_status > 0:
        raise RuntimeError(
            "Remote command to cache public key failed "
            "with exit status - {0}. Error is - {1}".format(
                exit_status, error_text))

    cache_hostname_plink(user, hostname)
def hostname_in_registry(hostname):
    """
    If hostname is in cached registry, returns True. Only relevant to call
    on Windows; on every other platform the answer is always False.

    The host counts as cached when ``has_plink_ssh_host_key`` reports a key
    for it under either the "rsa2@22:" or the "ssh-ed25519@22:" key type.

    :param hostname: name of host referred to in plink
    :type hostname: str

    :return: bool
    """
    if sys.platform != "win32":
        # Explicit False (not an implicit None) to honour the bool contract.
        return False

    return any(
        has_plink_ssh_host_key(keytype, hostname)
        for keytype in ("rsa2@22:", "ssh-ed25519@22:"))
def known_hostname(hostname):
    """
    Checks whether the given host is already known locally.

    On Windows this defers to ``hostname_in_registry``. Elsewhere it runs
    ``ssh-keygen -H -F <hostname>`` and reports whether the command produced
    any output.

    :param hostname: name of host
    :type hostname: str

    :return: bool
    """
    if sys.platform == "win32":
        return hostname_in_registry(hostname)

    # "-F" looks the host up in the known_hosts file; "-H" makes any keys
    # that are found come back in hashed format. No occurrence of the host
    # means empty output.
    lookup_cmd = ["ssh-keygen", "-H", "-F", hostname]
    try:
        found = subprocess.check_output(lookup_cmd)
    except subprocess.CalledProcessError:
        return False
    return bool(found)