Skip to content

Commit

Permalink
feat: SingularityCE: looking for the platform-aware image in CVMFS lo…
Browse files Browse the repository at this point in the history
…cation
  • Loading branch information
fstagni committed May 23, 2024
1 parent c111713 commit 79ff8f5
Show file tree
Hide file tree
Showing 3 changed files with 65 additions and 14 deletions.
35 changes: 31 additions & 4 deletions src/DIRAC/Core/Utilities/File.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,16 @@
By default on Error they return None.
"""

import os
import errno
import glob
import hashlib
import os
import random
import glob
import sys
import re
import errno
import stat
import sys
import tempfile
import threading
from contextlib import contextmanager

# Translation table of a given unit to Bytes
Expand Down Expand Up @@ -277,6 +278,32 @@ def secureOpenForWrite(filename=None, *, text=True):
yield fd, filename


def safe_listdir(directory, timeout=60):
"""This is a "safe" list directory,
for lazily-loaded File Systems like CVMFS.
There's by default a 60 seconds timeout.
:param str directory: directory to list
:param int timeout: optional timeout, in seconds. Defaults to 60.
"""

def listdir(directory):
try:
return os.listdir(directory)
except FileNotFoundError:
print(f"{directory} not found")
return []

contents = []
t = threading.Thread(target=lambda: contents.extend(listdir(directory)))
t.daemon = True # don't delay program's exit
t.start()
t.join(timeout)
if t.is_alive():
return None # timeout
return contents


if __name__ == "__main__":
for p in sys.argv[1:]:
print(f"{p} : {getGlobbedTotalSize(p)} bytes")
27 changes: 26 additions & 1 deletion src/DIRAC/Core/Utilities/Os.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,14 @@
by default on Error they return None
"""
import os
import platform
import shutil

import DIRAC
from DIRAC.Core.Utilities import List
from DIRAC.Core.Utilities.Decorators import deprecated
from DIRAC.Core.Utilities.File import safe_listdir
from DIRAC.Core.Utilities.Subprocess import shellCall, systemCall
from DIRAC.Core.Utilities import List

DEBUG = 0

Expand Down Expand Up @@ -150,3 +152,26 @@ def sourceEnv(timeout, cmdTuple, inputEnv=None):
@deprecated("Will be removed in DIRAC 8.1", onlyOnce=True)
def which(executable):
return shutil.which(executable)


def findImage(operating_system="alma9"):
"""Finds the image for the current platform"""
from DIRAC.ConfigurationSystem.Client.Helpers.Operations import Operations

plat = DIRAC.gConfig.getValue("LocalSite/Platform", platform.machine())
if plat == "x86_64":
plat = "amd64"

CVMFS_locations = DIRAC.gConfig.getValue(
"LocalSite/CVMFS_locations", Operations().getValue("Pilot/CVMFS_locations", [])
)

rootImage = None

for candidate in CVMFS_locations:
rootImage = os.path.join(candidate, "containers", "os-base", f"{operating_system}-devel", "prod", plat)
DIRAC.gLogger.verbose(f"Checking {rootImage} existence")
if safe_listdir(rootImage):
break

return rootImage
17 changes: 8 additions & 9 deletions src/DIRAC/Resources/Computing/SingularityComputingElement.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@
See the Configuration/Resources/Computing documention for details on
where to set the option parameters.
"""

import io
import json
import os
import re
Expand All @@ -21,7 +19,8 @@
import tempfile

import DIRAC
from DIRAC import S_OK, S_ERROR, gConfig, gLogger
from DIRAC import S_ERROR, S_OK, gConfig, gLogger
from DIRAC.Core.Utilities.Os import findImage
from DIRAC.Core.Utilities.Subprocess import systemCall
from DIRAC.ConfigurationSystem.Client.Helpers import Operations
from DIRAC.Core.Utilities.ThreadScheduler import gThreadScheduler
Expand All @@ -31,6 +30,7 @@

# Default container to use if it isn't specified in the CE options
CONTAINER_DEFROOT = "/cvmfs/cernvm-prod.cern.ch/cvm4"
OPERATING_SYSTEM = "alma9"
CONTAINER_WORKDIR = "DIRAC_containers"
CONTAINER_INNERDIR = "/tmp"

Expand Down Expand Up @@ -107,9 +107,6 @@ def __init__(self, ceUniqueID):
super().__init__(ceUniqueID)
self.__submittedJobs = 0
self.__runningJobs = 0
self.__root = CONTAINER_DEFROOT
if "ContainerRoot" in self.ceParameters:
self.__root = self.ceParameters["ContainerRoot"]
self.__workdir = CONTAINER_WORKDIR
self.__innerdir = CONTAINER_INNERDIR
self.__singularityBin = "singularity"
Expand Down Expand Up @@ -147,7 +144,7 @@ def __hasSingularity(self):
self.log.debug(f'Use singularity from "{self.__singularityBin}"')
return True
if "PATH" not in os.environ:
return False # Hmm, PATH not set? How unusual...
return False # PATH might not be set (e.g. HTCondorCE)
searchPaths = os.environ["PATH"].split(os.pathsep)
# We can use CVMFS as a last resort if userNS is enabled
if self.__hasUserNS():
Expand Down Expand Up @@ -359,8 +356,6 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
:return: S_OK(payload exit code) / S_ERROR() if submission issue
"""
rootImage = self.__root
renewTask = None
# Check that singularity is available
if not self.__hasSingularity():
self.log.error("Singularity is not installed on PATH.")
Expand All @@ -375,6 +370,7 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
baseDir = ret["baseDir"]
tmpDir = ret["tmpDir"]

renewTask = False
if proxy:
payloadProxyLoc = ret["proxyLocation"]

Expand Down Expand Up @@ -449,6 +445,9 @@ def submitJob(self, executableFile, proxy=None, **kwargs):
containerOpts = self.ceParameters["ContainerOptions"].split(",")
for opt in containerOpts:
cmd.extend([opt.strip()])

rootImage = findImage() or self.ceParameters.get("ContainerRoot") or CONTAINER_DEFROOT

if os.path.isdir(rootImage) or os.path.isfile(rootImage):
cmd.extend([rootImage, innerCmd])
else:
Expand Down

0 comments on commit 79ff8f5

Please sign in to comment.