Source code for schrodinger.application.matsci.graph

"""
Module containing methods applied on networkx graph

Copyright Schrodinger, LLC. All rights reserved.
"""

from itertools import combinations

import networkx

BFS_SEARCH = networkx.algorithms.traversal.breadth_first_search
MAX_CHECKS = 10000


[docs]class NetworkLoopError(Exception): """Raised when the whole network is infinite""" pass
[docs]def get_sub_graphs(graph): """ Generator of the disconnect graphs in the passed graph. :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to find subgraphs in :rtype: 'networkx.classes.graph.Graph' :return: subgraph of the graph """ for gindex in networkx.connected_components(graph): yield graph.subgraph(gindex).copy()
[docs]def get_fused_cycles(graph): """ Return cycles (fused and non-fused) in the graph :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to find cycles in :rtype: list(set) :return: list of sets of nodes that make each cycle """ # Get simple cycles cycles = list(set(x) for x in networkx.cycle_basis(graph)) # No cycles found if not cycles: return cycles # Fuse cycles while True: for cid1, cid2 in combinations(range(len(cycles)), 2): if not cycles[cid1].isdisjoint(cycles[cid2]): fused_ring = cycles[cid1].union(cycles[cid2]) new_rings = [ cycles[x] for x in range(len(cycles)) if x not in [cid1, cid2] ] new_rings.append(fused_ring) cycles = new_rings break else: break return cycles
[docs]def get_sorted_shortest_path(graph, end_nodes, max_fix=False): """ Get the shortest path with lowest indexes. Networkx does not handle degeneracy due to cycles properly, so using all_shortest_paths to compare each path :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to calculate the shortest path in :param end_nodes: The start and end nodes :type end_nodes: list :param max_fix: Whether to fix internal degeneracy when maximum number of path checks are reached :type max_fix: bool :returns: The sorted shortest path. :rtype: list of node index for shortest ordered path between the end nodes """ all_paths = networkx.all_shortest_paths(graph, source=end_nodes[0], target=end_nodes[-1]) sorted_path = [] for count, path in enumerate(all_paths): if count > MAX_CHECKS: # The number of paths increase exponentially with number of rings, # at this point its better to fix the degeneracy locally or # accept the degeneracy if max_fix: sorted_path = _fix_ring_path(graph, sorted_path) break # Update if current path is less than backbone path, if yes update # it as backbone path. The less than operator on path (list type) checks # for sorting (https://docs.python.org/3/tutorial/datastructures.html # #comparing-sequences-and-other-types). Also update current path as # backbone path if backbone path is empty if not sorted_path or path < sorted_path: sorted_path = path return sorted_path
[docs]def break_infinite_segment(graph, end_nodes): """ Break a infinite loop segment to make it finite :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to make finite :param end_nodes: The start and end nodes :type end_nodes: list :raises NetworkLoopError: If the segment cannot be made finite """ backbone_path = networkx.shortest_path(graph, source=end_nodes[0], target=end_nodes[-1]) # Loop through backbone path edges to find the edge which will break the # loop for index in range(len(backbone_path) - 1): # Remove the edge main_ring_node = [backbone_path[index + x] for x in range(2)] graph.remove_edge(*main_ring_node) # Breaking the edge can cause no change if the edge is part of an # internal loop. Check if graph is still an fused loop cycles = get_fused_cycles(graph) if cycles: max_cycle = max([len(x) for x in cycles]) else: max_cycle = 0 is_fully_fused = max_cycle < len(backbone_path) # Breaking the edge does not result into splitting of graph did_not_split = len(list(get_sub_graphs(graph))) == 1 if (is_fully_fused and did_not_split): return # Reform the edge since it was not the right edge to break graph.add_edge(*main_ring_node) else: # Raise error if the loop cannot be broken raise NetworkLoopError()
def _fix_ring_path(graph, backbone_path): """ Backbone path can have degenerate results if cycle nodes are part of it. This method fixes the issue by selecting the path with lowest indexes :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to fix the ring path in :type backbone_path: list :param backbone_path: list of nodes in the longest path in the graph, with no preference in case of nodes that are path of the cycle :rtype: list :return: list of nodes in the longest path in the graph. Between the two end node, the node with lower index is considered as the first element of the list and the other as the last. If cycles nodes (rings) are part of the path then among the degenerate paths, path containing lower indexes (sorted) is selected. """ cycles = get_fused_cycles(graph) if not cycles: return backbone_path max_cycle = max([len(x) for x in cycles]) # Largest cycle will be larger than the backbone only when the whole # segment is a ring (infinite chain) if max_cycle > len(backbone_path): raise NetworkLoopError() # Create cycle associations for faster checks node_to_cid = {} for cid, cycle in enumerate(cycles, 1): for cycle_node in cycle: node_to_cid[cycle_node] = cid # Generate new sorted backbone path from current backbone path sorted_path = [] ring_path = [] last_ring = None for node in backbone_path: current_ring = node_to_cid.get(node, None) if not last_ring and not current_ring: # Add to the sorted path when last atom and current atom is linear sorted_path.append(node) elif last_ring == current_ring or (not last_ring and current_ring): # Add to ring path when last atom and current atom are part of # same ring or when a new ring start ring_path.append(node) elif last_ring != current_ring and ring_path: # When last atom and current atom are not of same ring, find the # local sorted path for the ring path and add to the sorted path ring_path = get_sorted_shortest_path(graph, [ring_path[0], ring_path[-1]]) sorted_path.extend(ring_path) # Reset ring path and add the current node to the sorted path ring_path = [] sorted_path.append(node) if current_ring: # Two rings are connected to each other but are not fused last_ring = current_ring else: # This should never happen, but good to have a check for it raise ValueError(f'Node {node} does not meet any conditions.') last_ring = current_ring return sorted_path
[docs]def find_backbone(graph, prefer_indexes=None, max_fix=True): """ Find the shortest path between atoms that are furthest apart :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to find longest path in :param set prefer_indexes: A list of preferred atom indexes to choose for the head and tail of the backbone. *For paths of equal lengths*, the chosen path will start and end with atoms in the prefer_indexes set if one is available. Can be used, for instance, to prefer backbones that start and end with hydrogen atoms. :param max_fix: Whether to fix internal degeneracy when maximum number of path checks are reached :type max_fix: bool :rtype: list :return: list of nodes in the longest path in the graph. Between the two end node, the node with lower index is considered as the first element of the list and the other as the last. In case of degeneracy due to cycles nodes in the path, the shortest path containing lowest indexes is selected. """ # Get all nodes nodes = sorted(graph.nodes()) # Return empty path if backbone does not have atleast 2 atoms if len(nodes) < 2: return [] # First select lowest index node in the graph and find the # farthest node from it using bfs. This will be one of the end nodes # then find the other end node by finding the farthest node from the # first end node using bfs. The method is described in # "https://stackoverflow.com/questions/20010472/proof-of-correctness- # algorithm-for-diameter-of-a-tree-in-graph-theory" source_node = nodes[0] end_nodes = [] for _ in range(2): # Transverse the graph using breadth first search, and get the ordered # dict where the key is the source node and item is the list of next # nodes bfs_nodes_dict = BFS_SEARCH.bfs_successors(graph, source=source_node) # Convert the bfs nodes dict to list to get the last key source node # and its values as end_targets end_source, end_targets = list(bfs_nodes_dict)[-1] # Get the last node, with largest atom index if prefer_indexes: # If any of the end_targets is preferred, remove all non-preferred # targets preferred = prefer_indexes.intersection(end_targets) if preferred: end_targets = list(preferred) source_node = sorted(end_targets)[-1] end_nodes.append(source_node) # Sort end atoms indexes to make backbone path always go from # lower to higher node end_nodes.sort() try: backbone_path = get_sorted_shortest_path(graph, end_nodes, max_fix) except NetworkLoopError: break_infinite_segment(graph, end_nodes) backbone_path = find_backbone(graph) return backbone_path
[docs]def find_segments(graph): """ Find the segments in the input graph. Segments are path containing nodes where no node appears in a segment twice. Longest segments are always created first and then recursively smaller ones. :type graph: 'networkx.classes.graph.Graph' :param graph: The graph to find segments in :rtype: list(list) :return: list of list of nodes. Each sublist is a list of ordered nodes, starting from lower index of the two end nodes, forming the longest path graph not containing the nodes already considered. """ segments = [] # Loop over all disconnected graphs for sub_graph in get_sub_graphs(graph): # Search for segment only if graph contains more than 1 node if len(set(sub_graph.nodes())) > 1: # Find the backbone for the graph, do not add to segment if no # backbone is found backbone = find_backbone(sub_graph) # Check if backbone was found and continue to next graph it was not if not backbone: continue # Add the backbone to segments segments.append(backbone) # Create a copy of graph with backbone removed new_graph = sub_graph.copy() for backbone_node in backbone: new_graph.remove_node(backbone_node) # Find child segments of the backbone removed graph recursively child_segments = find_segments(new_graph) for child_chain in child_segments: # Check if there were any segments (backbones) in the child if child_chain: segments.append(child_chain) return segments