Skip to content

Commit

Permalink
Refactor.
Browse files Browse the repository at this point in the history
  • Loading branch information
LarsAsplund committed Sep 7, 2024
1 parent 3dcbb5e commit 6bcd461
Show file tree
Hide file tree
Showing 3 changed files with 148 additions and 124 deletions.
260 changes: 141 additions & 119 deletions vunit/sim_if/modelsim.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,14 +11,14 @@
from pathlib import Path
import os
import logging
from threading import Lock, Event
from time import sleep
from configparser import RawConfigParser
from ..exceptions import CompileError
from ..ostools import write_file, Process, file_exists
from ..vhdl_standard import VHDL
from . import SimulatorInterface, ListOfStringOption, StringOption, BooleanOption
from .vsim_simulator_mixin import VsimSimulatorMixin, fix_path
from threading import Lock, Event
from time import sleep

LOGGER = logging.getLogger(__name__)

Expand Down Expand Up @@ -102,8 +102,8 @@ def __init__(self, prefix, output_path, persistent=False, gui=False):
self._coverage_files = set()
assert not (persistent and gui)
self._create_modelsim_ini()
self._optimized_designs = dict()
self._optimized_libraries = dict()
self._optimized_designs = {}
self._optimized_libraries = {}
self._vopt_lock = Lock()

def _create_modelsim_ini(self):
Expand Down Expand Up @@ -236,9 +236,116 @@ def _get_mapped_libraries(self):
del libraries["others"]
return libraries

def _optimize(self, config, script_path):
three_step_flow = config.sim_options.get("modelsim.three_step_flow", False)
def _create_optimize_script(self, design_to_optimize, simulation_target, config):
"""
Create vopt script.
"""
vopt_flags = [
f"{design_to_optimize}",
f"-work {{{config.library_name}}}",
"-quiet",
f"-floatgenerics+{config.entity_name}.",
f"-o {{{simulation_target}}}",
self._vopt_extra_args(config),
]

# There is a known bug in modelsim that prevents the -modelsimini flag from accepting
# a space in the path even with escaping, see issue #36
if " " not in self._sim_cfg_file_name:
modelsimini_option = f"-modelsimini {fix_path(self._sim_cfg_file_name)!s}"
vopt_flags.insert(0, modelsimini_option)

for library in self._libraries:
vopt_flags += ["-L", library.name]

tcl = """
proc vunit_optimize {{vopt_extra_args ""}} {"""
tcl += """
set vopt_failed [catch {{
eval vopt ${{vopt_extra_args}} {{{vopt_flags}}}
}}]
if {{${{vopt_failed}}}} {{
echo Command 'vopt ${{vopt_extra_args}} {vopt_flags}' failed
echo Bad flag from vopt_extra_args?
return true
}}
return False
}}
""".format(
vopt_flags=" ".join(vopt_flags)
)

return tcl

def _run_persistent_optimize(self, optimize_file_name):
"""
Run a test bench using the persistent vsim process
"""
try:
self._persistent_shell.execute(f'source "{fix_path(str(optimize_file_name))!s}"')
self._persistent_shell.execute("set failed [vunit_optimize]")
if self._persistent_shell.read_bool("failed"):
status = False
else:
status = True

except Process.NonZeroExitCode:
status = False

return status

def _run_optimize_batch_file(self, batch_file_name, script_path):
"""
Run a test bench in batch by invoking a new vsim process from the command line
"""
try:
args = [
str(Path(self._prefix) / "vsim"),
"-c",
"-l",
str(script_path / "transcript"),
"-do",
f'source "{fix_path(str(batch_file_name))!s}"',
]

proc = Process(args, cwd=str(Path(self._sim_cfg_file_name).parent))
proc.consume_output()
status = True
except Process.NonZeroExitCode:
status = False

return status

def _acquire_library_lock(self, library, config, design_to_optimize):
"""
Acquire library lock and wait for any lock file to be removed.
"""
with self._vopt_lock:
library_lock = self._optimized_libraries[config.library_name]

if library_lock.locked():
LOGGER.debug("Waiting for library lock for %s to optimize %s", config.library_name, design_to_optimize)
# Do not completely block to allow for Ctrl+C
while not library_lock.acquire(timeout=0.05):
pass
while (Path(library.directory) / "_lock").exists():
LOGGER.debug("Waiting for %s to be removed", Path(library.directory) / "_lock")
sleep(0.05)
LOGGER.debug("Acquired library lock for %s to optimize %s", config.library_name, design_to_optimize)

