# Source code for digraphx.neg_cycle_q

"""Negative Cycle Finder with constraints (neg_cycle_q.py)

This code implements a Negative Cycle Finder for directed graphs using Howard's
method. The purpose of this code is to detect and find negative cycles in a
directed graph. A negative cycle is a cycle in the graph where the sum of the
edge weights is negative.

The main input for this code is a directed graph, represented as a mapping of
nodes to their neighboring nodes and the edges connecting them. The graph is
passed to the NegCycleFinderQ class when it's initialized.

The output of this code is a stream of cycles, yielded one at a time by the
generator methods (nothing is yielded if no negative cycle is found). Each cycle
is represented as a list of edges that form the negative cycle.

The code achieves its purpose through an algorithm called Howard's method, which
is a minimum cycle ratio (MCR) algorithm. It works by maintaining a set of
candidate cycles and iteratively updating them until it finds the minimum cycle
ratio or detects a negative cycle.

The main logic flow of the algorithm involves two key operations: relaxation
and cycle detection. The relaxation process updates the distances between nodes
based on the edge weights. This is done in two ways: predecessor relaxation
(relax_pred) and successor relaxation (relax_succ). The cycle detection part
(find_cycle) looks for cycles in the graph based on the current set of
predecessors or successors.

The howard_pred and howard_succ methods combine these operations. They
repeatedly perform relaxation and then check for cycles. If a negative cycle is
found, it's yielded as output.

An important data transformation happening in this code is the maintenance of
the 'dist' dictionary, which keeps track of the distances between nodes. This
dictionary is continuously updated during the relaxation process.

The code uses some advanced concepts like generic types and generator functions,
but the core idea is straightforward: it's trying to find paths in the graph
where going around in a circle results in a negative total weight, which
shouldn't happen in many real-world scenarios (like currency exchange rates).

Overall, this code provides a tool for analyzing directed graphs and finding
problematic cycles, which can be useful in various applications such as
detecting arbitrage opportunities in currency exchange or finding
inconsistencies in systems modeled as graphs.
"""

from fractions import Fraction
from typing import (
    Callable,
    Dict,
    Generator,
    Generic,
    List,
    Mapping,
    MutableMapping,
    Tuple,
    TypeVar,
)

# Type variables for generic graph implementation:
# Node must be hashable (used as dictionary keys)
# Arc can be any type (but typically hashable)
# Domain must support comparison and arithmetic operations (int, Fraction, float)
Node = TypeVar("Node")  # Hashable
Arc = TypeVar("Arc")  # Hashable
Domain = TypeVar("Domain", int, Fraction, float)  # Comparable Ring
Cycle = List[Arc]  # List of Arcs


class NegCycleFinderQ(Generic[Node, Arc, Domain]):
    """Negative Cycle Finder with constraints by Howard's method.

    The finder alternates two steps until either no distance changes or a
    cycle shows up:

    1. a relaxation pass over every edge of the graph (Bellman-Ford style),
       which also records, per node, the neighbour and edge that produced
       its latest distance update;
    2. a cycle search in the graph formed by those recorded links.

    Two flavours are provided.  The *predecessor* version (``relax_pred`` /
    ``howard_pred``) lowers ``dist[v]`` through edges ``u -> v`` and records
    links in ``self.pred``.  The *successor* version (``relax_succ`` /
    ``howard_succ``) raises ``dist[u]`` through edges ``u -> v`` and records
    links in ``self.succ``.

    Every distance update is additionally gated by a caller-supplied
    ``update_ok(old, new)`` predicate, which is how external constraints
    are imposed on the search.
    """

    # Predecessor dictionary: maps each node to (predecessor_node, connecting_edge)
    pred: Dict[Node, Tuple[Node, Arc]]
    # Successor dictionary: maps each node to (successor_node, connecting_edge)
    succ: Dict[Node, Tuple[Node, Arc]]

    def __init__(self, digraph: Mapping[Node, Mapping[Node, Arc]]) -> None:
        """Initialize the negative cycle finder with a directed graph.

        Args:
            digraph: A directed graph represented as a nested mapping:
                - Outer keys: source nodes
                - Inner mappings: {target_node: edge} pairs
                Example: {u: {v: edge_uv, w: edge_uw}, v: {u: edge_vu}}
        """
        self.digraph = digraph
        # Types are declared once at class level; re-annotating here would be
        # flagged by type checkers as an attribute redefinition.
        self.pred = {}
        self.succ = {}

    def find_cycle(
        self, point_to: Dict[Node, Tuple[Node, Arc]]
    ) -> Generator[Node, None, None]:
        """Detect cycles in the current predecessor/successor graph.

        Every node has at most one outgoing link in ``point_to``, so the
        search simply follows links from each not-yet-visited node of
        ``self.digraph`` and tags every node it passes with the node the walk
        started from.  Running into a node carrying the *current* tag means
        the walk has closed a cycle; running into a node with an older tag
        means the walk merged into an already explored chain.

        Args:
            point_to: Either self.pred or self.succ, mapping a node to
                ``(next_node, edge)``.

        Yields:
            One node lying on each cycle found (a handle suitable for
            ``cycle_list``).

        Examples:
            >>> digraph = {
            ...     "a0": {"a1": 7, "a2": 5},
            ...     "a1": {"a0": 0, "a2": 3},
            ...     "a2": {"a1": 1, "a0": 2},
            ... }
            >>> finder = NegCycleFinderQ(digraph)
            >>> for cycle in finder.find_cycle(finder.pred):
            ...     print(cycle)
        """
        visited: Dict[Node, Node] = {}  # node -> start node of the walk that reached it
        for v_node in self.digraph:
            # Checked lazily on purpose: ``visited`` grows while we iterate.
            if v_node in visited:
                continue
            u_node = v_node
            while True:
                visited[u_node] = v_node  # Tag with the current walk's start node
                if u_node not in point_to:
                    break  # Dead end: no recorded link out of this node
                u_node, _ = point_to[u_node]  # Follow the predecessor/successor link
                if u_node in visited:
                    if visited[u_node] == v_node:
                        # Re-entered a node of the current walk: that is a cycle
                        yield u_node
                    break  # Either a cycle or a merge into an earlier walk

    def relax_pred(
        self,
        dist: MutableMapping[Node, Domain],
        get_weight: Callable[[Arc], Domain],
        update_ok: Callable[[Domain, Domain], bool],
    ) -> bool:
        """Perform one predecessor relaxation pass (Bellman-Ford style).

        For every edge ``u -> v``: if ``dist[v] > dist[u] + weight(u, v)`` and
        ``update_ok(dist[v], new_value)`` agrees, lower ``dist[v]`` and record
        ``(u, edge)`` as the predecessor link of ``v``.

        Args:
            dist: Current distance estimates for each node (updated in place)
            get_weight: Function to get weight of an edge
            update_ok: Called as ``update_ok(old, new)``; the update is applied
                only when it returns a true value

        Returns:
            bool: True if any distances were updated, False otherwise

        Examples:
            >>> digraph = {
            ...     'a': {'b': 1, 'c': 4},
            ...     'b': {'c': 2},
            ...     'c': {'a': -5}
            ... }
            >>> dist = {'a': 0, 'b': float('inf'), 'c': float('inf')}
            >>> finder = NegCycleFinderQ(digraph)
            >>> finder.relax_pred(dist, lambda edge: edge, lambda old, new: True)
            True
            >>> dist['b']
            1
            >>> dist['c']
            3
        """
        changed = False
        for u_node, neighbors in self.digraph.items():
            for v_node, edge in neighbors.items():
                distance = dist[u_node] + get_weight(edge)
                if dist[v_node] > distance and update_ok(dist[v_node], distance):
                    dist[v_node] = distance
                    self.pred[v_node] = (u_node, edge)  # Update predecessor
                    changed = True
        return changed

    def relax_succ(
        self,
        dist: MutableMapping[Node, Domain],
        get_weight: Callable[[Arc], Domain],
        update_ok: Callable[[Domain, Domain], bool],
    ) -> bool:
        """Perform one successor relaxation pass (reverse Bellman-Ford style).

        For every edge ``u -> v``: if ``dist[u] < dist[v] - weight(u, v)`` and
        ``update_ok(dist[u], new_value)`` agrees, raise ``dist[u]`` and record
        ``(v, edge)`` as the successor link of ``u``.

        Args:
            dist: Current distance estimates for each node (updated in place)
            get_weight: Function to get weight of an edge
            update_ok: Called as ``update_ok(old, new)``; the update is applied
                only when it returns a true value

        Returns:
            bool: True if any distances were updated, False otherwise

        Examples:
            >>> digraph = {
            ...     'a': {'b': 1},
            ...     'b': {}
            ... }
            >>> dist = {'a': 0, 'b': 5}
            >>> finder = NegCycleFinderQ(digraph)
            >>> finder.relax_succ(dist, lambda edge: edge, lambda old, new: True)
            True
            >>> dist['a']
            4
        """
        changed = False
        for u_node, neighbors in self.digraph.items():
            for v_node, edge in neighbors.items():
                distance = dist[v_node] - get_weight(edge)
                if dist[u_node] < distance and update_ok(dist[u_node], distance):
                    dist[u_node] = distance
                    self.succ[u_node] = (v_node, edge)  # Update successor
                    changed = True
        return changed

    def howard_pred(
        self,
        dist: MutableMapping[Node, Domain],
        get_weight: Callable[[Arc], Domain],
        update_ok: Callable[[Domain, Domain], bool],
    ) -> Generator[Cycle, None, None]:
        """Find negative cycles using the predecessor-based variant.

        Resets ``self.pred``, then repeats: one ``relax_pred`` pass followed by
        a cycle search in the predecessor graph.  Iteration stops when a pass
        changes nothing, or after the first pass whose predecessor graph
        contains at least one cycle (all cycles found in that pass are
        yielded).

        Args:
            dist: Initial distance estimates (often zero-initialized);
                updated in place
            get_weight: Function to get weight of an edge
            update_ok: Function to determine if distance updates are allowed

        Yields:
            Each cycle found, as a list of edges (see ``cycle_list``)

        Examples:
            >>> digraph = {
            ...     "a0": {"a1": 7, "a2": 5},
            ...     "a1": {"a0": 0, "a2": 3},
            ...     "a2": {"a1": 1, "a0": 2},
            ... }
            >>> dist = {vtx: 0 for vtx in digraph}
            >>> def update_ok(dist, v) : return True
            >>> finder = NegCycleFinderQ(digraph)
            >>> has_neg = False
            >>> for _ in finder.howard_pred(dist, lambda edge: edge, update_ok):
            ...     has_neg = True
            ...     break
            ...
            >>> has_neg
            False
        """
        self.pred = {}  # Reset predecessor graph
        found = False
        while not found and self.relax_pred(dist, get_weight, update_ok):
            for v_node in self.find_cycle(self.pred):
                # Internal sanity check only (disabled under ``python -O``):
                # a cycle in the predecessor graph is expected to be negative.
                assert self.is_negative(v_node, dist, get_weight)
                found = True
                yield self.cycle_list(v_node, self.pred)

    def howard_succ(
        self,
        dist: MutableMapping[Node, Domain],
        get_weight: Callable[[Arc], Domain],
        update_ok: Callable[[Domain, Domain], bool],
    ) -> Generator[Cycle, None, None]:
        """Find negative cycles using the successor-based variant.

        Mirrors ``howard_pred`` but relaxes with ``relax_succ`` and searches
        the successor graph ``self.succ`` (which is reset on entry).

        Args:
            dist: Initial distance estimates (often zero-initialized);
                updated in place
            get_weight: Function to get weight of an edge
            update_ok: Function to determine if distance updates are allowed

        Yields:
            Each cycle found, as a list of edges (see ``cycle_list``)

        Note:
            Unlike ``howard_pred``, the cycles are not re-verified before being
            yielded.

        Examples:
            >>> digraph = {
            ...     "a0": {"a1": 7, "a2": 5},
            ...     "a1": {"a0": 0, "a2": 3},
            ...     "a2": {"a1": 1, "a0": 2},
            ... }
            >>> def update_ok(dist, v) : return True
            >>> dist = {vtx: 0 for vtx in digraph}
            >>> finder = NegCycleFinderQ(digraph)
            >>> has_neg = False
            >>> for _ in finder.howard_succ(dist, lambda edge: edge, update_ok):
            ...     has_neg = True
            ...     break
            ...
            >>> has_neg
            False
        """
        self.succ = {}  # Reset successor graph
        found = False
        while not found and self.relax_succ(dist, get_weight, update_ok):
            for v_node in self.find_cycle(self.succ):
                # No is_negative() check here: that helper walks self.pred and
                # therefore cannot be applied to a successor-graph cycle.
                found = True
                yield self.cycle_list(v_node, self.succ)

    def cycle_list(self, handle: Node, point_to: Dict[Node, Tuple[Node, Arc]]) -> Cycle:
        """Reconstruct the cycle passing through the given node.

        Follows the links in ``point_to`` starting at ``handle`` and collects
        the edge of every link until the walk is back at ``handle``.

        Args:
            handle: A node on the cycle (as yielded by ``find_cycle``)
            point_to: Either self.pred or self.succ dictionary defining the edges

        Returns:
            Cycle: List of edges forming the cycle, in the order the links
            are traversed

        Raises:
            KeyError: If a node reached during the walk has no entry in
                ``point_to``.

        Examples:
            >>> digraph = {
            ...     'a': {'b': 'ab'},
            ...     'b': {'c': 'bc'},
            ...     'c': {'a': 'ca'}
            ... }
            >>> finder = NegCycleFinderQ(digraph)
            >>> finder.pred = {'b': ('a', 'ab'), 'c': ('b', 'bc'), 'a': ('c', 'ca')}
            >>> finder.cycle_list('a', finder.pred)
            ['ca', 'bc', 'ab']
        """
        v_node = handle
        cycle: Cycle = []
        # do-while: the handle must be left once before the stop test applies
        while True:
            u_node, edge = point_to[v_node]  # Get next node and connecting edge
            cycle.append(edge)  # Add edge to cycle
            v_node = u_node  # Move to next node
            if v_node == handle:  # Completed the cycle
                break
        return cycle

    def is_negative(
        self,
        handle: Node,
        dist: MutableMapping[Node, Domain],
        get_weight: Callable[[Arc], Domain],
    ) -> bool:
        """Verify if the predecessor cycle through handle is negative.

        Walks the cycle in ``self.pred`` starting at ``handle`` and reports
        True as soon as one link ``u -> v`` on it satisfies
        ``dist[v] > dist[u] + weight(u, v)``, i.e. the edge can still be
        relaxed under the current distances.

        Args:
            handle: A node on the cycle
            dist: Current distance estimates
            get_weight: Function to get weight of an edge

        Returns:
            bool: True if such an edge exists on the cycle, False otherwise

        Examples:
            >>> digraph = {
            ...     'a': {'b': 1},
            ...     'b': {'c': 1},
            ...     'c': {'a': -3}
            ... }
            >>> dist = {'a': 0, 'b': 1, 'c': 2}
            >>> finder = NegCycleFinderQ(digraph)
            >>> finder.pred = {'b': ('a', 1), 'c': ('b', 1), 'a': ('c', -3)}
            >>> finder.is_negative('a', dist, lambda edge: edge)
            True
        """
        v_node = handle
        # C-style do-while loop
        while True:
            u_node, edge = self.pred[v_node]
            if dist[v_node] > dist[u_node] + get_weight(edge):
                return True  # Found negative cycle
            v_node = u_node
            if v_node == handle:  # Completed full cycle
                break
        return False