• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 12928339338

23 Jan 2025 11:35AM UTC coverage: 77.815% (+0.1%) from 77.68%
12928339338

Pull #437

github

web-flow
Merge 35174494b into 8722c8af0
Pull Request #437: fix: properly support disable_control_plane in deploy() function

0 of 14 new or added lines in 1 file covered. (0.0%)

18 existing lines in 1 file now uncovered.

2108 of 2709 relevant lines covered (77.81%)

0.78 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.39
/llama_deploy/deploy/deploy.py
1
import asyncio
1✔
2
from asyncio.exceptions import CancelledError
1✔
3

4
import httpx
1✔
5
from llama_index.core.workflow import Workflow
1✔
6
from pydantic_settings import BaseSettings
1✔
7

8
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
1✔
9
from llama_deploy.deploy.network_workflow import NetworkServiceManager
1✔
10
from llama_deploy.message_queues import (
1✔
11
    AbstractMessageQueue,
12
    AWSMessageQueue,
13
    AWSMessageQueueConfig,
14
    KafkaMessageQueue,
15
    KafkaMessageQueueConfig,
16
    RabbitMQMessageQueue,
17
    RabbitMQMessageQueueConfig,
18
    RedisMessageQueue,
19
    RedisMessageQueueConfig,
20
    SimpleMessageQueueConfig,
21
    SimpleMessageQueueServer,
22
    SolaceMessageQueue,
23
    SolaceMessageQueueConfig,
24
)
25
from llama_deploy.message_queues.simple import SimpleMessageQueue
1✔
26
from llama_deploy.orchestrators.simple import (
1✔
27
    SimpleOrchestrator,
28
    SimpleOrchestratorConfig,
29
)
30
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
1✔
31

32
DEFAULT_TIMEOUT = 120.0
1✔
33

34

35
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
1✔
36
    key = next(iter(config_dict.keys()))
×
37
    if key == SimpleMessageQueueConfig.__name__:
×
38
        return SimpleMessageQueueConfig(**config_dict[key])
×
39
    elif key == AWSMessageQueueConfig.__name__:
×
40
        return AWSMessageQueueConfig(**config_dict[key])
×
41
    elif key == KafkaMessageQueueConfig.__name__:
×
42
        return KafkaMessageQueueConfig(**config_dict[key])
×
43
    elif key == RabbitMQMessageQueueConfig.__name__:
×
44
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
45
    elif key == RedisMessageQueueConfig.__name__:
×
46
        return RedisMessageQueueConfig(**config_dict[key])
×
47
    elif key == SolaceMessageQueueConfig.__name__:
×
UNCOV
48
        return SolaceMessageQueueConfig(**config_dict[key])
×
49
    else:
UNCOV
50
        raise ValueError(f"Unknown message queue: {key}")
×
51

52

53
def _get_message_queue_client(config: BaseSettings) -> AbstractMessageQueue:
1✔
54
    if isinstance(config, SimpleMessageQueueConfig):
×
55
        return SimpleMessageQueue(config)
×
56
    elif isinstance(config, AWSMessageQueueConfig):
×
57
        return AWSMessageQueue(config)
×
58
    elif isinstance(config, KafkaMessageQueueConfig):
×
59
        return KafkaMessageQueue(config)
×
60
    elif isinstance(config, RabbitMQMessageQueueConfig):
×
61
        return RabbitMQMessageQueue(config)
×
62
    elif isinstance(config, RedisMessageQueueConfig):
×
63
        return RedisMessageQueue(config)
×
64
    elif isinstance(config, SolaceMessageQueueConfig):
×
UNCOV
65
        return SolaceMessageQueue(config)
×
66
    else:
UNCOV
67
        raise ValueError(f"Invalid message queue config: {config}")
×
68

69

70
async def deploy_core(
1✔
71
    control_plane_config: ControlPlaneConfig | None = None,
72
    message_queue_config: BaseSettings | None = None,
73
    orchestrator_config: SimpleOrchestratorConfig | None = None,
74
    disable_message_queue: bool = False,
75
    disable_control_plane: bool = False,
76
) -> None:
77
    """
78
    Deploy the core components of the llama_deploy system.
79

80
    This function sets up and launches the message queue, control plane, and orchestrator.
81
    It handles the initialization and connection of these core components.
82

83
    Args:
84
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
85
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue. Defaults to a local SimpleMessageQueue.
86
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
87
            If not provided, a default SimpleOrchestratorConfig will be used.
88
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
89
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.
90

91
    Raises:
92
        ValueError: If an unknown message queue type is specified in the config.
93
        Exception: If any of the launched tasks encounter an error.
94
    """
