flytekit.dynamic
- flytekit.dynamic(_task_function=None, task_config=None, cache=False, cache_serialize=False, cache_version='', retries=0, interruptible=None, deprecated='', timeout=0, container_image=None, environment=None, requests=None, limits=None, secret_requests=None, *, execution_mode=ExecutionBehavior.DYNAMIC, node_dependency_hints=None, task_resolver=None, docs=None, disable_deck=None, enable_deck=None, pod_template=None, pod_template_name=None, accelerator=None)
Please first see the comments for flytekit.task() and flytekit.workflow(). The dynamic concept is an amalgamation of both and lets the user build workflows whose structure depends on input values that are only known at run time.

In short, a task’s function is run at execution time only, and a workflow function is run at compilation time only (local execution notwithstanding). A dynamic workflow is modeled on the backend as a task, but at execution time the function body is run to produce a workflow. It is almost as if the decorator changed from @task to @workflow, except that workflows cannot use their inputs like native Python values, whereas dynamic workflows can. The resulting workflow is passed back to the Flyte engine and is run as a subworkflow.

Simple usage:

    import typing
    from flytekit import dynamic

    # t1 is assumed to be a @task defined elsewhere that takes an int and returns a str.
    @dynamic
    def my_dynamic_subwf(a: int) -> (typing.List[str], int):
        s = []
        for i in range(a):
            s.append(t1(a=i))
        return s, 5
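To make the compile-time versus execution-time distinction concrete, the following is a toy model in plain Python. It is not how flytekit is implemented: the Promise class, compile_as_workflow, and run_as_dynamic are hypothetical names invented for this sketch, and t1 is a stand-in for a real task. It only shows why a function body that treats its input as a native value fails when the input is a placeholder, and works when the real value is available.

```python
from typing import Callable, List


class Promise:
    """Toy stand-in for a workflow input whose value is unknown at compile time."""

    def __init__(self, name: str) -> None:
        self.name = name


def t1(a: int) -> str:
    # Hypothetical task body; in Flyte this would be a @task function.
    return f"item-{a}"


def body(a) -> List[str]:
    # Same shape as my_dynamic_subwf: loop over the input with range().
    return [t1(a=i) for i in range(a)]


def compile_as_workflow(fn: Callable) -> str:
    # Compile time: the input is only a placeholder, so range(placeholder) fails.
    try:
        fn(Promise("a"))
    except TypeError as exc:
        return f"compile error: {exc}"
    return "compiled"


def run_as_dynamic(fn: Callable, a: int) -> List[str]:
    # Execution time: the real value is available, so the body runs normally.
    return fn(a)


print(compile_as_workflow(body))
print(run_as_dynamic(body, 3))
```

In real Flyte, the calls to t1 inside a dynamic workflow also produce graph nodes rather than plain strings; the sketch leaves that out to keep the focus on how inputs are treated.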
Note that the simple usage example calls Python's built-in range on the input a. This is typically not allowed in a workflow, but it is allowed here. You can also express dependencies between tasks:

    # t1 and t2 are assumed to be tasks defined elsewhere.
    @dynamic
    def my_dynamic_subwf(a: int, b: int) -> int:
        x = t1(a=a)
        # t2 consumes the output of t1, so it runs after t1.
        return t2(b=b, x=x)
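One way to picture how passing one task's output into another expresses a dependency is the small sketch below. It is a simplified illustration in plain Python, not flytekit's internals: Node and call_task are hypothetical helpers made up for this example, and the names "t1" and "t2" are just labels.

```python
from dataclasses import dataclass, field
from typing import List


@dataclass
class Node:
    """Toy record of one task invocation and the nodes it depends on."""

    name: str
    upstream: List["Node"] = field(default_factory=list)


def call_task(name: str, **inputs) -> Node:
    # Any input that is itself a Node becomes a dependency edge.
    deps = [value for value in inputs.values() if isinstance(value, Node)]
    return Node(name=name, upstream=deps)


def my_dynamic_subwf(a: int, b: int) -> Node:
    x = call_task("t1", a=a)          # depends only on a literal input
    return call_task("t2", b=b, x=x)  # depends on t1 through x


result = my_dynamic_subwf(a=1, b=2)
print(result.name, [node.name for node in result.upstream])
```

The ordering falls out of the data flow: nothing states that t2 runs after t1 other than the fact that x is passed into it.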
See the cookbook for a longer discussion.
- Parameters
_task_function (Optional[Callable[..., FuncOut]]) –
task_config (Optional[T]) –
cache (bool) –
cache_serialize (bool) –
cache_version (str) –
retries (int) –
interruptible (Optional[bool]) –
deprecated (str) –
timeout (Union[_datetime.timedelta, int]) –
container_image (Optional[Union[str, ImageSpec]]) –
requests (Optional[Resources]) –
limits (Optional[Resources]) –
secret_requests (Optional[List[Secret]]) –
execution_mode (PythonFunctionTask.ExecutionBehavior) –
node_dependency_hints (Optional[Iterable[Union[PythonFunctionTask, _annotated_launchplan.LaunchPlan, _annotated_workflow.WorkflowBase]]]) –
task_resolver (Optional[TaskResolverMixin]) –
docs (Optional[Documentation]) –
disable_deck (Optional[bool]) –
enable_deck (Optional[bool]) –
pod_template (Optional['PodTemplate']) –
pod_template_name (Optional[str]) –
accelerator (Optional[BaseAccelerator]) –
- Return type
Union[Callable[[Callable[…, FuncOut]], PythonFunctionTask[T]], PythonFunctionTask[T], Callable[…, FuncOut]]
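The leading _task_function=None parameter and the Union return type suggest the common pattern of a decorator that works both bare (@dynamic) and with arguments (@dynamic(retries=3)). The sketch below shows that general pattern in plain Python under that assumption. The names toy_task and toy_dynamic and the attributes set on the wrapped function are hypothetical and are not part of flytekit.

```python
import functools
from typing import Any, Callable, Optional


def toy_task(_fn: Optional[Callable] = None, *, retries: int = 0, mode: str = "default"):
    """Hypothetical decorator usable both as @toy_task and @toy_task(retries=3)."""

    def wrapper(fn: Callable) -> Callable:
        @functools.wraps(fn)
        def wrapped(*args: Any, **kwargs: Any) -> Any:
            return fn(*args, **kwargs)

        # Attach the configuration so it can be inspected later.
        wrapped.retries = retries
        wrapped.mode = mode
        return wrapped

    if _fn is not None:
        # Bare usage: the function was passed in directly.
        return wrapper(_fn)
    # Parameterized usage: return the decorator to be applied next.
    return wrapper


# A variant with one keyword pre-set, similar in spirit to execution_mode above.
toy_dynamic = functools.partial(toy_task, mode="dynamic")


@toy_dynamic
def f(a: int) -> int:
    return a + 1


@toy_dynamic(retries=3)
def g(a: int) -> int:
    return a * 2


print(f(1), f.retries, f.mode)
print(g(2), g.retries, g.mode)
```

Both forms end up with a wrapped function; the only difference is whether the decorator receives the function on the first call or on the second.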