• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 25394028458

05 May 2026 06:14PM UTC coverage: 64.676% (+0.4%) from 64.291%
25394028458

Pull #599

github

web-flow
Merge ff5d47690 into e8f4a64fc
Pull Request #599: feat(cli, render): Role-aware turn headers and unified rendering pipeline

272 of 324 new or added lines in 10 files covered. (83.95%)

4 existing lines in 4 files now uncovered.

24862 of 38441 relevant lines covered (64.68%)

175.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

65.41
/crates/jp_cli/src/cmd/query/turn_loop.rs
1
//! Extracted turn loop for testability.
2
//!
3
//! This module contains the core turn loop logic, extracted from `handle_turn`
4
//! to enable integration testing with mock providers.
5

6
use std::{
7
    pin::Pin,
8
    sync::Arc,
9
    task::{Context, Poll},
10
    time::Duration,
11
};
12

13
use camino::Utf8Path;
14
use futures::{Stream, StreamExt as _, future, stream::SelectAll};
15
use indexmap::IndexMap;
16
use jp_attachment::Attachment;
17
use jp_config::{
18
    AppConfig, PartialConfig, assistant::tool_choice::ToolChoice,
19
    conversation::tool::QuestionTarget, model::id::ProviderId, style::streaming::StreamingConfig,
20
};
21
use jp_conversation::{
22
    ConversationStream,
23
    event::{ChatRequest, ToolCallRequest, ToolCallResponse},
24
};
25
use jp_inquire::prompt::PromptBackend;
26
use jp_llm::{
27
    Provider,
28
    error::StreamError,
29
    event::{Event, EventPart, ToolCallPart},
30
    model::ModelDetails,
31
    provider::get_provider,
32
    query::ChatQuery,
33
    tool::{ToolDefinition, executor::Executor},
34
};
35
use jp_printer::Printer;
36
use jp_workspace::{ConversationLock, ConversationMut};
37
use tokio::{sync::mpsc, task::JoinHandle};
38
use tokio_stream::wrappers::{BroadcastStream, ReceiverStream, errors::BroadcastStreamRecvError};
39
use tokio_util::sync::CancellationToken;
40
use tracing::{debug, info, warn};
41

42
use super::{
43
    build_sections, build_thread,
44
    interrupt::{LoopAction, handle_llm_event, handle_streaming_signal},
45
    stream::{StreamRetryState, handle_stream_error},
46
    tool::{
47
        PendingEntry, PendingTools, ToolCallDecision, ToolCallState, ToolCoordinator, ToolPrompter,
48
        ToolRenderer, build_execution_plan,
49
        inquiry::{InquiryBackend, InquiryConfig, LlmInquiryBackend},
50
        spawn_line_timer,
51
    },
52
    turn::{Action, TurnCoordinator, TurnPhase, TurnState},
53
};
54
use crate::{
55
    cmd::query::tool::coordinator::ExecutionResult,
56
    error::Error,
57
    render::metadata::set_rendered_arguments,
58
    signals::{SignalRx, SignalTo},
59
};
60

61
/// Events produced by the merged streaming loop sources.
62
enum StreamingLoopEvent {
63
    /// A signal from the signal handler (e.g. Ctrl+C).
64
    Signal(SignalTo),
65
    /// An event from the LLM provider stream.
66
    Llm(Box<Result<Event, StreamError>>),
67
    /// A tick from the preparing indicator timer, carrying the elapsed
68
    /// time since the timer started.
69
    PreparingTick(Duration),
70
}
71

72
/// Wrapper enum that unifies heterogeneous stream sources for
73
/// [`SelectAll`].
74
///
75
/// Each variant holds a different concrete stream type, but they all
76
/// yield [`StreamingLoopEvent`]. This avoids boxing while allowing
77
/// `select_all` to poll them as a single merged stream.
78
enum StreamSource<S, L, T> {
79
    Signal(S),
80
    Llm(L),
81
    Tick(T),
82
}
83

84
impl<S, L, T> Stream for StreamSource<S, L, T>
85
where
86
    S: Stream<Item = StreamingLoopEvent> + Unpin,
87
    L: Stream<Item = StreamingLoopEvent> + Unpin,
88
    T: Stream<Item = StreamingLoopEvent> + Unpin,
89
{
90
    type Item = StreamingLoopEvent;
91

92
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
305✔
93
        match self.get_mut() {
305✔
94
            Self::Signal(s) => Pin::new(s).poll_next(cx),
56✔
95
            Self::Llm(s) => Pin::new(s).poll_next(cx),
185✔
96
            Self::Tick(s) => Pin::new(s).poll_next(cx),
64✔
97
        }
98
    }
305✔
99
}
100

101
/// Spawns a waiting indicator task that prints elapsed time to the terminal.
102
///
103
/// Returns `None` if the indicator is disabled (not a TTY or config says no).
104
fn spawn_waiting_indicator(
56✔
105
    printer: Arc<Printer>,
56✔
106
    config: &StreamingConfig,
56✔
107
    is_tty: bool,
56✔
108
) -> Option<(CancellationToken, JoinHandle<()>)> {
56✔
109
    if !is_tty {
56✔
110
        return None;
40✔
111
    }
16✔
112

113
    spawn_line_timer(
16✔
114
        printer,
16✔
115
        config.progress.show,
16✔
116
        Duration::from_secs(u64::from(config.progress.delay_secs)),
16✔
117
        Duration::from_millis(u64::from(config.progress.interval_ms)),
16✔
118
        |secs| format!("\r\x1b[K⏱ Waiting… {secs:.1}s"),
5✔
119
    )
120
}
56✔
121

122
/// Runs the turn loop: streaming from LLM, handling signals, executing tools.
123
///
124
/// This is extracted from `handle_turn` to enable integration testing
125
/// without requiring a real LLM provider. The function handles the complete
126
/// turn lifecycle:
127
///
128
/// 1. Streaming LLM responses
129
/// 2. Handling user interrupts (Ctrl+C)
130
/// 3. Executing tool calls
131
/// 4. Persisting conversation state
132
///
133
/// # Errors
134
///
135
/// Returns an error if:
136
/// - LLM streaming fails with a non-retryable error
137
/// - Tool execution fails critically
138
/// - Workspace persistence fails
139
#[expect(clippy::too_many_lines, clippy::too_many_arguments)]
140
pub(super) async fn run_turn_loop(
32✔
141
    provider: Arc<dyn Provider>,
32✔
142
    model: &ModelDetails,
32✔
143
    cfg: &AppConfig,
32✔
144
    signals: &SignalRx,
32✔
145
    mcp_client: &jp_mcp::Client,
32✔
146
    root: &Utf8Path,
32✔
147
    is_tty: bool,
32✔
148
    attachments: &[Attachment],
32✔
149
    lock: &ConversationLock,
32✔
150
    mut tool_choice: ToolChoice,
32✔
151
    tools: &[ToolDefinition],
32✔
152
    printer: Arc<Printer>,
32✔
153
    prompt_backend: Arc<dyn PromptBackend>,
32✔
154
    mut tool_coordinator: ToolCoordinator,
32✔
155
    chat_request: ChatRequest,
32✔
156
) -> Result<(), Error> {
32✔
157
    let mut turn_state = TurnState::default();
32✔
158
    let mut stream_retry = StreamRetryState::new(cfg.assistant.request, is_tty);
32✔
159
    let mut turn_coordinator = TurnCoordinator::new(
32✔
160
        printer.clone(),
32✔
161
        cfg.style.clone(),
32✔
162
        cfg.user.name.clone(),
32✔
163
        cfg.assistant.name.clone(),
32✔
164
        Some(model.id.to_string()),
32✔
165
    );
166
    let mut tool_renderer = ToolRenderer::new(
32✔
167
        if cfg.style.tool_call.show && !printer.format().is_json() {
32✔
168
            printer.clone()
32✔
169
        } else {
170
            Printer::sink().into()
×
171
        },
172
        cfg.style.clone(),
32✔
173
        root.to_path_buf(),
32✔
174
        is_tty,
32✔
175
    );
176

177
    let inquiry_backend: Arc<dyn InquiryBackend> = build_inquiry_backend(
32✔
178
        cfg,
32✔
179
        tools.to_vec(),
32✔
180
        model.clone(),
32✔
181
        provider.clone(),
32✔
182
        attachments.to_vec(),
32✔
183
    )
32✔
184
    .await?;
32✔
185

186
    info!(model = model.name(), "Starting conversation turn.");
32✔
187

188
    // Set when an executing phase aborts via user-initiated restart.
189
    // The next executing phase walks the stream for unresponded requests
190
    // and re-prepares them.
191
    let mut restart_requested = false;
32✔
192

193
    // Id-keyed scratchpad for tool work produced during the streaming
194
    // phase. The executing phase derives an ordered execution plan from
195
    // the conversation stream + this scratchpad via `build_execution_plan`.
196
    // Crucially: there's no public way to enumerate this directly — the
197
    // stream is the source of truth for "what needs to run."
198
    let mut pending_tools = PendingTools::new();
32✔
199

200
    // Prompter shared between streaming (permission prompts) and
201
    // executing (tool question prompts) phases.
202
    let prompter = Arc::new(ToolPrompter::with_prompt_backend(
32✔
203
        printer.clone(),
32✔
204
        cfg.editor.path(),
32✔
205
        prompt_backend.clone(),
32✔
206
    ));
207

208
    loop {
209
        match turn_coordinator.current_phase() {
142✔
210
            TurnPhase::Idle => {
211
                lock.as_mut().update_events(|stream| {
32✔
212
                    turn_coordinator.start_turn(stream, chat_request.clone());
32✔
213
                });
32✔
214
            }
215

216
            TurnPhase::Complete | TurnPhase::Aborted => return Ok(()),
32✔
217

218
            TurnPhase::Streaming => {
219
                // Restore structural invariants before each provider request.
220
                // Specifically: any `ToolCallRequest` without a matching
221
                // `ToolCallResponse` gets a synthetic error response. Without
222
                // this, providers like Anthropic reject the request with
223
                // `tool_use ids were found without tool_result blocks`. The
224
                // top-level `query.rs` sanitize covers the first cycle; this
225
                // call covers every subsequent cycle within the turn so a
226
                // mid-turn corruption never reaches the wire.
227
                lock.as_mut().update_events(ConversationStream::sanitize);
56✔
228

229
                // Rebuild thread from workspace events to ensure latest context.
230
                let events_stream = lock.events().clone();
56✔
231

232
                let thread = build_thread(
56✔
233
                    events_stream,
56✔
234
                    attachments.to_vec(),
56✔
235
                    &cfg.assistant,
56✔
236
                    !tools.is_empty(),
56✔
237
                )?;
×
238

239
                let query = ChatQuery {
56✔
240
                    thread,
56✔
241
                    tools: tools.to_vec(),
56✔
242
                    tool_choice: tool_choice.clone(),
56✔
243
                };
56✔
244

245
                // Start waiting indicator BEFORE the HTTP request. The drop
246
                // guard ensures the indicator is cancelled if we exit early
247
                // (error from run_cycle, break, return).
248
                let waiting =
56✔
249
                    spawn_waiting_indicator(printer.clone(), &cfg.style.streaming, is_tty);
56✔
250
                let (waiting_token, mut waiting_handle) = match waiting {
56✔
251
                    Some((token, handle)) => (Some(token), Some(handle)),
15✔
252
                    None => (None, None),
41✔
253
                };
254
                let _waiting_guard = waiting_token
56✔
255
                    .as_ref()
56✔
256
                    .map(CancellationToken::drop_guard_ref);
56✔
257

258
                // Build the three event sources for the streaming loop.
259
                let sig_stream = StreamSource::Signal(
56✔
260
                    BroadcastStream::new(signals.resubscribe()).filter_map(|result| {
56✔
261
                        future::ready(match result {
×
262
                            Ok(signal) => Some(StreamingLoopEvent::Signal(signal)),
×
263
                            Err(BroadcastStreamRecvError::Lagged(n)) => {
×
264
                                warn!("Missed {n} signals due to receiver lag");
×
265
                                None
×
266
                            }
267
                        })
268
                    }),
×
269
                );
270

271
                let llm_stream = StreamSource::Llm(
56✔
272
                    provider
56✔
273
                        .chat_completion_stream(model, query)
56✔
274
                        .await
56✔
275
                        .map_err(|e| map_llm_error(e, vec![]))?
56✔
276
                        .fuse()
56✔
277
                        .map(|result| StreamingLoopEvent::Llm(Box::new(result))),
184✔
278
                );
279
                turn_state.request_count += 1;
56✔
280

281
                // Reset preparing display for this streaming cycle.
282
                tool_renderer.reset();
56✔
283

284
                // Channel for preparing ticks. The sender is passed to
285
                // PreparingDisplay which spawns a timer task. The receiver is
286
                // merged into the event loop via SelectAll.
287
                let (tick_tx, tick_rx) = mpsc::channel::<Duration>(1);
56✔
288
                let tick_stream = StreamSource::Tick(
56✔
289
                    ReceiverStream::new(tick_rx).map(StreamingLoopEvent::PreparingTick),
56✔
290
                );
56✔
291

292
                // Whether we've seen at least one provider event this cycle
293
                // — used to reset the retry budget on the first successful
294
                // event.
295
                let mut received_provider_event = false;
56✔
296

297
                let mut streams: SelectAll<_> =
56✔
298
                    SelectAll::from_iter([sig_stream, llm_stream, tick_stream]);
56✔
299

300
                let mut conv = lock.as_mut();
56✔
301

302
                while let Some(event) = streams.next().await {
188✔
303
                    // Cancel and await the waiting indicator on the first
304
                    // event, ensuring its cleanup (line clear) completes before
305
                    // we render any content.
306
                    if let Some(handle) = waiting_handle.take() {
188✔
307
                        if let Some(token) = &waiting_token {
15✔
308
                            token.cancel();
15✔
309
                        }
15✔
310
                        drop(handle.await);
15✔
311
                    }
173✔
312

313
                    match event {
188✔
314
                        StreamingLoopEvent::Signal(signal) => {
×
315
                            // Clear the preparing display before showing the
316
                            // interrupt menu to avoid visual conflicts.
317
                            tool_renderer.clear_temp_line();
×
318

319
                            let llm_alive =
×
320
                                streams.iter().any(|s| matches!(s, StreamSource::Llm(_)));
×
321

322
                            let action = conv.update_events(|stream| {
×
323
                                handle_streaming_signal(
×
324
                                    signal,
×
325
                                    &mut turn_coordinator,
×
326
                                    stream,
×
327
                                    &printer,
×
328
                                    prompt_backend.as_ref(),
×
329
                                    !llm_alive,
×
330
                                )
331
                            });
×
332
                            match action {
×
333
                                LoopAction::Continue => {}
×
334
                                LoopAction::Break => break,
×
335
                                LoopAction::Return(()) => return Ok(()),
×
336
                            }
337
                        }
338

339
                        StreamingLoopEvent::Llm(event) => {
184✔
340
                            let event = *event;
184✔
341

342
                            // Stream errors are handled by the unified retry
343
                            // logic — the single source of truth for retries.
344
                            let event = match event {
184✔
345
                                Ok(event) => event,
182✔
346
                                Err(e) => {
2✔
347
                                    tool_renderer.cancel_all();
2✔
348

349
                                    match handle_stream_error(
2✔
350
                                        e,
2✔
351
                                        &mut stream_retry,
2✔
352
                                        &mut turn_coordinator,
2✔
353
                                        &conv,
2✔
354
                                        &printer,
2✔
355
                                    )
356
                                    .await
2✔
357
                                    {
358
                                        LoopAction::Break => break,
2✔
359
                                        LoopAction::Return(result) => return result,
×
360
                                        LoopAction::Continue => continue,
×
361
                                    }
362
                                }
363
                            };
364

365
                            // Reset the retry counter on the first successful
366
                            // event in this cycle. This ensures that partially
367
                            // successful streams (rate-limited mid-response)
368
                            // don't permanently consume the retry budget.
369
                            if !received_provider_event {
182✔
370
                                received_provider_event = true;
56✔
371
                                stream_retry.clear_line(&printer);
56✔
372
                                stream_retry.reset();
56✔
373
                            }
126✔
374

375
                            // Register preparing tool calls. Flush the markdown
376
                            // buffer first so buffered text appears before the
377
                            // "Calling tool" line (fixes Issue 1).
378
                            if let Event::Part {
379
                                part: EventPart::ToolCall(ToolCallPart::Start { id, name }),
29✔
380
                                ..
381
                            } = &event
32✔
382
                            {
29✔
383
                                turn_coordinator.flush_renderer();
29✔
384
                                turn_coordinator.transition_to_tool_call();
29✔
385

29✔
386
                                tool_renderer.register(id, name, &tick_tx);
29✔
387
                                tool_coordinator
29✔
388
                                    .set_tool_state(id, ToolCallState::ReceivingArguments {
29✔
389
                                        name: name.clone(),
29✔
390
                                    });
29✔
391
                            }
153✔
392

393
                            let is_flush = matches!(event, Event::Flush { .. });
182✔
394
                            let is_finished = matches!(event, Event::Finished(_));
182✔
395

396
                            let action = conv.update_events(|stream| {
182✔
397
                                handle_llm_event(event, &mut turn_coordinator, stream)
182✔
398
                            });
182✔
399
                            match action {
182✔
400
                                LoopAction::Continue => {}
128✔
401
                                LoopAction::Break => break,
54✔
402
                                LoopAction::Return(()) => return Ok(()),
×
403
                            }
404

405
                            // On Flush of a tool call: clear the temp line,
406
                            // prepare the executor, decide permission, then
407
                            // render the tool call header + arguments. For
408
                            // attended tools the permission prompt comes first,
409
                            // so the user approves before seeing the full
410
                            // rendering.
411
                            let flushed_req = is_flush
128✔
412
                                .then(|| {
128✔
413
                                    conv.events()
61✔
414
                                        .last()
61✔
415
                                        .as_ref()
61✔
416
                                        .and_then(|e| e.as_tool_call_request())
61✔
417
                                        .cloned()
61✔
418
                                })
61✔
419
                                .flatten();
128✔
420
                            if let Some(req) = flushed_req {
128✔
421
                                tool_coordinator.set_tool_state(&req.id, ToolCallState::Queued);
28✔
422
                                tool_renderer.complete(&req.id);
28✔
423

424
                                match tool_coordinator.prepare_one(req.clone()) {
28✔
425
                                    Ok(executor) => {
18✔
426
                                        // Run the unified per-tool permission
427
                                        // pipeline. The await blocks the
428
                                        // streaming event loop while the user
429
                                        // decides; LLM events buffer in the
430
                                        // channel and are processed after.
431
                                        let decision = tool_coordinator
18✔
432
                                            .resolve_tool_call_decision(
18✔
433
                                                executor,
18✔
434
                                                &prompter,
18✔
435
                                                mcp_client,
18✔
436
                                                is_tty,
18✔
437
                                                &mut turn_state,
18✔
438
                                                &tool_renderer,
18✔
439
                                            )
18✔
440
                                            .await;
18✔
441

442
                                        match decision {
18✔
443
                                            ToolCallDecision::Approved {
444
                                                executor,
16✔
445
                                                rendered_arguments,
16✔
446
                                            } => {
447
                                                if let Some(content) = rendered_arguments {
16✔
NEW
448
                                                    conv.update_events(|stream| {
×
NEW
449
                                                        store_rendered_arguments(
×
NEW
450
                                                            stream, &req.id, &content,
×
451
                                                        );
NEW
452
                                                    });
×
453
                                                }
16✔
454
                                                pending_tools
16✔
455
                                                    .insert_approved(req.id.clone(), executor);
16✔
456
                                            }
457
                                            ToolCallDecision::Skipped(resp)
2✔
458
                                            | ToolCallDecision::Failed(resp) => {
2✔
459
                                                pending_tools.insert_resolved(req.id.clone(), resp);
2✔
460
                                            }
2✔
461
                                        }
462
                                    }
463
                                    Err(resp) => {
10✔
464
                                        pending_tools.insert_resolved(req.id.clone(), resp);
10✔
465
                                    }
10✔
466
                                }
467
                            }
100✔
468

469
                            if is_finished {
128✔
470
                                tool_renderer.cancel_all();
×
471
                            }
128✔
472
                        }
473

474
                        StreamingLoopEvent::PreparingTick(elapsed) => {
4✔
475
                            tool_renderer.tick(elapsed);
4✔
476
                        }
4✔
477
                    }
478
                }
479

480
                // Clean up any preparing state on early loop exit.
481
                tool_renderer.cancel_all();
56✔
482

483
                conv.flush()?;
56✔
484
            }
485

486
            TurnPhase::Executing => {
487
                // On restart: walk the stream for unresponded tool-call
488
                // requests in the current turn and re-prepare them into
489
                // `pending_tools` via the existing batch APIs. From there
490
                // the unified executing path below picks up.
491
                //
492
                // The streaming and restart prep flows are still two
493
                // separate codepaths today (see Bear note: "unify the
494
                // streaming and restart tool-prep codepaths"). Both
495
                // converge on `pending_tools` and `build_execution_plan`,
496
                // which is the load-bearing invariant for this refactor.
497
                if restart_requested {
22✔
NEW
498
                    restart_requested = false;
×
499

NEW
500
                    let restart_calls: Vec<ToolCallRequest> = lock
×
NEW
501
                        .events()
×
NEW
502
                        .iter_turns()
×
NEW
503
                        .next_back()
×
NEW
504
                        .map(|t| {
×
NEW
505
                            t.iter()
×
NEW
506
                                .filter_map(|e| e.event.as_tool_call_request())
×
NEW
507
                                .filter(|req| {
×
NEW
508
                                    lock.events().find_tool_call_response(&req.id).is_none()
×
NEW
509
                                })
×
NEW
510
                                .cloned()
×
NEW
511
                                .collect()
×
NEW
512
                        })
×
NEW
513
                        .unwrap_or_default();
×
514

515
                    if restart_calls.is_empty() {
×
516
                        break;
×
517
                    }
×
518

NEW
519
                    let unavailable = tool_coordinator.prepare(restart_calls);
×
520
                    let restart_prompter = ToolPrompter::with_prompt_backend(
×
521
                        printer.clone(),
×
522
                        cfg.editor.path(),
×
523
                        prompt_backend.clone(),
×
524
                    );
NEW
525
                    let (executors, skipped) = tool_coordinator
×
526
                        .run_permission_phase(
×
527
                            &restart_prompter,
×
528
                            mcp_client,
×
529
                            is_tty,
×
530
                            &mut turn_state,
×
531
                            &tool_renderer,
×
532
                        )
533
                        .await;
×
534

NEW
535
                    for (_idx, exec) in executors {
×
NEW
536
                        let id = exec.tool_id().to_owned();
×
NEW
537
                        pending_tools.insert_approved(id, exec);
×
UNCOV
538
                    }
×
NEW
539
                    for (_idx, resp) in skipped {
×
NEW
540
                        pending_tools.insert_resolved(resp.id.clone(), resp);
×
NEW
541
                    }
×
NEW
542
                    for (_idx, resp) in unavailable {
×
NEW
543
                        pending_tools.insert_resolved(resp.id.clone(), resp);
×
544
                    }
×
545
                }
22✔
546

547
                // Unified executing path: derive the work to do by walking
548
                // the conversation stream and reconciling against
549
                // `pending_tools`. The stream is the source of truth.
550
                let mut conv = lock.as_mut();
22✔
551
                let plan = build_execution_plan(&conv.events(), &mut pending_tools);
22✔
552

553
                if plan.is_empty() {
22✔
554
                    break;
×
555
                }
22✔
556

557
                let (items, orphaned) = plan.into_parts();
22✔
558

559
                let mut approved: Vec<(usize, Box<dyn Executor>)> = Vec::new();
22✔
560
                let mut pre_resolved: Vec<(usize, ToolCallResponse)> = Vec::new();
22✔
561
                for item in items {
28✔
562
                    match item.work {
28✔
563
                        PendingEntry::Approved(exec) => approved.push((item.index, exec)),
16✔
564
                        PendingEntry::Resolved(resp) => pre_resolved.push((item.index, resp)),
12✔
565
                    }
566
                }
567

568
                // Orphans: a `ToolCallRequest` in the stream's current turn
569
                // without a matching pending entry. Should never happen in
570
                // correct operation — every flushed request goes through
571
                // the prep flow which writes to `pending_tools`. Synthesize
572
                // an error response so the conversation stays valid (every
573
                // request must have a response before the next provider
574
                // call) and surface the inconsistency.
575
                for req in orphaned {
22✔
NEW
576
                    warn!(
×
577
                        id = %req.id,
578
                        name = %req.name,
579
                        "ToolCallRequest in stream without a pending entry; synthesizing error \
580
                         response.",
581
                    );
NEW
582
                    let next_idx = approved.len() + pre_resolved.len();
×
NEW
583
                    pre_resolved.push((next_idx, ToolCallResponse {
×
NEW
584
                        id: req.id,
×
NEW
585
                        result: Err(
×
NEW
586
                            "Tool call had no prepared executor (internal inconsistency).".into(),
×
NEW
587
                        ),
×
NEW
588
                    }));
×
589
                }
590

591
                tool_coordinator.reset_for_execution();
22✔
592

593
                let execution_result = tool_coordinator
22✔
594
                    .execute_with_prompting(
22✔
595
                        approved,
22✔
596
                        Arc::clone(&prompter),
22✔
597
                        signals.resubscribe(),
22✔
598
                        &mut turn_coordinator,
22✔
599
                        &mut turn_state,
22✔
600
                        &printer,
22✔
601
                        prompt_backend.as_ref(),
22✔
602
                        Arc::clone(&inquiry_backend),
22✔
603
                        &conv,
22✔
604
                        mcp_client,
22✔
605
                        root,
22✔
606
                        &tool_renderer,
22✔
607
                        is_tty,
22✔
608
                    )
22✔
609
                    .await;
22✔
610

611
                if execution_result.restart_requested {
22✔
NEW
612
                    restart_requested = true;
×
613
                    continue;
×
614
                }
22✔
615

616
                if commit_tool_responses(
22✔
617
                    execution_result,
22✔
618
                    pre_resolved,
22✔
619
                    &mut tool_coordinator,
22✔
620
                    &mut turn_coordinator,
22✔
621
                    &mut conv,
22✔
622
                )? {
22✔
623
                    tool_choice = ToolChoice::Auto;
22✔
624
                }
22✔
625
            }
626
        }
627
    }
628

629
    Ok(())
×
630
}
32✔
631

