Module circuitgraph.tx
Functions for transforming circuits.
Examples
Synthesize a circuit using yosys
>>> import circuitgraph as cg
>>> c = cg.from_lib("c1908")
>>> c = cg.tx.syn(c, suppress_output=True)
Expand source code
"""
Functions for transforming circuits.
Examples
--------
Synthesize a circuit using yosys
>>> import circuitgraph as cg
>>> c = cg.from_lib("c1908")
>>> c = cg.tx.syn(c, suppress_output=True)
"""
import os
import re
import shutil
import subprocess
from collections import defaultdict
from functools import reduce
from pathlib import Path
from queue import Queue
from tempfile import NamedTemporaryFile
import networkx as nx
import circuitgraph as cg
def strip_io(c):
    """
    Build a copy of a circuit with its io stripped.

    Every output is unmarked and every input node is retyped as a buffer.
    The input circuit is left untouched.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of `c` with no marked outputs and with inputs turned into
        buffers. The name and a copy of the blackboxes are carried over.
    """
    stripped = c.graph.copy()
    for node in c.outputs():
        stripped.nodes[node].update(output=False)
    for node in c.inputs():
        stripped.nodes[node].update(type="buf")
    return cg.Circuit(graph=stripped, name=c.name, blackboxes=c.blackboxes.copy())
def strip_outputs(c):
    """
    Build a copy of a circuit with no marked outputs.

    Useful for instantiating the circuit inside another one.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of `c` in which every former output has its ``output`` flag
        cleared. The name and a copy of the blackboxes are carried over.
    """
    stripped = c.graph.copy()
    for node in c.outputs():
        stripped.nodes[node].update(output=False)
    return cg.Circuit(graph=stripped, name=c.name, blackboxes=c.blackboxes.copy())
def strip_inputs(c):
    """
    Build a copy of a circuit whose inputs are turned into buffers.

    Useful for instantiating the circuit inside another one.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of `c` in which every former input node has type ``"buf"``.
        The name and a copy of the blackboxes are carried over.
    """
    stripped = c.graph.copy()
    for node in c.inputs():
        stripped.nodes[node].update(type="buf")
    return cg.Circuit(graph=stripped, name=c.name, blackboxes=c.blackboxes.copy())
def strip_blackboxes(c, ignore_pins=None):
    """
    Replace blackbox pins with regular circuit io.

    Blackbox input pins become buffers marked as outputs, blackbox output
    pins become inputs. The ``.`` in each kept pin name is replaced by ``_``.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    ignore_pins : str or list of str
        Pin names (the part after the last ``.``) for which no io is
        created; those pin nodes are simply deleted.

    Returns
    -------
    Circuit
        Circuit with removed blackboxes.

    Raises
    ------
    ValueError
        If a renamed pin would collide with a node already in the circuit.
    """
    ignore_pins = ignore_pins or []
    if isinstance(ignore_pins, str):
        ignore_pins = [ignore_pins]

    g = c.graph.copy()
    kept_pins = []

    # Attributes applied to the pins of each blackbox pin type that are kept.
    conversions = (
        ("bb_input", {"type": "buf", "output": True}),
        ("bb_output", {"type": "input"}),
    )
    for pin_type, attrs in conversions:
        for pin in c.filter_type(pin_type):
            if pin.split(".")[-1] in ignore_pins:
                g.remove_node(pin)
                continue
            g.nodes[pin].update(attrs)
            kept_pins.append(pin)

    # rename nodes
    mapping = {pin: pin.replace(".", "_") for pin in kept_pins}
    for new_name in mapping.values():
        if new_name in g:
            raise ValueError(f"Overlapping blackbox name: {new_name}")
    nx.relabel_nodes(g, mapping, copy=False)

    return cg.Circuit(graph=g, name=c.name)
def relabel(c, mapping):
    """
    Build a copy of a circuit with relabeled nodes.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    mapping : dict of str:str
        Relabeling of nodes, from old name to new name.

    Returns
    -------
    Circuit
        Copy of `c` with nodes renamed according to `mapping`. The name and
        a copy of the blackboxes are carried over.
    """
    return cg.Circuit(
        graph=nx.relabel_nodes(c.graph, mapping),
        name=c.name,
        blackboxes=c.blackboxes.copy(),
    )
def subcircuit(c, nodes, modify_io=False):
    """
    Create a subcircuit from a set of nodes of a given circuit.

    Parameters
    ----------
    c : Circuit
        The circuit to create a subcircuit from.
    nodes : iterable of str
        The nodes to include in the subcircuit.
    modify_io : bool
        If True, gates without drivers will be turned into inputs and gates
        without fanout will be marked as outputs.

    Returns
    -------
    Circuit
        The subcircuit.

    Raises
    ------
    NotImplementedError
        If any requested node is a blackbox pin.
    """
    # Materialize once so one-shot iterables work, and keep a set so the
    # per-edge membership tests below are O(1) instead of a list scan.
    nodes = list(nodes)
    members = set(nodes)

    sc = cg.Circuit()
    for node in nodes:
        node_type = c.type(node)
        if node_type in ["bb_output", "bb_input"]:
            raise NotImplementedError("Cannot create a subcircuit with blackboxes")
        sc.add(node, node_type, output=c.is_output(node))

    # Keep only the edges whose endpoints are both part of the subcircuit.
    for edge in c.edges():
        if edge[0] in members and edge[1] in members:
            sc.connect(edge[0], edge[1])

    if modify_io:
        for node in sc:
            # Constants legitimately have no drivers; everything else that
            # lost its drivers becomes an input.
            if sc.type(node) not in ["0", "1", "x"] and not sc.fanin(node):
                sc.set_type(node, "input")
            if not sc.fanout(node):
                sc.set_output(node)
    return sc
def syn(
    c,
    engine="yosys",
    suppress_output=False,
    stdout_file=None,
    stderr_file=None,
    working_dir=".",
    fast_parsing=False,
    pre_syn_file=None,
    post_syn_file=None,
    verilog_exists=False,
    effort="high",
):
    """
    Synthesize the circuit using a third-party synthesis tool.

    Parameters
    ----------
    c : Circuit
        Circuit to synthesize.
    engine : str
        Synthesis tool to use ('genus', 'dc', or 'yosys').
    suppress_output : bool
        If True, synthesis stdout will not be printed.
    stdout_file : str or None
        If defined, synthesis stdout will be directed to this file instead
        of being printed.
    stderr_file : str or None
        If defined, synthesis stderr will be written to this file instead
        of being printed.
    working_dir : str
        The path to run synthesis from. If using genus, this will affect
        where the genus run files are stored. Directory (including missing
        parents) will be created if it does not exist.
    fast_parsing : bool
        If True, will use fast verilog parsing (which requires
        specifically formatted netlists, see the documentation for
        `verilog_to_circuit`).
    pre_syn_file : str or None
        If specified, the circuit verilog will be written to this file
        before synthesis. If None, a temporary file will be used.
    post_syn_file : str or None
        If specified, the synthesis output verilog will be written to this
        file. If None, a temporary file will be used.
    verilog_exists : bool
        If True, does not write `c` to a file, instead uses the verilog
        already present in `pre_syn_file`.
    effort : str
        The effort to use for synthesis. Either 'high', 'medium', or 'low'.
        Only forwarded to genus.

    Returns
    -------
    Circuit
        Synthesized circuit.

    Raises
    ------
    ValueError
        If `engine` is unknown, if `verilog_exists` is set without
        `pre_syn_file`, or if the library path environment variable
        required by genus/dc is not set.
    OSError
        If the selected synthesis tool is not installed.
    subprocess.CalledProcessError
        If the synthesis tool exits with a non-zero status.
    """
    # Reject unknown engines up front, before any file is created/truncated.
    if engine not in ("yosys", "genus", "dc"):
        raise ValueError("synthesis engine must be yosys, dc, or genus")
    if engine == "yosys" and shutil.which("yosys") is None:
        raise OSError("'yosys' installation not found")
    if engine == "genus" and shutil.which("genus") is None:
        raise OSError("'genus' installation not found")
    if engine == "dc":
        dc_engine = "dc_shell-t"
        if shutil.which("dc_shell-t") is None:
            dc_engine = "dc_shell"
            if shutil.which("dc_shell") is None:
                raise OSError("'dc_shell-t' or 'dc_shell' installation not found")

    working_dir = Path(working_dir)
    working_dir.mkdir(parents=True, exist_ok=True)
    working_dir = str(working_dir)

    # Make paths absolute in case synthesis is run from different working dir
    if pre_syn_file:
        pre_syn_file = Path(pre_syn_file).absolute()
    if post_syn_file:
        post_syn_file = Path(post_syn_file).absolute()

    if verilog_exists and not pre_syn_file:
        raise ValueError("Must specify pre_syn_file if using verilog_exists")

    if verilog_exists:
        in_file = open(pre_syn_file)
    elif pre_syn_file:
        in_file = open(pre_syn_file, "w")
    else:
        in_file = NamedTemporaryFile(
            prefix="circuitgraph_synthesis_input", suffix=".v", mode="w"
        )

    with in_file as tmp_in:
        if not verilog_exists:
            tmp_in.write(cg.io.circuit_to_verilog(c))
            # The tool reads the file by name, so the text must be on disk.
            tmp_in.flush()

        if post_syn_file:
            out_file = open(post_syn_file, "w+")
        else:
            out_file = NamedTemporaryFile(
                prefix="circuitgraph_synthesis_output", suffix=".v", mode="r"
            )

        with out_file as tmp_out:
            if engine == "genus":
                try:
                    lib_path = os.environ["CIRCUITGRAPH_GENUS_LIBRARY_PATH"]
                except KeyError as e:
                    raise ValueError(
                        "In order to run synthesis with Genus, please set the "
                        "CIRCUITGRAPH_GENUS_LIBRARY_PATH variable in your os "
                        "environment to the path to the tech library to use"
                    ) from e
                cmd = [
                    "genus",
                    "-no_gui",
                    "-execute",
                    "set_db / .library "
                    f"{lib_path};\n"
                    f"read_hdl -sv {tmp_in.name};\n"
                    "elaborate;\n"
                    f"set_db syn_generic_effort {effort};\n"
                    "syn_generic;\n"
                    "syn_map;\n"
                    "syn_opt;\n"
                    f'redirect {tmp_out.name} "write_hdl -generic";\n'
                    "exit;",
                ]
            elif engine == "dc":
                try:
                    lib_path = os.environ["CIRCUITGRAPH_DC_LIBRARY_PATH"]
                except KeyError as e:
                    raise ValueError(
                        "In order to run synthesis with DC, please set the "
                        "CIRCUITGRAPH_DC_LIBRARY_PATH variable in your os environment "
                        "to the path to the GTECH library"
                    ) from e
                libname = "GTECH"
                # Restrict mapping to inverters, 2-4 input gates and flops.
                usable_cells = [f"{libname.lower()}/{libname}_NOT"]
                for gate in ["OR", "NOR", "AND", "NAND", "XOR", "XNOR"]:
                    usable_cells += [
                        f"{libname.lower()}/{libname}_{gate}{i}" for i in range(2, 5)
                    ]
                usable_cells += [
                    f"{libname.lower()}/{libname}_FD{i}" for i in range(1, 4)
                ]
                cell_list = " ".join(usable_cells)
                execute = (
                    f"set_app_var target_library {lib_path};\n"
                    f"set_app_var link_library {lib_path};\n"
                    "set_dont_use [remove_from_collection "
                    f"[get_lib_cells {libname.lower()}/*] "
                    f'"{cell_list}"];\n'
                    f"read_file {tmp_in.name}\n"
                    "link;\n"
                    "uniquify;\n"
                    "check_design;\n"
                    "simplify_constants;\n"
                    "compile;\n"
                    f"write -format verilog -output {tmp_out.name};\n"
                    "exit;"
                )
                cmd = [dc_engine, "-no_gui", "-x", execute]
            else:
                cmd = [
                    "yosys",
                    "-p",
                    f"read_verilog {tmp_in.name}; "
                    "synth; "
                    f"write_verilog -noattr {tmp_out.name}",
                ]

            stdout = None
            stderr = None
            try:
                if stdout_file:
                    stdout = open(stdout_file, "w")
                elif suppress_output:
                    stdout = subprocess.DEVNULL
                if stderr_file:
                    stderr = open(stderr_file, "w")
                subprocess.run(
                    cmd, stdout=stdout, stderr=stderr, cwd=working_dir, check=True
                )
            finally:
                # Close log files even when the tool fails (check=True raises).
                if stdout_file and stdout is not None:
                    stdout.close()
                if stderr_file and stderr is not None:
                    stderr.close()

            output_netlist = tmp_out.read()

    # Rename dc library gates
    if engine == "dc":

        def replace_gate(match):
            # group(1) is the cell kind without the library prefix or the
            # trailing size digit (e.g. "AND", "FD").
            # Keep flops as they are; re.sub needs a string back.
            if match.group(1).startswith("FD"):
                return match.group(0)
            # Extract the net inside each `.PORT(net)` connection.
            ports = [
                i.strip().split("(")[-1].strip(")")
                for i in match.group(3).split(",")
            ]
            portlist = ", ".join(reversed(ports))
            return f"{match.group(1).lower()} {match.group(2)}({portlist});"

        output_netlist = re.sub(
            rf"{libname}_([A-Z]+)[1-4]?\s+"
            r"([a-zA-Z][a-zA-Z\d_]*)\s*\(([^;]+)\);",
            replace_gate,
            output_netlist,
        )

    return cg.io.verilog_to_circuit(output_netlist, c.name, fast=fast_parsing)
def aig(c):
    """
    Transform a circuit into an and-inverter graph.

    The circuit is written to a temporary verilog file, mapped with the
    yosys `aigmap` and `opt` passes, and the result is read back.

    Parameters
    ----------
    c : Circuit
        The circuit to transform to an AIG.

    Returns
    -------
    Circuit
        The AIG circuit.
    """
    src_opts = {"prefix": "circuitgraph_aig_input", "suffix": ".v", "mode": "w"}
    dst_opts = {"prefix": "circuitgraph_aig_output", "suffix": ".v", "mode": "r"}
    with NamedTemporaryFile(**src_opts) as src, NamedTemporaryFile(**dst_opts) as dst:
        cg.to_file(c, src.name)
        script = "; ".join(
            [
                f"read_verilog {src.name}",
                "aigmap",
                "opt",
                f"write_verilog -noattr {dst.name}",
            ]
        )
        subprocess.run(["yosys", "-p", script], stdout=subprocess.DEVNULL, check=True)
        # Must be read before the temporary files are removed on exit.
        # A `remove_bufs` pass is not applied to the result.
        return cg.from_file(dst.name)
def ternary(c):
    """
    Encode the circuit with ternary values.

    The ternary circuit adds a second net for each net in the original circuit.
    The second net encodes a don't care, or X, value. That net being high
    corresponds to a don't care value on original net. If the second net is
    low, the logical value on the original net is valid.

    Parameters
    ----------
    c : Circuit
        Circuit to encode.

    Returns
    -------
    Circuit, dict of str:str
        Encoded circuit and dictionary mapping original net names to added ternary
        net names. The added nets are named ``{n}_X`` and uniquified against
        `c`, so it is safest to use the returned dictionary to refer to them.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox or a node of an unsupported type.
    """
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")
    t = c.copy()

    # add dual nodes
    mapping = {n: c.uid(f"{n}_X") for n in c}
    for n in c:
        if c.type(n) in ["and", "nand"]:
            # Output is X when some fanin is X and no fanin is a valid 0
            # (a valid 0 forces the and/nand output regardless of X's).
            t.add(mapping[n], "and", output=c.is_output(n), allow_redefinition=True)
            t.add(
                f"{n}_x_in_fi",
                "or",
                fanout=mapping[n],
                fanin=[mapping[p] for p in c.fanin(n)],
                uid=True,
                add_connected_nodes=True,
            )
            zero_not_in_fi = t.add(
                f"{n}_0_not_in_fi", "nor", fanout=mapping[n], uid=True
            )
            for p in c.fanin(n):
                # High only when p is 0 and p is not X.
                t.add(
                    f"{p}_is_0",
                    "nor",
                    fanout=zero_not_in_fi,
                    fanin=[p, mapping[p]],
                    uid=True,
                )
        elif c.type(n) in ["or", "nor"]:
            # Output is X when some fanin is X and no fanin is a valid 1
            # (a valid 1 forces the or/nor output regardless of X's).
            t.add(mapping[n], "and", output=c.is_output(n), allow_redefinition=True)
            t.add(
                f"{n}_x_in_fi",
                "or",
                fanout=mapping[n],
                fanin=[mapping[p] for p in c.fanin(n)],
                uid=True,
                add_connected_nodes=True,
            )
            one_not_in_fi = t.add(
                f"{n}_1_not_in_fi", "nor", fanout=mapping[n], uid=True
            )
            for p in c.fanin(n):
                # High only when p is 1 and p is not X.
                is_one = t.add(
                    f"{p}_is_1", "and", fanout=one_not_in_fi, fanin=p, uid=True
                )
                t.add(f"{p}_not_x", "not", fanout=is_one, fanin=mapping[p], uid=True)
        elif c.type(n) in ["buf", "not"]:
            # X simply propagates through single-input gates.
            p = c.fanin(n).pop()
            t.add(
                mapping[n],
                "buf",
                fanin=mapping[p],
                output=c.is_output(n),
                add_connected_nodes=True,
                allow_redefinition=True,
            )
        elif c.type(n) in ["xor", "xnor"]:
            # Any X fanin makes the output X.
            t.add(
                mapping[n],
                "or",
                fanin=[mapping[p] for p in c.fanin(n)],
                output=c.is_output(n),
                add_connected_nodes=True,
                allow_redefinition=True,
            )
        elif c.type(n) in ["0", "1"]:
            # Constants are never X.
            t.add(mapping[n], "0", output=c.is_output(n), allow_redefinition=True)
        elif c.type(n) in ["input"]:
            # The X value of an input is itself a free input.
            t.add(mapping[n], "input", allow_redefinition=True)
        else:
            raise ValueError(f"Node '{n}' has invalid type: '{c.type(n)}'")

    return t, mapping
def miter(c0, c1=None, startpoints=None, endpoints=None):
    """
    Create a miter circuit.

    Two circuit copies (prefixed ``c0_`` and ``c1_``) share the given
    startpoints; the given endpoints of both copies are xor-ed and combined
    into a single output named ``sat``.

    Parameters
    ----------
    c0 : Circuit
        First circuit.
    c1 : Circuit
        Optional second circuit, if None c0 is mitered with itself.
    startpoints : set of str
        Nodes to be tied together, must exist in both circuits. Defaults to
        the startpoints common to both circuits.
    endpoints : set of str
        Nodes to be compared, must exist in both circuits. Defaults to the
        endpoints common to both circuits.

    Returns
    -------
    Circuit
        Miter circuit.

    Raises
    ------
    ValueError
        If either circuit contains a blackbox.
    """
    # check for blackboxes
    if c0.blackboxes:
        raise ValueError(f"{c0.name} contains a blackbox")
    if c1 and c1.blackboxes:
        raise ValueError(f"{c1.name} contains a blackbox")

    # clean inputs
    c1 = c1 or c0
    startpoints = startpoints or (c0.startpoints() & c1.startpoints())
    endpoints = endpoints or (c0.endpoints() & c1.endpoints())

    # create miter, relabel
    m = cg.Circuit(name=f"miter_{c0.name}_{c1.name}")
    for prefix, circuit in (("c0", c0), ("c1", c1)):
        m.add_subcircuit(circuit, prefix)

    # tie inputs
    for node in startpoints:
        m.add(node, "input", fanout=[f"c0_{node}", f"c1_{node}"])

    # compare outputs; a single difference net only needs a buffer
    sat_type = "or" if len(endpoints) > 1 else "buf"
    m.add("sat", sat_type, output=True)
    for node in endpoints:
        m.add(f"dif_{node}", "xor", fanin=[f"c0_{node}", f"c1_{node}"], fanout="sat")

    return m
def sequential_unroll(
    c,
    n,
    reg_d_port,
    reg_q_port,
    ignore_pins=None,
    add_flop_outputs=False,
    initial_values=None,
    remove_unloaded=True,
    prefix="cg_unroll",
):
    """
    Unroll a sequential circuit.

    Provides a higher level API than `unroll` by accepting a circuit with
    sequential elements kept as blackboxes. Assumes that all blackboxes in
    the circuit are sequential elements; the ports of one arbitrary
    blackbox are taken as representative of all of them.

    Parameters
    ----------
    c : Circuit
        Circuit to unroll.
    n : int
        The number of unrolled copies of the circuit to create.
    reg_d_port : str
        The name of the D port in the blackboxes in `c`.
    reg_q_port : str
        The name of the Q port in the blackboxes in `c`.
    ignore_pins : str or list of str
        The names of pins in the blackboxes to ignore.
    add_flop_outputs : bool
        Output flag applied to the unrolled nodes created for the flop D
        ports.
    initial_values : str or dict of str:str
        The initial values of the data ports for the first timestep.
        If None, the ports will be added as primary inputs.
        If a single value ('0', '1', or 'x'), every flop will get that value.
        Can also pass in dict mapping flop names to values.
    remove_unloaded : bool
        If True, unloaded inputs will be removed before unrolling. This can
        remove unused sequential signals such as the clock and reset.
    prefix : str
        The prefix to use for naming unrolled nodes.

    Returns
    -------
    Circuit, dict of str:list of str
        Unrolled circuit and mapping of original circuit io to list of unrolled
        circuit io. The lists are in order of the unroll iterations.

    Raises
    ------
    ValueError
        If `reg_d_port` / `reg_q_port` is not an input / output of the
        blackbox.
    """
    cs = strip_blackboxes(c, ignore_pins=ignore_pins)
    blackbox = c.blackboxes[set(c.blackboxes.keys()).pop()]

    # Drop every stripped blackbox input pin except the data input.
    if reg_d_port not in blackbox.inputs():
        raise ValueError(f"Provided d port {reg_d_port} not in bb inputs")
    cs.remove(
        f"{bb}_{p}" for p in blackbox.inputs() - {reg_d_port} for bb in c.blackboxes
    )

    # Drop every stripped blackbox output pin except the data output.
    if reg_q_port not in blackbox.outputs():
        raise ValueError(f"Provided q port {reg_q_port} not in bb outputs")
    cs.remove(
        f"{bb}_{p}" for p in blackbox.outputs() - {reg_q_port} for bb in c.blackboxes
    )

    if remove_unloaded:
        for node in cs.inputs():
            if not cs.fanout(node):
                cs.remove(node)

    d_nodes = [f"{bb}_{reg_d_port}" for bb in c.blackboxes]
    q_nodes = [f"{bb}_{reg_q_port}" for bb in c.blackboxes]

    # D of one iteration feeds Q of the next one.
    uc, io_map = unroll(cs, n, dict(zip(d_nodes, q_nodes)), prefix=prefix)

    for d_node in d_nodes:
        uc.set_output(io_map[d_node], add_flop_outputs)

    if initial_values:
        if isinstance(initial_values, str):
            for first_q in [io_map[q_node][0] for q_node in q_nodes]:
                uc.set_type(first_q, initial_values)
        else:
            for flop, value in initial_values.items():
                uc.set_type(io_map[f"{flop}_{reg_q_port}"][0], value)

    return uc, io_map
def unroll(c, n, state_io, prefix="cg_unroll"):
    """
    Unroll a circuit.

    Create multiple copies of the circuit and connect together state io.

    Parameters
    ----------
    c : Circuit
        Circuit to unroll.
    n : int
        The number of unrolled copies of the circuit to create.
    state_io : dict of str:str
        For each `(k, v)` pair in the dict, `k` of circuit iteration `n - 1` will be
        tied to `v` of circuit iteration `n`.
    prefix : str
        The prefix to use for naming new io for each iteration.

    Returns
    -------
    Circuit, dict of str:list of str
        Unrolled circuit and mapping of original circuit io to list of unrolled
        circuit io. The lists are in order of the unroll iterations.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox, `n` is less than 1, or a node in
        `state_io` is not part of the circuit io.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")
    if n < 1:
        raise ValueError(f"n must be >= 1 ({n})")

    io_nodes = c.io()
    for k, v in state_io.items():
        for node in (k, v):
            if node not in io_nodes:
                raise ValueError(
                    f"Node '{node}' in state_io dict but not in io of circuit"
                )

    state_nodes = set(state_io) | set(state_io.values())
    input_nodes = c.inputs()

    uc = cg.Circuit()
    io_map = {io: [] for io in io_nodes}
    for itr in range(n):
        for io in io_nodes:
            new_io = c.uid(f"{io}_{prefix}_{itr}")
            # Only non-state primary inputs stay inputs; all other io
            # becomes a buffer in the unrolled circuit.
            is_plain_input = io not in state_nodes and io in input_nodes
            uc.add(new_io, "input" if is_plain_input else "buf", output=c.is_output(io))
            io_map[io].append(new_io)

        uc.add_subcircuit(
            c, f"unrolled_{itr}", {io: f"{io}_{prefix}_{itr}" for io in io_nodes}
        )

        if itr > 0:
            # Chain the state of the previous iteration into this one.
            for k, v in state_io.items():
                uc.connect(f"{k}_{prefix}_{itr-1}", f"{v}_{prefix}_{itr}")
        else:
            # The state entering the first iteration is a free input.
            for v in state_io.values():
                uc.set_type(f"{v}_{prefix}_{itr}", "input")

    return uc, io_map
def sensitization_transform(c, n, endpoints=None):
    """
    Create a circuit to sensitize a node to an endpoint.

    Create a miter circuit with that node inverted in one circuit copy.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    n : str
        Node to sensitize.
    endpoints : str or list of str
        Endpoints to sensitize to. If None, any output
        can be used for sensitization.

    Returns
    -------
    Circuit
        Output circuit. When endpoints are given, its name ends with the
        endpoint names joined in sorted order.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox or `n` is not in the fanin of
        the given endpoints.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError("Circuit contains a blackbox")

    if endpoints:
        if isinstance(endpoints, str):
            endpoints = {endpoints}
        else:
            endpoints = set(endpoints)
        fi = c.transitive_fanin(endpoints)
        if n not in fi:
            raise ValueError(f"'{n}' is not in fanin of given endpoints")
        # Restrict to the cone of the endpoints and make them the only outputs.
        subc = subcircuit(c, endpoints | fi)
        for node in subc:
            subc.set_output(node, node in endpoints)
        # Sort so the name does not depend on set iteration order, which
        # varies between runs for strings.
        miter_name = f"{c.name}_sensitize_{n}_to_{'_'.join(sorted(endpoints))}"
    else:
        subc = c
        miter_name = f"{c.name}_sensitize_{n}"

    # create miter
    m = miter(subc)
    m.name = miter_name

    # flip node in c1: drive it with the inverse of the same node in c0
    m.disconnect(m.fanin(f"c1_{n}"), f"c1_{n}")
    m.set_type(f"c1_{n}", "not")
    m.connect(f"c0_{n}", f"c1_{n}")

    return m
def sensitivity_transform(c, n):
    """
    Create a circuit to compute sensitivity.

    Create a miter circuit for each input 'i' with the fanin cone of `n`
    where the second circuit has 'i' inverted, so that the miter output is
    high when `n` is sensitive to 'i'. The uninverted circuit is shared
    across all miters and the outputs of the miters are fed into a
    population count circuit so that the output of the population count
    circuit gives the sensitivity of `n` for a given input pattern.

    Parameters
    ----------
    c : Circuit
        Circuit to compute sensitivity for.
    n : str
        Node to compute sensitivity at.

    Returns
    -------
    Circuit
        Sensitivity circuit.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox or `n` has no startpoints.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")

    # check for startpoints
    startpoints = c.startpoints(n)
    if len(startpoints) == 0:
        raise ValueError(f"{n} has no startpoints")

    # get input cone
    cone_nodes = c.transitive_fanin(n) | {n}
    cone = cg.Circuit(graph=c.graph.subgraph(cone_nodes).copy())

    # create sensitivity circuit around the shared, uninverted copy
    sen = cg.Circuit()
    sen.add_subcircuit(cone, "orig")
    for sp in startpoints:
        sen.add(sp, "input", fanout=f"orig_{sp}")

    # add popcount
    sen.add_subcircuit(cg.logic.popcount(len(startpoints)), "pc")

    # add inverted input copies
    for idx, flipped in enumerate(startpoints):
        copy_prefix = f"inv_{flipped}"
        sen.add_subcircuit(cone, copy_prefix)

        # connect inputs
        for sp in startpoints:
            target = f"{copy_prefix}_{sp}"
            if sp == flipped:
                # connect inverted input
                sen.set_type(target, "not")
                sen.connect(flipped, target)
            else:
                sen.connect(sp, target)

        # compare to orig
        sen.add(
            f"dif_out_{flipped}",
            "xor",
            fanin=[f"orig_{n}", f"{copy_prefix}_{n}"],
            fanout=f"pc_in_{idx}",
            output=True,
        )

    # expose the population count outputs
    num_bits = cg.utils.clog2(len(startpoints) + 1)
    for bit in range(num_bits):
        sen.add(f"sen_out_{bit}", "buf", fanin=f"pc_out_{bit}", output=True)

    return sen
def limit_fanin(c, k):
    """
    Reduce the maximum fanin of circuit gates to k.

    Gates with more than `k` fanins are decomposed by repeatedly grouping
    two of their fanins behind a newly added intermediate gate.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    k : int
        Maximum fanin. (k >= 2)

    Returns
    -------
    Circuit
        Output circuit.

    Raises
    ------
    ValueError
        If `k` is less than 2.
    KeyError
        If a gate exceeding the limit has a type that cannot be decomposed.
    """
    if k < 2:
        raise ValueError(f"'k' must be >= 2, not '{k}'")

    # Type of the intermediate gate for each decomposable gate type. The
    # inversion of nand/nor/xnor must stay on the original gate only, so
    # intermediates are the non-inverting counterpart. In particular an
    # xnor of xnors would cancel the inversion: xnor(xnor(a, b), c) is
    # a ^ b ^ c, whereas xnor(xor(a, b), c) keeps ~(a ^ b ^ c).
    gatemap = {
        "and": "and",
        "nand": "and",
        "or": "or",
        "nor": "or",
        "xor": "xor",
        "xnor": "xor",
    }

    ck = c.copy()
    for n in ck.nodes():
        i = 0
        # Each pass replaces two fanins by one, shrinking the fanin by 1.
        while len(ck.fanin(n)) > k:
            fi = ck.fanin(n)
            f0 = fi.pop()
            f1 = fi.pop()
            ck.disconnect([f0, f1], n)
            ck.add(
                f"{n}_limit_fanin_{i}",
                gatemap[ck.type(n)],
                fanin=[f0, f1],
                fanout=n,
                uid=True,
            )
            i += 1

    return ck
def limit_fanout(c, k):
    """
    Reduce the maximum fanout of circuit gates to k.

    Nodes driving more than `k` loads have pairs of loads moved behind
    newly added buffers until the limit is met.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    k : int
        Maximum fanout. (k >= 2)

    Returns
    -------
    Circuit
        Output circuit.

    Raises
    ------
    ValueError
        If `k` is less than 2.
    """
    if k < 2:
        raise ValueError(f"'k' must be >= 2, not '{k}'")

    limited = c.copy()
    for node in limited.nodes():
        buf_idx = 0
        loads = limited.fanout(node)
        while len(loads) > k:
            # Move two loads behind a single buffer: fanout shrinks by 1.
            pair = [loads.pop(), loads.pop()]
            limited.disconnect(node, pair)
            limited.add(
                f"{node}_limit_fanout_{buf_idx}",
                "buf",
                fanin=node,
                fanout=pair,
                uid=True,
            )
            buf_idx += 1
            loads = limited.fanout(node)

    return limited
def acyclic_unroll(c):
    """
    Unroll a cyclic circuit to remove cycles.

    Feedback nodes are found with a greedy heuristic, the feedback is cut,
    and one more copy of the cut circuit than there are feedback nodes is
    chained together. The outputs are taken from the last copy.

    Parameters
    ----------
    c : Circuit
        Circuit to unroll.

    Returns
    -------
    Circuit
        The unrolled circuit.

    Raises
    ------
    ValueError
        If the circuit contains blackboxes, the feedback edge heuristic
        fails, or the result is still cyclic.
    """
    if c.blackboxes:
        raise ValueError("Cannot perform acyclic unroll with blackboxes")

    def approx_min_fas(g):
        """Return a set of edges whose removal makes `g` acyclic (greedy)."""
        g_copy = g.copy()
        s1, s2 = [], []
        while g_copy.nodes:
            # find sinks
            sinks = [n for n in g_copy.nodes if g_copy.out_degree(n) == 0]
            while sinks:
                s2 += sinks
                g_copy.remove_nodes_from(sinks)
                sinks = [n for n in g_copy.nodes if g_copy.out_degree(n) == 0]

            # find sources
            sources = [n for n in g_copy.nodes if g_copy.in_degree(n) == 0]
            while sources:
                s1 += sources
                g_copy.remove_nodes_from(sources)
                sources = [n for n in g_copy.nodes if g_copy.in_degree(n) == 0]

            # choose max in/out degree difference
            if g_copy.nodes:
                n = max(
                    g_copy.nodes,
                    key=lambda x: g_copy.out_degree(x) - g_copy.in_degree(x),
                )
                s1.append(n)
                g_copy.remove_node(n)

        ordering = s1 + list(reversed(s2))
        # Every node appears exactly once in the ordering, so a lookup
        # table replaces a linear `list.index` scan per edge endpoint.
        position = {node: idx for idx, node in enumerate(ordering)}
        feedback_edges = [e for e in g.edges if position[e[0]] > position[e[1]]]
        # Only backward edges that actually close a cycle need cutting.
        feedback_edges = [
            (u, v) for u, v in feedback_edges if u in nx.descendants(g, v)
        ]

        g_copy = g.copy()
        g_copy.remove_edges_from(feedback_edges)
        try:
            if nx.find_cycle(g_copy):
                raise ValueError("approx_min_fas has failed")
        except nx.NetworkXNoCycle:
            pass

        return feedback_edges

    # find feedback nodes
    feedback = {e[0] for e in approx_min_fas(c.graph)}

    # get startpoints
    sp = c.startpoints()

    # create acyclic circuit
    acyc = cg.Circuit(name=f"acyc_{c.name}")
    for n in sp:
        acyc.add(n, "input")

    # create copy with broken feedback
    c_cut = c.copy()
    for f in feedback:
        fanout = c.fanout(f)
        c_cut.disconnect(f, fanout)
        c_cut.add(f"aux_in_{f}", "buf", fanout=fanout)
    c_cut.set_output(c.outputs(), False)

    # cut feedback
    last = len(feedback)
    for i in range(last + 1):
        # instantiate copy, sharing the startpoints between all copies
        acyc.add_subcircuit(c_cut, f"c{i}", {n: n for n in sp})

        if i > 0:
            # connect to last
            for f in feedback:
                acyc.connect(f"c{i-1}_{f}", f"c{i}_aux_in_{f}")
        else:
            # make feedback inputs
            for f in feedback:
                acyc.set_type(f"c{i}_aux_in_{f}", "input")

    # connect outputs to the final copy
    for o in c.outputs():
        acyc.add(o, "buf", fanin=f"c{last}_{o}", output=True)

    cg.lint(acyc)
    if acyc.is_cyclic():
        raise ValueError("Circuit still cyclic")

    return acyc
def supergates(c, construct_supercircuit=False):
    """
    Break the circuit up into supergates.

    Calculate the minimal covering of all circuit nodes with maximal supergates
    of a circuit. For more information, see

    Sharad C. Seth and Vishwani D. Agrawal. "A new model for computation of
    probabilistic testability in combinational circuits." Integration 7.1
    (1989): 49-75.

    Parameters
    ----------
    c : Circuit
        The circuit to compute supergates for
    construct_supercircuit : bool
        If True, a circuit connecting together the supergates as black boxes
        will be formed. Currently this only works if `c` has only one output.

    Returns
    -------
    list of Circuit or (Circuit, dict of str:Circuit)
        If `construct_supercircuit` is `False`, the supergate circuits,
        topologically sorted. Otherwise, the supercircuit and a dict
        mapping blackbox names to corresponding supergates.

    Raises
    ------
    ValueError
        If `construct_supercircuit` is set on a circuit with more than one
        output.
    """
    if construct_supercircuit and len(c.outputs()) > 1:
        raise ValueError(
            "Can only use `construct_supercircuit` one a single-output circuit"
        )

    # The current algorithm seems to fail for some circuits (like c880) with gates with
    # fanin greater than 2. At the moment not sure if this is a bug in the
    # implementation or expected behavior
    c = limit_fanin(c, 2)

    supergate_circuits = set()
    for output in c.outputs():
        c_output = subcircuit(c, c.transitive_fanin(output) | {output})
        c_output.set_output(c_output.outputs(), False)
        c_output.set_output(output, True)
        g = c_output.graph.copy()

        # Add backwards edge for every forward edge not connected to an output.
        # Iterate over a snapshot: the edge view is live and the loop adds
        # edges to the graph it would otherwise be iterating.
        rm_edges = []
        for u, v in list(g.edges):
            g.add_edge(v, u)
            if v == output:
                rm_edges.append((u, v))
        for u, v in rm_edges:
            g.remove_edge(u, v)

        # Get dominator tree
        doms = nx.immediate_dominators(g, output)
        dom_tree = defaultdict(set)
        for k, v in doms.items():
            dom_tree[v].add(k)

        # Build supergates starting at the output. The root must not be
        # listed as its own child; `discard` tolerates dominator maps that
        # do not contain the root -> root entry.
        dom_tree[output].discard(output)
        frontier = Queue()
        frontier.put(output)
        while not frontier.empty():
            # Build the supergate for this node
            node = frontier.get()
            supergate = {node}
            # Include children and single successors
            fanins = Queue()
            for fi in dom_tree[node]:
                fanins.put(fi)
            while not fanins.empty():
                fi = fanins.get()
                supergate.add(fi)
                if len(dom_tree[fi]) > 1:
                    # Heads a supergate of its own
                    frontier.put(fi)
                elif len(dom_tree[fi]) == 1:
                    # Chain of single children is absorbed into this one
                    fanins.put(dom_tree[fi].pop())
            supergate_circuit = subcircuit(c_output, supergate, modify_io=True)
            supergate_circuit.set_output(node, True)
            supergate_circuits.add(supergate_circuit)

    # Find minimal covering of supergates indexed by the output
    minimal_supergate_circuits = {}
    for supergate in supergate_circuits:
        remaining_cover = reduce(
            lambda a, b: a | b,
            (s.nodes() - s.inputs() for s in supergate_circuits - {supergate}),
            set(),
        )
        # Keep a supergate only if it covers a node no other one covers
        if supergate.nodes() - remaining_cover:
            minimal_supergate_circuits[supergate.outputs().pop()] = supergate

    if construct_supercircuit:
        superc = cg.Circuit(f"{c.name}_supergates")
        for i in c.inputs():
            superc.add(i, "input")
        for o in c.outputs():
            superc.add(o, "buf", output=True)
        supergate_map = {}
        for output, supergate in minimal_supergate_circuits.items():
            sg_name = f"sg_{output}"
            supergate_map[sg_name] = supergate
            bb = cg.BlackBox(name=sg_name, inputs=supergate.inputs(), outputs={output})
            for n in supergate.io():
                if n not in superc:
                    superc.add(n, "buf")
            superc.add_blackbox(bb, sg_name, {i: i for i in supergate.io()})
        return superc, supergate_map

    # Find topological ordering of supergates
    g = nx.DiGraph()
    for output, supergate in minimal_supergate_circuits.items():
        g.add_node(output)
        for i in supergate.inputs() - c.inputs():
            for other_output in set(minimal_supergate_circuits) - {output}:
                other_supergate = minimal_supergate_circuits[other_output]
                if i in other_supergate.nodes() - other_supergate.inputs():
                    g.add_edge(other_output, output)
    sorted_supergate_circuits = []
    for node in nx.topological_sort(g):
        sorted_supergate_circuits.append(minimal_supergate_circuits[node])

    return sorted_supergate_circuits
def insert_registers(
    c,
    num_stages,
    ff=cg.generic_flop,
    d_port="d",
    q_port="q",
    other_flop_io={"clk": "clk"},
    q_suffix="_cg_insert_reg_q_",
):
    """
    Insert pipeline registers into a combinational design.

    Parameters
    ----------
    c : circuitgraph.Circuit
        The circuit to insert registers into.
    num_stages : int
        The number of stages to add.
    ff : circuitgraph.BlackBox
        The flip flop blackbox to use.
    d_port : str
        The d port on the flip flop blackbox.
    q_port : str
        The q port on the flip flop blackbox.
    other_flop_io : dict of str:str
        Other io to connect on the flop (e.g. clk, rst ports).
        Dict maps circuit nodes to flop ports. If a node is
        present in the dict but not in the circuit, it will be
        added as an input. The dict is only read, never modified.
    q_suffix : str
        Inserted q nodes are named with the suffix `{q_suffix}{i}` where
        `i` is the level the flop is inserted at.

    Returns
    -------
    circuitgraph.Circuit
        The circuit with added registers.

    Raises
    ------
    ValueError
        If the circuit is too shallow for `num_stages`, i.e. the spacing
        between register levels rounds to zero.
    """
    c_reg = c.copy()

    # Group nodes by their fanin depth.
    nodes_at_depths = defaultdict(list)
    for n in c_reg:
        nodes_at_depths[c_reg.fanin_depth(n)].append(n)
    max_depth = max(nodes_at_depths, default=0)

    depth_inc = round(max_depth / (num_stages + 1))
    if depth_inc == 0:
        # Previously surfaced as an opaque "range() arg 3 must not be zero".
        raise ValueError(
            f"Circuit depth {max_depth} is too small to insert "
            f"{num_stages} register stage(s)"
        )

    for n in other_flop_io:
        if n not in c_reg:
            c_reg.add(n, "input")

    # `other_flop_io` maps circuit node -> flop port, while the blackbox
    # connections below map flop port -> circuit node, so invert it.
    extra_conns = {port: node for node, port in other_flop_io.items()}

    for i in range(depth_inc, max_depth, depth_inc):
        for n in nodes_at_depths[i]:
            # Reroute the loads of `n` to a new q node; the flop sits
            # between `n` (d) and that node (q).
            fanout = c_reg.fanout(n)
            c_reg.disconnect(n, fanout)
            q = c_reg.add(f"{n}{q_suffix}{i}", "buf", uid=True, fanout=fanout)
            conns = {d_port: n, q_port: q}
            conns.update(extra_conns)
            c_reg.add_blackbox(ff, f"ff_{n}", conns)

    return c_reg
Functions
def acyclic_unroll(c)
-
Unroll a cyclic circuit to remove cycles.
Parameters
c
:Circuit
- Circuit to unroll.
Returns
Circuit
- The unrolled circuit.
Expand source code
def acyclic_unroll(c):
    """
    Unroll a cyclic circuit to remove cycles.

    Feedback nodes are found with a greedy heuristic, the feedback is cut,
    and one more copy of the cut circuit than there are feedback nodes is
    chained together. The outputs are taken from the last copy.

    Parameters
    ----------
    c : Circuit
        Circuit to unroll.

    Returns
    -------
    Circuit
        The unrolled circuit.

    Raises
    ------
    ValueError
        If the circuit contains blackboxes, the feedback edge heuristic
        fails, or the result is still cyclic.
    """
    if c.blackboxes:
        raise ValueError("Cannot perform acyclic unroll with blackboxes")

    def approx_min_fas(g):
        """Return a set of edges whose removal makes `g` acyclic (greedy)."""
        g_copy = g.copy()
        s1, s2 = [], []
        while g_copy.nodes:
            # find sinks
            sinks = [n for n in g_copy.nodes if g_copy.out_degree(n) == 0]
            while sinks:
                s2 += sinks
                g_copy.remove_nodes_from(sinks)
                sinks = [n for n in g_copy.nodes if g_copy.out_degree(n) == 0]

            # find sources
            sources = [n for n in g_copy.nodes if g_copy.in_degree(n) == 0]
            while sources:
                s1 += sources
                g_copy.remove_nodes_from(sources)
                sources = [n for n in g_copy.nodes if g_copy.in_degree(n) == 0]

            # choose max in/out degree difference
            if g_copy.nodes:
                n = max(
                    g_copy.nodes,
                    key=lambda x: g_copy.out_degree(x) - g_copy.in_degree(x),
                )
                s1.append(n)
                g_copy.remove_node(n)

        ordering = s1 + list(reversed(s2))
        # Every node appears exactly once in the ordering, so a lookup
        # table replaces a linear `list.index` scan per edge endpoint.
        position = {node: idx for idx, node in enumerate(ordering)}
        feedback_edges = [e for e in g.edges if position[e[0]] > position[e[1]]]
        # Only backward edges that actually close a cycle need cutting.
        feedback_edges = [
            (u, v) for u, v in feedback_edges if u in nx.descendants(g, v)
        ]

        g_copy = g.copy()
        g_copy.remove_edges_from(feedback_edges)
        try:
            if nx.find_cycle(g_copy):
                raise ValueError("approx_min_fas has failed")
        except nx.NetworkXNoCycle:
            pass

        return feedback_edges

    # find feedback nodes
    feedback = {e[0] for e in approx_min_fas(c.graph)}

    # get startpoints
    sp = c.startpoints()

    # create acyclic circuit
    acyc = cg.Circuit(name=f"acyc_{c.name}")
    for n in sp:
        acyc.add(n, "input")

    # create copy with broken feedback
    c_cut = c.copy()
    for f in feedback:
        fanout = c.fanout(f)
        c_cut.disconnect(f, fanout)
        c_cut.add(f"aux_in_{f}", "buf", fanout=fanout)
    c_cut.set_output(c.outputs(), False)

    # cut feedback
    last = len(feedback)
    for i in range(last + 1):
        # instantiate copy, sharing the startpoints between all copies
        acyc.add_subcircuit(c_cut, f"c{i}", {n: n for n in sp})

        if i > 0:
            # connect to last
            for f in feedback:
                acyc.connect(f"c{i-1}_{f}", f"c{i}_aux_in_{f}")
        else:
            # make feedback inputs
            for f in feedback:
                acyc.set_type(f"c{i}_aux_in_{f}", "input")

    # connect outputs to the final copy
    for o in c.outputs():
        acyc.add(o, "buf", fanin=f"c{last}_{o}", output=True)

    cg.lint(acyc)
    if acyc.is_cyclic():
        raise ValueError("Circuit still cyclic")

    return acyc
def aig(c)
-
Transform a circuit into an and-inverter graph.
Parameters
c
:Circuit
- The circuit to transform to an AIG.
Returns
Circuit
- The AIG circuit.
Expand source code
def aig(c):
    """
    Transform a circuit into an and-inverter graph.

    The circuit is written to a temporary verilog file, mapped with the
    yosys `aigmap` and `opt` passes, and the result is read back.

    Parameters
    ----------
    c : Circuit
        The circuit to transform to an AIG.

    Returns
    -------
    Circuit
        The AIG circuit.
    """
    src_opts = {"prefix": "circuitgraph_aig_input", "suffix": ".v", "mode": "w"}
    dst_opts = {"prefix": "circuitgraph_aig_output", "suffix": ".v", "mode": "r"}
    with NamedTemporaryFile(**src_opts) as src, NamedTemporaryFile(**dst_opts) as dst:
        cg.to_file(c, src.name)
        script = "; ".join(
            [
                f"read_verilog {src.name}",
                "aigmap",
                "opt",
                f"write_verilog -noattr {dst.name}",
            ]
        )
        subprocess.run(["yosys", "-p", script], stdout=subprocess.DEVNULL, check=True)
        # Must be read before the temporary files are removed on exit.
        # A `remove_bufs` pass is not applied to the result.
        return cg.from_file(dst.name)
def insert_registers(c, num_stages, ff=<circuitgraph.circuit.BlackBox object>, d_port='d', q_port='q', other_flop_io={'clk': 'clk'}, q_suffix='_cg_insert_reg_q_')
-
Insert pipeline registers into a combinational design.
Parameters
c
:circuitgraph.Circuit
- The circuit to insert registers into.
num_stages
:int
- The number of stages to add.
ff
:circuitgraph.BlackBox
- The flip flop blackbox to use.
d_port
:str
- The d port on the flip flop blackbox.
q_port
:str
- The q port on the flip flop blackbox.
other_flop_io
:dict
of str:str
- Other io to connect on the flop (e.g. clk, rst ports). Dict maps circuit nodes to flop ports. If a node is present in the dict but not in the circuit, it will be added as an input.
q_suffix
:str
- Inserted q nodes are named with the suffix
{q_suffix}{i}
where i
is the level the flop is inserted at.
Returns
circuitgraph.Circuit
- The circuit with added registers.
Expand source code
def insert_registers(
    c,
    num_stages,
    ff=cg.generic_flop,
    d_port="d",
    q_port="q",
    other_flop_io={"clk": "clk"},
    q_suffix="_cg_insert_reg_q_",
):
    """
    Insert pipeline registers into a combinational design.

    Nodes are grouped by fanin depth and a flop is inserted after every node
    whose depth is a multiple of ``round(max_depth / (num_stages + 1))`` and
    strictly below the maximum depth.

    Parameters
    ----------
    c: circuitgraph.Circuit
        The circuit to insert registers into.
    num_stages: int
        The number of stages to add.
    ff: circuitgraph.BlackBox
        The flip flop blackbox to use.
    d_port: str
        The d port on the flip flop blackbox.
    q_port: str
        The q port on the flip flop blackbox.
    other_flop_io: dict of str:str
        Other io to connect on the flop (e.g. clk, rst ports). Dict
        maps circuit nodes to flop ports. If a node is present in the
        dict but not in the circuit, it will be added as an input.
        The dict is only read, never modified.
    q_suffix: str
        Inserted q nodes are named with the suffix `{q_suffix}{i}`
        where `i` is the level the flop is inserted at.

    Returns
    -------
    circuitgraph.Circuit
        The circuit with added registers.

    Raises
    ------
    ValueError
        If the circuit is too shallow for the requested number of stages,
        i.e. the spacing between register levels rounds to zero.
    """
    c_reg = c.copy()

    # Bucket nodes by fanin depth
    nodes_at_depths = defaultdict(list)
    for n in c_reg:
        nodes_at_depths[c_reg.fanin_depth(n)].append(n)
    max_depth = max(nodes_at_depths, default=0)

    depth_inc = round(max_depth / (num_stages + 1))
    if depth_inc == 0:
        # A zero step would otherwise surface as the opaque
        # "range() arg 3 must not be zero" further down.
        raise ValueError(
            f"Cannot insert {num_stages} register stages into a circuit of "
            f"depth {max_depth}: stage spacing rounds to zero"
        )

    for n in other_flop_io:
        if n not in c_reg:
            c_reg.add(n, "input")

    for i in range(depth_inc, max_depth, depth_inc):
        for n in nodes_at_depths[i]:
            # Reroute the node's fanout through the flop's q node
            fanout = c_reg.fanout(n)
            c_reg.disconnect(n, fanout)
            q = c_reg.add(f"{n}{q_suffix}{i}", "buf", uid=True, fanout=fanout)
            conns = {d_port: n, q_port: q}
            conns.update(other_flop_io)
            c_reg.add_blackbox(ff, f"ff_{n}", conns)

    return c_reg
def limit_fanin(c, k)
-
Reduce the maximum fanin of circuit gates to k.
Parameters
c
:Circuit
- Input circuit.
k
:int
- Maximum fanin. (k >= 2)
Returns
Circuit
- Output circuit.
Expand source code
def limit_fanin(c, k):
    """
    Reduce the maximum fanin of circuit gates to k.

    While a gate has more than `k` fanins, two of them are disconnected and
    merged by a new two-input gate that is connected in their place.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    k : int
        Maximum fanin. (k >= 2)

    Returns
    -------
    Circuit
        Output circuit.

    Raises
    ------
    ValueError
        If `k` is less than 2.
    """
    if k < 2:
        raise ValueError(f"'k' must be >= 2, not '{k}'")

    # Type of the intermediate gate that merges two fanins. The inversion of
    # an inverting gate stays on the original node, so the intermediate gate
    # must be the non-inverting counterpart. In particular a wide xnor is
    # split with xor gates: xnor(xor(a, b), c) == xnor(a, b, c), whereas
    # xnor(xnor(a, b), c) would compute xor(a, b, c).
    gatemap = {
        "and": "and",
        "nand": "and",
        "or": "or",
        "nor": "or",
        "xor": "xor",
        "xnor": "xor",
    }

    ck = c.copy()
    for n in ck.nodes():
        i = 0
        while len(ck.fanin(n)) > k:
            fi = ck.fanin(n)
            f0 = fi.pop()
            f1 = fi.pop()
            ck.disconnect([f0, f1], n)
            ck.add(
                f"{n}_limit_fanin_{i}",
                gatemap[ck.type(n)],
                fanin=[f0, f1],
                fanout=n,
                uid=True,
            )
            i += 1

    return ck
def limit_fanout(c, k)
-
Reduce the maximum fanout of circuit gates to k.
Parameters
c
:Circuit
- Input circuit.
k
:int
- Maximum fanout. (k >= 2)
Returns
Circuit
- Output circuit.
Expand source code
def limit_fanout(c, k):
    """
    Reduce the maximum fanout of circuit gates to k.

    While a node drives more than `k` loads, two of the loads are moved
    behind a newly inserted buffer driven by that node.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    k : int
        Maximum fanout. (k >= 2)

    Returns
    -------
    Circuit
        Output circuit.

    Raises
    ------
    ValueError
        If `k` is less than 2.
    """
    if k < 2:
        raise ValueError(f"'k' must be >= 2, not '{k}'")

    limited = c.copy()
    for driver in limited.nodes():
        buf_index = 0
        loads = limited.fanout(driver)
        while len(loads) > k:
            first = loads.pop()
            second = loads.pop()
            limited.disconnect(driver, [first, second])
            limited.add(
                f"{driver}_limit_fanout_{buf_index}",
                "buf",
                fanin=driver,
                fanout=[first, second],
                uid=True,
            )
            buf_index += 1
            # Re-read the fanout: it now includes the inserted buffer
            loads = limited.fanout(driver)

    return limited
def miter(c0, c1=None, startpoints=None, endpoints=None)
-
Create a miter circuit.
Parameters
c0
:Circuit
- First circuit.
c1
:Circuit
- Optional second circuit, if None c0 is mitered with itself.
startpoints
:set
ofstr
- Nodes to be tied together, must exist in both circuits.
endpoints
:set
ofstr
- Nodes to be compared, must exist in both circuits.
Returns
Circuit
- Miter circuit.
Expand source code
def miter(c0, c1=None, startpoints=None, endpoints=None):
    """
    Create a miter circuit.

    Both circuits are instantiated side by side (prefixed `c0_` and `c1_`),
    the shared startpoints are driven by common inputs, and each pair of
    endpoints is compared by an xor gate (`dif_<node>`) feeding the single
    output `sat`.

    Parameters
    ----------
    c0 : Circuit
        First circuit.
    c1 : Circuit
        Optional second circuit, if None c0 is mitered with itself.
    startpoints : set of str
        Nodes to be tied together, must exist in both circuits.
        If empty or None, the startpoints common to both circuits are used.
    endpoints : set of str
        Nodes to be compared, must exist in both circuits.
        If empty or None, the endpoints common to both circuits are used.

    Returns
    -------
    Circuit
        Miter circuit.

    Raises
    ------
    ValueError
        If either circuit contains a blackbox.
    """
    # check for blackboxes
    if c0.blackboxes:
        raise ValueError(f"{c0.name} contains a blackbox")
    # Compare against None explicitly so that a circuit object that happens
    # to be falsy is neither skipped by the check nor replaced by c0.
    if c1 is not None and c1.blackboxes:
        raise ValueError(f"{c1.name} contains a blackbox")

    # clean inputs
    if c1 is None:
        c1 = c0
    if not startpoints:
        startpoints = c0.startpoints() & c1.startpoints()
    if not endpoints:
        endpoints = c0.endpoints() & c1.endpoints()

    # create miter, relabel
    m = cg.Circuit(name=f"miter_{c0.name}_{c1.name}")
    m.add_subcircuit(c0, "c0")
    m.add_subcircuit(c1, "c1")

    # tie inputs
    for n in startpoints:
        m.add(n, "input", fanout=[f"c0_{n}", f"c1_{n}"])

    # compare outputs; a single comparison only needs a buffer
    m.add("sat", "or" if len(endpoints) > 1 else "buf", output=True)
    for n in endpoints:
        m.add(f"dif_{n}", "xor", fanin=[f"c0_{n}", f"c1_{n}"], fanout="sat")

    return m
def relabel(c, mapping)
-
Build copy of circuit with relabeled nodes.
Parameters
c
:Circuit
- Input circuit.
mapping
:dict
ofstr:str
- Relabeling of nodes.
Returns
Circuit
- Copy of the circuit with relabeled nodes.
Expand source code
def relabel(c, mapping):
    """
    Build copy of circuit with relabeled nodes.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    mapping : dict of str:str
        Relabeling of nodes.

    Returns
    -------
    Circuit
        Copy of the circuit with relabeled nodes. The name is kept and the
        blackbox dictionary is copied as is.
    """
    relabeled_graph = nx.relabel_nodes(c.graph, mapping)
    return cg.Circuit(
        graph=relabeled_graph,
        name=c.name,
        blackboxes=c.blackboxes.copy(),
    )
def sensitivity_transform(c, n)
-
Create a circuit to compute sensitivity.
Create a miter circuit for each input 'i' with the fanin cone of
n
where the second circuit has 'i' inverted, so that the miter output is high when n
is sensitive to 'i'. The uninverted circuit is shared across all miters and the outputs of the miters are fed into a population count circuit so that the output of the population count circuit gives the sensitivity of n
for a given input pattern.
Parameters
c
:Circuit
- Sequential circuit to compute sensitivity for.
n
:str
- Node to compute sensitivity at.
Returns
Circuit
- Sensitivity circuit.
Expand source code
def sensitivity_transform(c, n):
    """
    Create a circuit to compute sensitivity.

    Create a miter circuit for each input 'i' with the fanin cone of `n`
    where the second circuit has 'i' inverted, so that the miter output is
    high when `n` is sensitive to 'i'. The uninverted circuit is shared
    across all miters and the outputs of the miters are fed into a
    population count circuit so that the output of the population count
    circuit gives the sensitivity of `n` for a given input pattern.

    Parameters
    ----------
    c : Circuit
        Sequential circuit to compute sensitivity for.
    n : str
        Node to compute sensitivity at.

    Returns
    -------
    Circuit
        Sensitivity circuit.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox or `n` has no startpoints.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")

    # check for startpoints
    startpoints = c.startpoints(n)
    if not startpoints:
        raise ValueError(f"{n} has no startpoints")

    # get input cone
    cone_nodes = c.transitive_fanin(n) | {n}
    cone = cg.Circuit(graph=c.graph.subgraph(cone_nodes).copy())

    # create sensitivity circuit around the shared, uninverted copy
    sen = cg.Circuit()
    sen.add_subcircuit(cone, "orig")
    for sp in startpoints:
        sen.add(sp, "input", fanout=f"orig_{sp}")

    # instantiate population count
    sen.add_subcircuit(cg.logic.popcount(len(startpoints)), "pc")

    # add one copy of the cone per startpoint, with that startpoint inverted
    for index, flipped in enumerate(startpoints):
        copy_prefix = f"inv_{flipped}"
        sen.add_subcircuit(cone, copy_prefix)

        # connect inputs
        for sp in startpoints:
            copy_input = f"{copy_prefix}_{sp}"
            if sp == flipped:
                # connect inverted input
                sen.set_type(copy_input, "not")
                sen.connect(flipped, copy_input)
            else:
                sen.connect(sp, copy_input)

        # compare to orig
        sen.add(
            f"dif_out_{flipped}",
            "xor",
            fanin=[f"orig_{n}", f"{copy_prefix}_{n}"],
            fanout=f"pc_in_{index}",
            output=True,
        )

    # expose the population count outputs
    for bit in range(cg.utils.clog2(len(startpoints) + 1)):
        sen.add(f"sen_out_{bit}", "buf", fanin=f"pc_out_{bit}", output=True)

    return sen