def _release_library_lock(self, library, config):
"""
Release library lock and wait for any lock file to be removed.
"""
with self._vopt_lock:
while (Path(library.directory) / "_lock").exists():
LOGGER.debug("Waiting for %s to be removed", Path(library.directory) / "_lock")
sleep(0.05)
self._optimized_libraries[config.library_name].release()

def _optimize(self, config, script_path):
if config.architecture_name is None:
architecture_suffix = ""
else:
Expand All @@ -250,44 +357,27 @@ def _optimize(self, config, script_path):
else config.library_name + "." + config.vhdl_configuration_name
)

if not three_step_flow:
if not config.sim_options.get("modelsim.three_step_flow", False):
return design_to_optimize

for library in self._libraries:
if library.name == config.library_name:
break
libraries = {lib.name: lib for lib in self._libraries}
library = libraries[config.library_name]

optimize = False
has_library_lock = False
with self._vopt_lock:
if not self._optimized_designs.get(design_to_optimize, None):
self._optimized_designs[design_to_optimize] = dict(simulation_target=None, vopt_event=Event())
self._optimized_designs[design_to_optimize] = {"simulation_target": None, "vopt_event": Event()}
optimize = True

if not self._optimized_libraries.get(config.library_name, None):
library_lock = Lock()
library_lock.acquire()
LOGGER.debug(f"Acquired library lock for {config.library_name} to optimize {design_to_optimize}")
has_library_lock = True
self._optimized_libraries[config.library_name] = library_lock
self._optimized_libraries[config.library_name] = Lock()

simulation_target, vopt_event = self._optimized_designs[design_to_optimize].values()

if optimize:
if not has_library_lock:
with self._vopt_lock:
library_lock = self._optimized_libraries[config.library_name]
self._acquire_library_lock(library, config, design_to_optimize)

LOGGER.debug(f"Waiting for library lock for {config.library_name} to optimize {design_to_optimize}")
# Do not completely block to allow for Ctrl+C
while not library_lock.acquire(timeout=0.05):
pass
while (Path(library.directory) / "_lock").exists():
LOGGER.debug(f"Waiting for {Path(library.directory) / "_lock"} to be removed")
sleep(0.05)
LOGGER.debug(f"Acquired library lock for {config.library_name} to optimize {design_to_optimize}")

LOGGER.debug(f"Optimizing {design_to_optimize}")
LOGGER.debug("Optimizing %s", design_to_optimize)

# vopt has limitations on how the optimized design can be named. Simply removing
# non-alphanumeric characters is a simple solution to that
Expand All @@ -296,112 +386,47 @@ def _optimize(self, config, script_path):
with self._vopt_lock:
self._optimized_designs[design_to_optimize]["simulation_target"] = simulation_target

vopt_flags = [
f"{design_to_optimize}",
f"-work {{{config.library_name}}}",
"-quiet",
f"-floatgenerics+{config.entity_name}.",
f"-o {{{simulation_target}}}",
self._vopt_extra_args(config),
]

# There is a known bug in modelsim that prevents the -modelsimini flag from accepting
# a space in the path even with escaping, see issue #36
if " " not in self._sim_cfg_file_name:
modelsimini_option = f"-modelsimini {fix_path(self._sim_cfg_file_name)!s}"
vopt_flags.insert(0, modelsimini_option)

library_option = []
for library in self._libraries:
library_option += ["-L", library.name]

vopt_flags += library_option

tcl = """
proc vunit_optimize {{vopt_extra_args ""}} {"""
tcl += """
set vopt_failed [catch {{
eval vopt ${{vopt_extra_args}} {{{vopt_flags}}}
}}]
if {{${{vopt_failed}}}} {{
echo Command 'vopt ${{vopt_extra_args}} {vopt_flags}' failed
echo Bad flag from vopt_extra_args?
return true
}}
return False
}}
""".format(
vopt_flags=" ".join(vopt_flags)
)

optimize_file_name = script_path / "optimize.do"
write_file(str(optimize_file_name), tcl)
write_file(
str(optimize_file_name), self._create_optimize_script(design_to_optimize, simulation_target, config)
)

if self._persistent_shell is not None:
try:
self._persistent_shell.execute(f'source "{fix_path(str(optimize_file_name))!s}"')
self._persistent_shell.execute("set failed [vunit_optimize]")
if self._persistent_shell.read_bool("failed"):
status = False
else:
status = True

except Process.NonZeroExitCode:
status = False
status = self._run_persistent_optimize(optimize_file_name)

else:
tcl = f"""\
onerror {{quit -code 1}}
source "{fix_path(str(optimize_file_name))!s}"
set failed [vunit_optimize]
if {{$failed}} {{quit -code 1}}
quit -code 0
"""
onerror {{quit -code 1}}
source "{fix_path(str(optimize_file_name))!s}"
set failed [vunit_optimize]
if {{$failed}} {{quit -code 1}}
quit -code 0
"""
batch_file_name = script_path / "batch_optimize.do"
write_file(str(batch_file_name), tcl)

try:
args = [
str(Path(self._prefix) / "vsim"),
"-c",
"-l",
str(script_path / "transcript"),
"-do",
f'source "{fix_path(str(batch_file_name))!s}"',
]

proc = Process(args, cwd=str(Path(self._sim_cfg_file_name).parent))
proc.consume_output()
status = True
except Process.NonZeroExitCode:
status = False
status = self._run_optimize_batch_file(batch_file_name, script_path)

with self._vopt_lock:
if not status:
LOGGER.debug(f"Failed to optimize {design_to_optimize}.")
else:
while (Path(library.directory) / "_lock").exists():
LOGGER.debug(f"Waiting for {Path(library.directory) / "_lock"} to be removed")
sleep(0.05)
LOGGER.debug(f"{design_to_optimize} optimization completed.")
self._optimized_designs[design_to_optimize]["vopt_event"].set()
self._optimized_libraries[config.library_name].release()
self._release_library_lock(library, config)

if not status:
LOGGER.debug("Failed to optimize %s.", design_to_optimize)
return False

LOGGER.debug("%s optimization completed.", design_to_optimize)
self._optimized_designs[design_to_optimize]["vopt_event"].set()

elif not simulation_target:
LOGGER.debug(f"Waiting for {design_to_optimize} to be optimized.")
LOGGER.debug("Waiting for %s to be optimized.", design_to_optimize)
# Do not completely block to allow for Ctrl+C
while not vopt_event.wait(0.05):
pass
LOGGER.debug(f"Done waiting for {design_to_optimize} to be optimized.")
LOGGER.debug("Done waiting for %s to be optimized.", design_to_optimize)
with self._vopt_lock:
simulation_target = self._optimized_designs[design_to_optimize]["simulation_target"]

else:
LOGGER.debug(f"Reusing optimized {design_to_optimize}.")
LOGGER.debug("Reusing optimized %s.", design_to_optimize)

return simulation_target

Expand Down Expand Up @@ -450,11 +475,8 @@ def _create_load_function(self, test_suite_name, config, output_path, simulation
modelsimini_option = f"-modelsimini {fix_path(self._sim_cfg_file_name)!s}"
vsim_flags.insert(0, modelsimini_option)

library_option = []
for library in self._libraries:
library_option += ["-L", library.name]

vsim_flags += library_option
vsim_flags += ["-L", library.name]

vhdl_assert_stop_level_mapping = {"warning": 1, "error": 2, "failure": 3}

Expand Down
4 changes: 3 additions & 1 deletion vunit/sim_if/rivierapro.py
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,9 @@ def _get_mapped_libraries(self, library_cfg_file):
libraries[key] = str((Path(library_cfg_file).parent / (Path(value).parent)).resolve())
return libraries

def _create_load_function(self, test_suite_name, config, output_path): # pylint: disable=unused-argument
def _create_load_function(
self, test_suite_name, config, output_path, simulation_target
): # pylint: disable=unused-argument
"""
Create the vunit_load TCL function that runs the vsim command and loads the design
"""
Expand Down
8 changes: 4 additions & 4 deletions vunit/sim_if/vsim_simulator_mixin.py
Original file line number Diff line number Diff line change
Expand Up @@ -307,11 +307,11 @@ def _run_persistent(self, common_file_name, load_only=False):
except Process.NonZeroExitCode:
return False

def _optimize(self, config, script_path):
def _optimize(self, config, script_path): # pylint: disable=unused-argument
"""
Return simulation target or False if optimization failed or None the optimization step isn't supported.
Return simulation target or False if optimization failed.
"""
return None
return "Optimize not supported"

def simulate(self, output_path, test_suite_name, config, elaborate_only):
"""
Expand All @@ -324,7 +324,7 @@ def simulate(self, output_path, test_suite_name, config, elaborate_only):
batch_file_name = script_path / "batch.do"

simulation_target = self._optimize(config, script_path)
if simulation_target == False:
if simulation_target is False:
return False

write_file(
Expand Down

0 comments on commit 6bcd461

Please # to comment.