FutureOutput

chemcloud.models

TaskStatus

Task status for a submitted compute job.

FutureOutput

Represents one or more asynchronous compute tasks.

Developer Note

Rather than refactoring into a BaseFutureOutput with FutureOutput and FutureOutputs subclasses, we use the return_single_output flag to determine when the .get() method should return a ProgramOutput or list[ProgramOutput] and when to enable the .task_id property. This simplifies the API, reduces code complexity, removes the need for isinstance checks, and still gives the same end-user experience. We can rethink this design if it becomes an issue.

Attributes:

task_ids (list[str]): A list of task IDs from a compute submission.
input_data (list[str]): A list of input data objects for each task.
program (str): The program used for the computation.
client (Any): A CCClient instance that can perform HTTP requests to check task status.
return_single_output (bool): If True, the .get() method returns a single ProgramOutput rather than a list. Also enables the .task_id property.
outputs (list[Optional[ProgramOutput]]): A list of ProgramOutputs once tasks are completed (order corresponds to task_ids). Generally not passed by the user; used internally to track task outputs.
statuses (list[TaskStatus]): A list of TaskStatus enums corresponding to the status of each task. Generally not passed by the user; used internally to track task status.
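
For example, a future returned from a single-input submission behaves differently from one returned from a batch submission. A minimal sketch, assuming a CCClient whose compute() method returns a FutureOutput, with prog_input, prog_input_a, and prog_input_b standing in for already-built input objects:

from chemcloud import CCClient

client = CCClient()

# Single input: return_single_output is True, so .get() returns one ProgramOutput
future = client.compute("psi4", prog_input)
output = future.get()

# Batch of inputs: .get() returns list[ProgramOutput], ordered like task_ids
futures = client.compute("psi4", [prog_input_a, prog_input_b])
outputs = futures.get()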

task_id property

task_id: str

Return the task id if only a single computation was submitted.

is_ready property

is_ready: bool

Sync wrapper around is_ready_async.
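
For example, a quick non-blocking check lets a script decide whether to collect results now or persist the future for later (a sketch, reusing the future object from the example above):

if future.is_ready:        # refreshes statuses, then checks completion
    output = future.get()  # returns immediately; all tasks already finished
else:
    future.save()          # persist task IDs and collect in a later session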

refresh_async async

refresh_async() -> None

Refresh the status and output for uncollected tasks.

Source code in chemcloud/models.py
async def refresh_async(self) -> None:
    """Refresh the status and output for uncollected tasks."""
    logger.debug("Refreshing task statuses and outputs.")

    # Identify unfinished tasks
    assert self.statuses is not None  # For mypy
    unfinished_indices = [
        i for i, status in enumerate(self.statuses) if status not in READY_STATES
    ]
    if not unfinished_indices:
        logger.debug("No unfinished tasks to refresh.")
        return  # Nothing to refresh

    # Build coroutines to refresh unfinished tasks
    coroutines = [
        self.client.fetch_output_async(self.task_ids[i]) for i in unfinished_indices
    ]
    # Run the coroutines in parallel; collect statuses and results.
    logger.info(f"Refreshing {len(unfinished_indices)} unfinished task(s).")
    results = await asyncio.gather(*coroutines, return_exceptions=True)

    # Update statuses and outputs based on results
    for i, result in zip(unfinished_indices, results):
        # Insulate users against all HTTP errors
        if isinstance(result, HTTPError):
            logger.error(
                f"Error collecting task {self.task_ids[i]}: {result}", exc_info=True
            )
            self.statuses[i] = TaskStatus.FAILURE
            self.outputs[i] = self._output_from_exception(result, self.inputs[i])
        else:
            assert (
                isinstance(result, tuple) and len(result) == 2
            ), "Invalid result returned."
            logger.debug(f"Task {self.task_ids[i]} collected: status {result[0]}")
            self.statuses[i], self.outputs[i] = result
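
For example, a long-running async application can refresh statuses on its own schedule instead of blocking in get_async(). A sketch, assuming future is an existing FutureOutput:

import asyncio

async def report(future):
    await future.refresh_async()
    for task_id, status in zip(future.task_ids, future.statuses):
        print(f"{task_id}: {status}")

asyncio.run(report(future))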

refresh

refresh()

Sync wrapper around refresh_async.

Source code in chemcloud/models.py
def refresh(self):
    """Sync wrapper around `refresh_async`."""
    return self.client.run(self.refresh_async())

get_async async

get_async(
    timeout: Optional[float] = None,
    initial_interval: float = 1.0,
) -> Union[ProgramOutput, list[ProgramOutput]]

Block until all tasks complete and return their ProgramOutputs.

If only one task was submitted, returns the single result; otherwise, returns a list of ProgramOutputs.

Parameters:

timeout (Optional[float], default None): The maximum time to wait for all tasks to complete.
initial_interval (float, default 1.0): The initial interval between status checks.

Returns:

Union[ProgramOutput, list[ProgramOutput]]: The ProgramOutput objects for all tasks once they are complete.

Raises:

TimeoutError: If the timeout is exceeded before all tasks complete.
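
For example, an async application can cap the wait and persist the future if the timeout trips. A sketch; the file name is illustrative:

try:
    outputs = await future.get_async(timeout=600, initial_interval=2.0)
except TimeoutError:
    future.save("pending-future.json")  # resume collection in a later session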

Source code in chemcloud/models.py
async def get_async(
    self, timeout: Optional[float] = None, initial_interval: float = 1.0
) -> Union[ProgramOutput, list[ProgramOutput]]:
    """
    Block until all tasks complete and return their ProgramOutputs.

    If only one task was submitted, returns the single result;
    otherwise, returns a list of program_outputs.

    Parameters:
        timeout: The maximum time to wait for all tasks to complete.
        initial_interval: The initial interval between status checks.

    Returns:
        The ProgramOutput objects for all tasks once they are complete.

    Raises:
        TimeoutError: If the timeout is exceeded before all tasks complete.
    """
    start = time()
    interval = initial_interval
    completed = 0
    while not await self.is_ready_async():
        # Check for timeout
        elapsed = time() - start
        logger.debug(
            f"Waiting for tasks to complete... elapsed time: {elapsed:.2f}s"
        )
        if timeout is not None and elapsed > timeout:
            raise TimeoutError(
                f"Timeout of {timeout} seconds exceeded while waiting for tasks."
            )
        # Refresh statuses and outputs
        await self.refresh_async()
        # Check for new completions
        new_completed = sum(status in READY_STATES for status in self.statuses)
        if new_completed > completed:
            completed = new_completed
            interval = initial_interval  # Reset interval if new completions found
        else:
            # Increase interval gradually (up to a max value)
            interval = min(interval * 1.5, 30.0)
        logger.debug(f"Sleeping for {interval:.2f} seconds before next poll.")
        await asyncio.sleep(interval)

    logger.info("All tasks are ready. Returning results.")
    assert all(
        output is not None for output in self.outputs
    ), "All outputs should be collected at this point."
    if self.return_single_output:
        return cast(ProgramOutput, self.outputs[0])
    return cast(list[ProgramOutput], self.outputs)

get

get(
    *args, **kwargs
) -> Union[ProgramOutput, list[ProgramOutput]]

Sync wrapper around get_async.
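
For example, from synchronous code the call simply blocks until results arrive. A sketch assuming a single-input submission, so .get() returns one ProgramOutput:

output = future.get(timeout=3600)
print(output.success)  # assumes a qcio-style ProgramOutput with a .success field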

Source code in chemcloud/models.py
def get(self, *args, **kwargs) -> Union[ProgramOutput, list[ProgramOutput]]:
    """Sync wrapper around `get_async`."""
    return self.client.run(self.get_async(*args, **kwargs))

is_ready_async async

is_ready_async() -> bool

Asynchronously refreshes the statuses and checks if all tasks are complete.

Source code in chemcloud/models.py
async def is_ready_async(self) -> bool:
    """
    Asynchronously refreshes the statuses and checks if all tasks are complete.
    """
    await self.refresh_async()
    return all(status in READY_STATES for status in self.statuses)

as_completed_async async

as_completed_async(
    initial_interval: float = 1.0,
) -> AsyncGenerator[ProgramOutput, None]

Yields ProgramOutput objects as tasks become ready (SUCCESS, FAILURE, or REVOKED). Blocks until all tasks have finished or the generator is exhausted.

This uses the same refresh logic as .get_async(), so it will automatically handle errors and generate ProgramOutput objects (including error placeholders) in the same way. The order in which results are yielded is not guaranteed to match the exact order tasks finish on the server.

Parameters:

initial_interval (float, default 1.0): The initial interval (in seconds) between refresh calls. The interval grows by a factor of 1.5 on each poll cycle that finds no newly completed tasks, up to a maximum of 30 seconds.

Yields:

AsyncGenerator[ProgramOutput, None]: ProgramOutput objects for each task as they become ready. If a task fails, the yielded ProgramOutput will contain error/traceback information (just like .get_async()).
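
For example, an async pipeline can start post-processing each result as soon as it is ready instead of waiting on the whole batch. A sketch; handle() is a placeholder for your own processing:

async for output in futures.as_completed_async(initial_interval=2.0):
    handle(output)  # failed tasks still yield a ProgramOutput with error details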

Source code in chemcloud/models.py
async def as_completed_async(
    self, initial_interval: float = 1.0
) -> AsyncGenerator[ProgramOutput, None]:
    """
    Yields ProgramOutput objects as tasks become ready (SUCCESS, FAILURE, or REVOKED).
    Blocks until all tasks have finished or the generator is exhausted.

    This uses the same refresh logic as `.get_async()`, so it will automatically
    handle errors and generate ProgramOutput objects (including error placeholders)
    in the same way. The order in which results are yielded is not guaranteed
    to match the exact order tasks finish on the server.

    Parameters:
        initial_interval: The initial interval (in seconds) between refresh calls.
        This interval is increased by a factor of 1.5 on every poll cycle that
        finds no newly completed tasks, up to a maximum of 30 seconds.

    Yields:
        ProgramOutput objects for each task as they become ready.
        If a task fails, the yielded ProgramOutput will contain
        error/traceback information (just like `.get_async()`).
    """
    done_indices: set[int] = set()
    interval = initial_interval
    while len(done_indices) < len(self.task_ids):
        logger.debug("Polling for task completions...")
        await self.refresh_async()
        any_new = False
        for i, status in enumerate(self.statuses):
            if i not in done_indices and status in READY_STATES:
                logger.info(
                    f"Task {self.task_ids[i]} is complete with status {status}."
                )
                done_indices.add(i)
                any_new = True
                if self.outputs[i] is not None:
                    yield cast(ProgramOutput, self.outputs[i])
                    self.outputs[i] = None  # Optional: clear to free memory

        if any_new:
            # Reset interval if new completions were found.
            interval = initial_interval

        else:
            interval = min(interval * 1.5, 30.0)
            logger.debug(f"No new completions; sleeping {interval:.2f} seconds.")
            await asyncio.sleep(interval)

as_completed

as_completed(
    initial_interval: float = 1.0,
) -> Generator[ProgramOutput, None, None]

Synchronous implementation of as_completed_async.

This cannot directly wrap the async version because it contains an AsyncGenerator and calls asyncio.sleep(), so the logic is reimplemented here.
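
For example, the same streaming pattern from plain synchronous code (handle() is again a placeholder):

for output in futures.as_completed():
    handle(output)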

Source code in chemcloud/models.py
def as_completed(
    self, initial_interval: float = 1.0
) -> Generator[ProgramOutput, None, None]:
    """
    Synchronous implementation of `as_completed_async`.

    Cannot directly wrap async version due to it containing an AsyncGenerator, and
    asyncio.sleep() so we must reimplement the logic here.
    """
    done_indices: set[int] = set()
    interval = initial_interval

    # Keep polling until all tasks are completed
    while len(done_indices) < len(self.task_ids):
        logger.debug("Polling for task completions...")
        self.refresh()
        any_new = False
        for i, status in enumerate(self.statuses):
            if i not in done_indices and status in READY_STATES:
                logger.info(
                    f"Task {self.task_ids[i]} is complete with status {status}."
                )
                done_indices.add(i)
                any_new = True
                assert self.outputs[i] is not None
                yield cast(ProgramOutput, self.outputs[i])
                self.outputs[i] = None  # Clear the output to save memory

        if any_new:
            # Reset interval if new completions were found.
            interval = initial_interval
        else:
            # Increase interval if nothing new was found.
            interval = min(interval * 1.5, 30.0)
            logger.debug(f"No new completions; sleeping {interval:.2f} seconds.")
            sleep(interval)

save

save(path: Optional[Union[str, Path]] = None) -> None

Save the FutureOutput to a JSON file.
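
For example, saving lets you shut down a script and collect results in a later session. A sketch; the path is illustrative, and if omitted the file defaults to future-<uuid>.json in the current directory:

future.save("water-opt-future.json")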

Source code in chemcloud/models.py
def save(self, path: Optional[Union[str, Path]] = None) -> None:
    """Save the FutureOutput to a JSON file."""
    path = (
        Path(path)
        if path is not None
        else Path.cwd() / f"future-{uuid4().hex}.json"
    )
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(self.model_dump()))

open classmethod

open(path: Union[str, Path]) -> FutureOutput

Load a FutureOutput from a JSON file.
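
For example, a later session can reload the future and finish collecting results. A sketch; it assumes the reopened future can reach the server through its client just as before:

from chemcloud.models import FutureOutput

future = FutureOutput.open("water-opt-future.json")
outputs = future.get()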

Source code in chemcloud/models.py
@classmethod
def open(self, path: Union[str, Path]) -> "FutureOutput":
    """Load a FutureOutput from a JSON file."""
    data = json.loads(Path(path).read_text())
    return FutureOutput(**data)