Source code for symbolic_dynamics.sofic

"""Functions and algorithms for sofic shifts.

The functions in this module are designed to work
on a :class:`networkx.MultiDiGraph`, which we refer to just
as :dfn:`graphs`. Furthermore, some functions require
more specific properties to ensure correctness, which include

* labeled - :func:`is_labeled`
* deterministic - :func:`is_deterministic`
* essential - :func:`is_essential`
* irreducible - :func:`is_irreducible`
* follower-separated - :func:`is_follower_separated`
* synchronizing - :func:`is_synchronizing`

See the documentation for the associated functions for a definition
of the properties. However, all functions in this module
do not explicity check if their input graphs have the property needed
correctness.
"""


import networkx as nx
from symbolic_dynamics.utils import iset, first
from itertools import combinations, islice, product
import random
from contextlib import contextmanager
from functools import wraps

# __all__ = ["dot", "idot", "is_labeled", "alphabet", "out_labels",
#            "is_stranded", "is_essential", "make_essential",
#            "is_deterministic", "is_fully_deterministic",
#            "from_partial_fns", "random_deterministic_graph",
#            "random_deterministic_graph_with_props",
#            "is_irreducible", "add_sink_vertex",
#            "get_follower_equivalences", "is_follower_separated",
#            "extend_to_synchronizing_word", "find_synchronizing_word",
#            "reduce", "find_separating_word", "is_subshift",
#            "is_synchronizing", "is_label_isomorphic_fs",
#            "subset_construction", "sink"]


