• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

madara-alliance / madara-orchestrator / 13278517719

12 Feb 2025 05:09AM UTC coverage: 67.173% (+0.09%) from 67.086%
13278517719

Pull #206

github

web-flow
Merge 23a840adb into afb9afeb7
Pull Request #206: Changes based on Griddy Chain

68 of 94 new or added lines in 9 files covered. (72.34%)

3 existing lines in 2 files now uncovered.

3184 of 4740 relevant lines covered (67.17%)

20801.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

20.41
/crates/orchestrator/src/queue/job_queue.rs
1
use std::future::Future;
2
use std::str::FromStr;
3
use std::sync::Arc;
4
use std::time::Duration;
5

6
use color_eyre::eyre::Context;
7
use color_eyre::Result as EyreResult;
8
use omniqueue::{Delivery, QueueError};
9
use serde::{Deserialize, Deserializer, Serialize};
10
use strum::Display;
11
use thiserror::Error;
12
use tokio::time::sleep;
13
use uuid::Uuid;
14

15
use super::QueueType;
16
use crate::config::Config;
17
use crate::jobs::types::JobType;
18
use crate::jobs::{handle_job_failure, process_job, verify_job, JobError, OtherError};
19
use crate::workers::data_submission_worker::DataSubmissionWorker;
20
use crate::workers::proof_registration::ProofRegistrationWorker;
21
use crate::workers::proving::ProvingWorker;
22
use crate::workers::snos::SnosWorker;
23
use crate::workers::update_state::UpdateStateWorker;
24
use crate::workers::Worker;
25

26
#[derive(Error, Debug, PartialEq)]
×
27
pub enum ConsumptionError {
28
    #[error("Failed to consume message from queue, error {error_msg:?}")]
29
    FailedToConsumeFromQueue { error_msg: String },
30

31
    #[error("Failed to handle job with id {job_id:?}. Error: {error_msg:?}")]
32
    FailedToHandleJob { job_id: Uuid, error_msg: String },
33

34
    #[error("Failed to spawn {worker_trigger_type:?} worker. Error: {error_msg:?}")]
35
    FailedToSpawnWorker { worker_trigger_type: WorkerTriggerType, error_msg: String },
36

37
    #[error("Other error: {0}")]
38
    Other(#[from] OtherError),
39
}
40

41
#[derive(Debug, Serialize, Deserialize)]
6✔
42
pub struct JobQueueMessage {
43
    pub id: Uuid,
44
}
45

46
#[derive(Debug, Serialize, Deserialize, PartialEq, Clone, Display)]
×
47
#[strum(serialize_all = "PascalCase")]
48
pub enum WorkerTriggerType {
49
    Snos,
50
    Proving,
51
    ProofRegistration,
52
    DataSubmission,
53
    UpdateState,
54
}
55

56
#[derive(Debug, Serialize, Clone)]
57
pub struct WorkerTriggerMessage {
58
    pub worker: WorkerTriggerType,
59
}
60

61
#[derive(Error, Debug)]
×
62
pub enum WorkerTriggerTypeError {
63
    #[error("Unknown WorkerTriggerType: {0}")]
64
    UnknownType(String),
65
}
66

67
impl FromStr for WorkerTriggerType {
68
    type Err = WorkerTriggerTypeError;
69

70
    fn from_str(s: &str) -> Result<Self, Self::Err> {
×
71
        match s {
×
72
            "Proving" => Ok(WorkerTriggerType::Proving),
×
73
            "Snos" => Ok(WorkerTriggerType::Snos),
×
74
            "ProofRegistration" => Ok(WorkerTriggerType::ProofRegistration),
×
75
            "DataSubmission" => Ok(WorkerTriggerType::DataSubmission),
×
76
            "UpdateState" => Ok(WorkerTriggerType::UpdateState),
×
77
            _ => Err(WorkerTriggerTypeError::UnknownType(s.to_string())),
×
78
        }
79
    }
×
80
}
81

82
// TODO : Need to check why serde deserializer was failing here.
83
// TODO : Remove this custom deserializer.
84
/// Implemented a custom deserializer as when using serde json deserializer
85
/// It was unable to deserialize the response from the event trigger.
86
impl<'de> Deserialize<'de> for WorkerTriggerMessage {
87
    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
×
88
    where
×
89
        D: Deserializer<'de>,
×
90
    {
×
91
        #[derive(Deserialize, Debug)]
×
92
        struct Helper {
93
            worker: String,
94
        }
95
        let helper = Helper::deserialize(deserializer)?;
×
96
        Ok(WorkerTriggerMessage {
97
            worker: WorkerTriggerType::from_str(&helper.worker).map_err(serde::de::Error::custom)?,
×
98
        })
99
    }
×
100
}
101

102
#[derive(Debug)]
103
enum DeliveryReturnType {
104
    Message(Delivery),
105
    NoMessage,
106
}
107

108
pub trait QueueNameForJobType {
109
    fn process_queue_name(&self) -> QueueType;
110
    fn verify_queue_name(&self) -> QueueType;
111
}
112

113
impl QueueNameForJobType for JobType {
114
    fn process_queue_name(&self) -> QueueType {
40✔
115
        match self {
40✔
116
            JobType::SnosRun => QueueType::SnosJobProcessing,
13✔
117
            JobType::ProofCreation => QueueType::ProvingJobProcessing,
10✔
118
            JobType::ProofRegistration => QueueType::ProofRegistrationJobProcessing,
×
119
            JobType::DataSubmission => QueueType::DataSubmissionJobProcessing,
14✔
120
            JobType::StateTransition => QueueType::UpdateStateJobProcessing,
3✔
121
        }
122
    }
40✔
123
    fn verify_queue_name(&self) -> QueueType {
12✔
124
        match self {
12✔
125
            JobType::SnosRun => QueueType::SnosJobVerification,
5✔
126
            JobType::ProofCreation => QueueType::ProvingJobVerification,
×
127
            JobType::ProofRegistration => QueueType::ProofRegistrationJobVerification,
×
128
            JobType::DataSubmission => QueueType::DataSubmissionJobVerification,
7✔
129
            JobType::StateTransition => QueueType::UpdateStateJobVerification,
×
130
        }
131
    }
12✔
132
}
133

134
pub async fn add_job_to_process_queue(id: Uuid, job_type: &JobType, config: Arc<Config>) -> EyreResult<()> {
28✔
135
    tracing::info!("Adding job with id {:?} to processing queue", id);
28✔
136
    add_job_to_queue(id, job_type.process_queue_name(), None, config).await
32✔
137
}
28✔
138

139
pub async fn add_job_to_verification_queue(
5✔
140
    id: Uuid,
5✔
141
    job_type: &JobType,
5✔
142
    delay: Duration,
5✔
143
    config: Arc<Config>,
5✔
144
) -> EyreResult<()> {
5✔
145
    tracing::info!("Adding job with id {:?} to verification queue", id);
5✔
146
    add_job_to_queue(id, job_type.verify_queue_name(), Some(delay), config).await
20✔
147
}
5✔
148

149
pub async fn consume_job_from_queue<F, Fut>(
×
150
    queue: QueueType,
×
151
    handler: F,
×
152
    config: Arc<Config>,
×
153
) -> Result<(), ConsumptionError>
×
154
where
×
155
    F: FnOnce(Uuid, Arc<Config>) -> Fut,
×
156
    F: Send + 'static,
×
157
    Fut: Future<Output = Result<(), JobError>> + Send,
×
158
{
×
159
    tracing::trace!(queue = %queue, "Attempting to consume job from queue");
×
160

161
    let delivery = get_delivery_from_queue(queue.clone(), config.clone()).await?;
×
162

163
    let message = match delivery {
×
164
        DeliveryReturnType::Message(message) => {
×
165
            tracing::debug!(queue = %queue, "Message received from queue");
×
166
            message
×
167
        }
168
        DeliveryReturnType::NoMessage => {
169
            tracing::debug!(queue = %queue, "No message in queue");
×
170
            return Ok(());
×
171
        }
172
    };
173

174
    let job_message = parse_job_message(&message)?;
×
175

176
    if let Some(job_message) = job_message {
×
177
        tracing::info!(queue = %queue, job_id = %job_message.id, "Processing job message");
×
178
        tokio::spawn(async move {
×
179
            match handle_job_message(job_message, message, handler, config).await {
×
180
                Ok(_) => {}
×
181
                Err(e) => log::error!("Failed to handle job message. Error: {:?}", e),
×
182
            }
183
        });
×
184
    } else {
×
185
        tracing::warn!(queue = %queue, "Received empty job message");
×
186
    }
187

188
    tracing::info!(queue = %queue, "Job consumption completed successfully");
×
189
    Ok(())
×
190
}
×
191

192
/// Function to consume the message from the worker trigger queues and spawn the worker
193
/// for respective message received.
194
pub async fn consume_worker_trigger_messages_from_queue<F, Fut>(
×
195
    queue: QueueType,
×
196
    handler: F,
×
197
    config: Arc<Config>,
×
198
) -> Result<(), ConsumptionError>
×
199
where
×
200
    F: FnOnce(Box<dyn Worker>, Arc<Config>) -> Fut,
×
201
    F: Send + 'static,
×
202
    Fut: Future<Output = color_eyre::Result<()>> + Send,
×
203
{
×
204
    tracing::debug!("Consuming from queue {:?}", queue);
×
205
    let delivery = get_delivery_from_queue(queue, Arc::clone(&config)).await?;
×
206

207
    let message = match delivery {
×
208
        DeliveryReturnType::Message(message) => message,
×
209
        DeliveryReturnType::NoMessage => return Ok(()),
×
210
    };
211

212
    let job_message = parse_worker_message(&message)?;
×
213

214
    if let Some(job_message) = job_message {
×
215
        tokio::spawn(async move {
×
216
            match handle_worker_message(job_message, message, handler, config).await {
×
217
                Ok(_) => {}
×
218
                Err(e) => tracing::error!("Failed to handle worker message. Error: {:?}", e),
×
219
            }
220
        });
×
221
    }
×
222

223
    Ok(())
×
224
}
×
225

226
fn parse_job_message(message: &Delivery) -> Result<Option<JobQueueMessage>, ConsumptionError> {
×
227
    message
×
228
        .payload_serde_json()
×
229
        .wrap_err("Payload Serde Error")
×
230
        .map_err(|e| ConsumptionError::Other(OtherError::from(e)))
×
231
}
×
232

233
/// Using string since localstack currently is instable with deserializing maps.
234
/// Change this to accept a map after localstack is stable
235
fn parse_worker_message(message: &Delivery) -> Result<Option<WorkerTriggerMessage>, ConsumptionError> {
×
236
    let payload = message
×
237
        .borrow_payload()
×
238
        .ok_or_else(|| ConsumptionError::Other(OtherError::from("Empty payload".to_string())))?;
×
239
    let message_string = String::from_utf8_lossy(payload).to_string().trim_matches('\"').to_string();
×
240
    let trigger_type = WorkerTriggerType::from_str(message_string.as_str())
×
241
        .wrap_err("Failed to parse worker trigger type from message")
×
242
        .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
×
243
    Ok(Some(WorkerTriggerMessage { worker: trigger_type }))
×
244
}
×
245

246
async fn handle_job_message<F, Fut>(
×
247
    job_message: JobQueueMessage,
×
248
    message: Delivery,
×
249
    handler: F,
×
250
    config: Arc<Config>,
×
251
) -> Result<(), ConsumptionError>
×
252
where
×
253
    F: FnOnce(Uuid, Arc<Config>) -> Fut,
×
254
    Fut: Future<Output = Result<(), JobError>>,
×
255
{
×
256
    tracing::info!("Handling job with id {:?}", job_message.id);
×
257

258
    match handler(job_message.id, config.clone()).await {
×
259
        Ok(_) => {
260
            message
×
261
                .ack()
×
262
                .await
×
263
                .map_err(|(e, _)| e)
×
264
                .wrap_err("Queue Error")
×
265
                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
×
266
            Ok(())
×
267
        }
268
        Err(e) => {
×
269
            tracing::error!("Failed to handle job with id {:?}. Error: {:?}", job_message.id, e);
×
270
            config
×
271
                .alerts()
×
272
                .send_alert_message(e.to_string())
×
273
                .await
×
274
                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
×
275

276
            // not using `nack` as we dont' want retries in case of failures
277
            match message.ack().await {
×
278
                Ok(_) => Err(ConsumptionError::FailedToHandleJob {
×
279
                    job_id: job_message.id,
×
280
                    error_msg: "Job handling failed, message nack-ed".to_string(),
×
281
                }),
×
282
                Err(delivery_nack_error) => Err(ConsumptionError::FailedToHandleJob {
×
283
                    job_id: job_message.id,
×
284
                    error_msg: delivery_nack_error.0.to_string(),
×
285
                }),
×
286
            }
287
        }
288
    }
289
}
×
290

291
async fn handle_worker_message<F, Fut>(
×
292
    job_message: WorkerTriggerMessage,
×
293
    message: Delivery,
×
294
    handler: F,
×
295
    config: Arc<Config>,
×
296
) -> Result<(), ConsumptionError>
×
297
where
×
298
    F: FnOnce(Box<dyn Worker>, Arc<Config>) -> Fut,
×
299
    Fut: Future<Output = color_eyre::Result<()>>,
×
300
{
×
301
    let worker_handler = get_worker_handler_from_worker_trigger_type(job_message.worker.clone());
×
302

×
303
    match handler(worker_handler, config.clone()).await {
×
304
        Ok(_) => {
305
            message
×
306
                .ack()
×
307
                .await
×
308
                .map_err(|(e, _)| e)
×
309
                .wrap_err("Queue Error")
×
310
                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
×
311
            Ok(())
×
312
        }
313
        Err(e) => {
×
314
            tracing::error!("Failed to handle worker trigger {:?}. Error: {:?}", job_message.worker, e);
×
315
            config
×
316
                .alerts()
×
317
                .send_alert_message(e.to_string())
×
318
                .await
×
319
                .map_err(|e| ConsumptionError::Other(OtherError::from(e)))?;
×
320

321
            // not using `nack` as we dont' want retries in case of failures
322
            message.ack().await.map_err(|(e, _)| ConsumptionError::Other(OtherError::from(e.to_string())))?;
×
323
            Err(ConsumptionError::FailedToSpawnWorker {
×
324
                worker_trigger_type: job_message.worker,
×
325
                error_msg: "Worker handling failed, message nack-ed".to_string(),
×
326
            })
×
327
        }
328
    }
329
}
×
330

331
/// To get Box<dyn Worker> handler from `WorkerTriggerType`.
332
fn get_worker_handler_from_worker_trigger_type(worker_trigger_type: WorkerTriggerType) -> Box<dyn Worker> {
×
333
    match worker_trigger_type {
×
334
        WorkerTriggerType::Snos => Box::new(SnosWorker),
×
335
        WorkerTriggerType::Proving => Box::new(ProvingWorker),
×
336
        WorkerTriggerType::DataSubmission => Box::new(DataSubmissionWorker),
×
337
        WorkerTriggerType::ProofRegistration => Box::new(ProofRegistrationWorker),
×
338
        WorkerTriggerType::UpdateState => Box::new(UpdateStateWorker),
×
339
    }
340
}
×
341

342
/// To get the delivery from the message queue using the queue name
343
async fn get_delivery_from_queue(
×
344
    queue: QueueType,
×
345
    config: Arc<Config>,
×
346
) -> Result<DeliveryReturnType, ConsumptionError> {
×
347
    match config.queue().consume_message_from_queue(queue).await {
×
348
        Ok(d) => Ok(DeliveryReturnType::Message(d)),
×
349
        Err(QueueError::NoData) => Ok(DeliveryReturnType::NoMessage),
×
350
        Err(e) => Err(ConsumptionError::FailedToConsumeFromQueue { error_msg: e.to_string() }),
×
351
    }
352
}
×
353

354
/// Spawns a detached tokio task that polls `$queue_type` in an endless loop.
///
/// Every iteration calls `$consume_function($queue_type, $handler, config)`, logs an error if
/// that call fails, and then sleeps for two seconds before polling again.
macro_rules! spawn_consumer {
    ($queue_type:expr, $handler:expr, $consume_function:expr, $config:expr) => {
        let consumer_config = $config.clone();
        tokio::spawn(async move {
            loop {
                if let Err(e) = $consume_function($queue_type, $handler, consumer_config.clone()).await {
                    tracing::error!("Failed to consume from queue {:?}. Error: {:?}", $queue_type, e);
                }
                sleep(Duration::from_secs(2)).await;
            }
        });
    };
}
368

369
pub async fn init_consumers(config: Arc<Config>) -> Result<(), JobError> {
2✔
370
    spawn_consumer!(QueueType::SnosJobProcessing, process_job, consume_job_from_queue, config.clone());
2✔
371
    spawn_consumer!(QueueType::SnosJobVerification, verify_job, consume_job_from_queue, config.clone());
2✔
372

2✔
373
    spawn_consumer!(QueueType::ProvingJobProcessing, process_job, consume_job_from_queue, config.clone());
2✔
374
    spawn_consumer!(QueueType::ProvingJobVerification, verify_job, consume_job_from_queue, config.clone());
2✔
375

2✔
376
    spawn_consumer!(QueueType::DataSubmissionJobProcessing, process_job, consume_job_from_queue, config.clone());
2✔
377
    spawn_consumer!(QueueType::DataSubmissionJobVerification, verify_job, consume_job_from_queue, config.clone());
2✔
378

2✔
379
    spawn_consumer!(QueueType::UpdateStateJobProcessing, process_job, consume_job_from_queue, config.clone());
2✔
380
    spawn_consumer!(QueueType::UpdateStateJobVerification, verify_job, consume_job_from_queue, config.clone());
2✔
381

2✔
382
    spawn_consumer!(QueueType::JobHandleFailure, handle_job_failure, consume_job_from_queue, config.clone());
2✔
383

2✔
384
    spawn_consumer!(QueueType::WorkerTrigger, spawn_worker, consume_worker_trigger_messages_from_queue, config);
2✔
385
    Ok(())
2✔
386
}
2✔
387

388
/// To spawn the worker by passing the worker struct
389
async fn spawn_worker(worker: Box<dyn Worker>, config: Arc<Config>) -> color_eyre::Result<()> {
×
390
    if let Err(e) = worker.run_worker_if_enabled(config).await {
×
391
        log::error!("Failed to spawn worker. Error: {}", e);
×
392
        return Err(e);
×
393
    }
×
394
    Ok(())
×
395
}
×
396
async fn add_job_to_queue(id: Uuid, queue: QueueType, delay: Option<Duration>, config: Arc<Config>) -> EyreResult<()> {
33✔
397
    let message = JobQueueMessage { id };
33✔
398
    config.queue().send_message_to_queue(queue.clone(), serde_json::to_string(&message)?, delay).await?;
52✔
399
    tracing::info!(
33✔
400
        log_type = "JobQueue",
401
        category = "add_job_to_queue",
402
        function_type = "add_job_to_queue",
403
        "Added job with id {:?} to {:?} queue",
×
404
        id,
405
        queue
406
    );
407
    Ok(())
33✔
408
}
33✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc