run-llama / llama_deploy · build 15236931540
25 May 2025 10:29AM UTC · coverage: 83.037% (-0.6%) from 83.616%

Pull Request #511: feat: refactor message queue communication strategy
Merge 618bc1411 into f31124175

75 of 140 new or added lines in 10 files covered (53.57%)
35 existing lines in 7 files now uncovered
2379 of 2865 relevant lines covered (83.04%)
1.66 hits per line
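The percentages follow directly from the line counts above; a quick sanity check of the arithmetic (not part of the report):

print(f"{75 / 140:.2%}")     # 53.57% of new or added lines covered
print(f"{2379 / 2865:.2%}")  # 83.04% of all relevant lines covered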

Source File: /llama_deploy/deploy/deploy.py · 17.72% covered

import asyncio

import httpx
from llama_index.core.workflow import Workflow
from pydantic_settings import BaseSettings

from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
from llama_deploy.deploy.network_workflow import NetworkServiceManager
from llama_deploy.message_queues import (
    AbstractMessageQueue,
    AWSMessageQueue,
    AWSMessageQueueConfig,
    KafkaMessageQueue,
    KafkaMessageQueueConfig,
    RabbitMQMessageQueue,
    RabbitMQMessageQueueConfig,
    RedisMessageQueue,
    RedisMessageQueueConfig,
    SimpleMessageQueueConfig,
    SimpleMessageQueueServer,
    SolaceMessageQueue,
    SolaceMessageQueueConfig,
)
from llama_deploy.message_queues.simple import SimpleMessageQueue
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig

DEFAULT_TIMEOUT = 120.0


def _get_message_queue_config(config_dict: dict) -> BaseSettings:
    key = next(iter(config_dict.keys()))
    if key == SimpleMessageQueueConfig.__name__:
        return SimpleMessageQueueConfig(**config_dict[key])
    elif key == AWSMessageQueueConfig.__name__:
        return AWSMessageQueueConfig(**config_dict[key])
    elif key == KafkaMessageQueueConfig.__name__:
        return KafkaMessageQueueConfig(**config_dict[key])
    elif key == RabbitMQMessageQueueConfig.__name__:
        return RabbitMQMessageQueueConfig(**config_dict[key])
    elif key == RedisMessageQueueConfig.__name__:
        return RedisMessageQueueConfig(**config_dict[key])
    elif key == SolaceMessageQueueConfig.__name__:
        return SolaceMessageQueueConfig(**config_dict[key])
    else:
        raise ValueError(f"Unknown message queue: {key}")


def _get_message_queue_client(config: BaseSettings) -> AbstractMessageQueue:
    if isinstance(config, SimpleMessageQueueConfig):
        return SimpleMessageQueue(config)
    elif isinstance(config, AWSMessageQueueConfig):
        return AWSMessageQueue(config)
    elif isinstance(config, KafkaMessageQueueConfig):
        return KafkaMessageQueue(config)
    elif isinstance(config, RabbitMQMessageQueueConfig):
        return RabbitMQMessageQueue(config)
    elif isinstance(config, RedisMessageQueueConfig):
        return RedisMessageQueue(config)
    elif isinstance(config, SolaceMessageQueueConfig):
        return SolaceMessageQueue(config)
    else:
        raise ValueError(f"Invalid message queue config: {config}")
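Taken together, these two helpers turn the payload served by the control plane's queue config endpoint (a single-key dict whose key is the config class name and whose value holds that config's fields) into a concrete queue client. A minimal sketch of that round trip, using the default SimpleMessageQueueConfig; the payload shown is illustrative:

# Illustrative payload, shaped the way _get_message_queue_config expects it.
config_dict = {"SimpleMessageQueueConfig": {}}

config = _get_message_queue_config(config_dict)  # -> SimpleMessageQueueConfig()
client = _get_message_queue_client(config)       # -> SimpleMessageQueue client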

async def deploy_core(
    control_plane_config: ControlPlaneConfig | None = None,
    message_queue_config: BaseSettings | None = None,
    disable_message_queue: bool = False,
    disable_control_plane: bool = False,
) -> None:
    """
    Deploy the core components of the llama_deploy system.

    This function sets up and launches the message queue and the control plane.
    It handles the initialization and connection of these core components.

    Args:
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue.
            Defaults to a local SimpleMessageQueue.
        disable_message_queue (bool): Whether to disable deploying the message queue. Defaults to False.
        disable_control_plane (bool): Whether to disable deploying the control plane. Defaults to False.

    Raises:
        ValueError: If an unknown message queue type is specified in the config.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    message_queue_config = message_queue_config or SimpleMessageQueueConfig()

    tasks = []

    message_queue_client = _get_message_queue_client(message_queue_config)
    # If needed, start the SimpleMessageQueueServer
    if (
        isinstance(message_queue_config, SimpleMessageQueueConfig)
        and not disable_message_queue
    ):
        queue = SimpleMessageQueueServer(message_queue_config)
        tasks.append(asyncio.create_task(queue.launch_server()))
        # let message queue boot up
        await asyncio.sleep(2)

    if not disable_control_plane:
        control_plane = ControlPlaneServer(
            message_queue_client, config=control_plane_config
        )
        tasks.append(asyncio.create_task(control_plane.launch_server()))
        # let service spin up
        await asyncio.sleep(4)

    # let things run
    try:
        await asyncio.gather(*tasks)
    except (Exception, asyncio.CancelledError):
        await message_queue_client.cleanup()
        for task in tasks:
            if not task.done():
                task.cancel()

        await asyncio.gather(*tasks, return_exceptions=True)
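deploy_core is the entry point that brings up the core services: called with no arguments it falls back to ControlPlaneConfig() and a local SimpleMessageQueueConfig(), starts the SimpleMessageQueueServer and the control plane, and then blocks on the launched tasks. A minimal launch sketch, importing straight from this module (illustrative, not part of the file):

# Run the control plane and the built-in SimpleMessageQueueServer locally
# with default settings. Blocks until the services exit or are cancelled.
import asyncio

from llama_deploy.deploy.deploy import deploy_core

if __name__ == "__main__":
    asyncio.run(deploy_core())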

async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig | None = None,
) -> None:
    """
    Deploy a workflow as a service within the llama_deploy system.

    This function sets up a workflow as a service, connects it to the message queue,
    and registers it with the control plane.

    Args:
        workflow (Workflow): The workflow to be deployed as a service.
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.

    Raises:
        httpx.HTTPError: If there's an error communicating with the control plane.
        ValueError: If an invalid message queue config is encountered.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    # override the service manager, while maintaining dict of existing services
    workflow._service_manager = NetworkServiceManager(
        workflow._service_manager._services
    )

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        config=workflow_config,
    )

    # register to control plane
    await service.register_to_control_plane(control_plane_url)

    service_task = asyncio.create_task(service.launch_server())

    # let service spin up
    await asyncio.sleep(1)

    # register to message queue
    # consumer_fn = await service.register_to_message_queue()

    # create consumer task
    # consumer_task = asyncio.create_task(consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    try:
        # Propagate the exception if any of the tasks exited with an error
        await asyncio.gather(service_task, return_exceptions=True)
    except asyncio.CancelledError:
        service_task.cancel()

        await asyncio.gather(service_task)
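deploy_workflow assumes the control plane launched by deploy_core is already reachable at control_plane_config.url, since it first fetches the message queue config from the /queue_config endpoint and only then registers and serves the workflow. A minimal deployment sketch; the EchoWorkflow class and the host/port/service_name values are illustrative placeholders, not taken from this file:

# Deploy a trivial workflow as a service against an already-running control plane.
import asyncio

from llama_index.core.workflow import StartEvent, StopEvent, Workflow, step

from llama_deploy.control_plane.server import ControlPlaneConfig
from llama_deploy.deploy.deploy import deploy_workflow
from llama_deploy.services.workflow import WorkflowServiceConfig


class EchoWorkflow(Workflow):
    """A one-step workflow that echoes back the incoming message."""

    @step()
    async def echo(self, ev: StartEvent) -> StopEvent:
        return StopEvent(result=str(ev.get("message", "")))


if __name__ == "__main__":
    asyncio.run(
        deploy_workflow(
            workflow=EchoWorkflow(),
            workflow_config=WorkflowServiceConfig(
                host="127.0.0.1", port=8002, service_name="echo_workflow"
            ),
            control_plane_config=ControlPlaneConfig(),
        )
    )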