[docs]def dot(G, q, w): """Computes the transition action of `w` in `G` on `q`. In a deterministic labeled graph `G`, any path starting at a given vertex is uniquely determinied by its sequence of labels. If there is a path labeled `w` starting at `q` in `G`, then the transition action of `w` in `G` on `q` is defined to be the vertex that path ends at. Otherwise, if there is no such path, then the transtion action of `w` in `G` on `q` is defined to be None. Parameters ---------- G : deterministic labeled graph q : vertex in `G` w : word Examples -------- >>> G = nx.MultiDiGraph() >>> nx.add_path(G, range(5), label="a") >>> sd.dot(G, 0, "aaaa") 4 >>> sd.dot(G, 1, "aaaa") is None True """ if q is None: return None for a in w: delta = {ell: p for (_, p, ell) in G.out_edges(q, data="label")} if a not in delta: return None q = delta[a] return q
[docs]def idot(G, it, w): """Computes the transition action of `w` in `G` on each element of the iterable `it`, returning an :class:`.iset` of the non-None results. Parameters ---------- G : deterministic labeled graph it : iterable of vertices in `G` w : word Examples -------- >>> G = nx.MultiDiGraph() >>> nx.add_path(G, range(5), label="a") >>> sd.idot(G, [0, 1], "aaa") {3, 4} >>> sd.idot(G, [0, 1], "aaaa") {4} >>> sd.idot(G, [0, 1], "aaaaa") {} See Also -------- :func:`dot` """ x = (dot(G, q, w) for q in it) x = (q for q in x if q is not None) return iset(x)
[docs]def is_labeled(G): """Returns True iff every edge in the graph `G` has the attribute ``label``. Parameters ---------- G : graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2, label="a") >>> sd.is_labeled(G) True >>> G.add_edge(2, 1) >>> sd.is_labeled(G) False """ return all("label" in data for _, _, data in G.edges(data=True))
[docs]def alphabet(G): """Returns an :class:`.iset` of labels appearing in `G`. Parameters ---------- G : labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2, label="a") >>> G.add_edge(2, 1, label="b") >>> sd.alphabet(G) {'a', 'b'} """ return iset(label for (_, _, label) in G.edges(data="label"))
[docs]def out_labels(G, q): """Returns a list of each of the labels appearing on the edges starting at `q` in `G`. Parameters ---------- G : labeled graph q : vertex in `G` Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2, label="a") >>> G.add_edge(1, 3, label="a") >>> G.add_edge(1, 1, label="b") >>> sd.out_labels(G, 1) ['a', 'a', 'b'] """ return [label for (_, _, label) in G.out_edges(q, data="label")]
[docs]def is_stranded(G, q): """Returns True iff `q` is stranded in `G`. A vertex `q` in `G` is :dfn:`stranded` if there is no edge starting at `q` or if there is no edge ending at `q`. Parameters ---------- G : graph q : vertex in `G` Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2) >>> sd.is_stranded(G, 1) True >>> G.add_edge(1, 1) >>> sd.is_stranded(G, 1) False See Also -------- :func:`is_essential` """ return not (G.out_edges(q) and G.in_edges(q))
[docs]def is_essential(G): """Returns True iff `G` is essential. A graph `G` is :dfn:`essential` if no vertex is stranded. Parameters ---------- G : graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2) >>> G.add_edge(1, 1) >>> sd.is_essential(G) False >>> G.add_edge(2, 2) >>> sd.is_essential(G) True See Also -------- :func:`is_stranded` """ return all(not is_stranded(G, q) for q in G)
[docs]def make_essential(G): """Modifies `G` by removing vertices in `G` that do not lie on bi-infinite paths. The resulting graph is the maximal essential subgraph of the original graph. Parameters ---------- G : a graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2) >>> G.add_edge(1, 1) >>> len(G), sd.is_essential(G) (2, False) >>> sd.make_essential(G) >>> len(G), sd.is_essential(G) (1, True) See Also -------- :func:`is_essential` """ nonextensible = [q for q in G if not G.out_edges(q)] while True: frontier = iset(q for (q, _) in G.in_edges(nonextensible)) G.remove_nodes_from(nonextensible) nonextensible = [q for q in frontier if not G.out_edges(q)] if not nonextensible: break noncoextensible = [q for q in G if not G.in_edges(q)] while True: frontier = iset(q for (_, q) in G.out_edges(noncoextensible)) G.remove_nodes_from(noncoextensible) noncoextensible = [q for q in frontier if not G.in_edges(q)] if not noncoextensible: break
[docs]def is_deterministic(G): """Returns True iff `G` is deterministic. A labeled graph `G` is :dfn:`deterministic` if every edge starting at a given vertex is labeled uniquely. Parameters ---------- G : labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2, label="a") >>> G.add_edge(1, 3, label="b") >>> sd.is_deterministic(G) True >>> G.add_edge(1, 4, label="a") >>> sd.is_deterministic(G) False """ for q in G: ol = out_labels(G, q) if len(ol) != len(set(ol)): return False return True
[docs]def is_fully_deterministic(G): """Returns True iff `G` is fully deterministic. A labeled graph `G` is :dfn:`fully deterministic` if `G` is deterministic and for every vertex `q` in `G`, the set of labels of edges starting at `q` is equal the set of labels appearing in `G`. Parameters ---------- G : labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 2, label="a") >>> G.add_edge(2, 1, label="a") >>> sd.is_fully_deterministic(G) True >>> G.add_edge(1, 1, label="b") >>> sd.is_fully_deterministic(G) False >>> G.add_edge(2, 2, label="b") >>> sd.is_fully_deterministic(G) True See Also -------- :func:`is_deterministic`, :func:`alphabet` """ sigma = alphabet(G) for q in G: ol = out_labels(G, q) ol_s = iset(ol) if not (len(ol) == len(ol_s) and ol_s == sigma): return False return True
[docs]def from_partial_fns(pfns): """Returns a deterministic graph built from given partial functions. Here, `pfns` is a dict of dicts: the keys of the outer dicts are labels to be used in the output graph, and the inner dicts represent where the edges for a given label are located. More specifically, if `G` is the result of ``from_partial_fns(pfns)``, then `G` has the following property: >>> for (a, pfn) in pfns.items(): ... for q in G: ... if q in pfn: ... assert dot(G, q, [a]) == pfn[q] ... else: ... assert dot(G, q, [a]) is None Parameters ---------- pfns : dict of dicts; see above Examples -------- >>> d = {"a": {0: 1, 1: 2, 2: 3}, "b": {3: 0, 0: 3}} >>> G = sd.from_partial_fns(d) >>> list(G.edges(data="label")) [(0, 1, 'a'), (0, 3, 'b'), (1, 2, 'a'), (2, 3, 'a'), (3, 0, 'b')] """ G = nx.MultiDiGraph() for a, pfn in pfns.items(): for p, q in pfn.items(): G.add_edge(p, q, label=a) return G
def random_partial_fn(n): pfn = {} for i in range(n): r = random.randrange(-1, n) if r >= 0: pfn[i] = r return pfn
[docs]def random_deterministic_graph(n, m): """Randomly generates a deterministic graph defined by `m` partial functions over `n` states. Parameters ---------- n : int domain/codomain of partial functions m : int number of partial functions """ return from_partial_fns({str(i): random_partial_fn(n) for i in range(m)})
[docs]def random_deterministic_graph_with_props(n, m, props): """Randomly generates a deterministic graph defined by `m` partial functions over `n` states that satisfies each each predicate in `props`. Parameters ---------- n : int domain/codomain of partial functions m : int number of partial functions props : list of predicates the predicates for the random graph to satisfy Examples -------- >>> is_reducible = lambda G: not sd.is_irreducible(G) >>> props = [sd.is_essential, is_reducible] >>> G = sd.random_deterministic_graph_with_props(5, 2, props) >>> sd.is_essential(G) True >>> sd.is_irreducible(G) False """ while True: G = random_deterministic_graph(n, m) if all(p(G) for p in props): return G
[docs]def subset_presentation(G): """Constructs the subset presentation of `G`. The :dfn:`subset presentation` of `G` is a deterministic labeled graph such that shift presented by `G` is equivalent to the shift presented by the subset presentation. Parameters ---------- G : labeled graph """ ssc = nx.MultiDiGraph() sigma = alphabet(G) X = iset(G) stack = [X] popped = set() while stack: X = stack.pop() ssc.add_node(X) popped.add(X) for a in sigma: Y = iset(q for (_, q, b) in G.out_edges(X, data="label") if b == a) if Y: ssc.add_edge(X, Y, label=a) if Y not in popped: stack.append(Y) return ssc
[docs]def is_irreducible(G): """Returns True iff `G` is irreducible A graph is :dfn:`irreducible` if for any two vertices in `G`, there is a path between them. Synonymous with *strongly connected*. Parameters ---------- G : graph """ return nx.is_strongly_connected(G)
class Partition: def __init__(self, elements): self.elements = elements self.part_lookup = {e: 0 for e in elements} self.parts = {0: set(elements)} def part_count(self): return len(self.parts) def part_size(self, p): return len(self.parts[p]) def split(self, mark_it): # mark_it is an iterable of elements to mark marks = {} for element in mark_it: p = self.part_lookup[element] if p not in marks: marks[p] = set() marks[p].add(element) splits = [] # iterate over parts with marks for p in marks: if self.part_size(p) > len(marks[p]): new_p = self.part_count() self.parts[new_p] = set() for marked in marks[p]: self.part_lookup[marked] = new_p self.parts[new_p].add(marked) self.parts[p].remove(marked) splits.append((p, new_p)) return splits def select_smaller(self, p1, p2): if self.part_size(p1) <= self.part_size(p2): return p1 else: return p2 def hopcroft(G, S): """Hopcroft's algorthm for computing state equivalence. Parameters ---------- G : fully deterministic graph S : iterable one half of the initial (bi)partition Returns ------- Partition """ sigma = alphabet(G) partition = Partition(list(G)) p1, p2 = partition.split(S)[0] smaller = partition.select_smaller(p1, p2) wait_set = set() for a in sigma: wait_set.add((smaller, a)) while wait_set: p, a = wait_set.pop() inv_a_p = G.in_edges(partition.parts[p], data="label") inv_a_p = (p for (p, q, label) in inv_a_p if label == a) for (p1, p2) in partition.split(inv_a_p): for b in sigma: if (p1, b) in wait_set: wait_set.add((p2, b)) else: smaller = partition.select_smaller(p1, p2) wait_set.add((smaller, b)) return partition sink = float("inf") def add_sink_vertex(G, sigma=None): if sigma is None: sigma = alphabet(G) G.add_node(sink) for q in G: ol = out_labels(G, q) diff = [a for a in sigma if a not in ol] for a in diff: G.add_edge(q, sink, label=a) for a in sigma: G.add_edge(sink, sink, label=a) @contextmanager def sink_context(G, sigma=None): add_sink_vertex(G, sigma) try: yield finally: G.remove_node(sink)
[docs]def get_follower_equivalences(G): """Gets the follower-equivalences of `G`. In a deterministic labeled graph `G`, two vertices `p` and `q` are :dfn:`follower-equivalent` if there is no word `w` such that ``dot(G, p, w)`` and ``dot(G, q, w)`` are not both None. Returns a pair of dicts representing a partition of `G`, where two states are follower-equivalent iff they are in the same part of the partition. Parameters ---------- G : deterministic label graph Returns ------- parts : dict maps each part number to a part of the partition part_lookup : dict maps each vertex of `G` to a part number Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 3, label="a") >>> G.add_edge(2, 3, label="a") >>> G.add_edge(1, 1, label="b") >>> G.add_edge(2, 2, label="b") >>> G.add_edge(3, 3, label="b") >>> parts, part_lookup = sd.get_follower_equivalences(G) >>> parts {0: {1, 2}, 2: {3}} >>> part_lookup {1: 0, 3: 2, 2: 0} See Also -------- :func:`is_follower_separated` :func:`reduce` """ with sink_context(G): partition = hopcroft(G, [sink]) sink_p = partition.part_lookup[sink] del partition.parts[sink_p] del partition.part_lookup[sink] return partition.parts, partition.part_lookup
[docs]def is_follower_separated(G): """Returns True iff `G` is follower-separated. A deterministic labeled graph `G` if no two distinct vertices are follower-equivalant. Parameters ---------- G : deterministic labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 3, label="a") >>> G.add_edge(2, 3, label="a") >>> G.add_edge(1, 1, label="b") >>> G.add_edge(2, 2, label="b") >>> G.add_edge(3, 3, label="b") >>> sd.is_follower_separated(G) False See Also -------- :func:`get_follower_equivalences` :func:`reduce` """ parts, _ = get_follower_equivalences(G) return all(len(part) == 1 for part in parts.values())
def pair_shrink_graph(G): sigma = alphabet(G) S = nx.MultiDiGraph() S.add_node(sink) for pair in combinations(G, 2): pair = iset(pair) for a in sigma: res = idot(G, pair, [a]) if len(res) == 2: S.add_edge(pair, res, label=a) elif len(res) == 1: S.add_edge(pair, sink, label=a) return S def returns_word(f): @wraps(f) def decorated(*args, **kwargs): as_str = kwargs.pop("as_str", False) w = f(*args, **kwargs) if as_str: return "".join(w) else: return w return decorated @returns_word def get_path_label(G, path): label = [] for p, q in zip(path, islice(path, 1, None)): edge = first(G[p][q].values()) label.append(edge["label"]) return label @returns_word def extend_to_synchronizing_word(G, w): """Attempt to extend `w` to a synchronizing word in `G`. Parameters ---------- G : deterministic labeled graph w : word Returns ------- u : word or None a synchronizing word `u` extending `w` if successful, None otherwise """ S = pair_shrink_graph(G) paths = nx.shortest_path(S, target=sink) w = list(w) X = idot(G, G, w) while len(X) > 1: pq = iset(islice(X, 2)) if pq in paths: u = get_path_label(S, paths[pq]) X = idot(G, X, u) w.extend(u) else: return None return w
[docs]@returns_word def find_synchronizing_word(G): """Search for a synchronizing word in `G`. A :dfn:`synchronizing word` in `G` is a word `w` such that ``len(idot(G, G, w)) == 1``. If `G` is follower-separated, a synchronizing word always exists and one is always returned. If `G` is irreducible or fully deterministic, then the return value is not None iff there is a synchronizing word. Otherwise, the return value being None does not imply no synchronizing word exists in `G`. Parameters ---------- G : determinstic labeled graph Returns ------- word or None a synchronizing word in `G`, None if search was unsuccessful See Also -------- :func:`idot` """ return extend_to_synchronizing_word(G, [])
def find_synchronizing_word2(G): X = iset(G) w = [] with sink_context(G): paths = nx.shortest_path(G, target=sink) while len(X) > 1: for q in X: u = get_path_label(G, paths[q]) Y = idot(G, X, u) if len(Y) > 1: X = iset(q for q in Y if q != sink) w.extend(u) break return w
[docs]def reduce(G): """Compute the reduction of `G`. The :dfn:`reduction` of `G` is the graph resulting from merging follower-equivalant states in `G`. Parameters ---------- G : deterministic labeled graph Returns ------- deterministic labeled graph the reduction of `G` Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 3, label="a") >>> G.add_edge(2, 3, label="a") >>> G.add_edge(1, 1, label="b") >>> G.add_edge(2, 2, label="b") >>> G.add_edge(3, 3, label="b") >>> list(sd.reduce(G).edges(data="label")) [({1, 2}, {3}, 'a'), ({1, 2}, {1, 2}, 'b'), ({3}, {3}, 'b')] See Also -------- :func:`get_follower_equivalences` :func:`is_follower_separated` """ parts, part_lookup = get_follower_equivalences(G) # convert the parts into iset for k in parts: parts[k] = iset(parts[k]) Gr = nx.MultiDiGraph() for (p, q, a) in G.edges(data="label"): p_set = parts[part_lookup[p]] q_set = parts[part_lookup[q]] # here, we are making sure not to add two edges of the # same label between the same two vertices # I believe follower-separation can be performed on # nondeterministic graphs, but since the only we calculate # follower equivalences is with deterministic graphs, # this check is sufficient if a not in out_labels(Gr, p_set): Gr.add_edge(p_set, q_set, label=a) return Gr
def asymmetric_shrink_graph(G, H): sigma = alphabet(G) | alphabet(H) S = nx.MultiDiGraph() S.add_node(sink) with sink_context(H, sigma=sigma): for p, q in product(G, H): if q == sink: continue for a in sigma: res_G = dot(G, p, [a]) res_H = dot(H, q, [a]) # node in G is alive # (no sink was added to G, which is why we check for None) if res_G is not None: # node in H is alive # (sink was added to H, which is why we check for sink) if res_H != sink: S.add_edge((p, q), (res_G, res_H), label=a) # node in H was killed else: S.add_edge((p, q), sink, label=a) # otherwise, node in G is killed, # do not add any edge return S
[docs]@returns_word def find_separating_word(G, H): """Search for a separting word between `G` and `H`. A :dfn:`separating word` between `G` and `H` is a word `w` such that ``len(idot(G, G, w)) > 0`` and ``len(idot(H, H, w)) == 0``. If `G` is irreducible, then return value is not None iff there is a separating word between `G` and `H`. Otherwise, the return value being None does not imply there is no separating word between `G` and `H`. Parameters ---------- G, H : deterministic labeled graphs Returns ------- word or None a separating word between `G` and `H`, None if search was unsuccessful """ S = asymmetric_shrink_graph(G, H) paths = nx.shortest_path(S, target=sink) p = first(G) X = iset(H) w = [] while len(X) != 0: q = first(X) if (p, q) in paths: u = get_path_label(S, paths[(p, q)]) p = dot(G, p, u) X = idot(H, X, u) w.extend(u) else: return None return w
[docs]def is_subshift(G, H): """Returns True iff the shift presented by `G` is contained in the shift presented by `H`. Requires `G` to be irreducible and `H` to be essential for the return value to be correct. Parameters ---------- G : irreducible deterministic labeled graph H : essential deterministic labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 1, label="0") >>> H = nx.MultiDiGraph() >>> H.add_edge(1, 1, label="1") >>> H.add_edge(1, 2, label="0") >>> H.add_edge(2, 1, label="0") >>> sd.is_subshift(G, H) True See Also -------- :func:`find_separating_word` """ return find_separating_word(G, H) is None
@returns_word def find_synchronizing_word_in_component(G, C): C_complement = [q for q in G if q not in C] G_C = G.subgraph(C).copy() G_C_complement = G.subgraph(C_complement).copy() w = find_separating_word(G_C, G_C_complement) if w is None: return None return extend_to_synchronizing_word(G_C, w) def _sync_words_ics(G): condensate = nx.condensation(G) initial_components = [ iset(C) for (Cp, C) in condensate.nodes(data="members") if not condensate.in_edges(Cp) ] for C in initial_components: yield (C, find_synchronizing_word_in_component(G, C))
[docs]def is_synchronizing(G): """Returns True iff `G` is synchronizing. A deterministic graph `G` is synchronizing if for each vertex `q` in `G` there is a word `w` such that ``idot(G, G, w) == iset([q])``. Parameters ---------- G : deterministic labeled graph Examples -------- >>> G = nx.MultiDiGraph() >>> G.add_edge(1, 1, label="a") >>> G.add_edge(1, 2, label="b") >>> G.add_edge(2, 2, label="a") >>> sd.is_synchronizing(G) False >>> G.add_edge(1, 1, label="c") >>> sd.is_synchronizing(G) True """ return all(w is not None for (_, w) in _sync_words_ics(G))
def is_label_isomorphic_fs(G, H): return find_label_isomorphism_fs(G, H) is not None def find_label_isomorphism_fs(G, H): Gp = nx.relabel_nodes(G, lambda x: (x, 1)) Hp = nx.relabel_nodes(H, lambda x: (x, 2)) GH = nx.union(Gp, Hp) parts, _ = get_follower_equivalences(GH) mapping = {} for part in parts.values(): if len(part) != 2: return None (qG, _), (qH, _) = sorted(part, key=lambda x: x[1]) mapping[qG] = qH return mapping
[docs]def are_shifts_equal_sync(G, H): """Returns True iff the shift presnted by `G` is equivalent to the shift presented by `H`. Requires `G` and `H` to be synchronizing for the return value to be correct. Parameters ---------- G, H : synchronizing deterministic graphs """ return is_label_isomorphic_fs(reduce(G), reduce(H))
[docs]def is_sft(G, return_step=False): """Returns True iff the shift presents by `G` is a shift of finite type (SFT). Requires `G` to be synchronizing for the return value to be correct. Parameters ---------- G : synchronizing deterministic presentation """ Ghat = reduce(G) Ghat = pair_shrink_graph(Ghat) Ghat.remove_node(sink) res = nx.is_directed_acyclic_graph(Ghat) if return_step: return len(Ghat) if res else None else: return res