• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

dcdpr / jp / 15615106765

12 Jun 2025 03:43PM UTC coverage: 38.703% (-0.03%) from 38.737%
15615106765

Pull #160

github

web-flow
Merge 7d2c3f38e into a74d2b6f3
Pull Request #160: refactor: simplify `Query` command implementation

0 of 211 new or added lines in 3 files covered. (0.0%)

9 existing lines in 1 file now uncovered.

3618 of 9348 relevant lines covered (38.7%)

4.66 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

0.0
/crates/jp_cli/src/cmd/query.rs
1
use std::{
2
    collections::{BTreeMap, HashSet},
3
    convert::Infallible,
4
    env, fs,
5
    path::{Path, PathBuf},
6
    str::FromStr,
7
};
8

9
use clap::{builder::TypedValueParser as _, ArgAction};
10
use crossterm::style::{Color, Stylize as _};
11
use futures::StreamExt as _;
12
use jp_config::{expand_tilde, style::code::LinkStyle};
13
use jp_conversation::{
14
    message::{ToolCallRequest, ToolCallResult},
15
    persona::Instructions,
16
    thread::{Thread, ThreadBuilder},
17
    AssistantMessage, Context, ContextId, Conversation, ConversationId, MessagePair, Model,
18
    ModelId, PersonaId, UserMessage,
19
};
20
use jp_llm::provider::{self, CompletionChunk, StreamEvent};
21
use jp_mcp::{config::McpServerId, tool::ToolChoice, ResourceContents};
22
use jp_query::query::{ChatQuery, StructuredQuery};
23
use jp_task::task::TitleGeneratorTask;
24
use jp_term::{code, osc::hyperlink, stdout};
25
use minijinja::{Environment, UndefinedBehavior};
26
use termimad::FmtText;
27
use tracing::{debug, info, trace};
28
use url::Url;
29

30
use super::{attachment::register_attachment, Output};
31
use crate::{
32
    cmd::Success,
33
    editor::{self, Editor},
34
    error::{Error, Result},
35
    parser, Ctx, KeyValue, PATH_STRING_PREFIX,
36
};
37

38
#[derive(Debug, clap::Args)]
39
pub struct Query {
40
    /// The query to send. If not provided, uses `$JP_EDITOR`, `$VISUAL` or
41
    /// `$EDITOR` to open edit the query in an editor.
42
    #[arg(value_parser = string_or_path)]
43
    pub query: Option<String>,
44

45
    /// Use the query string as a Jinja2 template.
46
    ///
47
    /// You can provide values for template variables using the
48
    /// `template.values` config key.
49
    #[arg(long)]
50
    pub template: bool,
51

52
    #[arg(long, value_parser = string_or_path.try_map(json_schema))]
53
    pub schema: Option<schemars::Schema>,
54

55
    /// Replay the last message in the conversation.
56
    ///
57
    /// If a query is provided, it will be appended to the end of the previous
58
    /// message. If no query is provided, $EDITOR will open with the last
59
    /// message in the conversation.
60
    #[arg(long = "replay", conflicts_with = "new_conversation")]
61
    pub replay: bool,
62

63
    /// Start a new conversation without any message history.
64
    ///
65
    /// If a context named `default` exists, it will be attached to the
66
    /// conversation.
67
    #[arg(short = 'n', long = "new")]
68
    pub new_conversation: bool,
69

70
    /// Store the conversation locally, outside of the workspace.
71
    #[arg(short = 'l', long = "local", requires = "new_conversation")]
72
    pub local: bool,
73

74
    /// Add attachment to the context.
75
    #[arg(short = 'a', long = "attachment", value_parser = |s: &str| parser::attachment_url(s))]
×
76
    pub attachments: Vec<Url>,
77

78
    /// Use specific persona.
79
    #[arg(short = 'p', long = "persona", value_parser = PersonaId::from_str)]
80
    pub persona: Option<PersonaId>,
81

82
    /// Use specific context.
83
    #[arg(short = 'x', long = "context", value_parser = |s: &str| ContextId::try_from(s))]
×
84
    pub context: Option<ContextId>,
85

86
    /// Use specific MCP servers exclusively.
87
    #[arg(short = 'm', long = "mcp", value_parser = |s: &str| Ok::<_, Infallible>(McpServerId::new(s)))]
×
88
    pub mcp: Vec<McpServerId>,
89

90
    /// Whether and how to edit the query.
91
    #[arg(short = 'e', long = "edit", conflicts_with = "no_edit")]
92
    pub edit: Option<Option<Editor>>,
93

94
    /// Do not edit the query.
95
    #[arg(short = 'E', long = "no-edit", conflicts_with = "edit")]
96
    pub no_edit: bool,
97

98
    /// The model to use.
99
    #[arg(short = 'o', long = "model", value_parser = ModelId::from_str)]
100
    pub model: Option<ModelId>,
101

102
    /// The model parameters to use.
103
    #[arg(short = 'r', long = "param", value_name = "KEY=VALUE", action = ArgAction::Append, value_parser = KeyValue::from_str)]
104
    pub parameters: Vec<KeyValue>,
105

106
    /// Do not display the reasoning content.
107
    ///
108
    /// This does not stop the assistant from generating reasoning tokens to
109
    /// help with its accuracy, but it does not display them in the output.
110
    #[arg(long = "hide-reasoning")]
111
    pub hide_reasoning: bool,
112

113
    /// Stream the assistant's response as it is generated.
114
    ///
115
    /// This is the default behaviour for TTY sessions, but can be forced for
116
    /// non-TTY sessions by setting this flag.
117
    #[arg(short = 's', long = "stream", conflicts_with = "no_stream")]
118
    pub stream: bool,
119

120
    /// Disable streaming the assistant's response.
121
    ///
122
    /// This is the default behaviour for non-TTY sessions, or for structured
123
    /// responses, but can be forced by setting this flag.
124
    #[arg(short = 'S', long = "no-stream", conflicts_with = "stream")]
125
    pub no_stream: bool,
126

127
    /// The tool to use.
128
    ///
129
    /// If a value is provided, the tool matching the value will be used.
130
    ///
131
    /// Note that this setting is *not* persisted across queries. To persist
132
    /// tool choice behavior, use a named context with the `tool_choice` field,
133
    /// or set `llm.tool_choice` in the config file.
134
    #[arg(short = 't', long = "tool")]
135
    pub tool_choice: Option<Option<String>>,
136

137
    /// Disable tool use by the assistant.
138
    #[arg(short = 'T', long = "no-tool")]
139
    pub no_tool_choice: bool,
140
}
141