632
async fn build_inquiry_backend(
32✔
633
    cfg: &AppConfig,
32✔
634
    tools: Vec<ToolDefinition>,
32✔
635
    model: ModelDetails,
32✔
636
    provider: Arc<dyn Provider>,
32✔
637
    attachments: Vec<Attachment>,
32✔
638
) -> Result<Arc<LlmInquiryBackend>, Error> {
32✔
639
    let sections = build_sections(&cfg.assistant, !tools.is_empty());
32✔
640
    let inquiry_override = &cfg.conversation.inquiry.assistant;
32✔
641

642
    // Use the inquiry system prompt if configured, otherwise fall back to the
643
    // parent assistant's system prompt.
644
    let default_system_prompt = inquiry_override
32✔
645
        .system_prompt
32✔
646
        .clone()
32✔
647
        .or_else(|| cfg.assistant.system_prompt.clone());
32✔
648

649
    // Track providers we've already constructed to avoid duplicates.
650
    let mut providers: IndexMap<ProviderId, Arc<dyn Provider>> = IndexMap::new();
32✔
651

652
    // Build the default InquiryConfig from the global inquiry override
653
    // merged with the parent assistant config.
654
    let default_config = if let Some(inquiry_model_cfg) = inquiry_override.model.as_ref() {
32✔
655
        let inquiry_model_id = inquiry_model_cfg.id.resolved();
×
656
        let inquiry_provider: Arc<dyn Provider> =
×
657
            Arc::from(get_provider(inquiry_model_id.provider, &cfg.providers.llm)?);
×
658
        debug!(model = %inquiry_model_id, "Fetching inquiry model details.");
×
659
        let inquiry_model = inquiry_provider
×
660
            .model_details(&inquiry_model_id.name)
×
661
            .await?;
×
662

663
        if inquiry_model.structured_output == Some(false) {
×
664
            warn!(
×
665
                model = inquiry_model_id.to_string(),
×
666
                "Inquiry model does not support structured output. Inquiry responses may be \
667
                 unreliable.",
668
            );
669
        }
×
670

671
        info!(
×
672
            model = inquiry_model.name(),
×
673
            "Using dedicated model for inquiries."
674
        );
675

676
        providers.insert(inquiry_model_id.provider, Arc::clone(&inquiry_provider));
×
677

678
        InquiryConfig {
×
679
            provider: inquiry_provider,
×
680
            model: inquiry_model,
×
681
            system_prompt: default_system_prompt,
×
682
            sections: sections.clone(),
×
683
        }
×
684
    } else {
685
        providers.insert(model.id.provider, Arc::clone(&provider));
32✔
686

687
        InquiryConfig {
32✔
688
            provider: Arc::clone(&provider),
32✔
689
            model: model.clone(),
32✔
690
            system_prompt: default_system_prompt,
32✔
691
            sections: sections.clone(),
32✔
692
        }
32✔
693
    };
694

695
    let overrides = build_inquiry_overrides(cfg, &default_config, &mut providers).await?;
32✔
696

697
    Ok(Arc::new(LlmInquiryBackend::new(
32✔
698
        default_config,
32✔
699
        overrides,
32✔
700
        attachments,
32✔
701
        tools,
32✔
702
    )))
32✔
703
}
32✔
704

705
/// Walk active tool configs to build per-question [`InquiryConfig`] overrides
706
/// from `QuestionTarget::Assistant(config)` entries that have non-empty config.
707
async fn build_inquiry_overrides(
32✔
708
    cfg: &AppConfig,
32✔
709
    default_config: &InquiryConfig,
32✔
710
    providers: &mut IndexMap<ProviderId, Arc<dyn Provider>>,
32✔
711
) -> Result<IndexMap<(String, String), InquiryConfig>, Error> {
32✔
712
    let mut overrides = IndexMap::new();
32✔
713

714
    for (tool_name, tool_cfg) in cfg.conversation.tools.iter() {
32✔
715
        for (question_id, question_cfg) in tool_cfg.questions() {
20✔
716
            let QuestionTarget::Assistant(ref per_q) = question_cfg.target else {
7✔
717
                continue;
×
718
            };
719
            if PartialConfig::is_empty(per_q.as_ref()) {
7✔
720
                continue;
7✔
721
            }
×
722

723
            // Resolve per-question model (if overridden), falling back to
724
            // the default inquiry config's model.
725
            let has_model_override = !PartialConfig::is_empty(&per_q.model.id);
×
726
            let (inq_provider, inq_model) = if has_model_override {
×
727
                let model_id = per_q
×
728
                    .model
×
729
                    .id
×
730
                    .resolve(&cfg.providers.llm.aliases)
×
731
                    .map_err(|e| Error::CliConfig(e.to_string()))?;
×
732

733
                let prov = if let Some(p) = providers.get(&model_id.provider) {
×
734
                    Arc::clone(p)
×
735
                } else {
736
                    let p: Arc<dyn Provider> =
×
737
                        Arc::from(get_provider(model_id.provider, &cfg.providers.llm)?);
×
738
                    providers.insert(model_id.provider, Arc::clone(&p));
×
739
                    p
×
740
                };
741

742
                let details = prov.model_details(&model_id.name).await?;
×
743

744
                if details.structured_output == Some(false) {
×
745
                    warn!(
×
746
                        tool = %tool_name,
747
                        question = %question_id,
748
                        model = %model_id,
749
                        "Per-question inquiry model does not support structured \
750
                         output. Inquiry responses may be unreliable.",
751
                    );
752
                }
×
753

754
                (prov, details)
×
755
            } else {
756
                (
×
757
                    Arc::clone(&default_config.provider),
×
758
                    default_config.model.clone(),
×
759
                )
×
760
            };
761

762
            // System prompt: per-question -> global inquiry -> main.
763
            let system_prompt = per_q
×
764
                .system_prompt
×
765
                .as_ref()
×
766
                .map(|s| s.to_string())
×
767
                .or_else(|| default_config.system_prompt.clone());
×
768

769
            overrides.insert((tool_name.to_owned(), question_id.clone()), InquiryConfig {
×
770
                provider: inq_provider,
×
771
                model: inq_model,
×
772
                system_prompt,
×
773
                sections: default_config.sections.clone(),
×
774
            });
×
775
        }
776
    }
777

778
    Ok(overrides)
32✔
779
}
32✔
780

781
/// Assemble tool responses from the executor's results plus any pre-resolved
782
/// responses (skipped tools, unavailable tools, orphan synthesizations),
783
/// commit them to the conversation stream, and flush to disk.
784
///
785
/// Returns `true` if a follow-up LLM cycle is needed (i.e. tool responses were
786
/// added and the coordinator wants to continue).
787
fn commit_tool_responses(
22✔
788
    result: ExecutionResult,
22✔
789
    pre_resolved: Vec<(usize, ToolCallResponse)>,
22✔
790
    tool: &mut ToolCoordinator,
22✔
791
    turn: &mut super::turn::TurnCoordinator,
22✔
792
    conv: &mut ConversationMut,
22✔
793
) -> Result<bool, Error> {
22✔
794
    // Persist any rendered custom-argument output accumulated during the
795
    // permission phase into the corresponding ToolCallRequest events.
796
    flush_rendered_arguments(tool, conv);
22✔
797

798
    // Both `result.responses` and `pre_resolved` are already keyed by the
799
    // plan index assigned in `build_execution_plan`. Sorting by that
800
    // index restores stream order for the persisted responses.
801
    let mut indexed: Vec<(usize, ToolCallResponse)> = result.responses;
22✔
802
    indexed.extend(pre_resolved);
22✔
803
    indexed.sort_by_key(|(idx, _)| *idx);
22✔
804
    let responses: Vec<_> = indexed.into_iter().map(|(_, r)| r).collect();
22✔
805

806
    let action = conv.update_events(|stream| turn.handle_tool_responses(stream, responses));
22✔
807
    conv.flush()?;
22✔
808

809
    Ok(matches!(action, Action::SendFollowUp))
22✔
810
}
22✔
811

812
/// Write a single rendered-arguments entry into event metadata.
813
fn store_rendered_arguments(stream: &mut ConversationStream, tool_call_id: &str, content: &str) {
×
814
    for event in stream.iter_mut() {
×
815
        let is_match = event
×
816
            .event
×
817
            .as_tool_call_request()
×
818
            .is_some_and(|r| r.id == tool_call_id);
×
819
        if is_match {
×
820
            set_rendered_arguments(event.event, content);
×
821
            return;
×
822
        }
×
823
    }
824
}
×
825

826
/// Drain accumulated rendered arguments from the coordinator and write them
827
/// into the corresponding `ToolCallRequest` events in the stream.
828
fn flush_rendered_arguments(coordinator: &mut ToolCoordinator, conv: &mut ConversationMut) {
22✔
829
    let rendered = coordinator.drain_rendered_arguments();
22✔
830
    if rendered.is_empty() {
22✔
831
        return;
22✔
832
    }
×
833
    conv.update_events(|stream| {
×
834
        for (tool_call_id, content) in &rendered {
×
835
            store_rendered_arguments(stream, tool_call_id, content);
×
836
        }
×
837
    });
×
838
}
22✔
839

840
fn map_llm_error(error: jp_llm::Error, models: Vec<ModelDetails>) -> Error {
×
841
    match error {
×
842
        jp_llm::Error::UnknownModel(model) => Error::UnknownModel {
×
843
            model,
×
844
            available: models.into_iter().map(|v| v.id.name.to_string()).collect(),
×
845
        },
846
        _ => error.into(),
×
847
    }
848
}
×
849

850
#[cfg(test)]
851
#[path = "turn_loop_tests.rs"]
852
mod tests;
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc