# Source code for htmap.maps

# Copyright 2018 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import collections.abc
import datetime
import functools
import inspect
import logging
import shutil
import time
import weakref
from copy import copy
from pathlib import Path
from typing import Any, Dict, Iterable, Iterator, List, Mapping, MutableMapping, Optional, Tuple

import classad
import htcondor
from tqdm import tqdm

from . import condor, errors, exceptions, holds, htio, mapping, names, settings, state, tags, utils

logger = logging.getLogger(__name__)


def _protector(method):
    @functools.wraps(method)
    def _protect(self, *args, **kwargs):
        if not self.exists:
            raise exceptions.MapWasRemoved(
                f"Cannot call {method} for map {self.tag} because it has been removed"
            )
        return method(self, *args, **kwargs)

    return _protect


def _protect_map_after_remove(result_class):
    # decorate all public instance methods
    for key, member in inspect.getmembers(result_class, predicate=inspect.isfunction):
        if not key.startswith("_"):
            setattr(result_class, key, _protector(member))

    return result_class


# this set is used in Map.load to make Maps singletons
# Registry of live Map objects: Map.__init__ adds every new map here, and
# Map.load consults it (through maps_by_tag) so that loading a tag that is
# already in memory returns the existing object instead of building a new one.
# Because it is a WeakSet, membership alone does not keep a map alive.
MAPS = weakref.WeakSet()


def maps_by_tag() -> Dict[str, "Map"]:
    """
    Build a fresh ``{tag: map}`` dictionary from the registry of live maps.

    The dictionary is rebuilt on every call on purpose: a map's tag can change
    (see ``Map.retag``), so a cached copy could go stale. Don't cache the result.
    """
    tag_to_map = {}
    for live_map in MAPS:
        tag_to_map[live_map.tag] = live_map
    return tag_to_map


@_protect_map_after_remove
class Map(collections.abc.Sequence):
    """
    Represents the results from a map call.

    .. warning ::

        You should never instantiate a :class:`Map` directly!
        Instead, you'll get your :class:`Map` by calling a top-level mapping function like :func:`htmap.map`,
        a :class:`MappedFunction` mapping method, or by using :func:`htmap.load`.
        We are not responsible for whatever vile contraption you build if you bypass the correct methods!
    """

    def __init__(
        self, *, tag: str, map_dir: Path,
    ):
        self.tag = tag
        self._map_dir = map_dir

        # reuse the persisted map state if it can be read; otherwise start a fresh one
        try:
            self._state = state.MapState.load(self)
            logger.debug(f"Loaded existing map state for map {self.tag}")
        except (FileNotFoundError, exceptions.InsufficientHTCondorVersion):
            self._state = state.MapState(self)
        except IOError as e:
            logger.debug(
                f"Failed to read existing map state for map {self.tag} because: {repr(e)}"
            )
            self._state = state.MapState(self)

        # lazily computed size of the map directory; None means "not computed yet"
        # (see the local_data property)
        self._local_data: Optional[int] = None
        self._stdout: MapStdOut = MapStdOut(self)
        self._stderr: MapStdErr = MapStdErr(self)
        self._output_files: MapOutputFiles = MapOutputFiles(self)

        # register this object so that Map.load returns it for this tag
        MAPS.add(self)

    @property
    def _cluster_ids(self):
        """The HTCondor cluster ids recorded for this map, read from the map directory."""
        return htio.load_cluster_ids(self._map_dir)

    @property
    def _num_components(self):
        """The number of components in this map, read from the map directory."""
        return htio.load_num_components(self._map_dir)

    @classmethod
    def load(cls, tag: str) -> "Map":
        """
        Load a :class:`Map` by looking up its ``tag``.

        Raises :class:`htmap.exceptions.TagNotFound` if the ``tag`` does not exist.

        Parameters
        ----------
        tag
            The ``tag`` to search for.

        Returns
        -------
        map
            The map with the given ``tag``.
        """
        try:
            # if we already have this map in memory, return that object instead
            return maps_by_tag()[tag]
        except KeyError:
            map_dir = mapping.tag_to_map_dir(tag)
            logger.debug(f"Loaded map {tag} from {map_dir}")
            return cls(tag=tag, map_dir=map_dir,)

    def __repr__(self):
        return f"{self.__class__.__name__}(tag = {self.tag})"

    # maps order by their tags
    def __gt__(self, other):
        return self.tag > other.tag

    def __lt__(self, other):
        return self.tag < other.tag

    def __ge__(self, other):
        return self.tag >= other.tag

    def __le__(self, other):
        return self.tag <= other.tag

    def __len__(self):
        """The length of a :class:`Map` is the number of components it contains."""
        return self._num_components

    def __contains__(self, component: Any) -> bool:
        """A component index is "in" the map if it is a valid component index."""
        return component in range(self._num_components)

    @property
    def _tag_file_path(self) -> Path:
        return tags.tag_file_path(self.tag)

    @property
    def _inputs_dir(self) -> Path:
        """The path to the inputs directory, inside the map directory."""
        return self._map_dir / names.INPUTS_DIR

    @property
    def _outputs_dir(self) -> Path:
        """The path to the outputs directory, inside the map directory."""
        return self._map_dir / names.OUTPUTS_DIR

    def _input_file_path(self, component: int) -> Path:
        """The path of the serialized input for ``component``."""
        return self._inputs_dir / f"{component}.{names.INPUT_EXT}"

    def _output_file_path(self, component: int) -> Path:
        """The path of the serialized output for ``component``."""
        return self._outputs_dir / f"{component}.{names.OUTPUT_EXT}"

    @property
    def _job_logs_dir(self) -> Path:
        return self._map_dir / names.JOB_LOGS_DIR

    def _stdout_file_path(self, component: int) -> Path:
        return self._job_logs_dir / f"{component}.{names.STDOUT_EXT}"

    def _stderr_file_path(self, component: int) -> Path:
        return self._job_logs_dir / f"{component}.{names.STDERR_EXT}"

    @property
    def _user_output_files_dir(self):
        return self._map_dir / names.OUTPUT_FILES_DIR

    def _user_output_files_path(self, component: int) -> Path:
        """The directory that holds the user output files for ``component``."""
        return self._user_output_files_dir / str(component)

    @property
    def components(self) -> Tuple[int, ...]:
        """Return a tuple containing the component indices for the :class:`htmap.Map`."""
        return tuple(range(self._num_components))

    @property
    def is_done(self) -> bool:
        """``True`` if all of the output is available for this map."""
        return all(cs is state.ComponentStatus.COMPLETED for cs in self.component_statuses)

    @property
    def is_active(self) -> bool:
        """``True`` if any map components are not complete (or errored!)."""
        return any(
            cs not in (state.ComponentStatus.COMPLETED, state.ComponentStatus.ERRORED)
            for cs in self.component_statuses
        )

    def wait(
        self,
        timeout: utils.Timeout = None,
        show_progress_bar: bool = False,
        holds_ok: bool = False,
        errors_ok: bool = False,
    ) -> None:
        """
        Wait until all output associated with this :class:`Map` is available.

        If any components in the map are held or experience an execution error,
        this method will raise an exception
        (:class:`htmap.exceptions.MapComponentHeld` or
        :class:`htmap.exceptions.MapComponentError`, respectively).

        Parameters
        ----------
        timeout
            How long to wait for the map to complete before raising a
            :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        show_progress_bar
            If ``True``, a progress bar will be displayed.
        holds_ok
            If ``True``, will not raise exceptions if components are held.
        errors_ok
            If ``True``, will not raise exceptions if components experience execution errors.
        """
        start_time = time.time()
        timeout = utils.timeout_to_seconds(timeout)

        # bound before the try so the finally clause never sees an unbound name,
        # even if constructing the progress bar itself fails
        pbar = None
        try:
            if show_progress_bar:
                pbar = tqdm(desc=self.tag, total=len(self), unit="component", ascii=True,)
            previous_pbar_len = 0

            # statuses that count as "finished" for the purpose of this wait
            ok_statuses = {state.ComponentStatus.COMPLETED}
            if holds_ok:
                ok_statuses.add(state.ComponentStatus.HELD)
            if errors_ok:
                ok_statuses.add(state.ComponentStatus.ERRORED)

            while True:
                num_incomplete = sum(cs not in ok_statuses for cs in self.component_statuses)
                if pbar is not None:
                    pbar_len = self._num_components - num_incomplete
                    pbar.update(pbar_len - previous_pbar_len)
                    previous_pbar_len = pbar_len
                if num_incomplete == 0:
                    break

                for component, status in enumerate(self.component_statuses):
                    if status is state.ComponentStatus.HELD and not holds_ok:
                        raise exceptions.MapComponentHeld(
                            f"Component {component} of map {self.tag} was held. Reason: {self.holds[component]}"
                        )
                    elif status is state.ComponentStatus.ERRORED and not errors_ok:
                        raise exceptions.MapComponentError(
                            f"Component {component} of map {self.tag} encountered error while executing. Error report:\n{self._load_error(component).report()}"
                        )

                if timeout is not None and time.time() > start_time + timeout:
                    raise exceptions.TimeoutError(f"Timeout while waiting for {self}")

                time.sleep(settings["WAIT_TIME"])
        finally:
            if pbar is not None:
                pbar.close()

    def _wait_for_component(self, component: int, timeout: utils.Timeout = None) -> None:
        """
        Wait for a map component to terminate, which could either be because
        it completes successfully or encounters an error during execution.

        Raises :class:`htmap.exceptions.MapComponentHeld` if the component is held,
        :class:`htmap.exceptions.OutputNotFound` if ``timeout`` is zero (or negative)
        and the component has not terminated, and
        :class:`htmap.exceptions.TimeoutError` if a positive ``timeout`` expires.
        """
        timeout = utils.timeout_to_seconds(timeout)
        start_time = time.time()

        while True:
            component_status = self.component_statuses[component]
            if component_status in (
                state.ComponentStatus.COMPLETED,
                state.ComponentStatus.ERRORED,
            ):
                break
            elif component_status is state.ComponentStatus.HELD:
                raise exceptions.MapComponentHeld(
                    f"Component {component} of map {self.tag} is held: {self.holds[component]}"
                )

            if timeout is not None and (time.time() >= start_time + timeout):
                # a zero timeout means "don't wait": report missing output
                # rather than a timeout
                if timeout <= 0:
                    raise exceptions.OutputNotFound(
                        f"Output for component {component} of map {self.tag} not found"
                    )
                else:
                    raise exceptions.TimeoutError(
                        f"Timed out while waiting for component {component} of map {self.tag}"
                    )

            time.sleep(settings["WAIT_TIME"])

    def _load_input(self, component: int) -> Tuple[Tuple[Any], Dict[str, Any]]:
        """Load the ``(args, kwargs)`` input for ``component``."""
        return htio.load_object(self._input_file_path(component))

    def _peek_status(self, component: int,) -> str:
        """
        Load the object stored in the output file of ``component``
        (presumably its ``"OK"``/``"ERR"`` status marker).
        """
        try:
            return htio.load_object(self._output_file_path(component))
        except FileNotFoundError as e:
            raise exceptions.OutputNotFound(
                f"Output for component {component} of map {self.tag} not found"
            ) from e

    def _load_output(self, component: int, timeout: utils.Timeout = None,) -> Any:
        """
        Try to load a map component as if it succeeded.
        If the component actually failed, raise :class:`MapComponentError`.
        """
        if component not in range(0, len(self)):
            raise IndexError(
                f"Tried to get output for component {component}, but map {self.tag} only has {len(self)} components"
            )

        self._wait_for_component(component, timeout)

        # the output file holds a status marker followed by the payload
        status_and_result = htio.load_objects(self._output_file_path(component))
        status = next(status_and_result)
        if status == "OK":
            return next(status_and_result)
        elif status == "ERR":
            raise exceptions.MapComponentError(
                f"Component {component} of map {self.tag} encountered error while executing. Error report:\n{self._load_error(component).report()}"
            )
        else:
            raise exceptions.InvalidOutputStatus(f"Output status {status} is not valid")

    def _load_error(self, component: int, timeout: utils.Timeout = None,) -> errors.ComponentError:
        """
        Try to load a map component as if it failed.
        If the component actually succeeded, raise :class:`ExpectedError`.
        """
        self._wait_for_component(component, timeout)

        status_and_raw_error = htio.load_objects(self._output_file_path(component))
        status = next(status_and_raw_error)
        if status == "OK":
            raise exceptions.ExpectedError(
                f"Tried to load component {component} as an error, but it succeeded"
            )
        elif status == "ERR":
            return errors.ComponentError._from_raw_error(self, next(status_and_raw_error))
        else:
            raise exceptions.InvalidOutputStatus(f"Output status {status} is not valid")

    def get(self, component: int, timeout: utils.Timeout = None,) -> Any:
        """
        Return the output associated with the input component index.
        If the component experienced an execution error, this will raise
        :class:`htmap.exceptions.MapComponentError`.
        Use :meth:`get_err`, :meth:`errors`, :meth:`error_reports` to see what went wrong!

        Parameters
        ----------
        component
            The index of the input to get the output for.
        timeout
            How long to wait for the output to exist before raising a
            :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        return self._load_output(component, timeout=timeout)

    def __getitem__(self, item: int) -> Any:
        """Return the output associated with the input index. Does not block."""
        return self.get(item, timeout=0)

    def get_err(self, component: int, timeout: utils.Timeout = None,) -> errors.ComponentError:
        """
        Return the error associated with the input component index.
        If the component actually succeeded, this will raise
        :class:`htmap.exceptions.ExpectedError`.

        Parameters
        ----------
        component
            The index of the input to get the output for.
        timeout
            How long to wait for the output to exist before raising a
            :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        return self._load_error(component, timeout=timeout)

    def __iter__(self) -> Iterator[Any]:
        """
        Iterating over the :class:`htmap.Map` yields the outputs in the same
        order as the inputs, waiting on each individual output to become available.
        """
        yield from self.iter()

    def iter(self, timeout: utils.Timeout = None,) -> Iterator[Any]:
        """
        Returns an iterator over the output of the :class:`htmap.Map` in the
        same order as the inputs, waiting on each individual output to become
        available.

        Parameters
        ----------
        timeout
            How long to wait for each output to be available before raising a
            :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        for component in self.components:
            yield self._load_output(component, timeout=timeout)

    def iter_with_inputs(
        self, timeout: utils.Timeout = None,
    ) -> Iterator[Tuple[Tuple[tuple, Dict[str, Any]], Any]]:
        """
        Returns an iterator over the inputs and output of the
        :class:`htmap.Map` in the same order as the inputs, waiting on each
        individual output to become available.

        Parameters
        ----------
        timeout
            How long to wait for each output to be available before raising a
            :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        for component in self.components:
            output = self._load_output(component, timeout=timeout)
            inputs = self._load_input(component)
            yield inputs, output

    def iter_as_available(self, timeout: utils.Timeout = None,) -> Iterator[Any]:
        """
        Returns an iterator over the output of the :class:`htmap.Map`,
        yielding individual outputs as they become available.

        The iteration order is initially random, but is consistent within a
        single interpreter session once the map is completed.

        Parameters
        ----------
        timeout
            How long to wait for the entire iteration to complete before
            raising a :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        timeout = utils.timeout_to_seconds(timeout)
        start_time = time.time()

        remaining_indices = set(self.components)
        while remaining_indices:
            # iterate over a copy because we remove from the set as we go
            for component in copy(remaining_indices):
                try:
                    output = self._load_output(component, timeout=0)
                    remaining_indices.remove(component)
                    yield output
                except exceptions.OutputNotFound:
                    pass

            # stop immediately once everything has been yielded, instead of
            # checking the timeout (and sleeping) one extra time
            if not remaining_indices:
                break

            if timeout is not None and time.time() > start_time + timeout:
                raise exceptions.TimeoutError("Timed out while waiting for more output")

            time.sleep(settings["WAIT_TIME"])

    def iter_as_available_with_inputs(
        self, timeout: utils.Timeout = None,
    ) -> Iterator[Tuple[Tuple[tuple, Dict[str, Any]], Any]]:
        """
        Returns an iterator over the inputs and output of the
        :class:`htmap.Map`, yielding individual ``(input, output)`` pairs as
        they become available.

        The iteration order is initially random, but is consistent within a
        single interpreter session once the map is completed.

        Parameters
        ----------
        timeout
            How long to wait for the entire iteration to complete before
            raising a :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.
        """
        timeout = utils.timeout_to_seconds(timeout)
        start_time = time.time()

        remaining_indices = set(self.components)
        while remaining_indices:
            # iterate over a copy because we remove from the set as we go
            for component in copy(remaining_indices):
                try:
                    output = self._load_output(component, timeout=0)
                    inputs = self._load_input(component)
                    remaining_indices.remove(component)
                    yield inputs, output
                except exceptions.OutputNotFound:
                    pass

            # stop immediately once everything has been yielded, instead of
            # checking the timeout (and sleeping) one extra time
            if not remaining_indices:
                break

            if timeout is not None and time.time() > start_time + timeout:
                raise exceptions.TimeoutError("Timed out while waiting for more output")

            time.sleep(settings["WAIT_TIME"])

    def iter_inputs(self) -> Iterator[Any]:
        """Returns an iterator over the inputs of the :class:`htmap.Map`."""
        return (self._load_input(idx) for idx in self.components)

    def _requirements(self, requirements: Optional[str] = None) -> str:
        """Build an HTCondor requirements expression that captures all of the ``cluster_id`` for this map."""
        base = f"({' || '.join(f'ClusterId=={cid}' for cid in self._cluster_ids)})"
        extra = f" && {requirements}" if requirements is not None else ""

        return base + extra

    def _query(
        self, requirements: Optional[str] = None, projection: Optional[List[str]] = None,
    ) -> Iterator[classad.ClassAd]:
        """
        Perform a _query against the HTCondor cluster to get information about the map jobs.

        Parameters
        ----------
        requirements
            A ClassAd expression to use as the requirements for the _query.
            In addition to whatever restrictions given in this expression, the _query will only target the jobs for this map.
        projection
            The ClassAd attributes to return from the _query.

        Returns
        -------
        classads
            An iterator of matching :class:`classad.ClassAd`, with only the projected fields.
        """
        if projection is None:
            projection = []

        req = self._requirements(requirements)

        schedd = condor.get_schedd()
        q = schedd.xquery(requirements=req, projection=projection,)

        logger.debug(
            f'Queried for map {self.tag} (requirements = "{req}") with projection {projection}'
        )

        yield from q

    @property
    def component_statuses(self) -> List[state.ComponentStatus]:
        """
        Return the current :class:`state.ComponentStatus` of each component in the map.
        """
        return self._state.component_statuses

    def components_by_status(self) -> Mapping[state.ComponentStatus, Tuple[int, ...]]:
        """
        Return the component indices grouped by their states.

        Examples
        --------
        This example finds the completed jobs for a submitted map,
        and processes those results:

        .. code:: python

            from time import sleep
            import htmap

            def job(x):
                sleep(x)
                return 1 / x

            m = htmap.map(job, [0, 2, 4, 6, 8], tag="foo")

            # Wait for all jobs to finish.
            # Alternatively, use `m = htmap.load("foo")` in a different process
            sleep(10)

            # component 0 raises ZeroDivisionError, so it is not completed
            completed = m.components_by_status()[htmap.ComponentStatus.COMPLETED]
            for component in completed:
                result = m.get(component)
                # Whatever processing needs to be done
                print(result)  # prints 0.5, 0.25, 0.1666..., and 0.125
        """
        status_to_components: MutableMapping[
            state.ComponentStatus, List[int]
        ] = collections.defaultdict(list)
        for component, status in enumerate(self.component_statuses):
            status_to_components[status].append(component)

        return {
            status: tuple(sorted(components))
            for status, components in status_to_components.items()
        }

    def status(self) -> str:
        """Return a string containing the number of jobs in each status."""
        counts = collections.Counter(self.component_statuses)
        stat = " | ".join(
            f"{str(js)} = {counts[js]}" for js in state.ComponentStatus.display_statuses()
        )
        msg = f"{self.__class__.__name__} {self.tag} ({len(self)} components): {stat}"

        return utils.rstr(msg)

    @property
    def holds(self) -> Dict[int, holds.ComponentHold]:
        """
        A dictionary of component indices to their :class:`Hold` (if they are held).
        """
        return self._state.holds

    def hold_report(self) -> str:
        """
        Return a string containing a formatted table describing any held components.
        """
        headers = ["Component", "Code", "Hold Reason"]
        rows = [(component, hold.code, hold.reason) for component, hold in self.holds.items()]

        return utils.table(
            headers=headers, rows=rows, alignment={"Component": "ljust", "Hold Reason": "ljust",},
        )

    # NOTE: this property shadows the ``errors`` module inside the class body,
    # so it must stay below every annotation that references ``errors.ComponentError``
    @property
    def errors(self) -> Dict[int, errors.ComponentError]:
        """
        A dictionary of component indices to their :class:`ExecutionError`
        (if that component experienced an error).

        Does not block: components whose output is not available yet are skipped.
        """
        err = {}
        for idx in self.components:
            try:
                # timeout=0 so that running components are skipped instead of
                # blocking this property until they finish
                err[idx] = self.get_err(idx, timeout=0)
            except (
                exceptions.OutputNotFound,
                exceptions.ExpectedError,
                exceptions.TimeoutError,
                exceptions.MapComponentHeld,
            ):
                pass

        return err

    def error_reports(self) -> Iterator[str]:
        """
        Yields the error reports for any components that experienced an error during execution.
        """
        for idx in self.components:
            try:
                yield self.get_err(idx, timeout=0).report()
            except (
                exceptions.OutputNotFound,
                exceptions.ExpectedError,
                exceptions.TimeoutError,
                exceptions.MapComponentHeld,
            ):
                pass

    @property
    def memory_usage(self) -> List[int]:
        """
        Return the latest peak memory usage of each map component, measured in MB.
        A component that hasn't reported yet will show a ``0``.

        .. warning::
            Due to current limitations in HTCondor, memory use for very
            short-lived components (<5 seconds) will not be accurate.
        """
        return self._state.memory_usage

    @property
    def runtime(self) -> List[datetime.timedelta]:
        """Return the total runtime (user + system) of each component."""
        return self._state.runtime

    @property
    def local_data(self) -> int:
        """Return the number of bytes stored on the local disk by the map."""
        # this cache is invalidated by the state reader loop when appropriate
        if self._local_data is None:
            logger.debug(
                f"Getting map directory size for map {self.tag} (map directory is {self._map_dir})"
            )
            with utils.Timer() as timer:
                self._local_data = utils.get_dir_size(self._map_dir, safe=False)
            logger.debug(
                f"Map directory size for map {self.tag} is {utils.num_bytes_to_str(self._local_data)} (took {timer.elapsed:.6f} seconds)"
            )

        return self._local_data

    def _act(
        self, action: htcondor.JobAction, requirements: Optional[str] = None,
    ) -> classad.ClassAd:
        """
        Perform an action on all of the jobs associated with this map.

        Does nothing (and returns an empty ClassAd) if the map is not active.
        """
        if not self.is_active:
            return classad.ClassAd()

        schedd = condor.get_schedd()
        req = self._requirements(requirements)
        a = schedd.act(action, req)

        logger.debug(f'Acted on map {self.tag} (requirements = "{req}") with action {action}')

        return a

    def remove(self, force: bool = False) -> None:
        """
        This command removes a map from the Condor queue. Functionally, this
        command aborts a job.

        This function will completely remove a map from the Condor
        queue regardless of job state (running, executing, waiting, etc).
        All data associated with a removed map is permanently deleted.

        Parameters
        ----------
        force
            If ``True``, do not wait for HTCondor to remove the map components
            before removing local data.
        """
        try:
            self._remove_from_queue()
        except Exception:
            if not force:
                # bare raise keeps the original traceback intact
                raise
            logger.exception(
                f"Encountered error while force-removing map {self.tag}; ignoring and moving to cleanup step"
            )

        self._cleanup_local_data(force=force)

        MAPS.remove(self)

        logger.info(f"Removed map {self.tag}")

    def _remove_from_queue(self) -> classad.ClassAd:
        return self._act(htcondor.JobAction.Remove)

    def _cleanup_local_data(self, force: bool = False) -> None:
        """
        Remove all of the local data associated with this map.

        Parameters
        ----------
        force
            If ``True``, do not wait for HTCondor to confirm that all map
            components have been removed from the queue.
        """
        if not force:
            while not all(
                cs
                in (
                    state.ComponentStatus.REMOVED,
                    state.ComponentStatus.COMPLETED,
                    state.ComponentStatus.ERRORED,
                    state.ComponentStatus.UNMATERIALIZED,
                )
                for cs in self.component_statuses
            ):
                time.sleep(settings["WAIT_TIME"])

        # move the tagfile to the removed tags dir
        # renamed by uid to prevent duplicates
        removed_tagfile = (
            Path(settings["HTMAP_DIR"])
            / names.REMOVED_TAGS_DIR
            / self._tag_file_path.read_text()
        )
        self._tag_file_path.rename(removed_tagfile)
        logger.debug(f"Moved tag file for map {self.tag} to the removed tags directory")

        # 5 attempts to remove the map directory
        for _ in range(5):
            try:
                shutil.rmtree(self._map_dir)
                logger.debug(f"Removed map directory for map {self.tag}")

                # only delete the tagfile after removing the map dir
                # if we don't get here, htmap.clean() will look for the "removed"
                # tagfile in the removed tags dir and cleanup
                removed_tagfile.unlink()
                logger.debug(f"Removed tag file for map {self.tag}")
                return  # break out of the loop
            except OSError:
                logger.exception(
                    f'Failed to remove map directory for map {self.tag}, retrying in {settings["WAIT_TIME"]} seconds'
                )
                time.sleep(settings["WAIT_TIME"])

        logger.error(
            f"Failed to remove map directory for map {self.tag}, run htmap.clean() to try to remove later"
        )

    @property
    def exists(self) -> bool:
        """
        ``True`` if and only if the map has **not** been successfully removed.
        Otherwise, ``False``.
        """
        return self._map_dir.exists()

    def hold(self) -> None:
        """
        This command holds a map.
        The components of the map will not be allowed to run until released
        (see :func:`Map.release`).

        HTCondor may itself hold your map components if it detects that
        something has gone wrong with them. Resolve the underlying problem,
        then use the :func:`Map.release` command to allow the components to run again.
        """
        self._act(htcondor.JobAction.Hold)
        logger.debug(f"Held map {self.tag}")

    def release(self) -> None:
        """
        This command releases a map, undoing holds (see :func:`Map.hold`).
        The held components of a released map will become idle again.

        HTCondor may itself hold your map components if it detects that
        something has gone wrong with them. Resolve the underlying problem,
        then use this command to allow the components to run again.
        """
        self._act(htcondor.JobAction.Release)
        logger.debug(f"Released map {self.tag}")

    def pause(self) -> None:
        """
        This command pauses a map.
        The running components of a paused map will keep their resource claims,
        but will stop actively executing.
        The map can be un-paused by resuming it
        (see the :func:`Map.resume` command).
        """
        self._act(htcondor.JobAction.Suspend)
        logger.debug(f"paused map {self.tag}")

    def resume(self) -> None:
        """
        This command resumes a map (reverses the :func:`Map.pause` command).
        The running components of a resumed map will resume execution on their
        claimed resources.
        """
        self._act(htcondor.JobAction.Continue)
        logger.debug(f"Resumed map {self.tag}")

    def vacate(self) -> None:
        """
        This command vacates a map.
        The running components of a vacated map will give up their claimed
        resources and become idle again.

        Checkpointing maps will still have access to their last checkpoint,
        and will resume from it as if execution was interrupted for any other reason.
        """
        self._act(htcondor.JobAction.Vacate)
        logger.debug(f"Vacated map {self.tag}")

    def _edit(self, attr: str, value: str, requirements: Optional[str] = None) -> None:
        """Set a job attribute on this map's jobs; does nothing if the map is not active."""
        if not self.is_active:
            return

        schedd = condor.get_schedd()
        schedd.edit(self._requirements(requirements), attr, value)

        logger.debug(f"Set attribute {attr} for map {self.tag} to {value}")

    def set_memory(self, memory: int) -> None:
        """
        Change the amount of memory (RAM) each map component needs.

        .. warning::

            Edits do not affect components that are currently running.
            To "restart" components so that they see the new attribute value,
            consider vacating their map (see the vacate command).

        Parameters
        ----------
        memory
            The amount of memory (RAM) to request, as an integer number of MB.
        """
        self._edit("RequestMemory", str(memory))

    def set_disk(self, disk: int) -> None:
        """
        Change the amount of disk space each map component needs.

        .. warning::

            Edits do not affect components that are currently running.
            To "restart" components so that they see the new attribute value,
            consider vacating their map (see the vacate command).

        Parameters
        ----------
        disk
            The amount of disk space to request, as an integer number of KB.
        """
        self._edit("RequestDisk", str(disk))

    def _submit(self, components: Optional[Iterable[int]] = None) -> None:
        """
        Submit (a subset of) this map's components as a new HTCondor cluster.

        Parameters
        ----------
        components
            The component indices to submit. If ``None``, submit all of them.
        """
        if components is None:
            components = self.components

        # a set gives O(1) membership tests while filtering the itemdata below
        components = set(components)

        itemdata = htio.load_itemdata(self._map_dir)
        sliced_itemdata = [item for item in itemdata if int(item["component"]) in components]

        submit_obj = htio.load_submit(self._map_dir)

        new_cluster_id = mapping.execute_submit(submit_obj, sliced_itemdata,)

        # if we fail to write the cluster id for any reason, abort the submit:
        # remove the freshly submitted cluster and let the original error propagate,
        # so callers do not mistake a failed submit for a successful one
        try:
            htio.append_cluster_id(self._map_dir, new_cluster_id)
        except BaseException:
            condor.get_schedd().act(htcondor.JobAction.Remove, f"ClusterId=={new_cluster_id}")
            raise

        logger.debug(
            f"Submitted {len(sliced_itemdata)} components (out of {self._num_components}) from map {self.tag}"
        )

    def rerun(self, components: Optional[Iterable[int]] = None) -> None:
        """
        Re-run (part of) the map from scratch.
        The selected components must be completed or errored.

        Any existing output of re-run components is removed; they are re-submitted to the HTCondor queue with their original map options (i.e., without any subsequent edits).

        Parameters
        ----------
        components
            The components to rerun.
            If ``None``, the entire map will be re-run.
        """
        if components is None:
            components = self.components

        components = set(components)

        legal_components = set(self.components)
        bad_components = components.difference(legal_components)
        if len(bad_components) > 0:
            raise exceptions.CannotRerunComponents(
                f"Cannot rerun components {bad_components} because they are not in the map"
            )

        cant_be_rerun = {
            c
            for c, status in enumerate(self.component_statuses)
            if status not in (state.ComponentStatus.COMPLETED, state.ComponentStatus.ERRORED)
        }
        intersection = components.intersection(cant_be_rerun)
        if len(intersection) != 0:
            raise exceptions.CannotRerunComponents(
                f"Cannot rerun components {sorted(intersection)} of map {self.tag} because they are not complete"
            )

        for component in components:
            try:
                self._output_file_path(component).unlink()
            except FileNotFoundError:
                pass

            # remove the user output files by path directly: indexing
            # self.output_files would raise FileNotFoundError for a component
            # whose output files directory does not exist
            shutil.rmtree(self._user_output_files_path(component), ignore_errors=True)

        self._submit(components=components)

    def retag(self, tag: str) -> None:
        """
        Give this map a new ``tag``.
        The old ``tag`` will be available for re-use immediately.

        Retagging a map makes it not transient.
        Maps that have never had an explicit tag given to them are transient
        and can be easily cleaned up via the clean command.

        Parameters
        ----------
        tag
            The ``tag`` to assign to the map.
        """
        if tag == self.tag:
            raise exceptions.CannotRetagMap("Cannot retag a map to the same tag it already has")

        try:
            tags.raise_if_tag_is_invalid(tag)
            tags.raise_if_tag_already_exists(tag)
        except (exceptions.InvalidTag, exceptions.TagAlreadyExists) as e:
            raise exceptions.CannotRetagMap(
                f"Cannot retag map because of previous exception: {e}"
            ) from e

        submit_obj = htio.load_submit(self._map_dir)
        submit_obj["JobBatchName"] = tag
        htio.save_submit(self._map_dir, submit_obj)
        # self._edit('JobBatchName', tag)  # todo: this doesn't seem to work as expected

        self._tag_file_path.rename(tags.tag_file_path(tag))

        self._make_persistent()

        # must do this after everything else, because some of the things above
        # reference paths based on the tag
        self.tag = tag

    @property
    def _transient_marker(self) -> Path:
        return self._map_dir / names.TRANSIENT_MARKER

    @property
    def is_transient(self) -> bool:
        """``True`` is the map is transient, ``False`` otherwise."""
        return self._transient_marker.exists()

    def _make_transient(self):
        self._transient_marker.touch(exist_ok=True)

    def _make_persistent(self):
        if self.is_transient:
            self._transient_marker.unlink()

    @property
    def stdout(self) -> "MapStdOut":
        """
        A sequence containing the ``stdout`` for each map component.
        You can index into it (with a component index) to get the
        ``stdout`` for that component, or iterate over the sequence to
        get all of the ``stdout`` from the map.
        """
        return self._stdout

    @property
    def stderr(self) -> "MapStdErr":
        """
        A sequence containing the ``stderr`` for each map component.
        You can index into it (with a component index) to get the
        ``stderr`` for that component, or iterate over the sequence to
        get all of the ``stderr`` from the map.
        """
        return self._stderr

    @property
    def output_files(self) -> "MapOutputFiles":
        """
        A sequence containing the path to the directory containing the
        output files for each map component.
        You can index into it (with a component index) to get the
        path for that component, or iterate over the sequence to
        get all of the paths from the map.
        """
        return self._output_files
class MapStdX(collections.abc.Sequence):
    """
    An object that helps implement a map's sequence over its ``stdout`` or ``stderr``.
    Don't bother instantiating one yourself: use the ``Map.stdout`` or ``Map.stderr`` attributes instead.
    """

    # Selects the per-component path helper on the map: subclasses set this to
    # "stdout" or "stderr", which ``get`` turns into ``Map._stdout_file_path``
    # or ``Map._stderr_file_path`` respectively.
    _func: Optional[str] = None

    def __init__(self, map):
        self.map = map

    def __len__(self):
        """The number of components in the underlying map."""
        return len(self.map)

    def __getitem__(self, component: Any) -> str:
        """
        Return the stdout/stderr of ``component`` without waiting.

        Raises :class:`FileNotFoundError` if it is not available yet and
        :class:`IndexError` if ``component`` is not a component of the map.
        """
        try:
            return self.get(component, timeout=0)
        except exceptions.TimeoutError as e:
            raise FileNotFoundError(
                f"Standard output/error for component {component} of map {self.map.tag} is not available yet."
            ) from e

    def __contains__(self, component: Any) -> bool:
        """Delegates to the map: valid component indices are "in" the sequence."""
        return component in self.map

    def get(self, component: int, timeout: utils.Timeout = None,) -> str:
        """
        Return a string containing the stdout/stderr from a single map component.

        Parameters
        ----------
        component
            The index of the map component to look up.
        timeout
            How long to wait before raising a :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.

        Returns
        -------
        stdx :
            The standard output/error of the map component.
        """
        if component not in range(0, len(self)):
            raise IndexError(
                f"Tried to get stdout/err file for component {component}, but map {self.map} only has {len(self.map)} components"
            )

        path = getattr(self.map, f"_{self._func}_file_path")(component)

        utils.wait_for_path_to_exist(
            path, timeout=timeout, wait_time=settings["WAIT_TIME"],
        )

        return utils.rstr(path.read_text())
class MapStdOut(MapStdX):
    """
    An object that helps implement a map's sequence over its ``stdout``.
    Don't bother instantiating one yourself: use the ``Map.stdout`` attribute instead.
    """

    # makes MapStdX.get read each component's file via Map._stdout_file_path
    _func = "stdout"
class MapStdErr(MapStdX):
    """
    An object that helps implement a map's sequence over its ``stderr``.
    Don't bother instantiating one yourself: use the ``Map.stderr`` attribute instead.
    """

    # makes MapStdX.get read each component's file via Map._stderr_file_path
    _func = "stderr"
class MapOutputFiles(collections.abc.Sequence):
    """
    An object that helps implement a map's sequence over its output file directories.
    Don't bother instantiating one yourself: use the ``Map.output_files`` attribute instead.

    Like the ``stdout``/``stderr`` sequences, it is a
    :class:`collections.abc.Sequence`, so iteration, ``reversed``, ``index``
    and ``count`` work out of the box.
    """

    def __init__(self, map):
        self.map = map

    def __len__(self):
        """The number of components in the underlying map."""
        return len(self.map)

    def __getitem__(self, component: int) -> Path:
        """
        Return the output file directory of ``component`` without waiting.

        Raises :class:`FileNotFoundError` if the directory does not exist yet and
        :class:`IndexError` if ``component`` is not a component of the map.
        """
        try:
            return self.get(component, timeout=0)
        except exceptions.TimeoutError as e:
            raise FileNotFoundError(
                f"The output file directory for component {component} of map {self.map.tag} is not available yet."
            ) from e

    def __contains__(self, component: int) -> bool:
        """Delegates to the map: valid component indices are "in" the sequence."""
        return component in self.map

    def get(self, component: int, timeout: utils.Timeout = None,) -> Path:
        """
        Return the :class:`pathlib.Path` to the directory containing the output files for the given component.

        Parameters
        ----------
        component
            The index of the map component to look up.
        timeout
            How long to wait before raising a :class:`htmap.exceptions.TimeoutError`.
            If ``None``, wait forever.

        Returns
        -------
        path :
            The path to the directory containing the output files for the given component.
        """
        if component not in range(0, len(self)):
            raise IndexError(
                f"Tried to get output files for component {component}, but map {self.map} only has {len(self.map)} components"
            )

        path = self.map._user_output_files_path(component)

        utils.wait_for_path_to_exist(
            path, timeout=timeout, wait_time=settings["WAIT_TIME"],
        )

        return path