142
/// How to render the response to the user.
143
#[derive(Debug, Default, Clone, Copy, PartialEq, Eq)]
144
pub(crate) enum RenderMode {
145
    /// Use the default render mode, depending on whether the output is a TTY,
146
    /// and if a structured response is requested.
147
    #[default]
148
    Auto,
149

150
    /// Render the response as a stream of tokens.
151
    Streamed,
152

153
    /// Render the response as a buffered string.
154
    Buffered,
155
}
156

157
impl Query {
UNCOV
158
    pub async fn run(self, ctx: &mut Ctx) -> Output {
×
159
        debug!("Running `query` command.");
×
160
        trace!(args = ?self, "Received arguments.");
×
161

NEW
162
        let previous_id = self.update_active_conversation(ctx)?;
×
163

NEW
164
        self.update_config(&mut ctx.config)?;
×
UNCOV
165
        self.update_context(ctx).await?;
×
UNCOV
166
        ctx.configure_active_mcp_servers().await?;
×
UNCOV
167
        let model = self.get_model(ctx)?;
×
168

NEW
169
        let (message, query_file) = self.build_message(ctx, &model).await?;
×
170

171
        if let UserMessage::Query(query) = &message {
×
UNCOV
172
            if query.is_empty() {
×
NEW
173
                return cleanup(ctx, previous_id, query_file.as_deref()).map_err(Into::into);
×
UNCOV
174
            }
×
175

176
            // Generate title for new conversations.
177
            if ctx.term.args.persist
×
178
                && self.new_conversation
×
179
                && ctx.config.conversation.title.generate.auto
×
180
            {
181
                debug!("Generating title for new conversation");
×
182
                ctx.task_handler.spawn(TitleGeneratorTask::new(
×
NEW
183
                    ctx.workspace.active_conversation_id(),
×
184
                    &ctx.config,
×
185
                    &ctx.workspace,
×
186
                    Some(query.clone()),
×
187
                ));
188
            }
×
189
        }
×
190

191
        let conversation = ctx.workspace.get_active_conversation();
×
NEW
192
        let thread = self.build_thread(ctx, message.clone()).await?;
×
193

194
        let context = conversation.context.clone();
×
195
        let mut messages = vec![];
×
196
        if let Some(schema) = self.schema.clone() {
×
197
            messages.push(handle_structured_output(ctx, context, thread, &model, schema).await?);
×
198
        } else {
NEW
199
            self.handle_stream(
×
200
                ctx,
×
201
                context,
×
202
                thread,
×
203
                &model,
×
NEW
204
                self.tool_choice(ctx),
×
205
                &mut messages,
×
206
            )
×
207
            .await?;
×
208
        }
209

210
        let mut reply = String::new();
×
211
        for message in messages {
×
NEW
212
            let conversation_id = ctx.workspace.active_conversation_id();
×
UNCOV
213
            trace!(
×
214
                conversation = %conversation_id,
215
                content_size = message.reply.content.as_deref().unwrap_or_default().len(),
×
216
                reasoning_size = message
×
217
                    .reply
×
218
                    .reasoning
×
219
                    .as_ref()
×
220
                    .map(ToString::to_string)
×
221
                    .unwrap_or_default()
×
222
                    .len(),
×
223
                "Storing response message in conversation."
×
224
            );
225

226
            if let Some(content) = &message.reply.content {
×
227
                reply.push_str(content);
×
228
            }
×
229
            ctx.workspace.add_message(conversation_id, message.clone());
×
230
        }
231

232
        // Clean up the query file.
NEW
233
        if let Some(path) = query_file {
×
234
            fs::remove_file(path)?;
×
235
        }
×
236

237
        if self.schema.is_some() && !reply.is_empty() {
×
NEW
238
            if let RenderMode::Streamed = self.render_mode() {
×
239
                stdout::typewriter(&reply, ctx.config.style.typewriter.code_delay)?;
×
240
            } else {
241
                return Ok(Success::Json(serde_json::from_str(&reply)?));
×
242
            }
243
        }
×
244

245
        Ok(Success::Ok)
×
246
    }
×
247

248
    async fn build_message(
×
249
        &self,
×
250
        ctx: &mut Ctx,
×
251
        model: &Model,
×
252
    ) -> Result<(UserMessage, Option<PathBuf>)> {
×
NEW
253
        let conversation_id = ctx.workspace.active_conversation_id();
×
254

255
        // If replaying, remove the last message from the conversation, and use
256
        // its query message to build the new query.
257
        let mut message = self
×
258
            .replay
×
259
            .then(|| ctx.workspace.pop_message(&conversation_id))
×
260
            .flatten()
×
261
            .map_or(UserMessage::Query(String::new()), |m| m.message);
×
262

263
        // If replaying a tool call, re-run the requested tool(s) and return the
264
        // new results.
265
        if let UserMessage::ToolCallResults(_) = &mut message {
×
266
            let Some(response) = ctx.workspace.get_messages(&conversation_id).last() else {
×
267
                return Err(Error::Replay("No assistant response found".into()));
×
268
            };
269

270
            let results = handle_tool_calls(ctx, response.reply.tool_calls.clone()).await?;
×
271
            message = UserMessage::ToolCallResults(results);
×
272
        }
×
273

274
        // If a query is provided, prepend it to the existing message. This is
275
        // only relevant for replays, otherwise the existing message is empty,
276
        // and we replace it with the provided query.
277
        if let Some(text) = &self.query {
×
278
            match &mut message {
×
279
                UserMessage::Query(query) if query.is_empty() => text.clone_into(query),
×
280
                UserMessage::Query(query) => *query = format!("{text}\n\n{query}"),
×
281
                UserMessage::ToolCallResults(_) => {}
×
282
            }
283
        }
×
284

285
        let query_file_path = self.edit_message(&mut message, ctx, conversation_id, model)?;
×
286

287
        if let UserMessage::Query(query) = &mut message
×
288
            && self.template
×
289
        {
290
            let mut env = Environment::empty();
×
291
            env.set_undefined_behavior(UndefinedBehavior::SemiStrict);
×
292
            env.add_template("query", query)?;
×
293

294
            let tmpl = env.get_template("query")?;
×
295
            // TODO: supported nested variables
296
            for var in tmpl.undeclared_variables(false) {
×
297
                if ctx.config.template.values.contains_key(&var) {
×
298
                    continue;
×
299
                }
×
300

301
                return Err(Error::TemplateUndefinedVariable(var));
×
302
            }
303

304
            *query = tmpl.render(&ctx.config.template.values)?;
×
305
        }
×
306

307
        Ok((message, query_file_path))
×
308
    }
×
309

310
    /// Update the conversation context based on the contextual information
311
    /// passed in through the CLI, configuration, and environment variables.
312
    async fn update_context(&self, ctx: &mut Ctx) -> Result<()> {
×
313
        // Update context if specified
314
        if let Some(id) = ctx.config.conversation.context.clone() {
×
315
            debug!(
×
316
                %id,
317
                "Using named context in conversation due to conversation.context config."
×
318
            );
319

320
            // Get context.
321
            let context = ctx
×
322
                .workspace
×
323
                .get_named_context(&id)
×
324
                .ok_or(Error::NotFound("Context", id.to_string()))?
×
325
                .clone();
×
326

327
            // Update conversation context.
328
            ctx.workspace.get_active_conversation_mut().context = context;
×
329
        }
×
330

331
        // Update persona if specified
332
        if let Some(id) = ctx.config.conversation.persona.clone() {
×
333
            debug!(
×
334
                %id,
335
                "Changing persona in conversation context due to conversation.persona config."
×
336
            );
337

338
            // Ensure persona exists.
339
            ctx.workspace
×
340
                .get_persona(&id)
×
341
                .ok_or(Error::NotFound("Persona", id.to_string()))?;
×
342

343
            // Update context with new persona.
344
            ctx.workspace
×
345
                .get_active_conversation_mut()
×
346
                .context
×
347
                .persona_id = id;
×
348
        }
×
349

350
        // Add any new attachments specified in arguments
351
        for attachment in &self.attachments {
×
352
            let context = &mut ctx.workspace.get_active_conversation_mut().context;
×
353
            register_attachment(attachment, context).await?;
×
354
        }
355

356
        // Set exclusive MCP servers
357
        let mut servers = HashSet::new();
×
358
        for id in &self.mcp {
×
359
            // Ensure MCP server exists.
360
            ctx.workspace
×
361
                .get_mcp_server(id)
×
362
                .ok_or(Error::NotFound("MCP server", id.to_string()))?;
×
363

364
            servers.insert(id.clone());
×
365
        }
366

367
        if !servers.is_empty() {
×
368
            debug!(
×
369
                servers = servers
×
370
                    .iter()
×
371
                    .map(ToString::to_string)
×
372
                    .collect::<Vec<_>>()
×
373
                    .join(", "),
×
374
                "Overriding MCP server in conversation context due to --mcp flag."
×
375
            );
376

377
            ctx.workspace
×
378
                .get_active_conversation_mut()
×
379
                .context
×
380
                .mcp_server_ids = servers;
×
381
        }
×
382

383
        Ok(())
×
384
    }
×
385

386
    /// Update the config based on overrides from the CLI.
387
    ///
388
    /// The `--cfg` global flag is handled separately, this is specifically for
389
    /// "convenience" flags such as `--persona` or `--context`, which are
390
    /// equivalent to `--cfg conversation.persona` and `--cfg
391
    /// conversation.context`.
392
    fn update_config(&self, config: &mut jp_config::Config) -> Result<()> {
×
393
        // Hide reasoning.
394
        if self.hide_reasoning {
×
395
            config.style.reasoning.show = false;
×
396
        }
×
397

398
        // Update the conversation context.
399
        if let Some(context) = self.context.as_ref() {
×
400
            config.conversation.context = Some(context.clone());
×
401
        }
×
402

403
        // Update the persona.
404
        if let Some(persona) = self.persona.as_ref() {
×
405
            config.conversation.persona = Some(persona.clone());
×
406
        }
×
407

408
        // Update the model parameters.
409
        for KeyValue(key, value) in &self.parameters {
×
410
            config
×
411
                .llm
×
412
                .model
×
413
                .parameters
×
414
                .get_or_insert_default()
×
415
                .set(key, value.to_owned())?;
×
416
        }
417

418
        Ok(())
×
419
    }
×
420

NEW
421
    fn update_active_conversation(&self, ctx: &mut Ctx) -> Result<ConversationId> {
×
NEW
422
        let last_active_conversation_id = ctx.workspace.active_conversation_id();
×
423

424
        if !self.new_conversation {
×
NEW
425
            return Ok(last_active_conversation_id);
×
426
        }
×
427

428
        let id = ctx.workspace.create_conversation(Conversation {
×
429
            local: self.local,
×
430
            ..Default::default()
×
431
        });
×
432

433
        debug!(
×
434
            %id,
435
            local = %self.local,
436
            "Creating new active conversation due to --new flag."
×
437
        );
438

439
        ctx.workspace.set_active_conversation_id(id)?;
×
NEW
440
        Ok(last_active_conversation_id)
×
441
    }
×
442

443
    // Open the editor for the query, if requested.
444
    fn edit_message(
×
445
        &self,
×
446
        message: &mut UserMessage,
×
447
        ctx: &mut Ctx,
×
448
        conversation_id: ConversationId,
×
449
        model: &Model,
×
450
    ) -> Result<Option<PathBuf>> {
×
451
        let UserMessage::Query(query) = message else {
×
452
            return Ok(None);
×
453
        };
454

455
        let mut editor = Editor::from_cli_or_config(self.edit.clone(), ctx.config.editor.clone());
×
456

457
        // Explicitly disable editing if the `--no-edit` flag is set.
458
        if self.no_edit || self.query.as_ref().is_some_and(|_| self.edit.is_none()) {
×
459
            editor = Some(Editor::Disabled);
×
460
        }
×
461

462
        let editor = match editor {
×
463
            None => return Ok(None),
×
464
            Some(Editor::Default) => unreachable!("handled in `from_cli_or_config`"),
×
465
            // If editing is disabled, we set the query as a single whitespace,
466
            // which allows the query to pass through to the assistant.
467
            Some(Editor::Disabled) => {
468
                if query.is_empty() {
×
469
                    " ".clone_into(query);
×
470
                }
×
471
                return Ok(None);
×
472
            }
473
            Some(cmd @ Editor::Command(_)) => match cmd.command() {
×
474
                Some(cmd) => cmd,
×
475
                None => return Ok(None),
×
476
            },
477
        };
478

479
        let initial_message = if query.is_empty() {
×
480
            None
×
481
        } else {
482
            Some(query.to_owned())
×
483
        };
484

485
        // If replaying, pass the last query as the text to be edited,
486
        // otherwise open an empty editor.
487
        let query_file_path;
488
        (*query, query_file_path) =
×
489
            editor::edit_query(ctx, conversation_id, model, initial_message, editor)
×
490
                .map(|(q, p)| (q, Some(p)))?;
×
491

492
        Ok(query_file_path)
×
493
    }
×
494

495
    /// Get the model to use for the current query.
496
    ///
497
    /// 1. If the `model` CLI flag is set, use that.
498
    /// 2. If the current persona has a model, use that.
499
    /// 3. If a model is configured in a configuration file or environment
500
    ///    variable, use that.
501
    /// 4. Otherwise return an error.
502
    fn get_model(&self, ctx: &Ctx) -> Result<Model> {
×
503
        let persona_id = &ctx.workspace.get_active_conversation().context.persona_id;
×
504
        let persona = ctx
×
505
            .workspace
×
506
            .get_persona(persona_id)
×
507
            .ok_or(Error::NotFound("Persona", persona_id.to_string()))?;
×
508

509
        let Some(id) = self
×
510
            .model
×
511
            .clone()
×
512
            .or_else(|| persona.model.clone())
×
513
            .or_else(|| ctx.config.llm.model.id.clone())
×
514
        else {
515
            return Err(Error::UndefinedModel);
×
516
        };
517

518
        let mut parameters = ctx.config.llm.model.parameters.clone().unwrap_or_default();
×
519
        if persona.inherit_parameters {
×
520
            parameters.merge(persona.parameters.clone());
×
521
        } else {
×
522
            parameters = persona.parameters.clone();
×
523
        }
×
524

525
        let model = Model { id, parameters };
×
526

527
        trace!(provider = %model.id.provider(), slug = %model.id.slug(), "Loaded LLM model.");
×
528

529
        Ok(model)
×
530
    }
×
531

532
    fn tool_choice(&self, ctx: &Ctx) -> ToolChoice {
×
533
        self.no_tool_choice
×
534
            .then_some(ToolChoice::None)
×
535
            .or_else(|| {
×
536
                self.tool_choice.as_ref().map(|v| match v.as_deref() {
×
537
                    None | Some("true") => ToolChoice::Required,
×
538
                    Some(v) => match v {
×
539
                        "false" => ToolChoice::None,
×
540
                        _ => ToolChoice::Function(v.to_owned()),
×
541
                    },
542
                })
×
543
            })
×
544
            .or_else(|| {
×
545
                ctx.workspace
×
546
                    .get_active_conversation()
×
547
                    .context
×
548
                    .tool_choice
×
549
                    .clone()
×
550
            })
×
551
            .or_else(|| ctx.config.llm.tool_choice.clone())
×
552
            .unwrap_or(ToolChoice::Auto)
×
553
    }
×
554

NEW
555
    async fn build_thread(&self, ctx: &Ctx, message: UserMessage) -> Result<Thread> {
×
NEW
556
        let conversation_id = ctx.workspace.active_conversation_id();
×
NEW
557
        let conversation = ctx.workspace.get_active_conversation();
×
NEW
558
        let persona_id = &conversation.context.persona_id;
×
NEW
559
        let persona = ctx
×
NEW
560
            .workspace
×
NEW
561
            .get_persona(persona_id)
×
NEW
562
            .ok_or(Error::persona_not_found(persona_id))?;
×
563

NEW
564
        let tools = ctx.mcp_client.list_tools().await?;
×
NEW
565
        let mut attachments = vec![];
×
NEW
566
        for handler in conversation.context.attachment_handlers.values() {
×
NEW
567
            attachments.extend(
×
NEW
568
                handler
×
NEW
569
                    .get(&ctx.workspace.root, ctx.mcp_client.clone())
×
NEW
570
                    .await
×
NEW
571
                    .map_err(|e| Error::Attachment(e.to_string()))?,
×
572
            );
573
        }
574

NEW
575
        let mut thread_builder = ThreadBuilder::default()
×
NEW
576
            .with_system_prompt(persona.system_prompt.clone())
×
NEW
577
            .with_instructions(persona.instructions.clone())
×
NEW
578
            .with_attachments(attachments)
×
NEW
579
            .with_history(ctx.workspace.get_messages(&conversation_id).to_vec())
×
NEW
580
            .with_message(message);
×
581

NEW
582
        if !tools.is_empty() {
×
NEW
583
            let instruction = Instructions::default()
×
NEW
584
                .with_title("Tool Usage")
×
NEW
585
                .with_description("How to leverage the tools available to you.".to_string())
×
NEW
586
                .with_item("Use all the tools available to you to give the best possible answer.")
×
NEW
587
                .with_item("Verify the tool name, description and parameters are correct.")
×
NEW
588
                .with_item(
×
NEW
589
                    "Even if you've reasoned yourself towards a solution, use any available tool \
×
NEW
590
                     to verify your answer.",
×
NEW
591
                );
×
NEW
592

×
NEW
593
            thread_builder = thread_builder.with_instruction(instruction);
×
NEW
594
        }
×
595

NEW
596
        Ok(thread_builder.build()?)
×
NEW
597
    }
×
598

599
    #[expect(clippy::too_many_lines)]
NEW
600
    async fn handle_stream(
×
NEW
601
        &self,
×
NEW
602
        ctx: &mut Ctx,
×
NEW
603
        context: Context,
×
NEW
604
        mut thread: Thread,
×
NEW
605
        model: &Model,
×
NEW
606
        tool_choice: ToolChoice,
×
NEW
607
        messages: &mut Vec<MessagePair>,
×
NEW
608
    ) -> Result<()> {
×
NEW
609
        let tools = ctx.mcp_client.list_tools().await?;
×
NEW
610
        let provider = provider::get_provider(model.id.provider(), &ctx.config.llm.provider)?;
×
NEW
611
        let message = thread.message.clone();
×
NEW
612
        let query = ChatQuery {
×
NEW
613
            thread: thread.clone(),
×
614

615
            // Limit the tools to the ones that are relevant to the tool choice.
NEW
616
            tools: match &tool_choice {
×
NEW
617
                ToolChoice::None => vec![],
×
NEW
618
                ToolChoice::Auto | ToolChoice::Required => tools.clone(),
×
NEW
619
                ToolChoice::Function(name) => tools
×
NEW
620
                    .clone()
×
NEW
621
                    .into_iter()
×
NEW
622
                    .filter(|v| &v.name == name)
×
NEW
623
                    .collect(),
×
624
            },
NEW
625
            tool_choice,
×
NEW
626
            ..Default::default()
×
627
        };
NEW
628
        let mut stream = provider.chat_completion_stream(model, query).await?;
×
629

NEW
630
        let mut content_tokens = String::new();
×
NEW
631
        let mut reasoning_tokens = String::new();
×
NEW
632
        let mut handler = ResponseHandler::new(self.render_mode());
×
NEW
633
        let mut metadata = BTreeMap::new();
×
NEW
634
        let mut tool_calls = Vec::new();
×
NEW
635
        let mut tool_call_results = Vec::new();
×
636

NEW
637
        while let Some(event) = stream.next().await {
×
NEW
638
            let data = match event? {
×
NEW
639
                StreamEvent::ChatChunk(chunk) => match chunk {
×
NEW
640
                    CompletionChunk::Reasoning(data) if !data.is_empty() => {
×
NEW
641
                        reasoning_tokens.push_str(&data);
×
642

NEW
643
                        if !ctx.config.style.reasoning.show {
×
NEW
644
                            continue;
×
NEW
645
                        }
×
646

NEW
647
                        data
×
648
                    }
NEW
649
                    CompletionChunk::Content(mut data) if !data.is_empty() => {
×
NEW
650
                        let reasoning_ended = !reasoning_tokens.is_empty()
×
NEW
651
                            && ctx.config.style.reasoning.show
×
NEW
652
                            && content_tokens.is_empty();
×
653

NEW
654
                        content_tokens.push_str(&data);
×
655

656
                        // If the response includes reasoning, we add two newlines
657
                        // after the reasoning, but before the content.
NEW
658
                        if reasoning_ended {
×
NEW
659
                            data = format!("\n\n{data}");
×
NEW
660
                        }
×
661

NEW
662
                        data
×
663
                    }
NEW
664
                    _ => continue,
×
665
                },
666
                // Tool calls are handled after the stream is finished.
667
                //
668
                // We do add a history of the call to the content tokens for the
669
                // LLMs understanding, but we do not print it to the terminal.
NEW
670
                StreamEvent::ToolCall(call) => {
×
NEW
671
                    tool_calls.push(call.clone());
×
672

NEW
673
                    let data = indoc::formatdoc!(
×
NEW
674
                        "
×
NEW
675
                    ---
×
NEW
676
                    executing tool: **{}**
×
NEW
677

×
NEW
678
                    arguments:
×
NEW
679
                    ```json
×
NEW
680
                    {:#}
×
NEW
681
                    ```
×
NEW
682

×
NEW
683
                ",
×
684
                        call.name,
685
                        call.arguments
686
                    );
687

NEW
688
                    handler.handle(&data, ctx)?;
×
NEW
689
                    let result = handle_tool_call(ctx, call.clone()).await?;
×
NEW
690
                    tool_call_results.push(result.clone());
×
691

NEW
692
                    let content = if result.content.starts_with("```") {
×
NEW
693
                        result.content
×
694
                    } else {
NEW
695
                        format!("```\n{}\n```", result.content)
×
696
                    };
697

NEW
698
                    indoc::formatdoc! {"
×
NEW
699
                    result:
×
NEW
700

×
NEW
701
                    {content}
×
NEW
702
                    ---
×
NEW
703
                    "
×
704
                    }
705
                }
NEW
706
                StreamEvent::Metadata(key, data) => {
×
NEW
707
                    metadata.insert(key, data);
×
NEW
708
                    continue;
×
709
                }
710
            };
711

NEW
712
            handler.handle(&data, ctx)?;
×
713
        }
714

715
        // Ensure we handle the last line of the stream.
NEW
716
        if !handler.buffer.is_empty() {
×
NEW
717
            handler.handle("\n", ctx)?;
×
NEW
718
        }
×
719

NEW
720
        let content_tokens = content_tokens.trim().to_string();
×
NEW
721
        let content = if !content_tokens.is_empty() {
×
NEW
722
            Some(content_tokens)
×
NEW
723
        } else if content_tokens.is_empty() && tool_calls.is_empty() {
×
NEW
724
            Some("<no reply>".to_string())
×
725
        } else {
NEW
726
            None
×
727
        };
728

NEW
729
        let reasoning_tokens = reasoning_tokens.trim().to_string();
×
NEW
730
        let reasoning = if reasoning_tokens.is_empty() {
×
NEW
731
            None
×
732
        } else {
NEW
733
            Some(reasoning_tokens)
×
734
        };
735

NEW
736
        if let RenderMode::Buffered = handler.render_mode {
×
NEW
737
            println!("{}", handler.parsed.join("\n"));
×
NEW
738
        } else if content.is_some() || reasoning.is_some() {
×
NEW
739
            // Final newline.
×
NEW
740
            println!();
×
NEW
741
        }
×
742

NEW
743
        let message = MessagePair::new(message, AssistantMessage {
×
NEW
744
            metadata,
×
NEW
745
            content,
×
NEW
746
            reasoning,
×
NEW
747
            tool_calls: tool_calls.clone(),
×
NEW
748
        })
×
NEW
749
        .with_context(context.clone());
×
NEW
750
        messages.push(message.clone());
×
751

752
        // If the assistant asked for a tool call, we handle it automatically,
753
        // essentially going into a "loop" until no more tool calls are
754
        // requested.
755
        //
756
        // TODO:
757
        //
758
        // This should be handled differently, asking for permission to run a
759
        // tool (unless whitelisted per conversation/globally), it should log
760
        // the fact that a tool call is triggered, and it should guard against
761
        // infinite loops.
NEW
762
        if !tool_call_results.is_empty() {
×
NEW
763
            thread.history.push(message);
×
NEW
764
            thread.message = UserMessage::ToolCallResults(tool_call_results);
×
765

NEW
766
            Box::pin(self.handle_stream(
×
NEW
767
                ctx,
×
NEW
768
                context,
×
NEW
769
                thread,
×
NEW
770
                model,
×
NEW
771
                // After the first tool call, we revert back to letting the LLM
×
NEW
772
                // decide if/which tool to use.
×
NEW
773
                ToolChoice::Auto,
×
NEW
774
                messages,
×
NEW
775
            ))
×
NEW
776
            .await?;
×
NEW
777
        }
×
778

NEW
779
        Ok(())
×
NEW
780
    }
×
781

NEW
782
    fn render_mode(&self) -> RenderMode {
×
NEW
783
        if self.no_stream {
×
NEW
784
            return RenderMode::Buffered;
×
NEW
785
        } else if self.stream {
×
NEW
786
            return RenderMode::Streamed;
×
NEW
787
        }
×
788

NEW
789
        RenderMode::Auto
×
NEW
790
    }
×
791
}
792

793
/// Clean up empty queries.
NEW
794
fn cleanup(
×
NEW
795
    ctx: &mut Ctx,
×
NEW
796
    last_active_conversation_id: ConversationId,
×
NEW
797
    query_file_path: Option<&Path>,
×
NEW
798
) -> Result<Success> {
×
NEW
799
    let conversation_id = ctx.workspace.active_conversation_id();
×
800

NEW
801
    info!("Query is empty, exiting.");
×
NEW
802
    if last_active_conversation_id != conversation_id {
×
NEW
803
        ctx.workspace
×
NEW
804
            .set_active_conversation_id(last_active_conversation_id)?;
×
NEW
805
        ctx.workspace.remove_conversation(&conversation_id)?;
×
NEW
806
    }
×
807

NEW
808
    if let Some(path) = query_file_path {
×
NEW
809
        fs::remove_file(path)?;
×
UNCOV
810
    }
×
811

NEW
812
    Ok("Query is empty, ignoring.".into())
×
813
}
×
814

815
async fn handle_structured_output(
×
816
    ctx: &mut Ctx,
×
817
    context: Context,
×
818
    thread: Thread,
×
819
    model: &Model,
×
820
    schema: schemars::Schema,
×
821
) -> Result<MessagePair> {
×
822
    let provider = provider::get_provider(model.id.provider(), &ctx.config.llm.provider)?;
×
823
    let message = thread.message.clone();
×
824
    let query =
×
825
        StructuredQuery::new(schema, thread).map_err(|err| Error::Schema(err.to_string()))?;
×
826

827
    let value = provider.structured_completion(model, query).await?;
×
828
    let content = if ctx.term.is_tty {
×
829
        serde_json::to_string_pretty(&value)?
×
830
    } else {
831
        serde_json::to_string(&value)?
×
832
    };
833

834
    Ok(MessagePair::new(message, AssistantMessage::from(content)).with_context(context))
×
835
}
×
836

837
#[expect(clippy::needless_pass_by_value)]
838
fn json_schema(s: String) -> Result<schemars::Schema> {
×
839
    serde_json::from_str::<serde_json::Value>(&s)?
×
840
        .try_into()
×
841
        .map_err(Into::into)
×
842
}
×
843

844
fn string_or_path(s: &str) -> Result<String> {
×
845
    if let Some(s) = s
×
846
        .strip_prefix(PATH_STRING_PREFIX)
×
847
        .and_then(|s| expand_tilde(s, env::var("HOME").ok()))
×
848
    {
849
        return fs::read_to_string(s).map_err(Into::into);
×
850
    }
×
851

852
    Ok(s.to_owned())
×
853
}
×
854

UNCOV
855
async fn handle_tool_calls(
×
856
    ctx: &Ctx,
×
857
    tool_calls: Vec<ToolCallRequest>,
×
858
) -> Result<Vec<ToolCallResult>> {
×
859
    let mut results = vec![];
×
860
    for call in tool_calls {
×
861
        results.push(handle_tool_call(ctx, call).await?);
×
862
    }
863

864
    Ok(results)
×
865
}
×
866

867
async fn handle_tool_call(ctx: &Ctx, call: ToolCallRequest) -> Result<ToolCallResult> {
×
868
    info!(tool = %call.name, arguments = %call.arguments, "Calling tool.");
×
869

870
    let result = ctx.mcp_client.call_tool(&call.name, call.arguments).await?;
×
871
    trace!(result = ?result, "Tool call completed.");
×
872

873
    Ok(ToolCallResult {
874
        id: call.id,
×
875
        error: result.is_error.unwrap_or(false),
×
876
        content: result
×
877
            .content
×
878
            .into_iter()
×
879
            .filter_map(|c| match c.raw {
×
880
                jp_mcp::RawContent::Text(text_content) => Some(text_content.text),
×
881
                jp_mcp::RawContent::Resource(embedded_resource) => {
×
882
                    match embedded_resource.resource {
×
883
                        ResourceContents::TextResourceContents { text, .. } => Some(text),
×
884
                        ResourceContents::BlobResourceContents { .. } => None,
×
885
                    }
886
                }
887
                _ => None,
×
888
            })
×
889
            .collect::<Vec<_>>()
×
890
            .join("\n\n"),
×
891
    })
892
}
×
893

894
struct Line {
895
    content: String,
896
    variant: LineVariant,
897
}
898

899
#[derive(Debug)]
900
enum LineVariant {
901
    Normal,
902
    Code,
903
    FencedCodeBlockStart { language: Option<String> },
904
    FencedCodeBlockEnd { indent: usize },
905
}
906

907
impl Line {
908
    fn new(content: String, in_fenced_code_block: bool) -> Self {
×
909
        let variant = if in_fenced_code_block && content.trim().ends_with("```") {
×
910
            let indent = content.chars().take_while(|c| c.is_whitespace()).count();
×
911

912
            LineVariant::FencedCodeBlockEnd { indent }
×
913
        } else if content.trim_start().starts_with("```") {
×
914
            let language = content
×
915
                .trim_start()
×
916
                .chars()
×
917
                .skip(3)
×
918
                .take_while(|c| c.is_alphanumeric())
×
919
                .collect::<String>();
×
920
            let language = if language.is_empty() {
×
921
                None
×
922
            } else {
923
                Some(language)
×
924
            };
925

926
            LineVariant::FencedCodeBlockStart { language }
×
927
        } else if in_fenced_code_block {
×
928
            LineVariant::Code
×
929
        } else {
930
            LineVariant::Normal
×
931
        };
932

933
        Line { content, variant }
×
934
    }
×
935
}
936

937
#[derive(Debug, Default)]
938
struct ResponseHandler {
939
    /// How to render the response.
940
    render_mode: RenderMode,
941

942
    /// The streamed, unprocessed lines received from the LLM.
943
    received: Vec<String>,
944

945
    /// The lines that have been parsed so far.
946
    ///
947
    /// If `should_stream` is `true`, these lines have been printed to the
948
    /// terminal. Otherwise they will be printed when the response handler is
949
    /// finished.
950
    parsed: Vec<String>,
951

952
    /// A temporary buffer of data received from the LLM.
953
    buffer: String,
954

955
    in_fenced_code_block: bool,
956
    // (language, code)
957
    code_buffer: (Option<String>, Vec<String>),
958
    code_line: usize,
959

960
    // The last index of the line that ends a code block.
961
    // (streamed, printed)
962
    last_fenced_code_block_end: (usize, usize),
963
}
964

965
impl ResponseHandler {
NEW
966
    fn new(render_mode: RenderMode) -> Self {
×
967
        Self {
×
NEW
968
            render_mode,
×
969
            ..Default::default()
×
970
        }
×
971
    }
×
972

973
    fn handle(&mut self, data: &str, ctx: &Ctx) -> Result<()> {
×
974
        self.buffer.push_str(data);
×
975

976
        while let Some(Line { content, variant }) = self.get_line() {
×
977
            self.received.push(content);
×
978

979
            let delay = match variant {
×
980
                LineVariant::Code => ctx.config.style.typewriter.code_delay,
×
981
                _ => ctx.config.style.typewriter.text_delay,
×
982
            };
983

984
            let lines = self.handle_line(&variant, ctx)?;
×
985

NEW
986
            if !matches!(self.render_mode, RenderMode::Buffered) {
×
987
                stdout::typewriter(&lines.join("\n"), delay)?;
×
988
            }
×
989

990
            self.parsed.extend(lines);
×
991
        }
992

993
        Ok(())
×
994
    }
×
995

996
    #[expect(clippy::too_many_lines)]
997
    fn handle_line(&mut self, variant: &LineVariant, ctx: &Ctx) -> Result<Vec<String>> {
×
998
        let Some(content) = self.received.last().map(String::as_str) else {
×
999
            return Ok(vec![]);
×
1000
        };
1001

1002
        match variant {
×
1003
            LineVariant::Code => {
1004
                self.code_line += 1;
×
1005
                self.code_buffer.1.push(content.to_owned());
×
1006

1007
                let mut buf = String::new();
×
1008
                let config = code::Config {
×
1009
                    language: self.code_buffer.0.clone(),
×
1010
                    theme: ctx
×
1011
                        .config
×
1012
                        .style
×
1013
                        .code
×
1014
                        .color
×
1015
                        .then(|| ctx.config.style.code.theme.clone()),
×
1016
                };
1017

1018
                if !code::format(content, &mut buf, &config)? {
×
1019
                    let config = code::Config {
×
1020
                        language: None,
×
1021
                        theme: config.theme,
×
1022
                    };
×
1023

1024
                    code::format(content, &mut buf, &config)?;
×
1025
                }
×
1026

1027
                if ctx.config.style.code.line_numbers {
×
1028
                    buf.insert_str(
×
1029
                        0,
×
1030
                        &format!("{:2} │ ", self.code_line)
×
1031
                            .with(Color::AnsiValue(238))
×
1032
                            .to_string(),
×
1033
                    );
×
1034
                }
×
1035

1036
                Ok(vec![buf])
×
1037
            }
1038
            LineVariant::FencedCodeBlockStart { language } => {
×
1039
                self.code_buffer.0.clone_from(language);
×
1040
                self.code_buffer.1.clear();
×
1041
                self.code_line = 0;
×
1042
                self.in_fenced_code_block = true;
×
1043

1044
                Ok(vec![content.with(Color::AnsiValue(238)).to_string()])
×
1045
            }
1046
            LineVariant::FencedCodeBlockEnd { indent } => {
×
1047
                self.last_fenced_code_block_end = (self.received.len(), self.parsed.len() + 2);
×
1048

1049
                let path = self.persist_code_block()?;
×
1050
                let mut links = vec![];
×
1051

1052
                match ctx.config.style.code.file_link {
×
1053
                    LinkStyle::Off => {}
×
1054
                    LinkStyle::Full => {
×
1055
                        links.push(format!(
×
1056
                            "{}see: file://{}",
×
1057
                            " ".repeat(*indent),
×
1058
                            path.display()
×
1059
                        ));
×
1060
                    }
×
1061
                    LinkStyle::Osc8 => {
×
1062
                        links.push(format!(
×
1063
                            "{}[{}]",
×
1064
                            " ".repeat(*indent),
×
1065
                            hyperlink(
×
1066
                                format!("file://{}", path.display()),
×
1067
                                "open in editor".red().to_string()
×
1068
                            )
×
1069
                        ));
×
1070
                    }
×
1071
                }
1072

1073
                match ctx.config.style.code.copy_link {
×
1074
                    LinkStyle::Off => {}
×
1075
                    LinkStyle::Full => {
×
1076
                        links.push(format!(
×
1077
                            "{}copy: copy://{}",
×
1078
                            " ".repeat(*indent),
×
1079
                            path.display()
×
1080
                        ));
×
1081
                    }
×
1082
                    LinkStyle::Osc8 => {
×
1083
                        links.push(format!(
×
1084
                            "{}[{}]",
×
1085
                            " ".repeat(*indent),
×
1086
                            hyperlink(
×
1087
                                format!("copy://{}", path.display()),
×
1088
                                "copy to clipboard".red().to_string()
×
1089
                            )
×
1090
                        ));
×
1091
                    }
×
1092
                }
1093

1094
                self.in_fenced_code_block = false;
×
1095

1096
                let mut lines = vec![content.with(Color::AnsiValue(238)).to_string()];
×
1097
                if !links.is_empty() {
×
1098
                    lines.push(links.join(" "));
×
1099
                }
×
1100

1101
                Ok(lines)
×
1102
            }
1103
            LineVariant::Normal => {
1104
                // We feed all the lines for markdown formatting, but only
1105
                // print the last one, as the others are already printed.
1106
                //
1107
                // This helps the parser to use previous context to apply
1108
                // the correct formatting to the current line.
1109
                //
1110
                // We only care about the lines after the last code block
1111
                // end, because a) formatting context is reset after a code
1112
                // block, and b) we dot not limit the line length of code, makes
1113
                // it impossible to correctly find the non-printed lines based
1114
                // on wrapped vs non-wrapped lines.
1115
                let lines = self
×
1116
                    .received
×
1117
                    .iter()
×
1118
                    .skip(self.last_fenced_code_block_end.0)
×
1119
                    .cloned()
×
1120
                    .collect::<Vec<_>>();
×
1121

1122
                // `termimad` removes empty lines at the start or end, but we
1123
                // want to keep them as we will have more lines to print.
1124
                let empty_lines_start_count = lines.iter().take_while(|s| s.is_empty()).count();
×
1125
                let empty_lines_end_count = lines.iter().rev().take_while(|s| s.is_empty()).count();
×
1126

1127
                let options = comrak::Options {
×
1128
                    render: comrak::RenderOptions {
×
1129
                        unsafe_: true,
×
1130
                        prefer_fenced: true,
×
1131
                        experimental_minimize_commonmark: true,
×
1132
                        ..Default::default()
×
1133
                    },
×
1134
                    ..Default::default()
×
1135
                };
×
1136

1137
                let formatted = comrak::markdown_to_commonmark(&lines.join("\n"), &options);
×
1138

1139
                let mut formatted =
×
1140
                    FmtText::from(&termimad::MadSkin::default(), &formatted, Some(100)).to_string();
×
1141

1142
                for _ in 0..empty_lines_start_count {
×
1143
                    formatted.insert(0, '\n');
×
1144
                }
×
1145

1146
                // Only add an extra newline if we have more than one line,
1147
                // otherwise a single empty line will be interpreted as both a
1148
                // missing start and end newline.
1149
                if lines.iter().any(|s| !s.is_empty()) {
×
1150
                    for _ in 0..empty_lines_end_count {
×
1151
                        formatted.push('\n');
×
1152
                    }
×
1153
                }
×
1154

1155
                let lines = formatted
×
1156
                    .lines()
×
1157
                    .skip(self.parsed.len() - self.last_fenced_code_block_end.1)
×
1158
                    .map(ToOwned::to_owned)
×
1159
                    .collect::<Vec<_>>();
×
1160

1161
                Ok(lines)
×
1162
            }
1163
        }
1164
    }
×
1165

1166
    fn get_line(&mut self) -> Option<Line> {
×
1167
        let s = &mut self.buffer;
×
1168
        let idx = s.find('\n')?;
×
1169

1170
        // Determine the end index of the actual line *content*.
1171
        // Check if the character before '\n' is '\r'.
1172
        let end_idx = if idx > 0 && s.as_bytes().get(idx - 1) == Some(&b'\r') {
×
1173
            idx - 1
×
1174
        } else {
1175
            idx
×
1176
        };
1177

1178
        // Extract the line content *before* draining.
1179
        // Creating a slice and then converting to owned String.
1180
        let extracted_line = s[..end_idx].to_string();
×
1181

1182
        // Calculate the index *after* the newline sequence to drain up to.
1183
        // This ensures we remove the '\n' and potentially the preceding '\r'.
1184
        let drain_end_idx = idx + 1;
×
1185
        s.drain(..drain_end_idx);
×
1186

1187
        Some(Line::new(extracted_line, self.in_fenced_code_block))
×
1188
    }
×
1189

1190
    fn persist_code_block(&self) -> Result<PathBuf> {
×
1191
        let code = self.code_buffer.1.clone();
×
1192
        let language = self.code_buffer.0.as_deref().unwrap_or("txt");
×
1193
        let ext = match language {
×
1194
            "c++" => "cpp",
×
1195
            "javascript" => "js",
×
1196
            "python" => "py",
×
1197
            "ruby" => "rb",
×
1198
            "rust" => "rs",
×
1199
            "typescript" => "ts",
×
1200
            lang => lang,
×
1201
        };
1202

1203
        let millis = std::time::SystemTime::now()
×
1204
            .duration_since(std::time::UNIX_EPOCH)
×
1205
            .unwrap_or_default()
×
1206
            .subsec_millis();
×
1207
        let path = std::env::temp_dir().join(format!("code_{millis}.{ext}"));
×
1208

1209
        fs::write(&path, code.join("\n"))?;
×
1210

1211
        Ok(path)
×
1212
    }
×
1213
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc