• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dariusbakunas / cogrs / 13486377277

23 Feb 2025 07:54PM UTC coverage: 36.597% (-0.4%) from 36.957%
13486377277

push

github

dariusbakunas
refactor: extract init code from adhoc cli to cli trait

0 of 29 new or added lines in 2 files covered. (0.0%)

95 existing lines in 4 files now uncovered.

714 of 1951 relevant lines covered (36.6%)

1.21 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cogrs-core/src/executor/task_queue_manager.rs
1
use crate::executor::play_iterator::PlayIterator;
2
use crate::inventory::host::Host;
3
use crate::inventory::manager::InventoryManager;
4
use crate::playbook::play::Play;
5
use crate::strategy::linear::LinearStrategy;
6
use crate::strategy::Strategy;
7
use crate::vars::manager::VariableManager;
8
use anyhow::Result;
9
use cogrs_plugins::callback::{CallbackPlugin, EventType};
10
use serde_json::Value;
11
use std::cmp::min;
12
use std::collections::HashMap;
13
use std::path::PathBuf;
14
use std::sync::Arc;
15

16
pub struct TaskQueueManager {
17
    forks: usize,
18
    callbacks_loaded: bool,
19
    callbacks: HashMap<EventType, Vec<Arc<dyn CallbackPlugin>>>,
20
    terminated: bool,
21
    unreachable_hosts: HashMap<String, Host>,
22
    workers: Vec<tokio::task::JoinHandle<()>>,
23
}
24

25
/// Fork count used by `TaskQueueManager::new` when the caller passes `None`.
const DEFAULT_FORKS: usize = 5;
26

27
impl TaskQueueManager {
UNCOV
28
    pub fn new(forks: Option<usize>) -> Self {
×
29
        Self {
UNCOV
30
            callbacks: HashMap::new(),
×
31
            callbacks_loaded: false,
UNCOV
32
            forks: forks.unwrap_or(DEFAULT_FORKS),
×
33
            terminated: false,
UNCOV
34
            unreachable_hosts: HashMap::new(),
×
35
            workers: Vec::with_capacity(forks.unwrap_or(DEFAULT_FORKS)),
×
36
        }
37
    }
38

UNCOV
39
    pub fn get_worker(&mut self, index: usize) -> Option<&tokio::task::JoinHandle<()>> {
×
UNCOV
40
        self.workers.get(index)
×
41
    }
42

UNCOV
43
    pub fn set_worker(&mut self, index: usize, worker: tokio::task::JoinHandle<()>) {
×
UNCOV
44
        self.workers.insert(index, worker);
×
45
    }
46

47
    /// Iterates over the roles/tasks in a play, using the given (or default)
48
    /// strategy for queueing tasks. The default is the linear strategy, which
49
    /// operates like classic Ansible by keeping all hosts in lock-step with
50
    /// a given task (meaning no hosts move on to the next task until all hosts
51
    /// are done with the current task).
UNCOV
52
    pub async fn run(
×
53
        &mut self,
54
        play: Play,
55
        variable_manager: &VariableManager,
56
        inventory_manager: &InventoryManager,
57
    ) -> Result<()> {
UNCOV
58
        self.load_callbacks().await?;
×
UNCOV
59
        let all_vars = variable_manager.get_vars(Some(&play), None, None, None, true, true);
×
60

61
        self.emit_event(EventType::PlaybookOnPlayStart, None).await;
×
62

63
        let strategy = *play.strategy();
×
64

65
        let mut play_iterator = PlayIterator::new(play);
×
UNCOV
66
        play_iterator.init(inventory_manager)?;
×
67

68
        self.forks = min(self.forks, play_iterator.batch_size());
×
69

70
        match strategy {
×
71
            Strategy::Linear => {
72
                let mut strategy = LinearStrategy::new(self, inventory_manager, variable_manager);
×
UNCOV
73
                strategy.run(&mut play_iterator).await?;
×
74
            }
75
            Strategy::Free => {
76
                todo!()
77
            }
78
        }
79

UNCOV
80
        Ok(())
×
81
    }
82

UNCOV
83
    pub fn register_callback(&mut self, callback: Arc<dyn CallbackPlugin>) {
×
UNCOV
84
        for event in callback.get_interested_events() {
×
85
            self.callbacks
×
86
                .entry(event)
87
                .or_insert_with(Vec::new)
UNCOV
88
                .push(callback.clone());
×
89
        }
90
    }
91

UNCOV
92
    pub fn get_unreachable_hosts(&self) -> &HashMap<String, Host> {
×
UNCOV
93
        &self.unreachable_hosts
×
94
    }
95

UNCOV
96
    pub fn is_terminated(&self) -> bool {
×
UNCOV
97
        self.terminated
×
98
    }
99

UNCOV
100
    async fn load_callbacks(&mut self) -> Result<()> {
×
UNCOV
101
        let plugin_loader = cogrs_plugins::plugin_loader::PluginLoader::instance();
×
102
        let loader = plugin_loader.lock().await;
×
103

104
        let plugins = loader.get_callback_plugins().await?;
×
105

106
        for plugin in plugins {
×
107
            self.register_callback(plugin);
×
108
        }
109

110
        self.callbacks_loaded = true;
×
111
        Ok(())
×
112
    }
113

114
    pub async fn emit_event(&self, event: EventType, data: Option<Value>) {
×
115
        if let Some(callbacks) = self.callbacks.get(&event) {
×
116
            // Spawn and collect tasks
UNCOV
117
            let tasks: Vec<_> = callbacks
×
118
                .iter()
119
                .map(|callback| {
×
UNCOV
120
                    let callback = callback.clone();
×
121
                    let event = event.clone();
×
UNCOV
122
                    let data = data.clone();
×
123
                    tokio::spawn(async move {
×
124
                        callback.on_event(&event, data.as_ref()); // Async invocation
×
125
                    })
126
                })
127
                .collect();
128

129
            // Wait for all spawned tasks to complete
UNCOV
130
            for task in tasks {
×
UNCOV
131
                if let Err(err) = task.await {
×
UNCOV
132
                    eprintln!("Callback task panicked: {:?}", err);
×
133
                }
134
            }
135
        }
136
    }
137

UNCOV
138
    pub fn forks(&self) -> usize {
×
UNCOV
139
        self.forks
×
140
    }
141
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc