• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 26664039893

29 May 2026 09:50PM UTC coverage: 66.375% (+0.003%) from 66.372%
26664039893

push

github

web-flow
chore: reformat all markdown files using `comfort` (#699)

Signed-off-by: Jean Mertz <git@jeanmertz.com>

32028 of 48253 relevant lines covered (66.38%)

269.79 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.08
/crates/jp_cli/src/cmd/plugin/dispatch.rs
1
//! Host-side plugin message loop.
2
//!
3
//! Spawns the plugin binary, sends `init`, and relays workspace queries until
4
//! the plugin sends `exit` or the process terminates.
5

6
use std::{
7
    collections::HashSet,
8
    io::{BufRead, BufReader, Write},
9
    process::{Command, Stdio},
10
    sync::{
11
        Arc, Mutex,
12
        atomic::{AtomicBool, Ordering},
13
    },
14
    thread,
15
};
16

17
use camino::{Utf8Path, Utf8PathBuf};
18
use jp_config::{
19
    AppConfig,
20
    plugins::{
21
        PluginsConfig,
22
        command::{CommandPluginConfig, RunPolicy},
23
    },
24
};
25
use jp_inquire::{InlineOption, InlineSelect};
26
use jp_plugin::{
27
    PROTOCOL_VERSION,
28
    message::{
29
        ConfigResponse, ConversationSummary, ConversationsResponse, DescribeResponse,
30
        ErrorResponse, EventsResponse, HostToPlugin, InitMessage, LogMessage, PathsInfo,
31
        PluginToHost, WorkspaceInfo,
32
    },
33
};
34
use jp_workspace::Workspace;
35
use serde_json::Value;
36
use tracing::{debug, error, trace, warn};
37

38
use super::registry;
39
use crate::{Ctx, cmd, signals::SignalPair};
40

41
/// Run a plugin binary, handling the full protocol lifecycle.
42
///
43
/// `binary` is the path to the plugin executable.
44
/// `args` are the remaining CLI arguments to forward.
45
pub(crate) fn run_plugin(
×
46
    name: &str,
×
47
    binary: &Utf8Path,
×
48
    args: &[String],
×
49
    workspace: &Workspace,
×
50
    storage_path: Option<&Utf8Path>,
×
51
    user_storage_path: Option<&Utf8Path>,
×
52
    config: &Arc<AppConfig>,
×
53
    signals: &SignalPair,
×
54
    log_level: u8,
×
55
) -> Result<(), cmd::Error> {
×
56
    let config_json = serde_json::to_value(config.as_ref().to_partial())
×
57
        .map_err(|e| cmd::Error::from(format!("failed to serialize config: {e}")))?;
×
58

59
    let options: serde_json::Map<String, Value> = config
×
60
        .plugins
×
61
        .command
×
62
        .get(name)
×
63
        .and_then(|c| c.options.as_ref())
×
64
        .and_then(Value::as_object)
×
65
        .cloned()
×
66
        .unwrap_or_default();
×
67

68
    let storage_path = storage_path.ok_or("workspace has no storage configured")?;
×
69

70
    let home = std::env::home_dir().and_then(|p| camino::Utf8PathBuf::from_path_buf(p).ok());
×
71

72
    let init = HostToPlugin::Init(InitMessage {
×
73
        version: PROTOCOL_VERSION,
×
74
        workspace: WorkspaceInfo {
×
75
            root: workspace.root().to_owned(),
×
76
            storage: storage_path.to_owned(),
×
77
            id: workspace.id().to_string(),
×
78
        },
×
79
        paths: PathsInfo {
×
80
            user_data: jp_workspace::user_data_dir().ok(),
×
81
            user_config: jp_config::fs::user_global_config_dir(home.as_deref()),
×
82
            user_workspace: user_storage_path.map(ToOwned::to_owned),
×
83
        },
×
84
        config: config_json.clone(),
×
85
        options,
×
86
        args: args.to_vec(),
×
87
        log_level,
×
88
    });
×
89

90
    debug!(%binary, "Spawning plugin.");
×
91

92
    let mut cmd = Command::new(binary);
×
93
    cmd.stdin(Stdio::piped())
×
94
        .stdout(Stdio::piped())
×
95
        .stderr(Stdio::piped());
×
96

97
    // Prevent the child from receiving SIGINT/SIGTERM directly. The host
98
    // sends `Shutdown` over the protocol instead, giving the plugin a
99
    // chance to exit gracefully.
100
    #[cfg(unix)]
101
    {
102
        use std::os::unix::process::CommandExt as _;
103
        cmd.process_group(0);
×
104
    }
105

106
    let mut child = cmd
×
107
        .spawn()
×
108
        .map_err(|e| cmd::Error::from(format!("failed to spawn plugin: {e}")))?;
×
109

110
    let child_stdin = child.stdin.take().expect("stdin piped");
×
111
    let stdout = child.stdout.take().expect("stdout piped");
×
112
    let stderr = child.stderr.take().expect("stderr piped");
×
113

114
    // Wrap stdin so the shutdown thread can write to it too.
115
    let stdin = Arc::new(Mutex::new(child_stdin));
×
116

117
    // Forward stderr to tracing in a background thread.
118
    let stderr_handle = thread::spawn(move || {
×
119
        let reader = BufReader::new(stderr);
×
120
        for line in reader.lines() {
×
121
            match line {
×
122
                Ok(line) => trace!(target: "plugin::stderr", "{}", line),
×
123
                Err(e) => {
×
124
                    warn!("Error reading plugin stderr: {e}");
×
125
                    break;
×
126
                }
127
            }
128
        }
129
    });
×
130

131
    // Shutdown thread: sends `Shutdown` directly to the plugin's stdin
132
    // when a signal arrives. If the plugin doesn't exit within the grace
133
    // period, sends SIGKILL.
134
    let shutdown_sent = Arc::new(AtomicBool::new(false));
×
135
    let shutdown_writer = stdin.clone();
×
136
    let shutdown_flag = shutdown_sent.clone();
×
137
    let child_id = child.id();
×
138
    let mut shutdown_rx = signals.receiver.resubscribe();
×
139
    let shutdown_handle = thread::spawn(move || {
×
140
        if futures::executor::block_on(shutdown_rx.recv()).is_err() {
×
141
            return;
×
142
        }
×
143

144
        // Send Shutdown over the protocol.
145
        if let Ok(mut writer) = shutdown_writer.lock() {
×
146
            drop(write_message(&mut *writer, &HostToPlugin::Shutdown));
×
147
        }
×
148
        shutdown_flag.store(true, Ordering::Release);
×
149

150
        // Grace period: wait in short intervals so we don't block cleanup
151
        // if the plugin exits promptly.
152
        for _ in 0..50 {
×
153
            thread::sleep(std::time::Duration::from_millis(100));
×
154
            if !is_process_alive(child_id) {
×
155
                return;
×
156
            }
×
157
        }
158

159
        kill_child(child_id);
×
160
    });
×
161

162
    // Send init.
163
    {
164
        let mut writer = stdin.lock().expect("stdin lock poisoned");
×
165
        write_message(&mut *writer, &init)
×
166
            .map_err(|e| cmd::Error::from(format!("failed to send init: {e}")))?;
×
167
    }
168

169
    // Read messages from plugin.
170
    let reader = BufReader::new(stdout);
×
171
    let result = message_loop(reader, &stdin, workspace, &config_json, &shutdown_sent);
×
172

173
    // Always clean up, even on error.
174
    drop(child.wait());
×
175
    drop(stderr_handle.join());
×
176
    drop(shutdown_handle);
×
177

178
    result
×
179
}
×
180

181
/// The main message loop: reads plugin requests and sends responses.
182
fn message_loop(
1✔
183
    reader: BufReader<impl std::io::Read>,
1✔
184
    stdin: &Mutex<impl Write>,
1✔
185
    workspace: &Workspace,
1✔
186
    config_json: &Value,
1✔
187
    shutdown_sent: &AtomicBool,
1✔
188
) -> Result<(), cmd::Error> {
1✔
189
    for line in reader.lines() {
2✔
190
        let line =
2✔
191
            line.map_err(|e| cmd::Error::from(format!("failed to read from plugin: {e}")))?;
2✔
192

193
        if line.trim().is_empty() {
2✔
194
            continue;
×
195
        }
2✔
196

197
        let msg: PluginToHost = serde_json::from_str(&line)
2✔
198
            .map_err(|e| cmd::Error::from(format!("invalid plugin message: {e}: {line}")))?;
2✔
199

200
        trace!(?msg, "Received plugin message.");
2✔
201

202
        let mut writer = stdin.lock().expect("stdin lock poisoned");
2✔
203

204
        match msg {
2✔
205
            PluginToHost::Ready => {
206
                debug!("Plugin signaled ready.");
1✔
207
            }
208

209
            PluginToHost::ListConversations(req) => {
×
210
                let response = handle_list_conversations(workspace, req.id);
×
211
                write_message(&mut *writer, &response)?;
×
212
            }
213

214
            PluginToHost::ReadEvents(req) => {
×
215
                let response = handle_read_events(workspace, &req.conversation, req.id);
×
216
                write_message(&mut *writer, &response)?;
×
217
            }
218

219
            PluginToHost::ReadConfig(req) => {
×
220
                let response = handle_read_config(config_json, req.path, req.id);
×
221
                write_message(&mut *writer, &response)?;
×
222
            }
223

224
            PluginToHost::Print(print) => {
×
225
                // In Phase 1, write to stdout directly. Full printer
×
226
                // integration comes later when we thread through &Printer.
×
227
                let stdout = std::io::stdout();
×
228
                let mut handle = stdout.lock();
×
229
                drop(handle.write_all(print.text.as_bytes()));
×
230
                drop(handle.flush());
×
231
            }
×
232

233
            PluginToHost::Log(log) => {
×
234
                emit_log(&log);
×
235
            }
×
236

237
            PluginToHost::Describe(_) => {
238
                debug!("Ignoring describe in message loop.");
×
239
            }
240

241
            PluginToHost::Exit(exit) => {
1✔
242
                debug!(code = exit.code, "Plugin exited.");
1✔
243
                if exit.code == 0 {
1✔
244
                    return Ok(());
1✔
245
                }
×
246
                return match exit.reason {
×
247
                    Some(reason) => Err(cmd::Error::from((exit.code, reason))),
×
248
                    None => Err(cmd::Error::from(exit.code)),
×
249
                };
250
            }
251
        }
252
    }
253

254
    // Plugin's stdout closed without an `exit` message. If we sent a
255
    // shutdown, this is expected (the child exited after receiving it).
256
    if shutdown_sent.load(Ordering::Acquire) {
×
257
        debug!("Plugin exited after shutdown.");
×
258
        return Ok(());
×
259
    }
×
260

261
    error!("Plugin exited without sending exit message.");
×
262
    Err(cmd::Error::from((
×
263
        1u8,
×
264
        "plugin exited unexpectedly without sending exit message",
×
265
    )))
×
266
}
1✔
267

268
fn handle_list_conversations(workspace: &Workspace, req_id: Option<String>) -> HostToPlugin {
×
269
    let data: Vec<ConversationSummary> = workspace
×
270
        .conversations()
×
271
        .map(|(id, meta)| ConversationSummary {
×
272
            id: id.as_deciseconds().to_string(),
×
273
            title: meta.title.clone(),
×
274
            last_activated_at: meta.last_activated_at,
×
275
            events_count: meta.events_count,
×
276
        })
×
277
        .collect();
×
278

279
    HostToPlugin::Conversations(ConversationsResponse { id: req_id, data })
×
280
}
×
281

282
fn handle_read_events(
×
283
    workspace: &Workspace,
×
284
    conversation_id: &str,
×
285
    req_id: Option<String>,
×
286
) -> HostToPlugin {
×
287
    let conv_id = match jp_conversation::ConversationId::try_from_deciseconds_str(conversation_id) {
×
288
        Ok(id) => id,
×
289
        Err(e) => {
×
290
            return HostToPlugin::Error(ErrorResponse {
×
291
                id: req_id,
×
292
                request: Some("read_events".to_owned()),
×
293
                message: format!("invalid conversation ID: {e}"),
×
294
            });
×
295
        }
296
    };
297

298
    let handle = match workspace.acquire_conversation(&conv_id) {
×
299
        Ok(h) => h,
×
300
        Err(e) => {
×
301
            return HostToPlugin::Error(ErrorResponse {
×
302
                id: req_id,
×
303
                request: Some("read_events".to_owned()),
×
304
                message: format!("conversation not found: {e}"),
×
305
            });
×
306
        }
307
    };
308

309
    let events = match workspace.events(&handle) {
×
310
        Ok(stream) => stream,
×
311
        Err(e) => {
×
312
            return HostToPlugin::Error(ErrorResponse {
×
313
                id: req_id,
×
314
                request: Some("read_events".to_owned()),
×
315
                message: format!("failed to load events: {e}"),
×
316
            });
×
317
        }
318
    };
319

320
    // Serialize events to JSON values, then decode base64-encoded storage
321
    // fields so plugins receive plain text.
322
    let (_, mut event_values) = match events.to_parts() {
×
323
        Ok(parts) => parts,
×
324
        Err(e) => {
×
325
            return HostToPlugin::Error(ErrorResponse {
×
326
                id: req_id,
×
327
                request: Some("read_events".to_owned()),
×
328
                message: format!("failed to serialize events: {e}"),
×
329
            });
×
330
        }
331
    };
332

333
    for value in &mut event_values {
×
334
        jp_conversation::decode_event_value(value);
×
335
    }
×
336

337
    HostToPlugin::Events(EventsResponse {
×
338
        id: req_id,
×
339
        conversation: conversation_id.to_owned(),
×
340
        data: event_values,
×
341
    })
×
342
}
×
343

344
fn handle_read_config(
3✔
345
    config_json: &Value,
3✔
346
    path: Option<String>,
3✔
347
    req_id: Option<String>,
3✔
348
) -> HostToPlugin {
3✔
349
    let data = match &path {
3✔
350
        Some(path) => {
2✔
351
            let mut current = config_json;
2✔
352
            for segment in path.split('.') {
3✔
353
                match current.get(segment) {
3✔
354
                    Some(v) => current = v,
2✔
355
                    None => {
356
                        return HostToPlugin::Error(ErrorResponse {
1✔
357
                            id: req_id,
1✔
358
                            request: Some("read_config".to_owned()),
1✔
359
                            message: format!("config path not found: {path}"),
1✔
360
                        });
1✔
361
                    }
362
                }
363
            }
364
            current.clone()
1✔
365
        }
366
        None => config_json.clone(),
1✔
367
    };
368

369
    HostToPlugin::Config(ConfigResponse {
2✔
370
        id: req_id,
2✔
371
        path,
2✔
372
        data,
2✔
373
    })
2✔
374
}
3✔
375

376
fn emit_log(log: &LogMessage) {
×
377
    match log.level.as_str() {
×
378
        "trace" => trace!(target: "plugin", message = %log.message),
×
379
        "debug" => debug!(target: "plugin", message = %log.message),
×
380
        "info" => tracing::info!(target: "plugin", message = %log.message),
×
381
        "warn" => warn!(target: "plugin", message = %log.message),
×
382
        "error" => error!(target: "plugin", message = %log.message),
×
383
        _ => {
384
            warn!(target: "plugin", level = %log.level, message = %log.message, "unknown log level");
×
385
        }
386
    }
387
}
×
388

389
fn write_message(writer: &mut impl Write, msg: &HostToPlugin) -> Result<(), cmd::Error> {
×
390
    let json = serde_json::to_string(msg)
×
391
        .map_err(|e| cmd::Error::from(format!("failed to serialize message: {e}")))?;
×
392
    writeln!(writer, "{json}")
×
393
        .map_err(|e| cmd::Error::from(format!("failed to write to plugin stdin: {e}")))?;
×
394
    writer
×
395
        .flush()
×
396
        .map_err(|e| cmd::Error::from(format!("failed to flush plugin stdin: {e}")))?;
×
397
    Ok(())
×
398
}
×
399

400
/// Check if a process is still alive by PID.
401
#[cfg(unix)]
402
fn is_process_alive(pid: u32) -> bool {
×
403
    // kill with signal 0 checks existence without sending a signal.
404
    unsafe { libc::kill(libc::pid_t::from(pid.cast_signed()), 0) == 0 }
×
405
}
×
406

407
#[cfg(windows)]
408
fn is_process_alive(pid: u32) -> bool {
409
    use windows_sys::Win32::{
410
        Foundation::{CloseHandle, STILL_ACTIVE},
411
        System::Threading::{GetExitCodeProcess, OpenProcess, PROCESS_QUERY_LIMITED_INFORMATION},
412
    };
413

414
    unsafe {
415
        let handle = OpenProcess(PROCESS_QUERY_LIMITED_INFORMATION, 0, pid);
416
        if handle.is_null() {
417
            return false;
418
        }
419
        let mut exit_code: u32 = 0;
420
        let ok = GetExitCodeProcess(handle, &mut exit_code);
421
        CloseHandle(handle);
422
        ok != 0 && (exit_code as i32) == STILL_ACTIVE
423
    }
424
}
425

426
/// Send SIGKILL to a child process by PID.
427
///
428
/// Used as a last resort when the plugin doesn't exit within the grace period
429
/// after receiving `Shutdown`.
430
#[cfg(unix)]
431
fn kill_child(pid: u32) {
×
432
    // SAFETY: We're sending a signal to a process we spawned.
433
    unsafe {
×
434
        libc::kill(libc::pid_t::from(pid.cast_signed()), libc::SIGKILL);
×
435
    }
×
436
    debug!(pid, "Sent SIGKILL to plugin after grace period.");
×
437
}
×
438

439
#[cfg(windows)]
440
fn kill_child(pid: u32) {
441
    use windows_sys::Win32::{
442
        Foundation::CloseHandle,
443
        System::Threading::{OpenProcess, PROCESS_TERMINATE, TerminateProcess},
444
    };
445

446
    // SAFETY: We're terminating a process we spawned.
447
    unsafe {
448
        let handle = OpenProcess(PROCESS_TERMINATE, 0, pid);
449
        if !handle.is_null() {
450
            TerminateProcess(handle, 1);
451
            CloseHandle(handle);
452
        }
453
    }
454
    debug!(pid, "Sent TerminateProcess to plugin after grace period.");
455
}
456

457
/// Search `$PATH` for a plugin binary matching the given subcommand segments.
458
///
459
/// For `["serve"]`, looks for `jp-serve`.
460
/// For `["conversation", "export"]`, looks for `jp-conversation-export`.
461
pub(crate) fn find_plugin_binary(segments: &[&str]) -> Option<Utf8PathBuf> {
1✔
462
    let name = format!("jp-{}", segments.join("-"));
1✔
463
    which::which(&name)
1✔
464
        .ok()
1✔
465
        .and_then(|p| Utf8PathBuf::from_path_buf(p).ok())
1✔
466
}
1✔
467

468
/// Find any existing plugin binary without downloading or prompting.
469
///
470
/// Checks the install directory first, then `$PATH`.
471
/// Used for non-mutating operations like help requests.
472
pub(crate) fn find_any_plugin_binary(name: &str) -> Option<Utf8PathBuf> {
×
473
    if let Some(path) = registry::find_installed(name) {
×
474
        return Some(path);
×
475
    }
×
476
    let segments: Vec<&str> = name.split('-').collect();
×
477
    find_plugin_binary(&segments)
×
478
}
×
479

480
/// Resolve a plugin binary through multiple sources:
481
///
482
/// 1. User-local install directory (previously installed plugins)
483
/// 2. Plugin registry (auto-install if official, prompt if third-party)
484
/// 3. `$PATH` (with approval check for unapproved plugins)
485
///
486
/// The `plugins_config` drives installation and execution policy.
487
/// Per-plugin settings override the defaults from the registry (official vs
488
/// third-party).
489
pub(crate) async fn resolve_plugin_binary(
×
490
    name: &str,
×
491
    plugins_config: &PluginsConfig,
×
492
    is_tty: bool,
×
493
) -> Result<Option<Utf8PathBuf>, cmd::Error> {
×
494
    let plugin_cfg = plugins_config.command.get(name);
×
495

496
    // Explicit deny in config.
497
    if plugin_cfg.is_some_and(|c| c.run == Some(RunPolicy::Deny)) {
×
498
        return Err(cmd::Error::from(format!(
×
499
            "plugin `{name}` is denied by configuration (plugins.command.{name}.run = \"deny\")"
×
500
        )));
×
501
    }
×
502

503
    // 1. Already installed locally.
504
    if let Some(path) = registry::find_installed(name) {
×
505
        debug!(name, %path, "Found installed plugin.");
×
506
        verify_checksum(name, &path, plugin_cfg)?;
×
507
        return Ok(Some(path));
×
508
    }
×
509

510
    // 2. Check registry.
511
    if let Some(path) = try_registry_install(name, plugins_config, is_tty).await? {
×
512
        return Ok(Some(path));
×
513
    }
×
514

515
    // 3. Check $PATH with run policy.
516
    let segments: Vec<&str> = name.split('-').collect();
×
517
    if let Some(path) = find_plugin_binary(&segments) {
×
518
        check_run_policy(name, &path, plugin_cfg, is_tty)?;
×
519
        return Ok(Some(path));
×
520
    }
×
521

522
    Ok(None)
×
523
}
×
524

525
/// Verify a binary's checksum against the config-pinned value, if any.
526
fn verify_checksum(
×
527
    name: &str,
×
528
    binary_path: &Utf8Path,
×
529
    plugin_cfg: Option<&CommandPluginConfig>,
×
530
) -> Result<(), cmd::Error> {
×
531
    let Some(checksum) = plugin_cfg.and_then(|c| c.checksum.as_ref()) else {
×
532
        return Ok(());
×
533
    };
534

535
    let actual = registry::sha256_file(binary_path)?;
×
536
    if actual != checksum.value {
×
537
        return Err(cmd::Error::from(format!(
×
538
            "plugin `{name}` binary checksum mismatch.\nexpected: {}\nactual:   {actual}\nThe \
×
539
             binary at {binary_path} has changed since it was pinned. Update \
×
540
             plugins.command.{name}.checksum.value in your config to accept the new binary.",
×
541
            checksum.value,
×
542
        )));
×
543
    }
×
544

545
    Ok(())
×
546
}
×
547

548
/// Try to install a plugin from the cached registry.
549
async fn try_registry_install(
×
550
    name: &str,
×
551
    plugins_config: &PluginsConfig,
×
552
    is_tty: bool,
×
553
) -> Result<Option<Utf8PathBuf>, cmd::Error> {
×
554
    let Some(reg) = registry::load_cached() else {
×
555
        return Ok(None);
×
556
    };
557

558
    // Find the registry entry whose `id` matches the requested name.
559
    // In Phase 5, this will use the command path (registry key) for
560
    // multi-segment routing. For now, we match on `id`.
561
    let Some(plugin) = reg.plugins.values().find(|p| p.id == name) else {
×
562
        return Ok(None);
×
563
    };
564

565
    // Only handle command plugins.
566
    let jp_plugin::registry::PluginKind::Command { ref binaries, .. } = plugin.kind else {
×
567
        return Ok(None);
×
568
    };
569

570
    let target = registry::current_target();
×
571
    let Some(binary_info) = binaries.get(&target) else {
×
572
        return Ok(None);
×
573
    };
574

575
    let id = &plugin.id;
×
576
    let plugin_cfg = plugins_config.command.get(id);
×
577

578
    // Check if auto-install is allowed.
579
    let auto_install = plugin_cfg
×
580
        .and_then(|c| c.install)
×
581
        .unwrap_or(plugins_config.auto_install);
×
582

583
    if !auto_install && !plugin.official {
×
584
        return Ok(None);
×
585
    }
×
586

587
    // Determine run policy: config > registry default.
588
    let run_policy = plugin_cfg
×
589
        .and_then(|c| c.run)
×
590
        .unwrap_or(if plugin.official {
×
591
            RunPolicy::Unattended
×
592
        } else {
593
            RunPolicy::Ask
×
594
        });
595

596
    match run_policy {
×
597
        RunPolicy::Deny => {
598
            return Err(cmd::Error::from(format!(
×
599
                "plugin `{id}` is denied by configuration"
×
600
            )));
×
601
        }
602
        RunPolicy::Ask => {
603
            if !is_tty {
×
604
                return Err(cmd::Error::from(format!(
×
605
                    "plugin `{id}` requires approval. Run `jp plugin install {id}` first, or set \
×
606
                     plugins.command.{id}.run = \"unattended\" in config."
×
607
                )));
×
608
            }
×
609

610
            let mut writer = std::io::stderr();
×
611
            drop(writeln!(
×
612
                writer,
×
613
                "  \u{2192} Plugin `{id}` found in registry."
614
            ));
615
            let options = vec![
×
616
                InlineOption::new('y', "install and run"),
×
617
                InlineOption::new('n', "cancel"),
×
618
            ];
619
            let answer = InlineSelect::new("Install and run it?", options)
×
620
                .prompt(&mut writer)
×
621
                .map_err(|e| cmd::Error::from(format!("prompt failed: {e}")))?;
×
622

623
            if answer != 'y' {
×
624
                return Err(cmd::Error::from("plugin execution cancelled"));
×
625
            }
×
626
        }
627
        RunPolicy::Unattended => {}
×
628
    }
629

630
    drop(writeln!(
×
631
        std::io::stderr(),
×
632
        "  \u{2192} Installing jp-{id} for {target}..."
633
    ));
634
    let client = reqwest::Client::new();
×
635
    let data = registry::download_and_verify(&client, binary_info).await?;
×
636

637
    let path = registry::install_binary(id, &data)?;
×
638
    drop(writeln!(
×
639
        std::io::stderr(),
×
640
        "  \u{2192} Installed to {path}",
641
    ));
642

643
    // Verify against pinned checksum if configured.
644
    verify_checksum(id, &path, plugin_cfg)?;
×
645

646
    Ok(Some(path))
×
647
}
×
648

649
/// Check run policy for a `$PATH`-discovered plugin.
650
fn check_run_policy(
×
651
    name: &str,
×
652
    binary_path: &Utf8Path,
×
653
    plugin_cfg: Option<&CommandPluginConfig>,
×
654
    is_tty: bool,
×
655
) -> Result<(), cmd::Error> {
×
656
    // Verify pinned checksum first.
657
    verify_checksum(name, binary_path, plugin_cfg)?;
×
658

659
    let run_policy = plugin_cfg.and_then(|c| c.run).unwrap_or(RunPolicy::Ask);
×
660

661
    match run_policy {
×
662
        RunPolicy::Unattended => Ok(()),
×
663
        RunPolicy::Deny => Err(cmd::Error::from(format!(
×
664
            "plugin `{name}` is denied by configuration"
×
665
        ))),
×
666
        RunPolicy::Ask => {
667
            if !is_tty {
×
668
                return Err(cmd::Error::from(format!(
×
669
                    "plugin `jp-{name}` found on $PATH but requires approval. Set \
×
670
                     plugins.command.{name}.run = \"unattended\" in config, or run `jp {name}` in \
×
671
                     a terminal."
×
672
                )));
×
673
            }
×
674

675
            // Check existing permanent approvals.
676
            if let Some(approvals) = registry::load_approvals()
×
677
                && let Some(approved) = approvals.approved.get(name)
×
678
                && approved.path == binary_path
×
679
                && registry::sha256_file(binary_path).is_ok_and(|sha| sha == approved.sha256)
×
680
            {
681
                debug!(name, %binary_path, "Plugin previously approved.");
×
682
                return Ok(());
×
683
            }
×
684

685
            let mut writer = std::io::stderr();
×
686
            drop(writeln!(
×
687
                writer,
×
688
                "  \u{2192} Found jp-{name} on $PATH ({binary_path})",
689
            ));
690
            let options = vec![
×
691
                InlineOption::new('y', "run this time"),
×
692
                InlineOption::new('Y', "run and remember permanently"),
×
693
                InlineOption::new('n', "deny"),
×
694
            ];
695
            let answer = InlineSelect::new("Run it?", options)
×
696
                .prompt(&mut writer)
×
697
                .map_err(|e| cmd::Error::from(format!("prompt failed: {e}")))?;
×
698

699
            match answer {
×
700
                'y' => Ok(()),
×
701
                'Y' => {
702
                    registry::save_approval(name, binary_path)?;
×
703
                    Ok(())
×
704
                }
705
                _ => Err(cmd::Error::from("plugin execution denied")),
×
706
            }
707
        }
708
    }
709
}
×
710

711
/// Send a `Describe` request to a plugin and return its metadata.
712
///
713
/// Spawns the binary, sends `{"type":"describe"}`, reads one response line, and
714
/// returns the parsed [`DescribeResponse`].
715
/// Returns `None` if the plugin doesn't support describe or fails to respond.
716
pub(crate) fn describe_plugin(binary: &Utf8Path) -> Option<DescribeResponse> {
×
717
    let mut child = Command::new(binary)
×
718
        .stdin(Stdio::piped())
×
719
        .stdout(Stdio::piped())
×
720
        .stderr(Stdio::null())
×
721
        .spawn()
×
722
        .ok()?;
×
723

724
    let mut child_stdin = child.stdin.take()?;
×
725
    let child_stdout = child.stdout.take()?;
×
726

727
    // Send describe request.
728
    let json = serde_json::to_string(&HostToPlugin::Describe).ok()?;
×
729
    writeln!(child_stdin, "{json}").ok()?;
×
730
    child_stdin.flush().ok()?;
×
731
    drop(child_stdin); // Signal no more messages.
×
732

733
    // Read one line response.
734
    let mut reader = BufReader::new(child_stdout);
×
735
    let mut line = String::new();
×
736
    reader.read_line(&mut line).ok()?;
×
737

738
    drop(child.wait());
×
739

740
    if line.trim().is_empty() {
×
741
        return None;
×
742
    }
×
743

744
    let msg: PluginToHost = serde_json::from_str(line.trim()).ok()?;
×
745
    match msg {
×
746
        PluginToHost::Describe(resp) => Some(resp),
×
747
        _ => None,
×
748
    }
749
}
×
750

751
/// Discover plugin binaries on `$PATH` and in the user-local install directory.
752
///
753
/// Returns `(subcommand_name, binary_path)` pairs, sorted by name.
754
/// For a binary named `jp-serve`, the subcommand name is `serve`.
755
/// Installed plugins take priority over `$PATH` duplicates.
756
pub(crate) fn discover_plugins() -> Vec<(String, Utf8PathBuf)> {
×
757
    let path_var = std::env::var_os("PATH").unwrap_or_default();
×
758
    let mut seen = HashSet::new();
×
759
    let mut plugins = Vec::new();
×
760

761
    // Scan install directory first so installed plugins take priority.
762
    if let Some(bin_dir) = registry::bin_dir() {
×
763
        scan_dir_for_plugins(&bin_dir, &mut seen, &mut plugins);
×
764
    }
×
765

766
    for dir in std::env::split_paths(&path_var) {
×
767
        let Some(dir) = Utf8Path::from_path(&dir) else {
×
768
            continue;
×
769
        };
770

771
        scan_dir_for_plugins(dir, &mut seen, &mut plugins);
×
772
    }
773

774
    plugins.sort_by(|a, b| a.0.cmp(&b.0));
×
775
    plugins
×
776
}
×
777

778
fn scan_dir_for_plugins(
×
779
    dir: &Utf8Path,
×
780
    seen: &mut HashSet<String>,
×
781
    plugins: &mut Vec<(String, Utf8PathBuf)>,
×
782
) {
×
783
    let Ok(entries) = dir.read_dir_utf8() else {
×
784
        return;
×
785
    };
786
    for entry in entries.flatten() {
×
787
        let name = entry.file_name();
×
788
        let Some(subcommand) = name.strip_prefix("jp-") else {
×
789
            continue;
×
790
        };
791

792
        // On Windows, strip the .exe extension.
793
        #[cfg(windows)]
794
        let subcommand = subcommand.strip_suffix(".exe").unwrap_or(subcommand);
795

796
        // On Unix, skip non-executable files.
797
        #[cfg(unix)]
798
        {
799
            use std::os::unix::fs::PermissionsExt as _;
800
            let Ok(meta) = entry.metadata() else {
×
801
                continue;
×
802
            };
803
            if meta.permissions().mode() & 0o111 == 0 {
×
804
                continue;
×
805
            }
×
806
        }
807

808
        if seen.insert(subcommand.to_owned()) {
×
809
            plugins.push((subcommand.to_owned(), entry.into_path()));
×
810
        }
×
811
    }
812
}
×
813

814
/// Show a plugin's help text via the `Describe` protocol.
815
pub(crate) fn show_plugin_help(binary: &Utf8Path) -> cmd::Output {
×
816
    match describe_plugin(binary) {
×
817
        Some(desc) => {
×
818
            let mut out = std::io::stdout().lock();
×
819
            if let Some(help) = &desc.help {
×
820
                drop(writeln!(out, "{help}"));
×
821
            } else {
×
822
                drop(writeln!(out, "{}: {}", desc.name, desc.description));
×
823
            }
×
824
            Ok(())
×
825
        }
826
        None => Err(cmd::Error::from("plugin does not support describe")),
×
827
    }
828
}
×
829

830
/// Produce a clap-formatted error for an unknown subcommand.
831
///
832
/// Uses `Command::error()` to get clap's standard error chrome (colored
833
/// `error:` prefix, usage line, help hint).
834
/// The message includes our plugin-specific context.
835
/// Returns exit code 2 (clap's convention for usage errors) with no message,
836
/// since the output was already written.
837
fn unknown_subcommand_error(name: &str) -> cmd::Error {
×
838
    use clap::CommandFactory as _;
839

840
    let mut cmd = crate::Cli::command();
×
841
    let err = cmd.error(
×
842
        clap::error::ErrorKind::InvalidSubcommand,
×
843
        format!(
×
844
            "unrecognized subcommand '{name}'\n\n  No built-in command, registry plugin, or \
845
             `jp-{name}` binary found on $PATH."
846
        ),
847
    );
848
    drop(err.print());
×
849
    cmd::Error::from(2u8)
×
850
}
×
851

852
/// Dispatch an external plugin subcommand.
853
///
854
/// Resolves the plugin binary, then runs the protocol loop.
855
/// Called from `Commands::run()` after the normal startup flow.
856
pub(crate) async fn run_external(args: &[String], ctx: &Ctx) -> cmd::Output {
×
857
    let (subcommand, plugin_args) = args
×
858
        .split_first()
×
859
        .ok_or("no subcommand provided for plugin dispatch")?;
×
860

861
    // Handle help without downloading or approval.
862
    if plugin_args.iter().any(|a| a == "-h" || a == "--help") {
×
863
        let binary = find_any_plugin_binary(subcommand).ok_or_else(|| {
×
864
            cmd::Error::from(format!(
×
865
                "plugin `{subcommand}` not found. No installed plugin or `jp-{subcommand}` binary \
866
                 found on $PATH.",
867
            ))
868
        })?;
×
869
        return show_plugin_help(&binary);
×
870
    }
×
871

872
    let config = ctx.config();
×
873
    let Some(binary) = resolve_plugin_binary(subcommand, &config.plugins, ctx.term.is_tty).await?
×
874
    else {
875
        return Err(unknown_subcommand_error(subcommand));
×
876
    };
877

878
    debug!(%binary, subcommand, "Dispatching to plugin.");
×
879

880
    run_plugin(
×
881
        subcommand,
×
882
        &binary,
×
883
        plugin_args,
×
884
        &ctx.workspace,
×
885
        ctx.storage_path(),
×
886
        ctx.user_storage_path(),
×
887
        &config,
×
888
        &ctx.signals,
×
889
        ctx.term.args.verbose,
×
890
    )?;
×
891
    Ok(())
×
892
}
×
893

894
#[cfg(test)]
895
#[path = "dispatch_tests.rs"]
896
mod tests;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc