"""Negative Cycle Finder with constraints (neg_cycle_q.py)
This code implements a Negative Cycle Finder for directed graphs using Howard's
method. The purpose of this code is to detect and find negative cycles in a
directed graph. A negative cycle is a cycle in the graph where the sum of the
edge weights is negative.
The main input for this code is a directed graph, represented as a mapping of
nodes to their neighboring nodes and the edges connecting them. The graph is
passed to the NegCycleFinderQ class when it's initialized.
The output of this code is a list of cycles (if any negative cycles are found).
Each cycle is represented as a list of edges that form the negative cycle.
The code achieves its purpose through an algorithm called Howard's method, which
is a minimum cycle ratio (MCR) algorithm. It works by maintaining a set of
candidate cycles and iteratively updating them until it finds the minimum cycle
ratio or detects a negative cycle.
The main logic flow of the algorithm involves two key operations: relaxation
and cycle detection. The relaxation process updates the distances between nodes
based on the edge weights. This is done in two ways: predecessor relaxation
(relax_pred) and successor relaxation (relax_succ). The cycle detection part
(find_cycle) looks for cycles in the graph based on the current set of
predecessors or successors.
The howard_pred and howard_succ methods combine these operations. They
repeatedly perform relaxation and then check for cycles. If a negative cycle is
found, it's yielded as output.
An important data transformation happening in this code is the maintenance of
the 'dist' dictionary, which keeps track of the distances between nodes. This
dictionary is continuously updated during the relaxation process.
The code uses some advanced concepts like generic types and generator functions,
but the core idea is straightforward: it's trying to find paths in the graph
where going around in a circle results in a negative total weight, which
shouldn't happen in many real-world scenarios (like currency exchange rates).
Overall, this code provides a tool for analyzing directed graphs and finding
problematic cycles, which can be useful in various applications such as
detecting arbitrage opportunities in currency exchange or finding
inconsistencies in systems modeled as graphs.
"""
from fractions import Fraction
from typing import (
Callable,
Dict,
Generator,
Generic,
List,
Mapping,
MutableMapping,
Tuple,
TypeVar,
)
# Type variables for generic graph implementation:
# Node must be hashable (used as dictionary keys)
# Arc can be any type (but typically hashable)
# Domain must support comparison and arithmetic operations (int, Fraction, float)
Node = TypeVar("Node") # Hashable
Arc = TypeVar("Arc") # Hashable
Domain = TypeVar("Domain", int, Fraction, float) # Comparable Ring
Cycle = List[Arc] # List of Arcs
[docs]
class NegCycleFinderQ(Generic[Node, Arc, Domain]):
"""Negative Cycle Finder with constraints by Howard's method
Howard's method is a minimum cycle ratio (MCR) algorithm that uses a policy
iteration algorithm to find the minimum cycle ratio of a directed graph. The
algorithm maintains a set of candidate cycles and iteratively updates the
cycle with the minimum ratio until convergence. To detect negative cycles,
Howard's method uses a cycle detection algorithm that is based on the
Bellman-Ford relaxation algorithm. Specifically, the algorithm maintains a
predecessor graph of the original graph and performs cycle detection on this
graph using the Bellman-Ford relaxation algorithm. If a negative cycle is
detected, the algorithm terminates and returns the cycle.
The class implements both predecessor and successor versions of Howard's
algorithm, providing flexibility in how negative cycles are detected and
processed.
"""
# Predecessor dictionary: maps each node to (predecessor_node, connecting_edge)
pred: Dict[Node, Tuple[Node, Arc]]
# Successor dictionary: maps each node to (successor_node, connecting_edge)
succ: Dict[Node, Tuple[Node, Arc]]
def __init__(self, digraph: Mapping[Node, Mapping[Node, Arc]]) -> None:
"""Initialize the negative cycle finder with a directed graph.
Args:
digraph: A directed graph represented as a nested mapping:
- Outer keys: source nodes
- Inner mappings: {target_node: edge} pairs
Example: {u: {v: edge_uv, w: edge_uw}, v: {u: edge_vu}}
"""
self.digraph = digraph
self.pred: Dict[Node, Tuple[Node, Arc]] = {}
self.succ: Dict[Node, Tuple[Node, Arc]] = {}
[docs]
def find_cycle(
self, point_to: Dict[Node, Tuple[Node, Arc]]
) -> Generator[Node, None, None]:
"""Detect cycles in the current predecessor/successor graph using depth-first search.
Args:
point_to: Either self.pred or self.succ dictionary defining the graph edges
Yields:
Generator[Node, None, None]: Each node that starts a cycle in the graph
Algorithm:
1. Uses a coloring approach (white/gray/black) for cycle detection
2. White nodes are unvisited
3. Gray nodes are currently being visited in DFS stack
4. Black nodes are fully processed
5. A cycle is detected when we encounter a gray node during DFS
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> finder = NegCycleFinderQ(digraph)
>>> for cycle in finder.find_cycle(finder.pred):
... print(cycle)
"""
visited: Dict[Node, Node] = {} # Maps nodes to their DFS root
for v_node in filter(lambda v_node: v_node not in visited, self.digraph):
u_node = v_node
while True:
visited[u_node] = v_node # Mark as visited with current DFS root
if u_node not in point_to:
break # Reached a leaf node
u_node, _ = point_to[u_node] # Move to predecessor/successor
if u_node in visited:
if visited[u_node] == v_node: # Found cycle back to current root
yield u_node
break # Cycle or different DFS tree
[docs]
def relax_pred(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Arc], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> bool:
"""Perform predecessor relaxation step (Bellman-Ford style).
Args:
dist: Current distance estimates for each node
get_weight: Function to get weight of an edge
update_ok: Function to determine if distance update should be applied
Returns:
bool: True if any distances were updated, False otherwise
Note:
Updates distances based on predecessor edges (u -> v)
Implements the relaxation: if dist[v] > dist[u] + weight(u,v), update dist[v]
Examples:
>>> digraph = {
... 'a': {'b': 1, 'c': 4},
... 'b': {'c': 2},
... 'c': {'a': -5}
... }
>>> dist = {'a': 0, 'b': float('inf'), 'c': float('inf')}
>>> finder = NegCycleFinderQ(digraph)
>>> finder.relax_pred(dist, lambda edge: edge, lambda old, new: True)
True
>>> dist['b']
1
>>> dist['c']
3
"""
changed = False
for u_node, neighbors in self.digraph.items():
for v_node, edge in neighbors.items():
distance = dist[u_node] + get_weight(edge)
if dist[v_node] > distance and update_ok(dist[v_node], distance):
dist[v_node] = distance
self.pred[v_node] = (u_node, edge) # Update predecessor
changed = True
return changed
[docs]
def relax_succ(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Arc], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> bool:
"""Perform successor relaxation step (reverse Bellman-Ford style).
Args:
dist: Current distance estimates for each node
get_weight: Function to get weight of an edge
update_ok: Function to determine if distance update should be applied
Returns:
bool: True if any distances were updated, False otherwise
Note:
Updates distances based on successor edges (u -> v)
Implements the relaxation: if dist[u] < dist[v] - weight(u,v), update dist[u]
Examples:
>>> digraph = {
... 'a': {'b': 1},
... 'b': {}
... }
>>> dist = {'a': 0, 'b': 5}
>>> finder = NegCycleFinderQ(digraph)
>>> finder.relax_succ(dist, lambda edge: edge, lambda old, new: True)
True
>>> dist['a']
4
"""
changed = False
for u_node, neighbors in self.digraph.items():
for v_node, edge in neighbors.items():
distance = dist[v_node] - get_weight(edge)
if dist[u_node] < distance and update_ok(dist[u_node], distance):
dist[u_node] = distance
self.succ[u_node] = (v_node, edge) # Update successor
changed = True
return changed
[docs]
def howard_pred(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Arc], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> Generator[Cycle, None, None]:
"""Find negative cycles using predecessor-based Howard's algorithm.
Args:
dist: Initial distance estimates (often zero-initialized)
get_weight: Function to get weight of an edge
update_ok: Function to determine if distance updates are allowed
Yields:
Generator[Cycle, None, None]: Each negative cycle found as a list of edges
Algorithm:
1. Repeatedly relax edges using predecessor updates
2. After each relaxation, check for cycles in predecessor graph
3. If cycle found, verify if it's negative
4. Yield each negative cycle found
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> dist = {vtx: 0 for vtx in digraph}
>>> def update_ok(dist, v) : return True
>>> finder = NegCycleFinderQ(digraph)
>>> has_neg = False
>>> for _ in finder.howard_pred(dist, lambda edge: edge, update_ok):
... has_neg = True
... break
...
>>> has_neg
False
"""
self.pred = {} # Reset predecessor graph
found = False
while not found and self.relax_pred(dist, get_weight, update_ok):
for v_node in self.find_cycle(self.pred):
# Safety check - verify the cycle is indeed negative
assert self.is_negative(v_node, dist, get_weight)
found = True
yield self.cycle_list(v_node, self.pred)
[docs]
def howard_succ(
self,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Arc], Domain],
update_ok: Callable[[Domain, Domain], bool],
) -> Generator[Cycle, None, None]:
"""Find negative cycles using successor-based Howard's algorithm.
Args:
dist: Initial distance estimates (often zero-initialized)
get_weight: Function to get weight of an edge
update_ok: Function to determine if distance updates are allowed
Yields:
Generator[Cycle, None, None]: Each negative cycle found as a list of edges
Note:
Similar to howard_pred but uses successor updates instead of predecessor
Currently skips the negative cycle verification (commented assert)
Examples:
>>> digraph = {
... "a0": {"a1": 7, "a2": 5},
... "a1": {"a0": 0, "a2": 3},
... "a2": {"a1": 1, "a0": 2},
... }
>>> def update_ok(dist, v) : return True
>>> dist = {vtx: 0 for vtx in digraph}
>>> finder = NegCycleFinderQ(digraph)
>>> has_neg = False
>>> for _ in finder.howard_succ(dist, lambda edge: edge, update_ok):
... has_neg = True
... break
...
>>> has_neg
False
"""
self.succ = {} # Reset successor graph
found = False
while not found and self.relax_succ(dist, get_weight, update_ok):
for v_node in self.find_cycle(self.succ):
# Note: Negative verification currently disabled
# assert self.is_negative(v_node, dist, get_weight)
found = True
yield self.cycle_list(v_node, self.succ)
[docs]
def cycle_list(self, handle: Node, point_to: Dict[Node, Tuple[Node, Arc]]) -> Cycle:
"""Reconstruct the cycle starting from the given node.
Args:
handle: Starting node of the cycle
point_to: Either self.pred or self.succ dictionary defining the edges
Returns:
Cycle: List of edges forming the cycle in order
Note:
Follows the predecessor/successor links until returning to starting node
Examples:
>>> digraph = {
... 'a': {'b': 'ab'},
... 'b': {'c': 'bc'},
... 'c': {'a': 'ca'}
... }
>>> finder = NegCycleFinderQ(digraph)
>>> finder.pred = {'b': ('a', 'ab'), 'c': ('b', 'bc'), 'a': ('c', 'ca')}
>>> finder.cycle_list('a', finder.pred)
['ca', 'bc', 'ab']
"""
v_node = handle
cycle = list()
while True:
u_node, edge = point_to[v_node] # Get next node and connecting edge
cycle.append(edge) # Add edge to cycle
v_node = u_node # Move to next node
if v_node == handle: # Completed the cycle
break
return cycle
[docs]
def is_negative(
self,
handle: Node,
dist: MutableMapping[Node, Domain],
get_weight: Callable[[Arc], Domain],
) -> bool:
"""Verify if the cycle starting at handle is negative.
Args:
handle: Starting node of the cycle
dist: Current distance estimates
get_weight: Function to get weight of an edge
Returns:
bool: True if the cycle is negative, False otherwise
Note:
A cycle is negative if the sum of its edge weights is negative
This is detected by finding at least one edge that violates the
triangle inequality: dist[v] > dist[u] + weight(u,v)
Examples:
>>> digraph = {
... 'a': {'b': 1},
... 'b': {'c': 1},
... 'c': {'a': -3}
... }
>>> dist = {'a': 0, 'b': 1, 'c': 2}
>>> finder = NegCycleFinderQ(digraph)
>>> finder.pred = {'b': ('a', 1), 'c': ('b', 1), 'a': ('c', -3)}
>>> finder.is_negative('a', dist, lambda edge: edge)
True
"""
v_node = handle
# C-style do-while loop
while True:
u_node, edge = self.pred[v_node]
if dist[v_node] > dist[u_node] + get_weight(edge):
return True # Found negative cycle
v_node = u_node
if v_node == handle: # Completed full cycle
break
return False