95
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
96
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()
×
UNCOV
97
    orchestrator_config = orchestrator_config or SimpleOrchestratorConfig()
×
98

NEW
UNCOV
99
    tasks = []
×
100

NEW
101
    message_queue_client = _get_message_queue_client(message_queue_config)
×
102
    # If needed, start the SimpleMessageQueueServer
NEW
103
    if (
×
104
        isinstance(message_queue_config, SimpleMessageQueueConfig)
105
        and not disable_message_queue
106
    ):
NEW
107
        queue = SimpleMessageQueueServer(message_queue_config)
×
NEW
108
        tasks.append(asyncio.create_task(queue.launch_server()))
×
109
        # let message queue boot up
NEW
110
        await asyncio.sleep(2)
×
111

112
    if not disable_control_plane:
×
NEW
113
        control_plane = ControlPlaneServer(
×
114
            message_queue_client,
115
            SimpleOrchestrator(**orchestrator_config.model_dump()),
116
            config=control_plane_config,
117
        )
NEW
118
        tasks.append(asyncio.create_task(control_plane.launch_server()))
×
119
        # let service spin up
NEW
120
        await asyncio.sleep(2)
×
121
        # register the control plane as a consumer
122
        control_plane_consumer_fn = await control_plane.register_to_message_queue()
×
NEW
123
        tasks.append(asyncio.create_task(control_plane_consumer_fn()))
×
124

125
    # let things run
126
    try:
×
NEW
127
        await asyncio.gather(*tasks)
×
128
    except CancelledError:
×
129
        await message_queue_client.cleanup()
×
130

131
        # Propagate the exception if any of the tasks exited with an error
NEW
132
        for task in tasks:
×
133
            if task.done() and task.exception():  # type: ignore
×
UNCOV
134
                raise task.exception()  # type: ignore
×
NEW
UNCOV
135
            task.cancel()
×
NEW
UNCOV
136
            await task
×
137

138

139
async def deploy_workflow(
1✔
140
    workflow: Workflow,
141
    workflow_config: WorkflowServiceConfig,
142
    control_plane_config: ControlPlaneConfig | None = None,
143
) -> None:
144
    """
145
    Deploy a workflow as a service within the llama_deploy system.
146

147
    This function sets up a workflow as a service, connects it to the message queue,
148
    and registers it with the control plane.
149

150
    Args:
151
        workflow (Workflow): The workflow to be deployed as a service.
152
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
153
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
154

155
    Raises:
156
        httpx.HTTPError: If there's an error communicating with the control plane.
157
        ValueError: If an invalid message queue config is encountered.
158
        Exception: If any of the launched tasks encounter an error.
159
    """
160
    control_plane_config = control_plane_config or ControlPlaneConfig()
×
161
    control_plane_url = control_plane_config.url
×
162

UNCOV
163
    async with httpx.AsyncClient() as client:
×
164
        response = await client.get(f"{control_plane_url}/queue_config")
×
165
        queue_config_dict = response.json()
×
166

UNCOV
167
    message_queue_config = _get_message_queue_config(queue_config_dict)
×
168
    message_queue_client = _get_message_queue_client(message_queue_config)
×
169

170
    # override the service manager, while maintaining dict of existing services
UNCOV
171
    workflow._service_manager = NetworkServiceManager(
×
172
        control_plane_config, workflow._service_manager._services
173
    )
174

UNCOV
175
    service = WorkflowService(
×
176
        workflow=workflow,
177
        message_queue=message_queue_client,
178
        **workflow_config.model_dump(),
179
    )
180

181
    service_task = asyncio.create_task(service.launch_server())
×
182

183
    # let service spin up
184
    await asyncio.sleep(1)
×
185

186
    # register to control plane
187
    await service.register_to_control_plane(control_plane_url)
×
188

189
    # register to message queue
190
    consumer_fn = await service.register_to_message_queue()
×
191

192
    # create consumer task
193
    consumer_task = asyncio.create_task(consumer_fn())
×
194

195
    # let things sync up
UNCOV
196
    await asyncio.sleep(1)
×
197

UNCOV
198
    all_tasks = [consumer_task, service_task]
×
199

200
    await asyncio.gather(*all_tasks)
×
201
    # Propagate the exception if any of the tasks exited with an error
UNCOV
202
    for task in all_tasks:
×
UNCOV
203
        if task.done() and task.exception():  # type: ignore
×
UNCOV
204
            raise task.exception()  # type: ignore
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc