Skip to content

Commit

Permalink
mutex savenexus calls to prevent segfaults
Browse files Browse the repository at this point in the history
add back the sleeps

remove algo from manager when complete

added timeout in case of long async algo queue

reorg where we check and wait for algos to complete, share a lock between potential cross interference

both reentrant and nonconcurrent mutexes
  • Loading branch information
walshmm committed Feb 17, 2025
1 parent c5e5379 commit de1412c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 2 deletions.
27 changes: 25 additions & 2 deletions src/snapred/backend/recipe/algorithm/MantidSnapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import time
from collections import namedtuple
from threading import Lock

Expand Down Expand Up @@ -38,7 +39,10 @@ class MantidSnapper:
##
## KNOWN NON-REENTRANT ALGORITHMS
##
_algorithmMutexes = {"LoadLiveData": Lock()}
_nonReentrantMutexes = {"LoadLiveData": Lock()}

_nonConcurrentAlgorithms = "SaveNexus", "SaveNexusESS", "SaveDiffCal", "RenameWorkspace"
_nonConcurrentAlgorithmMutex = Lock()

typeTranslationTable = {"string": str, "number": float, "dbl list": list, "boolean": bool}
_mtd = _CustomMtd()
Expand Down Expand Up @@ -141,12 +145,29 @@ def createAlgorithm(self, name):
alg.setRethrows(True)
return alg

def _waitForAlgorithmCompletion(self, name):
timeout = 60 # seconds
checkInterval = 0.05 # 50 ms
currentTimeout = 0
while len(AlgorithmManager.runningInstancesOf(name)) > 0:
if currentTimeout >= timeout:
raise RuntimeError(f"Timeout occured while waiting for instance of {name} to cleanup")
currentTimeout += checkInterval
time.sleep(checkInterval)

def obtainMutex(self, name):
mutex = self._nonReentrantMutexes.get(name)
if mutex is None and name in self._nonConcurrentAlgorithms:
mutex = self._nonConcurrentAlgorithmMutex
return mutex

def executeAlgorithm(self, name, outputs, **kwargs):
algorithm = self.createAlgorithm(name)
mutex = None
try:
# Protect non-reentrant algorithms.
mutex = self._algorithmMutexes.get(name)

mutex = self.obtainMutex(name)
if mutex is not None:
mutex.acquire()

Expand Down Expand Up @@ -180,6 +201,8 @@ def executeAlgorithm(self, name, outputs, **kwargs):
self.cleanup()
raise AlgorithmException(name, str(e)) from e
finally:
self._waitForAlgorithmCompletion(name)
AlgorithmManager.removeById(algorithm.getAlgorithmID())
if mutex is not None:
mutex.release()

Expand Down
39 changes: 39 additions & 0 deletions tests/cis_tests/recreate_save_reduction_segfault.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
from snapred.backend.dao.reduction.ReductionRecord import ReductionRecord
from snapred.backend.recipe.algorithm.MantidSnapper import MantidSnapper
from snapred.backend.recipe.algorithm.Utensils import Utensils
import h5py
from snapred.backend.data.NexusHDF5Metadata import NexusHDF5Metadata as n5m
from snapred.meta.redantic import parse_file_as
import time

inputFile = "/SNS/users/wqp/SNAP/IPTS-24641/shared/SNAPRed/04bd2c53f6bf6754/lite/46680/2024-11-26T121144/reduced_046680_2024-11-26T121144.nxs.h5"
outputFile = "/SNS/users/wqp/tmp/test_segfault_append.nxs"
record = parse_file_as(ReductionRecord, "/SNS/users/wqp/SNAP/IPTS-24641/shared/SNAPRed/04bd2c53f6bf6754/lite/46680/2024-11-26T121144/ReductionRecord.json")

untensils = Utensils()
untensils.PyInit()
mantidSnapper = untensils.mantidSnapper

ws = mantidSnapper.LoadNexus("..", Filename=inputFile, OutputWorkspace="ws")
mantidSnapper.executeQueue()

with open("snapred_script.log", "w") as f:
import faulthandler
faulthandler.enable(file=f)
try:
for i in range(100):
for wsName in mantidSnapper.mtd[ws].getNames():
mantidSnapper.SaveNexus("..", InputWorkspace=wsName, Filename=outputFile, Append=True)
mantidSnapper.executeQueue()

# commented out to see if this somehow competed with SaveNexus, it does not
# with h5py.File(outputFile, "a") as h5:
# n5m.insertMetadataGroup(h5, record.dict(), "/metadata")

import os
os.remove(outputFile)
finally:
faulthandler.disable()



0 comments on commit de1412c

Please # to comment.