• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 12035989065

26 Nov 2024 05:33PM UTC coverage: 72.155% (+0.005%) from 72.15%
12035989065

push

github

web-flow
fix: fix rabbitmq import errors (#386)

* fix rabbitmq imports

* forgot

* fix tests

7 of 15 new or added lines in 3 files covered. (46.67%)

2 existing lines in 2 files now uncovered.

2669 of 3699 relevant lines covered (72.15%)

0.72 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

17.86
/llama_deploy/deploy/deploy.py
1
import asyncio
1✔
2
import signal
1✔
3
import sys
1✔
4
from typing import Any, Callable, List, Optional
1✔
5

6
import httpx
1✔
7
from llama_index.core.workflow import Workflow
1✔
8
from pydantic_settings import BaseSettings
1✔
9

10
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
1✔
11
from llama_deploy.deploy.network_workflow import NetworkServiceManager
1✔
12
from llama_deploy.message_queues import (
1✔
13
    AWSMessageQueue,
14
    AWSMessageQueueConfig,
15
    BaseMessageQueue,
16
    KafkaMessageQueue,
17
    KafkaMessageQueueConfig,
18
    RabbitMQMessageQueue,
19
    RabbitMQMessageQueueConfig,
20
    RedisMessageQueue,
21
    RedisMessageQueueConfig,
22
    SimpleMessageQueue,
23
    SimpleMessageQueueConfig,
24
    SolaceMessageQueue,
25
    SolaceMessageQueueConfig,
26
)
27
from llama_deploy.message_queues.simple import SimpleRemoteClientMessageQueue
1✔
28
from llama_deploy.orchestrators.simple import (
1✔
29
    SimpleOrchestrator,
30
    SimpleOrchestratorConfig,
31
)
32
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
1✔
33

34
# Default timeout value; presumably expressed in seconds -- TODO confirm.
# NOTE(review): not referenced anywhere in the visible part of this module, so
# any consumers must live elsewhere -- verify before changing or removing it.
DEFAULT_TIMEOUT = 120.0
1✔
35

36

37
async def _deploy_local_message_queue(config: SimpleMessageQueueConfig) -> asyncio.Task:
    """Start a SimpleMessageQueue server as a background task.

    The queue is built from the dumped fields of ``config``, its
    ``launch_server()`` coroutine is scheduled as a task, and the function
    then waits two seconds before handing the task back.

    Args:
        config: Settings used to construct the SimpleMessageQueue.

    Returns:
        The task wrapping the queue's ``launch_server()`` coroutine.
    """
    queue_settings = config.model_dump()
    local_queue = SimpleMessageQueue(**queue_settings)
    server_task = asyncio.create_task(local_queue.launch_server())

    # Fixed grace period so the message queue can boot before it is used.
    await asyncio.sleep(2)
    return server_task
45

46

47
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
1✔
48
    key = next(iter(config_dict.keys()))
×
49
    if key == SimpleMessageQueueConfig.__name__:
×
50
        return SimpleMessageQueueConfig(**config_dict[key])
×
51
    elif key == SimpleRemoteClientMessageQueue.__name__:
×
52
        return SimpleMessageQueueConfig(**config_dict[key])
×
53
    elif key == AWSMessageQueueConfig.__name__:
×
54
        return AWSMessageQueueConfig(**config_dict[key])
×
55
    elif key == KafkaMessageQueueConfig.__name__:
×
56
        return KafkaMessageQueueConfig(**config_dict[key])
×
57
    elif key == RabbitMQMessageQueueConfig.__name__:
×
58
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
59
    elif key == RedisMessageQueueConfig.__name__:
×
60
        return RedisMessageQueueConfig(**config_dict[key])
×
61
    elif key == SolaceMessageQueueConfig.__name__:
×
62
        return SolaceMessageQueueConfig(**config_dict[key])
×
63
    else:
64
        raise ValueError(f"Unknown message queue: {key}")
×
65

66

67
def _get_message_queue_client(config: BaseSettings) -> BaseMessageQueue:
    """Instantiate the message queue that corresponds to ``config``.

    The config's type selects the queue class. The simple queue returns its
    ``client`` attribute; Kafka and RabbitMQ queues receive the config object
    itself; the remaining queues are built from the dumped config fields.

    Args:
        config: One of the supported message queue config objects.

    Returns:
        The message queue (or, for the simple queue, its client).

    Raises:
        ValueError: If ``config`` is not one of the supported config types.
    """
    if isinstance(config, SimpleMessageQueueConfig):
        # The simple queue exposes a separate client object.
        simple_queue = SimpleMessageQueue(**config.model_dump())
        return simple_queue.client

    if isinstance(config, AWSMessageQueueConfig):
        aws_kwargs = config.model_dump()
        return AWSMessageQueue(**aws_kwargs)

    # Kafka and RabbitMQ are constructed from the config object directly.
    if isinstance(config, KafkaMessageQueueConfig):
        return KafkaMessageQueue(config)  # type: ignore

    if isinstance(config, RabbitMQMessageQueueConfig):
        return RabbitMQMessageQueue(config)  # type: ignore

    if isinstance(config, RedisMessageQueueConfig):
        redis_kwargs = config.model_dump()
        return RedisMessageQueue(**redis_kwargs)

    if isinstance(config, SolaceMessageQueueConfig):
        solace_kwargs = config.model_dump()
        return SolaceMessageQueue(**solace_kwargs)

    raise ValueError(f"Invalid message queue config: {config}")
87

88

89
def _get_shutdown_handler(tasks: List[asyncio.Task]) -> Callable:
1✔
90
    def signal_handler(sig: Any, frame: Any) -> None:
×
91
        print("\nShutting down.")
×
92
        for task in tasks:
×
93
            task.cancel()
×
94
        sys.exit(0)
×
95

96
    return signal_handler
×
97

98

99
async def deploy_core(
    control_plane_config: Optional[ControlPlaneConfig] = None,
    message_queue_config: Optional[BaseSettings] = None,
    orchestrator_config: Optional[SimpleOrchestratorConfig] = None,
    disable_message_queue: bool = False,
    disable_control_plane: bool = False,
) -> None:
    """
    Deploy the core components of the llama_deploy system.

    This function sets up and launches the message queue, control plane, and orchestrator.
    It then polls the launched tasks, re-raising the first exception one of them
    finishes with. SIGINT cancels the tasks and exits the process.

    Args:
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            If not provided, a default ControlPlaneConfig will be used.
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue.
            Defaults to a local SimpleMessageQueue.
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
            If not provided, a default SimpleOrchestratorConfig will be used.
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.

    Raises:
        ValueError: If the message queue config is not a supported type.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()

    message_queue_client = _get_message_queue_client(message_queue_config)

    control_plane = ControlPlaneServer(
        message_queue_client,
        SimpleOrchestrator(**orchestrator_config.model_dump()),
        config=control_plane_config,
    )

    if (
        isinstance(message_queue_config, SimpleMessageQueueConfig)
        and not disable_message_queue
    ):
        # Only the simple queue is launched in-process.
        message_queue_task = await _deploy_local_message_queue(message_queue_config)
    else:
        # Queue disabled or not a simple queue: use a placeholder task that
        # completes immediately so the task list keeps a uniform shape.
        message_queue_task = asyncio.create_task(asyncio.sleep(0))

    if not disable_control_plane:
        control_plane_task = asyncio.create_task(control_plane.launch_server())

        # let services spin up
        await asyncio.sleep(1)

        # register the control plane as a consumer
        control_plane_consumer_fn = await control_plane.register_to_message_queue()

        consumer_task = asyncio.create_task(control_plane_consumer_fn())
    else:
        # placeholder tasks that complete immediately
        control_plane_task = asyncio.create_task(asyncio.sleep(0))
        consumer_task = asyncio.create_task(asyncio.sleep(0))

    # let things sync up
    await asyncio.sleep(1)

    all_tasks = [control_plane_task, consumer_task, message_queue_task]

    # Install the SIGINT handler once; it does not change between polls, so
    # re-registering it on every loop iteration is wasted work.
    signal.signal(signal.SIGINT, _get_shutdown_handler(all_tasks))

    # let things run, surfacing the first task failure
    loop = asyncio.get_running_loop()
    while loop.is_running():
        await asyncio.sleep(0.1)

        for task in all_tasks:
            if not task.done():
                continue
            # Note: exception() itself raises CancelledError for a cancelled task.
            task_error = task.exception()
            if task_error is not None:
                raise task_error
180

181

182
async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: Optional[ControlPlaneConfig] = None,
) -> None:
    """
    Deploy a workflow as a service within the llama_deploy system.

    This function fetches the message queue configuration from the control plane,
    sets up the workflow as a service connected to that queue, and registers it
    with the control plane. It then polls the launched tasks, re-raising the first
    exception one of them finishes with. SIGINT cancels the tasks and exits the process.

    Args:
        workflow (Workflow): The workflow to be deployed as a service.
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            If not provided, a default ControlPlaneConfig will be used.

    Raises:
        httpx.HTTPError: If there's an error communicating with the control plane,
            including an error status on the queue config request.
        ValueError: If an invalid message queue config is encountered.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        # Fail on 4xx/5xx here rather than feeding an error payload to the
        # config parser below.
        response.raise_for_status()
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    # override the service manager, while maintaining dict of existing services
    workflow._service_manager = NetworkServiceManager(
        control_plane_config, workflow._service_manager._services
    )

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        **workflow_config.model_dump(),
    )

    service_task = asyncio.create_task(service.launch_server())

    # let service spin up
    await asyncio.sleep(1)

    # register to control plane
    # NOTE(review): this rebuilds the URL from host/port instead of reusing
    # control_plane_config.url from above -- confirm the two always agree.
    control_plane_url = (
        f"http://{control_plane_config.host}:{control_plane_config.port}"
    )
    await service.register_to_control_plane(control_plane_url)

    # register to message queue
    consumer_fn = await service.register_to_message_queue()

    # create consumer task
    consumer_task = asyncio.create_task(consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    all_tasks = [consumer_task, service_task]

    # Install the SIGINT handler once; it does not change between polls, so
    # re-registering it on every loop iteration is wasted work.
    signal.signal(signal.SIGINT, _get_shutdown_handler(all_tasks))

    # let things run, surfacing the first task failure
    loop = asyncio.get_running_loop()
    while loop.is_running():
        await asyncio.sleep(0.1)

        for task in all_tasks:
            if not task.done():
                continue
            # Note: exception() itself raises CancelledError for a cancelled task.
            task_error = task.exception()
            if task_error is not None:
                raise task_error
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc