Source code for schrodinger.application.licensing.flexlm

#!/usr/bin/env python
#
# A module to parse and manipulate FLEXlm license files
#
# Author: Tom Pollard
# Created: July 2012
#
# Copyright Schrodinger, LLC - All Rights Reserved.
#

# Configure logging
import logging
import operator
import os
import os.path
import re
import shlex
import socket
import sys
import textwrap
from datetime import date
from datetime import datetime
from datetime import timedelta

# Message-only log format.  NOTE(review): no handler in the visible part of
# this module uses it; presumably it is picked up by client code.
fmt = logging.Formatter('%(message)s')

# General-purpose logger.  Only a NullHandler is attached, so nothing is
# emitted until a client configures logging or calls set_logger().
log = logging.getLogger("flexlm")
log.addHandler(logging.NullHandler())

# Logger used for the record of changes made to a license (removed lines,
# added sort indexes).  Silent by default; see set_file_logger().
filelog = logging.getLogger("logfile")
filelog.addHandler(logging.NullHandler())

# Global constants
# NOTE(review): signature_fields and date_fields are not referenced in the
# part of the module visible here; presumably used by the Feature/Package
# parsing code.
signature_fields = ["SIGN", "SIGN2", "vendor_info"]
date_fields = ("ISSUED", "START")
# Relative order of line types; used as a component of the standard sort key
# built by _std_sortkey().
tag_order = {
    "SERVER": 0,
    "VENDOR": 1,
    "USE_SERVER": 2,
    "PACKAGE": 3,
    "FEATURE": 4,
    "INCREMENT": 5
}

# A hostid that starts with eight or more hex digits (case-insensitive) is
# treated as an ethernet address by hostid_order().
ethernet_pattern = re.compile(r"[\da-f]{8,}", re.IGNORECASE)

# Sort ranks returned by hostid_order(); lower values sort first.
ETHERNET_HOSTID_ORDER = 10
IPADDR_HOSTID_ORDER = 20
OPEN_HOSTID_ORDER = 50
DEFAULT_HOSTID_ORDER = 100

# Index numbers for the standard license classes
# (the numeric prefix used by License.canonical_filename()).
license_index = {
    "open": 10,
    "nodelocked_perm": 20,
    "nodelocked": 30,
    "server_perm": 40,
    "library_perm": 50,
    "server": 60,
    "library": 70,
    "client": 80
}

# NOTE(review): implicit_supersede is not read anywhere in the visible part
# of this module -- confirm where it is consumed.
implicit_supersede = True
# When true, merging drops old lines whose ISSUED date equals that of a
# same-named line in the new license (see License._merge_packages and
# License._merge_features).
sameday_replacement = True

# Environment
# Value of $SCHRODINGER, or "" when the variable is not set.
SCHRODINGER = os.getenv('SCHRODINGER', "")

#!!! unused variable?
default_license = os.path.join(SCHRODINGER, "license.txt")

#######################################################################
# Exceptions
#######################################################################


class IncompatibleLicenseError(Exception):
    """
    Raised by the merge operations when incompatibilities between the
    old and new licenses make merging them impossible.
    """
class LicfileError(Exception):
    """
    Raised by the main code when it is asked to work with something that
    is not an actual license file, and by the license-file parsing code
    when it encounters a line it cannot parse.
    """
#######################################################################
# Module functions
#######################################################################
def set_logger(new_log):
    """
    Install a client-supplied logger as this module's general logger.

    :param new_log: object that replaces the module-level ``log``; it is
        used wherever this module calls ``log.debug/info/error``.
    """
    global log
    log = new_log
def set_file_logger(new_file_log):
    """
    Install a client-supplied logger as this module's file logger.

    :param new_file_log: object that replaces the module-level ``filelog``;
        it is used wherever this module calls ``filelog.info``.
    """
    global filelog
    filelog = new_file_log
#######################################################################
# Support functions
#######################################################################
def parse_line(license_line, line_num=0):
    """
    Parse a complete FLEXlm license line into its component parts.

    The line type is chosen from the leading keyword of the line; the
    text is not stripped first, so the keyword must be at the very start.
    Duplicate detection is not done here: License._add_element() rejects
    duplicates when the returned object is added to a License.

    :param license_line: full text of one logical license line
        (continuation lines already joined).
    :param line_num: line number in the original file, stored on the
        returned object.
    :return: a Server, Vendor, Feature (for both FEATURE and INCREMENT
        lines), Package or UseServer object, or None when the line does
        not start with a recognized keyword.
    """
    if license_line.startswith("SERVER"):
        return Server(license_line, line_num)
    if license_line.startswith("VENDOR"):
        return Vendor(license_line, line_num)
    # FEATURE and INCREMENT lines share one class.
    if license_line.startswith(("FEATURE", "INCREMENT")):
        return Feature(license_line, line_num)
    if license_line.startswith("PACKAGE"):
        return Package(license_line, line_num)
    if license_line.startswith("USE_SERVER"):
        return UseServer(license_line, line_num)
    return None
def parse_date(date_text):
    """
    Convert a FLEXlm date string ("dd-mmm-yyyy", e.g. "15-jul-2012")
    into a datetime.date object.

    :raises LicfileError: if the text is not in that format.
    """
    try:
        parsed = datetime.strptime(date_text, "%d-%b-%Y")
    except ValueError:
        raise LicfileError("Unable to parse date: '%s'" % date_text)
    return parsed.date()
def parse_expiration(exp_text):
    """
    Interpret the expiration field of a FEATURE/INCREMENT line.

    The field is either a standard dd-mmm-yyyy date, "0", or "permanent";
    the last two both denote a permanent license.

    :return: a date object; for a permanent license this is the maximum
        representable date (date.max).
    """
    if exp_text in ("0", "permanent"):
        return date.max
    return parse_date(exp_text)
def parse_tokens(count_text):
    """
    Parse the token-count field of a FEATURE/INCREMENT line.

    The field is either a non-negative integer giving the number of
    tokens, or the word "uncounted".  A count of "0" also indicates an
    uncounted license.

    :return: the token count as an int; 0 for an uncounted license.
    :raises LicfileError: if the text is anything other than "uncounted"
        or a non-empty run of ASCII digits (this includes the empty
        string, which previously escaped as a ValueError from int()).
    """
    if count_text == "uncounted":
        return 0
    # Require at least one digit and nothing else.
    if not re.match(r"[0-9]+\Z", count_text):
        raise LicfileError("expected a non-negative integer: '%s'" %
                           count_text)
    return int(count_text)
def parse_components(components_text):
    """
    Parse the COMPONENTS field of a PACKAGE line.

    The field is a whitespace-separated list of entries of the form
    "name", "name:version" or "name:version:count".

    :return: a dict mapping each feature name to a (version, count)
        tuple of ints.  The version defaults to 0 and the count to 1
        when they are not given.
    """
    components = {}
    for spec in components_text.split():
        fields = spec.split(":")
        feature_name = fields[0]
        version = int(fields[1]) if len(fields) > 1 else 0
        count = int(fields[2]) if len(fields) > 2 else 1
        components[feature_name] = (version, count)
    return components
def matches_canonical_filename(filename):
    """
    Report whether the base name of the given path follows the canonical
    license-file pattern:

    <2 digit priority>_<license class>_<date issued>_<optional descriptor>.lic

    :return: True if the pattern is found, False otherwise.
    """
    canonical = re.compile(r'^(\d+)_(.*)_(\d\d\d\d-\d\d-\d\d)_?(.*)\.lic')
    return canonical.search(os.path.basename(filename)) is not None
def hostid_order(hostid):
    """
    Return the rank used to order license lines by hostid type when the
    standard sort key is built.

    Typical hostid types used in Schrodinger licenses are

    1. ethernet address
    2. IP range
    3. ANY
    4. none

    More restricted licenses should be listed first in the license file,
    so more restricted hostid types get lower ranks.  Any hostid that is
    not recognized (including the empty string) gets the default rank.
    """
    if hostid in ('ANY', "DEMO"):
        return OPEN_HOSTID_ORDER
    if hostid.startswith("INTERNET="):
        return IPADDR_HOSTID_ORDER
    if ethernet_pattern.match(hostid):
        return ETHERNET_HOSTID_ORDER
    return DEFAULT_HOSTID_ORDER
def _std_sortkey(elem): """ A comparison function, used to sort license lines. Lines are grouped into the following blocks: 1. all SERVER lines 2. all VENDOR lines 3. all uncounted licenses, ordered by name 4, the USE_SERVER lne, if any 5. all other non-suite feature/increment lines, by name 6. the GLIDE_SUITE package and feature lines 7. the SUITE package and feature lines Within a block, lines are grouped ... a) by feature name in feature/increment blocks, b) FEATURE before INCREMENT, and c) PACKAGE before FEATURE/INCREMENT, To create this sort order, we construct the sort key (Block number, hostid, name, tag_order, lne number) where tag_order is used to order FEATURE/INCREMENT and PACKAGE/non-PACKAGE lines. The hostid field is used to group lines within a block; more retricted hostids are listed before more open hostids. """ block = 0 suite = 0 name = "" hostid = "" line_num = elem.line_num tag = elem.tag if tag == "SERVER": block = 1 elif tag == "VENDOR": block = 2 elif (tag == "FEATURE" or tag == "INCREMENT") and elem.tokens == 0: block = 3 hostid = elem.hostid elif tag == "USE_SERVER": block = 4 elif elem.name == "GLIDE_SUITE": block = 6 elif elem.name == "SUITE": block = 20 elif "SUITE" in elem.name: suite += 1 block = 6 + suite else: block = 5 if elem.name == "MMLIBS": name = "_MMLIBS" else: name = elem.name return (block, hostid_order(hostid), name, tag_order[elem.tag], line_num) def _remove_superseded_lines(elements): """ Remove the superseded licenses from the given list of license lines. This can be used both for Feature and Package lines. The input list is sorted by ISSUED date, as a side effect. """ elements.sort(key=operator.attrgetter("issued"), reverse=True) # Remove superseded lines by finding most recently issued # line with a SUPERSEDE keyword. All earlier lines for this # feature are discarded. 
supersede_line = None valid_lines = [] for elem in elements: if not supersede_line: if elem.supersede: supersede_line = elem valid_lines.append(elem) elif not supersede_line.supersedes(elem): valid_lines.append(elem) return valid_lines def _remove_replaced_lines(old_elems, new_elems): """ Remove the elements from old_elems that appear in new_elems with the same issued date. This can be used both for Feature and Package lines. All input elements are assumed to have the same name. The filtered list of old elements is returned. """ valid_lines = [] for elem_old in old_elems: issued = elem_old.issued if issued == date.min: # no ISSUED date valid_lines.append(elem_old) continue if any([e for e in new_elems if e.issued == issued]): continue valid_lines.append(elem_old) return valid_lines def _remove_expired_lines(elements): """ Remove the expired licenses from the given list of license lines. This is really only useful for FEATURE/INCREMENT lines. The input list is sorted by ISSUED date, as a side effect. """ return [elem for elem in elements if not elem.is_expired()] ####################################################################### # Classes #######################################################################
class License(object):
    """
    A License object represents a single FLEXlm license file.
    It is composed of objects representing the components of the file,
    including:

    1. One or three SERVER lines [optional]
    2. A VENDOR line [optional]
    3. A USE_SERVER line [optional]
    4. Any number of FEATURE, INCREMENT, and PACKAGE lines. [reqd]

    License objects are initialized from the text of a license file.
    The contents of that file are kept as a list of Server, Vendor,
    Feature and Package objects, each representing a single line in the
    license file.
    """

    def __init__(self, text=""):
        """
        :param text: full text of a license file; may be empty.
        """
        self.line_num = 0
        self.server = {}  # servers, hostid --> server obj
        self.vendor = {}  # vendors: name --> vendor obj
        self.feature = {}  # features: name --> list of feature objs
        self.package = {}  # packages: name --> list of package objs
        self.use_server = None  # UseServer obj, if any
        self.elements = []  # list of license elements, in original order
        self.element = {}  # elements: id --> feature or package object
        self._init_from_string(text)

    def __str__(self):
        """
        Return the license text, one element per logical line, wrapped at
        64 columns with backslash continuations.
        """
        lines = []
        for elem in self.elements:
            lines.append(" \\\n\t".join(
                textwrap.wrap(str(elem), 64, break_long_words=False)))
        lines.append("")
        return "\n".join(lines)

    def write(self, outfile, dryrun=False, backup=True):
        """
        Write the license file to a file named 'outfile'.

        :param outfile: destination path, or "-" for stdout.
        :param dryrun: if true, only log what would be done.
        :param backup: if true and outfile exists, it is first renamed to
            <basename>-<yymmdd-HHMM><suffix>.
        """
        if os.path.exists(outfile) and backup:
            timestamp = datetime.today().strftime("%y%m%d-%H%M")
            (basename, suffix) = os.path.splitext(outfile)
            backupfile = "%s-%s%s" % (basename, timestamp, suffix)
            log.info("Original license file saved as '%s'" % backupfile)
            if not dryrun:
                os.rename(outfile, backupfile)

        if not dryrun:
            if outfile == "-":
                log.info("Updated license file written to stdout")
                print(str(self))
            else:
                log.info("Updated license file written to '%s'" % outfile)
                with open(outfile, "w") as fp:
                    fp.write(str(self))

    def log_deleted_lines(self, newlic):
        """
        Report in the logfile any lines that exist in this license,
        but not in newlic.  Lines are matched by their element keys.
        """
        deleted_set = set(self.element) - set(newlic.element)
        if deleted_set:
            deleted_lines = [self.element[k] for k in deleted_set]
            deleted_lines.sort(key=operator.attrgetter("line_num"))
            filelog.info("The following lines were removed:")
            for elem in deleted_lines:
                filelog.info("%03d: %s" % (elem.line_num, elem.short()))
        else:
            filelog.info("No lines were removed.")

    def canonical_filename(self):
        """
        Return the filename for this license file, according to our
        conventions for naming licenses in license directories.

        License filenames should have the following form:

        <index>_<classname>_<datestamp>_<optional identifier>.lic

        where the classname and index are

        ====================================  ===============  =====
        Description                           Classname        Index
        ====================================  ===============  =====
        Wide-open (Free Maestro)              open             10
        Permanent node-locked                 nodelocked_perm  20
        Node-locked                           nodelocked       30
        Permanent server-based non-library    server_perm      40
        Permanent library                     library_perm     50
        Server-based non-library              server           60
        Short-term library                    library          70
        Stub for remote server                client           80
        ====================================  ===============  =====

        * An "open" license file is one that contains no node-locked
          or counted features.
        * A "permanent" license file is one that contains only permanent
          (non-expiring or 10-year) features.
        * A "node-locked" license file (as the term is used above)
          contains only uncounted, node-locked licenses.

        The datestamp is the most recent ISSUED date in the file, or
        today's date if no line has one.  The identifier is the short
        hostname of the license server, when there is one.
        """
        lictype = self.license_class()
        licindex = license_index[lictype]
        issued = self.last_issued()
        if issued == date.min:
            issued = date.today()
        server = shorthost(self.server_host())
        if server != "":
            name = "%02d_%s_%s_%s.lic" % (licindex, lictype, issued, server)
        else:
            name = "%02d_%s_%s.lic" % (licindex, lictype, issued)
        return name

    def license_class(self):
        """
        Returns the name of the license-file class for this file, based
        on the contents of the license file.  The result is one of the
        keys of license_index.
        """
        if self.is_permanent_library():
            return "library_perm"
        elif self.is_library():
            return "library"
        if self.is_permanent_counted():
            return "server_perm"
        elif self.is_counted():
            return "server"
        elif self.is_stub():
            return "client"
        if self.is_open():
            return "open"
        if self.is_permanent_nodelocked():
            return "nodelocked_perm"
        else:
            return "nodelocked"

    def license_description(self):
        """
        Returns a user-friendly description of the license-file class
        for this file, based on the contents of the license file.
        """
        if self.is_permanent_library():
            return "Permanent token library"
        elif self.is_library():
            return "Token library"
        if self.is_permanent_counted():
            return "Permanent server-based license"
        elif self.is_stub():
            return "Server-based license (client)"
        elif self.is_counted():
            return "Server-based license"
        if self.is_open():
            return "Open license"
        if self.is_permanent_nodelocked():
            return "Permanent node-locked license"
        else:
            return "Node-locked license"

    def is_permanent_nodelocked(self):
        """
        Returns true if the license file contains uncounted, node-locked,
        permanent licenses, and no short-term uncounted, node-locked
        licenses.
        """
        nodelocked = False
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.is_nodelocked() and not feat.is_counted():
                    if not feat.is_permanent():
                        return False
                    nodelocked = True
        return nodelocked

    def is_permanent_counted(self):
        """
        Returns true if the license file contains permanent counted
        licenses, and no short-term counted licenses.
        """
        counted = False
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.is_counted():
                    if not feat.is_permanent():
                        return False
                    counted = True
        return counted

    def is_permanent_library(self):
        """
        Returns true if the license file contains a permanent token
        library, and no short-term token library.

        GLIDE_SUITE packages are ignored.  For every other package, the
        same-named FEATURE/INCREMENT lines must all be permanent.
        """
        library = False
        for name in self.package:
            if name == "GLIDE_SUITE":
                continue
            for pkg in self.package.get(name, []):
                for feat in self.feature.get(name, []):
                    if not feat.is_permanent():
                        return False
                    library = True
        return library

    def is_library(self):
        """
        Returns true if the license file contains a token library, i.e. a
        suite package other than GLIDE_SUITE.
        """
        for name in self.package:
            if name == "GLIDE_SUITE":
                continue
            for pkg in self.package.get(name, []):
                if pkg.suite:
                    return True
        return False

    def is_counted(self):
        """
        Returns true if this license file includes any counted licenses,
        which requires a license server.
        """
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.is_counted():
                    return True
        return False

    def is_stub(self):
        """
        Returns true if this license file is a stub license, that merely
        points to a server but doesn't include FEATURE/INCREMENT lines.
        """
        return len(self.server) > 0 and len(self.feature) == 0

    def is_open(self):
        """
        Returns true if this license file can be installed and used on
        any machine and allows unlimited use of all tokens: it has no
        node-locked or counted features and no SERVER lines.
        """
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.is_nodelocked() or feat.is_counted():
                    return False
        if len(self.server) > 0:
            return False
        return True

    def last_issued(self):
        """
        Return the most recent issued date among the feature and package
        lines in this file, or date.min if there are none.
        """
        issued = date.min
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.issued > issued:
                    issued = feat.issued
        for name in self.package:
            for pkg in self.package.get(name, []):
                if pkg.issued > issued:
                    issued = pkg.issued
        return issued

    def empty(self):
        """
        Return True if this license contains no features and no SERVER
        lines.
        """
        return len(self.server) == 0 and len(self.feature) == 0

    def need_server(self):
        """
        Return True if this license file includes any counted features
        (features with a non-zero token count).
        """
        for name in self.feature:
            for feat in self.feature.get(name, []):
                if feat.tokens:
                    return True
        return False

    def server_hostport(self):
        """
        Return the port@host address of the first license server, or ""
        if this license has no SERVER lines.
        """
        if len(self.server) > 0:
            return list(self.server.values())[0].hostport()
        else:
            return ""

    def server_host(self):
        """
        Return the hostname of the first license server, or "" if this
        license has no SERVER lines.
        """
        if len(self.server) > 0:
            return list(self.server.values())[0].hostname()
        else:
            return ""

    def redundant_server(self):
        """
        Return True if this license specifies a three-machine redundant
        license server.
        """
        return len(self.server) == 3

    def sort(self):
        """
        Returns a new License object, sorted in the standard order.

        This License is left untouched: the sort is done on a copy of
        the element list rather than on self.elements itself.
        """
        sorted_lines = sorted(self.elements, key=_std_sortkey)
        return License("\n".join([elem.text for elem in sorted_lines]))

    # Signatures

    def add_signatures(self, signatures):
        """
        Add the given vendor_info signatures to the license.
        This change is made in place.

        :param signatures: dict mapping the text of a license line to
            the signature to be added to the matching line.
        :raises LicfileError: if a signature cannot be assigned to
            exactly one license line, or one license line matches more
            than one signature.  This includes signature text that
            cannot be parsed or that names a feature not in this license.
        """
        matched = {}
        for (licline, signature) in signatures.items():
            elems = [parse_line(x) for x in licline.splitlines()]
            if not elems or elems[0] is None:
                raise LicfileError("No match for signature:\n" + licline)
            feat = elems[0]
            match = None
            # An unknown feature name simply yields no candidates.
            for elem in self.feature.get(feat.name, []):
                if elem.matches(feat):
                    if elem in matched:
                        raise LicfileError(
                            "Multiple matches for license line:\n" +
                            str(elem))
                    if match:
                        raise LicfileError(
                            "Multiple matches for signature:\n" + licline)
                    matched[elem] = licline
                    match = elem
                    elem.add_signature(signature)
            if not match:
                raise LicfileError("No match for signature:\n" + licline)

    def unsign(self):
        """
        Strip signatures from the lines of this License object.
        This change is made in place.
        """
        for elem in self.elements:
            elem.unsign()

    # License cleanup

    def cleanup(self):
        """
        Remove expired and superseded elements from this license.
        SERVER and VENDOR lines are unaffected by this.

        Returns a new License object, with the invalid lines removed, and
        the remaining lines sorted in the standard order.  The removed
        lines are reported to the file logger.
        """
        valid_lines = []  # elements of the cleaned-up license file

        valid_lines.extend(list(self.server.values()))
        valid_lines.extend(list(self.vendor.values()))
        if self.use_server:
            # use_server is a single element, not a list.
            valid_lines.append(self.use_server)

        valid_feature_lines = self._cleanup_features()
        valid_lines.extend(valid_feature_lines)

        valid_features = {f.name for f in valid_feature_lines}
        valid_lines.extend(self._cleanup_packages(valid_features))

        valid_lines.sort(key=_std_sortkey)
        new_license = License("\n".join([elem.text for elem in valid_lines]))
        new_license._set_suite_sort_index()

        self.log_deleted_lines(new_license)
        return new_license

    def _cleanup_packages(self, valid_features):
        """
        Remove superseded and redundant PACKAGE lines.
        Remove PACKAGE lines with no valid enabling feature.  This uses
        the set of valid feature names passed in as "valid_features".

        Note that PACKAGE lines don't have expiration dates.  Also, only
        one PACKAGE line for a given package name is used.  Currently,
        only the most recent valid PACKAGE line is kept.

        ?? Is this right, or should selection be by the order in
        ?? the original file, instead.

        Returns a list of the valid Package elements.
        """
        valid_lines = []  # elements of the cleaned-up license file
        for name in self.package:
            if name not in valid_features:
                continue
            # Sort all PACKAGE lines for this package by ISSUED date
            all_pkgs = self.package.get(name, [])
            pkgs = _remove_superseded_lines(all_pkgs)
            if pkgs:
                valid_lines.append(pkgs[0])
        return valid_lines

    def _cleanup_features(self):
        """
        Remove expired and superseded FEATURE and INCREMENT lines
        from this license.

        Note that superseded lines must be removed before expiration
        dates are checked.

        Returns a list of the valid Feature elements.
        """
        valid_lines = []  # elements of the cleaned-up license file
        for name in self.feature:
            all_feats = self.feature.get(name, [])
            feats = _remove_superseded_lines(all_feats)
            feats = _remove_expired_lines(feats)
            valid_lines.extend(feats)
        return valid_lines

    # License merging

    def merge(self, oldlic):
        """
        Merge this license with an older license.  User-modifiable
        settings from the older license will be retained.

        Returns a new License object with the merged lines sorted in the
        standard order.

        :raises IncompatibleLicenseError: if the SERVER configurations of
            the two licenses cannot be reconciled.
        """
        merged = []  # elements of the merged license file

        merged.extend(self._merge_servers(oldlic))
        merged.extend(self._merge_vendors(oldlic))
        merged.extend(self._merge_packages(oldlic))
        merged.extend(self._merge_features(oldlic))
        if self.use_server:
            merged.append(self.use_server)
        elif oldlic.use_server:
            merged.append(oldlic.use_server)

        merged_license = License("\n".join([elem.text for elem in merged]))
        final_merged_license = merged_license.cleanup()

        oldlic.log_deleted_lines(final_merged_license)
        return final_merged_license

    def _merge_servers(self, oldlic):
        """
        Merge the SERVER lines for this license with those of an older
        license.

        Returns the list of Server objects for the merged file.

        :raises IncompatibleLicenseError: if both licenses need a server
            and their server configurations differ.
        """
        if self.need_server() and oldlic.need_server():
            # Check that the number of servers match
            if self.redundant_server() != oldlic.redundant_server():
                raise IncompatibleLicenseError(
                    "incompatible SERVER configurations in merge.")

            # Hostids on all servers must match.
            for hostid in self.server:
                if hostid not in oldlic.server:
                    raise IncompatibleLicenseError(
                        "SERVER hostids differ in old and new licenses.")

        if oldlic.server:
            # Copy SERVER lines from old license, if any.
            return [elem for elem in oldlic.elements if elem.tag == "SERVER"]
        else:
            # ... else take the SERVER lines from the new license.
            return [elem for elem in self.elements if elem.tag == "SERVER"]

    def _merge_vendors(self, oldlic):
        """
        Merge the VENDOR lines for this license with those of an older
        license.  Where both licenses have a line for the same vendor,
        the old one wins.

        Returns the merged list of Vendor objects.
        """
        merged = []

        # Include any VENDOR lines that only the new license has.
        for name in self.vendor:
            if name not in oldlic.vendor:
                merged.append(self.vendor[name])

        # Copy all VENDOR lines from the old license.
        for name in oldlic.vendor:
            merged.append(oldlic.vendor[name])

        return merged

    def _merge_packages(self, oldlic):
        """
        Merge the PACKAGE lines for this license with those of an older
        license.  At most one (the most recently issued surviving) line
        is kept per package name.

        Returns the merged list of Package objects.
        """
        merged = []
        pkg_names = set(self.package) | set(oldlic.package)
        for name in pkg_names:
            new_pkgs = self.package.get(name, [])
            old_pkgs = oldlic.package.get(name, [])
            if sameday_replacement:
                old_pkgs = _remove_replaced_lines(old_pkgs, new_pkgs)
            pkgs = _remove_superseded_lines(old_pkgs + new_pkgs)
            if pkgs:
                merged.append(pkgs[0])
        return merged

    def _merge_features(self, oldlic):
        """
        Merge the FEATURE/INCREMENT lines for this license with those
        of an older license.  Replaced, superseded and expired lines are
        dropped.

        Returns the merged list of Feature objects.
        """
        merged = []
        feat_names = set(self.feature) | set(oldlic.feature)
        for name in feat_names:
            new_feats = self.feature.get(name, [])
            old_feats = oldlic.feature.get(name, [])
            if sameday_replacement:
                old_feats = _remove_replaced_lines(old_feats, new_feats)
            feats = _remove_superseded_lines(old_feats + new_feats)
            feats = _remove_expired_lines(feats)
            merged.extend(feats)
        return merged

    def _set_suite_sort_index(self):
        """
        General SUITE tokens should be given an explicit sort index to
        make sure they're not used unless more restricted licenses have
        already been exhausted.

        A sort index of "200" is added to every SUITE feature line that
        has none, but only when the license has a SUITE package and at
        least one other package.

        TODO: this should be a cleanup method; it can be used on the
        merged License object resulting from a merge.
        """
        pkg_names = list(self.package)
        if "SUITE" in pkg_names and len(pkg_names) > 1:
            for feat in self.feature.get("SUITE", []):
                if "sort" not in feat.field:
                    feat.set_sort_index("200")
                    filelog.info("Added sort index '200' to %s %s line" %
                                 (feat.name, feat.tag))

    # License validation

    def validate(self):
        """
        Check that the lines in this license file are all valid, and
        report any errors found through the module logger.

        WARNING: The validation done here is not comprehensive.  There
        are at least a few errors we could be checking for, but aren't.

        :return: the number of problems found.
        """
        err = self._validate_servers()
        err += self._validate_vendors()
        err += self._validate_features()
        err += self._validate_packages()
        return err

    def _validate_servers(self):
        """
        Check the SERVER lines; returns the number of problems found.
        """
        err = 0
        servers = list(self.server.values())

        # Check for redundant SERVER lines
        for serv in servers:
            if serv != self.server[serv.hostid]:
                log.info("\nWARNING: second SERVER line for hostid %s" %
                         serv.hostid)
                log.info("--> %s" % serv.text)
                log.info("--> This line will be ignored.")
                err += 1

        # Check that the number of SERVER lines is valid
        count = len(servers)
        if count > 1 and count != 3:
            log.error("\nERROR: There are %d SERVER lines." % count)
            for serv in servers:
                log.info("--> %s", serv.text)
            log.info("--> There should only be either 1 or 3 servers.")
            err += 1

        return err

    def _validate_vendors(self):
        """
        Placeholder: no VENDOR checks are implemented.  Always returns 0.
        """
        return 0

    def _validate_features(self):
        """
        Validate every feature name, in sorted order; returns the total
        number of problems found.
        """
        err = 0
        for name in sorted(list(self.feature)):
            err += self._validate_feature(name)
        return err

    def _validate_feature(self, name):
        """
        Check the FEATURE/INCREMENT lines for one feature name; returns
        the number of problems found.  Expired lines are reported but
        not counted.  The stored feature list is not reordered.
        """
        err = 0

        # Check for multiple FEATURE lines
        feat_lines = [f for f in self.feature[name] if f.tag == "FEATURE"]
        if len(feat_lines) > 1:
            log.info("\nWARNING: There are multiple FEATURE lines for '%s'." %
                     name)
            for feat in feat_lines:
                log.info("--> %s" % feat.short())
            log.info("--> Only the first FEATURE line will be recognized.")
            err += 1

        # Check for out-of-order FEATURE line
        feat1 = self.feature[name][0]
        if feat_lines and feat1.tag != "FEATURE":
            log.info(
                "\nWARNING: INCREMENT line precedes FEATURE line for '%s'" %
                name)
            log.info("--> %s" % feat1.short())
            log.info("--> %s" % feat_lines[0].short())
            err += 1

        # Check for expired lines
        for feat in self.feature[name]:
            if feat.expiration < date.today():
                log.info("\nWARNING: %s line for %s expired on %s" %
                         (feat.tag, feat.name, feat.expiration_str()))
                log.info("--> %s" % feat.short())

        # Check for superseded lines (newest first, on a sorted copy)
        feats = sorted(self.feature[name],
                       key=operator.attrgetter("issued"),
                       reverse=True)
        super_feat = None
        for feat in feats:
            if super_feat and super_feat.supersedes(feat):
                log.info("\nWARNING: %s line for %s is superseded." %
                         (feat.tag, feat.name))
                log.info("--> %s" % feat)
                err += 1
            elif not super_feat and feat.supersede:
                super_feat = feat

        return err

    def _validate_packages(self):
        """
        Check the PACKAGE lines; returns the number of problems found.
        """
        err = 0

        # Validate each PACKAGE line
        pkg_names = list(self.package)
        for name in pkg_names:
            err += self._validate_package(name)

        # Check that SUITE feature has sort=200 if GLIDE_SUITE also present
        if "SUITE" in pkg_names and "GLIDE_SUITE" in pkg_names:
            if "SUITE" in self.feature:
                for feat in self.feature["SUITE"]:
                    # A line with no sort field at all also fails the check.
                    sort_index = feat.field.get("sort")
                    if sort_index is None or int(sort_index) < 200:
                        log.info(
                            "\nWARNING: sort index in 'SUITE' %s line < 200",
                            feat.tag)
                        err += 1
        return err

    def _validate_package(self, name):
        """
        Check the PACKAGE lines for one package name; returns the number
        of problems found.  The stored package list is not reordered.
        """
        err = 0

        for pkg in self.package[name]:
            # Check for matching FEATURE line
            if name not in self.feature:
                log.info("\nERROR: PACKAGE %s has no matching FEATURE line" %
                         name)
                err += 1

        # Check for superseded lines (newest first, on a sorted copy)
        pkgs = sorted(self.package[name],
                      key=operator.attrgetter("issued"),
                      reverse=True)
        super_pkg = None
        for pkg in pkgs:
            if super_pkg and super_pkg.supersedes(pkg):
                log.info("\nWARNING: PACKAGE line for %s is superseded." %
                         pkg.name)
                log.info("--> %s", pkg)
                err += 1
            elif not super_pkg and pkg.supersede:
                super_pkg = pkg

        return err

    # License parsing

    def _init_from_string(self, text):
        """
        Parse the text of a license file.

        Comment lines (starting with "#") and blank lines between
        entries are skipped.  Lines ending in a backslash are joined with
        the following line(s) into one logical line, which is then parsed
        and added to this License.  Unrecognized and duplicate lines are
        only reported at debug level.
        """
        line_parts = []
        log.debug("\nCreate new license object from license file")
        for line_count, line in enumerate(text.split("\n"), 1):
            if not line_parts:
                # First physical line of a new logical line.
                line_num = line_count
            line = line.strip()
            if line.startswith("#"):
                continue
            elif line.endswith("\\"):
                line_parts.append(line.rstrip("\\").rstrip())
                continue
            elif line == "" and not line_parts:
                continue

            # We have a complete line
            line_parts.append(line)
            full_line = " ".join(line_parts)
            log.debug("Line %d: %s" % (line_num, full_line))
            line_parts = []

            elem = parse_line(full_line, line_num)
            if not elem:
                log.debug("Unrecognized license on line %d:\n%s" %
                          (line_num, full_line))
            elif not self._add_element(elem):
                log.debug("Rejected duplicate license on line %d:\n%s" %
                          (line_num, full_line))

    def _add_element(self, elem):
        """
        Add the given "element" (LicenseLine object) to this License
        object.  Duplicate lines (same element key) are rejected.

        Each element is also added to the appropriate dict for its type.
        These are used to find features and packages by name.

        Return true if the element was added to the License;
        Return false if it was rejected as a duplicate line.
        """
        elemkey = elem._elemkey()
        if elemkey in self.element:
            return False

        self.elements.append(elem)
        self.element[elemkey] = elem

        if elem.tag == "SERVER":
            self.server[elem.hostid] = elem
        elif elem.tag == "VENDOR":
            self.vendor[elem.vendor] = elem
        elif elem.tag == "USE_SERVER":
            self.use_server = elem
        elif elem.tag == "FEATURE" or elem.tag == "INCREMENT":
            self.feature.setdefault(elem.name, []).append(elem)
        elif elem.tag == "PACKAGE":
            self.package.setdefault(elem.name, []).append(elem)
        return True
class LicenseLine(object):
    """
    A LicenseLine object represents a single line of a FLEXlm license
    file.  This is the base class for the Server, Vendor, Feature, and
    Package classes.
    """

    def __init__(self, tag="", line=None, line_num=0):
        """
        :param tag: leading keyword of the line (e.g. "SERVER").
        :param line: full text of the line.
        :param line_num: line number in the original file.
        """
        self.tag, self.text, self.line_num = tag, line, line_num
        self.name = ""
        self.issued = None
        self.hostid = ""

    def __str__(self):
        return self.text

    def string(self):
        """Return the text of this line."""
        return self.text

    def short(self):
        """Return the text used when reporting this line in logs."""
        return self.text

    def unsign(self):
        """Strip any signature from this line; a no-op in the base class."""
        return None

    def print_fields(self):
        """Print the parsed fields of this line; the base class prints
        only the tag."""
        print(self.tag)

    def _elemkey(self):
        """
        Return the unique identifier for this line, used for recognizing
        duplicate lines.  Different identifiers are used for different
        types of lines: for SERVER lines, the hostid is used; for VENDOR
        lines, the vendor name is used; for FEATURE, INCREMENT, and
        PACKAGE lines, the signature is used.

        The base class uses the full text of the line.
        """
        return self.text

    def is_expired(self, testdate=None):
        """
        Test whether the license line has expired.

        License lines without an expiration date always return False.
        If a testdate is supplied, it is compared to the expiration
        date, otherwise the current date is used.
        """
        reference = testdate if testdate else date.today()
        try:
            return self.expiration < reference
        except AttributeError:
            return False
class Server(LicenseLine):
    """
    Server objects represent SERVER lines in the license file.

    The format of the SERVER line is:

    SERVER host hostid [port] [PRIMARY_IS_MASTER] [HEARTBEAT_INTERVAL=seconds]
    """

    def __init__(self, line=None, line_num=0):
        """
        :param line: text of the SERVER line; parsed if non-empty.
        :param line_num: line number in the original file.
        :raises LicfileError: if the text is not a SERVER line.
        """
        super(Server, self).__init__("SERVER", line, line_num)
        self.host = ""  # hostname (user setting)
        self.hostid = ""  # hostid, (encrypted, KEY)
        self.port = 0  # port number (user setting)
        if line:
            self._parse_line(line)

    def _elemkey(self):
        """SERVER lines are identified by their (lower-cased) hostid."""
        return (self.tag, self.hostid)

    def print_fields(self):
        """Print the parsed fields of this line."""
        print("SERVER")
        print("  host =", self.host)
        print("  hostid =", self.hostid)
        if self.port:
            print("  port =", self.port)

    def hostname(self):
        """
        Return the hostname for this server.  The placeholder
        "this_host" is replaced by the local machine's hostname.
        """
        if self.host == "this_host":
            return socket.gethostname()
        return self.host

    def hostport(self):
        """
        Return the port@host address for this server ("@host" when no
        port was given).
        """
        if self.port:
            return "@".join((str(self.port), self.hostname()))
        else:
            return "@" + self.hostname()

    def _parse_line(self, line):
        """
        Fill in host, hostid and port from the text of a SERVER line.
        The host and hostid are stored lower-cased.  Missing fields keep
        their defaults.

        :raises LicfileError: if the first word is not "SERVER".
        """
        self.text = line
        parts = line.split()
        if not parts or parts[0] != "SERVER":
            raise LicfileError("Expected SERVER line, got\n" + line)
        self.tag = parts[0]
        if len(parts) > 1:
            self.host = parts[1].lower()
        if len(parts) > 2:
            self.hostid = parts[2].lower()
        if len(parts) > 3:
            try:
                self.port = int(parts[3])
            except ValueError:
                # The port is optional, so the fourth word may be one of
                # the trailing options (e.g. PRIMARY_IS_MASTER) instead.
                pass

    def is_nodelocked(self):
        """
        Returns true if this server is node-locked, i.e. its hostid is
        neither ANY nor DEMO.  The comparison ignores case because the
        hostid is stored lower-cased when the line is parsed.
        """
        return self.hostid.upper() not in ("ANY", "DEMO")
class Vendor(LicenseLine):
    """
    Vendor objects represent VENDOR lines in the license file.

    The format of the VENDOR line is:

        VENDOR vendor [vendor_daemon_path] [[OPTIONS=]options_file_path] [[PORT=]port]
    """

    def __init__(self, line=None, line_num=0):
        """
        Create a Vendor object, parsing `line` when one is given.
        """
        super(Vendor, self).__init__("VENDOR", line, line_num)
        self.vendor = ""
        self.path = ""
        self.field = {}
        if line:
            self._parse_line(line)

    def print_fields(self):
        """
        Print the tag, vendor name and every parsed key/value setting.
        """
        print(self.tag)
        print(" vendor =", self.vendor)
        for key in self.field:
            print(" %s = %s" % (key, self.field[key]))

    def string(self):
        """
        Return a normalized VENDOR line built from the parsed fields.

        The daemon path, options file and port are included only when
        set; the options file and port are written in key=value form.
        """
        parts = [self.tag, self.vendor]
        if self.path:
            parts.append(self.path)
        options = self.optionfile()
        if options:
            parts.append("OPTIONS=%s" % options)
        port = self.port()
        if port:
            parts.append("PORT=%s" % port)
        return " ".join(parts)

    # Accessors

    # NOTE: __init__ assigns an instance attribute named `vendor`, which
    # shadows this method on instances; read the attribute directly.
    def vendor(self):
        return self.vendor

    def pathname(self):
        """
        Return the vendor daemon path, or "" if none was given.
        """
        return self.path

    def optionfile(self):
        """
        Return the options file path, or "" if none was given.
        """
        return self.field.get("OPTIONS", "")

    def port(self):
        """
        Return the vendor daemon port, or 0 if none was given.
        """
        return self.field.get("PORT", 0)

    # Parse license line
    def _parse_line(self, line):
        """
        Initialize this object from the text of a VENDOR line.

        Raises LicfileError if the line does not start with the VENDOR
        keyword or has no vendor name.
        """
        self.text = line
        parts = line.split()
        if not parts or parts[0] != "VENDOR":
            raise LicfileError("Expected VENDOR line, got\n" + line)
        if len(parts) < 2:
            raise LicfileError("Incomplete VENDOR line: \n" + line)

        # required fields
        self.tag = parts.pop(0)
        self.vendor = parts.pop(0)

        # optional positional fields, in the order path, options, port
        if parts and "=" not in parts[0]:
            self.path = parts.pop(0)
        if parts and "=" not in parts[0]:
            self.field['OPTIONS'] = parts.pop(0)
        if parts and "=" not in parts[0]:
            self.field['PORT'] = int(parts.pop(0))

        # key=value fields; a bare keyword is stored with the value True
        for setting in parts:
            if '=' in setting:
                (key, value) = setting.split('=', 1)
            else:
                key = setting
                value = True
            if key == "PORT":
                self.field[key] = int(value)
            else:
                self.field[key] = value
        self.name = self.vendor
