Skip to content

Commit

Permalink
feat: added vo to index names
Browse files Browse the repository at this point in the history
  • Loading branch information
fstagni committed Jun 18, 2024
1 parent b2f7fb3 commit 33b4687
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 31 deletions.
4 changes: 2 additions & 2 deletions src/DIRAC/Core/Utilities/ElasticSearchDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,14 +273,14 @@ def getDoc(self, index: str, docID: str) -> dict:
return S_ERROR(re)

@ifConnected
def getDocs(self, indexFunc, docIDs: list[str]) -> list[dict]:
def getDocs(self, indexFunc, docIDs: list[str], vo: str) -> list[dict]:
"""Efficiently retrieve many documents from an index.
:param index: name of the index
:param docIDs: document IDs
"""
sLog.debug(f"Retrieving documents {docIDs}")
docs = [{"_index": indexFunc(docID), "_id": docID} for docID in docIDs]
docs = [{"_index": indexFunc(docID, vo), "_id": docID} for docID in docIDs]
try:
response = self.client.mget({"docs": docs})
except RequestError as re:
Expand Down
32 changes: 16 additions & 16 deletions src/DIRAC/WorkloadManagementSystem/DB/JobParametersDB.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,13 +42,13 @@ def __init__(self, parentLogger=None):
except Exception as ex:
raise RuntimeError("Can't connect to JobParameters index") from ex

