Graph Isomorphisms
This module provides functionality for checking graph isomorphisms between hypergraphs, particularly for monogamous, cartesian (MC) hypergraphs.
Overview
Graph isomorphism checking determines whether two graphs are structurally identical, meaning there exists a bijection between their nodes and edges that preserves the graph structure.
Classes
Isomorphism
Isomorphism
dataclass
Class to check isomorphism between two hypergraphs.
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| @dataclass(slots=True)
class Isomorphism:
"""Class to check isomorphism between two hypergraphs."""
graphs: tuple[OpenHypergraph, OpenHypergraph]
node_mapping: list[int] = field(init=False)
edge_mapping: list[int] = field(init=False)
mapping_valid: bool = field(init=False)
visited_nodes: list[int] = field(default_factory=list, init=False)
visited_edges: list[int] = field(default_factory=list, init=False)
@property
def n_nodes(self) -> int:
"""Return the number of nodes in the graphs."""
return len(self.graphs[0].nodes)
@property
def n_edges(self) -> int:
"""Return the number of edges in the graphs."""
return len(self.graphs[0].edges)
def __post_init__(self):
"""Post-initialization checks for isomorphism. Includes basic size checks."""
g1, g2 = self.graphs
if not (
len(g1.nodes) == len(g2.nodes)
and len(g1.input_nodes) == len(g2.input_nodes)
and len(g1.output_nodes) == len(g2.output_nodes)
and len(g1.edges) == len(g2.edges)
):
self.mapping_valid = False
self.node_mapping = []
self.edge_mapping = []
else:
self.node_mapping = [-1] * self.n_nodes
self.edge_mapping = [-1] * self.n_edges
self.mapping_valid = True
def update_mapping(self, i: int, j: int, mode: MappingMode):
"""Update the mapping p with i -> j, Sets is_isomorphic to False if inconsistent"""
if mode == MappingMode.NODE:
mapping = self.node_mapping
elif mode == MappingMode.EDGE:
mapping = self.edge_mapping
else:
raise ValueError(f"Mode must be 'node' or 'edge', got {mode}")
logger.debug(f"Current mapping {mode}: {mapping}")
logger.debug(f"Updating mapping {mode}: {i} -> {j}")
if i < 0 or i >= len(mapping):
raise ValueError(
f"Index {i} out of bounds for permutation of size {len(mapping)}"
)
if j < 0 or j >= len(mapping):
raise ValueError(
f"Index {j} out of bounds for permutation of size {len(mapping)}"
)
if mapping[i] == j:
return
elif mapping[i] == -1:
if j in mapping:
# Another node is already mapped to j
self.mapping_valid = False
logger.debug(
f"Contradiction mapping {mode} {i} -> {j}; existing mapping {mapping.index(j)} -> {j}"
)
else:
mapping[i] = j
logger.debug(f"Mapping {mode} {i} -> {j}")
else:
self.mapping_valid = False
def update_mapping_list(
self, list1: list[int], list2: list[int], mode: MappingMode
):
"""Update the mapping p with i -> j for all i in list1 and j in list2, Sets is_isomorphic to False if inconsistent"""
if len(list1) != len(list2):
raise ValueError(
f"Lists must be of same length, got {len(list1)} and {len(list2)}"
)
for i, j in zip(list1, list2):
if self.mapping_valid:
self.update_mapping(i, j, mode)
def check_edge_compatibility(self, e1: int | None, e2: int | None) -> bool:
"""Checks the length of sources and targets and labels of two edges for compatibility"""
logger.debug(f"Check equality: e1={e1}, e2={e2}")
if e1 is None and e2 is None:
return True
if e1 is None or e2 is None:
return False
edge1 = self.graphs[0].edges[e1]
edge2 = self.graphs[1].edges[e2]
return (
(edge1.label == edge2.label)
and (len(edge1.sources) == len(edge2.sources))
and (len(edge1.targets) == len(edge2.targets))
)
def explore_edges(self, e1: int, e2: int):
"""Explore edges e1 and e2, updating mappings and traversing connected nodes."""
print(f"Traverse from edges {e1, e2}")
logger.debug(f"Exploring edges {e1, e2}")
if e1 in self.visited_edges:
self.mapping_valid = (
False if self.edge_mapping[e1] != e2 else self.mapping_valid
)
return
if not self.check_edge_compatibility(e1, e2):
self.mapping_valid = False
return
self.visited_edges.append(e1)
logger.debug(f"Mapping edge {e1} -> {e2}")
logger.debug(f"Currently is_isomorphic = {self.mapping_valid}")
self.update_mapping(e1, e2, MappingMode.EDGE)
logger.debug(f"Currently is_isomorphic = {self.mapping_valid}")
logger.debug("Checkpoint 1")
if not self.mapping_valid:
return False
logger.debug("Checkpoint 2")
# Commented out: logger.debug(g1.edges[e1], g2.edges[e2])
logger.debug(f"Edges: {self.graphs[0].edges[e1]}, {self.graphs[1].edges[e2]}")
n_sources = len(self.graphs[0].edges[e1].sources)
n_targets = len(self.graphs[0].edges[e1].targets)
logger.debug(f"n_sources, n_targets = {n_sources, n_targets}")
# check sources
for s in range(n_sources):
s1 = self.graphs[0].edges[e1].sources[s]
s2 = self.graphs[1].edges[e2].sources[s]
v1, v2 = self.graphs[0].nodes[s1], self.graphs[1].nodes[s2]
logger.debug(f"Prev nodes = {v1, v2}")
self.update_mapping(v1.index, v2.index, mode=MappingMode.NODE)
if self.mapping_valid:
self.traverse_from_nodes(v1, v2)
else:
return
# check targets
for t in range(n_targets):
t1 = self.graphs[0].edges[e1].targets[t]
t2 = self.graphs[1].edges[e2].targets[t]
v1, v2 = self.graphs[0].nodes[t1], self.graphs[1].nodes[t2]
logger.debug(f"Prev nodes = {v1, v2}")
self.update_mapping(v1.index, v2.index, mode=MappingMode.NODE)
if self.mapping_valid:
self.traverse_from_nodes(v1, v2)
else:
return
def traverse_from_nodes(self, v1: Node, v2: Node):
"""Traverse the graph from nodes v1 and v2, exploring connected edges and nodes."""
print(f"Traverse from nodes {v1, v2}")
logger.debug(f"Traversing {v1, v2}")
if v1.index in self.visited_nodes:
if v2.index != self.node_mapping[v1.index]:
self.mapping_valid = False
return
if v1.label != v2.label:
self.mapping_valid = False
return
self.visited_nodes.append(v1.index)
print("check splitting")
# nodes require the same amount of splitting in each directoin
if len(v1.next) != len(v2.next) or (len(v1.prev) != len(v2.prev)):
self.mapping_valid = False
return
# num_nexts = len(v1.next)
# matching_paths: list[list[int]] = [[] for i in range(num_nexts)]
# print(f"Nexts = {v1.next, v2.next}")
# eliminated_paths = {i: False for i in range(num_nexts)}
if len(v1.next) > 1:
raise RuntimeError("Branching paths algorithm in progress")
# for i, next1 in enumerate(v1.next):
# # attempt to find a matching path in g2
# for j, next2 in enumerate(v2.next):
# valid_pair = self.check_edges_for_continuation(next1, next2)
# print(f"Validity: {i, j, valid_pair}")
# if valid_pair:
# print(f"Update matching paths {i, j, matching_paths}")
# matching_paths[i].append(j)
# print(f"New matchin paths {matching_paths}")
# # sort the potential paths to eliminate the simplest decisions first
# matching_paths.sort(key=lambda x: len(x))
# print(matching_paths)
# for i, js in enumerate(matching_paths):
# if len(js) == 0:
# self.mapping_valid = False
# return
# elif len(js) == 1:
# j = js[0]
# self.explore_edges(v1.next[i].index, v2.next[j].index)
# else: # explore possible paths
# # placeholder to allow for rollback: we don't want to do a huge copy every branch!
# print("ended up here")
# # node_map_copy = self.node_mapping.copy()
# # edge_map_copy = self.edge_mapping.copy()
# assert self.mapping_valid
# explore paths starting with the minimal branching
else:
next1, next2 = head(v1.next), head(v2.next)
if self.check_edges_for_continuation(next1, next2):
self.explore_edges(next1.index, next2.index)
prev1, prev2 = head(v1.prev), head(v2.prev)
if self.check_edges_for_continuation(prev1, prev2):
self.explore_edges(prev1.index, prev2.index)
def check_edges_for_continuation(self, next1, next2):
if next1 is None or next2 is None:
print(f"ONe of them is none: {next1, next2}")
if next1 != next2:
print(f"Only one of them was none: {next1, next2}")
self.mapping_valid = False
return False # no need to continue path if edges are None or don't match
if next1.label != next2.label:
print(f"Label mismatch {next1, next2}")
self.mapping_valid = False
return False
if next1.port != next2.port:
print(f"Port mismatch {next1, next2}")
self.mapping_valid = False
return False # no need to continue path if ports don't match
return True
def check_subgraph_isomorphism(
self, v1: int, v2: int, subgraph1: SubGraph, subgraph2: SubGraph
) -> IsomorphismData:
"""Check for disconnected subgraph isomorphism where no nodes connect
to global inputs or outputs"""
print("Check subgraph isomorphism")
g1, g2 = self.graphs
if (v1 < 0 or v1 > max(subgraph1.nodes)) or (
v2 < 0 or v2 > max(subgraph2.nodes)
):
raise ValueError(
f"Node pair {(v1, v2)} not in the node sets for graph pair."
)
# beginning by mapping v1 to v2 as a first attempt
self.update_mapping(v1, v2, MappingMode.NODE)
self.traverse_from_nodes(g1.nodes[v1], g2.nodes[v2])
if self.mapping_valid:
if any([self.node_mapping[i] == -1 for i in subgraph1.nodes]):
raise ValueError(f"Permutation incomplete: {self.node_mapping}")
return IsomorphismData(self.mapping_valid, self.node_mapping, self.edge_mapping)
def check_MC_isomorphism(self) -> tuple[bool, list[int], list[int]]:
"""Check for graph isomorphism in monogamous, cartesian case"""
g1, g2 = self.graphs
# We can begin with mapping the input and output nodes only
if self.mapping_valid:
self.update_mapping_list(
g1.input_nodes, g2.input_nodes, mode=MappingMode.NODE
)
if self.mapping_valid:
self.update_mapping_list(
g1.output_nodes, g2.output_nodes, mode=MappingMode.NODE
)
# Transverse from the input nodes
for input_1, input_2 in zip(g1.input_nodes, g2.input_nodes):
v1 = g1.nodes[input_1]
v2 = g2.nodes[input_2]
logger.debug(f"Starting traversal from input nodes {v1, v2}")
self.traverse_from_nodes(v1, v2)
# Transverse from the output nodes
for output_1, output_2 in zip(g1.output_nodes, g2.output_nodes):
v1 = g1.nodes[output_1]
v2 = g2.nodes[output_2]
self.traverse_from_nodes(v1, v2)
if self.mapping_valid:
if any([i == -1 for i in self.node_mapping]):
raise ValueError(f"Permutation incomplete: {self.node_mapping}")
return (self.mapping_valid, self.node_mapping, self.edge_mapping)
|
n_nodes
property
Return the number of nodes in the graphs.
n_edges
property
Return the number of edges in the graphs.
__post_init__()
Post-initialization checks for isomorphism. Includes basic size checks.
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def __post_init__(self):
"""Post-initialization checks for isomorphism. Includes basic size checks."""
g1, g2 = self.graphs
if not (
len(g1.nodes) == len(g2.nodes)
and len(g1.input_nodes) == len(g2.input_nodes)
and len(g1.output_nodes) == len(g2.output_nodes)
and len(g1.edges) == len(g2.edges)
):
self.mapping_valid = False
self.node_mapping = []
self.edge_mapping = []
else:
self.node_mapping = [-1] * self.n_nodes
self.edge_mapping = [-1] * self.n_edges
self.mapping_valid = True
|
update_mapping(i, j, mode)
Update the mapping p with i -> j, Sets is_isomorphic to False if inconsistent
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def update_mapping(self, i: int, j: int, mode: MappingMode):
"""Update the mapping p with i -> j, Sets is_isomorphic to False if inconsistent"""
if mode == MappingMode.NODE:
mapping = self.node_mapping
elif mode == MappingMode.EDGE:
mapping = self.edge_mapping
else:
raise ValueError(f"Mode must be 'node' or 'edge', got {mode}")
logger.debug(f"Current mapping {mode}: {mapping}")
logger.debug(f"Updating mapping {mode}: {i} -> {j}")
if i < 0 or i >= len(mapping):
raise ValueError(
f"Index {i} out of bounds for permutation of size {len(mapping)}"
)
if j < 0 or j >= len(mapping):
raise ValueError(
f"Index {j} out of bounds for permutation of size {len(mapping)}"
)
if mapping[i] == j:
return
elif mapping[i] == -1:
if j in mapping:
# Another node is already mapped to j
self.mapping_valid = False
logger.debug(
f"Contradiction mapping {mode} {i} -> {j}; existing mapping {mapping.index(j)} -> {j}"
)
else:
mapping[i] = j
logger.debug(f"Mapping {mode} {i} -> {j}")
else:
self.mapping_valid = False
|
update_mapping_list(list1, list2, mode)
Update the mapping p with i -> j for all i in list1 and j in list2, Sets is_isomorphic to False if inconsistent
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def update_mapping_list(
self, list1: list[int], list2: list[int], mode: MappingMode
):
"""Update the mapping p with i -> j for all i in list1 and j in list2, Sets is_isomorphic to False if inconsistent"""
if len(list1) != len(list2):
raise ValueError(
f"Lists must be of same length, got {len(list1)} and {len(list2)}"
)
for i, j in zip(list1, list2):
if self.mapping_valid:
self.update_mapping(i, j, mode)
|
check_edge_compatibility(e1, e2)
Checks the length of sources and targets and labels of two edges for compatibility
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def check_edge_compatibility(self, e1: int | None, e2: int | None) -> bool:
"""Checks the length of sources and targets and labels of two edges for compatibility"""
logger.debug(f"Check equality: e1={e1}, e2={e2}")
if e1 is None and e2 is None:
return True
if e1 is None or e2 is None:
return False
edge1 = self.graphs[0].edges[e1]
edge2 = self.graphs[1].edges[e2]
return (
(edge1.label == edge2.label)
and (len(edge1.sources) == len(edge2.sources))
and (len(edge1.targets) == len(edge2.targets))
)
|
explore_edges(e1, e2)
Explore edges e1 and e2, updating mappings and traversing connected nodes.
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def explore_edges(self, e1: int, e2: int):
"""Explore edges e1 and e2, updating mappings and traversing connected nodes."""
print(f"Traverse from edges {e1, e2}")
logger.debug(f"Exploring edges {e1, e2}")
if e1 in self.visited_edges:
self.mapping_valid = (
False if self.edge_mapping[e1] != e2 else self.mapping_valid
)
return
if not self.check_edge_compatibility(e1, e2):
self.mapping_valid = False
return
self.visited_edges.append(e1)
logger.debug(f"Mapping edge {e1} -> {e2}")
logger.debug(f"Currently is_isomorphic = {self.mapping_valid}")
self.update_mapping(e1, e2, MappingMode.EDGE)
logger.debug(f"Currently is_isomorphic = {self.mapping_valid}")
logger.debug("Checkpoint 1")
if not self.mapping_valid:
return False
logger.debug("Checkpoint 2")
# Commented out: logger.debug(g1.edges[e1], g2.edges[e2])
logger.debug(f"Edges: {self.graphs[0].edges[e1]}, {self.graphs[1].edges[e2]}")
n_sources = len(self.graphs[0].edges[e1].sources)
n_targets = len(self.graphs[0].edges[e1].targets)
logger.debug(f"n_sources, n_targets = {n_sources, n_targets}")
# check sources
for s in range(n_sources):
s1 = self.graphs[0].edges[e1].sources[s]
s2 = self.graphs[1].edges[e2].sources[s]
v1, v2 = self.graphs[0].nodes[s1], self.graphs[1].nodes[s2]
logger.debug(f"Prev nodes = {v1, v2}")
self.update_mapping(v1.index, v2.index, mode=MappingMode.NODE)
if self.mapping_valid:
self.traverse_from_nodes(v1, v2)
else:
return
# check targets
for t in range(n_targets):
t1 = self.graphs[0].edges[e1].targets[t]
t2 = self.graphs[1].edges[e2].targets[t]
v1, v2 = self.graphs[0].nodes[t1], self.graphs[1].nodes[t2]
logger.debug(f"Prev nodes = {v1, v2}")
self.update_mapping(v1.index, v2.index, mode=MappingMode.NODE)
if self.mapping_valid:
self.traverse_from_nodes(v1, v2)
else:
return
|
traverse_from_nodes(v1, v2)
Traverse the graph from nodes v1 and v2, exploring connected edges and nodes.
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def traverse_from_nodes(self, v1: Node, v2: Node):
"""Traverse the graph from nodes v1 and v2, exploring connected edges and nodes."""
print(f"Traverse from nodes {v1, v2}")
logger.debug(f"Traversing {v1, v2}")
if v1.index in self.visited_nodes:
if v2.index != self.node_mapping[v1.index]:
self.mapping_valid = False
return
if v1.label != v2.label:
self.mapping_valid = False
return
self.visited_nodes.append(v1.index)
print("check splitting")
# nodes require the same amount of splitting in each directoin
if len(v1.next) != len(v2.next) or (len(v1.prev) != len(v2.prev)):
self.mapping_valid = False
return
# num_nexts = len(v1.next)
# matching_paths: list[list[int]] = [[] for i in range(num_nexts)]
# print(f"Nexts = {v1.next, v2.next}")
# eliminated_paths = {i: False for i in range(num_nexts)}
if len(v1.next) > 1:
raise RuntimeError("Branching paths algorithm in progress")
# for i, next1 in enumerate(v1.next):
# # attempt to find a matching path in g2
# for j, next2 in enumerate(v2.next):
# valid_pair = self.check_edges_for_continuation(next1, next2)
# print(f"Validity: {i, j, valid_pair}")
# if valid_pair:
# print(f"Update matching paths {i, j, matching_paths}")
# matching_paths[i].append(j)
# print(f"New matchin paths {matching_paths}")
# # sort the potential paths to eliminate the simplest decisions first
# matching_paths.sort(key=lambda x: len(x))
# print(matching_paths)
# for i, js in enumerate(matching_paths):
# if len(js) == 0:
# self.mapping_valid = False
# return
# elif len(js) == 1:
# j = js[0]
# self.explore_edges(v1.next[i].index, v2.next[j].index)
# else: # explore possible paths
# # placeholder to allow for rollback: we don't want to do a huge copy every branch!
# print("ended up here")
# # node_map_copy = self.node_mapping.copy()
# # edge_map_copy = self.edge_mapping.copy()
# assert self.mapping_valid
# explore paths starting with the minimal branching
else:
next1, next2 = head(v1.next), head(v2.next)
if self.check_edges_for_continuation(next1, next2):
self.explore_edges(next1.index, next2.index)
prev1, prev2 = head(v1.prev), head(v2.prev)
if self.check_edges_for_continuation(prev1, prev2):
self.explore_edges(prev1.index, prev2.index)
|
check_MC_isomorphism()
Check for graph isomorphism in monogamous, cartesian case
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| def check_MC_isomorphism(self) -> tuple[bool, list[int], list[int]]:
"""Check for graph isomorphism in monogamous, cartesian case"""
g1, g2 = self.graphs
# We can begin with mapping the input and output nodes only
if self.mapping_valid:
self.update_mapping_list(
g1.input_nodes, g2.input_nodes, mode=MappingMode.NODE
)
if self.mapping_valid:
self.update_mapping_list(
g1.output_nodes, g2.output_nodes, mode=MappingMode.NODE
)
# Transverse from the input nodes
for input_1, input_2 in zip(g1.input_nodes, g2.input_nodes):
v1 = g1.nodes[input_1]
v2 = g2.nodes[input_2]
logger.debug(f"Starting traversal from input nodes {v1, v2}")
self.traverse_from_nodes(v1, v2)
# Transverse from the output nodes
for output_1, output_2 in zip(g1.output_nodes, g2.output_nodes):
v1 = g1.nodes[output_1]
v2 = g2.nodes[output_2]
self.traverse_from_nodes(v1, v2)
if self.mapping_valid:
if any([i == -1 for i in self.node_mapping]):
raise ValueError(f"Permutation incomplete: {self.node_mapping}")
return (self.mapping_valid, self.node_mapping, self.edge_mapping)
|
MappingMode
MappingMode
Bases: Enum
Source code in src/IsomorphismChecker_python_serial/isomorphisms.py
| class MappingMode(Enum):
NODE = "node"
EDGE = "edge"
|
Functions
MC_isomorphism
Main function for checking monogamous, cartesian hypergraph isomorphism.
from IsomorphismChecker_python_serial.isomorphisms import MC_isomorphism
from IsomorphismChecker_python_serial.hypergraph import OpenHypergraph
# Check if two hypergraphs are isomorphic
is_iso, node_mapping, edge_mapping = MC_isomorphism(graph1, graph2)
if is_iso:
print(f"Graphs are isomorphic!")
print(f"Node mapping: {node_mapping}")
print(f"Edge mapping: {edge_mapping}")
else:
print("Graphs are not isomorphic")
Parameters:
- g1 (OpenHypergraph): First hypergraph
- g2 (OpenHypergraph): Second hypergraph
Returns:
- tuple[bool, list[int], list[int]]:
- Boolean indicating if graphs are isomorphic
- Node mapping (permutation) if isomorphic, empty list otherwise
- Edge mapping (permutation) if isomorphic, empty list otherwise
permute_graph
Creates a randomly permuted version of a hypergraph for testing isomorphism algorithms.
from IsomorphismChecker_python_serial.isomorphisms import permute_graph
# Create a permuted version of a graph
permutation, permuted_graph = permute_graph(original_graph)
# The permutation list shows how nodes were reordered
print(f"Permutation used: {permutation}")
Parameters:
- g (OpenHypergraph): Input hypergraph to permute
Returns:
- tuple[list[int], OpenHypergraph]:
- The permutation used (list of node indices)
- The permuted hypergraph
How Isomorphism is Computed
This section explains the algorithm used in the MC_isomorphism function in terms of code logic and steps. In order to understand the mathematical reasoning behind the algorithm, please refer to the algorithm documentation.
The check_MC_isomorphism method implements a graph traversal-based algorithm specifically designed for monogamous, cartesian (MC) hypergraphs. Here's a detailed explanation of how it works:
Step-by-Step Algorithm
1. Initial Validation
# Check basic structural properties
if self.n_nodes[0] != self.n_nodes[1]:
return False, [], [] # Different number of nodes
if self.n_edges[0] != self.n_edges[1]:
return False, [], [] # Different number of edges
The algorithm first performs quick checks to ensure both graphs have:
- Same number of nodes
- Same number of edges
- Same number of input nodes
- Same number of output nodes
If any of these checks fail, the graphs cannot be isomorphic.
# Map input nodes (sources with no incoming edges)
for i, input_index in enumerate(self.graphs[0].input_nodes):
corresponding_input = self.graphs[1].input_nodes[i]
self.update_mapping(input_index, corresponding_input, MappingMode.NODES)
# Similarly for output nodes
The algorithm establishes an initial mapping by aligning:
- Input nodes (nodes with no predecessors) in order
- Output nodes (nodes with no successors) in order
This provides anchor points for the traversal.
def traverse_from_nodes(self, start_nodes: list[int], graph_id: int) -> bool:
"""Traverse graph starting from given nodes, building mappings."""
queue = deque(start_nodes)
visited_nodes = set(start_nodes)
while queue:
current_node = queue.popleft()
# Find outgoing edges from current node
outgoing_edges = [e for e in graph.edges if current_node in e.sources]
for edge in outgoing_edges:
# Try to map this edge and its connected nodes
if not self.explore_edges(edge, graph_id):
return False # Incompatible structure
# Add newly mapped target nodes to queue
for target in edge.targets:
if target not in visited_nodes:
queue.append(target)
visited_nodes.add(target)
return True
Key points:
- Uses breadth-first search (BFS) starting from input nodes
- Maintains a queue of nodes to visit
- For each node, explores all outgoing edges
- Attempts to map edges and their connected nodes
- Continues until all reachable nodes are visited
4. Edge Exploration and Mapping
def explore_edges(self, edge: HyperEdge, graph_id: int) -> bool:
"""Explore an edge and try to map it to the other graph."""
# Get corresponding edge in the other graph
other_graph = self.graphs[1 - graph_id]
# Find candidate edges in other graph that could match
candidates = [e for e in other_graph.edges
if self.check_edge_compatibility(edge, e, graph_id)]
if not candidates:
return False # No compatible edge found
# Use the first compatible edge (deterministic for MC graphs)
matched_edge = candidates[0]
# Update edge mapping
self.update_mapping(edge.index, matched_edge.index, MappingMode.EDGES)
# Update node mappings for sources and targets
for src1, src2 in zip(edge.sources, matched_edge.sources):
self.update_mapping(src1, src2, MappingMode.NODES)
for tgt1, tgt2 in zip(edge.targets, matched_edge.targets):
self.update_mapping(tgt1, tgt2, MappingMode.NODES)
return True
The exploration process:
- For each edge in graph 1, finds compatible edges in graph 2
- Checks compatibility based on: (a) Edge labels must match (b)Number of sources must match (c)Number of targets must match (d)Already-mapped nodes must correspond correctly
- Updates both edge and node mappings
- Returns false if no compatible edge exists
5. Edge Compatibility Checking
def check_edge_compatibility(self, edge1: HyperEdge, edge2: HyperEdge,
graph_id: int) -> bool:
"""Check if two edges are compatible for mapping."""
# Labels must match
if edge1.label != edge2.label:
return False
# Structure must match
if len(edge1.sources) != len(edge2.sources):
return False
if len(edge1.targets) != len(edge2.targets):
return False
# Check if already-mapped nodes correspond correctly
for src1, src2 in zip(edge1.sources, edge2.sources):
if src1 in self.node_mappings[graph_id]:
if self.node_mappings[graph_id][src1] != src2:
return False # Conflict with existing mapping
# Similar check for targets
for tgt1, tgt2 in zip(edge1.targets, edge2.targets):
if tgt1 in self.node_mappings[graph_id]:
if self.node_mappings[graph_id][tgt1] != tgt2:
return False
return True
Compatibility requires:
- Identical edge labels
- Same number of sources and targets
- Consistency with existing node mappings
- No conflicts with previously established correspondences
6. Bidirectional Traversal
# Traverse from inputs (forward direction)
if not self.traverse_from_nodes(self.graphs[0].input_nodes, 0):
return False, [], []
# Traverse from outputs (backward direction)
if not self.traverse_from_nodes(self.graphs[0].output_nodes, 0):
return False, [], []
The algorithm performs two traversals:
1. Forward: Starting from input nodes, following edge directions
2. Backward: Starting from output nodes, following edges in reverse
This ensures all nodes and edges are covered, even in graphs with cycles.
7. Final Validation
# Verify all nodes are mapped
if len(self.node_mappings[0]) != self.n_nodes[0]:
return False, [], []
# Verify all edges are mapped
if len(self.edge_mappings[0]) != self.n_edges[0]:
return False, [], []
# Convert mappings to permutation lists
node_permutation = [self.node_mappings[0][i] for i in range(self.n_nodes[0])]
edge_permutation = [self.edge_mappings[0][i] for i in range(self.n_edges[0])]
return True, node_permutation, edge_permutation
Final checks ensure:
- Every node in graph 1 has been mapped to a node in graph 2
- Every edge in graph 1 has been mapped to an edge in graph 2
- The mappings form valid permutations (bijections)
Algorithm Complexity
Time Complexity: O(V + E)
- Each node is visited exactly once during traversal
- Each edge is examined exactly once
- V = number of vertices (nodes)
- E = number of edges
Space Complexity: O(V + E)
- Stores node mappings: O(V)
- Stores edge mappings: O(E)
- BFS queue: O(V) in worst case
- Visited set: O(V)
Why This Algorithm Works for MC Hypergraphs
Monogamous Property:
- Each source/target appears in exactly one edge
- This makes edge matching deterministic
- No backtracking needed
Cartesian Property:
- Graph structure follows cartesian product rules
- Traversal order doesn't affect correctness
- Single forward pass is sufficient
Key Insight: The combination of monogamous and cartesian properties means that once we establish the input/output node mapping, the rest of the graph structure is uniquely determined. The traversal simply verifies this unique structure exists in both graphs.
Example Execution Trace
Consider two simple isomorphic graphs:
Graph 1: a --f--> b --g--> c
Graph 2: x --f--> y --g--> z
Step 1: Initial validation ✓
- Both have 3 nodes, 2 edges
Step 2: Map inputs/outputs
- Map: a → x (both are inputs)
- Map: c → z (both are outputs)
Step 3: Forward traversal from 'a'
- Visit node 'a' (already mapped to 'x')
- Find edge 'f' from 'a' to 'b'
- Find edge 'f' from 'x' in Graph 2 (goes to 'y')
- Map: edge f₁ → f₂
- Map: b → y (target nodes)
Step 4: Continue from 'b'
- Visit node 'b' (mapped to 'y')
- Find edge 'g' from 'b' to 'c'
- Find edge 'g' from 'y' in Graph 2 (goes to 'z')
- Map: edge g₁ → g₂
- Node 'c' already mapped to 'z' ✓
Step 5: Final validation
- All 3 nodes mapped ✓
- All 2 edges mapped ✓
- Return: (True, [0,1,2], [0,1])
When the Algorithm Fails
The algorithm returns False if:
-
Structural mismatch:
Graph 1: a --> b --> c
Graph 2: a --> b, b --> c, a --> c # Extra edge
# Fails: Different number of edges
-
Label mismatch:
Graph 1: a --f--> b
Graph 2: a --g--> b # Different edge label
# Fails: Edge compatibility check
-
Incompatible structure:
Graph 1: a --f--> b --g--> c
Graph 2: a --f--> c --g--> b # Different connectivity
# Fails: Node mapping conflict
-
Incomplete traversal:
# If graph has unreachable nodes
# Fails: Not all nodes mapped