class UseServer(LicenseLine):
    """
    UseServer objects represent USE_SERVER lines in the license file.

    The format of the USE_SERVER line is:

        USE_SERVER
    """

    def __init__(self, line=None, line_num=0):
        """
        Create a UseServer object; the line text is stored but not parsed.
        """
        super().__init__(tag="USE_SERVER", line=line, line_num=line_num)
class Feature(LicenseLine):
    """
    Feature objects represent FEATURE or INCREMENT lines in the license file.

    The fields read by _parse_line are:

        FEATURE|INCREMENT name vendor version expiration tokens [key[=value] ...]
    """

    def __init__(self, line=None, line_num=0):
        """
        Create a Feature object, parsing `line` when one is given.
        """
        super(Feature, self).__init__("FEATURE", line, line_num)
        self.name = ""
        self.vendor = ""
        self.version = 0
        self.expiration = date.min
        self.tokens = 0
        self.hostid = ""
        self.issued = date.min
        self.start = date.min
        self.supersede = []
        self.field = {}
        if line:
            self._parse_line(line)

    def _elemkey(self):
        """
        Return the signature string used to recognize duplicate lines.
        """
        return self.signature()

    def short(self):
        """
        Return short string representation (no signatures)
        """
        supersede_str = "SUPERSEDE" if self.supersede else ""
        parts = [
            self.tag, self.name, self.vendor,
            str(self.version),
            self.expiration_str(),
            self.tokens_str(), self.hostid,
            self.issued_str(),
            self.start_str(), supersede_str
        ]
        return " ".join(parts)

    def print_fields(self):
        """
        Print the positional fields followed by every key/value setting.
        """
        print(self.tag)
        print(" name =", self.name)
        print(" vendor =", self.vendor)
        print(" version =", self.version)
        print(" expires =", self.expiration_str())
        print(" count =", self.tokens_str())
        for key in list(self.field):
            print(" %s = %s" % (key, self.field[key]))

    def matches(self, feat):
        """
        Does the given feature match this one?

        This is used for assigning RSA signatures to licenses.
        It currently compares only the SIGN and SIGN2 fields, and so
        can only be used for signed licenses.

        Assumes that it's not possible for SIGN fields to match
        unless SIGN2 fields also match. It doesn't insist on both
        SIGN and SIGN2 being present, so it'll continue to work
        if we manage to eliminate SIGN2.

        Raises LicfileError when neither signature field is set on
        both lines.
        """
        mysign = self.field.get("SIGN", "0")
        featsign = feat.field.get("SIGN", "0")
        if mysign != "0" and featsign != "0":
            return (mysign == featsign)
        mysign2 = self.field.get("SIGN2", "0")
        featsign2 = feat.field.get("SIGN2", "0")
        if mysign2 != "0" and featsign2 != "0":
            return (mysign2 == featsign2)
        raise LicfileError("Cannot match unsigned license lines")

    def alt_matches(self, feat):
        """
        Does the given feature match this one?

        This is used for assigning RSA signatures to licenses.
        This version compares the details of each license line: tag,
        name, vendor, version, expiration, token count and issue date.
        """
        # The feature name is part of the comparison so that two different
        # features that share every other detail are not taken to match.
        match = ((self.tag == feat.tag) and (self.name == feat.name) and
                 (self.vendor == feat.vendor) and
                 (self.version == feat.version) and
                 (self.expiration == feat.expiration) and
                 (self.tokens == feat.tokens) and
                 (self.issued == feat.issued))
        return match

    def tokens_str(self):
        """
        Return the standard string representation of the token count.
        """
        if self.tokens:
            return str(self.tokens)
        else:
            return "uncounted"

    def expiration_str(self):
        """
        Return the standard string representation of the expiration date.
        """
        if self.expiration == date.max:
            return "permanent"
        else:
            return self.expiration.strftime("%d-%b-%Y")

    def issued_str(self):
        """
        Return the standard string representation of the issued date.
        """
        if self.issued > date.min:
            return "ISSUED=" + self.issued.strftime("%d-%b-%Y")
        else:
            return ""

    def start_str(self):
        """
        Return the standard string representation of the start date.
        """
        if self.start > date.min:
            return "START=" + self.start.strftime("%d-%b-%Y")
        else:
            return ""

    def is_nodelocked(self):
        """
        Returns true unless the hostid of this line is ANY or DEMO.

        Note that a line with no HOSTID setting has an empty hostid and
        so also returns true.
        """
        return self.hostid != "ANY" and self.hostid != "DEMO"

    def is_counted(self):
        """
        Returns true if only a fixed number of tokens are provided.
        """
        return self.tokens > 0

    def is_permanent(self):
        """
        Returns true if this license has no expiration date or if it
        appears to have been issued as a 10-year license.

        The term is measured from the issue date when one is known and
        from today otherwise; more than nine years counts as permanent.
        """
        if self.expiration == date.max:
            return True
        if self.issued > date.min:
            term = self.expiration - self.issued
        else:
            term = self.expiration - date.today()
        return term > timedelta(365 * 9)

    def is_signed(self):
        """
        Does this license line have a signature?
        """
        for key in signature_fields:
            value = self.field.get(key, "")
            if value != "" and value != "0":
                return True
        return False

    def signature(self):
        """
        Return a string representing the signatures for this
        license line.  This is used for recognizing duplicate lines.
        When no signatures are present, the full text of the line is used.
        """
        if self.is_signed():
            return ", ".join([self.field.get(k, "") for k in signature_fields])
        else:
            return self.text

    def unsign(self):
        """
        Strip the signatures from this line.

        The vendor_info field is removed entirely; the other signature
        fields are reset to "0".  Both the field dict and the line text
        are updated.
        """
        for key in signature_fields:
            if key in self.field:
                if key == "vendor_info":
                    del self.field[key]
                    setting = ""
                else:
                    self.field[key] = "0"
                    setting = " %s=0" % key
                self.text = re.sub(r" %s=(\"[^\"]+\"|\S+)" % key, setting,
                                   self.text)

    def supersedes(self, feat):
        """
        Does this FEATURE/INCREMENT line supersede the given line?
        """
        return feat.name in self.supersede and self.issued > feat.issued

    def sort_index(self):
        """
        Get the "sort" field for this line, as an integer.

        Lines without a "sort" field report 100.
        """
        return int(self.field.get("sort", "100"))

    def add_signature(self, signature):
        """
        Add the given vendor_info signature to this line.

        `signature` is the complete `vendor_info=...` setting.  If this
        line doesn't yet have a 'vendor_info' field, one is added to both
        the text attribute and the fields dict.
        """
        (_, value) = signature.split('=', 1)
        self.field["vendor_info"] = value
        if " vendor_info=" in self.text:
            # A function replacement inserts the signature literally, so
            # backslashes in it are not read as regex escapes.  The pattern
            # also accepts an unquoted existing value.
            self.text = re.sub(r"vendor_info=(\"[^\"]*\"|\S*)",
                               lambda match: signature, self.text)
        else:
            self.text += " " + signature

    def set_sort_index(self, sort_index):
        """
        Set the "sort" field for this line to the specified value.
        If this line doesn't yet have a 'sort' value, the field is
        added to both the text attribute and the fields dict.
        """
        self.field["sort"] = str(sort_index)
        setting = " sort=" + self.field["sort"]
        if " sort=" in self.text:
            self.text = re.sub(r" sort=\d+", setting, self.text)
        else:
            self.text += setting

    def _parse_line(self, line):
        """
        Initialize this object using the information from the
        given FEATURE or INCREMENT line.

        NOTE: FEATURE/INCREMENT lines for a package's enabling feature
        will have the corresponding PACKAGE line appended to them in the
        signature-generation output from licsign.  This needs to be
        stripped off so the feature info can be parsed properly.

        Raises LicfileError if the line has the wrong keyword or fewer
        than six fields.
        """
        if "PACKAGE " in line:
            self.text = re.sub(r"PACKAGE.*", "", line)
        else:
            self.text = line
        parts = shlex.split(self.text)
        if parts[0] != "FEATURE" and parts[0] != "INCREMENT":
            raise LicfileError("Expected FEATURE/INCREMENT line, got\n" +
                               self.text)
        if len(parts) < 6:
            raise LicfileError("Incomplete FEATURE/INCREMENT line: \n" +
                               self.text)

        # positional fields
        self.tag = parts.pop(0)
        self.name = parts.pop(0)
        self.vendor = parts.pop(0)
        self.version = int(parts.pop(0))
        self.expiration = parse_expiration(parts.pop(0))
        self.tokens = parse_tokens(parts.pop(0))

        # key=value fields; a bare keyword is stored with the value True
        for setting in parts:
            if '=' in setting:
                (key, value) = setting.split('=', 1)
            else:
                key = setting
                value = True
            if key in date_fields:
                self.field[key] = parse_date(value)
            else:
                self.field[key] = value

        if "HOSTID" in self.field:
            self.hostid = self.field["HOSTID"].replace(".-1", ".*")
            self.field["HOSTID"] = self.hostid
        if "ISSUED" in self.field:
            self.issued = self.field["ISSUED"]
        if "START" in self.field:
            self.start = self.field["START"]
        if "SUITE" in self.name and implicit_supersede:
            self.supersede = [self.name]
        if "SUPERSEDE" in self.field:
            try:
                self.supersede = self.field["SUPERSEDE"].split()
            except AttributeError:
                # Bare SUPERSEDE keyword (value True): supersede own name.
                self.supersede = [self.name]
class Package(LicenseLine):
    """
    A Package object represents a single PACKAGE line in the license file.

    The format of the PACKAGE line is:

        PACKAGE package vendor [pkg_version] COMPONENTS=pkg_list
                [OPTIONS=SUITE] [SUPERSEDE[="p1 p2 ..."] ISSUED=date] SIGN="<...>"
    """

    def __init__(self, line=None, line_num=0):
        """
        Create a Package object, parsing `line` when one is given.
        """
        super(Package, self).__init__("PACKAGE", line, line_num)
        self.name = ""
        self.vendor = ""
        self.version = 0
        self.field = {}
        self.issued = date.min
        self.supersede = []
        self.suite = False
        self.components = {}
        self.options = ""
        if line:
            self._parse_line(line)

    def print_fields(self):
        """
        Print the package fields, its components (sorted by name) and
        every key/value setting other than COMPONENTS.
        """
        print(self.tag)
        print(" name =", self.name)
        print(" vendor =", self.vendor)
        if self.version:
            print(" version =", self.version)
        print(" suite =", self.suite)
        print(" components =")
        for name in sorted(self.components):
            (version, tokens) = self.components[name]
            print(" %-16s version:%d tokens:%d" % (name, version, tokens))
        for key in self.field:
            if key != "COMPONENTS":
                print(" %s = %s" % (key, self.field[key]))

    def short(self):
        """
        Return short string representation (no signatures)
        """
        version_str = str(self.version) if self.version else ""
        suite_str = "OPTIONS=SUITE" if self.suite else ""
        supersede_str = "SUPERSEDE" if self.supersede else ""
        comp_list = []
        for name in sorted(self.components):
            (version, tokens) = self.components[name]
            # Use the shortest form that still carries the information.
            if tokens > 1:
                comp_list.append("%s:%d:%d" % (name, version, tokens))
            elif version > 0:
                comp_list.append("%s:%d" % (name, version))
            else:
                comp_list.append(name)
        parts = [
            self.tag, self.name, self.vendor, version_str,
            self.issued_str(), supersede_str, suite_str,
            "COMPONENTS=\"%s\"" % " ".join(comp_list)
        ]
        return " ".join(parts)

    def issued_str(self):
        """
        Return the standard string representation of the issued date.
        """
        if self.issued > date.min:
            return "ISSUED=" + self.issued.strftime("%d-%b-%Y")
        else:
            return ""

    def supersedes(self, pkg):
        """
        Does this PACKAGE line supersede the given line?
        """
        return pkg.name in self.supersede and self.issued > pkg.issued

    def _elemkey(self):
        """
        Return the signature string used to recognize duplicate lines.
        """
        return self.signature()

    def is_signed(self):
        """
        Does this license line have a signature?
        """
        for key in signature_fields:
            value = self.field.get(key, "")
            if value != "" and value != "0":
                return True
        return False

    def signature(self):
        """
        Return a string representing the signatures for this
        license line.  This is used for recognizing duplicate lines.
        When no signatures are present, the full text of the line is used.
        """
        if self.is_signed():
            return ", ".join([self.field.get(k, "") for k in signature_fields])
        else:
            return self.text

    def unsign(self):
        """
        Strip the signatures from this line.

        The vendor_info field is removed entirely; the other signature
        fields are reset to "0".  Both the field dict and the line text
        are updated.
        """
        for key in signature_fields:
            if key in self.field:
                if key == "vendor_info":
                    del self.field[key]
                    setting = ""
                else:
                    self.field[key] = "0"
                    setting = " %s=0" % key
                self.text = re.sub(r" %s=(\"[^\"]+\"|\S+)" % key, setting,
                                   self.text)

    def _parse_line(self, line):
        """
        Initialize this object from the text of a PACKAGE line.

        Raises LicfileError if the line does not start with the PACKAGE
        keyword or lacks the package and vendor names.
        """
        self.text = line
        parts = shlex.split(line)
        if not parts or parts[0] != "PACKAGE":
            raise LicfileError("Expected PACKAGE line, got\n" + line)
        if len(parts) < 3:
            raise LicfileError("Incomplete PACKAGE line: \n" + line)
        self.tag = parts[0]
        self.name = parts[1]
        self.vendor = parts[2]

        settings = parts[3:]
        if settings and "=" not in settings[0]:
            try:
                self.version = int(settings[0])
            except ValueError:
                # Not a version number; leave the token to be handled as
                # a bare keyword below.
                self.version = 0
            else:
                # The positional version is not a key/value setting.
                settings = settings[1:]

        # key=value fields; a bare keyword is stored with the value True
        for setting in settings:
            if '=' in setting:
                (key, value) = setting.split('=', 1)
            else:
                key = setting
                value = True
            if key in date_fields:
                self.field[key] = parse_date(value)
            else:
                self.field[key] = value

        if "COMPONENTS" in self.field:
            self.components = parse_components(self.field["COMPONENTS"])
        if "OPTIONS" in self.field:
            self.suite = "SUITE" in self.field["OPTIONS"]
        if "ISSUED" in self.field:
            self.issued = self.field["ISSUED"]
        if "SUITE" in self.name and implicit_supersede:
            self.supersede = [self.name]
        if "SUPERSEDE" in self.field:
            try:
                self.supersede = self.field["SUPERSEDE"].split()
            except AttributeError:
                # Bare SUPERSEDE keyword (value True): supersede own name.
                self.supersede = [self.name]
####################################################################### # Other functions #######################################################################
def find_license_file():
    """
    Get path to the file defining the license and the actual license file.

    This is the order of precedence:
    1) $SCHROD_LICENSE_FILE
    2) $LM_LICENSE_FILE
    3) $SCHRODINGER_LICENSE_FALLBACK
    4) $SCHRODINGER_LICENSE
    5) $SCHRODINGER/license, then $SCHRODINGER/license.txt
    6) /Library/Application Support/Schrodinger/license, then license.txt
       in the same directory (for MacOSX)

    For the first two, the value is treated as a search path and its first
    os.pathsep-separated entry is used.  If the chosen entry exists on
    disk it is passed through get_linked_path().

    Returns the tuple (license_file, source)
    """
    # NOTE(review): the block nesting below was reconstructed from a
    # flattened listing of this function -- confirm against the original.
    lic_file = ""
    search_path = os.getenv('SCHROD_LICENSE_FILE')
    defined_by = '$SCHROD_LICENSE_FILE'

    if not search_path:
        search_path = os.getenv('LM_LICENSE_FILE')
        lic_file = ""
        defined_by = '$LM_LICENSE_FILE'

    if not search_path:
        # use default license file
        if "SCHRODINGER_LICENSE_FALLBACK" in os.environ:
            lic_file = os.getenv('SCHRODINGER_LICENSE_FALLBACK')
            defined_by = '$SCHRODINGER_LICENSE_FALLBACK'
        elif "SCHRODINGER_LICENSE" in os.environ:
            lic_file = os.getenv('SCHRODINGER_LICENSE')
            defined_by = '$SCHRODINGER_LICENSE'
        else:
            lic_file = os.path.join(SCHRODINGER, 'license')
            defined_by = '$SCHRODINGER/license'
            if not os.path.exists(lic_file):
                lic_file = os.path.join(SCHRODINGER, 'license.txt')
                defined_by = '$SCHRODINGER/license.txt'
            if not os.path.exists(lic_file) and sys.platform == 'darwin':
                lic_file = os.path.join(
                    '/Library/Application Support/Schrodinger', 'license')
                defined_by = '/Library'
                if not os.path.exists(lic_file):
                    lic_file = os.path.join(
                        '/Library/Application Support/Schrodinger',
                        'license.txt')
                    defined_by = '/Library'
            # None of the default locations exists: report nothing found.
            if not os.path.exists(lic_file):
                lic_file = ""
                defined_by = ""
        search_path = lic_file

    log.debug('License file defined by %s' % defined_by)
    log.debug('License search path: %s' % search_path)
    log.debug('Initial license file path: %s' % lic_file)

    # A search path from the environment may list several entries; only
    # the first one is used.
    if search_path and not lic_file:
        lic_file = search_path.split(os.pathsep)[0]
    if os.path.exists(lic_file):
        lic_file = get_linked_path(lic_file)

    log.debug('License file path: %s' % lic_file)
    return (lic_file, defined_by)
def is_server(licfile):
    """
    Tell whether the given "license file" is really a reference to a
    license server, which is recognized by an '@' in the value.
    """
    names_a_server = '@' in licfile
    return names_a_server
def get_linked_path(path):
    """
    Returns the absolute path for `path`. If it is a symlink the absolute
    path for the linked file is returned.

    Only one level of symbolic link is followed.  A relative link target
    is interpreted relative to the directory containing the link.
    """
    if not os.path.islink(path):
        return os.path.abspath(path)
    link = os.readlink(path)
    if not os.path.isabs(link):
        link = os.path.join(os.path.dirname(path), link)
    # abspath() both anchors a relative result at the current directory
    # and collapses any ".." left over from joining a relative target.
    return os.path.abspath(link)
def shorthost(hostname):
    """
    Strip the domain name from the given hostname and return what is
    left.  A dotted-quad IP address is returned unmodified.
    """
    looks_like_ip = re.match(r"\d+\.\d+\.\d+\.\d+$", hostname) is not None
    if looks_like_ip:
        return hostname
    short_name, _, _ = hostname.partition(".")
    return short_name
def read_license_file(license_file):
    """
    Read the given license file.

    A License object is returned if the given file was an actual
    FLEXlm license file.  Otherwise, a LicfileError exception is raised:
    one message is used when the name refers to a license server and
    another when the file is simply missing.
    """
    if os.path.exists(license_file):
        # Use a context manager so the file handle is closed promptly,
        # even if reading fails.
        with open(license_file) as licfile:
            lic_text = licfile.read()
        return License(lic_text)

    if is_server(license_file):
        raise LicfileError("Not a license file: '%s'" % license_file)
    else:
        raise LicfileError("License file missing: '%s'" % license_file)