diff --git a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py index 8a79cacb02f..b37d40580d9 100644 --- a/src/DIRAC/Resources/Computing/BatchSystems/Condor.py +++ b/src/DIRAC/Resources/Computing/BatchSystems/Condor.py @@ -6,6 +6,7 @@ from __future__ import print_function from __future__ import absolute_import from __future__ import division +import json import re import tempfile import subprocess @@ -25,6 +26,8 @@ HOLD_REASON_SUBCODE = "55" +STATE_ATTRIBUTES = "ClusterId,ProcId,JobStatus,HoldReasonCode,HoldReasonSubCode,HoldReason" + subTemplate = """ # Environment # ----------- @@ -62,6 +65,7 @@ # Requirements # ------------ request_cpus = %(processors)s +requirements = NumJobStarts == 0 # Exit options # ------------ @@ -73,7 +77,8 @@ # A subcode of our choice to identify who put the job on hold on_exit_hold_subcode = %(holdReasonSubcode)s # Jobs are then deleted from the system after N days if they are not idle or running -periodic_remove = (JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)) +periodic_remove = ((JobStatus == 1) && (NumJobStarts > 0)) || \ + ((JobStatus != 1) && (JobStatus != 2) && ((time() - EnteredCurrentStatus) > (%(daysToKeepRemoteLogs)s * 24 * 3600)) # Specific options # ---------------- @@ -87,63 +92,34 @@ """ -def parseCondorStatus(lines, jobID): +def getCondorStatus(jobMetadata): """parse the condor_q or condor_history output for the job status - :param lines: list of lines from the output of the condor commands, each line is a tuple of jobID, statusID, and holdReasonCode - :type lines: python:list - :param str jobID: jobID of condor job, e.g.: 123.53 + :param jobMetadata: dict with job metadata + :type jobMetadata: dict[str, str | int] :returns: Status as known by DIRAC, and a reason if the job is being held """ - jobID = str(jobID) - - holdReason = "" - status = None - for line in lines: - l = line.strip().split() - - # Make sure the job ID exists - if len(l) < 1 or l[0] != jobID: - continue - - # Make sure the status is present and is an integer - try: - status = int(l[1]) - except (ValueError, IndexError): - break - - # Stop here if the status is not held (5): result should be found in STATES_MAP - if status != 5: - break - - # A job can be held for various reasons, - # we need to further investigate with the holdReasonCode & holdReasonSubCode - # Details in: - # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode - - # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions - status = 3 - try: - holdReasonCode = l[2] - holdReasonSubcode = l[3] - holdReason = " ".join(l[4:]) - except IndexError: - # This should not happen in theory - # Just set the status to unknown such as - status = None - holdReasonCode = "undefined" - holdReasonSubcode = "undefined" - break - - # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true) - # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed - if holdReasonCode == "3" and holdReasonSubcode == HOLD_REASON_SUBCODE: - status = 5 - # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting - elif holdReasonCode == "16": - status = 1 - - return (STATES_MAP.get(status, "Unknown"), holdReason) + if jobMetadata["JobStatus"] != 5: + # If the job is not held, we can return the status directly + return (STATES_MAP.get(jobMetadata["JobStatus"], "Unknown"), "") + + # A job can be held for various reasons, + # we need to further investigate with the holdReasonCode & holdReasonSubCode + # Details in: + # https://htcondor.readthedocs.io/en/latest/classad-attributes/job-classad-attributes.html#HoldReasonCode + + # By default, a held (5) job is defined as Aborted in STATES_MAP, but there might be some exceptions + status = 3 + + # If holdReasonCode is 3 (The PERIODIC_HOLD expression evaluated to True. Or, ON_EXIT_HOLD was true) + # And subcode is HOLD_REASON_SUBCODE, then it means the job failed by itself, it needs to be marked as Failed + if jobMetadata["HoldReasonCode"] == 3 and jobMetadata["HoldReasonSubCode"] == HOLD_REASON_SUBCODE: + status = 5 + # If holdReasonCode is 16 (Input files are being spooled), the job should be marked as Waiting + elif jobMetadata["HoldReasonCode"] == 16: + status = 1 + + return (STATES_MAP.get(status, "Unknown"), jobMetadata["HoldReason"]) class Condor(object): @@ -171,8 +147,6 @@ def submitJob(self, **kwargs): preamble = kwargs.get("Preamble") jdlFile = tempfile.NamedTemporaryFile(dir=outputDir, suffix=".jdl", mode="wt") - scheddOptions = 'requirements = OpSys == "LINUX"\n' - scheddOptions += "gentenv = False" jdlFile.write( subTemplate % dict( @@ -185,7 +159,7 @@ def submitJob(self, **kwargs): holdReasonSubcode=HOLD_REASON_SUBCODE, daysToKeepRemoteLogs=1, scheddOptions="", - extraString="", + extraString=submitOptions, pilotStampList=",".join(stamps), ) ) @@ -193,7 +167,7 @@ def submitJob(self, **kwargs): jdlFile.flush() cmd = "%s; " % preamble if preamble else "" - cmd += "condor_submit %s %s" % (submitOptions, jdlFile.name) + cmd += "condor_submit %s" % jdlFile.name sp = subprocess.Popen( cmd, shell=True, @@ -283,7 +257,6 @@ def killJob(self, **kwargs): def getJobStatus(self, **kwargs): """Get status of the jobs in the given list""" - resultDict = {} MANDATORY_PARAMETERS = ["JobIDList"] @@ -299,15 +272,11 @@ def getJobStatus(self, **kwargs): resultDict["Message"] = "Empty job list" return resultDict - user = kwargs.get("User") - if not user: - user = os.environ.get("USER") - if not user: - resultDict["Status"] = -1 - resultDict["Message"] = "No user name" - return resultDict + # Prepare the command to get the status of the jobs + cmdJobs = " ".join(str(jobID) for jobID in jobIDList) - cmd = "condor_q -submitter %s -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason" % user + # Get the status of the jobs currently active + cmd = "condor_q %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES) sp = subprocess.Popen( shlex.split(cmd), stdout=subprocess.PIPE, @@ -317,16 +286,15 @@ def getJobStatus(self, **kwargs): output, error = sp.communicate() status = sp.returncode - if status != 0: + if status != 0 or not output: resultDict["Status"] = status resultDict["Message"] = error return resultDict - qList = output.strip().split("\n") + jobsMetadata = json.loads(output) - condorHistCall = ( - "condor_history -af:j JobStatus HoldReasonCode HoldReasonSubCode HoldReason -submitter %s" % user - ) + # Get the status of the jobs in the history + condorHistCall = "condor_history %s -attributes %s -json" % (cmdJobs, STATE_ATTRIBUTES) sp = subprocess.Popen( shlex.split(condorHistCall), stdout=subprocess.PIPE, @@ -335,15 +303,26 @@ def getJobStatus(self, **kwargs): ) output, _ = sp.communicate() status = sp.returncode - if status == 0: - for line in output.split("\n"): - qList.append(line) + + if status != 0 or not output: + resultDict["Status"] = status + resultDict["Message"] = error + return resultDict + + jobsMetadata += json.loads(output) statusDict = {} - if len(qList): - for job in jobIDList: - job = str(job) - statusDict[job], _ = parseCondorStatus(qList, job) + # Build a set of job IDs found in jobsMetadata + foundJobIDs = set() + for jobDict in jobsMetadata: + jobID = "%s.%s" % (jobDict["ClusterId"], jobDict["ProcId"]) + statusDict[jobID], _ = getCondorStatus(jobDict) + foundJobIDs.add(jobID) + + # For job IDs not found, set status to "Unknown" + for jobID in jobIDList: + if str(jobID) not in foundJobIDs: + statusDict[str(jobID)] = "Unknown" # Final output status = 0 @@ -355,19 +334,30 @@ def getCEStatus(self, **kwargs): """Get the overall status of the CE""" resultDict = {} - user = kwargs.get("User") - if not user: - user = os.environ.get("USER") - if not user: + cmd = "condor_q -totals -json" + sp = subprocess.Popen( + shlex.split(cmd), + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + universal_newlines=True, + ) + output, error = sp.communicate() + status = sp.returncode + + if status != 0 or not output: resultDict["Status"] = -1 - resultDict["Message"] = "No user name" + resultDict["Message"] = error return resultDict - waitingJobs = 0 - runningJobs = 0 + jresult = json.loads(output) + resultDict["Status"] = 0 + resultDict["Waiting"] = jresult[0]["Idle"] + resultDict["Running"] = jresult[0]["Running"] + # We also need to check the hold jobs, some of them are actually waiting (e.g. for input files) + cmd = 'condor_q -json -constraint "JobStatus == 5" -attributes HoldReasonCode' sp = subprocess.Popen( - shlex.split("condor_q -submitter %s" % user), + shlex.split(cmd), stdout=subprocess.PIPE, stderr=subprocess.PIPE, universal_newlines=True, @@ -376,33 +366,42 @@ def getCEStatus(self, **kwargs): status = sp.returncode if status != 0: - if "no record" in output: - resultDict["Status"] = 0 - resultDict["Waiting"] = waitingJobs - resultDict["Running"] = runningJobs - return resultDict - resultDict["Status"] = status + resultDict["Status"] = -1 resultDict["Message"] = error return resultDict - if "no record" in output: - resultDict["Status"] = 0 - resultDict["Waiting"] = waitingJobs - resultDict["Running"] = runningJobs + # If there are no held jobs, we can return the result + if not output: return resultDict - if output: - lines = output.split("\n") - for line in lines: - if not line.strip(): - continue - if " I " in line: - waitingJobs += 1 - elif " R " in line: - runningJobs += 1 + jresult = json.loads(output) + for job_metadata in jresult: + if job_metadata["HoldReasonCode"] == 16: + resultDict["Waiting"] += 1 + + return resultDict + + def getJobOutputFiles(self, **kwargs): + """Get output file names and templates for the specific CE""" + resultDict = {} + + MANDATORY_PARAMETERS = ["JobIDList", "OutputDir", "ErrorDir"] + for argument in MANDATORY_PARAMETERS: + if argument not in kwargs: + resultDict["Status"] = -1 + resultDict["Message"] = "No %s" % argument + return resultDict + + outputDir = kwargs["OutputDir"] + errorDir = kwargs["ErrorDir"] + jobIDList = kwargs["JobIDList"] + + jobDict = {} + for jobID in jobIDList: + jobDict[jobID] = {} + jobDict[jobID]["Output"] = "%s/%s.out" % (outputDir, jobID) + jobDict[jobID]["Error"] = "%s/%s.err" % (errorDir, jobID) - # Final output resultDict["Status"] = 0 - resultDict["Waiting"] = waitingJobs - resultDict["Running"] = runningJobs + resultDict["Jobs"] = jobDict return resultDict diff --git a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py index 8b07490e17e..25581a56550 100644 --- a/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/HTCondorCEComputingElement.py @@ -50,6 +50,7 @@ import datetime import errno +import json import os import subprocess import tempfile @@ -63,7 +64,12 @@ from DIRAC.Core.Utilities.List import breakListIntoChunks from DIRAC.Core.Utilities.Subprocess import systemCall from DIRAC.FrameworkSystem.private.authorization.utils.Tokens import writeToTokenFile -from DIRAC.Resources.Computing.BatchSystems.Condor import HOLD_REASON_SUBCODE, parseCondorStatus, subTemplate +from DIRAC.Resources.Computing.BatchSystems.Condor import ( + HOLD_REASON_SUBCODE, + STATE_ATTRIBUTES, + getCondorStatus, + subTemplate, +) from DIRAC.Resources.Computing.ComputingElement import ComputingElement from DIRAC.WorkloadManagementSystem.Client import PilotStatus @@ -400,45 +406,57 @@ def getJobStatus(self, jobIDList): if isinstance(jobIDList, str): jobIDList = [jobIDList] + self.tokenFile = None resultDict = {} condorIDs = {} # Get all condorIDs so we can just call condor_q and condor_history once for jobReference in jobIDList: jobReference = jobReference.split(":::")[0] - condorIDs[jobReference] = self._jobReferenceToCondorID(jobReference) - - self.tokenFile = None + condorIDs[self._jobReferenceToCondorID(jobReference)] = jobReference - qList = [] - for _condorIDs in breakListIntoChunks(condorIDs.values(), 100): - # This will return a list of 1245.75 3 undefined undefined undefined + jobsMetadata = [] + for _condorIDs in breakListIntoChunks(condorIDs.keys(), 100): cmd = ["condor_q"] cmd.extend(self.remoteScheddOptions.strip().split(" ")) cmd.extend(_condorIDs) - cmd.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"]) + cmd.extend(["-attributes", STATE_ATTRIBUTES]) + cmd.extend(["-json"]) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: return result - qList.extend(result["Value"].split("\n")) + if result["Value"]: + jobsMetadata.extend(json.loads(result["Value"])) condorHistCall = ["condor_history"] condorHistCall.extend(self.remoteScheddOptions.strip().split(" ")) condorHistCall.extend(_condorIDs) - condorHistCall.extend(["-af:j", "JobStatus", "HoldReasonCode", "HoldReasonSubCode", "HoldReason"]) + condorHistCall.extend(["-attributes", STATE_ATTRIBUTES]) + condorHistCall.extend(["-json"]) result = self._executeCondorCommand(cmd, keepTokenFile=True) if not result["OK"]: return result - qList.extend(result["Value"].split("\n")) + if result["Value"]: + jobsMetadata.extend(json.loads(result["Value"])) - for job, jobID in condorIDs.items(): - jobStatus, reason = parseCondorStatus(qList, jobID) + foundJobIDs = set() + for jobDict in jobsMetadata: + jobStatus, reason = getCondorStatus(jobDict) + condorId = f"{jobDict['ClusterId']}.{jobDict['ProcId']}" + jobReference = condorIDs.get(condorId) if jobStatus == PilotStatus.ABORTED: - self.log.verbose("Job", f"{jobID} held: {reason}") + self.log.verbose("Job", f"{jobReference} held: {reason}") + + resultDict[jobReference] = jobStatus + foundJobIDs.add(jobReference) - resultDict[job] = jobStatus + # Check if we have any jobs that were not found in the condor_q or condor_history + for jobReference in condorIDs.values(): + if jobReference not in foundJobIDs: + self.log.verbose("Job", f"{jobReference} not found in condor_q or condor_history") + resultDict[jobReference] = PilotStatus.UNKNOWN self.tokenFile = None diff --git a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py index 9d0436bdbaa..d9bcefcffa7 100644 --- a/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py +++ b/src/DIRAC/Resources/Computing/test/Test_HTCondorCEComputingElement.py @@ -2,6 +2,7 @@ """ tests for HTCondorCEComputingElement module """ +import json import uuid import pytest @@ -12,18 +13,30 @@ MODNAME = "DIRAC.Resources.Computing.HTCondorCEComputingElement" -STATUS_LINES = """ -123.2 5 4 0 undefined -123.1 3 undefined undefined undefined -""".strip().split( - "\n" -) - -HISTORY_LINES = """ -123.0 4 undefined undefined undefined -""".strip().split( - "\n" -) +STATUS_QUEUE = [ + { + "ClusterId": 123, + "ProcId": 2, + "JobStatus": 5, + "HoldReasonCode": 4, + "HoldReasonSubCode": 0, + "HoldReason": "The credentials for the job are invalid", + }, + { + "ClusterId": 123, + "ProcId": 1, + "JobStatus": 3, + }, +] + + +STATUS_HISTORY = [ + { + "ClusterId": 123, + "ProcId": 0, + "JobStatus": 4, + } +] @pytest.fixture @@ -32,42 +45,47 @@ def setUp(): def test_parseCondorStatus(): - statusLines = f""" - 104098.1 1 undefined undefined undefined - 104098.2 2 undefined undefined undefined - 104098.3 3 undefined undefined undefined - 104098.4 4 undefined undefined undefined - 104098.5 5 16 57 Input data are being spooled - 104098.6 5 3 {Condor.HOLD_REASON_SUBCODE} Policy - 104098.7 5 1 0 undefined - - foo bar - 104096.1 3 16 test test - 104096.2 3 test - 104096.3 5 undefined undefined undefined - 104096.4 7 - """.strip().split( - "\n" - ) - # force there to be an empty line + statusOutput = {"ClusterId": 104098, "ProcId": 1, "JobStatus": 1} + assert HTCE.getCondorStatus(statusOutput) == ("Waiting", "") - expectedResults = { - "104098.1": "Waiting", - "104098.2": "Running", - "104098.3": "Aborted", - "104098.4": "Done", - "104098.5": "Waiting", - "104098.6": "Failed", - "104098.7": "Aborted", - "foo": "Unknown", - "104096.1": "Aborted", - "104096.2": "Aborted", - "104096.3": "Aborted", - "104096.4": "Unknown", - } + statusOutput = {"ClusterId": 104098, "ProcId": 2, "JobStatus": 2} + assert HTCE.getCondorStatus(statusOutput) == ("Running", "") + + statusOutput = {"ClusterId": 104098, "ProcId": 3, "JobStatus": 3} + assert HTCE.getCondorStatus(statusOutput) == ("Aborted", "") + + statusOutput = {"ClusterId": 104098, "ProcId": 4, "JobStatus": 4} + assert HTCE.getCondorStatus(statusOutput) == ("Done", "") - for jobID, expected in expectedResults.items(): - assert HTCE.parseCondorStatus(statusLines, jobID)[0] == expected + statusOutput = { + "ClusterId": 104098, + "ProcId": 5, + "JobStatus": 5, + "HoldReasonCode": 16, + "HoldReasonSubCode": 57, + "HoldReason": "Input data are being spooled", + } + assert HTCE.getCondorStatus(statusOutput) == ("Waiting", "Input data are being spooled") + + statusOutput = { + "ClusterId": 104098, + "ProcId": 6, + "JobStatus": 5, + "HoldReasonCode": 3, + "HoldReasonSubCode": HTCE.HOLD_REASON_SUBCODE, + "HoldReason": "Policy", + } + assert HTCE.getCondorStatus(statusOutput) == ("Failed", "Policy") + + statusOutput = { + "ClusterId": 104098, + "ProcId": 7, + "JobStatus": 5, + "HoldReasonCode": 1, + "HoldReasonSubCode": 0, + "HoldReason": "Aborted by user", + } + assert HTCE.getCondorStatus(statusOutput) == ("Aborted", "Aborted by user") def test_getJobStatus(mocker): @@ -75,8 +93,8 @@ def test_getJobStatus(mocker): mocker.patch( MODNAME + ".systemCall", side_effect=[ - S_OK((0, "\n".join(STATUS_LINES), "")), - S_OK((0, "\n".join(HISTORY_LINES), "")), + S_OK((0, json.dumps(STATUS_QUEUE), "")), + S_OK((0, json.dumps(STATUS_HISTORY), "")), S_OK((0, "", "")), S_OK((0, "", "")), ], @@ -110,7 +128,7 @@ def test_getJobStatus(mocker): def test_getJobStatusBatchSystem(mocker): """Test Condor Batch System plugin getJobStatus""" patchPopen = mocker.patch("DIRAC.Resources.Computing.BatchSystems.Condor.subprocess.Popen") - patchPopen.return_value.communicate.side_effect = [("\n".join(STATUS_LINES), ""), ("\n".join(HISTORY_LINES), "")] + patchPopen.return_value.communicate.side_effect = [(json.dumps(STATUS_QUEUE), ""), (json.dumps(STATUS_HISTORY), "")] patchPopen.return_value.returncode = 0 ret = Condor.Condor().getJobStatus(JobIDList=["123.0", "123.1", "123.2", "333.3"])