• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 15192340923

22 May 2025 04:52PM UTC coverage: 83.388% (+0.04%) from 83.349%
15192340923

Pull #507

github

web-flow
Merge 41c9c3086 into 31ec7f8eb
Pull Request #507: chore: remove orchestrators

12 of 19 new or added lines in 1 file covered. (63.16%)

1 existing line in 1 file now uncovered.

2555 of 3064 relevant lines covered (83.39%)

1.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

16.67
/llama_deploy/deploy/deploy.py
1
import asyncio
2✔
2

3
import httpx
2✔
4
from llama_index.core.workflow import Workflow
2✔
5
from pydantic_settings import BaseSettings
2✔
6

7
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
2✔
8
from llama_deploy.deploy.network_workflow import NetworkServiceManager
2✔
9
from llama_deploy.message_queues import (
2✔
10
    AbstractMessageQueue,
11
    AWSMessageQueue,
12
    AWSMessageQueueConfig,
13
    KafkaMessageQueue,
14
    KafkaMessageQueueConfig,
15
    RabbitMQMessageQueue,
16
    RabbitMQMessageQueueConfig,
17
    RedisMessageQueue,
18
    RedisMessageQueueConfig,
19
    SimpleMessageQueueConfig,
20
    SimpleMessageQueueServer,
21
    SolaceMessageQueue,
22
    SolaceMessageQueueConfig,
23
)
24
from llama_deploy.message_queues.simple import SimpleMessageQueue
2✔
25
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
2✔
26

27
DEFAULT_TIMEOUT = 120.0
2✔
28

29

30
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
2✔
31
    key = next(iter(config_dict.keys()))
×
32
    if key == SimpleMessageQueueConfig.__name__:
×
33
        return SimpleMessageQueueConfig(**config_dict[key])
×
34
    elif key == AWSMessageQueueConfig.__name__:
×
35
        return AWSMessageQueueConfig(**config_dict[key])
×
36
    elif key == KafkaMessageQueueConfig.__name__:
×
37
        return KafkaMessageQueueConfig(**config_dict[key])
×
38
    elif key == RabbitMQMessageQueueConfig.__name__:
×
39
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
40
    elif key == RedisMessageQueueConfig.__name__:
×
41
        return RedisMessageQueueConfig(**config_dict[key])
×
42
    elif key == SolaceMessageQueueConfig.__name__:
×
43
        return SolaceMessageQueueConfig(**config_dict[key])
×
44
    else:
45
        raise ValueError(f"Unknown message queue: {key}")
×
46

47

48
def _get_message_queue_client(config: BaseSettings) -> AbstractMessageQueue:
2✔
49
    if isinstance(config, SimpleMessageQueueConfig):
×
50
        return SimpleMessageQueue(config)
×
51
    elif isinstance(config, AWSMessageQueueConfig):
×
52
        return AWSMessageQueue(config)
×
53
    elif isinstance(config, KafkaMessageQueueConfig):
×
54
        return KafkaMessageQueue(config)
×
55
    elif isinstance(config, RabbitMQMessageQueueConfig):
×
56
        return RabbitMQMessageQueue(config)
×
57
    elif isinstance(config, RedisMessageQueueConfig):
×
58
        return RedisMessageQueue(config)
×
59
    elif isinstance(config, SolaceMessageQueueConfig):
×
60
        return SolaceMessageQueue(config)
×
61
    else:
62
        raise ValueError(f"Invalid message queue config: {config}")
×
63

64

65
async def deploy_core(
2✔
66
    control_plane_config: ControlPlaneConfig | None = None,
67
    message_queue_config: BaseSettings | None = None,
68
    disable_message_queue: bool = False,
69
    disable_control_plane: bool = False,
70
) -> None:
71
    """
72
    Deploy the core components of the llama_deploy system.
73

74
    This function sets up and launches the message queue, control plane, and orchestrator.
75
    It handles the initialization and connection of these core components.
76

77
    Args:
78
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
79
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue. Defaults to a local SimpleMessageQueue.
80
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
81
            If not provided, a default SimpleOrchestratorConfig will be used.
82
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
83
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.
84

85
    Raises:
86
        ValueError: If an unknown message queue type is specified in the config.
87
        Exception: If any of the launched tasks encounter an error.
88
    """
89
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
90
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
×
91

UNCOV
92
    tasks = []
×
93

94
    message_queue_client = _get_message_queue_client(message_queue_config)
×
95
    # If needed, start the SimpleMessageQueueServer
96
    if (
×
97
        isinstance(message_queue_config, SimpleMessageQueueConfig)
98
        and not disable_message_queue
99
    ):
100
        queue = SimpleMessageQueueServer(message_queue_config)
×
101
        tasks.append(asyncio.create_task(queue.launch_server()))
×
102
        # let message queue boot up
103
        await asyncio.sleep(2)
×
104

105
    if not disable_control_plane:
×
106
        control_plane = ControlPlaneServer(
×
107
            message_queue_client, config=control_plane_config
108
        )
109
        tasks.append(asyncio.create_task(control_plane.launch_server()))
×
110
        # let service spin up
111
        await asyncio.sleep(2)
×
112
        # register the control plane as a consumer
113
        control_plane_consumer_fn = await control_plane.register_to_message_queue()
×
114
        tasks.append(asyncio.create_task(control_plane_consumer_fn()))
×
115

116
    # let things run
117
    try:
×
118
        await asyncio.gather(*tasks)
×
119
    except (Exception, asyncio.CancelledError):
×
120
        await message_queue_client.cleanup()
×
121
        for task in tasks:
×
122
            if not task.done():
×
123
                task.cancel()
×
124

125
        await asyncio.gather(*tasks, return_exceptions=True)
×
126

127

128
async def deploy_workflow(
2✔
129
    workflow: Workflow,
130
    workflow_config: WorkflowServiceConfig,
131
    control_plane_config: ControlPlaneConfig | None = None,
132
) -> None:
133
    """
134
    Deploy a workflow as a service within the llama_deploy system.
135

136
    This function sets up a workflow as a service, connects it to the message queue,
137
    and registers it with the control plane.
138

139
    Args:
140
        workflow (Workflow): The workflow to be deployed as a service.
141
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
142
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
143

144
    Raises:
145
        httpx.HTTPError: If there's an error communicating with the control plane.
146
        ValueError: If an invalid message queue config is encountered.
147
        Exception: If any of the launched tasks encounter an error.
148
    """
149
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
150
    control_plane_url = control_plane_config.url
×
151

152
    async with httpx.AsyncClient() as client:
×
153
        response = await client.get(f"{control_plane_url}/queue_config")
×
154
        queue_config_dict = response.json()
×
155

156
    message_queue_config = _get_message_queue_config(queue_config_dict)
×
157
    message_queue_client = _get_message_queue_client(message_queue_config)
×
158

159
    # override the service manager, while maintaining dict of existing services
160
    workflow._service_manager = NetworkServiceManager(
×
161
        workflow._service_manager._services
162
    )
163

164
    service = WorkflowService(
×
165
        workflow=workflow,
166
        message_queue=message_queue_client,
167
        config=workflow_config,
168
    )
169

170
    service_task = asyncio.create_task(service.launch_server())
×
171

172
    # let service spin up
173
    await asyncio.sleep(1)
×
174

175
    # register to control plane
176
    await service.register_to_control_plane(control_plane_url)
×
177

178
    # register to message queue
179
    consumer_fn = await service.register_to_message_queue()
×
180

181
    # create consumer task
182
    consumer_task = asyncio.create_task(consumer_fn())
×
183

184
    # let things sync up
185
    await asyncio.sleep(1)
×
186

187
    try:
×
188
        # Propagate the exception if any of the tasks exited with an error
189
        await asyncio.gather(service_task, consumer_task, return_exceptions=True)
×
190
    except asyncio.CancelledError:
×
191
        consumer_task.cancel()
×
192
        service_task.cancel()
×
193

194
        await asyncio.gather(service_task, consumer_task)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc