flyte.extras
Flyte extras package. This package provides various utilities that make it possible to build highly customized workflows.
- `ContainerTask`: Execute arbitrary pre-containerized applications without needing the `flyte-sdk` to be installed. This extra uses the `flyte copilot` system to inject inputs and slurp outputs from the container run.
- Time utilities: Using `Time.now`, `time.sleep`, or `asyncio.sleep` brings non-determinism to a program. This module provides a few utilities that bring determinism to workflows that need time-related functions. This determinism persists across crashes and restarts, making the process durable.
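To make the "inject inputs and slurp outputs" idea concrete, here is a minimal sketch of a file-based I/O convention, assuming inputs are staged as one file per input and outputs are read back the same way. It does not use Flyte at all: `run_with_file_io` and the inline `APP` script are hypothetical stand-ins, and the real copilot's directory layout and behavior may differ.

```python
import subprocess
import sys
import tempfile
from pathlib import Path


def run_with_file_io(
    command: list[str], inputs: dict[str, str], output_names: list[str]
) -> dict[str, str]:
    """Hypothetical helper: stage inputs as files, run a command, collect outputs."""
    with tempfile.TemporaryDirectory() as tmp:
        in_dir = Path(tmp) / "inputs"
        out_dir = Path(tmp) / "outputs"
        in_dir.mkdir()
        out_dir.mkdir()

        # "Inject": write each input to a file named after the input.
        for name, value in inputs.items():
            (in_dir / name).write_text(value)

        # Run the opaque program; it only knows about the two directories.
        subprocess.run([*command, str(in_dir), str(out_dir)], check=True)

        # "Slurp": read each declared output back from its file.
        return {name: (out_dir / name).read_text() for name in output_names}


# Stand-in for a pre-built application: reads `name`, writes `greeting`.
APP = (
    "import sys, pathlib;"
    "i, o = map(pathlib.Path, sys.argv[1:3]);"
    "(o / 'greeting').write_text('Hello, ' + (i / 'name').read_text())"
)

result = run_with_file_io([sys.executable, "-c", APP], {"name": "Flyte"}, ["greeting"])
print(result)
```

The program being run needs no SDK: it only reads and writes files, which is what lets an arbitrary pre-built application take part in a workflow.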
Directory
Classes
| Class | Description |
|---|---|
| `ContainerTask` | This is an intermediate class that represents Flyte Tasks that run a container at execution time. |
Methods
| Method | Description |
|---|---|
| `durable_sleep()` | `durable_sleep` enables the process to sleep for `seconds` seconds even if the process recovers from a crash. |
| `durable_time()` | Returns the current time for every unique invocation of `durable_time`. |
Methods
durable_sleep()
The default invocation is synchronous and blocks. To call it asynchronously, use `.aio()` on the method itself, e.g. `await durable_sleep.aio(seconds)`.

    def durable_sleep(
        seconds: float,
    )

`durable_sleep` enables the process to sleep for `seconds` seconds even if the process recovers from a crash. This method can be invoked multiple times. If the process crashes, the invocation of `durable_sleep` behaves as if the process had been sleeping since the first time this method was invoked.

Examples:

    import flyte.durable

    env = flyte.TaskEnvironment("env")

    @env.task
    async def main():
        # Do something (my_work is a placeholder for user code)
        my_work()
        # Now we need to sleep for 1 hour before proceeding.
        # Even if the process crashes, it will resume and only sleep for
        # 1 hour in aggregate. If the scheduling takes longer, it
        # will simply return immediately.
        await flyte.durable.sleep.aio(3600)
        # Thing to be done after 1 hour
        my_work()

| Parameter | Type | Description |
|---|---|---|
| `seconds` | `float` | Time to sleep for, in seconds. |
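The crash-recovery behavior described above can be illustrated without Flyte. The following is a minimal sketch, assuming the start time of the first invocation is checkpointed in some durable store; `durable_sleep_sketch`, the `key` argument, the plain `dict` store, and `FakeClock` are all hypothetical names for illustration and are not how the library is implemented.

```python
import time
from typing import Callable, MutableMapping


def durable_sleep_sketch(
    key: str,
    seconds: float,
    store: MutableMapping[str, float],
    now: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> float:
    """Sleep until `seconds` have passed since the first call recorded under `key`.

    Returns the number of seconds slept by this particular call.
    """
    # The first invocation records the start time; later ones reuse it.
    start = store.setdefault(key, now())
    remaining = max(0.0, start + seconds - now())
    if remaining > 0:
        sleep(remaining)
    return remaining


class FakeClock:
    """Test clock so the demo runs instantly."""

    def __init__(self, t: float) -> None:
        self.t = t

    def now(self) -> float:
        return self.t

    def sleep(self, s: float) -> None:
        self.t += s


store: dict[str, float] = {}  # stands in for a durable checkpoint
clock = FakeClock(1_000.0)


def crashing_sleep(s: float) -> None:
    clock.t += 2_400.0  # 40 minutes pass, then the process dies
    raise RuntimeError("process crashed")


try:
    durable_sleep_sketch("step-1", 3600, store, clock.now, crashing_sleep)
except RuntimeError:
    pass

# After the restart, only the remaining 20 minutes are slept.
slept_after_restart = durable_sleep_sketch("step-1", 3600, store, clock.now, clock.sleep)
print(slept_after_restart)
```

Because the start time survives the crash, a later call with the same key returns immediately once the full hour has elapsed.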
durable_time()
The default invocation is synchronous and blocks. To call it asynchronously, use `.aio()` on the method itself, e.g. `result = await durable_time.aio()`.

    def durable_time()

Returns the current time for every unique invocation of `durable_time`. If the same invocation is encountered again, the previously returned time is returned again, ensuring determinism. Similar to using `time.time()`, just durable!

Returns: `float`
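The "same invocation returns the same time" rule can be pictured as memoization keyed by invocation. This is a minimal sketch under that assumption, independent of Flyte: `durable_time_sketch`, the explicit `key`, and the `dict` store are hypothetical, and the real function identifies invocations on its own.

```python
import time
from typing import Callable, MutableMapping


def durable_time_sketch(
    key: str,
    store: MutableMapping[str, float],
    now: Callable[[], float] = time.time,
) -> float:
    """Return the time recorded for `key`, reading the clock only on first use."""
    if key not in store:
        store[key] = now()
    return store[key]


# Deterministic stand-in clock so the behavior is easy to follow.
ticks = iter([100.0, 200.0, 300.0])
recorded: dict[str, float] = {}  # stands in for a durable checkpoint

first = durable_time_sketch("call-0", recorded, lambda: next(ticks))
replayed = durable_time_sketch("call-0", recorded, lambda: next(ticks))  # e.g. after a restart
second = durable_time_sketch("call-1", recorded, lambda: next(ticks))
print(first, replayed, second)
```

A replayed invocation never touches the clock, so code that branches on the returned value takes the same path after a restart.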