• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dariusbakunas / cogrs / 13391271547

18 Feb 2025 01:06PM UTC coverage: 40.988% (+1.0%) from 39.987%
13391271547

push

github

dariusbakunas
refactor: remove unnecessary references to avoid lifetimes in async strategy function

0 of 25 new or added lines in 6 files covered. (0.0%)

3 existing lines in 2 files now uncovered.

639 of 1559 relevant lines covered (40.99%)

1.32 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/cogrs-core/src/executor/task_queue_manager.rs
1
use crate::executor::play_iterator::PlayIterator;
2
use crate::inventory::host::Host;
3
use crate::inventory::manager::InventoryManager;
4
use crate::playbook::play::Play;
5
use crate::strategy::linear::LinearStrategy;
6
use crate::strategy::Strategy;
7
use crate::vars::manager::VariableManager;
8
use anyhow::Result;
9
use cogrs_plugins::callback::{CallbackPlugin, EventType};
10
use serde_json::Value;
11
use std::cmp::min;
12
use std::collections::HashMap;
13
use std::sync::Arc;
14

15
pub struct TaskQueueManager {
16
    forks: u32,
17
    callbacks_loaded: bool,
18
    callbacks: HashMap<EventType, Vec<Arc<dyn CallbackPlugin>>>,
19
    terminated: bool,
20
    unreachable_hosts: HashMap<String, Host>,
21
}
22

23
const DEFAULT_FORKS: u32 = 5;
24

25
impl TaskQueueManager {
NEW
26
    pub fn new(forks: Option<u32>) -> Self {
×
27
        Self {
28
            callbacks: HashMap::new(),
×
29
            callbacks_loaded: false,
30
            forks: forks.unwrap_or(DEFAULT_FORKS),
×
31
            terminated: false,
UNCOV
32
            unreachable_hosts: HashMap::new(),
×
33
        }
34
    }
35

NEW
36
    pub async fn run(
×
37
        &mut self,
38
        play: Play,
39
        variable_manager: &VariableManager,
40
        inventory_manager: &InventoryManager,
41
    ) -> Result<()> {
UNCOV
42
        self.load_callbacks(
×
43
            // TODO: add logic to get callback plugin path
44
            "/Users/darius/Programming/cogrs/dist/minimal-apple_x86_64-apple-darwin",
45
        );
NEW
46
        let all_vars = variable_manager.get_vars(Some(&play), None, None, true, true);
×
47

48
        self.emit_event(EventType::PlaybookOnPlayStart, None).await;
×
49

NEW
50
        let strategy = play.strategy().clone();
×
51

52
        let mut play_iterator = PlayIterator::new(play);
×
NEW
53
        play_iterator.init(inventory_manager)?;
×
54

55
        let forks = min(self.forks, play_iterator.batch_size());
×
56

NEW
57
        match strategy {
×
58
            Strategy::Linear => {
NEW
59
                let mut strategy = LinearStrategy::new(&self, inventory_manager, variable_manager);
×
NEW
60
                strategy.run(&mut play_iterator).await?;
×
61
            }
62
            Strategy::Free => {
63
                todo!()
64
            }
65
        }
66

67
        Ok(())
×
68
    }
69

70
    pub fn register_callback(&mut self, callback: Box<dyn CallbackPlugin>) {
×
71
        let callback: Arc<dyn CallbackPlugin> = Arc::from(callback);
×
72
        for event in callback.get_interested_events() {
×
73
            self.callbacks
×
74
                .entry(event)
75
                .or_insert_with(Vec::new)
76
                .push(callback.clone());
×
77
        }
78
    }
79

80
    pub fn get_unreachable_hosts(&self) -> &HashMap<String, Host> {
×
81
        &self.unreachable_hosts
×
82
    }
83

84
    pub fn is_terminated(&self) -> bool {
×
85
        self.terminated
×
86
    }
87

88
    fn load_callbacks(&mut self, plugin_dir: &str) {
×
89
        use libloading::{Library, Symbol};
90
        use std::fs;
91

92
        let plugin_extension = if cfg!(target_os = "windows") {
93
            "dll"
94
        } else if cfg!(target_os = "macos") {
95
            "dylib"
96
        } else {
97
            "so"
×
98
        };
99

100
        if self.callbacks_loaded {
×
101
            return;
102
        }
103

104
        for entry in fs::read_dir(plugin_dir).expect("Invalid plugin directory") {
×
105
            let path = entry.expect("Failed to read entry").path();
×
106
            if path.extension().and_then(|e| e.to_str()) == Some(plugin_extension) {
×
107
                unsafe {
108
                    let lib = Library::new(&path).expect("Failed to load plugin");
×
109

110
                    // Dynamically load the callback creation function
111
                    let create_callback: Symbol<fn() -> Box<dyn CallbackPlugin>> = lib
×
112
                        .get(b"create_plugin")
113
                        .expect("Failed to find create_plugin function");
114

115
                    let plugin = create_callback();
×
116

117
                    // Register the plugin for events
118
                    self.register_callback(plugin);
×
119
                }
120
            }
121
        }
122

123
        self.callbacks_loaded = true
×
124
    }
125

126
    pub async fn emit_event(&self, event: EventType, data: Option<Value>) {
×
127
        if let Some(callbacks) = self.callbacks.get(&event) {
×
128
            // Spawn and collect tasks
129
            let tasks: Vec<_> = callbacks
×
130
                .iter()
131
                .map(|callback| {
×
132
                    let callback = callback.clone();
×
133
                    let event = event.clone();
×
134
                    let data = data.clone();
×
135
                    tokio::spawn(async move {
×
136
                        callback.on_event(&event, data.as_ref()); // Async invocation
×
137
                    })
138
                })
139
                .collect();
140

141
            // Wait for all spawned tasks to complete
142
            for task in tasks {
×
143
                if let Err(err) = task.await {
×
144
                    eprintln!("Callback task panicked: {:?}", err);
×
145
                }
146
            }
147
        }
148
    }
149
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc