• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 25437138375

06 May 2026 01:07PM UTC coverage: 66.045% (-0.001%) from 66.046%
25437138375

push

github

web-flow
fix(cli, init): Prefer global git identity for name default (#606)

`detect_default_user_name` previously ran `git config --get user.name`
without a scope flag, meaning a repo-local override could be picked up
and written into the user-global JP config, where it would then be
inherited by every future workspace — leaking an unrelated identity.

The cascade now tries `git config --global --get user.name` first, then
falls back to the unscoped lookup for users who only have a repo-local
identity, and finally falls through to the `$USER`/`$USERNAME`
environment variable as before.

The name prompt also switches from `with_default()` to
`with_initial_value()`, so the detected name is pre-filled and editable
rather than silently substituted on empty submission. This lets users
clear the field and submit empty to skip attribution, which the previous
behaviour made impossible without typing a space first.

---------

Signed-off-by: Jean Mertz <git@jeanmertz.com>

0 of 14 new or added lines in 1 file covered. (0.0%)

48 existing lines in 3 files now uncovered.

25519 of 38639 relevant lines covered (66.04%)

238.46 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.55
/crates/jp_cli/src/cmd/query/turn_loop.rs
1
//! Extracted turn loop for testability.
2
//!
3
//! This module contains the core turn loop logic, extracted from `handle_turn`
4
//! to enable integration testing with mock providers.
5

6
use std::{
7
    pin::Pin,
8
    sync::Arc,
9
    task::{Context, Poll},
10
    time::Duration,
11
};
12

13
use camino::Utf8Path;
14
use futures::{Stream, StreamExt as _, future, stream::SelectAll};
15
use indexmap::IndexMap;
16
use jp_attachment::Attachment;
17
use jp_config::{
18
    AppConfig, PartialConfig, assistant::tool_choice::ToolChoice,
19
    conversation::tool::QuestionTarget, model::id::ProviderId, style::streaming::StreamingConfig,
20
};
21
use jp_conversation::{
22
    ConversationStream,
23
    event::{ChatRequest, ToolCallRequest, ToolCallResponse},
24
};
25
use jp_inquire::prompt::PromptBackend;
26
use jp_llm::{
27
    Provider,
28
    error::StreamError,
29
    event::{Event, EventPart, ToolCallPart},
30
    model::ModelDetails,
31
    provider::get_provider,
32
    query::ChatQuery,
33
    tool::{ToolDefinition, executor::Executor},
34
};
35
use jp_printer::Printer;
36
use jp_workspace::{ConversationLock, ConversationMut};
37
use tokio::{sync::mpsc, task::JoinHandle};
38
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, errors::BroadcastStreamRecvError};
39
use tokio_util::sync::CancellationToken;
40
use tracing::{debug, info, warn};
41

42
use super::{
43
    build_sections, build_thread,
44
    interrupt::{LoopAction, handle_llm_event, handle_streaming_signal},
45
    stream::{StreamRetryState, handle_stream_error},
46
    tool::{
47
        PendingEntry, PendingTools, ToolCallDecision, ToolCallState, ToolCoordinator, ToolPrompter,
48
        ToolRenderer, build_execution_plan,
49
        inquiry::{InquiryBackend, InquiryConfig, LlmInquiryBackend},
50
        spawn_line_timer,
51
    },
52
    turn::{Action, TurnCoordinator, TurnPhase, TurnState},
53
};
54
use crate::{
55
    cmd::query::tool::coordinator::ExecutionResult,
56
    error::Error,
57
    render::metadata::set_rendered_arguments,
58
    signals::{SignalRx, SignalTo},
59
};
60

61
/// Events produced by the merged streaming loop sources.
///
/// The streaming phase of [`run_turn_loop`] merges three independent
/// sources (signals, LLM events, indicator ticks) into one stream; this
/// enum is the common item type they all yield.
enum StreamingLoopEvent {
    /// A signal from the signal handler (e.g. Ctrl+C).
    Signal(SignalTo),
    /// An event from the LLM provider stream.
    ///
    /// NOTE(review): the `Result` is boxed — presumably to keep this enum's
    /// size down relative to the other small variants; confirm.
    Llm(Box<Result<Event, StreamError>>),
    /// A tick from the preparing indicator timer, carrying the elapsed
    /// time since the timer started.
    PreparingTick(Duration),
}
71

72
/// Wrapper enum that unifies heterogeneous stream sources for
/// [`SelectAll`].
///
/// Each variant holds a different concrete stream type, but they all
/// yield [`StreamingLoopEvent`]. This avoids boxing while allowing
/// `select_all` to poll them as a single merged stream.
enum StreamSource<S, L, T> {
    /// Signal-handler events (e.g. Ctrl+C) from the broadcast channel.
    Signal(S),
    /// Events from the LLM provider stream.
    Llm(L),
    /// Preparing-indicator timer ticks.
    Tick(T),
}
83

84
impl<S, L, T> Stream for StreamSource<S, L, T>
85
where
86
    S: Stream<Item = StreamingLoopEvent> + Unpin,
87
    L: Stream<Item = StreamingLoopEvent> + Unpin,
88
    T: Stream<Item = StreamingLoopEvent> + Unpin,
89
{
90
    type Item = StreamingLoopEvent;
91

92
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
315✔
93
        match self.get_mut() {
315✔
94
            Self::Signal(s) => Pin::new(s).poll_next(cx),
58✔
95
            Self::Llm(s) => Pin::new(s).poll_next(cx),
191✔
96
            Self::Tick(s) => Pin::new(s).poll_next(cx),
66✔
97
        }
98
    }
315✔
99
}
100

101
/// Spawns a waiting indicator task that prints elapsed time to the terminal.
///
/// Passes the configured initial delay (`progress.delay_secs`) and refresh
/// interval (`progress.interval_ms`) to `spawn_line_timer`, with a formatter
/// that redraws in place via `\r\x1b[K` (carriage return + clear-to-end-of-line).
///
/// Returns `None` if the indicator is disabled (not a TTY or config says no).
/// On `Some`, the caller holds the [`CancellationToken`] (cancel to stop the
/// timer) and the [`JoinHandle`] (await to ensure line cleanup completes
/// before rendering other content).
fn spawn_waiting_indicator(
    printer: Arc<Printer>,
    config: &StreamingConfig,
    is_tty: bool,
) -> Option<(CancellationToken, JoinHandle<()>)> {
    // Never draw when stdout is not a terminal (e.g. piped output) — the
    // escape sequences would corrupt the stream.
    if !is_tty {
        return None;
    }

    spawn_line_timer(
        printer,
        config.progress.show,
        Duration::from_secs(u64::from(config.progress.delay_secs)),
        Duration::from_millis(u64::from(config.progress.interval_ms)),
        |secs| format!("\r\x1b[K⏱ Waiting… {secs:.1}s"),
    )
}
121

122
/// Runs the turn loop: streaming from LLM, handling signals, executing tools.
///
/// This is extracted from `handle_turn` to enable integration testing
/// without requiring a real LLM provider. The function handles the complete
/// turn lifecycle:
///
/// 1. Streaming LLM responses
/// 2. Handling user interrupts (Ctrl+C)
/// 3. Executing tool calls
/// 4. Persisting conversation state
///
/// The loop is phase-driven: `turn_coordinator.current_phase()` selects the
/// arm each iteration, and the loop only exits via `return` (Complete /
/// Aborted / error paths) or `break` in the Executing arm when no tool work
/// remains.
///
/// `tool_choice` is applied to each provider request; once tool responses
/// are committed it is reset to [`ToolChoice::Auto`] for subsequent
/// requests within the turn.
///
/// # Errors
///
/// Returns an error if:
/// - LLM streaming fails with a non-retryable error
/// - Tool execution fails critically
/// - Workspace persistence fails
#[expect(clippy::too_many_lines, clippy::too_many_arguments)]
pub(super) async fn run_turn_loop(
    provider: Arc<dyn Provider>,
    model: &ModelDetails,
    cfg: &AppConfig,
    signals: &SignalRx,
    mcp_client: &jp_mcp::Client,
    root: &Utf8Path,
    is_tty: bool,
    attachments: &[Attachment],
    lock: &ConversationLock,
    mut tool_choice: ToolChoice,
    tools: &[ToolDefinition],
    printer: Arc<Printer>,
    prompt_backend: Arc<dyn PromptBackend>,
    mut tool_coordinator: ToolCoordinator,
    chat_request: ChatRequest,
) -> Result<(), Error> {
    let mut turn_state = TurnState::default();
    let mut stream_retry = StreamRetryState::new(cfg.assistant.request, is_tty);
    let mut turn_coordinator = TurnCoordinator::new(
        printer.clone(),
        cfg.style.clone(),
        cfg.user.name.clone(),
        cfg.assistant.name.clone(),
        Some(model.id.to_string()),
    );
    // Tool-call rendering goes to a sink printer when tool-call display is
    // disabled or output is JSON, so rendering calls stay unconditional.
    let mut tool_renderer = ToolRenderer::new(
        if cfg.style.tool_call.show && !printer.format().is_json() {
            printer.clone()
        } else {
            Printer::sink().into()
        },
        cfg.style.clone(),
        root.to_path_buf(),
        is_tty,
    );

    let inquiry_backend: Arc<dyn InquiryBackend> = build_inquiry_backend(
        cfg,
        tools.to_vec(),
        model.clone(),
        provider.clone(),
        attachments.to_vec(),
    )
    .await?;

    info!(model = model.name(), "Starting conversation turn.");

    // Set when an executing phase aborts via user-initiated restart.
    // The next executing phase walks the stream for unresponded requests
    // and re-prepares them.
    let mut restart_requested = false;

    // Id-keyed scratchpad for tool work produced during the streaming
    // phase. The executing phase derives an ordered execution plan from
    // the conversation stream + this scratchpad via `build_execution_plan`.
    // Crucially: there's no public way to enumerate this directly — the
    // stream is the source of truth for "what needs to run."
    let mut pending_tools = PendingTools::new();

    // Prompter shared between streaming (permission prompts) and
    // executing (tool question prompts) phases.
    let prompter = Arc::new(ToolPrompter::with_prompt_backend(
        printer.clone(),
        cfg.editor.path(),
        prompt_backend.clone(),
    ));

    loop {
        match turn_coordinator.current_phase() {
            TurnPhase::Idle => {
                lock.as_mut().update_events(|stream| {
                    turn_coordinator.start_turn(stream, chat_request.clone());
                });
            }

            TurnPhase::Complete | TurnPhase::Aborted => return Ok(()),

            TurnPhase::Streaming => {
                // Restore structural invariants before each provider request.
                // Specifically: any `ToolCallRequest` without a matching
                // `ToolCallResponse` gets a synthetic error response. Without
                // this, providers like Anthropic reject the request with
                // `tool_use ids were found without tool_result blocks`. The
                // top-level `query.rs` sanitize covers the first cycle; this
                // call covers every subsequent cycle within the turn so a
                // mid-turn corruption never reaches the wire.
                lock.as_mut().update_events(ConversationStream::sanitize);

                // Rebuild thread from workspace events to ensure latest context.
                let events_stream = lock.events().clone();

                let thread = build_thread(
                    events_stream,
                    attachments.to_vec(),
                    &cfg.assistant,
                    !tools.is_empty(),
                )?;

                let query = ChatQuery {
                    thread,
                    tools: tools.to_vec(),
                    tool_choice: tool_choice.clone(),
                };

                // Start waiting indicator BEFORE the HTTP request. The drop
                // guard ensures the indicator is cancelled if we exit early
                // (error from run_cycle, break, return).
                let waiting =
                    spawn_waiting_indicator(printer.clone(), &cfg.style.streaming, is_tty);
                let (waiting_token, mut waiting_handle) = match waiting {
                    Some((token, handle)) => (Some(token), Some(handle)),
                    None => (None, None),
                };
                let _waiting_guard = waiting_token
                    .as_ref()
                    .map(CancellationToken::drop_guard_ref);

                // Build the three event sources for the streaming loop.
                let sig_stream = StreamSource::Signal(
                    BroadcastStream::new(signals.resubscribe()).filter_map(|result| {
                        future::ready(match result {
                            Ok(signal) => Some(StreamingLoopEvent::Signal(signal)),
                            Err(BroadcastStreamRecvError::Lagged(n)) => {
                                // Lagged receivers drop missed signals; log
                                // and keep polling rather than erroring out.
                                warn!("Missed {n} signals due to receiver lag");
                                None
                            }
                        })
                    }),
                );

                let llm_stream = StreamSource::Llm(
                    provider
                        .chat_completion_stream(model, query)
                        .await
                        .map_err(|e| map_llm_error(e, vec![]))?
                        .fuse()
                        .map(|result| StreamingLoopEvent::Llm(Box::new(result))),
                );
                turn_state.request_count += 1;

                // Reset preparing display for this streaming cycle.
                tool_renderer.reset();

                // Channel for preparing ticks. The sender is passed to
                // PreparingDisplay which spawns a timer task. The receiver is
                // merged into the event loop via SelectAll.
                let (tick_tx, tick_rx) = mpsc::channel::<Duration>(1);
                let tick_stream = StreamSource::Tick(
                    ReceiverStream::new(tick_rx).map(StreamingLoopEvent::PreparingTick),
                );

                // Whether we've seen at least one provider event this cycle
                // — used to reset the retry budget on the first successful
                // event.
                let mut received_provider_event = false;

                let mut streams: SelectAll<_> =
                    SelectAll::from_iter([sig_stream, llm_stream, tick_stream]);

                let mut conv = lock.as_mut();

                while let Some(event) = streams.next().await {
                    // Cancel and await the waiting indicator on the first
                    // event, ensuring its cleanup (line clear) completes before
                    // we render any content.
                    if let Some(handle) = waiting_handle.take() {
                        if let Some(token) = &waiting_token {
                            token.cancel();
                        }
                        drop(handle.await);
                    }

                    match event {
                        StreamingLoopEvent::Signal(signal) => {
                            // Clear the preparing display before showing the
                            // interrupt menu to avoid visual conflicts.
                            tool_renderer.clear_temp_line();

                            // SelectAll drops exhausted streams, so the LLM
                            // source being absent means the provider stream
                            // has finished.
                            let llm_alive =
                                streams.iter().any(|s| matches!(s, StreamSource::Llm(_)));

                            let action = conv.update_events(|stream| {
                                handle_streaming_signal(
                                    signal,
                                    &mut turn_coordinator,
                                    stream,
                                    &printer,
                                    prompt_backend.as_ref(),
                                    !llm_alive,
                                )
                            });
                            match action {
                                LoopAction::Continue => {}
                                LoopAction::Break => break,
                                LoopAction::Return(()) => return Ok(()),
                            }
                        }

                        StreamingLoopEvent::Llm(event) => {
                            let event = *event;

                            // Stream errors are handled by the unified retry
                            // logic — the single source of truth for retries.
                            let event = match event {
                                Ok(event) => event,
                                Err(e) => {
                                    tool_renderer.cancel_all();

                                    match handle_stream_error(
                                        e,
                                        &mut stream_retry,
                                        &mut turn_coordinator,
                                        &conv,
                                        &printer,
                                    )
                                    .await
                                    {
                                        LoopAction::Break => break,
                                        LoopAction::Return(result) => return result,
                                        LoopAction::Continue => continue,
                                    }
                                }
                            };

                            // Reset the retry counter on the first successful
                            // event in this cycle. This ensures that partially
                            // successful streams (rate-limited mid-response)
                            // don't permanently consume the retry budget.
                            if !received_provider_event {
                                received_provider_event = true;
                                stream_retry.clear_line(&printer);
                                stream_retry.reset();
                            }

                            // Register preparing tool calls. Flush the markdown
                            // buffer first so buffered text appears before the
                            // "Calling tool" line (fixes Issue 1).
                            if let Event::Part {
                                part: EventPart::ToolCall(ToolCallPart::Start { id, name }),
                                ..
                            } = &event
                            {
                                turn_coordinator.flush_renderer();
                                turn_coordinator.transition_to_tool_call();

                                tool_renderer.register(id, name, &tick_tx);
                                tool_coordinator
                                    .set_tool_state(id, ToolCallState::ReceivingArguments {
                                        name: name.clone(),
                                    });
                            }

                            // Capture these before `event` is moved into the
                            // update closure below.
                            let is_flush = matches!(event, Event::Flush { .. });
                            let is_finished = matches!(event, Event::Finished(_));

                            let action = conv.update_events(|stream| {
                                handle_llm_event(event, &mut turn_coordinator, stream)
                            });
                            match action {
                                LoopAction::Continue => {}
                                LoopAction::Break => break,
                                LoopAction::Return(()) => return Ok(()),
                            }

                            // On Flush of a tool call: clear the temp line,
                            // prepare the executor, decide permission, then
                            // render the tool call header + arguments. For
                            // attended tools the permission prompt comes first,
                            // so the user approves before seeing the full
                            // rendering.
                            let flushed_req = is_flush
                                .then(|| {
                                    conv.events()
                                        .last()
                                        .as_ref()
                                        .and_then(|e| e.as_tool_call_request())
                                        .cloned()
                                })
                                .flatten();
                            if let Some(req) = flushed_req {
                                tool_coordinator.set_tool_state(&req.id, ToolCallState::Queued);
                                tool_renderer.complete(&req.id);

                                match tool_coordinator.prepare_one(req.clone()) {
                                    Ok(executor) => {
                                        // Run the unified per-tool permission
                                        // pipeline. The await blocks the
                                        // streaming event loop while the user
                                        // decides; LLM events buffer in the
                                        // channel and are processed after.
                                        let decision = tool_coordinator
                                            .resolve_tool_call_decision(
                                                executor,
                                                &prompter,
                                                mcp_client,
                                                is_tty,
                                                &mut turn_state,
                                                &tool_renderer,
                                            )
                                            .await;

                                        match decision {
                                            ToolCallDecision::Approved {
                                                executor,
                                                rendered_arguments,
                                            } => {
                                                if let Some(content) = rendered_arguments {
                                                    conv.update_events(|stream| {
                                                        store_rendered_arguments(
                                                            stream, &req.id, &content,
                                                        );
                                                    });
                                                }
                                                pending_tools
                                                    .insert_approved(req.id.clone(), executor);
                                            }
                                            ToolCallDecision::Skipped(resp)
                                            | ToolCallDecision::Failed(resp) => {
                                                pending_tools.insert_resolved(req.id.clone(), resp);
                                            }
                                        }
                                    }
                                    Err(resp) => {
                                        // Preparation failed; record the error
                                        // response so the request still gets
                                        // answered in the stream.
                                        pending_tools.insert_resolved(req.id.clone(), resp);
                                    }
                                }
                            }

                            if is_finished {
                                tool_renderer.cancel_all();
                            }
                        }

                        StreamingLoopEvent::PreparingTick(elapsed) => {
                            tool_renderer.tick(elapsed);
                        }
                    }
                }

                // Clean up any preparing state on early loop exit.
                tool_renderer.cancel_all();

                conv.flush()?;
            }

            TurnPhase::Executing => {
                // On restart: walk the stream for unresponded tool-call
                // requests in the current turn and re-prepare them into
                // `pending_tools` via the existing batch APIs. From there
                // the unified executing path below picks up.
                //
                // The streaming and restart prep flows are still two
                // separate codepaths today; both converge on
                // `pending_tools` and `build_execution_plan`, which is the
                // load-bearing invariant for this refactor.
                if restart_requested {
                    restart_requested = false;

                    // Unresponded requests from the most recent turn only.
                    let restart_calls: Vec<ToolCallRequest> = lock
                        .events()
                        .iter_turns()
                        .next_back()
                        .map(|t| {
                            t.iter()
                                .filter_map(|e| e.event.as_tool_call_request())
                                .filter(|req| {
                                    lock.events().find_tool_call_response(&req.id).is_none()
                                })
                                .cloned()
                                .collect()
                        })
                        .unwrap_or_default();

                    if restart_calls.is_empty() {
                        break;
                    }

                    let unavailable = tool_coordinator.prepare(restart_calls);
                    let restart_prompter = ToolPrompter::with_prompt_backend(
                        printer.clone(),
                        cfg.editor.path(),
                        prompt_backend.clone(),
                    );
                    let (executors, skipped) = tool_coordinator
                        .run_permission_phase(
                            &restart_prompter,
                            mcp_client,
                            is_tty,
                            &mut turn_state,
                            &tool_renderer,
                        )
                        .await;

                    for (_idx, exec) in executors {
                        let id = exec.tool_id().to_owned();
                        pending_tools.insert_approved(id, exec);
                    }
                    for (_idx, resp) in skipped {
                        pending_tools.insert_resolved(resp.id.clone(), resp);
                    }
                    for (_idx, resp) in unavailable {
                        pending_tools.insert_resolved(resp.id.clone(), resp);
                    }
                }

                // Unified executing path: derive the work to do by walking
                // the conversation stream and reconciling against
                // `pending_tools`. The stream is the source of truth.
                let mut conv = lock.as_mut();
                let plan = build_execution_plan(&conv.events(), &mut pending_tools);

                if plan.is_empty() {
                    break;
                }

                let (items, orphaned) = plan.into_parts();

                let mut approved: Vec<(usize, Box<dyn Executor>)> = Vec::new();
                let mut pre_resolved: Vec<(usize, ToolCallResponse)> = Vec::new();
                for item in items {
                    match item.work {
                        PendingEntry::Approved(exec) => approved.push((item.index, exec)),
                        PendingEntry::Resolved(resp) => pre_resolved.push((item.index, resp)),
                    }
                }

                // Orphans: a `ToolCallRequest` in the stream's current turn
                // without a matching pending entry. Should never happen in
                // correct operation — every flushed request goes through
                // the prep flow which writes to `pending_tools`. Synthesize
                // an error response so the conversation stays valid (every
                // request must have a response before the next provider
                // call) and surface the inconsistency.
                for (idx, req) in orphaned {
                    warn!(
                        id = %req.id,
                        name = %req.name,
                        "ToolCallRequest in stream without a pending entry; synthesizing error \
                         response.",
                    );
                    pre_resolved.push((idx, ToolCallResponse {
                        id: req.id,
                        result: Err(
                            "Tool call had no prepared executor (internal inconsistency).".into(),
                        ),
                    }));
                }

                tool_coordinator.reset_for_execution();

                let execution_result = tool_coordinator
                    .execute_with_prompting(
                        approved,
                        Arc::clone(&prompter),
                        signals.resubscribe(),
                        &mut turn_coordinator,
                        &mut turn_state,
                        &printer,
                        prompt_backend.as_ref(),
                        Arc::clone(&inquiry_backend),
                        &conv,
                        mcp_client,
                        root,
                        &tool_renderer,
                        is_tty,
                    )
                    .await;

                if execution_result.restart_requested {
                    restart_requested = true;
                    continue;
                }

                // A `true` return means responses were committed; drop any
                // forced tool choice for subsequent requests in this turn.
                if commit_tool_responses(
                    execution_result,
                    pre_resolved,
                    &mut tool_coordinator,
                    &mut turn_coordinator,
                    &mut conv,
                )? {
                    tool_choice = ToolChoice::Auto;
                }
            }
        }
    }

    // Reached only via `break` in the Executing arm (no tool work left).
    Ok(())
}
629

630
/// Construct the [`LlmInquiryBackend`] used to answer inquiries raised by
/// tools during a turn.
///
/// The backend's default [`InquiryConfig`] is built from the global inquiry
/// override (`cfg.conversation.inquiry.assistant`): when a dedicated inquiry
/// model is configured there, its provider is constructed and its details
/// fetched; otherwise the parent assistant's `provider`/`model` are reused.
/// Per-question overrides are then resolved via [`build_inquiry_overrides`].
///
/// # Errors
///
/// Returns an error when constructing the inquiry provider or fetching its
/// model details fails, or when resolving per-question overrides fails.
async fn build_inquiry_backend(
    cfg: &AppConfig,
    tools: Vec<ToolDefinition>,
    model: ModelDetails,
    provider: Arc<dyn Provider>,
    attachments: Vec<Attachment>,
) -> Result<Arc<LlmInquiryBackend>, Error> {
    // Sections depend on whether any tools are active.
    let sections = build_sections(&cfg.assistant, !tools.is_empty());
    let inquiry_override = &cfg.conversation.inquiry.assistant;

    // Use the inquiry system prompt if configured, otherwise fall back to the
    // parent assistant's system prompt.
    let default_system_prompt = inquiry_override
        .system_prompt
        .clone()
        .or_else(|| cfg.assistant.system_prompt.clone());

    // Track providers we've already constructed to avoid duplicates. Seeded
    // here with the default provider; `build_inquiry_overrides` reuses and
    // extends this cache.
    let mut providers: IndexMap<ProviderId, Arc<dyn Provider>> = IndexMap::new();

    // Build the default InquiryConfig from the global inquiry override
    // merged with the parent assistant config.
    let default_config = if let Some(inquiry_model_cfg) = inquiry_override.model.as_ref() {
        let inquiry_model_id = inquiry_model_cfg.id.resolved();
        let inquiry_provider: Arc<dyn Provider> =
            Arc::from(get_provider(inquiry_model_id.provider, &cfg.providers.llm)?);
        debug!(model = %inquiry_model_id, "Fetching inquiry model details.");
        let inquiry_model = inquiry_provider
            .model_details(&inquiry_model_id.name)
            .await?;

        // Inquiry answers are parsed as structured output; warn (but don't
        // fail) when the chosen model doesn't advertise support for it.
        if inquiry_model.structured_output == Some(false) {
            warn!(
                model = inquiry_model_id.to_string(),
                "Inquiry model does not support structured output. Inquiry responses may be \
                 unreliable.",
            );
        }

        info!(
            model = inquiry_model.name(),
            "Using dedicated model for inquiries."
        );

        providers.insert(inquiry_model_id.provider, Arc::clone(&inquiry_provider));

        InquiryConfig {
            provider: inquiry_provider,
            model: inquiry_model,
            system_prompt: default_system_prompt,
            sections: sections.clone(),
        }
    } else {
        // No dedicated inquiry model: share the parent assistant's provider
        // and model.
        providers.insert(model.id.provider, Arc::clone(&provider));

        InquiryConfig {
            provider: Arc::clone(&provider),
            model: model.clone(),
            system_prompt: default_system_prompt,
            sections: sections.clone(),
        }
    };

    let overrides = build_inquiry_overrides(cfg, &default_config, &mut providers).await?;

    Ok(Arc::new(LlmInquiryBackend::new(
        default_config,
        overrides,
        attachments,
        tools,
    )))
}
702

703
/// Walk active tool configs to build per-question [`InquiryConfig`] overrides
/// from `QuestionTarget::Assistant(config)` entries that have non-empty config.
///
/// Overrides are keyed by `(tool_name, question_id)`. Providers constructed
/// here are cached in `providers`, so multiple questions overriding to the
/// same provider share one instance.
///
/// # Errors
///
/// Returns an error when a per-question model id fails to resolve against the
/// configured aliases, when constructing its provider fails, or when fetching
/// model details from the provider fails.
async fn build_inquiry_overrides(
    cfg: &AppConfig,
    default_config: &InquiryConfig,
    providers: &mut IndexMap<ProviderId, Arc<dyn Provider>>,
) -> Result<IndexMap<(String, String), InquiryConfig>, Error> {
    let mut overrides = IndexMap::new();

    for (tool_name, tool_cfg) in cfg.conversation.tools.iter() {
        for (question_id, question_cfg) in tool_cfg.questions() {
            // Only questions routed to the assistant can carry an override.
            let QuestionTarget::Assistant(ref per_q) = question_cfg.target else {
                continue;
            };
            // An empty per-question config means "use the defaults" — no
            // override entry is needed.
            if PartialConfig::is_empty(per_q.as_ref()) {
                continue;
            }

            // Resolve per-question model (if overridden), falling back to
            // the default inquiry config's model.
            let has_model_override = !PartialConfig::is_empty(&per_q.model.id);
            let (inq_provider, inq_model) = if has_model_override {
                let model_id = per_q
                    .model
                    .id
                    .resolve(&cfg.providers.llm.aliases)
                    .map_err(|e| Error::CliConfig(e.to_string()))?;

                // Reuse a previously constructed provider when possible;
                // otherwise build one and add it to the cache.
                let prov = if let Some(p) = providers.get(&model_id.provider) {
                    Arc::clone(p)
                } else {
                    let p: Arc<dyn Provider> =
                        Arc::from(get_provider(model_id.provider, &cfg.providers.llm)?);
                    providers.insert(model_id.provider, Arc::clone(&p));
                    p
                };

                let details = prov.model_details(&model_id.name).await?;

                // Same structured-output caveat as the default inquiry model:
                // warn but proceed.
                if details.structured_output == Some(false) {
                    warn!(
                        tool = %tool_name,
                        question = %question_id,
                        model = %model_id,
                        "Per-question inquiry model does not support structured \
                         output. Inquiry responses may be unreliable.",
                    );
                }

                (prov, details)
            } else {
                (
                    Arc::clone(&default_config.provider),
                    default_config.model.clone(),
                )
            };

            // System prompt: per-question -> global inquiry -> main.
            let system_prompt = per_q
                .system_prompt
                .as_ref()
                .map(|s| s.to_string())
                .or_else(|| default_config.system_prompt.clone());

            overrides.insert((tool_name.to_owned(), question_id.clone()), InquiryConfig {
                provider: inq_provider,
                model: inq_model,
                system_prompt,
                sections: default_config.sections.clone(),
            });
        }
    }

    Ok(overrides)
}
778

779
/// Assemble tool responses from the executor's results plus any pre-resolved
780
/// responses (skipped tools, unavailable tools, orphan synthesizations),
781
/// commit them to the conversation stream, and flush to disk.
782
///
783
/// Returns `true` if a follow-up LLM cycle is needed (i.e. tool responses were
784
/// added and the coordinator wants to continue).
785
fn commit_tool_responses(
22✔
786
    result: ExecutionResult,
22✔
787
    pre_resolved: Vec<(usize, ToolCallResponse)>,
22✔
788
    tool: &mut ToolCoordinator,
22✔
789
    turn: &mut super::turn::TurnCoordinator,
22✔
790
    conv: &mut ConversationMut,
22✔
791
) -> Result<bool, Error> {
22✔
792
    // Persist any rendered custom-argument output accumulated during the
793
    // permission phase into the corresponding ToolCallRequest events.
794
    flush_rendered_arguments(tool, conv);
22✔
795

796
    // Both `result.responses` and `pre_resolved` are already keyed by the
797
    // plan index assigned in `build_execution_plan`. Sorting by that
798
    // index restores stream order for the persisted responses.
799
    let mut indexed: Vec<(usize, ToolCallResponse)> = result.responses;
22✔
800
    indexed.extend(pre_resolved);
22✔
801
    indexed.sort_by_key(|(idx, _)| *idx);
22✔
802
    let responses: Vec<_> = indexed.into_iter().map(|(_, r)| r).collect();
22✔
803

804
    let action = conv.update_events(|stream| turn.handle_tool_responses(stream, responses));
22✔
805
    conv.flush()?;
22✔
806

807
    Ok(matches!(action, Action::SendFollowUp))
22✔
808
}
22✔
809

810
/// Write a single rendered-arguments entry into event metadata.
UNCOV
811
fn store_rendered_arguments(stream: &mut ConversationStream, tool_call_id: &str, content: &str) {
×
UNCOV
812
    for event in stream.iter_mut() {
×
813
        let is_match = event
×
814
            .event
×
815
            .as_tool_call_request()
×
816
            .is_some_and(|r| r.id == tool_call_id);
×
817
        if is_match {
×
818
            set_rendered_arguments(event.event, content);
×
819
            return;
×
820
        }
×
821
    }
822
}
×
823

824
/// Drain accumulated rendered arguments from the coordinator and write them
825
/// into the corresponding `ToolCallRequest` events in the stream.
826
fn flush_rendered_arguments(coordinator: &mut ToolCoordinator, conv: &mut ConversationMut) {
22✔
827
    let rendered = coordinator.drain_rendered_arguments();
22✔
828
    if rendered.is_empty() {
22✔
829
        return;
22✔
UNCOV
830
    }
×
UNCOV
831
    conv.update_events(|stream| {
×
832
        for (tool_call_id, content) in &rendered {
×
833
            store_rendered_arguments(stream, tool_call_id, content);
×
834
        }
×
835
    });
×
836
}
22✔
837

UNCOV
838
fn map_llm_error(error: jp_llm::Error, models: Vec<ModelDetails>) -> Error {
×
UNCOV
839
    match error {
×
840
        jp_llm::Error::UnknownModel(model) => Error::UnknownModel {
×
841
            model,
×
842
            available: models.into_iter().map(|v| v.id.name.to_string()).collect(),
×
843
        },
844
        _ => error.into(),
×
845
    }
846
}
×
847

848
// Tests live in a sibling file; `#[path]` wires it in as this module's
// `tests` submodule so the implementation file stays focused.
#[cfg(test)]
#[path = "turn_loop_tests.rs"]
mod tests;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc