• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 12434673142

20 Dec 2024 04:10PM UTC coverage: 71.153% (-2.7%) from 73.849%
12434673142

Pull #413

github

web-flow
Merge 94d9de762 into 9a14319e1
Pull Request #413: refact: Simple message queue refactoring

121 of 126 new or added lines in 14 files covered. (96.03%)

124 existing lines in 4 files now uncovered.

2622 of 3685 relevant lines covered (71.15%)

0.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.35
/llama_deploy/deploy/deploy.py
1
import asyncio
1✔
2
import signal
1✔
3
import sys
1✔
4
from typing import Any, Callable, List, Optional
1✔
5

6
import httpx
1✔
7
from llama_index.core.workflow import Workflow
1✔
8
from pydantic_settings import BaseSettings
1✔
9

10
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
1✔
11
from llama_deploy.deploy.network_workflow import NetworkServiceManager
1✔
12
from llama_deploy.message_queues import (
1✔
13
    AWSMessageQueue,
14
    AWSMessageQueueConfig,
15
    BaseMessageQueue,
16
    KafkaMessageQueue,
17
    KafkaMessageQueueConfig,
18
    RabbitMQMessageQueue,
19
    RabbitMQMessageQueueConfig,
20
    RedisMessageQueue,
21
    RedisMessageQueueConfig,
22
    SimpleMessageQueueConfig,
23
    SimpleMessageQueueServer,
24
    SolaceMessageQueue,
25
    SolaceMessageQueueConfig,
26
)
27
from llama_deploy.message_queues.simple import SimpleMessageQueue
1✔
28
from llama_deploy.orchestrators.simple import (
1✔
29
    SimpleOrchestrator,
30
    SimpleOrchestratorConfig,
31
)
32
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
1✔
33

34
DEFAULT_TIMEOUT = 120.0
1✔
35

36

37
async def _deploy_local_message_queue(config: SimpleMessageQueueConfig) -> asyncio.Task:
1✔
NEW
38
    queue = SimpleMessageQueueServer(config)
×
39
    task = asyncio.create_task(queue.launch_server())
×
40

41
    # let message queue boot up
42
    await asyncio.sleep(2)
×
43

44
    return task
×
45

46

47
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
1✔
48
    key = next(iter(config_dict.keys()))
×
49
    if key == SimpleMessageQueueConfig.__name__:
×
50
        return SimpleMessageQueueConfig(**config_dict[key])
×
51
    elif key == AWSMessageQueueConfig.__name__:
×
52
        return AWSMessageQueueConfig(**config_dict[key])
×
53
    elif key == KafkaMessageQueueConfig.__name__:
×
54
        return KafkaMessageQueueConfig(**config_dict[key])
×
55
    elif key == RabbitMQMessageQueueConfig.__name__:
×
56
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
57
    elif key == RedisMessageQueueConfig.__name__:
×
58
        return RedisMessageQueueConfig(**config_dict[key])
×
59
    elif key == SolaceMessageQueueConfig.__name__:
×
60
        return SolaceMessageQueueConfig(**config_dict[key])
×
61
    else:
62
        raise ValueError(f"Unknown message queue: {key}")
×
63

64

65
def _get_message_queue_client(config: BaseSettings) -> BaseMessageQueue:
1✔
66
    if isinstance(config, SimpleMessageQueueConfig):
×
NEW
67
        return SimpleMessageQueue(config)  # type: ignore
×
68
    elif isinstance(config, AWSMessageQueueConfig):
×
69
        return AWSMessageQueue(**config.model_dump())
×
70
    elif isinstance(config, KafkaMessageQueueConfig):
×
71
        return KafkaMessageQueue(config)  # type: ignore
×
72
    elif isinstance(config, RabbitMQMessageQueueConfig):
×
73
        return RabbitMQMessageQueue(config)  # type: ignore
×
74
    elif isinstance(config, RedisMessageQueueConfig):
×
75
        return RedisMessageQueue(
×
76
            **config.model_dump(),
77
        )
78
    elif isinstance(config, SolaceMessageQueueConfig):
×
79
        return SolaceMessageQueue(
×
80
            **config.model_dump(),
81
        )
82
    else:
83
        raise ValueError(f"Invalid message queue config: {config}")
×
84

85

86
def _get_shutdown_handler(tasks: List[asyncio.Task]) -> Callable:
1✔
87
    def signal_handler(sig: Any, frame: Any) -> None:
×
88
        print("\nShutting down.")
×
89
        for task in tasks:
×
90
            task.cancel()
×
91
        sys.exit(0)
×
92

93
    return signal_handler
×
94

95

96
async def deploy_core(
1✔
97
    control_plane_config: Optional[ControlPlaneConfig] = None,
98
    message_queue_config: Optional[BaseSettings] = None,
99
    orchestrator_config: Optional[SimpleOrchestratorConfig] = None,
100
    disable_message_queue: bool = False,
101
    disable_control_plane: bool = False,
102
) -> None:
103
    """
104
    Deploy the core components of the llama_deploy system.
105

106
    This function sets up and launches the message queue, control plane, and orchestrator.
107
    It handles the initialization and connection of these core components.
108

109
    Args:
110
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
111
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue. Defaults to a local SimpleMessageQueue.
112
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
113
            If not provided, a default SimpleOrchestratorConfig will be used.
114
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
115
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.
116

117
    Raises:
118
        ValueError: If an unknown message queue type is specified in the config.
119
        Exception: If any of the launched tasks encounter an error.
120
    """
121
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
122
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
×
123
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()
×
124

125
    message_queue_client = _get_message_queue_client(message_queue_config)
×
126

127
    control_plane = ControlPlaneServer(
×
128
        message_queue_client,
129
        SimpleOrchestrator(**orchestrator_config.model_dump()),
130
        config=control_plane_config,
131
    )
132

133
    if (
×
134
        isinstance(message_queue_config, SimpleMessageQueueConfig)
135
        and not disable_message_queue
136
    ):
137
        message_queue_task = await _deploy_local_message_queue(message_queue_config)
×
138
    elif (
×
139
        isinstance(message_queue_config, SimpleMessageQueueConfig)
140
        and disable_message_queue
141
    ):
142
        # create a dummy task to keep the event loop running
143
        message_queue_task = asyncio.create_task(asyncio.sleep(0))
×
144
    else:
145
        message_queue_task = asyncio.create_task(asyncio.sleep(0))
×
146

147
    if not disable_control_plane:
×
148
        control_plane_task = asyncio.create_task(control_plane.launch_server())
×
149

150
        # let services spin up
151
        await asyncio.sleep(1)
×
152

153
        # register the control plane as a consumer
154
        control_plane_consumer_fn = await control_plane.register_to_message_queue()
×
155

156
        consumer_task = asyncio.create_task(control_plane_consumer_fn())
×
157
    else:
158
        # create a dummy task to keep the event loop running
159
        control_plane_task = asyncio.create_task(asyncio.sleep(0))
×
160
        consumer_task = asyncio.create_task(asyncio.sleep(0))
×
161

162
    # let things sync up
163
    await asyncio.sleep(1)
×
164

165
    # let things run
166
    all_tasks = [control_plane_task, consumer_task, message_queue_task]
×
167

168
    shutdown_handler = _get_shutdown_handler(all_tasks)
×
169
    loop = asyncio.get_event_loop()
×
170
    while loop.is_running():
×
171
        await asyncio.sleep(0.1)
×
172
        signal.signal(signal.SIGINT, shutdown_handler)
×
173

174
        for task in all_tasks:
×
175
            if task.done() and task.exception():  # type: ignore
×
176
                raise task.exception()  # type: ignore
×
177

178

179
async def deploy_workflow(
1✔
180
    workflow: Workflow,
181
    workflow_config: WorkflowServiceConfig,
182
    control_plane_config: Optional[ControlPlaneConfig] = None,
183
) -> None:
184
    """
185
    Deploy a workflow as a service within the llama_deploy system.
186

187
    This function sets up a workflow as a service, connects it to the message queue,
188
    and registers it with the control plane.
189

190
    Args:
191
        workflow (Workflow): The workflow to be deployed as a service.
192
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
193
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
194

195
    Raises:
196
        httpx.HTTPError: If there's an error communicating with the control plane.
197
        ValueError: If an invalid message queue config is encountered.
198
        Exception: If any of the launched tasks encounter an error.
199
    """
200
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
201
    control_plane_url = control_plane_config.url
×
202

203
    async with httpx.AsyncClient() as client:
×
204
        response = await client.get(f"{control_plane_url}/queue_config")
×
205
        queue_config_dict = response.json()
×
206

207
    message_queue_config = _get_message_queue_config(queue_config_dict)
×
208
    message_queue_client = _get_message_queue_client(message_queue_config)
×
209

210
    # override the service manager, while maintaining dict of existing services
211
    workflow._service_manager = NetworkServiceManager(
×
212
        control_plane_config, workflow._service_manager._services
213
    )
214

215
    service = WorkflowService(
×
216
        workflow=workflow,
217
        message_queue=message_queue_client,
218
        **workflow_config.model_dump(),
219
    )
220

221
    service_task = asyncio.create_task(service.launch_server())
×
222

223
    # let service spin up
224
    await asyncio.sleep(1)
×
225

226
    # register to control plane
227
    control_plane_url = (
×
228
        f"http://{control_plane_config.host}:{control_plane_config.port}"
229
    )
230
    await service.register_to_control_plane(control_plane_url)
×
231

232
    # register to message queue
233
    consumer_fn = await service.register_to_message_queue()
×
234

235
    # create consumer task
236
    consumer_task = asyncio.create_task(consumer_fn())
×
237

238
    # let things sync up
239
    await asyncio.sleep(1)
×
240

241
    all_tasks = [consumer_task, service_task]
×
242

243
    shutdown_handler = _get_shutdown_handler(all_tasks)
×
244
    loop = asyncio.get_event_loop()
×
245
    while loop.is_running():
×
246
        await asyncio.sleep(0.1)
×
247
        signal.signal(signal.SIGINT, shutdown_handler)
×
248

249
        for task in all_tasks:
×
250
            if task.done() and task.exception():  # type: ignore
×
251
                raise task.exception()  # type: ignore
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc