diff --git a/src/DIRAC/Core/Utilities/File.py b/src/DIRAC/Core/Utilities/File.py index 9548faea8a6..2141eaa041f 100755 --- a/src/DIRAC/Core/Utilities/File.py +++ b/src/DIRAC/Core/Utilities/File.py @@ -4,15 +4,16 @@ By default on Error they return None. """ -import os +import errno +import glob import hashlib +import os import random -import glob -import sys import re -import errno import stat +import sys import tempfile +import threading from contextlib import contextmanager # Translation table of a given unit to Bytes @@ -277,6 +278,32 @@ def secureOpenForWrite(filename=None, *, text=True): yield fd, filename +def safe_listdir(directory, timeout=60): + """This is a "safe" list directory, + for lazily-loaded File Systems like CVMFS. + There's by default a 60 seconds timeout. + + :param str directory: directory to list + :param int timeout: optional timeout, in seconds. Defaults to 60. + """ + + def listdir(directory): + try: + return os.listdir(directory) + except FileNotFoundError: + print(f"{directory} not found") + return [] + + contents = [] + t = threading.Thread(target=lambda: contents.extend(listdir(directory))) + t.daemon = True # don't delay program's exit + t.start() + t.join(timeout) + if t.is_alive(): + return None # timeout + return contents + + if __name__ == "__main__": for p in sys.argv[1:]: print(f"{p} : {getGlobbedTotalSize(p)} bytes") diff --git a/src/DIRAC/Core/Utilities/Os.py b/src/DIRAC/Core/Utilities/Os.py index 103d0676575..45d431c2b47 100755 --- a/src/DIRAC/Core/Utilities/Os.py +++ b/src/DIRAC/Core/Utilities/Os.py @@ -3,12 +3,14 @@ by default on Error they return None """ import os +import platform import shutil import DIRAC +from DIRAC.Core.Utilities import List from DIRAC.Core.Utilities.Decorators import deprecated +from DIRAC.Core.Utilities.File import safe_listdir from DIRAC.Core.Utilities.Subprocess import shellCall, systemCall -from DIRAC.Core.Utilities import List DEBUG = 0 @@ -150,3 +152,26 @@ def sourceEnv(timeout, cmdTuple, inputEnv=None): @deprecated("Will be removed in DIRAC 8.1", onlyOnce=True) def which(executable): return shutil.which(executable) + + +def findImage(operating_system="alma9"): + """Finds the image for the current platform""" + from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations + + plat = DIRAC.gConfig.getValue("LocalSite/Platform", platform.machine()) + if plat == "x86_64": + plat = "amd64" + + CVMFS_locations = DIRAC.gConfig.getValue( + "LocalSite/CVMFS_locations", Operations().getValue("Pilot/CVMFS_locations", []) + ) + + rootImage = None + + for candidate in CVMFS_locations: + rootImage = os.path.join(candidate, "containers", "os-base", f"{operating_system}-devel", "prod", plat) + DIRAC.gLogger.verbose(f"Checking {rootImage} existence") + if safe_listdir(rootImage): + break + + return rootImage diff --git a/src/DIRAC/Resources/Computing/SingularityComputingElement.py b/src/DIRAC/Resources/Computing/SingularityComputingElement.py index f0e153d11b4..e0ce4d185e8 100644 --- a/src/DIRAC/Resources/Computing/SingularityComputingElement.py +++ b/src/DIRAC/Resources/Computing/SingularityComputingElement.py @@ -11,8 +11,6 @@ See the Configuration/Resources/Computing documention for details on where to set the option parameters. """ - -import io import json import os import re @@ -21,7 +19,8 @@ import tempfile import DIRAC -from DIRAC import S_OK, S_ERROR, gConfig, gLogger +from DIRAC import S_ERROR, S_OK, gConfig, gLogger +from DIRAC.Core.Utilities.Os import findImage from DIRAC.Core.Utilities.Subprocess import systemCall from DIRAC.ConfigurationSystem.Client.Helpers import Operations from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler @@ -31,6 +30,7 @@ # Default container to use if it isn't specified in the CE options CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4" +OPERATING_SYSTEM = "alma9" CONTAINER_WORKDIR = "DIRAC_containers" CONTAINER_INNERDIR = "/tmp" @@ -107,9 +107,6 @@ def __init__(self, ceUniqueID): super().__init__(ceUniqueID) self.__submittedJobs = 0 self.__runningJobs = 0 - self.__root = CONTAINER_DEFROOT - if "ContainerRoot" in self.ceParameters: - self.__root = self.ceParameters["ContainerRoot"] self.__workdir = CONTAINER_WORKDIR self.__innerdir = CONTAINER_INNERDIR self.__singularityBin = "singularity" @@ -147,7 +144,7 @@ def __hasSingularity(self): self.log.debug(f'Use singularity from "{self.__singularityBin}"') return True if "PATH" not in os.environ: - return False # Hmm, PATH not set? How unusual... + return False # PATH might not be set (e.g. HTCondorCE) searchPaths = os.environ["PATH"].split(os.pathsep) # We can use CVMFS as a last resort if userNS is enabled if self.__hasUserNS(): @@ -359,8 +356,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs): :return: S_OK(payload exit code) / S_ERROR() if submission issue """ - rootImage = self.__root - renewTask = None # Check that singularity is available if not self.__hasSingularity(): self.log.error("Singularity is not installed on PATH.") @@ -375,6 +370,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs): baseDir = ret["baseDir"] tmpDir = ret["tmpDir"] + renewTask = False if proxy: payloadProxyLoc = ret["proxyLocation"] @@ -449,6 +445,9 @@ def submitJob(self, executableFile, proxy=None, **kwargs): containerOpts = self.ceParameters["ContainerOptions"].split(",") for opt in containerOpts: cmd.extend([opt.strip()]) + + rootImage = findImage() or self.ceParameters.get("ContainerRoot") or CONTAINER_DEFROOT + if os.path.isdir(rootImage) or os.path.isfile(rootImage): cmd.extend([rootImage, innerCmd]) else: