Source code for htmap.options

# Copyright 2018 HTCondor Team, Computer Sciences Department,
# University of Wisconsin-Madison, WI.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import collections
import hashlib
import logging
import shutil
import sys
from pathlib import Path
from typing import Callable, Dict, Iterable, List, Optional, Tuple, Union

import htcondor

from . import exceptions, names, settings, transfer, utils
from .types import REMAPS, TRANSFER_PATH

logger = logging.getLogger(__name__)

BASE_OPTIONS_FUNCTION_BY_DELIVERY = {}
SETUP_FUNCTION_BY_DELIVERY = {}

REQUIREMENTS = "requirements"


class MapOptions(collections.UserDict):
    """
    A dictionary-like container of HTCondor submit descriptors for a map.

    The descriptors themselves live in the underlying dictionary; the input
    file and output remap settings are stored as attributes
    (``fixed_input_files``, ``input_files``, ``output_remaps``).
    """

    # Descriptor names that HTMap sets itself. They are compared
    # case-insensitively against user-supplied keys.
    RESERVED_KEYS = {
        "jobbatchname",
        "universe",
        "arguments",
        "executable",
        "transfer_executable",
        "log",
        "submit_event_notes",
        "stdout",
        "stderr",
        "when_to_transfer_output",
        "transfer_output_files",
        "transfer_output_remaps",
        "transfer_input_files",
        "should_transfer_files",
        "component",
        "+component",
        "MY.component",
        "IsHTMapJob",
        "+IsHTMapJob",
        "MY.IsHTMapJob",
    }

    def __init__(
        self,
        *,
        fixed_input_files: Optional[Union[TRANSFER_PATH, Iterable[TRANSFER_PATH]]] = None,
        input_files: Optional[
            Union[Iterable[TRANSFER_PATH], Iterable[Iterable[TRANSFER_PATH]]]
        ] = None,
        output_remaps: Optional[Union[REMAPS, Iterable[REMAPS]]] = None,
        custom_options: Optional[Dict[str, str]] = None,
        **kwargs: Union[str, Iterable[str]],
    ):
        """
        Parameters
        ----------
        fixed_input_files
            A single file, or an iterable of files, to send to all components of the map.
        input_files
            An iterable of single files or iterables of files to map over.
            This may be useful if you want additional files to be sent to each
            map component, but don't want them in your mapped function's arguments.
        output_remaps
            A dictionary, or an iterable of dictionaries, specifying output
            transfer remaps. A remapped output file is sent to a specified
            destination instead of back to the submit machine.
            If a single dictionary is passed, it will be applied to every
            map component (in this case, you may want to use the
            ``$(component)`` submit macro to differentiate them).
            Each dictionary should be a "mapping" between the **names**
            (last path component, as a string) of output files
            and their **destinations**, given as a :class:`TransferPath`.
            You must still call :func:`transfer_output_files` on the files
            for them to be transferred at all; listing them here *only*
            sets up the remapping.
        custom_options
            A dictionary of submit descriptors that are *not* built-in
            HTCondor descriptors. These are the descriptors that, if you were
            writing a submit file, would have a leading ``+`` or ``MY.``.
            The leading characters are unnecessary here, but can be included
            if you'd like.
        kwargs
            Additional keyword arguments are interpreted as HTCondor submit
            descriptors. Values that are single strings are used for all
            components of the map. Providing an iterable for the value will
            map that option. Certain keywords are reserved for internal use
            (see the RESERVED_KEYS class attribute).

        Raises
        ------
        exceptions.ReservedOptionKeyword
            If any key in ``kwargs`` or ``custom_options`` is reserved.

        Notes
        -----

        .. warning::

            The representation of the values in ``fixed_input_files``,
            ``input_files``, ``custom_options`` and ``kwargs`` should exactly
            match the characters in the submit file after the ``=``.

        For example, let's say your job requires this submit file:

        .. code::

            # file: job.submit
            foo = "bar"
            aaa = xyz
            bbb = false
            ccc = 1

        The ``MapOptions`` that express the same submit options would be:

        .. code:: pycon

            >>> options = {"foo": '"bar"', "aaa": "xyz", "bbb": "false", "ccc": "1"}
            >>> print(options["foo"])  # exactly matches the value in the submit file
            ... "bar"
            >>> options["foo"] = "\\"bar\\""  # alternative value
            >>> MapOptions(**options)

        Submit file values with quotes require escaped quotes in the Python string.
        """
        self._check_keyword_arguments(kwargs)

        if custom_options is None:
            custom_options = {}
        cleaned_custom_options = {
            self._normalize_custom_key(key): val for key, val in custom_options.items()
        }
        self._check_keyword_arguments(cleaned_custom_options)

        # Custom options are always stored under their "MY."-prefixed name.
        kwargs = {
            **kwargs,
            **{f"MY.{key}": val for key, val in cleaned_custom_options.items()},
        }
        super().__init__(**kwargs)

        if fixed_input_files is None:
            fixed_input_files = []
        if isinstance(fixed_input_files, (str, Path, transfer.TransferPath)):
            # a single file was given; wrap it so the attribute is always a list
            fixed_input_files = [fixed_input_files]

        # Copy into a fresh list so that merge() can extend it without
        # touching the caller's object, and so one-shot iterables are kept.
        self.fixed_input_files = list(fixed_input_files)
        self.input_files = input_files
        self.output_remaps = output_remaps

    @staticmethod
    def _normalize_custom_key(key: str) -> str:
        """Lower-case a custom option name and drop one leading ``+`` or ``MY.``."""
        key = key.lower()
        for prefix in ("+", "my."):
            if key.startswith(prefix):
                return key[len(prefix):]
        return key

    def _check_keyword_arguments(self, kwargs):
        """
        Raise :class:`exceptions.ReservedOptionKeyword` if any key in
        ``kwargs`` collides (case-insensitively) with ``RESERVED_KEYS``.
        """
        # Both sides are lower-cased: RESERVED_KEYS contains mixed-case
        # entries which would otherwise never match the normalized keys.
        reserved = {key.lower() for key in self.RESERVED_KEYS}
        clashes = sorted({key.lower() for key in kwargs} & reserved)
        if not clashes:
            return

        if len(clashes) == 1:
            s = "is a reserved keyword"
        else:
            s = "are reserved keywords"
        raise exceptions.ReservedOptionKeyword(
            f'{",".join(clashes)} {s} and cannot be used'
        )

    @classmethod
    def merge(cls, *others: "MapOptions") -> "MapOptions":
        """
        Merge any number of :class:`MapOptions` together, like a :class:`collections.ChainMap`.
        Options closer to the left take priority.

        .. note::

            ``fixed_input_files`` is a special case, and is merged up the chain instead of being overwritten.
            ``requirements`` are also combined, in a way where all requirements must be satisfied.
            ``input_files`` and ``output_remaps`` are taken from the left-most
            options that actually set them (i.e., where they are not ``None``).
        """
        new = cls()
        # Walk right-to-left so that values further left overwrite later ones.
        for other in reversed(others):
            new.data.update(other.data)

            # these need special handling, because they are stored as attributes
            # instead of in the dictionary
            new.fixed_input_files.extend(other.fixed_input_files)
            if other.input_files is not None:
                new.input_files = other.input_files
            if other.output_remaps is not None:
                new.output_remaps = other.output_remaps

        merged_requirements = merge_requirements(
            *(other.get(REQUIREMENTS, None) for other in others)
        )
        if merged_requirements is not None:
            new[REQUIREMENTS] = merged_requirements

        return new
def normalize_path(path: Union[str, Path]) -> str:
    """
    Turn input file paths into a format that HTCondor can understand.
    In particular, all local file paths must be turned into posix-style paths (even on Windows!)

    A :class:`TransferPath` is rendered via its ``as_url()`` method, and a
    string containing ``://`` is returned unchanged.
    """
    if isinstance(path, transfer.TransferPath):
        return path.as_url()
    elif isinstance(path, Path) or "://" not in path:
        return Path(path).absolute().as_posix()
    return path


def create_submit_object_and_itemdata(
    tag: str, map_dir: Path, num_components: int, map_options: MapOptions,
) -> Tuple[htcondor.Submit, List[Dict[str, str]]]:
    """
    Build the submit object and per-component itemdata for a map.

    Runs the setup for the configured delivery method, takes its base
    descriptors, and layers the given ``map_options`` on top.

    Parameters
    ----------
    tag
        The tag of the map.
    map_dir
        The map directory.
    num_components
        The number of components in the map; one itemdata dictionary is
        produced per component.
    map_options
        The options to apply on top of the base descriptors.

    Returns
    -------
    A tuple of the :class:`htcondor.Submit` object and the list of itemdata
    dictionaries (one per component).

    Raises
    ------
    exceptions.MisalignedInputData
        If a per-component option (``input_files``, ``output_remaps``, or any
        iterable-valued descriptor) does not have exactly ``num_components``
        entries.
    """
    delivery = settings["DELIVERY_METHOD"]
    run_delivery_setup(tag, map_dir, delivery)
    descriptors = get_base_descriptors(tag, map_dir, delivery)

    # Requirements from the base descriptors and from the map options must
    # BOTH hold, so they are combined here and excluded from the generic
    # option loop below (which would otherwise overwrite the merged value).
    base_requirements = descriptors.get(REQUIREMENTS, None)
    user_requirements = map_options.get(REQUIREMENTS, None)
    descriptors[REQUIREMENTS] = merge_requirements(
        base_requirements,
        user_requirements if isinstance(user_requirements, str) else None,
    )

    itemdata = [{"component": str(idx)} for idx in range(num_components)]

    base_input_files = descriptors.get("transfer_input_files", [])
    if isinstance(base_input_files, str):
        # a comma-separated string instead of a list; split it so that
        # the additions below do not fail
        base_input_files = [f.strip() for f in base_input_files.split(",") if f.strip()]
    input_files = list(base_input_files)
    input_files += [
        (map_dir / names.FUNC).as_posix(),
        (map_dir / names.INPUTS_DIR / f"$(component).{names.INPUT_EXT}").as_posix(),
    ]
    input_files.extend(normalize_path(f) for f in map_options.fixed_input_files)

    # Materialize first: any() would otherwise consume part of a one-shot iterable.
    per_component_inputs = (
        list(map_options.input_files) if map_options.input_files is not None else None
    )

    # if any of the components have per-component input files, use a submit macro to insert them
    if per_component_inputs is not None and any(per_component_inputs):
        input_files.append("$(extra_input_files)")
        joined = [
            normalize_path(files)
            if isinstance(files, (str, Path, transfer.TransferPath))  # single file
            else ", ".join(normalize_path(f) for f in files)  # multiple files
            for files in per_component_inputs
        ]
        if len(joined) != num_components:
            raise exceptions.MisalignedInputData(
                f"Length of input_files does not match length of input (len(input_files) = {len(joined)}, len(inputs) = {num_components})"
            )
        for itemdatum, files in zip(itemdata, joined):
            itemdatum["extra_input_files"] = files

    descriptors["transfer_input_files"] = ",".join(input_files)

    output_remaps = map_options.output_remaps
    if output_remaps is not None and not isinstance(output_remaps, dict):
        # same one-shot-iterable concern as for input_files
        output_remaps = list(output_remaps)

    if output_remaps is not None and any(output_remaps):
        # TODO: I would prefer to do this in the base descriptors, but it looks like an "empty" remap triggers strange behavior
        descriptors["transfer_output_remaps"] = (
            descriptors["transfer_output_remaps"].rstrip('"') + '; $(extra_remaps) "'
        )

        if isinstance(output_remaps, dict):
            # a single dictionary applies to every component
            output_remaps = [output_remaps] * num_components

        if len(output_remaps) != num_components:
            raise exceptions.MisalignedInputData(
                f"Length of output_remaps does not match length of input (len(output_remaps) = {len(output_remaps)}, len(inputs) = {num_components})"
            )

        for component, (itemdatum, remaps) in enumerate(zip(itemdata, output_remaps)):
            # posix-style separators, matching the other paths in the descriptors
            itemdatum["extra_remaps"] = " ; ".join(
                f"{(Path(names.USER_TRANSFER_DIR) / str(component) / k).as_posix()}={v.as_url()}"
                for k, v in remaps.items()
            )

    for opt_key, opt_value in map_options.items():
        if isinstance(opt_value, str):
            # a single value for every component
            if opt_key != REQUIREMENTS:  # requirements were already merged above
                descriptors[opt_key] = opt_value
            continue

        # not a string, which implies it is iterable: one value per component
        itemdata_key = f"itemdata_for_{opt_key}"
        opt_value = tuple(opt_value)
        if len(opt_value) != num_components:
            raise exceptions.MisalignedInputData(
                f"Length of {opt_key} does not match length of input (len({opt_key}) = {len(opt_value)}, len(inputs) = {num_components})"
            )
        if opt_key == REQUIREMENTS:
            # per-component requirements must still satisfy the base requirements
            opt_value = tuple(merge_requirements(base_requirements, v) for v in opt_value)
        for dct, v in zip(itemdata, opt_value):
            dct[itemdata_key] = v
        descriptors[opt_key] = f"$({itemdata_key})"

    if descriptors[REQUIREMENTS] is None:
        descriptors.pop(REQUIREMENTS)

    sub = htcondor.Submit(descriptors)

    return sub, itemdata
def register_delivery_method(
    name: str,
    descriptors_func: Callable[[str, Path], dict],
    setup_func: Optional[Callable[[str, Path], None]] = None,
) -> None:
    """
    Register a new delivery method with HTMap.

    Parameters
    ----------
    name
        The name of the delivery method; this is the value that the
        ``DELIVERY_METHOD`` setting should have in order to use it.
    descriptors_func
        The function that provides the HTCondor submit descriptors
        for this delivery method.
    setup_func
        The function that does any setup necessary to running the map.
        If omitted, a function that does nothing is registered instead.
    """

    def _no_setup(*args) -> None:
        return None

    chosen_setup = _no_setup if setup_func is None else setup_func

    BASE_OPTIONS_FUNCTION_BY_DELIVERY[name] = descriptors_func
    SETUP_FUNCTION_BY_DELIVERY[name] = chosen_setup
def unregister_delivery_mechanism(name: str) -> None:
    """
    Remove a previously-registered delivery method.

    Raises :class:`KeyError` if ``name`` is not registered.
    """
    BASE_OPTIONS_FUNCTION_BY_DELIVERY.pop(name)
    SETUP_FUNCTION_BY_DELIVERY.pop(name)


def merge_requirements(*requirements: Optional[str]) -> Optional[str]:
    """
    Combine requirement expressions so that all of them must be satisfied.

    ``None`` entries are ignored. Returns ``None`` if nothing is left,
    otherwise each expression wrapped in parentheses and joined by ``&&``.
    """
    present = [req for req in requirements if req is not None]
    if not present:
        return None
    return " && ".join(f"({req})" for req in present)


def get_base_descriptors(tag: str, map_dir: Path, delivery: str,) -> dict:
    """
    Build the submit descriptors that do not depend on the map options.

    The result layers, in increasing priority: HTMap's core descriptors,
    the descriptors of the given delivery method, and the ``MAP_OPTIONS``
    setting. ``requirements`` from all three layers are combined rather
    than overwritten.

    Raises
    ------
    exceptions.UnknownPythonDeliveryMethod
        If ``delivery`` is not a registered delivery method.
    """
    # Look the function up separately from calling it, so that a KeyError
    # raised *inside* the delivery function is not misreported as an
    # unknown delivery method.
    try:
        descriptors_func = BASE_OPTIONS_FUNCTION_BY_DELIVERY[delivery]
    except KeyError:
        raise exceptions.UnknownPythonDeliveryMethod(
            f"'{delivery}' is not a known delivery mechanism"
        )

    map_dir = map_dir.absolute()

    output_files = [
        f"{names.TRANSFER_DIR}/",
        f"{names.USER_TRANSFER_DIR}/$(component)",
    ]
    output_remaps = [
        f'$(component).{names.OUTPUT_EXT}={(map_dir / names.OUTPUTS_DIR / f"$(component).{names.OUTPUT_EXT}").as_posix()}'
    ]

    if utils.CAN_USE_URL_OUTPUT_TRANSFER:
        output_files.append(names.TRANSFER_PLUGIN_MARKER)
        output_remaps.append(f"{names.TRANSFER_PLUGIN_MARKER}=htmap://_")

    core = {
        "JobBatchName": tag,
        "log": (map_dir / names.EVENT_LOG).as_posix(),
        "submit_event_notes": "$(component)",
        "stdout": (map_dir / names.JOB_LOGS_DIR / f"$(component).{names.STDOUT_EXT}").as_posix(),
        "stderr": (map_dir / names.JOB_LOGS_DIR / f"$(component).{names.STDERR_EXT}").as_posix(),
        "should_transfer_files": "YES",
        "when_to_transfer_output": "ON_EXIT_OR_EVICT",
        "transfer_output_files": " , ".join(output_files),
        "transfer_output_remaps": f'" {" ; ".join(output_remaps)} "',
        "on_exit_hold": "ExitCode =!= 0",
        "initialdir": f"{(map_dir / names.OUTPUT_FILES_DIR).as_posix()}",
        "MY.component": "$(component)",
        "MY.IsHTMapJob": "True",
    }
    if utils.CAN_USE_URL_OUTPUT_TRANSFER:
        core["transfer_plugins"] = f"htmap={(map_dir / names.TRANSFER_PLUGIN).as_posix()}"

    base = descriptors_func(tag, map_dir)

    from_settings = settings.get("MAP_OPTIONS", default={})

    # later layers override earlier ones...
    merged = {
        **core,
        **base,
        **from_settings,
    }

    # ...except requirements, which are AND-ed together across all layers
    merged_requirements = merge_requirements(
        core.get(REQUIREMENTS, None),
        base.get(REQUIREMENTS, None),
        from_settings.get(REQUIREMENTS, None),
    )
    if merged_requirements is not None:
        merged[REQUIREMENTS] = merged_requirements

    return merged


def run_delivery_setup(tag: str, map_dir: Path, delivery: str,) -> None:
    """
    Copy the run scripts into the map directory and run the setup function
    of the given delivery method.

    Raises
    ------
    exceptions.UnknownPythonDeliveryMethod
        If ``delivery`` is not a registered delivery method.
    """
    # Resolve the setup function first: an unknown method fails before any
    # files are copied, and a KeyError raised inside the setup function
    # itself is not misreported as an unknown delivery method.
    try:
        setup_func = SETUP_FUNCTION_BY_DELIVERY[delivery]
    except KeyError:
        raise exceptions.UnknownPythonDeliveryMethod(
            f"'{delivery}' is not a known delivery mechanism"
        )

    _copy_run_scripts(map_dir)
    setup_func(tag, map_dir)


def _copy_run_scripts(map_dir: Path):
    """Copy the run scripts and the transfer plugin from the package's ``run`` directory into ``map_dir``."""
    run_script_source_dir = Path(__file__).parent / "run"
    run_scripts = [
        run_script_source_dir / names.RUN_SCRIPT,
        run_script_source_dir / names.RUN_WITH_SINGULARITY_SCRIPT,
        run_script_source_dir / names.RUN_WITH_TRANSPLANT_SCRIPT,
        run_script_source_dir / names.TRANSFER_PLUGIN,
    ]
    for src in run_scripts:
        target = map_dir / src.name
        shutil.copy2(src, target)


def _get_base_descriptors_for_assume(tag: str, map_dir: Path,) -> dict:
    """Return the submit descriptors for the ``assume`` delivery method."""
    return {
        "universe": "vanilla",
        "executable": (map_dir / names.RUN_SCRIPT).as_posix(),
        "arguments": "$(component)",
    }


register_delivery_method(
    "assume", descriptors_func=_get_base_descriptors_for_assume,
)


def _get_base_descriptors_for_docker(tag: str, map_dir: Path,) -> dict:
    """Return the submit descriptors for the ``docker`` delivery method (image taken from the ``DOCKER.IMAGE`` setting)."""
    return {
        "universe": "docker",
        "docker_image": settings["DOCKER.IMAGE"],
        "executable": (map_dir / names.RUN_SCRIPT).as_posix(),
        "arguments": "$(component)",
        "transfer_executable": "True",
    }


register_delivery_method(
    "docker", descriptors_func=_get_base_descriptors_for_docker,
)


def _get_base_descriptors_for_shared(tag: str, map_dir: Path,) -> dict:
    """
    Return the submit descriptors for the ``shared`` delivery method.

    The executable is the current Python interpreter (not transferred);
    the run script is passed to it as the first argument.
    """
    return {
        "universe": "vanilla",
        "executable": Path(sys.executable).absolute().as_posix(),
        "transfer_executable": "False",
        "arguments": f"{names.RUN_SCRIPT} $(component)",
        "transfer_input_files": [(map_dir / names.RUN_SCRIPT).as_posix(),],
    }


register_delivery_method(
    "shared", descriptors_func=_get_base_descriptors_for_shared,
)


def _get_base_descriptors_for_singularity(tag: str, map_dir: Path,) -> dict:
    """Return the submit descriptors for the ``singularity`` delivery method (image taken from the ``SINGULARITY.IMAGE`` setting)."""
    return {
        "universe": "vanilla",
        REQUIREMENTS: "HasSingularity == true",
        "executable": (map_dir / names.RUN_WITH_SINGULARITY_SCRIPT).as_posix(),
        "transfer_input_files": [(map_dir / names.RUN_SCRIPT).as_posix(),],
        "arguments": f'{settings["SINGULARITY.IMAGE"]} $(component)',
        "transfer_executable": "True",
    }


register_delivery_method(
    "singularity", descriptors_func=_get_base_descriptors_for_singularity,
)


def _get_base_descriptors_for_transplant(tag: str, map_dir: Path,) -> dict:
    """
    Return the submit descriptors for the ``transplant`` delivery method.

    The archived Python install is sent as an input file. Its path is the
    ``TRANSPLANT.ALTERNATE_INPUT_PATH`` setting if set, otherwise
    ``TRANSPLANT.DIR`` joined with the hash of the ``pip freeze`` output.
    """
    pip_freeze = _get_pip_freeze()
    h = _get_transplant_hash(pip_freeze)
    tif_path = settings.get("TRANSPLANT.ALTERNATE_INPUT_PATH")
    if tif_path is None:
        tif_path = (Path(settings["TRANSPLANT.DIR"]) / h).as_posix()

    return {
        "universe": "vanilla",
        "executable": (map_dir / names.RUN_WITH_TRANSPLANT_SCRIPT).as_posix(),
        "arguments": f"$(component) {h}",
        "transfer_input_files": [(map_dir / names.RUN_SCRIPT).as_posix(), tif_path,],
    }


def _run_delivery_setup_for_transplant(tag: str, map_dir: Path,) -> None:
    """
    Create the archived Python install used by the ``transplant`` delivery
    method, unless it already exists or ``TRANSPLANT.ASSUME_EXISTS`` is set.

    The archive is stored in ``TRANSPLANT.DIR`` under the hash of the
    ``pip freeze`` output (with no file extension), next to a ``.pip`` file
    holding that output.

    Raises
    ------
    exceptions.CannotTransplantPython
        If the interpreter path contains ``usr``, or if running on Windows.
    """
    if settings.get("TRANSPLANT.ASSUME_EXISTS", False):
        return

    if "usr" in sys.executable:
        raise exceptions.CannotTransplantPython(
            "System Python installations cannot be transplanted"
        )
    if sys.platform == "win32":
        raise exceptions.CannotTransplantPython(
            "Transplant delivery does not work from Windows"
        )

    py_dir = Path(sys.executable).parent.parent
    pip_freeze = _get_pip_freeze()
    target = Path(settings["TRANSPLANT.DIR"]) / _get_transplant_hash(pip_freeze)
    # make_archive appends the format's extension to base_name
    zip_path = target.with_name(f"{target.stem}.tar.gz")

    # The finished archive is renamed to `target` below, so `target` (not
    # `zip_path`) is what indicates a usable cached install.
    if target.exists():  # cached version already exists
        logger.debug(f"Using cached zipped python install at {target}")
        return

    logger.debug(
        f"Creating zipped Python install for transplant from {py_dir} in {target.parent} ..."
    )

    try:
        shutil.make_archive(
            base_name=str(target), format="gztar", root_dir=py_dir,
        )
    except BaseException:
        # Clean up a partial archive, but never let the cleanup itself
        # mask the original error (the file may not have been created yet).
        if zip_path.exists():
            zip_path.unlink()
            logger.debug(f"Removed partial zipped Python install at {zip_path}")
        raise

    zip_path.rename(target)

    pip_path = zip_path.with_name(f"{target.stem}.pip")
    pip_path.write_bytes(pip_freeze)

    logger.debug(f"Created zipped Python install for transplant, stored at {target}")


def _get_pip_freeze() -> bytes:
    """Return the output of ``utils.pip_freeze()``, encoded as UTF-8."""
    return utils.pip_freeze().encode("utf-8")


def _get_transplant_hash(pip_freeze_output: bytes) -> str:
    """Return the MD5 hex digest of the given ``pip freeze`` output."""
    h = hashlib.md5()
    h.update(pip_freeze_output)
    return h.hexdigest()


register_delivery_method(
    "transplant",
    descriptors_func=_get_base_descriptors_for_transplant,
    setup_func=_run_delivery_setup_for_transplant,
)