# Source code for digraphx.neg_cycle_q

```
"""
Negative cycle detection for directed graphs.
1. Based on Howard's policy graph algorithm
2. Looking for more than one negative cycle
"""
from fractions import Fraction
from typing import (
Callable,
Dict,
Generator,
Generic,
List,
Mapping,
MutableMapping,
Tuple,
TypeVar,
)
Node = TypeVar("Node") # Hashable
Edge = TypeVar("Edge") # Hashable
Domain = TypeVar("Domain", int, Fraction, float) # Comparable Ring
Cycle = List[Edge] # List of Edges
# The `NegCycleFinder` class implements Howard's method, a minimum cycle ratio algorithm, to find
# negative cycles in a directed graph.
[docs]
class NegCycleFinder(Generic[Node, Edge, Domain]):
"""Negative Cycle Finder by Howard's method
Howard's method is a minimum cycle ratio (MCR) algorithm that uses a policy
iteration algorithm to find the minimum cycle ratio of a directed graph. The
algorithm maintains a set of candidate cycles and iteratively updates the
cycle with the minimum ratio until convergence. To detect negative cycles,
Howard's method uses a cycle detection algorithm that is based on the
Bellman-Ford relaxation algorithm. Specifically, the algorithm maintains a
predecessor graph of the original graph and performs cycle detection on this
graph using the Bellman-Ford relaxation algorithm. If a negative cycle is
detected, the algorithm terminates and returns the cycle.
"""
pred: Dict[Node, Tuple[Node, Edge]] = {}
succ: Dict[Node, Tuple[Node, Edge]] = {}
def __init__(self, digraph: Mapping[Node, Mapping[Node, Edge]]) -> None:
"""
The function initializes a graph object with an adjacency list.
:param digraph: The parameter `digraph` is a mapping that represents an adjacency list. It is a
dictionary-like object where the keys are nodes and the values are mappings of nodes to edges. Each
edge represents a connection between two nodes in a directed graph
:type digraph: Mapping[Node, Mapping[Node, Edge]]
"""
self.digraph = digraph
[docs]
def find_cycle(self, point_to) -> Generator[Node, None, None]:
"""
The `find_cycle` function is used to find a cycle in a policy graph and yields the start node of the
cycle.
:param point_to: The `point_to` parameter is a dictionary that represents the edges of a directed
graph. Each key-value pair in the dictionary represents an edge from the key vertex to the value
vertex
Yields:
Generator[Node, None, None]: a start node of the cycle
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> finder = NegCycleFinder(digraph)
>>> for cycle in finder.find_cycle(finder.pred):
... print(cycle)
"""
visited: Dict[Node, Node] = {}
for vtx in filter(lambda vtx: vtx not in visited, self.digraph):
utx = vtx
while True:
visited[utx] = vtx
if utx not in point_to:
break
utx, _ = point_to[utx]
if utx in visited:
if visited[utx] == vtx:
yield utx
break
[docs]
def relax_pred(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Edge], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> bool:
"""
The `relax_pred` function updates the `dist` and `pred` dictionaries based on the current distances and
weights of edges in a graph.
:param dist: `dist` is a mutable mapping that represents the current distances from a source node to
all other nodes in a graph. It is a mapping from nodes to their corresponding distances
:type dist: MutableMapping[Node, Domain]
:param get_weight: The `get_weight` parameter is a callable function that takes an `Edge` object as
input and returns a value of type `Domain`. This function is used to calculate the weight or cost
associated with an edge in the graph
:type get_weight: Callable[[Edge], Domain]
:param update_ok: The `update_ok` parameter is a function that determines whether an update to the
distance `dist[vtx_v]` is allowed. It takes two arguments: the current value of `dist[vtx_v]` and
the new value `d`. It should return `True` if the update is
:return: a boolean value indicating whether any changes were made to the `dist` mapping and `pred` dictionary.
"""
changed = False
for utx, neighbors in self.digraph.items():
for vtx, edge in neighbors.items():
distance = dist[utx] + get_weight(edge)
if dist[vtx] > distance and update_ok(dist[vtx], distance):
dist[vtx] = distance
self.pred[vtx] = (utx, edge)
changed = True
return changed
[docs]
def relax_succ(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Edge], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> bool:
"""
The `relax_succ` function updates the `dist` and `succ` dictionaries based on the current distances and
weights of edges in a graph.
:param dist: `dist` is a mutable mapping that represents the current distances from a source node to
all other nodes in a graph. It is a mapping from nodes to their corresponding distances
:type dist: MutableMapping[Node, Domain]
:param get_weight: The `get_weight` parameter is a callable function that takes an `Edge` object as
input and returns a value of type `Domain`. This function is used to calculate the weight or cost
associated with an edge in the graph
:type get_weight: Callable[[Edge], Domain]
:param update_ok: The `update_ok` parameter is a function that determines whether an update to the
distance `dist[vtx_v]` is allowed. It takes two arguments: the current value of `dist[vtx_v]` and
the new value `d`. It should return `True` if the update is
:return: a boolean value indicating whether any changes were made to the `dist` mapping and `pred` dictionary.
"""
changed = False
for utx, neighbors in self.digraph.items():
for vtx, edge in neighbors.items():
distance = dist[vtx] - get_weight(edge)
if dist[utx] < distance and update_ok(dist[utx], distance):
dist[utx] = distance
self.succ[utx] = (vtx, edge)
changed = True
return changed
[docs]
def howard_pred(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Edge], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> Generator[Cycle, None, None]:
"""
The `howard_pred` function finds negative cycles in a graph and yields a list of cycles.
:param dist: `dist` is a mutable mapping that maps each node in the graph to a domain value. The
domain value represents the distance or cost from the source node to that particular node
:type dist: MutableMapping[Node, Domain]
:param get_weight: The `get_weight` parameter is a callable function that takes an `Edge` object as
input and returns the weight of that edge
:type get_weight: Callable[[Edge], Domain]
:param update_ok: The `update_ok` parameter is a callable function that determines whether an update
to the distance value of a vertex is allowed. It takes in three arguments: the current distance
value of the vertex, the weight of the edge being considered for update, and the current distance
value of the vertex at the other
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> dist = {vtx: 0 for vtx in digraph}
>>> def update_ok(dist, v) : return True
>>> finder = NegCycleFinder(digraph)
>>> has_neg = False
>>> for _ in finder.howard_pred(dist, lambda edge: edge, update_ok):
... has_neg = True
... break
...
>>> has_neg
False
"""
self.pred = {}
found = False
while not found and self.relax_pred(dist, get_weight, update_ok):
for vtx in self.find_cycle(self.pred):
# Will zero cycle be found???
assert self.is_negative(vtx, dist, get_weight)
found = True
yield self.cycle_list(vtx, self.pred)
[docs]
def howard_succ(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Edge], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> Generator[Cycle, None, None]:
"""
The `howard_succ` function finds negative cycles in a graph and yields a list of cycles.
:param dist: `dist` is a mutable mapping that maps each node in the graph to a domain value. The
domain value represents the distance or cost from the source node to that particular node
:type dist: MutableMapping[Node, Domain]
:param get_weight: The `get_weight` parameter is a callable function that takes an `Edge` object as
input and returns the weight of that edge
:type get_weight: Callable[[Edge], Domain]
:param update_ok: The `update_ok` parameter is a callable function that determines whether an update
to the distance value of a vertex is allowed. It takes in three arguments: the current distance
value of the vertex, the weight of the edge being considered for update, and the current distance
value of the vertex at the other
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> def update_ok(dist, v) : return True
>>> dist = {vtx: 0 for vtx in digraph}
>>> finder = NegCycleFinder(digraph)
>>> has_neg = False
>>> for _ in finder.howard_succ(dist, lambda edge: edge, update_ok):
... has_neg = True
... break
...
>>> has_neg
False
"""
self.succ = {}
found = False
while not found and self.relax_succ(dist, get_weight, update_ok):
for vtx in self.find_cycle(self.succ):
# Will zero cycle be found???
# assert self.is_negative(vtx, dist, get_weight)
found = True
yield self.cycle_list(vtx, self.succ)
[docs]
def cycle_list(self, handle: Node, point_to) -> Cycle:
"""
The `cycle_list` function returns a list of edges that form a cycle in a graph, starting from a
given node.
:param handle: The `handle` parameter is a reference to a node in a graph. It represents the
starting point of the cycle in the list
:type handle: Node
:param point_to: point_to is a dictionary that maps each graph node to the node it points to
:return: a list of edges, which represents a cycle in a graph.
"""
vtx = handle
cycle = list()
while True:
utx, edge = point_to[vtx]
cycle.append(edge)
vtx = utx
if vtx == handle:
break
return cycle
[docs]
def is_negative(
self,
handle: Node,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Edge], Domain],
) -> bool:
"""
The `is_negative` function checks if a cycle list is negative by comparing the distances between
nodes and the weights of the edges.
:param handle: The `handle` parameter is a `Node` object that represents a vertex in a graph. It is
used as a starting point to check for negative cycles in the graph
:type handle: Node
:param dist: `dist` is a mutable mapping that maps each node to its corresponding domain value. The
domain value represents the distance from the starting node to the current node in a graph
:type dist: MutableMapping[Node, Domain]
:param get_weight: The `get_weight` parameter is a callable function that takes an `Edge` object as
input and returns the weight of that edge
:type get_weight: Callable[[Edge], Domain]
:return: a boolean value.
"""
vtx = handle
# do while loop in C++
while True:
utx, edge = self.pred[vtx]
if dist[vtx] > dist[utx] + get_weight(edge):
return True
vtx = utx
if vtx == handle:
break
return False
```