• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

run-llama / llama_deploy / 12686708977

09 Jan 2025 09:03AM UTC coverage: 77.444% (-0.07%) from 77.514%
12686708977

push

github

web-flow
refact: make topic configurable for SolaceMessageQueue (#424)

26 of 30 new or added lines in 3 files covered. (86.67%)

1 existing line in 1 file now uncovered.

2115 of 2731 relevant lines covered (77.44%)

0.77 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

18.09
/llama_deploy/deploy/deploy.py
1
import asyncio
1✔
2
from asyncio.exceptions import CancelledError
1✔
3

4
import httpx
1✔
5
from llama_index.core.workflow import Workflow
1✔
6
from pydantic_settings import BaseSettings
1✔
7

8
from llama_deploy.control_plane.server import ControlPlaneConfig, ControlPlaneServer
1✔
9
from llama_deploy.deploy.network_workflow import NetworkServiceManager
1✔
10
from llama_deploy.message_queues import (
1✔
11
    AWSMessageQueue,
12
    AWSMessageQueueConfig,
13
    BaseMessageQueue,
14
    KafkaMessageQueue,
15
    KafkaMessageQueueConfig,
16
    RabbitMQMessageQueue,
17
    RabbitMQMessageQueueConfig,
18
    RedisMessageQueue,
19
    RedisMessageQueueConfig,
20
    SimpleMessageQueueConfig,
21
    SimpleMessageQueueServer,
22
    SolaceMessageQueue,
23
    SolaceMessageQueueConfig,
24
)
25
from llama_deploy.message_queues.simple import SimpleMessageQueue
1✔
26
from llama_deploy.orchestrators.simple import (
1✔
27
    SimpleOrchestrator,
28
    SimpleOrchestratorConfig,
29
)
30
from llama_deploy.services.workflow import WorkflowService, WorkflowServiceConfig
1✔
31

32
# Default timeout value; it is not referenced by any function visible in this file.
# NOTE(review): presumably expressed in seconds — confirm against the modules that import it.
DEFAULT_TIMEOUT = 120.0
1✔
33

34

35
async def _deploy_local_message_queue(config: SimpleMessageQueueConfig) -> asyncio.Task:
    """Launch a local ``SimpleMessageQueueServer`` in a background task.

    Args:
        config (SimpleMessageQueueConfig): Settings used to build the server.

    Returns:
        asyncio.Task: The task wrapping the server's ``launch_server()`` coroutine.
    """
    server = SimpleMessageQueueServer(config)
    server_task = asyncio.create_task(server.launch_server())

    # Fixed two-second pause so the server has a chance to boot before the
    # caller starts using the queue.
    await asyncio.sleep(2)
    return server_task
×
43

44

45
def _get_message_queue_config(config_dict: dict) -> BaseSettings:
1✔
46
    key = next(iter(config_dict.keys()))
×
47
    if key == SimpleMessageQueueConfig.__name__:
×
48
        return SimpleMessageQueueConfig(**config_dict[key])
×
49
    elif key == AWSMessageQueueConfig.__name__:
×
50
        return AWSMessageQueueConfig(**config_dict[key])
×
51
    elif key == KafkaMessageQueueConfig.__name__:
×
52
        return KafkaMessageQueueConfig(**config_dict[key])
×
53
    elif key == RabbitMQMessageQueueConfig.__name__:
×
54
        return RabbitMQMessageQueueConfig(**config_dict[key])
×
55
    elif key == RedisMessageQueueConfig.__name__:
×
56
        return RedisMessageQueueConfig(**config_dict[key])
×
57
    elif key == SolaceMessageQueueConfig.__name__:
×
58
        return SolaceMessageQueueConfig(**config_dict[key])
×
59
    else:
60
        raise ValueError(f"Unknown message queue: {key}")
×
61

62

63
def _get_message_queue_client(config: BaseSettings) -> BaseMessageQueue:
    """Instantiate the message queue client that matches ``config``'s type.

    Args:
        config (BaseSettings): One of the supported message queue config objects.

    Returns:
        BaseMessageQueue: A client constructed from ``config``.

    Raises:
        ValueError: If ``config`` is not an instance of any supported config class.
    """
    # (config type, client type) pairs, tested in order; the first match wins.
    client_for_config = (
        (SimpleMessageQueueConfig, SimpleMessageQueue),
        (AWSMessageQueueConfig, AWSMessageQueue),
        (KafkaMessageQueueConfig, KafkaMessageQueue),
        (RabbitMQMessageQueueConfig, RabbitMQMessageQueue),
        (RedisMessageQueueConfig, RedisMessageQueue),
        (SolaceMessageQueueConfig, SolaceMessageQueue),
    )
    for config_cls, client_cls in client_for_config:
        if isinstance(config, config_cls):
            return client_cls(config)  # type: ignore

    raise ValueError(f"Invalid message queue config: {config}")
×
78

79

80
async def deploy_core(
    control_plane_config: ControlPlaneConfig | None = None,
    message_queue_config: BaseSettings | None = None,
    orchestrator_config: SimpleOrchestratorConfig | None = None,
    disable_message_queue: bool = False,
    disable_control_plane: bool = False,
) -> None:
    """
    Launch the core llama_deploy components and run them until cancelled.

    Builds a message queue client, a ``SimpleOrchestrator`` and a
    ``ControlPlaneServer``; optionally starts a local message queue server and
    the control plane server; then waits on the control plane and its queue
    consumer.

    Args:
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            A default ``ControlPlaneConfig`` is used when omitted.
        message_queue_config (Optional[BaseSettings]): Configuration for the message queue.
            Defaults to a ``SimpleMessageQueueConfig``.
        orchestrator_config (Optional[SimpleOrchestratorConfig]): Configuration for the orchestrator.
            A default ``SimpleOrchestratorConfig`` is used when omitted.
        disable_message_queue (bool): When True, no local message queue server is started.
            Defaults to False.
        disable_control_plane (bool): When True, the control plane server is not launched
            and not registered as a consumer. Defaults to False.

    Raises:
        ValueError: If the message queue config is not one of the supported types.
        Exception: If any of the launched tasks encounter an error.
    """
    if not control_plane_config:
        control_plane_config = ControlPlaneConfig()
    if not message_queue_config:
        message_queue_config = SimpleMessageQueueConfig()
    if not orchestrator_config:
        orchestrator_config = SimpleOrchestratorConfig()

    message_queue_client = _get_message_queue_client(message_queue_config)
    orchestrator = SimpleOrchestrator(**orchestrator_config.model_dump())
    control_plane = ControlPlaneServer(
        message_queue_client,
        orchestrator,
        config=control_plane_config,
    )

    # A local queue server is only started for the simple (in-process) queue,
    # and only when the caller has not disabled it.
    start_local_queue = not disable_message_queue and isinstance(
        message_queue_config, SimpleMessageQueueConfig
    )
    if start_local_queue:
        message_queue_task = await _deploy_local_message_queue(message_queue_config)
    else:
        # Placeholder task (finishes immediately) so the shutdown code below
        # always has a task object to cancel and await.
        message_queue_task = asyncio.create_task(asyncio.sleep(0))

    if disable_control_plane:
        # Placeholder tasks (finish immediately) so the gather() below has
        # something to wait on.
        control_plane_task = asyncio.create_task(asyncio.sleep(0))
        consumer_task = asyncio.create_task(asyncio.sleep(0))
    else:
        control_plane_task = asyncio.create_task(control_plane.launch_server())

        # Give the server a moment to spin up.
        await asyncio.sleep(1)

        # Register the control plane as a consumer of the message queue and
        # start consuming in the background.
        start_consuming = await control_plane.register_to_message_queue()
        consumer_task = asyncio.create_task(start_consuming())

    # Give everything a moment to sync up.
    await asyncio.sleep(1)

    # Run until the tasks finish or this coroutine is cancelled.
    try:
        await asyncio.gather(control_plane_task, consumer_task)
    except CancelledError:
        await message_queue_client.cleanup()
        message_queue_task.cancel()
        # NOTE(review): if the queue task was still running, awaiting it right
        # after cancel() is expected to raise CancelledError, which would skip
        # the loop below — confirm this is the intended shutdown behavior.
        await message_queue_task
        # Propagate the exception if any of the tasks exited with an error.
        for finished in (control_plane_task, consumer_task, message_queue_task):
            if not finished.done():
                continue
            error = finished.exception()
            if error:
                raise error
×
154

155

156
async def deploy_workflow(
    workflow: Workflow,
    workflow_config: WorkflowServiceConfig,
    control_plane_config: ControlPlaneConfig | None = None,
) -> None:
    """
    Deploy a workflow as a service within the llama_deploy system.

    Fetches the message queue configuration from the control plane, wraps the
    workflow in a ``WorkflowService``, launches the service, registers it with
    the control plane and the message queue, and then waits on the service and
    consumer tasks.

    Args:
        workflow (Workflow): The workflow to be deployed as a service.
        workflow_config (WorkflowServiceConfig): Configuration for the workflow service.
        control_plane_config (Optional[ControlPlaneConfig]): Configuration for the control plane.
            A default ``ControlPlaneConfig`` is used when omitted.

    Raises:
        httpx.HTTPError: If there's an error communicating with the control plane,
            including a 4xx/5xx response from the ``/queue_config`` endpoint.
        ValueError: If an invalid message queue config is encountered.
        Exception: If any of the launched tasks encounter an error.
    """
    control_plane_config = control_plane_config or ControlPlaneConfig()
    control_plane_url = control_plane_config.url

    async with httpx.AsyncClient() as client:
        response = await client.get(f"{control_plane_url}/queue_config")
        # Fail fast on an error response instead of trying to interpret the
        # error body as a queue configuration.
        response.raise_for_status()
        queue_config_dict = response.json()

    message_queue_config = _get_message_queue_config(queue_config_dict)
    message_queue_client = _get_message_queue_client(message_queue_config)

    # override the service manager, while maintaining dict of existing services
    workflow._service_manager = NetworkServiceManager(
        control_plane_config, workflow._service_manager._services
    )

    service = WorkflowService(
        workflow=workflow,
        message_queue=message_queue_client,
        **workflow_config.model_dump(),
    )

    service_task = asyncio.create_task(service.launch_server())

    # let service spin up
    await asyncio.sleep(1)

    # register to control plane
    # NOTE(review): this rebuilds the URL from host/port instead of reusing
    # control_plane_config.url from above — confirm the two always agree.
    control_plane_url = (
        f"http://{control_plane_config.host}:{control_plane_config.port}"
    )
    await service.register_to_control_plane(control_plane_url)

    # register to message queue
    consumer_fn = await service.register_to_message_queue()

    # create consumer task
    consumer_task = asyncio.create_task(consumer_fn())

    # let things sync up
    await asyncio.sleep(1)

    all_tasks = [consumer_task, service_task]

    # gather() re-raises the first exception raised by either task.
    await asyncio.gather(*all_tasks)
    # Defensive backstop: propagate the exception if any of the tasks exited
    # with an error that gather() did not already surface.
    for task in all_tasks:
        if task.done() and task.exception():  # type: ignore
            raise task.exception()  # type: ignore
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc