Source code for being.graph

"""Graphing and topological sorting."""
import collections
import itertools
from typing import ForwardRef, Generator, Hashable, Iterable, List, Optional, Sequence, Tuple


Vertex = Hashable
"""Graph vertex."""

Edge = Tuple[Vertex, Vertex]
"""Source -> destination graph edge."""

Graph = ForwardRef('Graph')


[docs]def unique_elements(iterable: Iterable[Hashable]) -> Generator[Hashable, None, None]: """Iterate over unique elements in iterable.""" seen = set() for element in itertools.filterfalse(seen.__contains__, iterable): seen.add(element) yield element
[docs]def find_back_edges(graph) -> Generator[Edge, None, None]: """Find back edges of graph. Args: graph: Input graph. Yields: Source to destination back edges. See Also: `Output of a depth-first search <https://en.wikipedia.org/wiki/Depth-first_search#Output_of_a_depth-first_search>`_ """ paths = collections.deque([v] for v in graph.vertices) visited = set() while paths: path = paths.popleft() src = path[-1] if src in visited: continue visited.add(src) for dst in reversed(graph.successors[src]): if dst in path: yield src, dst else: paths.appendleft(path + [dst])
[docs]def remove_back_edges(graph: Graph) -> Graph: """Remove back edges from graph an return new DAG. Args: graph: Input graph. Returns: DAG """ backEdges = list(find_back_edges(graph)) if not backEdges: return graph edges = list(graph.edges) for backEdge in backEdges: edges.remove(backEdge) noBackEdges = edges return Graph(graph.vertices, edges=noBackEdges)
[docs]def topological_sort(graph: Graph) -> List[Vertex]: """Topological sorting of directed graph vertices. Args: Input graph. Returns: Topological sorting. """ order = [] dag = remove_back_edges(graph) def vertex_is_ready(vertex: Vertex) -> bool: """Check if vertex is ready for insertion into topological order.""" return all(p in order for p in dag.predecessors[vertex]) queue = collections.deque(dag.vertices) while queue: vertex = queue.popleft() if vertex in order: continue successors = reversed(dag.successors[vertex]) if vertex_is_ready(vertex): order.append(vertex) queue.extendleft(successors) else: queue.extend(successors) return order
[docs]class Graph(collections.namedtuple('Graph', ['vertices', 'edges', 'successors', 'predecessors'])): """Immutable graph. Graph vertices can be any hashable Python object. An edge is a source -> destination tuple. A graph is a tuple of a vertex and a edges sets. Note: Tuples instead of sets to preserve order. Attributes: vertices (tuple): Graph vertices. edges (tuple): Graph edges. successors (collections.defaultdict): Source -> destinations relationships. For each source vertex get a list of destination vertices. predecessors (collections.defaultdict): Destination -> sources relationships. For each destination vertex get a list of source vertices. """ # __slots__ = () # ...consenting adults def __new__(cls, vertices: Optional[Iterable] = None, edges: Sequence[Edge] = None): """ Args: vertices: Initial vertices. Final vertices will be auto updated from the ones present in the edges. edges: Source -> destination edge tuples. """ vertices = list(vertices or []) edges = list(edges or []) for edge in edges: vertices.extend(edge) vertices = tuple(unique_elements(vertices)) edges = tuple(unique_elements(edges)) successors = collections.defaultdict(list) predecessors = collections.defaultdict(list) for src, dst in edges: successors[src].append(dst) predecessors[dst].append(src) return super().__new__(cls, vertices, edges, successors, predecessors) def __str__(self): return '%s(%d vertices, %d edges)' % ( type(self).__name__, len(self.vertices), len(self.edges), )