Skip to content

Commit

Permalink
mutex savenexus calls to prevent segfaults (#544)
Browse files Browse the repository at this point in the history
* mutex savenexus calls to prevent segfaults

add back the sleeps

remove algo from manager when complete

added timeout in case of long async algo queue

reorg where we check and wait for algos to complete, share a lock between potential cross interference

both reentrant and nonconcurrent mutexes

* cleanup timeout member, add some tests

* only apply wait and remove to nonconcurrent case

* update tests to account for timeout only applying to nonconcurrent algos

* fix spelling mistake
  • Loading branch information
walshmm authored Feb 18, 2025
1 parent c5e5379 commit 53e9326
Show file tree
Hide file tree
Showing 3 changed files with 123 additions and 4 deletions.
31 changes: 29 additions & 2 deletions src/snapred/backend/recipe/algorithm/MantidSnapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from collections import namedtuple
from threading import Lock

Expand Down Expand Up @@ -38,7 +39,10 @@ class MantidSnapper:
##
## KNOWN NON-REENTRANT ALGORITHMS
##
_algorithmMutexes = {"LoadLiveData": Lock()}
_nonReentrantMutexes = {"LoadLiveData": Lock()}

_nonConcurrentAlgorithms = "SaveNexus", "SaveNexusESS", "SaveDiffCal", "RenameWorkspace"
_nonConcurrentAlgorithmMutex = Lock()

typeTranslationTable = {"string": str, "number": float, "dbl list": list, "boolean": bool}
_mtd = _CustomMtd()
Expand Down Expand Up @@ -78,6 +82,8 @@ def __init__(self, parentAlgorithm, name):
self._algorithmQueue = []
self._exportScript = ""
self._export = False
self.timeout = 60 # seconds
self.checkInterval = 0.05 # 50 ms
if self._export:
self._cleanOldExport()

Expand Down Expand Up @@ -141,12 +147,27 @@ def createAlgorithm(self, name):
alg.setRethrows(True)
return alg

def _waitForAlgorithmCompletion(self, name):
currentTimeout = 0
while len(AlgorithmManager.runningInstancesOf(name)) > 0:
if currentTimeout >= self.timeout:
raise TimeoutError(f"Timeout occurred while waiting for instance of {name} to cleanup")
currentTimeout += self.checkInterval
time.sleep(self.checkInterval)

def obtainMutex(self, name):
mutex = self._nonReentrantMutexes.get(name)
if mutex is None and name in self._nonConcurrentAlgorithms:
mutex = self._nonConcurrentAlgorithmMutex
return mutex

def executeAlgorithm(self, name, outputs, **kwargs):
algorithm = self.createAlgorithm(name)
mutex = None
try:
# Protect non-reentrant algorithms.
mutex = self._algorithmMutexes.get(name)

mutex = self.obtainMutex(name)
if mutex is not None:
mutex.acquire()

Expand Down Expand Up @@ -180,9 +201,15 @@ def executeAlgorithm(self, name, outputs, **kwargs):
self.cleanup()
raise AlgorithmException(name, str(e)) from e
finally:
self._cleanupNonConcurrent(name, algorithm)
if mutex is not None:
mutex.release()

def _cleanupNonConcurrent(self, name, algorithm):
if name in self._nonConcurrentAlgorithms:
self._waitForAlgorithmCompletion(name)
AlgorithmManager.removeById(algorithm.getAlgorithmID())

def executeQueue(self):
if self.parentAlgorithm:
self._prog_reporter = Progress(self.parentAlgorithm, start=0.0, end=1.0, nreports=self._endrange)
Expand Down
39 changes: 39 additions & 0 deletions tests/cis_tests/recreate_save_reduction_segfault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from snapred.backend.dao.reduction.ReductionRecord import ReductionRecord
from snapred.backend.recipe.algorithm.MantidSnapper import MantidSnapper
from snapred.backend.recipe.algorithm.Utensils import Utensils
import h5py
from snapred.backend.data.NexusHDF5Metadata import NexusHDF5Metadata as n5m
from snapred.meta.redantic import parse_file_as
import time

inputFile = "/SNS/users/wqp/SNAP/IPTS-24641/shared/SNAPRed/04bd2c53f6bf6754/lite/46680/2024-11-26T121144/reduced_046680_2024-11-26T121144.nxs.h5"
outputFile = "/SNS/users/wqp/tmp/test_segfault_append.nxs"
record = parse_file_as(ReductionRecord, "/SNS/users/wqp/SNAP/IPTS-24641/shared/SNAPRed/04bd2c53f6bf6754/lite/46680/2024-11-26T121144/ReductionRecord.json")

untensils = Utensils()
untensils.PyInit()
mantidSnapper = untensils.mantidSnapper

ws = mantidSnapper.LoadNexus("..", Filename=inputFile, OutputWorkspace="ws")
mantidSnapper.executeQueue()

with open("snapred_script.log", "w") as f:
import faulthandler
faulthandler.enable(file=f)
try:
for i in range(100):
for wsName in mantidSnapper.mtd[ws].getNames():
mantidSnapper.SaveNexus("..", InputWorkspace=wsName, Filename=outputFile, Append=True)
mantidSnapper.executeQueue()

# commented out to see if this somehow competed with SaveNexus, it does not
# with h5py.File(outputFile, "a") as h5:
# n5m.insertMetadataGroup(h5, record.dict(), "/metadata")

import os
os.remove(outputFile)
finally:
faulthandler.disable()



57 changes: 55 additions & 2 deletions tests/unit/backend/recipe/algorithm/test_MantidSnapper.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
import unittest
from unittest import mock

import pytest
from mantid.kernel import Direction

from snapred.backend.recipe.algorithm.MantidSnapper import MantidSnapper
from snapred.meta.Callback import callback

PatchRoot: str = "snapred.backend.recipe.algorithm.MantidSnapper.{0}"


class TestMantidSnapper(unittest.TestCase):
def setUp(self):
Expand All @@ -17,19 +20,69 @@ def setUp(self):
self.fakeFunction.getProperties.return_value = [self.fakeOutput]
self.fakeFunction.getProperty.return_value = self.fakeOutput

@mock.patch("snapred.backend.recipe.algorithm.MantidSnapper.AlgorithmManager")
@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_snapper_fake_algo(self, mock_AlgorithmManager):
mock_AlgorithmManager.create.return_value = self.fakeFunction
return_of_algo = "return of algo"
mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
test = mantidSnapper.fakeFunction("test", fakeOutput=return_of_algo)
assert str(test.__class__) == str(callback(return_of_algo.__class__).__class__)

@mock.patch("snapred.backend.recipe.algorithm.MantidSnapper.AlgorithmManager")
@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_snapper_fake_queue(self, mock_AlgorithmManager):
mock_AlgorithmManager.create.return_value = self.fakeFunction
return_of_algo = "return of algo"
mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
mantidSnapper.fakeFunction("test", fakeOutput=return_of_algo)
mantidSnapper.executeQueue()
assert self.fakeFunction.execute.called

@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_timeout(self, mockAlgorithmManager):
mockAlgorithmManager.runningInstancesOf = mock.Mock(return_value=["theAlgoThatNeverEnds"])
mockAlgorithmManager.create.return_value = self.fakeFunction

mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
mantidSnapper.timeout = 0.2
mantidSnapper.fakeFunction("test", fakeOutput="output")
mantidSnapper._nonConcurrentAlgorithms = mantidSnapper._nonConcurrentAlgorithms + ("fakeFunction",)
mantidSnapper._nonConcurrentAlgorithmMutex = mock.Mock()

with pytest.raises(TimeoutError, match="Timeout occurred while waiting for instance of"):
mantidSnapper.executeQueue()

@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_timeout_concurrent(self, mockAlgorithmManager):
mockAlgorithmManager.runningInstancesOf = mock.Mock(return_value=["theAlgoThatNeverEnds"])
mockAlgorithmManager.create.return_value = self.fakeFunction

mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
mantidSnapper.timeout = 0.2
mantidSnapper.fakeFunction("test", fakeOutput="output")
mantidSnapper._nonConcurrentAlgorithmMutex = mock.Mock()

mantidSnapper.executeQueue()
assert not mantidSnapper._nonConcurrentAlgorithmMutex.acquire.called

@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_mutexIsObtained_nonConcurrent(self, mockAlgorithmManager):
mockAlgorithmManager.create.return_value = self.fakeFunction
mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
mantidSnapper.fakeFunction("test", fakeOutput="output")
mantidSnapper._nonConcurrentAlgorithms = mantidSnapper._nonConcurrentAlgorithms + ("fakeFunction",)
mantidSnapper._nonConcurrentAlgorithmMutex = mock.Mock()

mantidSnapper.executeQueue()
assert mantidSnapper._nonConcurrentAlgorithmMutex.acquire.called
assert mantidSnapper._nonConcurrentAlgorithmMutex.release.called

@mock.patch(PatchRoot.format("AlgorithmManager"))
def test_mutexIsObtained_nonReentrant(self, mockAlgorithmManager):
mockAlgorithmManager.create.return_value = self.fakeFunction
mantidSnapper = MantidSnapper(parentAlgorithm=None, name="")
mantidSnapper.fakeFunction("test", fakeOutput="output")
mantidSnapper._nonReentrantMutexes["fakeFunction"] = mock.Mock()

mantidSnapper.executeQueue()
assert mantidSnapper._nonReentrantMutexes["fakeFunction"].acquire.called
assert mantidSnapper._nonReentrantMutexes["fakeFunction"].release.called

0 comments on commit 53e9326

Please # to comment.