• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 12927070927

23 Jan 2025 10:20AM UTC coverage: 77.843% (+0.2%) from 77.68%
12927070927

Pull #437

github

web-flow
Merge 3f72a7327 into 8722c8af0
Pull Request #437: fix: properly support disable_control_plane in deploy() function

0 of 13 new or added lines in 1 file covered. (0.0%)

21 existing lines in 1 file now uncovered.

2108 of 2708 relevant lines covered (77.84%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.6
/llama_deploy/deploy/deploy.py
1
import asyncio
1✔
2
from asyncio.exceptions import CancelledError
1✔
3

4
import httpx
1✔
5
from llama_index.core.workflow import Workflow
1✔
6
from pydantic_settings import BaseSettings
1✔
7

8
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
1✔
9
from llama_deploy.deploy.network_workflow import NetworkServiceManager
1✔
10
from llama_deploy.message_queues import (
1✔
11
    AbstractMessageQueue,
12
    AWSMessageQueue,
13
    AWSMessageQueueConfig,
14
    KafkaMessageQueue,
15
    KafkaMessageQueueConfig,
16
    RabbitMQMessageQueue,
17
    RabbitMQMessageQueueConfig,
18
    RedisMessageQueue,
19
    RedisMessageQueueConfig,
20
    SimpleMessageQueueConfig,
21
    SimpleMessageQueueServer,
22
    SolaceMessageQueue,
23
    SolaceMessageQueueConfig,
24
)
25
from llama_deploy.message_queues.simple import SimpleMessageQueue
1✔
26
from llama_deploy.orchestrators.simple import (
1✔
27
    SimpleOrchestrator,
28
    SimpleOrchestratorConfig,
29
)
30
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
1✔
31

32
DEFAULT_TIMEOUT = 120.0
1✔
33

34

35
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
1✔
36
    key = next(iter(config_dict.keys()))
×
37
    if key == SimpleMessageQueueConfig.__name__:
×
38
        return SimpleMessageQueueConfig(**config_dict[key])
×
39
    elif key == AWSMessageQueueConfig.__name__:
×
40
        return AWSMessageQueueConfig(**config_dict[key])
×
41
    elif key == KafkaMessageQueueConfig.__name__:
×
42
        return KafkaMessageQueueConfig(**config_dict[key])
×
43
    elif key == RabbitMQMessageQueueConfig.__name__:
×
44
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
45
    elif key == RedisMessageQueueConfig.__name__:
×
46
        return RedisMessageQueueConfig(**config_dict[key])
×
47
    elif key == SolaceMessageQueueConfig.__name__:
×
UNCOV
48
        return SolaceMessageQueueConfig(**config_dict[key])
×
49
    else:
UNCOV
50
        raise ValueError(f"Unknown message queue: {key}")
×
51

52

53
def _get_message_queue_client(config: BaseSettings) -> AbstractMessageQueue:
1✔
54
    if isinstance(config, SimpleMessageQueueConfig):
×
55
        return SimpleMessageQueue(config)
×
56
    elif isinstance(config, AWSMessageQueueConfig):
×
57
        return AWSMessageQueue(config)
×
58
    elif isinstance(config, KafkaMessageQueueConfig):
×
59
        return KafkaMessageQueue(config)
×
60
    elif isinstance(config, RabbitMQMessageQueueConfig):
×
61
        return RabbitMQMessageQueue(config)
×
62
    elif isinstance(config, RedisMessageQueueConfig):
×
63
        return RedisMessageQueue(config)
×
64
    elif isinstance(config, SolaceMessageQueueConfig):
×
UNCOV
65
        return SolaceMessageQueue(config)
×
66
    else:
UNCOV
67
        raise ValueError(f"Invalid message queue config: {config}")
×
68

69

70
async def deploy_core(
1✔
71
    control_plane_config: ControlPlaneConfig | None = None,
72
    message_queue_config: BaseSettings | None = None,
73
    orchestrator_config: SimpleOrchestratorConfig | None = None,
74
    disable_message_queue: bool = False,
75
    disable_control_plane: bool = False,
76
) -> None:
77
    """
78
    Deploy the core components of the llama_deploy system.
79

80
    This function sets up and launches the message queue, control plane, and orchestrator.
81
    It handles the initialization and connection of these core components.
82

83
    Args:
84
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
85
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue. Defaults to a local SimpleMessageQueue.
86
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
87
            If not provided, a default SimpleOrchestratorConfig will be used.
88
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
89
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.
90

91
    Raises:
92
        ValueError: If an unknown message queue type is specified in the config.
93
        Exception: If any of the launched tasks encounter an error.
94
    """
95
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
96
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
×
UNCOV
97
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()
×
98

NEW
UNCOV
99
    tasks = []
×
100

NEW
101
    message_queue_client = _get_message_queue_client(message_queue_config)
×
102
    # If needed, start the SimpleMessageQueueServer
NEW
103
    if (
×
104
        isinstance(message_queue_config, SimpleMessageQueueConfig)
105
        and not disable_message_queue
106
    ):
NEW
107
        queue = SimpleMessageQueueServer(message_queue_config)
×
NEW
108
        tasks.append(asyncio.create_task(queue.launch_server()))
×
109
        # let message queue boot up
NEW
110
        await asyncio.sleep(2)
×
111

112
    if not disable_control_plane:
×
NEW
113
        control_plane = ControlPlaneServer(
×
114
            message_queue_client,
115
            SimpleOrchestrator(**orchestrator_config.model_dump()),
116
            config=control_plane_config,
117
        )
NEW
118
        tasks.append(asyncio.create_task(control_plane.launch_server()))
×
119
        # let service spin up
NEW
120
        await asyncio.sleep(2)
×
121
        # register the control plane as a consumer
122
        control_plane_consumer_fn = await control_plane.register_to_message_queue()
×
NEW
123
        tasks.append(asyncio.create_task(control_plane_consumer_fn()))
×
124

125
    # let things run
126
    try:
×
NEW
127
        await asyncio.gather(*tasks)
×
128
    except CancelledError:
×
129
        await message_queue_client.cleanup()
×
130

131
        # Propagate the exception if any of the tasks exited with an error
NEW
132
        for task in tasks:
×
133
            if task.done() and task.exception():  # type: ignore
×
UNCOV
134
                raise task.exception()  # type: ignore
×
NEW
UNCOV
135
            await task
×
136

137

138
async def deploy_workflow(
1✔
139
    workflow: Workflow,
140
    workflow_config: WorkflowServiceConfig,
141
    control_plane_config: ControlPlaneConfig | None = None,
142
) -> None:
143
    """
144
    Deploy a workflow as a service within the llama_deploy system.
145

146
    This function sets up a workflow as a service, connects it to the message queue,
147
    and registers it with the control plane.
148

149
    Args:
150
        workflow (Workflow): The workflow to be deployed as a service.
151
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
152
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
153

154
    Raises:
155
        httpx.HTTPError: If there's an error communicating with the control plane.
156
        ValueError: If an invalid message queue config is encountered.
157
        Exception: If any of the launched tasks encounter an error.
158
    """
UNCOV
159
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
160
    control_plane_url = control_plane_config.url
×
161

162
    async with httpx.AsyncClient() as client:
×
UNCOV
163
        response = await client.get(f"{control_plane_url}/queue_config")
×
164
        queue_config_dict = response.json()
×
165

UNCOV
166
    message_queue_config = _get_message_queue_config(queue_config_dict)
×
UNCOV
167
    message_queue_client = _get_message_queue_client(message_queue_config)
×
168

169
    # override the service manager, while maintaining dict of existing services
UNCOV
170
    workflow._service_manager = NetworkServiceManager(
×
171
        control_plane_config, workflow._service_manager._services
172
    )
173

UNCOV
174
    service = WorkflowService(
×
175
        workflow=workflow,
176
        message_queue=message_queue_client,
177
        **workflow_config.model_dump(),
178
    )
179

UNCOV
180
    service_task = asyncio.create_task(service.launch_server())
×
181

182
    # let service spin up
UNCOV
183
    await asyncio.sleep(1)
×
184

185
    # register to control plane
UNCOV
186
    await service.register_to_control_plane(control_plane_url)
×
187

188
    # register to message queue
UNCOV
189
    consumer_fn = await service.register_to_message_queue()
×
190

191
    # create consumer task
UNCOV
192
    consumer_task = asyncio.create_task(consumer_fn())
×
193

194
    # let things sync up
195
    await asyncio.sleep(1)
×
196

197
    all_tasks = [consumer_task, service_task]
×
198

199
    await asyncio.gather(*all_tasks)
×
200
    # Propagate the exception if any of the tasks exited with an error
201
    for task in all_tasks:
×
UNCOV
202
        if task.done() and task.exception():  # type: ignore
×
UNCOV
203
            raise task.exception()  # type: ignore
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc