Source code for schrodinger.utils.sea.sea

"""
Functionality for "ark" file format handling.

'sea' stands for Schrodinger Enhanced Ark.

The syntax of the ark format can be found elsewhere (e.g., Desmond
User Manual).

Below is an example, demonstrating the most basic usage of this module.

Say we have a file called 'config.cfg' with the following content::

    ------start of the content------
    fep = {
       lambda   = "default:12"
       i_window = ?
       output   = {
          name     = "$JOBNAME$[_replica$REPLICA$].dE"
          first    = 0.0
          interval = 1.2
       }
    }
    temperature = [300.0 310.0 320.0]
    ------end of the content------

We can parse it with a code like this::

    import schrodinger.utils.sea as sea

    cfg = sea.Map( open( "config.cfg", "r" ).read() )

In fact, the code does much more than merely parsing the content. But the
important thing here is that at the end we get an 'Map' object that enables
us to easily manipulate settings for the file as shown below::

    assert( cfg.fep.lambda_    .val == "default:12" )
    assert( cfg.fep.i_window   .val == None )
    assert( cfg.output.first   .val == 0.0 )
    assert( cfg.output.interval.val == 1.2 )
    assert( cfg.output.name.raw_val == "$JOBNAME$[_replica$REPLICA$].dE" )
    assert( cfg.temperature[0].val  == 300.0 )
    assert( cfg.temperature[1].val  == 310.0 )
    assert( cfg.temperature[2].val  == 320.0 )
    assert( cfg.temperature   .val  == [300.0, 310.0, 320.0]

    # Another way to access the data:
    assert( cfg["fep"]["lambda"  ].val == "default:12" )
    assert( cfg["fep"]["i_window"].val == None )

    cfg.output.first.val = 1.0
    assert( cfg.output.first.val == 1.0 )

    cfg.output.i_window.val = 1
    assert( cfg.output.first.val == 1 )

    cfg.fep.lambda_.val = "a different string"
    assert( cfg.fep.lambda_.val == "a different string" )

    cfg.temperature[0].val = 20.0
    assert( cfg.temperature[0].val == 20.0 )

    # Adds a new key-value pair.
    cfg["new_key"] = 1

    # Deletes an existing key-value pair.
    del cfg.fep["interval"]

    print str( cfg )
    #Result:
    #fep = {
    #   i_window = 1
    #   lambda = "a different string"
    #   new_key = 1
    #   output = {
    #      name = "$JOBNAME$[_replica$REPLICA$].dE"
    #      first = 1.0
    #   }
    #}
    #temperature = [20.0 310.0 320.0]


Some explanations of the code:

- The ".val" is in fact a python 'property' for reading and mutating the value
  of the parameter.
- In the file, we have a parameter named "lambda", but in the code, we access it
  with a trailing underscore as 'lambda' is a python keyword.
- The value '?' in the file will correspond to 'None' in python code.
- The ".raw_val" is similar to the ".val". The different is that the latter will
  give a value where the macro's are expanded, whereas the former will give the
  original string. More information regarding macros is beyond the scope of this
  docstring, please refere to the document.
- Notice an expression like: 'temperature[0].val' returns the value of the 0-th
  element of the list, while 'temperature.val' returns a buildin 'list' object
  for the entire list.

Copyright Schrodinger, LLC. All rights reserved.
"""

# Contributors: Yujie Wu

import keyword
import re
import weakref
from copy import deepcopy

import schrodinger.infra.ark as dark

from .common import boolean
from .common import debug_print
from .common import is_equal

Set = set


def _val_filter(val):
    """
    Filters a given value 'val' and returns a new 'Sea' (or its derivatives)
    object representing the value.

    `val` object can be of the following types:

    - 'Sea' or derivative: This function will just return a deep copy of 'val'
    - list or tuple: This function will return a 'List' (see below) object.
    - Other non dict types: This function will return a 'Atom' object.
    """
    if (isinstance(val, Sea)):
        return deepcopy(val)
    elif (isinstance(val, list) or isinstance(val, tuple)):
        return _list_filter(val)
    return Atom(val)


def _list_filter(val_list):
    """
    Returns a 'List' object for the given list of objects.  Usually, we do not
    need to call this function directly. Call '_val_filter' instead.

    :param val_list: This should be either a list or a tuple object. The
                     elements of the list or tuple can be of any type.
    """
    ret = List()
    for e in val_list:
        ret.append(e)

    return ret


# Contains the definitions of macro's that are used by all 'Sea' objects. The
# keys are macros names, and the values are values that the macros represent.
_macro_dict = {}


[docs]def update_macro_dict(new_macro_dict): global _macro_dict _macro_dict.update(new_macro_dict)
[docs]def set_macro_dict(new_macro_dict): global _macro_dict _macro_dict = new_macro_dict
[docs]def get_macro_dict(): return _macro_dict
def _escape_dollar(s): """ This function is needed by the 'expand_macro' function below. It replaces dollar char ``$`` as in the following cases: ``$$, $[, $]`` In a string with the '\x00' char. The results for these cases will be the following: \x00\x00, \x00[, \x00] respectively. """ s = s.replace("$[", "\x00[", -1) s = s.replace("$]", "\x00]", -1) s = s.replace("$$", "\x00\x00", -1) return s _STRIP_BRACKET_PATTERN = re.compile(r"\x00\[[^\[]*?\$[^\]]*?\x00\]") def _strip_bracket(s): """ Mutates a given string 's' by following the following rules: 1. If there is a dollar sign '$' within a fragment of string that is delimited by '\x00[' and '\x00]', the entire fragment including the '\x00[' and '\x00]' delimiters will be removed. 2. If there is no dollar sign within the fragment, then the fragment will be retained, and only the delimiters will be removed. Examples:: # Returns "desmond" _strip_bracket( "desmond\x00[_$JOBNAME\x00]" ) # Returns "desmond_jobname" _strip_bracket( "desmond\x00[_jobname\x00]" ) This function is usually used together with the '_escape_dollar' function in the 'expand_macro' function (see below). """ s = _STRIP_BRACKET_PATTERN.sub("", s) s = s.replace("\x00[", "") s = s.replace("\x00]", "") s = s.replace("\x00\x00", "$") return s
[docs]def expand_macro(s, macro_dict): """ Replaces the macros in the string 's' using the values given by the macro dictionary 'macro_dict'. The expanded string will be returned. Rules or conventions about macros and expansion: - All macros should start with a single '$', followed by capital letters, e.g., "$JOBNAME", "$USERNAME". - Optional string fragments should be bracketed by '$[' and '$]', e.g., "myjob$[_lambda$LAMBDANO$]", where "_lambda$LAMBDANO" is an optional string fragment. - Optional string fragments will be retained with the brackets '$[' and '$]' stripped off if the macro '$LAMBDANO' is defined; otherwise, the optional string together with the brackets will be removed. """ s = _escape_dollar(s) for m, v in list(macro_dict.items()): s = s.replace(m, str(v), -1) return _strip_bracket(s)
[docs]class Sea: """ This is the base class the 'Atom', 'List', and 'Map' classes. A 'Sea' object will manage three types of information: tag: As the name suggests, tags allow users of the 'Sea' object to label the object with strings and latter on to extract or print those with certain tags. parent: Pointer (weak reference) to the parent of this 'Sea' object. Operations to manipulate these data are defined in this base class. """ @staticmethod def _normalize_tag(tag): """ User can supply either a single string, or a list, or a set of strings as tags. This function will normalize the string or list types of tags to the set type. This function will return a set object. """ if (isinstance(tag, str)): tag = [ tag, ] return set(tag) @staticmethod def _gen_sea(ark, parent): """ Creates a new 'Map' or 'List' or 'Atom' object from the given data 'ark' and sets the parent of this new object to the given 'parent', and then returns this new object. :param ark: The data object for which to create a new 'Map' or 'List' or 'Atom' object for. The 'ark' must be of a buildin type. If it is a dict, a 'Map' object will be created and returned; if it is a list or tuple, a 'List' object will be created and returned; otherwise, an 'Atom' object will be created and returned. :param parent: The parent of the new object. It can be None. """ if (isinstance(ark, dict)): v = Map(ark, parent) elif (isinstance(ark, list) or isinstance(ark, tuple)): v = List(parent=parent) if (ark != []): atomtype_directives = { "!int!": int, "!real!": float, "!str!": str, "!bool!": bool, } try: atomtype = atomtype_directives[ark[0]] for e in ark[1:]: v.quick_append(Hydrogen(e, atomtype, parent)) except: for e in ark: v.quick_append(Sea._gen_sea(e, v)) else: v = Atom(ark, parent=parent) return v
[docs] def __init__(self, parent=None): """ Creates and initializes a 'Sea' object. :param parent: This parameter provides the parent of this 'Sea' object. It can be None. """ # In case you wonder, a subclass of `Sea` may overload the __setattr__ # method. We don't want to call that method for these attributes since # it could form an endless recursion. self.__parent = None self.__tag = Set() if (Set) else None self.__pmode = None # None | "hier" | "path" self.__parent = None if (parent is None) else weakref.ref( parent) #self.set_parent( parent )
def __getstate__(self): """ This is needed for pickling. The returned object is of a buildin type (dict, list, int, str, float, ...). """ return self.raw_val def __setstate__(self, ark): """ Restores the 'Sea' object from an object of the buildin type. """ v = Sea._gen_sea(ark, None) self.__dict__.update(v.__dict__) def __deepcopy__(self, memo={}): # noqa: M511 """ """ raise NotImplementedError() def __str__(self): """ Subclasses of the 'Sea' class must implement this function if conversion of the subclass's instances to strings is needed. """ raise NotImplementedError( "`__str__` method not implemented for sea.%s class" % self.__class__.__name__) def __ne__(self, rhs): """ Returns True if the value of this object does not equal that of 'rhs'. """ return not self.__eq__(rhs) def __eq__(self, rhs): """ Returns True if the value of this object equals that of 'rhs'. Subclasses of the 'Sea' class must implement this function. """ raise NotImplementedError( "`__eq__` method not implemented for sea.%s class" % self.__class__.__name__) def __hash__(self): return hash(id(self)) def __get_sval(self): """ Used by the 'sval' property, which is to get an 'Sea' object of the value that is represented by this 'Sea' object. Normally this function just returns itself. But when this 'Sea' object is referring to another 'Sea' object, the latter should be returned -- this functionality is left to subclasses to implement if it makes sense for that subclass. """ return self sval = property(fget=__get_sval, doc="Readonly. Returns the current `Sea` object.")
[docs] def apply(self, op): """ Recursively applies the operation as given by 'op' to all 'Sea' subobjects of this 'Sea' object. """
[docs] def parent(self): """ Rerturns the parent of this 'Sea' object or None if it does not have a parent. """ return None if (self.__parent is None) else self.__parent()
[docs] def set_parent(self, parent): """ Sets the parent of this 'Sea' object to the given 'parent'. """ self.__parent = None if (parent is None) else weakref.ref(parent) if (self.__parent): self.set_pmode(self.__parent().pmode()) return self
[docs] def tag(self): """ Returns tags, which are 'set' objects. """ return self.__tag
[docs] def has_tag(self, tag): """ Returns True if we already tagged this object with the given 'tag'. :param tag: The given 'tag' can be a string, or a list of strings, or a 'set' of strings. """ if (self.__tag is not None): return self._normalize_tag(tag).issubset(self.__tag)
def _add_tag_impl(self, tag, propagate): """ """ if (self.__tag is not None): self.__tag |= tag if (propagate): self.apply(lambda v: v._add_tag_impl(tag, propagate))
[docs] def add_tag(self, tag, propagate=True): """ Tags this object with another string(s). :param tag: The given 'tag' can be a string, or a list of strings, or a 'set' of strings. :param propagate: If True, the function will propagate the operation to all 'Sea' subobjects. """ if (self.__tag is not None): self._add_tag_impl(self._normalize_tag(tag), propagate)
def _remove_tag_impl(self, tag, propagate): """ """ if (self.__tag is not None): self.__tag -= tag if (propagate): self.apply(lambda v: v._remove_tag_impl(tag, propagate))
[docs] def remove_tag(self, tag, propagate=True): """ Removes a tag. :param tag: The given 'tag' can be a string, or a list of strings, or a 'set' of strings. :param propagate: If True, the function will propagate the operation to all 'Sea' subobjects. """ if (self.__tag is not None): self._remove_tag_impl(self._normalize_tag(tag), propagate)
def _reset_tag_impl(self, tag, propagate): """ """ if (self.__tag is not None): self.__tag = tag if (propagate): self.apply(lambda v: v._reset_tag_impl(tag, propagate))
[docs] def reset_tag(self, tag, propagate=True): """ Resets the tag of this object to the given 'tag'. :param tag: The given 'tag' can be a string, or a list of strings, or a 'set' of strings. :param propagate: If True, the function will propagate the operation to all 'Sea' subobjects. """ if (self.__tag is not None): self._reset_tag_impl(self._normalize_tag(tag), propagate)
[docs] def clear_all_tag(self, propagate=True): """ Removes all tags. :param propagate: If True, the function will propagate the operation to all 'Sea' subobjects. """ if (self.__tag is not None): self.__tag = set() if (propagate): self.apply(lambda v: v.clear_all_tag(propagate))
[docs] def pmode(self): """ Returns the printing mode. """ return self.__pmode if (self.__pmode) else "hier"
[docs] def set_pmode(self, pmode, propagate=True): """ Resets the printing mode of this object to the given 'pmode'. :param propagate: If True, the function will propagate the operation to all 'Sea' subobjects. """ self.__pmode = pmode if (propagate): self.apply(lambda v: v.set_pmode(pmode, propagate))
def _dump_impl(self, tag): """ """ raise NotImplementedError( "Subclass of 'Sea' does not implement the '_dump_impl' method: %s" % type(self))
[docs] def dump(self, tag=set()): # noqa: M511 """ Converts this 'Sea' object into a string that looks ugly (yet syntactically correct). This method is 50%-900% faster than the __str__ method. """ return self._dump_impl(self._normalize_tag(tag))
[docs]class Atom(Sea): """ This class represents the "atomic" parameters in a config file. Atomic parameters do not contain further sub-elements. For example, 'force.type' is an atomic parameter, whereas 'force' is not because it has sub-elements like 'type', 'gibbs', etc. Public attributes: - validate - None by default. This attribute can be set to be a callable object that assesses whether a given value is legal or not for this 'Atom' object. The callable object should be able to take one argument -- the value subject to its assessment and returns either True (if the value is OK) or False (if the value is illegal). """ WILDCARD_PATTERN = re.compile(r'\*\ *\.') # For detecting circular references. __refmemo = []
[docs] @staticmethod def num_wildcard(s): """ This function is used to tell about strings like "*.*.keyword". It returns a tuple object. The first element is the number of wildcards "*", the second element is the word at the end after the '.' symbol. For example: num_wildcard( "*.*.keyword" ) will yield (2, "keyword"). See more examples in the unit tests below. """ length = len(s) p = Atom.WILDCARD_PATTERN.search(s) if (p is None): return 0, s n, m, r = 1, 0, "" i, j = p.span() if (j < length): m, r = Atom.num_wildcard(s[j:]) return n + m, r
[docs] @staticmethod def guess_value_type(s): """ Guesses the type of the object that 's' represents. A tuple will be returned. The first element is the object of the guessed type, the second element is the type. If 's' is a non-str object of buildin type, 's' and its type will be returned. If 's' is str object, the type of the object that the string represents will be guessed and the string will be converted to an object of the guessed type. Note these strings: "yes", "true", "on", "no", "false", and "off" will be considered as bool type of objects. If 's' is None, (None, None) will be returned. If 's' is of other types than the above, a ValueError exception will be returned. """ import numpy as np val = None type = None if (s is None): pass elif (isinstance(s, bool)): val = s type = boolean elif (isinstance(s, (int, np.integer))): val = int(s) type = int elif (isinstance(s, (float, np.floating))): val = float(s) type = float elif (isinstance(s, (bytes, str))): # Extracts the value from the string `s` and determines the value type. try: val = int(s) type = int except ValueError: try: val = float(s) type = float except ValueError: if isinstance(s, bytes): s = s.decode() try: val = boolean(s) type = boolean except ValueError: val = str(s) type = str else: raise ValueError("cannot convert '%s' to `sea.Atom`" % str(s.__class__)) return ( val, type, )
[docs] def __init__(self, s=None, type=None, parent=None): """ Constructs a 'Atom' object based on the value 's'. :param s: Can be a floating number, a boolean value, an integer number, or a string. If 's' is a string. the function will try to guess the type of object that the string represents and convert the string to the guessed type. 's' cannot be a dict, or tuple, or dict object. :param type: Supplies a type, instead of using the guessed one. If it is None, the guessed type will be used. :param parent: Specifies the parent of this 'Atom' object. """ Sea.__init__(self, parent) if (isinstance(s, dict)): raise ValueError( "cannot construct an `sea.Atom` object from a dict object") elif (isinstance(s, list)): raise ValueError( "cannot construct an `sea.Atom` object from a list object") elif (isinstance(s, tuple)): raise ValueError( "cannot construct an `sea.Atom` object from a tuple object") if (type is None): self._val, self._type = Atom.guess_value_type(s) else: self._val = None if (s is None) else type(s) self._type = boolean if (type is bool) else type
def __str__(self): """ Converts the value to a string. If the value itself is a string, then returns the value flanked with double-quotes (i.e., "<value>"). """ val = self._val if (self._val is None): return "?" elif (self._type is boolean): return "true" if (val) else "false" elif (self._type is str): if (re.match("^[A-Za-z0-9_-]+$", val)): return val s = "" for e in val: s += ('\\' if (e in [ '"', '\\', ]) else '') + e return '"' + s + '"' else: return str(val) def __eq__(self, rhs): """ Returns True if the value of this object equals that of 'rhs'. If the values of both this and 'rhs' are floating numbers, the `is_equal` function (see above) will be used for comparison. If 'rhs' is not an 'Atom' object, False will be returned. """ if (not isinstance(rhs, Atom)): return False val = self._val va_ = rhs._val result = True if (val is None and va_ is None) else (False if (val is None or va_ is None) else (is_equal(val, va_) if (self._type is float) else (val == va_))) if (not result): debug_print("atom comparison: '{}' '{}'".format( str(val), str(va_), )) return result def __deepcopy__(self, memo={}): # noqa: M511 """ """ newobj = self.__new__(self.__class__) memo[id(self)] = newobj newobj._type = self._type newobj._val = self._val newobj._Sea__tag = deepcopy(self._Sea__tag) newobj._Sea__pmode = self._Sea__pmode newobj._Sea__parent = None return newobj def __set_val(self, val): """ Sets the value. :param val: Can be any object as long as it can be converted to the internal type of the value via the `_convert` method. If 'val' cannot be converted, a ValueError exception will be raised. """ if (val is None): self._val = val else: if (self._type is None): self._val, self._type = Atom.guess_value_type(val) else: try: self._val = self._type(val) except ValueError: raise ValueError("Invalid value") def __get_raw_val(self): """ Returns the raw value. """ return self._val def __get_sval(self): """ Used by the `sval` property, which is to get an `Sea` object of the value that is represented by this `Sea` object. If the value is a reference, the `sval` of the referenced `Sea` object will be returned; otherwise, this object will be returned. This function will raise ValueError if the reference is invalid or if the reference is circular. """ # A general comment on `__refmemo`: We use it as a memo to keep track # of references that this call is going through during the dereferencing # process. This is because the value pointed to by the current ref # could itself be another reference, and so on. `__refmemo` needs kept # until the function exits. va_ = self._val if (va_ is not None and self._type is str): va_ = expand_macro(va_, get_macro_dict()) val = va_.strip() if (val != "" and val[0] == '@'): if (self in Atom.__refmemo): s = "Circular reference:\n " for e in Atom.__refmemo: s += ("%s => " % _pathname(e)) s += _pathname(self) Atom.__refmemo = [] raise ValueError(s) else: Atom.__refmemo.append(self) val = val[1:].strip() if (val[0] == '.'): root = self parent = root.parent() while (parent is not None): root = parent parent = root.parent() key = val[1:] elif (val[0] == '*'): num_parent, key = Atom.num_wildcard(val) root = self.parent() for i in range(num_parent): if (root is None): Atom.__refmemo = [] raise ValueError("Invalid reference: %s" % val) root = root.parent() else: key = val root = self parent = root.parent() while (parent is not None): root = parent parent = root.parent() if (isinstance(root, Map) and key in root): ret = root.get_value(key).sval Atom.__refmemo = [] return ret if (isinstance(root, Map) and key in root): ret = root.get_value(key).sval Atom.__refmemo = [] return ret Atom.__refmemo = [] raise ValueError("Invalid reference: %s" % val) Atom.__refmemo = [] return self def __get_bval(self): """ Returns a new `Sea` object, which has all macros expanded and references dereferenced. """ sval = self.sval if (sval is self): newobj = Atom(self.val) newobj.reset_tag(self.tag()) else: newobj = sval.bval return newobj def __get_dval(self): """ Returns a new `Sea` object, which has all references dereferenced, but macros are preserved. """ sval = self.sval if (sval is self): newobj = Atom(self.raw_val) newobj.reset_tag(self.tag()) else: newobj = sval.dval return newobj def __get_val(self): """ Returns the value. If the raw value is a string, the returned value will have all macros (if any) expanded. If the value is a reference, the actual referenced value will be returned. """ sval = self.sval if (sval is self): val = self._val if (val is not None and self._type is str): val = expand_macro(val, get_macro_dict()) else: val = sval.val return val raw_val = property(fget=__get_raw_val, fset=__set_val, doc="Readwrite. When read, this returns the raw value.") sval = property(fget=__get_sval, doc="Readonly. Returns the dereferenced `Sea` object.") bval = property(fget=__get_bval, doc="Readonly. Returns a new `Atom` object, which has all macros expanded and references " \ "dereferenced.") dval = property( fget=__get_dval, doc="Readonly. Returns a new `Atom` object with dereferenced value.") val = property(fget=__get_val, fset=__set_val, doc="Readwrite. When read, this returns the current value.")
[docs] def update(self, val, tag=set()): # noqa: M511 """ Updates the value with 'val'. If 'val' is a `Atom`, then this `Atom` object will be altered to be the same as 'val'. So the type of the value of this object can be altered by the `update` function. If 'val' is not a `Atom`, then this function will behave exactly the same as setting the value via the 'val' property. :param val: Can be any object as long as it can be converted to the internal type of the value via the `_convert` method. If 'val' cannot be converted, it is ignored. :param tag: Add the tag to this object. """ if (isinstance(val, Atom)): self._val = val._val self._type = val._type else: self.__set_val(val) self.add_tag(tag)
def _dump_impl(self, tag): """ """ return self.__str__()
[docs]class Hydrogen(Atom):
[docs] def __init__(self, s, type, parent=None): """ Just a slightly faster way to construct a 'Atom' object """ Sea.__init__(self, parent) self._val = type(s) self._type = type
[docs]class List(Sea, list): """ This class represents the "list" parameters in a config file. This class' behavior is very similar to that of the buildin `list` class. """
[docs] def __init__(self, val=[], parent=None): # noqa: M511 """ Constructs a `List` object from the given 'val'. :param val: Must be either a list or tuple. The elements must be of either buildin types or `Sea`. :param parent: Specifies the parent of this `List` object. """ Sea.__init__(self, parent) if (self.parent()): self.set_pmode(self.parent().pmode()) self.val = val
def __reduce__(self): """ DESMOND-10233: Patch for python 3.8. In the new version of python, list.__reduce__() has a special optimization which will extend pickled values on restore instead of calling `__getstate__`. This mechanism is not compatible with how the Sea parser works. Instead, fall back to `Sea.__reduce__()` here so the pickled List can be loaded. """ return super().__reduce__() def __getitem__(self, i): if isinstance(i, slice): return List(list.__getitem__(self, i), self.parent()) return list.__getitem__(self, i) def __getslice__(self, i, j): """ Returns a new list composed of elements deep-copied from this list from i-th through the j-th element (but not including the j-th element). The parent of the new list remains the same as this list. DEPRECATION: This has been deprecated since Python version 2.0, and does NOT work any more in Python version 3.x. The replacement is the `__getitem__` method accepting a `slice` object. """ new_list = List(list.__getslice__(self, i, j), self.parent()) return new_list def __add__(self, rhs): """ Returns a new list composed of elements deep-copied from this list and the 'rhs' list. The parent of the new list remains the same as this list. :param rhs: The 'rhs' can be a buildin list or tuple object. In this case, a `Sea` object will be made out of 'rhs' and then concatenated to the copy of this list. """ return deepcopy(self).extend(rhs) def __iadd__(self, rhs): """ Updates this list with the given 'rhs'. :param rhs: The 'rhs' can be a buildin list or tuple object. In this case, a `Sea` object will be made out of 'rhs' and then concatenated to this list. """ return self.extend(rhs) def __setitem__(self, index, val): """ Resets the 'index'-th element with the given value 'val'. :param val: Can be either a 'Sea' object or a buildin type object. In the latter case, a 'Sea' object will be made out of 'val' and then be used to set the 'index'-th element. """ if (not isinstance(val, Sea)): raise ValueError("val is not a sea.Sea object") list.__setitem__(self, index, val) self[index].set_parent(self) def __str__( self, ind="", tag=set(), # noqa: M511 pre="", is_list_elem=False, outdent=""): """ Converts this list to a string in the ark syntax and returns the string. :param ind: Indentation of the converted string. :param tag: Converts only the elements with the given tag. """ s = "[" if (is_atom_list(self)): for e in self: s += str(e) + " " s += "]" else: ind2 = ind + " " i = 0 if (is_list_elem): i = 1 e = self[0] if (isinstance(e, Atom)): s += str(e) elif (isinstance(e, Map)): s += "{{{}{}}}\n".format( e.__str__(ind2, tag, pre, True), ind, ) elif (isinstance(e, List)): if (is_atom_list(e)): s += "%s" % e.__str__(ind2, tag, pre, True, ind) else: s += "%s\n" % e.__str__(ind2, tag, pre, True, ind) n = len(self) while (i < n): e = self[i] i += 1 if (isinstance(e, Atom)): s += "\n{}{}".format( ind, str(e), ) elif (isinstance(e, Map)): s += "\n{}{{{}{}}}\n".format( ind, e.__str__(ind2, tag, pre, True), ind, ) elif (isinstance(e, List)): if (is_atom_list(e)): s += "\n{}{}".format( ind, e.__str__(ind2, tag, pre, True, ind), ) else: s += "\n{}{}\n".format( ind, e.__str__(ind2, tag, pre, True, ind), ) s += ("%s]" % outdent) if (s[-1] == "\n") else ("\n%s]" % outdent) return s def __eq__(self, rhs): """ Returns True if the value of this object equals that of 'rhs'. """ if (not isinstance(rhs, list)): return False if (self is not rhs): len1 = len(self) len2 = len(rhs) if (len1 != len2): debug_print("list comparison: list lengthes: %d %d" % ( len1, len2, )) return False for i, e1, e2 in zip(list(range(len1)), self, rhs): if (e1 != e2): debug_print("list comparison: element %d changed" % i) return False return True
[docs] def __contains__(self, item): return list.__contains__(self, _val_filter(item))
def __deepcopy__(self, memo={}): # noqa: M511 """ """ newobj = self.__new__(self.__class__) memo[id(self)] = newobj newobj._Sea__tag = deepcopy(self._Sea__tag) newobj._Sea__pmode = self._Sea__pmode newobj._Sea__parent = None for e in self: newobj.quick_append(deepcopy(e, memo)) return newobj def __set_val(self, val): """ Sets the value, which must be a list or tuple. """ if (isinstance(val, tuple) or isinstance(val, list)): list.__init__(self) for e in val: self.append(e) else: raise ValueError("value is not a list or tuple") def __get_raw_val(self): """ Returns the raw value. """ val = [] for e in self: val.append(e.raw_val) return val def __get_bval(self): """ Returns a new `Sea` object, which has all macros expanded and references dereferenced. """ newobj = List() for e in self: newobj.append(e.bval) _asea_copy(self, newobj) return newobj def __get_dval(self): """ Returns a new `Sea` object with dereferenced values. """ newobj = List() for e in self: newobj.append(e.dval) _asea_copy(self, newobj) return newobj def __get_val(self): """ Returns the value with macro's expended and references dereferenced if the raw value is a string. """ val = [] for e in self: val.append(e.val) return val raw_val = property( fget=__get_raw_val, fset=__set_val, doc="Readwrite. When read, this returns the current value.") bval = property(fget=__get_bval, doc="Readonly. Returns a new `List` object, which has all macros expanded and references " \ "dereferenced.") dval = property( fget=__get_bval, doc="Readonly. Returns a new `List` object dereferenced values.") val = property(fget=__get_val, fset=__set_val, doc="Readwrite. When read, this returns the current value.")
[docs] def append(self, val): """ Appends the given value 'val' to this list. """ list.append(self, _val_filter(val)) self[-1].set_parent(self)
[docs] def quick_append(self, val): """ Appends the given value 'val' to this list. """ list.append(self, val) self[-1].set_parent(self)
[docs] def extend(self, val_list): """ Extends this list with the given list or tuple 'val_list'. """ n = len(val_list) list.extend(self, _val_filter(val_list)) for e in range(-n, 0): self[e].set_parent(self) return self
[docs] def insert(self, index, val): """ Inserts the given value 'val' to the 'index'-th position in the list. """ list.insert(self, index, _val_filter(val)) self[index].set_parent(self)
[docs] def pop(self, index=-1): """ Removes the 'index'-th element from the list and returns the removed element. The parent of the returned element will be set to None. """ e = list.pop(self, index) e.set_parent(None) return e
[docs] def index(self, obj, **kwargs): return list.index(self, _val_filter(obj), **kwargs)
[docs] def apply(self, op): """ Recursively applies the operation as given by 'op' to all 'Sea' subobjects of this 'Sea' object. """ for e in self: op(e)
[docs] def update(self, ark=None, tag=set()): # noqa: M511 """ Updates this list with 'ark'. :param ark: If 'ark' is None, no effects. If the first element has a string value "!append!", the remaining elements in 'ark' will be appended to this list. If the first element has a string value "!removed!", the remaining elements in 'ark' will be removed from this list. Otherwise, this list will be completely reset to 'ark'. :param tag: Resets the tag of this list to the given 'tag'. """ if (ark is None): return self # Ensures that an execution list is immutable. if (self and isinstance(self[0], Atom) and self[0].val in [ "!append!", "!remove!", ]): return self if (not isinstance(ark, list)): raise ValueError("type of the `ark` argument is not list") ark = ark if (isinstance(ark, Sea)) else _val_filter(ark) if (ark == []): list.__init__(self, []) else: e0 = ark[0] if (isinstance(e0, Atom) and isinstance(e0.val, str)): e0 = e0.val.lower() if (e0 == "!append!"): self.extend(ark[1:]) elif (e0 == "!remove!"): index = set() for a in ark[1:]: for i, e in enumerate(self): if (e == a): index.add(i) index = list(index) index.sort(reverse=True) for i in index: del self[i] else: list.__init__(self, deepcopy(ark)) for e in self: e.set_parent(self) else: list.__init__(self, deepcopy(ark)) for e in self: e.set_parent(self) self.reset_tag(tag)
def _set_value_helper(self, key_index_list, value, tag): """ This functions uses the given 'key_index_list' to locate the `Sea` object and changes its value to the given 'value'. """ k0 = key_index_list[0] k_ = key_index_list[1:] if (k_ == []): value.set_parent(self) self[k0] = value else: try: v = self[k0] except IndexError: raise IndexError( "cannot assign value to a non-existing element in list") else: if (isinstance(v, Atom)): raise KeyError() v._set_value_helper(k_, value, tag) self.add_tag(tag, propagate=False) def _del_key_helper(self, key_index_list): k0 = key_index_list[0] k_ = key_index_list[1:] if k_ == []: del self[k0] else: try: v = self[k0] except IndexError: raise IndexError("cannot delete non-existing element in list") else: if isinstance(v, Atom): raise KeyError("cannot delete non-existing key") v._del_key_helper(k_) def _dump_impl(self, tag): """ """ s = "[" for e in self: s += ("{" + e._dump_impl(tag) + "}" if (isinstance(e, Map)) else e._dump_impl(tag)) + "\n" return s + "]"
[docs]class Key(str): """ An instance of this class will be used as the key of the `Map` class (see below). """ def __new__(cls, key): """ Creates a new `Key` object for the given string 'key'. If 'key' is a python keyword, a underscore '_' will be appended to the original string. """ if (isinstance(key, Key)): key = key.orig_key() else: key = str(key) if (keyword.iskeyword(key)): key = key + "_" return str.__new__(cls, key)
[docs] def __init__(self, key): """ Constructs a new `Key` object for the given string 'key'. """ if (isinstance(key, Key)): self._orig_key = key.orig_key() else: self._orig_key = str(key)
# __init__
[docs] def orig_key(self): """ Returns the original string. """ return self._orig_key
[docs]class Map(Sea): """ This class represents the "map" parameters in a config file. This class' behavior is very similar to that of the buildin `dict` class. """ INDEX_PATTERN = re.compile(r'([^\[]*)\[ *([+-]*[1234567890]*) *\]') INDEX_PATTERN2 = re.compile(r'\[ *[+-]*([1234567890]*) *\]') @staticmethod def _get_key_index(s): """ Parses a string that represents a key-index object in the form of "key[index]". Note that in this form, there is only one key, but may be more than 1 indices (e.g., key[index1][index2]). This function returns a list, where the first element is the key (a string object), and remaining are indices (integer objects). For example, _get_key_index( "key[1][2]" ) should return ["key", 1, 2,]. """ length = len(s) p = Map.INDEX_PATTERN.search(s) if (p is None): return [ s, ] key = p.group(1).strip() index = [ key, int(p.group(2)), ] i, j = p.span() if (j < length): index.extend(Map._get_index(s[j:])) return index @staticmethod def _get_index(s): """ Parses a string that represents an index of a list in the form of "[index]". Note that in this form, there may be more than 1 indices (e.g., [index1][index2]). This function returns a list, where the elements are the indices (integer objects). For example, _get_index( "[1][2]" ) should return [1, 2,]. This function is used by '_get_key_index' function (see above). """ length = len(s) p = Map.INDEX_PATTERN2.search(s) if (p is None): return [] index = [ int(p.group(1)), ] i, j = p.span() if (j < length): index.extend(Map._get_index(s[j:])) return index @staticmethod def _parse_composite_key(key): """ Parses a composite key in the form of, e.g., "keyword1.keyword2[1][2].keyword3", and returns a list, e.g., ["keyword1", "keyword2", 1, 2, "keyword3",]. """ key = key.split('.') ret = [] for k in key: k = k.strip() ret.extend(Map._get_key_index(k)) return ret
[docs] def __init__( self, ark=dark.fromString("{}"), # noqa: M511 parent=None): """ Constructs a 'Map' object with a given 'ark'. The 'ark' can be of the following types of objects: - dict The 'Map' object will be constructed consistent with the dict object. - Map The 'ark' will be deep-copied. - str The string will be parsed and the 'Map' object will be constructed for the parsed string. - list The elements of the list object must be of the above types. A new 'Map' object will be constructed using the first elements, and then the 'Map' object will be updated with the remaining objects in the list. If 'ark' is not provided, an empty 'Map' will be constructed. User can optionally specify the 'parent' parameter, which will set the parent of this 'Map' object to the given value of the 'parent' parameter. """ Sea.__init__(self, parent) ark_update = [] if (isinstance(ark, list)): ark_update = ark[1:] ark = ark[0] if isinstance(ark, bytes): ark = ark.decode('utf-8') if isinstance(ark, str): ark = ark.strip() if (ark == "" or ark[0] != "{"): ark = "{" + ark + "\n}" ark = dark.fromString(ark) keys = list(ark) for k in keys: self.__dict__[Key(k)] = Sea._gen_sea(ark[k], self) if ark_update != []: self.update(ark_update)
# __init__ def __setattr__(self, key, value): """ Sets an attribute called 'key' with 'value'. - 'key' can be a str object or a 'Key' object. - 'value' can be a 'Sea' object or other types of object (including None). - If 'key' already exists in this 'Map' object and its value is a 'Sea' object, then 'value' must be either None or a - 'Sea' object. In other words, you cannot change a 'Sea' value to a non-'Sea' value except for None. - The parent of the 'value', if it is a 'Sea' object, will reset to this 'Map' object after this function. """ if (isinstance(value, Sea)): if ("_Sea__" == key[:6]): key = str(key) else: key = Key(key) else: if (key in self.__dict__): old_value = self.__dict__[key] if (value is not None and isinstance(old_value, Sea) and not isinstance(value, Sea)): raise ValueError( "cannot reassign attribute '%s' to a non-`sea.Sea` object" % key) self.__dict__[key] = value if (isinstance(key, Key) and isinstance(value, Sea)): value.set_parent(self) def __str__( self, ind="", tag=set(), # noqa: M511 pre="", is_list_elem=False): """ Converts this 'Map' object into a string representation. - 'ind' specifies the indentation of the converted string. - 'tag' specifies the tags. And only the keys with the specified tags will be converted. 'tag' can be a string (representing a single tag) or a 'list' or 'set' of strings (representing multiple tags). - 'pre' specifies the prefix. """ s = "" ind2 = ind + " " key_value = self.key_value(tag, should_sort=True) is_first = True for k, v in key_value: ind3 = "" if (is_first and is_list_elem) else ind k = k.orig_key() is_first = False if (isinstance(v, Map)): if (v.pmode() == "hier"): s += "{}{} = {{\n{}{}}}\n".format( ind3, k, v.__str__(ind2, tag), ind, ) elif (v.pmode() == "path"): s += v.__str__(ind, tag, pre + k + ".", is_list_elem) else: raise ValueError("Unrecoginized value of pmode: %s" % v.pmode()) elif (isinstance(v, Atom)): if (v.pmode() == "hier"): s += "{}{} = {}\n".format( ind3, k, str(v), ) elif (v.pmode() == "path"): s += "{}{}{} = {}\n".format( ind3, pre, k, str(v), ) elif (isinstance(v, List)): if (v.pmode() == "hier"): s += "{}{} = {}\n".format( ind3, k, v.__str__(ind2, tag, outdent=ind), ) elif (v.pmode() == "path"): s += "{}{}{} = {}\n".format( ind, pre, k, v.__str__(ind2, tag), ) return s def __eq__(self, rhs): """ Compares this 'Map' object with another 'Map' object 'rhs', returns True if the keys and values in both are equal, or False if otherwise. Note if '__CHECK_SEA_DEBUG' is set to True, difference in detail will be printed to the stdout. """ if (not isinstance(rhs, Map)): return False if (self is rhs): return True for k, v in self.key_value(): if (isinstance(v, Map) or isinstance(v, Atom)): debug_print(f"kv: {k} {v} {self.raw_val} {rhs.raw_val}") try: rv = rhs[k] except KeyError: debug_print("map comparison: key '%s' was lost" % k) return False if (rv.__class__ != v.__class__): debug_print( "map comparison: key '{}'s type changed: {} vs {}". format( k, rv.__class__, v.__class__, )) return False if (v != rv): debug_print("map comparison: key '%s's value changed" % k) return False elif (isinstance(v, List)): try: rv = rhs[k] except KeyError: debug_print("map comparison: key '%s' was lost" % k) return False if (isinstance(rv, List)): if (v != rv): debug_print("map comparison: key '%s' changed" % k) return False else: debug_print( "map comparison: key '%s's type changed: %s vs %s" % (k, rv.__class__, v.__class__)) return False for k, v in rhs.key_value(): if (k not in self): debug_print("map comparison: key '%s' present only in the rhs" % k) return False return True def __hash__(self): return hash(id(self)) def __iter__(self): """ This is the iterator fr the support for the 'for ... in ...' syntax. The element yielded is the key of this 'Map' object. """ keys = self.keys() yield from keys def __getitem__(self, key): """ Returns the value associated with the 'key'. - 'key' should be string or 'Key' object. - If 'key' does not exist in this 'Map' object, the 'KeyError' exception will be raised. """ try: v = self.__dict__[Key(key)] if (not isinstance(v, Sea)): raise KeyError except KeyError: raise KeyError("key '%s' not found" % key) return v def __setitem__(self, key, value): """ Associates the given 'value' with the given 'key'. - 'key' should be string or 'Key' object. - If 'key' does not exist in this 'Map' object before, the 'key' will be put into the 'Map' object. - 'value' will be processed first by the '_val_filter' function (see above), the 'Sea' object returned by the '_val_filter' function will be the actual object associated with the key. So any object acceptable by the '_val_filter' function can be used here. """ value = _val_filter(value) value.set_parent(self) self.__dict__[Key(key)] = value def __delitem__(self, key): """ Removes a key and the associated value from the 'Map' object. """ key = Key(key) value = self.__dict__[key] if (not isinstance(value, Sea)): raise KeyError("key '%s' not in the `sea.Map` object" % key) del self.__dict__[key] def __deepcopy__(self, memo={}): # noqa: M511 """ """ newobj = self.__new__(self.__class__) memo[id(self)] = newobj newobj._Sea__parent = None newobj._Sea__pmode = self._Sea__pmode newobj_dict = newobj.__dict__ for k, v in list(self.__dict__.items()): new_v = deepcopy(v, memo) if (isinstance(new_v, Sea)): new_v.set_parent(newobj) newobj_dict[k] = new_v newobj._Sea__parent = None return newobj def __set_val(self, val): """ This function sets the value of the 'val' property (see below). :param val: 'val' must be either a dict or a 'Map' object, otherwise a 'ValueError' exception will be raised. """ if (isinstance(val, dict) or isinstance(val, Map)): val = _val_filter(val) keys = self.keys() for k in keys: del self[k] key_value = val.key_value() for k, v in key_value: v.set_parent(self) self[k] = v else: raise ValueError("value is not a dict or `sea.Map`") def __get_raw_val(self): """ This function returns the value of the 'raw_val' property (see below). Returns the raw value of this 'Map' object. The value will be in the form of a dict object, and all macros will be kept as is. """ val = {} for k, v in self.key_value(): val[k.orig_key()] = v.raw_val return val def __get_bval(self): """ Returns a new 'Sea' object, which has all macros expanded and references dereferenced. This function is used by the 'bval' property (see below). """ newobj = Map() for k, v in self.key_value(): newobj[k] = v.bval _asea_copy(self, newobj) return newobj def __get_dval(self): """ Returns a new `Sea` object, which has all references dereferenced but macros kept as is. This function is used by the 'dval' property (see below). """ newobj = Map() for k, v in self.key_value(): newobj[k] = v.dval _asea_copy(self, newobj) return newobj def __get_val(self): """ Returns the value of this 'Map' object. The value will be in the form of a dict object, and all macros will be expended and references dereferenced. """ val = {} for k, v in self.key_value(): val[k.orig_key()] = v.val return val raw_val = property( fget=__get_raw_val, fset=__set_val, doc= "Readwrite. When read, this returns the current raw value (references and macros kept as is)." ) bval = property(fget=__get_bval, doc="Readonly. Returns a new `Map` object, which has all macros expanded and references " \ "dereferenced.") dval = property( fget=__get_dval, doc="Readonly. Returns a new `Map` object with dereferenced values.") val = property(fget=__get_val, fset=__set_val, doc="Readwrite. When read, this returns the current value (macros will be expanded, and references" \ " will be dereferenced.")
[docs] def keys(self, tag=set()): # noqa: M511 """ Returns references of all keys in a list. Note each element in the returned list will be of the 'Key' type. """ ret = [] ret_append = ret.append for k, v in self.__dict__.items(): if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag): ret_append(k) return ret
[docs] def values(self, tag=set()): # noqa: M511 """ Returns references of all values in a list. Note each element in the returned list will be of the 'Sea' type. """ ret = [] ret_append = ret.append for k, v in self.__dict__.items(): if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag): ret_append(v) return ret
[docs] def key_value( self, tag=set(), # noqa: M511 should_sort=False): """ Returns the key and associated value in a list. Note each element in the returned list will be a 2-tuple object. The first element of the tuple is a reference of the key, and the second element is a reference of the value. User can optionally set the 'should_sort' parameter to True, which will let the function return a sorted list. The sorting will be based on the alphanumeric order of 'key'. """ ret = [] ret_append = ret.append for k, v in self.__dict__.items(): if isinstance(k, Key) and isinstance(v, Sea) and v.has_tag(tag): ret_append((k, v)) if should_sort: ret.sort() return ret
[docs] def clone(self, orig): """ Lets this 'Map' object become a deep copy of the 'orig' 'Map' object. """ for k in orig.__dict__: if (k != "_Sea__parent"): v = deepcopy(orig.__dict__[k]) if (isinstance(k, Key) and isinstance(v, Sea)): v.set_parent(self) self.__dict__[k] = v key_to_delete = [] for k in self.__dict__: if (k not in orig.__dict__): key_to_delete.append(k) for k in key_to_delete: del self.__dict__[k]
[docs] def apply(self, op): """ Recursively applies the operation as given by 'op' to all 'Sea' subobjects of this 'Sea' object. """ for v in self.values(): op(v)
[docs] def update(self, ark=None, file=None, tag=set()): # noqa: M511 """ Updates this 'Map' object with the given 'ark' or with the given 'file'. :param file: If 'file' is not None, it must be the name of a file in the ark file format. If 'file' is given, the 'ark' parameter will be ignored. :param ark: ark can be a string or a dict or a 'Map' object. Or ark can be list of the previous objects. """ if (file is not None): with open(file) as file_reader: ark = file_reader.read() if (ark is None): return self if (isinstance(ark, (bytes, str))): ark = Map(ark) elif (isinstance(ark, dict)): ark = Map(ark) elif (isinstance(ark, Map)): pass elif (isinstance(ark, list)): for e in ark: self.update(e, tag=tag) return self else: raise TypeError("Unsupported class of the `ark` argument: %s" % ark.__class__) key_value = ark.key_value() for k, v in key_value: if (k in self and v.__class__ == self[k].__class__): self[k].update(v, tag=tag) else: self[k] = v self[k].reset_tag(tag) self.add_tag(tag, propagate=False) return self
[docs] def has_key(self, key): return self.__contains__(key)
[docs] def __contains__(self, key): """ Returns True if this 'Map' object has the 'key'. Returns False if otherwise. """ if key is None: return False kl = Map._parse_composite_key(key) v = self for k in kl: try: v = v[k] except: return False return True
[docs] def get_value(self, key): """ Returns the value of the given 'key'. :param key: The 'key' can be a composite key (i.e., the pathway notation), such as, e.g., "key[1][2].key2.key3[2]". """ kl = Map._parse_composite_key(key) v = self for k in kl: v = v[k] return v
def _set_value_helper(self, key_index_list, value, tag): """ This is the actual implmentation of the 'set_value' function. :param key_index_list: A list returned by the 'Map._parse_composite_key' function. :param value: Same as the 'value' parameter in the 'set_value' function. :param tag: Same as the 'tag' parameter in the 'set_value' function. """ k0 = key_index_list[0] k_ = key_index_list[1:] if (k_ == []): if (isinstance(value, List)): e0 = value[0] if (isinstance(e0, Atom) and isinstance(e0.val, str)): e0 = e0.val.lower() if (e0 == "!append!"): value = self[k0] + value[1:] elif (e0 == "!remove!"): index = set() for a in value[1:]: for i, e in enumerate(self): if (e == a): index.add(i) index = list(index) index.sort(reverse=True) for i in index: del self[k0][i] value = self[k0] value.set_parent(self) value.add_tag(tag, propagate=True) self[k0] = value else: try: v = self[k0] if (isinstance(v, Atom)): raise KeyError() v._set_value_helper(k_, value, tag) except KeyError: for e in k_: if (isinstance(e, int)): raise KeyError( "cannot assign value to a non-existing list") self[k0] = Map("", self) self[k0]._set_value_helper(k_, value, tag) self.add_tag(tag, propagate=False)
[docs] def set_value(self, key, value, tag=set()): # noqa: M511 """ Associates the given value with the given key. The difference between this function and the __setitem__ operator is that the former allows us to reset the tag of the value. :param key: The 'key' can be a composite key (i.e., the pathway notation), e.g., "key[1].key2[0].key3". :param tag: If the "tag" parameter is specified, the value of 'tag' will be used to tag the 'value'. """ kl = Map._parse_composite_key(key) value = _val_filter(value) self._set_value_helper(kl, value, tag)
def _set_value_fast_helper(self, key_index_list, value, tag): k0 = key_index_list[0] k_ = key_index_list[1:] if k_ == []: self.__dict__[Key(k0)] = value value.set_parent(self) else: try: v = self[k0] if (isinstance(v, Atom)): raise KeyError() v._set_value_helper(k_, value, tag) except KeyError: for e in k_: if (isinstance(e, int)): raise KeyError( "cannot assign value to a non-existing list") self[k0] = Map("", self) self[k0]._set_value_helper(k_, value, tag) self.add_tag(tag, propagate=False)
[docs] def set_value_fast(self, key, value, tag=set()): # noqa: M511 """ Similar to `set_value` method. The difference is that if `value` is a `Sea` object the `value` object itself (as opposed to a copy) will be included into this `Map` object after this function call, as a result, the original `Sea` object `value` might be mutated as necessary. This function is much faster than `set_value`. """ kl = Map._parse_composite_key(key) if not isinstance(value, Sea): value = _val_filter(value) self._set_value_fast_helper(kl, value, tag)
def _del_key_helper(self, key_index_list): k0 = key_index_list[0] k_ = key_index_list[1:] if k_ == []: del self.__dict__[Key(k0)] else: v = self[k0] if isinstance(v, Atom): raise KeyError() v._del_key_helper(k_)
[docs] def del_key(self, key: str): """ Deletes the given key from this map. :param key: The 'key' can be a composite key in the pathway notation, e.g., "key[1].key2[0].key3". """ kl = Map._parse_composite_key(key) self._del_key_helper(kl)
def _dump_impl(self, tag): """ """ s = "" for k, v in self.key_value(tag): p, q = ( "={", "}\n", ) if (isinstance(v, Map)) else ( "=", "\n", ) s += k.orig_key() + p + v._dump_impl(tag) + q return s
[docs]def get_val(my_macro_dict, sea_object): """ - Returns values of the `sea_object` with the given macro dictionary (`macro_dict`). - `sea_object` must be a single `Sea` object or a sequence `Sea` objects. - This function does not change the global `macro_dict` object. """ orig_macro_dict = get_macro_dict() set_macro_dict(my_macro_dict) if (isinstance(sea_object, Sea)): value = sea_object.val else: value = [] for e in sea_object: value.append(e.val) set_macro_dict(orig_macro_dict) return value
def _asea_copy(src, des): """ Copies all keys and the associated values in the 'Map' object 'src' to the 'Map' object 'des', but does not change the parent and pmode of the 'des'. """ for k in src.__dict__: v = src.__dict__[k] if ((not isinstance(k, Key) or not isinstance(v, Sea)) and (k not in [ "_Sea__parent", "_Sea__pmode", ])): des.__dict__[k] = deepcopy(v) def _pathname(x): """ Given a 'Map' object 'x', returns its pathname. """ node = [] root = x parent = root.parent() node.append(root) while (parent is not None): root = parent parent = root.parent() node.append(root) node.reverse() ret = "" parent = node[0] for e in node[1:]: if (isinstance(parent, List)): n = len(parent) for i in range(n): if (e is parent[i]): ret += ("[%d]" % i) else: key_value = parent.key_value() for k, v in key_value: if (e is v): ret += (".%s" % k) parent = e return ret
[docs]def diff(x, reference): """ Returns the difference between 'x' and 'reference'. Both 'x' and 'reference' must be 'Map' objects. The difference is a 4-tuple: The first element is a 'Map' object containing the changed values in 'x', the second element is a 'Map' object containing the changed value in 'reference', the third element is a 'Map' object containing keys in x but not in 'reference', the forth element is a 'Map' object containing keys in 'reference' but not in 'x'. """ empty_key = Map() changed = Map() referred = Map() added = Map() lost = Map() x_param = x.key_value() ref_param = reference.key_value() ref = {} xxx = {} for k, v in ref_param: ref[k] = v for k, v in x_param: xxx[k] = v for k, v in x_param: if (k not in ref): added[k] = v else: r = ref[k] if (isinstance(v, Map) and isinstance(r, Map)): changed_, referred_, added_, lost_ = diff(v, r) if (changed_ != empty_key): changed[k] = changed_ if (referred_ != empty_key): referred[k] = referred_ if (added_ != empty_key): added[k] = added_ if (lost_ != empty_key): lost[k] = lost_ elif (isinstance(v, Atom) and isinstance(r, Atom)): if (v != r): changed[k] = v referred[k] = r elif (isinstance(v, List) and isinstance(r, List)): if (len(v) != len(r)): changed[k] = v referred[k] = r else: for e, er in zip(v, r): if (e != er): changed[k] = v referred[k] = r break else: changed[k] = v referred[k] = r for k, v in ref_param: if (k not in xxx): lost[k] = v return changed, referred, added, lost
[docs]def sea_filter(x, tag=set()): # noqa: M511 """ Extracts a subset of keys from the 'Map' object 'x' that has the tag. And returns a new 'Map' object containing the extracted keys. """ return Map(x.dump(tag=tag))
[docs]def is_atom_list(a): """ This function returns: True - if 'a' is a List object and all of its elements are instances of the 'Atom' class, True - if 'a' is a List object and is empty, False - if otherwise. """ if (isinstance(a, List)): for e in a: if (not isinstance(e, Atom)): return False return True return False