# Copyright 2019 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import datetime
import logging
import os
import pickle
import threading
from pathlib import Path
from typing import Dict, List, Tuple
import htcondor
from . import exceptions, holds, names, utils
# Module-level logger, named after this module so output can be filtered per module.
logger = logging.getLogger(__name__)
class ComponentStatus(utils.StrEnum):
    """
    An enumeration of the possible statuses that a map component can be in.
    These are mostly identical to the HTCondor job statuses of the same name.
    """

    UNKNOWN = "UNKNOWN"
    # Not an HTCondor job status: the status every component starts in before
    # any event for it has been seen.
    UNMATERIALIZED = "UNMATERIALIZED"
    IDLE = "IDLE"
    RUNNING = "RUNNING"
    REMOVED = "REMOVED"
    COMPLETED = "COMPLETED"
    HELD = "HELD"
    SUSPENDED = "SUSPENDED"
    # Not an HTCondor job status: a component whose job terminated but whose
    # output reports an error (or could not be found).
    ERRORED = "ERRORED"

    @classmethod
    def display_statuses(cls) -> Tuple["ComponentStatus", ...]:
        """
        Return the statuses intended for display, in a fixed order.

        The order is HELD, ERRORED, IDLE, RUNNING, COMPLETED. The remaining
        statuses (UNKNOWN, UNMATERIALIZED, REMOVED, SUSPENDED) are not included.
        """
        return (
            cls.HELD,
            cls.ERRORED,
            cls.IDLE,
            cls.RUNNING,
            cls.COMPLETED,
        )
# Which component status each HTCondor job event type moves a component into.
# Event types that are not keys here leave the component's status unchanged.
# Built from (target status, triggering event types) groups; each group is
# contiguous, so key insertion order matches a flat literal listing.
# Note that MapState may turn a JOB_TERMINATED -> COMPLETED transition into
# ERRORED after inspecting the component's output.
JOB_EVENT_STATUS_TRANSITIONS = {
    event_type: target_status
    for target_status, triggering_events in (
        (
            ComponentStatus.IDLE,
            (
                htcondor.JobEventType.SUBMIT,
                htcondor.JobEventType.JOB_EVICTED,
                htcondor.JobEventType.JOB_UNSUSPENDED,
                htcondor.JobEventType.JOB_RELEASED,
                htcondor.JobEventType.SHADOW_EXCEPTION,
                htcondor.JobEventType.JOB_RECONNECT_FAILED,
            ),
        ),
        (ComponentStatus.COMPLETED, (htcondor.JobEventType.JOB_TERMINATED,)),
        (ComponentStatus.RUNNING, (htcondor.JobEventType.EXECUTE,)),
        (ComponentStatus.HELD, (htcondor.JobEventType.JOB_HELD,)),
        (ComponentStatus.SUSPENDED, (htcondor.JobEventType.JOB_SUSPENDED,)),
        (ComponentStatus.REMOVED, (htcondor.JobEventType.JOB_ABORTED,)),
    )
    for event_type in triggering_events
}
class MapState:
    """
    Per-component status, holds, memory usage, and runtime for a map, derived
    from the map's HTCondor job event log.

    Every public property first processes any events that have appeared in the
    log since the last read, then returns the (cached) result.
    """

    def __init__(self, map):
        self.map = map

        self._event_reader = None  # delayed until _read_events is called
        # (cluster id, proc id) -> component index, learned from SUBMIT events
        self._jobid_to_component: Dict[Tuple[int, int], int] = {}

        # the per-component lists below are indexed by component number
        self._component_statuses = [ComponentStatus.UNMATERIALIZED for _ in self.map.components]
        self._holds: Dict[int, holds.ComponentHold] = {}  # only currently-held components
        self._memory_usage = [0 for _ in self.map.components]
        self._runtime = [datetime.timedelta(0) for _ in self.map.components]

        self._event_reader_lock = threading.Lock()

    @property
    def component_statuses(self) -> List[ComponentStatus]:
        """The current status of each component, indexed by component number."""
        self._read_events()
        return self._component_statuses

    @property
    def holds(self) -> Dict[int, holds.ComponentHold]:
        """A mapping of component number -> hold, for components that are currently held."""
        self._read_events()
        return self._holds

    @property
    def memory_usage(self) -> List[int]:
        """The largest ``MemoryUsage`` reported by IMAGE_SIZE events, per component."""
        self._read_events()
        return self._memory_usage

    @property
    def runtime(self) -> List[datetime.timedelta]:
        """The user + system remote runtime from each component's termination event."""
        self._read_events()
        return self._runtime

    @property
    def _event_log_path(self):
        return self.map._map_dir / names.EVENT_LOG

    def _read_events(self):
        """
        Bring the cached state up to date with the event log.

        Creates the event log reader on first use and processes any new events.
        If there were new events, it invalidates the map's local data cache and,
        with HTCondor bindings 8.9.3 or newer, saves this state to disk.
        """
        with self._event_reader_lock:  # no thread can be in here at the same time as another
            if self._event_reader is None:
                logger.debug(f"Created event log reader for map {self.map.tag}")
                self._event_reader = htcondor.JobEventLog(self._event_log_path.as_posix())

            with utils.Timer() as timer:
                handled_events = self._handle_events()

            if handled_events > 0:
                logger.debug(
                    f"Processed {handled_events} events for map {self.map.tag} (took {timer.elapsed:.6f} seconds)"
                )

                self.map._local_data = None  # invalidate cache if any events were received

                # saving map state requires bindings >= 8.9.3 (load() enforces the same)
                if utils.BINDINGS_VERSION_INFO >= (8, 9, 3):
                    self.save()

    def _handle_events(self) -> int:
        """
        Process new events and return the number of new events processed.

        The count includes the late-materialization SUBMIT event (proc == -1),
        even though that event is otherwise ignored.

        Raises ``exceptions.CorruptEventLog`` if an event refers to a job whose
        SUBMIT event was never seen.
        """
        handled_events = 0

        # Workaround HTCONDOR-463
        os.stat(self._event_log_path.as_posix())

        # stop_after=0: return only the events already in the log, never wait for more
        for event in self._event_reader.events(0):
            handled_events += 1

            # skip the late materialization submit event
            if event.proc == -1:
                continue

            if event.type is htcondor.JobEventType.SUBMIT:
                # the SUBMIT event's LogNotes carries the component index for this job
                self._jobid_to_component[(event.cluster, event.proc)] = int(event["LogNotes"])

            component = self._component_for_event(event)
            self._record_event_details(component, event)

            new_status = self._status_after_event(component, event)
            if new_status is not None:
                self._set_status(component, new_status)

        return handled_events

    def _component_for_event(self, event) -> int:
        """
        Return the component index of the job that produced ``event``.

        Raises ``exceptions.CorruptEventLog`` if no SUBMIT event was seen for that job.
        """
        # this lookup is safe because the SUBMIT event always comes first
        # ... but a missing SUBMIT event can happen if the event log is corrupted somehow
        try:
            return self._jobid_to_component[(event.cluster, event.proc)]
        except KeyError as e:
            raise exceptions.CorruptEventLog(
                f"Found an event for a job that we never saw a submit event for:\n{event}"
            ) from e

    def _record_event_details(self, component: int, event) -> None:
        """Update the memory usage, runtime, or hold information carried by ``event``."""
        if event.type is htcondor.JobEventType.IMAGE_SIZE:
            # keep the peak value seen across all IMAGE_SIZE events
            self._memory_usage[component] = max(
                self._memory_usage[component], int(event.get("MemoryUsage", 0)),
            )
        elif event.type is htcondor.JobEventType.JOB_TERMINATED:
            self._runtime[component] = parse_runtime(event["RunRemoteUsage"])
        elif event.type is htcondor.JobEventType.JOB_RELEASED:
            self._holds.pop(component, None)
        elif event.type is htcondor.JobEventType.JOB_HELD:
            self._holds[component] = holds.ComponentHold(
                code=int(event["HoldReasonCode"]),
                reason=event.get("HoldReason", "UNKNOWN").strip(),
            )

    def _status_after_event(self, component: int, event):
        """
        Return the ``ComponentStatus`` that ``event`` moves ``component`` into,
        or ``None`` if the event does not affect the component's status.
        """
        new_status = JOB_EVENT_STATUS_TRANSITIONS.get(event.type, None)

        # the component has *terminated*, but did it error?
        if new_status is ComponentStatus.COMPLETED:
            try:
                exec_status = self.map._peek_status(component)
            except exceptions.OutputNotFound:
                logger.warning(
                    f"Output was not found for component {component} for map {self.map.tag}, marking as errored"
                )
                exec_status = "ERR"

            if exec_status == "ERR":
                new_status = ComponentStatus.ERRORED

        return new_status

    def _set_status(self, component: int, new_status: ComponentStatus) -> None:
        """Move ``component`` into ``new_status``, warning if it is already there."""
        if new_status is self._component_statuses[component]:
            logger.warning(
                f"Component {component} of map {self.map.tag} tried to transition into the state it is already in ({new_status})"
            )
        else:
            self._component_statuses[component] = new_status

    def save(self) -> Path:
        """
        Pickle this state into the map directory and return the path it was saved to.

        The state is written to a ``.working`` file first and then moved over the
        final path, so an interrupted write never clobbers the previous state file.
        """
        final_path = self.map._map_dir / names.MAP_STATE
        working_path = final_path.with_suffix(".working")

        with working_path.open(mode="wb") as f:
            pickle.dump(self, f, protocol=-1)

        # Path.replace overwrites an existing destination on every platform;
        # Path.rename raises FileExistsError on Windows once a state file exists.
        working_path.replace(final_path)

        logger.debug(f"Saved map state for map {self.map.tag}")

        return final_path

    @staticmethod
    def load(map):
        """
        Load the previously saved state of ``map`` and reattach it to the map.

        Raises ``exceptions.InsufficientHTCondorVersion`` if the HTCondor
        bindings are older than 8.9.3.
        """
        if utils.BINDINGS_VERSION_INFO < (8, 9, 3):
            raise exceptions.InsufficientHTCondorVersion(
                "Map state can only be saved with HTCondor 8.9.3 or greater"
            )

        # NOTE: unpickling can execute arbitrary code; this must only ever read
        # the state file that save() wrote into the map's own directory.
        with (map._map_dir / names.MAP_STATE).open(mode="rb") as f:
            state = pickle.load(f)

        state.map = map  # the map reference is not pickled (see __getstate__)

        return state

    def __getstate__(self):
        # A threading.Lock cannot be pickled, and the map reference is
        # reattached by load() rather than stored in the state file.
        d = self.__dict__.copy()
        d.pop("_event_reader_lock")
        d.pop("map")
        return d

    def __setstate__(self, state):
        self.__dict__ = state
        self._event_reader_lock = threading.Lock()  # locks are not pickled; make a fresh one
        # note: the map reference is restored in the load method
def parse_runtime(runtime_string: str) -> datetime.timedelta:
    """
    Parse an HTCondor usage string into the total (user + system) time it describes.

    The string must have two comma-separated parts, each consisting of a label,
    a day count, and an ``H:M:S`` clock, e.g. ``"Usr 0 01:02:03, Sys 0 00:00:04"``.
    The labels are ignored and the two durations are summed.

    Raises ``ValueError`` if the string does not have that shape.
    """

    def _part_to_timedelta(part: str) -> datetime.timedelta:
        # "<label> <days> <H:M:S>" -> timedelta; the label is not checked
        _label, day_count, clock = part.split()
        hours, minutes, seconds = (int(field) for field in clock.split(":"))
        return datetime.timedelta(
            days=int(day_count), hours=hours, minutes=minutes, seconds=seconds,
        )

    user_part, system_part = runtime_string.split(",")
    return _part_to_timedelta(user_part) + _part_to_timedelta(system_part)