import copy
import math
import queue
import re
import warnings
from collections.abc import Iterable
from typing import Optional, Tuple
from bidict import bidict
from scipy.stats import rv_discrete
# local packages
from specless.factory.builder import Builder
from .base import Automaton
from .pdfa import PDFA
from .transition_system import TransitionSystem
from .types import (
GeneratedTraceData,
Node,
NXEdgeList,
NXNodeList,
Observation,
Probability,
Symbol,
Symbols,
)
from .utils import MaxHeap
# define these type defs for method annotation type hints
TS_Trans_Data = Tuple[Node, Observation]
# products of a TS and a specification PDFA always carry edge probabilities
IS_STOCHASTIC = True
# label of the catch-all "violating" state added when completing a spec
SPEC_VIOLATING_STATE = "q_v"
class Product(Automaton):
    """
    Describes a product automaton between a specification automaton and a
    dynamics automaton.

    You can use this class to compose the two automata together and then
    find a controller for the dynamical system that satisfies the
    specification.

    :param nodes:                node list as expected by
                                 networkx.add_nodes_from()
                                 (node label, node attribute dict)
    :param edges:                edge list as expected by
                                 networkx.add_edges_from()
                                 (src node label, dest node label,
                                 edge attribute dict)
    :param symbol_display_map:   bidirectional mapping of hashable symbols,
                                 to a unique integer index in the symbol
                                 map. Needed to translate between the
                                 indices in the transition distribution and
                                 the hashable representation which is
                                 meaningful to the user
    :param alphabet_size:        number of symbols in system alphabet
    :param num_states:           number of states in automaton state space
    :param start_state:          unique start state string label of system
    :param num_obs:              number of observation symbols
    :param final_transition_sym: representation of the termination symbol.
                                 If not given, will default to base class
                                 default.
    :param empty_transition_sym: representation of the empty symbol
                                 (a.k.a. lambda). If not given, will
                                 default to base class default.
    :param is_normalized:        whether to renormalize the edge
                                 probabilities such that each state has a
                                 well defined transition probability
                                 distribution. We typically DONT want to
                                 modify the probabilities of the product
                                 algorithm, except if we would like to be
                                 able to easily sample traces
    """

    def __init__(
        self,
        nodes: NXNodeList,
        edges: NXEdgeList,
        symbol_display_map: bidict,
        alphabet_size: int,
        num_states: int,
        start_state: Node,
        num_obs: int,
        final_transition_sym: Symbol,
        empty_transition_sym: Symbol,
        is_normalized: bool,
    ) -> None:
        """
        Constructs a new instance of a Product automaton object.
        """
        # only a normalized product has well-defined per-state transition
        # distributions, so it is the only variant that can be sampled
        is_sampleable = is_normalized

        # need to start with a fully initialized automaton
        super().__init__(
            nodes,
            edges,
            symbol_display_map,
            alphabet_size,
            num_states,
            start_state,
            smooth_transitions=False,
            is_stochastic=IS_STOCHASTIC,
            is_sampleable=is_sampleable,
            is_normalized=is_normalized,
            num_obs=num_obs,
            final_transition_sym=final_transition_sym,
            empty_transition_sym=empty_transition_sym,
            final_weight_key="final_probability",
            state_observation_key="observation",
            can_have_accepting_nodes=True,
            edge_weight_key="probability",
        )
[docs] def compute_strategy(
self,
min_string_probability: {Probability, None} = None,
max_string_length: {int, None} = None,
) -> Tuple[Symbols, Probability]:
"""
Calculates a control strategy for the dynamical system that best
matches the language of the specification.
:param min_string_probability: The minimum string probability
:param max_string_length: The maximum string length
:returns: The sequence of controls to apply, the probability in the
languge of the specification of generating the output word
of the dynamical system under the control symbols.
"""
# using DFS for BMPS as products are often very deep, tree-like
(controls_symbols, obs_prob, _) = self.most_probable_string(
min_string_probability, max_string_length, depth_first=True
)
# None -> completely incompatible
if controls_symbols is None:
msg = (
"no valid controller possible for given settings of "
+ f"min_string_probability {min_string_probability} and "
+ f"max_string_length {max_string_length}."
)
warnings.warn(msg, RuntimeWarning)
return controls_symbols, obs_prob
if len(controls_symbols) == 1:
msg = (
'only "initialization" symbol artificially added to '
+ "the TS found to be most probable controller -> "
+ "specification and dynamical system are "
+ "incompatible. Try adjusting "
+ f"min_string_probability {min_string_probability} and "
+ f"max_string_length {max_string_length}, or trying a "
+ "different solver."
)
warnings.warn(msg, RuntimeWarning)
# as seen above, first symbol is a useless / artifical initialization
# symbol
if isinstance(controls_symbols, Iterable):
controls_symbols = controls_symbols[1:]
return controls_symbols, obs_prob
[docs] def generate_traces(
self,
num_samples: int,
N: int = None,
num_traces_to_find: int = None,
min_trace_probability: Probability = None,
complete_samples: bool = False,
max_resamples: int = 100,
use_greedy_MPS_sampler: bool = True,
force_MPS_sampler: bool = False,
return_whatever_you_got: bool = False,
force_multicore: bool = True,
show_progress_bar: bool = True,
) -> GeneratedTraceData:
"""
Tries to generate num_samples random traces from the product.
:param num_samples: The number of trace samples to
generate
:param N: maximum length of any trace
:param num_traces_to_find: the number of base random traces
to find in the automaton when
using the MPS sampler. This is
not necessarily the same as
num_samples, as often the MPS
sampler is too slow to return
that many samples. Thus, if you
allow for it, the MPS samples can
be resampled to return
num_samples samples after the MPS
sampler is done.
:param min_trace_probability: The minimum trace probability.
only needed when using the MPS
sampler. Lowering this will
result in more random (less
representative traces), but will
make the algorithm much faster.
If set TOO high, you will find no
traces meeting this requirement.
(default 0.0)
:param complete_samples: If enabled, if the underlying
sampler fails to generate
num_samples, then any samples it
does find will be resampled to
create num_samples samples
occurring with correct RELATIVE
frequencies.
:param max_resamples: The maximum number of times to
resample if if we create a trace
of length N that still doesn't
have a probability > 0 in the
language
:param use_greedy_MPS_sampler: whether to try using the MUCH
faster greedy search algorithm.
only possible if the automaton
has deterministic transitions.
Only set this to False if the
automaton actually is
non-deterministic, as the
non-deterministic solver is an
approximation and MUCH slower.
:param force_MPS_sampler: by default, IF THE PRODUCT HAS
BEEN NORMALIZED (and thus is
sampleable), then it will fall
back on the base class' sampler.
Not available if the product is
not is_sampleable. This sampler
is truly a random MC sampler, and
thus is appropriate if you want
to generate traces with more
randomness than the MPS sampler.
:param return_whatever_you_got: Whether to return a string with a
zero probability after all
resampling attempts are
exhausted.
:param force_multicore: whether to force use the threaded
sampler this is set by default to
optimize speed, as the threaded
sampler is slower for smaller
num_samples. Force this to be
true if the automaton is slow to
sample.
:param show_progress_bar: whether to show a tqdm progress
bar for each sampled trace. Only
turn this on if you're sampling a
few traces from a very
expensive-to-sample automaton.
:returns: list of sampled traces, list of the associated trace
lengths, list of the associated trace probabilities
:rtype: tuple(list(list(int)), list(int), list(float))
"""
if not use_greedy_MPS_sampler and N is None:
msg = (
"Must provide a value for N if not using the "
+ "use_greedy_MPS_sampler"
)
raise ValueError(msg)
if self.is_sampleable and not force_MPS_sampler:
results = super().generate_traces(
num_samples=num_samples,
N=N,
max_resamples=max_resamples,
return_whatever_you_got=return_whatever_you_got,
force_multicore=force_multicore,
)
controls, _, sequence_probs = results
if controls is not None:
# convert to max heap to match MPS sampling returns
viable_traces = MaxHeap()
for prob, control in zip(sequence_probs, controls):
viable_traces.heappush((prob, control))
else:
viable_traces = None
else:
# making sure that these params were provided if using MPS sampler
if num_traces_to_find is None and not use_greedy_MPS_sampler:
num_traces_to_find = num_samples
msg = (
"No value given for num_traces_to_find. Using "
+ "default value of num_traces_to_find = num_samples "
+ f"({num_samples}). This may be too slow for large "
+ "product automata. Try providing a value of "
+ "num_traces_to_find less than num_samples and "
+ "enabling complete_samples"
)
warnings.warn(msg)
if min_trace_probability is None and not use_greedy_MPS_sampler:
min_trace_probability = 0.0
msg = (
"No value given for min_trace_probability. Using "
+ f"default value of {min_trace_probability}"
)
warnings.warn(msg)
if use_greedy_MPS_sampler:
_, _, viable_traces = self.most_probable_string(
try_to_use_greedy=use_greedy_MPS_sampler
)
else:
# we want to sample from both the most and least likely traces
# to get some diversity for resampling
N_DFS = math.ceil(num_traces_to_find / 2)
_, _, viable_traces_min = self.most_probable_string(
min_string_probability=min_trace_probability,
max_string_length=N,
num_strings_to_find=N_DFS,
try_to_use_greedy=False,
backwards_search=True,
depth_first=True,
add_entropy=True,
)
N_BFS = math.ceil(num_traces_to_find / 2)
_, _, viable_traces_max = self.most_probable_string(
min_string_probability=min_trace_probability,
max_string_length=N,
num_strings_to_find=N_BFS,
try_to_use_greedy=False,
backwards_search=True,
depth_first=False,
add_entropy=True,
)
# merge the two heaps
viable_traces = MaxHeap()
for item_1 in viable_traces_min:
viable_traces.heappush(item_1)
for item_2 in viable_traces_max:
viable_traces.heappush(item_2)
# need to post-process the sampled data, as this is a product
if viable_traces is not None:
# resampling the viable traces to ensure we always have num_samples
# samples
if len(viable_traces) < num_samples and complete_samples:
probs, symbols = zip(*viable_traces)
# need to normalize the probabilities, as we won't have the
# full trace distribution
normalized_probs = [prob / sum(probs) for prob in probs]
trace_idxs = list(range(len(symbols)))
trace_dist = rv_discrete(values=(trace_idxs, normalized_probs))
sampled_trace_idxs = trace_dist.rvs(size=num_samples)
new_traces = [symbols[idx] for idx in sampled_trace_idxs]
new_probs = [probs[idx] for idx in sampled_trace_idxs]
viable_traces = MaxHeap()
for prob, trace in zip(new_probs, new_traces):
viable_traces.heappush((prob, trace))
samples, trace_lengths, trace_probs = [], [], []
for prob, controls in viable_traces:
if controls is not None:
# the first control symbol is always the initialization
# symbol, so we should remove it for general use
if isinstance(controls, Iterable):
sample = controls[1:]
else:
sample = controls
sample_length = len(sample)
else:
sample = controls
sample_length = None
samples.append(sample)
trace_lengths.append(sample_length)
trace_probs.append(prob)
else:
samples, trace_lengths, trace_probs = None, None, None
return samples, trace_lengths, trace_probs
[docs] def observe(self, curr_state: Node) -> Observation:
"""
Returns the given state's observation symbol
:param curr_state: The current product state
:returns: observation symbol emitted at curr_state
"""
return self._get_node_data(curr_state, "observation")
def _set_state_acceptance(self, curr_state: Node) -> None:
"""
Sets the state acceptance property for the given state.
If curr_state's final_probability == 1.00 then the state is guaranteed
to be final
:param curr_state: The current state's node label
"""
curr_final_prob = self._get_node_data(curr_state, "final_probability")
if curr_final_prob >= 1.00:
state_accepts = True
else:
state_accepts = False
self._set_node_data(curr_state, "is_accepting", state_accepts)
@classmethod
def _complete_specification(cls, specification: PDFA) -> PDFA:
"""
processes the automaton and makes sure each state has a transition for
each symbol
completed nodes will be sent to "violating", cyclic state with
uniform probability over all symbols, as producing the missing symbols
is impossible given the language defined by the specification
:param specification: The specification to complete
:returns: the completed version of the specification
"""
# first need to define and add the "violating" state to the
# specification's underlying graph
violating_state = SPEC_VIOLATING_STATE
violating_state_props = {
"final_probability": 0.00,
"trans_distribution": None,
"is_accepting": None,
"is_violating": True,
}
specification.add_node(violating_state, **violating_state_props)
specification._initialize_node_edge_properties(
final_weight_key="final_probability",
can_have_accepting_nodes=True,
edge_weight_key="probability",
should_complete=True,
violating_state=violating_state,
complete="violate",
)
return specification
@classmethod
def _augment_initial_state(
cls, dynamical_system: TransitionSystem, specification: PDFA
) -> TransitionSystem:
"""
Adds an initialization state to the dynamical system to maintain
language distributional similarity with the specification
:param dynamical_system: The dynamical system to augment
:param specification: The specification to take a product with
:returns: The transition system with a new initialization state added
"""
initialization_state = "x_init"
spec_empty_symbol = specification.empty_transition_sym
initialization_state_props = {"observation": spec_empty_symbol}
if spec_empty_symbol not in dynamical_system.observations:
dynamical_system.observations.add(spec_empty_symbol)
dynamical_system.num_obs += 1
dynamical_system.add_node(initialization_state, **initialization_state_props)
old_start_state = dynamical_system.start_state
dynamical_system.start_state = initialization_state
# can choose any symbol to be the initialization symbol, doesn't matter
initialization_symbol = list(dynamical_system.symbols)[0]
initialization_edge_props = {
"symbol": initialization_symbol,
"probability": 1.00,
}
dynamical_system.add_edge(
initialization_state, old_start_state, **initialization_edge_props
)
dynamical_system._initialize_node_edge_properties(
can_have_accepting_nodes=False,
state_observation_key="observation",
should_complete=False,
)
return dynamical_system
@classmethod
def _compute_product(
cls, dynamical_system: TransitionSystem, specification: PDFA
) -> dict:
"""
Calculates the product automaton given pre-processed automata
:param dynamical_system: The dynamical system
:param specification: The specification
:returns: The initialized product automaton configuration data
"""
# naming to follow written algorithm
T = dynamical_system
A = specification
Sigma = dynamical_system.symbols
x_init = T.start_state
q_init = A.start_state
init_prod_state = cls._get_product_state_label(x_init, q_init)
nodes = {}
edges = {}
search_queue = queue.Queue()
search_queue.put((x_init, q_init))
visited = set()
while not search_queue.empty():
x, q = search_queue.get()
for sigma in Sigma:
dyn_trans = (x, sigma)
dynamically_compatible = dyn_trans in T._transition_map
if not dynamically_compatible:
specification_compatible = False
else:
x_prime = T._transition_map[dyn_trans]
o_x_prime = T.observe(x_prime)
spec_trans = (q, o_x_prime)
specification_compatible = spec_trans in A._transition_map
if dynamically_compatible and specification_compatible:
q_prime, trans_prob = A._get_next_state(q, o_x_prime)
q_final_prob = A._get_node_data(q, "final_probability")
q_prime_final_prob = A._get_node_data(q_prime, "final_probability")
o_x = T.observe(x)
(nodes, edges, _, _) = cls._add_product_edge(
T,
nodes,
edges,
x_src=x,
x_dest=x_prime,
q_src=q,
q_dest=q_prime,
q_src_final_prob=q_final_prob,
q_dest_final_prob=q_prime_final_prob,
observation_src=o_x,
observation_dest=o_x_prime,
sigma=sigma,
trans_prob=trans_prob,
)
prod_dest_state = (x_prime, q_prime)
if prod_dest_state not in visited:
visited.add(prod_dest_state)
search_queue.put(prod_dest_state)
return cls._package_data(T, nodes, edges, init_prod_state)
@classmethod
def _get_product_state_label(
cls, dynamical_system_state: Node, specification_state: Node
) -> Node:
"""
Computes the combined product state label
:param dynamical_system_state: The dynamical system state label
:param specification_state: The specification state label
:returns: The product state label.
"""
if type(dynamical_system_state) != str:
dynamical_system_state = str(dynamical_system_state)
if type(specification_state) != str:
specification_state = str(specification_state)
return dynamical_system_state + ", " + specification_state
@classmethod
def _breakdown_product_state(cls, product_state: Node) -> Tuple[Node, Node]:
"""
Gets the dynamical system and specification states from a product state
:param product_state: The product state label
:returns: dynamical system state label, specification state label
"""
m = re.findall(r"(.+?)(?:,\s*|$)", product_state)
x = m[0]
q = m[1]
return x, q
@classmethod
def _add_product_node(
cls,
dynamical_system: TransitionSystem,
nodes: dict,
x: Node,
q: Node,
q_final_prob: Probability,
observation: Observation,
) -> Tuple[dict, Node]:
"""
Adds a newly identified product state to the nodes dict w/ needed data
:param dynamical_system: The dynamical system
:param nodes: dict of nodes to build the product out
of. must be in the format needed by
networkx.add_nodes_from()
:param x: state label in the dynamical system
:param q: state label in the specification
:param q_final_prob: the probability of terminating at q in
the specification
:param observation: The observation emitted by the dynamical
system / product at the dynamical system
state (x)
:returns: nodes dict populated with all of the given data, and the
label of the newly added product state
"""
prod_state = cls._get_product_state_label(x, q)
is_violating = q == SPEC_VIOLATING_STATE
if prod_state not in nodes:
prod_state_data = {
"final_probability": q_final_prob,
"trans_distribution": None,
"is_violating": is_violating,
"is_accepting": None,
"observation": observation,
}
if "color" in dynamical_system.nodes[x]:
color = dynamical_system.nodes[x]["color"]
prod_state_data.update({"color": color})
nodes[prod_state] = prod_state_data
return nodes, prod_state
@classmethod
def _add_product_edge(
cls,
dynamical_system: TransitionSystem,
nodes: dict,
edges: dict,
x_src: Node,
x_dest: Node,
q_src: Node,
q_dest: Node,
q_src_final_prob: Probability,
q_dest_final_prob: Probability,
observation_src: Observation,
observation_dest: Observation,
sigma: Symbol,
trans_prob: Probability,
) -> Tuple[dict, dict, Node, Node]:
"""
Adds a newly identified product edge to the nodes & edges dicts
:param dynamical_system: The dynamical system
:param nodes: dict of nodes to build the product out
of. Must be in the format needed by
networkx.add_nodes_from()
:param edges: dict of edges to build the product out
of. Must be in the format needed by
networkx.add_edges_from()
:param x_src: source product edge's dynamical system
state
:param x_dest: dest. product edge's dynamical system
state
:param q_src: source product edge's specification
state
:param q_dest: dest. product edge's specification
state
:param q_src_final_prob: the probability of terminating at q_src
in the specification
:param q_dest_final_prob: the probability of terminating at
q_dest in the specification
:param observation_src: The observation emitted by the
dynamical system / product at the
source dynamical system state (x_src)
:param observation_dest: The observation emitted by the
dynamical system / product at the
dest. dynamical system state (x_dest)
:param sigma: dynamical system control input symbol
enabling the product edge
:param trans_prob: The product edge's transition prob.
:returns: nodes dict populated w/ all the given data for src & dest
edges dict populated w/ all the given data,
the label of the newly added source product state,
the label of the newly added product state
"""
nodes, prod_src = cls._add_product_node(
dynamical_system, nodes, x_src, q_src, q_src_final_prob, observation_src
)
nodes, prod_dest = cls._add_product_node(
dynamical_system, nodes, x_dest, q_dest, q_dest_final_prob, observation_dest
)
prod_edge_data = {"symbols": [sigma], "probabilities": [trans_prob]}
prod_edge = {prod_dest: prod_edge_data}
if prod_src in edges:
if prod_dest in edges[prod_src]:
existing_edge_data = edges[prod_src][prod_dest]
existing_edge_data["symbols"].extend(prod_edge_data["symbols"])
new_probs = prod_edge_data["probabilities"]
existing_edge_data["probabilities"].extend(new_probs)
edges[prod_src][prod_dest] = existing_edge_data
else:
edges[prod_src].update(prod_edge)
else:
edges[prod_src] = prod_edge
return nodes, edges, prod_src, prod_dest
@classmethod
def _package_data(
cls,
dynamical_system: TransitionSystem,
nodes: dict,
edges: dict,
init_prod_state: Node,
) -> dict:
config_data = {}
final_sym = dynamical_system.final_transition_sym
config_data["final_transition_sym"] = final_sym
empty_sym = dynamical_system.empty_transition_sym
config_data["empty_transition_sym"] = empty_sym
# can directly compute these from the graph data
symbols = set()
state_labels = set()
observations = set()
for state, edge in edges.items():
for _, edge_data in edge.items():
symbols.update(edge_data["symbols"])
state_labels.add(state)
for node in nodes.keys():
observation = nodes[node]["observation"]
observations.add(observation)
alphabet_size = len(symbols)
num_states = len(state_labels)
num_obs = len(observations)
config_data["alphabet_size"] = alphabet_size
config_data["num_states"] = num_states
config_data["num_obs"] = num_obs
(symbol_display_map, nodes, edges) = Automaton._convert_states_edges(
nodes, edges, final_sym, empty_sym, is_stochastic=IS_STOCHASTIC
)
config_data["nodes"] = nodes
config_data["edges"] = edges
config_data["start_state"] = init_prod_state
config_data["symbol_display_map"] = symbol_display_map
return config_data
class ProductBuilder(Builder):
    """
    Implements the generic automaton builder class for Product objects
    """

    def __init__(self) -> None:
        """
        Constructs a new instance of the ProductBuilder.
        """
        # need to call the super class constructor to gain its properties
        Builder.__init__(self)

        # keep these properties so we don't re-initialize unless the
        # underlying data changes
        self.nodes = None
        self.edges = None
[docs] def __call__(
self,
graph_data: Tuple[TransitionSystem, PDFA],
graph_data_format: str = "existing_objects",
**kwargs: dict,
) -> Product:
"""
Returns an initialized Product instance given the graph_data
graph_data and graph_data_format must match
:param graph_data: The graph configuration file name
:param graph_data_format: The graph data file format.
{'existing_objects'}
:param kwargs: The keywords arguments to the specific
constructors
:returns: instance of an initialized Product object
:raises ValueError: checks if graph_data and
graph_data_format have a compatible
data loader
"""
if graph_data_format == "existing_objects":
sys = graph_data[0]
spec = graph_data[1]
self._instance = self._from_automata(
dynamical_system=sys, specification=spec, **kwargs
)
else:
msg = (
"graph_data_format ({}) must be one of: "
+ '"existing_objects"'.format()
)
raise ValueError(msg)
return self._instance
def _from_automata(
self,
dynamical_system: TransitionSystem,
specification: PDFA,
normalize_trans_probabilities: bool = False,
show_steps: bool = False,
) -> Product:
"""
Returns an instance of a Product Automaton from existing automata
:param dynamical_system: The dynamical system
automaton instance
:param specification: The specification automaton
instance
:param normalize_trans_probabilities: whether to renormalize the
edge probabilities such
that each states has a well
defined transition
probability distribution.
We typically DONT want to
modify the probabilities
of the product algorithm,
except if we would like
to be able to easily sample
from the automaton.
:param show_steps: draw intermediate steps in
the product creation
:returns: instance of an initialized Product automaton object
"""
# don't want to destroy the automaton when we pre-process them
internal_dyn_sys = copy.deepcopy(dynamical_system)
augmented_dyn_sys = Product._augment_initial_state(
internal_dyn_sys, specification
)
if show_steps:
augmented_dyn_sys.draw("augment_initial_state")
config_data = Product._compute_product(augmented_dyn_sys, specification)
config_data["is_normalized"] = normalize_trans_probabilities
# saving these so we can just return initialized instances if the
# underlying data has not changed
self.nodes = config_data["nodes"]
self.edges = config_data["edges"]
if not self.edges:
msg = "no compatible edges were found, so the product is empty"
warnings.warn(msg, RuntimeWarning)
self._instance = None
else:
self._instance = Product(**config_data)
return self._instance