Skip to content

AsyncDaprAgentSpecLoader

Asynchronous version of the loader for non-blocking operations.

Class Reference

AsyncDaprAgentSpecLoader

AsyncDaprAgentSpecLoader(
    tool_registry: ToolRegistry | None = None,
)

Async loader for converting OAS specifications to Dapr Agents components.

This class provides async methods to load OAS (Open Agent Spec) configurations from JSON or YAML format and convert them to Dapr Agents components.

The async operations use asyncio.to_thread for offloading synchronous conversion logic to a thread.

Example
async def main():
    loader = AsyncDaprAgentSpecLoader(
        tool_registry={
            "search_tool": search_function,
        }
    )

    # Load multiple files concurrently
    configs = await asyncio.gather(
        loader.load_yaml_file("agent1.yaml"),
        loader.load_yaml_file("agent2.yaml"),
        loader.load_json_file("workflow.json"),
    )

Initialize the async loader.

Parameters:

Name Type Description Default
tool_registry ToolRegistry | None

Dictionary mapping tool names to their callable implementations.

None
Source code in src/dapr_agents_oas_adapter/async_loader.py
def __init__(
    self,
    tool_registry: ToolRegistry | None = None,
) -> None:
    """Initialize the async loader.

    Args:
        tool_registry: Dictionary mapping tool names to their callable
                      implementations.
    """
    # Import here to avoid circular imports
    from dapr_agents_oas_adapter.loader import DaprAgentSpecLoader

    self._sync_loader = DaprAgentSpecLoader(tool_registry)
    self._logger = get_logger()

Functions

__aenter__ async

__aenter__() -> AsyncDaprAgentSpecLoader

Enter async context manager.

Source code in src/dapr_agents_oas_adapter/async_loader.py
async def __aenter__(self) -> AsyncDaprAgentSpecLoader:
    """Enter async context manager."""
    return self

__aexit__ async

__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None

Exit async context manager.

Source code in src/dapr_agents_oas_adapter/async_loader.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Exit async context manager."""
    del exc_type, exc_val, exc_tb  # Unused but required by protocol
    await self.close()

load_yaml async

load_yaml(
    yaml_content: str,
) -> DaprAgentConfig | WorkflowDefinition

Load an OAS specification from YAML string.

Parameters:

Name Type Description Default
yaml_content str

YAML string containing the OAS specification

required

Returns:

Type Description
DaprAgentConfig | WorkflowDefinition

DaprAgentConfig for Agent components, WorkflowDefinition for Flows

Raises:

Type Description
ConversionError

If the YAML cannot be parsed or converted

Source code in src/dapr_agents_oas_adapter/async_loader.py
async def load_yaml(self, yaml_content: str) -> DaprAgentConfig | WorkflowDefinition:
    """Load an OAS specification from YAML string.

    Args:
        yaml_content: YAML string containing the OAS specification

    Returns:
        DaprAgentConfig for Agent components, WorkflowDefinition for Flows

    Raises:
        ConversionError: If the YAML cannot be parsed or converted
    """
    return await asyncio.to_thread(self._sync_loader.load_yaml, yaml_content)

load_json async

load_json(
    json_content: str,
) -> DaprAgentConfig | WorkflowDefinition

Load an OAS specification from JSON string.

Parameters:

Name Type Description Default
json_content str

JSON string containing the OAS specification

required

Returns:

Type Description
DaprAgentConfig | WorkflowDefinition

DaprAgentConfig for Agent components, WorkflowDefinition for Flows

Raises:

Type Description
ConversionError

If the JSON cannot be parsed or converted

Source code in src/dapr_agents_oas_adapter/async_loader.py
async def load_json(self, json_content: str) -> DaprAgentConfig | WorkflowDefinition:
    """Load an OAS specification from JSON string.

    Args:
        json_content: JSON string containing the OAS specification

    Returns:
        DaprAgentConfig for Agent components, WorkflowDefinition for Flows

    Raises:
        ConversionError: If the JSON cannot be parsed or converted
    """
    return await asyncio.to_thread(self._sync_loader.load_json, json_content)

load_dict async

load_dict(
    spec_dict: dict[str, Any],
) -> DaprAgentConfig | WorkflowDefinition

Load an OAS specification from a dictionary.

Parameters:

Name Type Description Default
spec_dict dict[str, Any]

Dictionary containing the OAS specification

required

Returns:

Type Description
DaprAgentConfig | WorkflowDefinition

DaprAgentConfig for Agent components, WorkflowDefinition for Flows

Raises:

Type Description
ConversionError

If the component type is not supported

Source code in src/dapr_agents_oas_adapter/async_loader.py
async def load_dict(self, spec_dict: dict[str, Any]) -> DaprAgentConfig | WorkflowDefinition:
    """Load an OAS specification from a dictionary.

    Args:
        spec_dict: Dictionary containing the OAS specification

    Returns:
        DaprAgentConfig for Agent components, WorkflowDefinition for Flows

    Raises:
        ConversionError: If the component type is not supported
    """
    return await asyncio.to_thread(self._sync_loader.load_dict, spec_dict)

run_sync

Utility function to run async code synchronously.

run_sync

run_sync(coro: Any) -> Any

Run an async coroutine synchronously.

This is a utility function for running async code from sync contexts.

Parameters:

Name Type Description Default
coro Any

The coroutine to run

required

Returns:

Type Description
Any

The result of the coroutine

Example
from dapr_agents_oas_adapter.async_loader import AsyncDaprAgentSpecLoader, run_sync

async def load_async():
    loader = AsyncDaprAgentSpecLoader()
    return await loader.load_yaml_file("agent.yaml")

# Run from sync code
result = run_sync(load_async())
Source code in src/dapr_agents_oas_adapter/async_loader.py
def run_sync(coro: Any) -> Any:
    """Run an async coroutine synchronously.

    This is a utility function for running async code from sync contexts.

    Args:
        coro: The coroutine to run

    Returns:
        The result of the coroutine

    Example:
        ```python
        from dapr_agents_oas_adapter.async_loader import AsyncDaprAgentSpecLoader, run_sync

        async def load_async():
            loader = AsyncDaprAgentSpecLoader()
            return await loader.load_yaml_file("agent.yaml")

        # Run from sync code
        result = run_sync(load_async())
        ```
    """
    try:
        asyncio.get_running_loop()
        # Already in an async context, create a new thread to run it
        import concurrent.futures  # pragma: no cover

        with concurrent.futures.ThreadPoolExecutor() as pool:  # pragma: no cover
            future = pool.submit(asyncio.run, coro)  # pragma: no cover
            return future.result()  # pragma: no cover
    except RuntimeError:
        # No running loop, create a new one
        return asyncio.run(coro)

Usage Examples

Basic Async Usage

from dapr_agents_oas_adapter import AsyncDaprAgentSpecLoader

loader = AsyncDaprAgentSpecLoader()

# Async load
config = await loader.load_yaml(yaml_content)

Context Manager

async with AsyncDaprAgentSpecLoader() as loader:
    config = await loader.load_dict(spec)
    # Resources cleaned up automatically

Concurrent Loading

import asyncio

async def load_all(yaml_contents: list[str]):
    async with AsyncDaprAgentSpecLoader() as loader:
        tasks = [loader.load_yaml(c) for c in yaml_contents]
        return await asyncio.gather(*tasks)

configs = await load_all(yaml_list)

Synchronous Wrapper

from dapr_agents_oas_adapter import AsyncDaprAgentSpecLoader, run_sync

loader = AsyncDaprAgentSpecLoader()

# Run async code in sync context
config = run_sync(loader.load_dict(spec))

With Tool Registry

async def async_tool(query: str) -> str:
    await asyncio.sleep(0.1)
    return f"Result: {query}"

loader = AsyncDaprAgentSpecLoader(
    tool_registry={"my_tool": async_tool}
)