def _indexName(self, jobID: int) -> str:
def _indexName(self, jobID: int, vo: str) -> str:
"""construct the index name
:param jobID: Job ID
"""
indexSplit = int(int(jobID) // 1e6)
return f"{self.index_name}_{indexSplit}m"
return f"{self.index_name}_{vo}_{indexSplit}m"

def _createIndex(self, indexName: str) -> None:
"""Create a new index if needed
Expand All @@ -64,7 +64,7 @@ def _createIndex(self, indexName: str) -> None:
raise RuntimeError(result["Message"])
self.log.always("Index created:", indexName)

def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dict:
def getJobParameters(self, jobIDs: Union[int, list[int]], vo: str,paramList=None) -> dict:
"""Get Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters are returned.
Expand All @@ -80,7 +80,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic
paramList = paramList.replace(" ", "").split(",")
self.log.debug(f"JobDB.getParameters: Getting Parameters for jobs {jobIDs}")

res = self.getDocs(self._indexName, jobIDs)
res = self.getDocs(self._indexName, jobIDs, vo)
if not res["OK"]:
return res
result = {}
Expand All @@ -92,7 +92,7 @@ def getJobParameters(self, jobIDs: Union[int, list[int]], paramList=None) -> dic

return S_OK(result)

def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
def setJobParameter(self, jobID: int, key: str, value: str, vo: str) -> dict:
"""
Inserts data into JobParametersDB index
Expand All @@ -109,18 +109,18 @@ def setJobParameter(self, jobID: int, key: str, value: str) -> dict:
# The _id in ES can't exceed 512 bytes, this is a ES hard-coded limitation.

# If a record with this jobID update and add parameter, otherwise create a new record
if self.existsDoc(self._indexName(jobID), docID=str(jobID)):
if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)):
self.log.debug("A document for this job already exists, it will now be updated")
result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": data})
result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": data})
else:
self.log.debug("No document has this job id, creating a new document for this job")
self._createIndex(self._indexName(jobID))
self._createIndex(self._indexName(jobID, vo))
result = self.index(indexName=self._indexName(jobID), body=data, docID=str(jobID))
if not result["OK"]:
self.log.error("Couldn't insert or update data", result["Message"])
return result

def setJobParameters(self, jobID: int, parameters: list) -> dict:
def setJobParameters(self, jobID: int, parameters: list, vo: str) -> dict:
"""
Inserts data into JobParametersDB index using bulk indexing
Expand All @@ -135,18 +135,18 @@ def setJobParameters(self, jobID: int, parameters: list) -> dict:
parametersDict["JobID"] = jobID
parametersDict["timestamp"] = int(TimeUtilities.toEpochMilliSeconds())

if self.existsDoc(self._indexName(jobID), docID=str(jobID)):
if self.existsDoc(self._indexName(jobID, vo), docID=str(jobID)):
self.log.debug("A document for this job already exists, it will now be updated")
result = self.updateDoc(index=self._indexName(jobID), docID=str(jobID), body={"doc": parametersDict})
result = self.updateDoc(index=self._indexName(jobID, vo), docID=str(jobID), body={"doc": parametersDict})
else:
self.log.debug("Creating a new document for this job")
self._createIndex(self._indexName(jobID))
result = self.index(self._indexName(jobID), body=parametersDict, docID=str(jobID))
self._createIndex(self._indexName(jobID, vo))
result = self.index(self._indexName(jobID, vo), body=parametersDict, docID=str(jobID))
if not result["OK"]:
self.log.error("Couldn't insert or update data", result["Message"])
return result

def deleteJobParameters(self, jobID: int, paramList=None) -> dict:
def deleteJobParameters(self, jobID: int, paramList=None, vo: str="") -> dict:
"""Deletes Job Parameters defined for jobID.
Returns a dictionary with the Job Parameters.
If paramList is empty - all the parameters for the job are removed
Expand All @@ -163,13 +163,13 @@ def deleteJobParameters(self, jobID: int, paramList=None) -> dict:
if not paramList:
# Deleting the whole record
self.log.debug("Deleting record of job {jobID}")
result = self.deleteDoc(self._indexName(jobID), docID=str(jobID))
result = self.deleteDoc(self._indexName(jobID, vo), docID=str(jobID))
else:
# Deleting the specific parameters
self.log.debug(f"JobDB.getParameters: Deleting Parameters {paramList} for job {jobID}")
for paramName in paramList:
result = self.updateDoc(
index=self._indexName(jobID),
index=self._indexName(jobID, vo),
docID=str(jobID),
body={"script": "ctx._source.remove('" + paramName + "')"},
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@

import time

from DIRAC import S_OK, S_ERROR
from DIRAC import S_ERROR, S_OK
from DIRAC.ConfigurationSystem.Client.Helpers import Registry
from DIRAC.Core.DISET.RequestHandler import RequestHandler
from DIRAC.Core.Utilities.DEncode import ignoreEncodeWarning
from DIRAC.Core.Utilities.ObjectLoader import ObjectLoader
Expand Down Expand Up @@ -159,8 +160,9 @@ def export_setJobParameter(cls, jobID, name, value):
"""Set arbitrary parameter specified by name/value pair
for job specified by its JobId
"""

return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value) # pylint: disable=no-member
credDict = cls.getRemoteCredentials()
vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"]))
return cls.elasticJobParametersDB.setJobParameter(int(jobID), name, value, vo=vo) # pylint: disable=no-member

###########################################################################
types_setJobsParameter = [dict]
Expand Down Expand Up @@ -196,7 +198,9 @@ def export_setJobParameters(cls, jobID, parameters):
"""Set arbitrary parameters specified by a list of name/value pairs
for job specified by its JobId
"""
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters)
credDict = cls.getRemoteCredentials()
vo = credDict.get("VO", Registry.getVOForGroup(credDict["group"]))
result = cls.elasticJobParametersDB.setJobParameters(int(jobID), parameters, vo=vo)
if not result["OK"]:
cls.log.error("Failed to add Job Parameters to JobParametersDB", result["Message"])

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ def test_setAndGetJobFromDB():
assert res["Value"][101]["someKey"] == "value101"
assert len(res["Value"]) == 1
assert len(res["Value"][101]) == 5 # Same thing as with doc 100
res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")])
res = elasticJobParametersDB.setJobParameters(101, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(101)
Expand All @@ -108,41 +108,41 @@ def test_setAndGetJobFromDB():
assert res["Value"][101]["k2"] == "v2"

# another job with jobID > 1000000
res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")])
res = elasticJobParametersDB.setJobParameters(1010000, [("k", "v"), ("k1", "v1"), ("k2", "v2")], vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(1010000)
assert res["Value"][1010000]["k"] == "v"
assert res["Value"][1010000]["k2"] == "v2"

# deleting
res = elasticJobParametersDB.deleteJobParameters(100)
res = elasticJobParametersDB.deleteJobParameters(100, vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(100)
assert res["OK"]
assert len(res["Value"][100]) == 0

res = elasticJobParametersDB.deleteJobParameters(101, "someKey")
res = elasticJobParametersDB.deleteJobParameters(101, "someKey", vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(101)
assert res["OK"]
assert len(res["Value"][101]) == 7
res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101") # someKey is already deleted
res = elasticJobParametersDB.deleteJobParameters(101, "someKey, key101", vo="vo") # someKey is already deleted
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(101)
assert res["OK"]
assert len(res["Value"][101]) == 6
res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey")
res = elasticJobParametersDB.deleteJobParameters(101, "nonExistingKey", vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(101)
assert res["OK"]
assert len(res["Value"][101]) == 6

res = elasticJobParametersDB.deleteJobParameters(1010000)
res = elasticJobParametersDB.deleteJobParameters(1010000, vo="vo")
assert res["OK"]
time.sleep(SLEEP_DELAY)
res = elasticJobParametersDB.getJobParameters(1010000)
Expand All @@ -153,7 +153,7 @@ def test_setAndGetJobFromDB():
res = elasticJobParametersDB.deleteIndex("job_parameters")
assert res["OK"]
assert res["Value"] == "Nothing to delete"
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100))
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(100, vo="vo"))
assert res["OK"]
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000))
res = elasticJobParametersDB.deleteIndex(elasticJobParametersDB._indexName(1010000, vo="vo"))
assert res["OK"]

0 comments on commit 33b4687

Please sign in to comment.