def sensitization_transform(c, n, endpoints=None)
-
Create a circuit to sensitize a node to an endpoint.
Create a miter circuit with that node inverted in one circuit copy.
Parameters
c
:Circuit
- Input circuit.
n
:str
- Node to sensitize.
endpoints
:str
orlist
ofstr
- Endpoints to sensitize to. If None, any output can be used for sensitization.
Returns
Circuit
- Output circuit.
Expand source code
def sensitization_transform(c, n, endpoints=None):
    """
    Create a circuit to sensitize a node to an endpoint.

    Create a miter circuit with that node inverted in one circuit copy.

    Parameters
    ----------
    c : Circuit
        Input circuit.
    n : str
        Node to sensitize.
    endpoints: str or list of str
        Endpoints to sensitize to. If None, any output can be used for
        sensitization.

    Returns
    -------
    Circuit
        Output circuit.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox or `n` is not in the transitive
        fanin of the given endpoints.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError("Circuit contains a blackbox")

    if endpoints:
        if isinstance(endpoints, str):
            endpoints = {endpoints}
        else:
            endpoints = set(endpoints)

        fi = c.transitive_fanin(endpoints)
        if n not in fi:
            raise ValueError(f"'{n}' is not in fanin of given endpoints")

        # Restrict to the cone of the endpoints; only they are outputs
        subc = subcircuit(c, endpoints | fi)
        for node in subc:
            subc.set_output(node, node in endpoints)
        # Sort the endpoints so the name does not depend on set iteration
        # order, which can differ from run to run for strings.
        miter_name = f"{c.name}_sensitize_{n}_to_{'_'.join(sorted(endpoints))}"
    else:
        subc = c
        miter_name = f"{c.name}_sensitize_{n}"

    # create miter
    m = miter(subc)
    m.name = miter_name

    # flip node in c1: drive it with the inverse of the same node in c0
    m.disconnect(m.fanin(f"c1_{n}"), f"c1_{n}")
    m.set_type(f"c1_{n}", "not")
    m.connect(f"c0_{n}", f"c1_{n}")

    return m
def sequential_unroll(c, n, reg_d_port, reg_q_port, ignore_pins=None, add_flop_outputs=False, initial_values=None, remove_unloaded=True, prefix='cg_unroll')
-
Unroll a sequential circuit.
Provides a higher level API than
unroll()
by accepting a circuit with sequential elements kept as blackboxes. Assumes that all blackboxes in the circuit are sequential elements.
Parameters
c
:Circuit
- Circuit to unroll.
n
:int
- The number of unrolled copies of the circuit to create.
reg_d_port
:str
- The name of the D port in the blackboxes in
c
. reg_q_port
:str
- The name of the Q port in the blackboxes in
c
. ignore_pins
:str
orlist
ofstr
- The names of pins in the blackboxes to ignore.
add_flop_outputs
:bool
- If True, the Q port of the flops will be added as primary outputs.
initial_values
:str
ordict
ofstr:str
- The initial values of the data ports for the first timestep. If None, the ports will be added as primary inputs. If a single value ('0', '1', or 'x'), every flop will get that value. Can also pass in dict mapping flop names to values.
remove_unloaded
:bool
- If True, unloaded inputs will be removed after unrolling. This can remove unused sequential signals such as the clock and reset.
prefix
:str
- The prefix to use for naming unrolled nodes.
Returns
Circuit, dict
ofstr:list
ofstr
- Unrolled circuit and mapping of original circuit io to list of unrolled circuit io. The lists are in order of the unroll iterations.
Expand source code
def sequential_unroll(
    c,
    n,
    reg_d_port,
    reg_q_port,
    ignore_pins=None,
    add_flop_outputs=False,
    initial_values=None,
    remove_unloaded=True,
    prefix="cg_unroll",
):
    """
    Unroll a sequential circuit.

    Provides a higher level API than `unroll` by accepting a circuit with
    sequential elements kept as blackboxes. Assumes that all blackboxes in
    the circuit are sequential elements; the ports of one arbitrarily chosen
    blackbox are taken as representative of all of them.

    Parameters
    ----------
    c: Circuit
        Circuit to unroll.
    n: int
        The number of unrolled copies of the circuit to create.
    reg_d_port: str
        The name of the D port in the blackboxes in `c`.
    reg_q_port: str
        The name of the Q port in the blackboxes in `c`.
    ignore_pins: str or list of str
        The names of pins in the blackboxes to ignore.
    add_flop_outputs: bool
        Output flag applied to the unrolled flop data nodes; if True they
        are marked as primary outputs.
    initial_values: str or dict of str:str
        The initial values of the data ports for the first timestep. If
        None, the ports will be added as primary inputs. If a single value
        ('0', '1', or 'x'), every flop will get that value. Can also pass in
        dict mapping flop names to values.
    remove_unloaded: bool
        If True, unloaded inputs will be removed before unrolling. This can
        remove unused sequential signals such as the clock and reset.
    prefix: str
        The prefix to use for naming unrolled nodes.

    Returns
    -------
    Circuit, dict of str:list of str
        Unrolled circuit and mapping of original circuit io to list of
        unrolled circuit io. The lists are in order of the unroll
        iterations.

    Raises
    ------
    ValueError
        If `reg_d_port` or `reg_q_port` is not a port of the blackbox.
    """
    stripped = strip_blackboxes(c, ignore_pins=ignore_pins)
    flop = c.blackboxes[set(c.blackboxes.keys()).pop()]

    # Keep only the d pin of every flop on the input side
    if reg_d_port not in flop.inputs():
        raise ValueError(f"Provided d port {reg_d_port} not in bb inputs")
    other_inputs = flop.inputs() - {reg_d_port}
    stripped.remove(f"{bb}_{p}" for p in other_inputs for bb in c.blackboxes)

    # Keep only the q pin of every flop on the output side
    if reg_q_port not in flop.outputs():
        raise ValueError(f"Provided q port {reg_q_port} not in bb outputs")
    other_outputs = flop.outputs() - {reg_q_port}
    stripped.remove(f"{bb}_{p}" for p in other_outputs for bb in c.blackboxes)

    if remove_unloaded:
        for primary_input in stripped.inputs():
            if stripped.fanout(primary_input):
                continue
            stripped.remove(primary_input)

    # The d node of each flop feeds its q node in the next iteration
    state_io = {}
    for bb in c.blackboxes:
        state_io[f"{bb}_{reg_d_port}"] = f"{bb}_{reg_q_port}"
    uc, io_map = unroll(stripped, n, state_io, prefix=prefix)

    for bb in c.blackboxes:
        uc.set_output(io_map[f"{bb}_{reg_d_port}"], add_flop_outputs)

    if initial_values:
        if isinstance(initial_values, str):
            # one value shared by every flop
            per_flop = dict.fromkeys(c.blackboxes, initial_values)
        else:
            per_flop = initial_values
        for flop_name, value in per_flop.items():
            # index 0 is the q node of the first unroll iteration
            uc.set_type(io_map[f"{flop_name}_{reg_q_port}"][0], value)

    return uc, io_map
def strip_blackboxes(c, ignore_pins=None)
-
Convert blackboxes to io.
Parameters
c
:Circuit
- Input circuit.
ignore_pins
:str
orlist
ofstr
- Pins to not create io for, just disconnect and delete.
Returns
Circuit
- Circuit with removed blackboxes.
Expand source code
def strip_blackboxes(c, ignore_pins=None):
    """
    Convert blackboxes to io.

    Blackbox input pins become output buffers and blackbox output pins
    become inputs. The "." in the pin node names is replaced with "_".

    Parameters
    ----------
    c : Circuit
        Input circuit.
    ignore_pins: str or list of str
        Pins to not create io for, just disconnect and delete.

    Returns
    -------
    Circuit
        Circuit with removed blackboxes.

    Raises
    ------
    ValueError
        If a renamed pin collides with a node already in the circuit.
    """
    if not ignore_pins:
        ignore_pins = []
    elif isinstance(ignore_pins, str):
        ignore_pins = [ignore_pins]

    graph = c.graph.copy()

    # node attributes that turn each kind of blackbox pin into circuit io
    conversions = (
        ("bb_input", {"type": "buf", "output": True}),
        ("bb_output", {"type": "input"}),
    )
    kept_pins = []
    for pin_type, attrs in conversions:
        for pin in c.filter_type(pin_type):
            # pin nodes end in ".<pin name>"
            if pin.rsplit(".", 1)[-1] in ignore_pins:
                graph.remove_node(pin)
                continue
            graph.nodes[pin].update(attrs)
            kept_pins.append(pin)

    # rename nodes
    mapping = {pin: pin.replace(".", "_") for pin in kept_pins}
    for new_name in mapping.values():
        if new_name in graph:
            raise ValueError(f"Overlapping blackbox name: {new_name}")
    nx.relabel_nodes(graph, mapping, copy=False)

    return cg.Circuit(graph=graph, name=c.name)
def strip_inputs(c)
-
Convert inputs to buffers for easy instantiation.
Parameters
c
:Circuit
- Input circuit.
Returns
Circuit
- Circuit with removed io.
Expand source code
def strip_inputs(c):
    """
    Convert inputs to buffers for easy instantiation.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of the circuit in which every input node has type "buf".
    """
    graph = c.graph.copy()
    node_attrs = graph.nodes
    for input_node in c.inputs():
        node_attrs[input_node]["type"] = "buf"
    return cg.Circuit(
        graph=graph,
        name=c.name,
        blackboxes=c.blackboxes.copy(),
    )
def strip_io(c)
-
Remove a circuit's outputs and convert inputs to buffers.
Parameters
c
:Circuit
- Input circuit.
Returns
Circuit
- Circuit with removed io.
Expand source code
def strip_io(c):
    """
    Remove a circuit's outputs and convert inputs to buffers.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of the circuit with no outputs and with every input node
        turned into a buffer.
    """
    graph = c.graph.copy()
    node_attrs = graph.nodes
    for input_node in c.inputs():
        node_attrs[input_node]["type"] = "buf"
    for output_node in c.outputs():
        node_attrs[output_node]["output"] = False
    return cg.Circuit(
        graph=graph,
        name=c.name,
        blackboxes=c.blackboxes.copy(),
    )
def strip_outputs(c)
-
Remove a circuit's outputs for easy instantiation.
Parameters
c
:Circuit
- Input circuit.
Returns
Circuit
- Circuit with removed io.
Expand source code
def strip_outputs(c):
    """
    Remove a circuit's outputs for easy instantiation.

    Parameters
    ----------
    c : Circuit
        Input circuit.

    Returns
    -------
    Circuit
        Copy of the circuit in which no node is marked as an output.
    """
    graph = c.graph.copy()
    node_attrs = graph.nodes
    for output_node in c.outputs():
        node_attrs[output_node]["output"] = False
    return cg.Circuit(
        graph=graph,
        name=c.name,
        blackboxes=c.blackboxes.copy(),
    )
def subcircuit(c, nodes, modify_io=False)
-
Create a subcircuit from a set of nodes of a given circuit.
Parameters
c
:Circuit
- The circuit to create a subcircuit from.
nodes
:list
ofstr
- The nodes to include in the subcircuit.
modify_io
:bool
- If True, gates without drivers will be turned into inputs and gates without fanout will be marked as outputs.
Returns
Circuit
- The subcircuit.
Expand source code
def subcircuit(c, nodes, modify_io=False):
    """
    Create a subcircuit from a set of nodes of a given circuit.

    The subcircuit contains the given nodes, with their types and output
    flags, and every edge of `c` whose two ends are both among them.

    Parameters
    ----------
    c: Circuit
        The circuit to create a subcircuit from.
    nodes: iterable of str
        The nodes to include in the subcircuit.
    modify_io: bool
        If True, gates without drivers will be turned into inputs and gates
        without fanout will be marked as outputs. Constant nodes
        ("0", "1", "x") are never turned into inputs.

    Returns
    -------
    Circuit
        The subcircuit.

    Raises
    ------
    NotImplementedError
        If any of the nodes is a blackbox pin.
    """
    # Materialize once so one-shot iterables work, and use a set for the
    # per-edge membership tests (a list would be scanned for every edge).
    nodes = list(nodes)
    members = set(nodes)

    sc = cg.Circuit()
    for node in nodes:
        node_type = c.type(node)
        if node_type in ["bb_output", "bb_input"]:
            raise NotImplementedError("Cannot create a subcircuit with blackboxes")
        sc.add(node, node_type, output=c.is_output(node))

    for edge in c.edges():
        if edge[0] in members and edge[1] in members:
            sc.connect(edge[0], edge[1])

    if modify_io:
        for node in sc:
            if sc.type(node) not in ["0", "1", "x"] and not sc.fanin(node):
                sc.set_type(node, "input")
            if not sc.fanout(node):
                sc.set_output(node)

    return sc
def supergates(c, construct_supercircuit=False)
-
Break the circuit up into supergates.
Calculate the minimal covering of all circuit nodes with maximal supergates of a circuit. For more information, see Sharad C. Seth and Vishwani D. Agrawal. "A new model for computation of probabilistic testability in combinational circuits." Integration 7.1 (1989): 49-75.
Parameters
c
:Circuit
- The circuit to compute supergates for
construct_supercircuit
:bool
- If True, a circuit connecting together the supergates as black boxes
will be formed. Currently this only works if
c
has only one output.
Returns
list
ofCircuit
or(Circuit, dict
ofstr:Circuit)
- If
construct_supercircuit
isFalse
, the supergate circuits, topologically sorted. Otherwise, the supercircuit and a dict mapping blackbox names to corresponding supergates.
Expand source code
def supergates(c, construct_supercircuit=False):
    """
    Break the circuit up into supergates.

    Calculate the minimal covering of all circuit nodes with maximal
    supergates of a circuit. For more information, see
    Sharad C. Seth and Vishwani D. Agrawal. "A new model for computation of
    probabilistic testability in combinational circuits."
    Integration 7.1 (1989): 49-75.

    The fanin of every gate is first limited to 2 with `limit_fanin`, so the
    supergates are computed on that transformed circuit.

    Parameters
    ----------
    c: Circuit
        The circuit to compute supergates for
    construct_supercircuit: bool
        If True, a circuit connecting together the supergates as black
        boxes will be formed. Currently this only works if `c` has only
        one output.

    Returns
    -------
    list of Circuit or (Circuit, dict of str:Circuit)
        If `construct_supercircuit` is `False`, the supergate circuits,
        topologically sorted. Otherwise, the supercircuit and a dict
        mapping blackbox names to corresponding supergates.

    Raises
    ------
    ValueError
        If `construct_supercircuit` is True and `c` has more than one output.
    """
    if construct_supercircuit and len(c.outputs()) > 1:
        raise ValueError(
            "Can only use `construct_supercircuit` one a single-output circuit"
        )

    # The current algorithm seems to fail for some circuits (like c880) with
    # gates with fanin greater than 2. At the moment not sure if this is a bug
    # in the implementation or expected behavior
    c = limit_fanin(c, 2)

    supergate_circuits = set()
    for output in c.outputs():
        # Work on the fanin cone of this output, with it as the only output
        c_output = subcircuit(c, c.transitive_fanin(output) | {output})
        c_output.set_output(c_output.outputs(), False)
        c_output.set_output(output, True)
        g = c_output.graph.copy()

        # Add backwards edge for every forward edge not connected to an output
        rm_edges = []
        for u, v in g.edges:
            g.add_edge(v, u)
            if v == output:
                rm_edges.append((u, v))
        # Edges into the output are dropped, leaving only their reversed copies
        for u, v in rm_edges:
            g.remove_edge(u, v)

        # Get dominator tree: map each immediate dominator to the set of
        # nodes it immediately dominates
        doms = nx.immediate_dominators(g, output)
        dom_tree = defaultdict(set)
        for k, v in doms.items():
            dom_tree[v].add(k)

        # Build supergates starting at the output
        # NOTE(review): this relies on `doms` mapping the output to itself;
        # otherwise the `remove` raises KeyError - confirm against the
        # installed networkx version.
        dom_tree[output].remove(output)
        frontier = Queue()
        frontier.put(output)
        while not frontier.empty():
            # Build the supergate for this node
            node = frontier.get()
            supergate = {node}
            # Include children and single successors
            fanins = Queue()
            for fi in dom_tree[node]:
                fanins.put(fi)
            while not fanins.empty():
                fi = fanins.get()
                supergate.add(fi)
                if len(dom_tree[fi]) > 1:
                    # Node dominating several nodes roots its own supergate
                    frontier.put(fi)
                elif len(dom_tree[fi]) == 1:
                    # Single dominated node is absorbed into this supergate;
                    # note that `pop` empties the dom_tree entry
                    fanins.put(dom_tree[fi].pop())
            supergate_circuit = subcircuit(c_output, supergate, modify_io=True)
            supergate_circuit.set_output(node, True)
            supergate_circuits.add(supergate_circuit)

    # Find minimal covering of supergates indexed by the output
    minimal_supergate_circuits = {}
    for supergate in supergate_circuits:
        # Non-input nodes covered by all of the other supergates
        remaining_cover = reduce(
            lambda a, b: a | b,
            (s.nodes() - s.inputs() for s in supergate_circuits - {supergate}),
            set(),
        )
        # Keep the supergate only if it has a node the others do not cover
        if supergate.nodes() - remaining_cover:
            minimal_supergate_circuits[supergate.outputs().pop()] = supergate

    if construct_supercircuit:
        superc = cg.Circuit(f"{c.name}_supergates")
        for i in c.inputs():
            superc.add(i, "input")
        for o in c.outputs():
            superc.add(o, "buf", output=True)
        supergate_map = {}
        for output, supergate in minimal_supergate_circuits.items():
            sg_name = f"sg_{output}"
            supergate_map[sg_name] = supergate
            bb = cg.BlackBox(name=sg_name, inputs=supergate.inputs(), outputs={output})
            # Nets between supergates that do not exist yet become buffers
            for n in supergate.io():
                if n not in superc:
                    superc.add(n, "buf")
            superc.add_blackbox(bb, sg_name, {i: i for i in supergate.io()})
        return superc, supergate_map

    # Find topological ordering of supergates: add an edge from a supergate
    # to every supergate that uses one of its non-input nodes as an input
    g = nx.DiGraph()
    for output, supergate in minimal_supergate_circuits.items():
        g.add_node(output)
        for i in supergate.inputs() - c.inputs():
            for other_output in set(minimal_supergate_circuits) - {output}:
                other_supergate = minimal_supergate_circuits[other_output]
                if i in other_supergate.nodes() - other_supergate.inputs():
                    g.add_edge(other_output, output)
    sorted_supergate_circuits = []
    for node in nx.topological_sort(g):
        sorted_supergate_circuits.append(minimal_supergate_circuits[node])

    return sorted_supergate_circuits
def syn(c, engine='yosys', suppress_output=False, stdout_file=None, stderr_file=None, working_dir='.', fast_parsing=False, pre_syn_file=None, post_syn_file=None, verilog_exists=False, effort='high')
-
Synthesize the circuit using a third-party synthesis tool.
Parameters
c
:Circuit
- Circuit to synthesize.
engine
:str
- Synthesis tool to use ('genus', 'dc', or 'yosys').
suppress_output
:bool
- If True, synthesis stdout will not be printed.
stdout_file
:file
orstr
orNone
- If defined, synthesis stdout will be directed to this file instead of being printed.
stderr_file
:file
orstr
orNone
- If defined, synthesis stderr will be written to this file instead of being printed.
working_dir
:str
- The path to run synthesis from. If using genus, this will effect where the genus run files are stored. Directory will be created if it does not exist.
fast_parsing
:bool
- If True, will use fast verilog parsing (which requires
specifically formatted netlists, see the documentation for
verilog_to_circuit
). pre_syn_file
:file
orstr
orNone
- If specified, the circuit verilog will be written to this file before synthesis. If None, a temporary file will be used.
post_syn_file
:file
orstr
orNone
- If specified, the synthesis output verilog will be written to this file. If None, a temporary file will be used.
verilog_exists
:bool
- If True, does not write
c
to a file, instead uses the verilog already present in pre_syn_file
. effort
:str
- The effort to use for synthesis. Either 'high', 'medium', or 'low'.
Returns
Circuit
- Synthesized circuit.
Expand source code
def syn(
    c,
    engine="yosys",
    suppress_output=False,
    stdout_file=None,
    stderr_file=None,
    working_dir=".",
    fast_parsing=False,
    pre_syn_file=None,
    post_syn_file=None,
    verilog_exists=False,
    effort="high",
):
    """
    Synthesize the circuit using a third-party synthesis tool.

    Parameters
    ----------
    c : Circuit
        Circuit to synthesize.
    engine : str
        Synthesis tool to use ('genus', 'dc', or 'yosys').
    suppress_output: bool
        If True, synthesis stdout will not be printed.
    stdout_file: file or str or None
        If defined, synthesis stdout will be directed to this file instead
        of being printed.
    stderr_file: file or str or None
        If defined, synthesis stderr will be written to this file instead
        of being printed.
    working_dir: str
        The path to run synthesis from. If using genus, this will effect
        where the genus run files are stored. Directory will be created if
        it does not exist.
    fast_parsing: bool
        If True, will use fast verilog parsing (which requires specifically
        formatted netlists, see the documentation for
        `verilog_to_circuit`).
    pre_syn_file: file or str or None
        If specified, the circuit verilog will be written to this file
        before synthesis. If None, a temporary file will be used.
    post_syn_file: file or str or None
        If specified, the synthesis output verilog will be written to this
        file. If None, a temporary file will be used.
    verilog_exists: bool
        If True, does not write `c` to a file, instead uses the verilog
        already present in `pre_syn_file`.
    effort: str
        The effort to use for synthesis. Either 'high', 'medium', or 'low'.
        Only passed on to genus.

    Returns
    -------
    Circuit
        Synthesized circuit.

    Raises
    ------
    OSError
        If the executable of the selected engine is not found.
    ValueError
        If `verilog_exists` is set without `pre_syn_file`, if the engine is
        unknown, or if the library path environment variable required by
        genus or dc is not set.
    subprocess.CalledProcessError
        If the synthesis tool exits with a non-zero status.
    """
    if engine == "yosys" and shutil.which("yosys") is None:
        raise OSError("'yosys' installation not found")

    if engine == "genus" and shutil.which("genus") is None:
        raise OSError("'genus' installation not found")

    if engine == "dc":
        # Prefer dc_shell-t and fall back to dc_shell
        dc_engine = "dc_shell-t"
        if shutil.which("dc_shell-t") is None:
            dc_engine = "dc_shell"
            if shutil.which("dc_shell") is None:
                raise OSError("'dc_shell-t' or 'dc_shell' installation not found")

    working_dir = Path(working_dir)
    working_dir.mkdir(exist_ok=True)
    working_dir = str(working_dir)

    # Make paths absolute in case synthesis is run from different working dir
    if pre_syn_file:
        pre_syn_file = Path(pre_syn_file).absolute()
    if post_syn_file:
        post_syn_file = Path(post_syn_file).absolute()

    if verilog_exists and not pre_syn_file:
        raise ValueError("Must specify pre_syn_file if using verilog_exists")

    if verilog_exists:
        input_ctx = open(pre_syn_file)
    elif pre_syn_file:
        input_ctx = open(pre_syn_file, "w")
    else:
        input_ctx = NamedTemporaryFile(
            prefix="circuitgraph_synthesis_input", suffix=".v", mode="w"
        )

    with input_ctx as tmp_in:
        if not verilog_exists:
            verilog = cg.io.circuit_to_verilog(c)
            tmp_in.write(verilog)
            # The tool reads the file by name, so the text must be on disk
            tmp_in.flush()

        if post_syn_file:
            output_ctx = open(post_syn_file, "w+")
        else:
            output_ctx = NamedTemporaryFile(
                prefix="circuitgraph_synthesis_output", suffix=".v", mode="r"
            )

        with output_ctx as tmp_out:
            if engine == "genus":
                try:
                    lib_path = os.environ["CIRCUITGRAPH_GENUS_LIBRARY_PATH"]
                except KeyError as e:
                    raise ValueError(
                        "In order to run synthesis with Genus, please set the "
                        "CIRCUITGRAPH_GENUS_LIBRARY_PATH variable in your os "
                        "environment to the path to the tech library to use"
                    ) from e
                cmd = [
                    "genus",
                    "-no_gui",
                    "-execute",
                    "set_db / .library "
                    f"{lib_path};\n"
                    f"read_hdl -sv {tmp_in.name};\n"
                    "elaborate;\n"
                    f"set_db syn_generic_effort {effort};\n"
                    "syn_generic;\n"
                    "syn_map;\n"
                    "syn_opt;\n"
                    f'redirect {tmp_out.name} "write_hdl -generic";\n'
                    "exit;",
                ]
            elif engine == "dc":
                try:
                    lib_path = os.environ["CIRCUITGRAPH_DC_LIBRARY_PATH"]
                except KeyError as e:
                    raise ValueError(
                        "In order to run synthesis with DC, please set the "
                        "CIRCUITGRAPH_DC_LIBRARY_PATH variable in your os environment "
                        "to the path to the GTECH library"
                    ) from e
                libname = "GTECH"
                # Restrict mapping to inverters, 2-4 input gates and flops
                usable_cells = [f"{libname.lower()}/{libname}_NOT"]
                for gate in ["OR", "NOR", "AND", "NAND", "XOR", "XNOR"]:
                    usable_cells += [
                        f"{libname.lower()}/{libname}_{gate}{i}" for i in range(2, 5)
                    ]
                usable_cells += [
                    f"{libname.lower()}/{libname}_FD{i}" for i in range(1, 4)
                ]
                execute = (
                    f"set_app_var target_library {lib_path};\n"
                    f"set_app_var link_library {lib_path};\n"
                    "set_dont_use [remove_from_collection "
                    f"[get_lib_cells {libname.lower()}/*] "
                    f"\"{' '.join(usable_cells)}\"];\n"
                    f"read_file {tmp_in.name}\n"
                    "link;\n"
                    "uniquify;\n"
                    "check_design;\n"
                    "simplify_constants;\n"
                    "compile;\n"
                    f"write -format verilog -output {tmp_out.name};\n"
                    "exit;"
                )
                cmd = [dc_engine, "-no_gui", "-x", execute]
            elif engine == "yosys":
                cmd = [
                    "yosys",
                    "-p",
                    f"read_verilog {tmp_in.name}; "
                    "synth; "
                    f"write_verilog -noattr {tmp_out.name}",
                ]
            else:
                raise ValueError("synthesis engine must be yosys, dc, or genus")

            # Log files are closed in the `finally` so they are not leaked
            # when the tool fails and `check=True` raises.
            opened_logs = []
            try:
                if stdout_file:
                    stdout = open(stdout_file, "w")
                    opened_logs.append(stdout)
                elif suppress_output:
                    stdout = subprocess.DEVNULL
                else:
                    stdout = None
                if stderr_file:
                    stderr = open(stderr_file, "w")
                    opened_logs.append(stderr)
                else:
                    stderr = None
                subprocess.run(
                    cmd, stdout=stdout, stderr=stderr, cwd=working_dir, check=True
                )
            finally:
                for log in opened_logs:
                    log.close()

            output_netlist = tmp_out.read()

    # Rename dc library gates
    if engine == "dc":

        def replace_gate(match):
            # Keep flops as they are. Group 1 holds the cell name without
            # the library prefix and size digit, and `re.sub` needs a string
            # back, so return the matched text unchanged.
            if match.group(1).startswith("FD"):
                return match.group(0)
            # Strip the named port connections down to the connected nets
            ports = [
                i.strip().split("(")[-1].strip(")") for i in match.group(3).split(",")
            ]
            portlist = ", ".join(reversed(ports))
            return f"{match.group(1).lower()} {match.group(2)}({portlist});"

        output_netlist = re.sub(
            rf"{libname}_([A-Z]+)[1-4]?\s+" r"([a-zA-Z][a-zA-Z\d_]*)\s*\(([^;]+)\);",
            replace_gate,
            output_netlist,
        )

    return cg.io.verilog_to_circuit(output_netlist, c.name, fast=fast_parsing)
def ternary(c)
-
Encode the circuit with ternary values.
The ternary circuit adds a second net for each net in the original circuit. The second net encodes a don't care, or X, value. That net being high corresponds to a don't care value on original net. If the second net is low, the logical value on the original net is valid.
Parameters
c
:Circuit
- Circuit to encode.
suffix
:str
- The suffix to give the added nets. Note that it is safest to use the returned dictionary to refer to the added nets because they are uniquified when they are added to the circuit.
Returns
Circuit, dict
ofstr:str
- Encoded circuit and dictionary mapping original net names to added ternary net names.
Expand source code
def ternary(c, suffix="_X"):
    """
    Encode the circuit with ternary values.

    The ternary circuit adds a second net for each net in the original
    circuit. The second net encodes a don't care, or X, value. That net being
    high corresponds to a don't care value on the original net. If the second
    net is low, the logical value on the original net is valid.

    Parameters
    ----------
    c : Circuit
        Circuit to encode.
    suffix: str
        The suffix to give the added nets. Note that it is safest to use the
        returned dictionary to refer to the added nets because they are
        uniquified when they are added to the circuit.

    Returns
    -------
    Circuit, dict of str:str
        Encoded circuit and dictionary mapping original net names to
        added ternary net names.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox, or if a node has a type other
        than and, nand, or, nor, buf, not, xor, xnor, 0, 1, or input.
    """
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")
    t = c.copy()

    # Add dual (X) nodes: one uniquified name per original node.
    mapping = {n: c.uid(f"{n}{suffix}") for n in c}

    for n in c:
        node_type = c.type(n)
        if node_type in ["and", "nand"]:
            # X when some fanin is X and no fanin is a known (non-X) 0.
            t.add(mapping[n], "and", output=c.is_output(n), allow_redefinition=True)
            t.add(
                f"{n}_x_in_fi",
                "or",
                fanout=mapping[n],
                fanin=[mapping[p] for p in c.fanin(n)],
                uid=True,
                add_connected_nodes=True,
            )
            zero_not_in_fi = t.add(
                f"{n}_0_not_in_fi", "nor", fanout=mapping[n], uid=True
            )
            for p in c.fanin(n):
                # High only when p is low and its X net is low.
                t.add(
                    f"{p}_is_0",
                    "nor",
                    fanout=zero_not_in_fi,
                    fanin=[p, mapping[p]],
                    uid=True,
                )
        elif node_type in ["or", "nor"]:
            # X when some fanin is X and no fanin is a known (non-X) 1.
            t.add(mapping[n], "and", output=c.is_output(n), allow_redefinition=True)
            t.add(
                f"{n}_x_in_fi",
                "or",
                fanout=mapping[n],
                fanin=[mapping[p] for p in c.fanin(n)],
                uid=True,
                add_connected_nodes=True,
            )
            one_not_in_fi = t.add(
                f"{n}_1_not_in_fi", "nor", fanout=mapping[n], uid=True
            )
            for p in c.fanin(n):
                # High only when p is high and its X net is low.
                is_one = t.add(
                    f"{p}_is_1", "and", fanout=one_not_in_fi, fanin=p, uid=True
                )
                t.add(f"{p}_not_x", "not", fanout=is_one, fanin=mapping[p], uid=True)
        elif node_type in ["buf", "not"]:
            # Single-fanin gates simply forward the X net of their fanin.
            p = c.fanin(n).pop()
            t.add(
                mapping[n],
                "buf",
                fanin=mapping[p],
                output=c.is_output(n),
                add_connected_nodes=True,
                allow_redefinition=True,
            )
        elif node_type in ["xor", "xnor"]:
            # X whenever any fanin is X.
            t.add(
                mapping[n],
                "or",
                fanin=[mapping[p] for p in c.fanin(n)],
                output=c.is_output(n),
                add_connected_nodes=True,
                allow_redefinition=True,
            )
        elif node_type in ["0", "1"]:
            # Constants are never X.
            t.add(mapping[n], "0", output=c.is_output(n), allow_redefinition=True)
        elif node_type in ["input"]:
            # Each input gets a companion input carrying its X value.
            t.add(mapping[n], "input", allow_redefinition=True)
        else:
            raise ValueError(f"Node '{n}' has invalid type: '{node_type}'")

    return t, mapping
def unroll(c, n, state_io, prefix='cg_unroll')
-
Unroll a circuit.
Create multiple copies of the circuit and connect together state io.
Parameters
c
:Circuit
- Circuit to unroll.
n
:int
- The number of unrolled copies of the circuit to create.
state_io
:dict
of str:str
- For each (k, v) pair in the dict, k of circuit iteration n - 1 will be tied to v of circuit iteration n.
prefix
:str
- The prefix to use for naming new io for each iteration.
Returns
Circuit, dict
of str:list
of str
- Unrolled circuit and mapping of original circuit io to list of unrolled circuit io. The lists are in order of the unroll iterations.
Expand source code
def unroll(c, n, state_io, prefix="cg_unroll"):
    """
    Unroll a circuit.

    Create multiple copies of the circuit and connect together state io.

    Parameters
    ----------
    c: Circuit
        Circuit to unroll.
    n: int
        The number of unrolled copies of the circuit to create.
    state_io: dict of str:str
        For each `(k, v)` pair in the dict, `k` of circuit iteration `n - 1`
        will be tied to `v` of circuit iteration `n`.
    prefix: str
        The prefix to use for naming new io for each iteration.

    Returns
    -------
    Circuit, dict of str:list of str
        Unrolled circuit and mapping of original circuit io to list of
        unrolled circuit io. The lists are in order of the unroll
        iterations.

    Raises
    ------
    ValueError
        If the circuit contains a blackbox, if `n` is less than 1, or if a
        key or value of `state_io` is not in the io of the circuit.
    """
    # check for blackboxes
    if c.blackboxes:
        raise ValueError(f"{c.name} contains a blackbox")
    if n < 1:
        raise ValueError(f"n must be >= 1 ({n})")

    # These lookups do not change while unrolling, so compute them once.
    io = c.io()
    for k, v in state_io.items():
        if k not in io:
            raise ValueError(f"Node '{k}' in state_io dict but not in io of circuit")
        if v not in io:
            raise ValueError(f"Node '{v}' in state_io dict but not in io of circuit")
    inputs = c.inputs()
    state_nodes = set(state_io) | set(state_io.values())

    uc = cg.Circuit()
    io_map = {i: [] for i in io}
    for itr in range(n):
        for i in io:
            new_io = c.uid(f"{i}_{prefix}_{itr}")
            # Only non-state inputs stay inputs; everything else is a buffer.
            t = "input" if i in inputs and i not in state_nodes else "buf"
            uc.add(new_io, t, output=c.is_output(i))
            io_map[i].append(new_io)

        # Refer to the names recorded in io_map rather than rebuilding them:
        # c.uid may have altered a name to keep it unique, in which case the
        # raw f-string name would not exist in the unrolled circuit.
        uc.add_subcircuit(c, f"unrolled_{itr}", {i: io_map[i][itr] for i in io})

        if itr == 0:
            # Nothing drives the first iteration's state, so expose it as inputs.
            for v in state_io.values():
                uc.set_type(io_map[v][itr], "input")
        else:
            for k, v in state_io.items():
                uc.connect(io_map[k][itr - 1], io_map[v][itr])
    return uc, io_map