Source code for flytekit.image_spec.image_spec

import base64
import copy
import hashlib
import os
import pathlib
import re
import typing
from abc import abstractmethod
from dataclasses import asdict, dataclass
from functools import lru_cache
from importlib import metadata
from typing import Dict, List, Optional, Tuple, Union

import click
import requests
from packaging.version import Version

from flytekit.exceptions.user import FlyteAssertion

DOCKER_HUB = "docker.io"
_F_IMG_ID = "_F_IMG_ID"
FLYTE_FORCE_PUSH_IMAGE_SPEC = "FLYTE_FORCE_PUSH_IMAGE_SPEC"


[docs] @dataclass class ImageSpec: """ This class is used to specify the docker image that will be used to run the task. Args: name: name of the image. python_version: python version of the image. Use default python in the base image if None. builder: Type of plugin to build the image. Use envd by default. source_root: source root of the image. env: environment variables of the image. registry: registry of the image. packages: list of python packages to install. conda_packages: list of conda packages to install. conda_channels: list of conda channels. requirements: path to the requirements.txt file. apt_packages: list of apt packages to install. cuda: version of cuda to install. cudnn: version of cudnn to install. base_image: base image of the image. platform: Specify the target platforms for the build output (for example, windows/amd64 or linux/amd64,darwin/arm64 pip_index: Specify the custom pip index url pip_extra_index_url: Specify one or more pip index urls as a list registry_config: Specify the path to a JSON registry config file entrypoint: List of strings to overwrite the entrypoint of the base image with, set to [] to remove the entrypoint. commands: Command to run during the building process tag_format: Custom string format for image tag. The ImageSpec hash passed in as `spec_hash`. For example, to add a "dev" suffix to the image tag, set `tag_format="{spec_hash}-dev"` """ name: str = "flytekit" python_version: str = None # Use default python in the base image if None. builder: Optional[str] = None source_root: Optional[str] = None env: Optional[typing.Dict[str, str]] = None registry: Optional[str] = None packages: Optional[List[str]] = None conda_packages: Optional[List[str]] = None conda_channels: Optional[List[str]] = None requirements: Optional[str] = None apt_packages: Optional[List[str]] = None cuda: Optional[str] = None cudnn: Optional[str] = None base_image: Optional[Union[str, "ImageSpec"]] = None platform: str = "linux/amd64" pip_index: Optional[str] = None pip_extra_index_url: Optional[List[str]] = None registry_config: Optional[str] = None entrypoint: Optional[List[str]] = None commands: Optional[List[str]] = None tag_format: Optional[str] = None def __post_init__(self): self.name = self.name.lower() self._is_force_push = os.environ.get(FLYTE_FORCE_PUSH_IMAGE_SPEC, False) # False by default if self.registry: self.registry = self.registry.lower() parameters_str_list = [ "packages", "conda_channels", "conda_packages", "apt_packages", "pip_extra_index_url", "entrypoint", "commands", ] for parameter in parameters_str_list: attr = getattr(self, parameter) parameter_is_None = attr is None parameter_is_list_string = isinstance(attr, list) and all(isinstance(v, str) for v in attr) if not (parameter_is_None or parameter_is_list_string): error_msg = f"{parameter} must be a list of strings or None" raise ValueError(error_msg)
[docs] def image_name(self) -> str: """Full image name with tag.""" image_name = self._image_name() try: return ImageBuildEngine._IMAGE_NAME_TO_REAL_NAME[image_name] except KeyError: return image_name
def _image_name(self) -> str: """Construct full image name with tag.""" tag = calculate_hash_from_image_spec(self) if self.tag_format: tag = self.tag_format.format(spec_hash=tag) container_image = f"{self.name}:{tag}" if self.registry: container_image = f"{self.registry}/{container_image}" return container_image
[docs] def is_container(self) -> bool: from flytekit.core.context_manager import ExecutionState, FlyteContextManager state = FlyteContextManager.current_context().execution_state if state and state.mode and state.mode != ExecutionState.Mode.LOCAL_WORKFLOW_EXECUTION: return os.environ.get(_F_IMG_ID) == self.image_name() return True
[docs] def exist(self) -> Optional[bool]: """ Check if the image exists in the registry. Return True if the image exists in the registry, False otherwise. Return None if failed to check if the image exists due to the permission issue or other reasons. """ import docker from docker.errors import APIError, ImageNotFound try: client = docker.from_env() if self.registry: client.images.get_registry_data(self.image_name()) else: client.images.get(self.image_name()) return True except APIError as e: if e.response.status_code == 404: return False if re.match(f"unknown: repository .*{self.name} not found", e.explanation): click.secho(f"Received 500 error with explanation: {e.explanation}", fg="yellow") return False click.secho(f"Failed to check if the image exists with error:\n {e}", fg="red") return None except ImageNotFound: return False except Exception as e: tag = calculate_hash_from_image_spec(self) # if docker engine is not running locally, use requests to check if the image exists. if "localhost:" in self.registry: container_registry = self.registry elif self.registry and "/" in self.registry: container_registry = self.registry.split("/")[0] else: # Assume the image is in docker hub if users don't specify a registry, such as ghcr.io, docker.io. container_registry = DOCKER_HUB if container_registry == DOCKER_HUB: url = f"https://hub.docker.com/v2/repositories/{self.registry}/{self.name}/tags/{tag}" response = requests.get(url) if response.status_code == 200: return True if response.status_code == 404 and "not found" in str(response.content): return False if "Not supported URL scheme http+docker" in str(e): raise RuntimeError( f"{str(e)}\n" f"Error: Incompatible Docker package version.\n" f"Current version: {docker.__version__}\n" f"Please upgrade the Docker package to version 7.1.0 or higher.\n" f"You can upgrade the package by running:\n" f" pip install --upgrade docker" ) click.secho(f"Failed to check if the image exists with error:\n {e}", fg="red") return None
def __hash__(self): return hash(asdict(self).__str__())
[docs] def with_commands(self, commands: Union[str, List[str]]) -> "ImageSpec": """ Builder that returns a new image spec with an additional list of commands that will be executed during the building process. """ new_image_spec = copy.deepcopy(self) if new_image_spec.commands is None: new_image_spec.commands = [] if isinstance(commands, List): new_image_spec.commands.extend(commands) else: new_image_spec.commands.append(commands) return new_image_spec
[docs] def with_packages(self, packages: Union[str, List[str]]) -> "ImageSpec": """ Builder that returns a new image speck with additional python packages that will be installed during the building process. """ new_image_spec = copy.deepcopy(self) if new_image_spec.packages is None: new_image_spec.packages = [] if isinstance(packages, List): new_image_spec.packages.extend(packages) else: new_image_spec.packages.append(packages) return new_image_spec
[docs] def with_apt_packages(self, apt_packages: Union[str, List[str]]) -> "ImageSpec": """ Builder that returns a new image spec with additional list of apt packages that will be executed during the building process. """ new_image_spec = copy.deepcopy(self) if new_image_spec.apt_packages is None: new_image_spec.apt_packages = [] if isinstance(apt_packages, List): new_image_spec.apt_packages.extend(apt_packages) else: new_image_spec.apt_packages.append(apt_packages) return new_image_spec
[docs] def force_push(self) -> "ImageSpec": """ Builder that returns a new image spec with force push enabled. """ new_image_spec = copy.deepcopy(self) new_image_spec._is_force_push = True return new_image_spec
class ImageSpecBuilder: @abstractmethod def build_image(self, image_spec: ImageSpec) -> Optional[str]: """ Build the docker image and push it to the registry. Args: image_spec: image spec of the task. Returns: fully_qualified_image_name: Fully qualified image name. If None, then `image_spec.image_name()` is used. """ raise NotImplementedError("This method is not implemented in the base class.") def should_build(self, image_spec: ImageSpec) -> bool: """ Whether or not the builder should build the ImageSpec. Args: image_spec: image spec of the task. Returns: True if the image should be built, otherwise it returns False. """ img_name = image_spec.image_name() exist = image_spec.exist() if exist is False: click.secho(f"Image {img_name} not found. building...", fg="blue") return True elif exist is True: if image_spec._is_force_push: click.secho(f"Overwriting existing image {img_name}.", fg="blue") return True click.secho(f"Image {img_name} found. Skip building.", fg="blue") else: click.secho(f"Flytekit assumes the image {img_name} already exists.", fg="blue") return False class ImageBuildEngine: """ ImageBuildEngine contains a list of builders that can be used to build an ImageSpec. """ _REGISTRY: typing.Dict[str, Tuple[ImageSpecBuilder, int]] = {} # _IMAGE_NAME_TO_REAL_NAME is used to keep track of the fully qualified image name # returned by the image builder. This allows ImageSpec to map from `image_spc.image_name()` # to the real qualified name. _IMAGE_NAME_TO_REAL_NAME: Dict[str, str] = {} @classmethod def register(cls, builder_type: str, image_spec_builder: ImageSpecBuilder, priority: int = 5): cls._REGISTRY[builder_type] = (image_spec_builder, priority) @classmethod @lru_cache def build(cls, image_spec: ImageSpec): from flytekit.core.context_manager import FlyteContextManager execution_mode = FlyteContextManager.current_context().execution_state.mode # Do not build in executions if execution_mode is not None: return if isinstance(image_spec.base_image, ImageSpec): cls.build(image_spec.base_image) image_spec.base_image = image_spec.base_image.image_name() if image_spec.builder is None and cls._REGISTRY: builder = max(cls._REGISTRY, key=lambda name: cls._REGISTRY[name][1]) else: builder = image_spec.builder img_name = image_spec.image_name() if cls._get_builder(builder).should_build(image_spec): cls._build_image(builder, image_spec, img_name) @classmethod def _get_builder(cls, builder: str) -> ImageSpecBuilder: if builder is None: raise AssertionError("There is no image builder registered.") if builder not in cls._REGISTRY: raise AssertionError(f"Image builder {builder} is not registered.") if builder == "envd": envd_version = metadata.version("envd") # flytekit v1.10.2+ copies the workflow code to the WorkDir specified in the Dockerfile. However, envd<0.3.39 # overwrites the WorkDir when building the image, resulting in a permission issue when flytekit downloads the file. if Version(envd_version) < Version("0.3.39"): raise FlyteAssertion( f"envd version {envd_version} is not compatible with flytekit>v1.10.2." f" Please upgrade envd to v0.3.39+." ) return cls._REGISTRY[builder][0] @classmethod def _build_image(cls, builder: str, image_spec: ImageSpec, img_name: str): fully_qualified_image_name = cls._get_builder(builder).build_image(image_spec) if fully_qualified_image_name is not None: cls._IMAGE_NAME_TO_REAL_NAME[img_name] = fully_qualified_image_name @lru_cache def _calculate_deduped_hash_from_image_spec(image_spec: ImageSpec): """ Calculate this special hash from the image spec, and it used to identify the imageSpec in the ImageConfig in the serialization context. ImageConfig: - deduced hash 1: flyteorg/flytekit: 123 - deduced hash 2: flyteorg/flytekit: 456 """ image_spec_bytes = asdict(image_spec).__str__().encode("utf-8") # copy the image spec to avoid modifying the original image spec. otherwise, the hash will be different. return base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest()).decode("ascii").rstrip("=") @lru_cache def calculate_hash_from_image_spec(image_spec: ImageSpec): """ Calculate the hash from the image spec. """ # copy the image spec to avoid modifying the original image spec. otherwise, the hash will be different. spec = copy.deepcopy(image_spec) if isinstance(spec.base_image, ImageSpec): spec.base_image = spec.base_image.image_name() if image_spec.source_root: from flytekit.tools.fast_registration import compute_digest from flytekit.tools.ignore import DockerIgnore, GitIgnore, IgnoreGroup, StandardIgnore ignore = IgnoreGroup(image_spec.source_root, [GitIgnore, DockerIgnore, StandardIgnore]) digest = compute_digest(image_spec.source_root, ignore.is_ignored) spec.source_root = digest if spec.requirements: spec.requirements = hashlib.sha1(pathlib.Path(spec.requirements).read_bytes()).hexdigest() # won't rebuild the image if we change the registry_config path spec.registry_config = None image_spec_bytes = asdict(spec).__str__().encode("utf-8") tag = base64.urlsafe_b64encode(hashlib.md5(image_spec_bytes).digest()).decode("ascii").rstrip("=") # replace "-" with "_" to make it a valid tag return tag.replace("-", "_") def hash_directory(path): """ Return the SHA-256 hash of the directory at the given path. """ hasher = hashlib.sha256() for root, dirs, files in os.walk(path): for file in files: with open(os.path.join(root, file), "rb") as f: while True: # Read file in small chunks to avoid loading large files into memory all at once chunk = f.read(4096) if not chunk: break hasher.update(chunk) return bytes(hasher.hexdigest(), "utf-8")