• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dariusbakunas / cogrs / 13400597144

18 Feb 2025 09:42PM UTC coverage: 39.154% (-1.8%) from 40.988%
13400597144

push

github

dariusbakunas
feat: add very basic worker threads and implement message passing

0 of 80 new or added lines in 6 files covered. (0.0%)

2 existing lines in 2 files now uncovered.

639 of 1632 relevant lines covered (39.15%)

1.35 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cogrs-core/src/executor/task_queue_manager.rs
1
use crate::executor::play_iterator::PlayIterator;
2
use crate::inventory::host::Host;
3
use crate::inventory::manager::InventoryManager;
4
use crate::playbook::play::Play;
5
use crate::strategy::linear::LinearStrategy;
6
use crate::strategy::Strategy;
7
use crate::vars::manager::VariableManager;
8
use anyhow::Result;
9
use cogrs_plugins::callback::{CallbackPlugin, EventType};
10
use serde_json::Value;
11
use std::cmp::min;
12
use std::collections::HashMap;
13
use std::sync::Arc;
14

15
pub struct TaskQueueManager {
16
    forks: usize,
17
    callbacks_loaded: bool,
18
    callbacks: HashMap<EventType, Vec<Arc<dyn CallbackPlugin>>>,
19
    terminated: bool,
20
    unreachable_hosts: HashMap<String, Host>,
21
    workers: Vec<tokio::task::JoinHandle<()>>,
22
}
23

24
const DEFAULT_FORKS: usize = 5;
25

26
impl TaskQueueManager {
NEW
27
    pub fn new(forks: Option<usize>) -> Self {
×
28
        Self {
29
            callbacks: HashMap::new(),
×
30
            callbacks_loaded: false,
31
            forks: forks.unwrap_or(DEFAULT_FORKS),
×
32
            terminated: false,
33
            unreachable_hosts: HashMap::new(),
×
NEW
34
            workers: Vec::with_capacity(forks.unwrap_or(DEFAULT_FORKS)),
×
35
        }
36
    }
37

NEW
38
    pub fn get_worker(&mut self, index: usize) -> Option<&tokio::task::JoinHandle<()>> {
×
NEW
39
        self.workers.get(index)
×
40
    }
41

NEW
42
    pub fn set_worker(&mut self, index: usize, worker: tokio::task::JoinHandle<()>) {
×
NEW
43
        self.workers.insert(index, worker);
×
44
    }
45

UNCOV
46
    pub async fn run(
×
47
        &mut self,
48
        play: Play,
49
        variable_manager: &VariableManager,
50
        inventory_manager: &InventoryManager,
51
    ) -> Result<()> {
52
        self.load_callbacks(
×
53
            // TODO: add logic to get callback plugin path
54
            "/Users/darius/Programming/cogrs/dist/minimal-apple_x86_64-apple-darwin",
55
        );
56
        let all_vars = variable_manager.get_vars(Some(&play), None, None, true, true);
×
57

58
        self.emit_event(EventType::PlaybookOnPlayStart, None).await;
×
59

NEW
60
        let strategy = *play.strategy();
×
61

62
        let mut play_iterator = PlayIterator::new(play);
×
63
        play_iterator.init(inventory_manager)?;
×
64

65
        let forks = min(self.forks, play_iterator.batch_size());
×
66

67
        match strategy {
×
68
            Strategy::Linear => {
NEW
69
                let mut strategy = LinearStrategy::new(self, inventory_manager, variable_manager);
×
70
                strategy.run(&mut play_iterator).await?;
×
71
            }
72
            Strategy::Free => {
73
                todo!()
74
            }
75
        }
76

77
        Ok(())
×
78
    }
79

80
    pub fn register_callback(&mut self, callback: Box<dyn CallbackPlugin>) {
×
81
        let callback: Arc<dyn CallbackPlugin> = Arc::from(callback);
×
82
        for event in callback.get_interested_events() {
×
83
            self.callbacks
×
84
                .entry(event)
85
                .or_insert_with(Vec::new)
86
                .push(callback.clone());
×
87
        }
88
    }
89

90
    pub fn get_unreachable_hosts(&self) -> &HashMap<String, Host> {
×
91
        &self.unreachable_hosts
×
92
    }
93

94
    pub fn is_terminated(&self) -> bool {
×
95
        self.terminated
×
96
    }
97

98
    fn load_callbacks(&mut self, plugin_dir: &str) {
×
99
        use libloading::{Library, Symbol};
100
        use std::fs;
101

102
        let plugin_extension = if cfg!(target_os = "windows") {
103
            "dll"
104
        } else if cfg!(target_os = "macos") {
105
            "dylib"
106
        } else {
107
            "so"
×
108
        };
109

110
        if self.callbacks_loaded {
×
111
            return;
112
        }
113

114
        for entry in fs::read_dir(plugin_dir).expect("Invalid plugin directory") {
×
115
            let path = entry.expect("Failed to read entry").path();
×
116
            if path.extension().and_then(|e| e.to_str()) == Some(plugin_extension) {
×
117
                unsafe {
118
                    let lib = Library::new(&path).expect("Failed to load plugin");
×
119

120
                    // Dynamically load the callback creation function
121
                    let create_callback: Symbol<fn() -> Box<dyn CallbackPlugin>> = lib
×
122
                        .get(b"create_plugin")
123
                        .expect("Failed to find create_plugin function");
124

125
                    let plugin = create_callback();
×
126

127
                    // Register the plugin for events
128
                    self.register_callback(plugin);
×
129
                }
130
            }
131
        }
132

133
        self.callbacks_loaded = true
×
134
    }
135

136
    pub async fn emit_event(&self, event: EventType, data: Option<Value>) {
×
137
        if let Some(callbacks) = self.callbacks.get(&event) {
×
138
            // Spawn and collect tasks
139
            let tasks: Vec<_> = callbacks
×
140
                .iter()
141
                .map(|callback| {
×
142
                    let callback = callback.clone();
×
143
                    let event = event.clone();
×
144
                    let data = data.clone();
×
145
                    tokio::spawn(async move {
×
146
                        callback.on_event(&event, data.as_ref()); // Async invocation
×
147
                    })
148
                })
149
                .collect();
150

151
            // Wait for all spawned tasks to complete
152
            for task in tasks {
×
153
                if let Err(err) = task.await {
×
154
                    eprintln!("Callback task panicked: {:?}", err);
×
155
                }
156
            }
157
        }
158
    }
159

NEW
160
    pub fn forks(&self) -> usize {
×
NEW
161
        self.forks
×
162
    }
163
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc