• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Shulyaka / telegram_bot_conversation / 23500062102

24 Mar 2026 04:17PM UTC coverage: 70.833%. Remained the same
23500062102

push

github

Shulyaka
Update dependabot

765 of 1080 relevant lines covered (70.83%)

2.13 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

63.13
/custom_components/telegram_bot_conversation/entity.py
1
"""Per-chat handler for telegram_bot_conversation."""
2

3
import asyncio
3✔
4
from collections.abc import Mapping
3✔
5
import contextlib
3✔
6
from dataclasses import dataclass, field
3✔
7
from datetime import timedelta
3✔
8
import itertools
3✔
9
from pathlib import Path
3✔
10
import tempfile
3✔
11
from types import TracebackType
3✔
12
from typing import Any, Self
3✔
13

14
from telegramify_markdown import entities_to_markdownv2, markdownify, telegramify
3✔
15
from telegramify_markdown.content import ContentType
3✔
16

17
from homeassistant.components.ai_task import async_generate_image
3✔
18
from homeassistant.components.conversation import (
3✔
19
    AssistantContentDeltaDict,
20
    Attachment,
21
    ChatLog,
22
    ConversationEntity,
23
    ToolResultContent,
24
    UserContent,
25
    async_converse,
26
    async_get_chat_log,
27
)
28
from homeassistant.components.conversation.agent_manager import get_agent_manager
3✔
29
from homeassistant.components.conversation.chat_log import DATA_CHAT_LOGS
3✔
30
from homeassistant.components.conversation.const import DATA_COMPONENT, ChatLogEventType
3✔
31
from homeassistant.components.media_source import async_resolve_media
3✔
32
from homeassistant.components.telegram_bot import (  # type: ignore[attr-defined]
3✔
33
    InputMediaType,
34
)
35
from homeassistant.components.telegram_bot.const import (
3✔
36
    ATTR_CALLBACK_QUERY_ID,
37
    ATTR_CAPTION,
38
    ATTR_CHAT_ACTION,
39
    ATTR_CHAT_ID,
40
    ATTR_DISABLE_NOTIF,
41
    ATTR_DISABLE_WEB_PREV,
42
    ATTR_FILE,
43
    ATTR_FILE_ID,
44
    ATTR_FILE_MIME_TYPE,
45
    ATTR_FILE_PATH,
46
    ATTR_KEYBOARD_INLINE,
47
    ATTR_MEDIA_TYPE,
48
    ATTR_MESSAGE,
49
    ATTR_MESSAGE_ID,
50
    ATTR_MESSAGE_THREAD_ID,
51
    ATTR_MSG,
52
    ATTR_MSGID,
53
    ATTR_PARSER,
54
    ATTR_REACTION,
55
    ATTR_TEXT,
56
    ATTR_USER_ID,
57
    CHAT_ACTION_TYPING,
58
    CHAT_ACTION_UPLOAD_PHOTO,
59
    CONF_CONFIG_ENTRY_ID,
60
    DOMAIN as TELEGRAM_DOMAIN,
61
    EVENT_TELEGRAM_SENT,
62
    SERVICE_ANSWER_CALLBACK_QUERY,
63
    SERVICE_DELETE_MESSAGE,
64
    SERVICE_DOWNLOAD_FILE,
65
    SERVICE_EDIT_MESSAGE,
66
    SERVICE_EDIT_MESSAGE_MEDIA,
67
    SERVICE_EDIT_REPLYMARKUP,
68
    SERVICE_SEND_CHAT_ACTION,
69
    SERVICE_SEND_DOCUMENT,
70
    SERVICE_SEND_MESSAGE,
71
    SERVICE_SEND_PHOTO,
72
    SERVICE_SET_MESSAGE_REACTION,
73
)
74
from homeassistant.config_entries import ConfigEntry
3✔
75
from homeassistant.const import ATTR_ENTITY_ID
3✔
76
from homeassistant.core import CALLBACK_TYPE, Context, Event, HomeAssistant, callback
3✔
77
from homeassistant.exceptions import HomeAssistantError
3✔
78
from homeassistant.helpers.chat_session import (
3✔
79
    CONVERSATION_TIMEOUT,
80
    DATA_CHAT_SESSION,
81
    async_get_chat_session,
82
)
83
from homeassistant.helpers.event import async_call_later
3✔
84
from homeassistant.helpers.translation import async_get_cached_translations
3✔
85
from homeassistant.util import dt as dt_util
3✔
86

87
from .const import (
3✔
88
    CONF_AI_TASK,
89
    CONF_ATTACHMENTS,
90
    CONF_CONVERSATION_AGENT,
91
    CONF_CONVERSATION_TIMEOUT,
92
    CONF_DISABLE_WEB_PREV,
93
    CONF_LATEX,
94
    CONF_MERMAID,
95
    CONF_TELEGRAM_ENTRY,
96
    CONF_THOUGHTS,
97
    CONF_TMPDIR,
98
    CONF_USER,
99
    DOMAIN,
100
    LOGGER,
101
    REACTION_EMOJI,
102
)
103

104
# The draft constants are optional: when the installed telegram_bot integration
# does not export them, both names fall back to ``None`` and callers test
# ``SERVICE_SEND_MESSAGE_DRAFT`` for truthiness before using the draft service.
try:
    from homeassistant.components.telegram_bot.const import (  # type: ignore[attr-defined]
        ATTR_DRAFT_ID,
        SERVICE_SEND_MESSAGE_DRAFT,
    )
except ImportError:
    ATTR_DRAFT_ID = None
    SERVICE_SEND_MESSAGE_DRAFT = None

# Passed to telegramify as ``max_message_length`` when splitting content.
MAX_TELEGRAM_LENGTH = 4096
3✔
113

114

115
@callback
def async_translate_message(
    hass: HomeAssistant,
    translation_key: str,
    translation_placeholders: dict[str, str] | None = None,
) -> str:
    """Look up and format a ``common`` translation of this integration.

    The configured language is consulted first, then English. If neither
    cache contains the key, the bare ``translation_key`` is returned.
    Placeholders are applied with ``str.format``; when a placeholder is
    missing the unformatted template is returned instead.
    """
    full_key = f"component.{DOMAIN}.common.{translation_key}"
    for lang in (hass.config.language, "en"):
        cached = async_get_cached_translations(
            hass, lang, category="common", integration=DOMAIN
        )
        if full_key not in cached:
            continue
        template = cached[full_key]
        if translation_placeholders:
            try:
                return template.format(**translation_placeholders)
            except KeyError:
                # A placeholder is missing: fall back to the raw template.
                return template
        return template
    return translation_key
×
135

136

137
def get_telegram_service_target(
    chat_id: int, notify_entity_id: str | None
) -> dict[str, str | int | list[str]]:
    """Return the addressing part of a telegram service payload.

    A notify entity, when given, takes precedence and is wrapped in a list
    under the entity-id key; otherwise the raw chat id is used.
    """
    if not notify_entity_id:
        return {ATTR_CHAT_ID: chat_id}
    return {ATTR_ENTITY_ID: [notify_entity_id]}
×
144

145

146
class TelegramMessageWatcher:
    """Context manager that tracks "telegram sent" events of one bot.

    While active, every ``EVENT_TELEGRAM_SENT`` event whose ``bot`` data
    matches the given telegram config entry id is recorded as a
    ``(chat_id, message_id)`` pair. ``wait_message`` hands out a future that
    completes once the matching event has been (or already was) seen.
    Leaving the context (or deleting the object) removes the bus listener.
    """

    def __init__(self, hass: HomeAssistant, telegram_config_entry_id: str) -> None:
        """Initialize the watcher and subscribe to sent events."""
        self.hass = hass
        self.telegram_config_entry_id = telegram_config_entry_id
        # State is created before subscribing so that cleanup (including
        # ``__del__`` on a half-initialized object) never hits a missing
        # attribute if the subscription itself raises.
        self.sent_messages: list[tuple[int, int]] = []
        self.watchers: dict[tuple[int, int], asyncio.Future[None]] = {}
        self._unregister_listener: CALLBACK_TYPE | None = None
        self._unregister_listener = self.hass.bus.async_listen(
            EVENT_TELEGRAM_SENT,
            self.async_handle_sent,
            self.callback_sent_filter,
        )

    @callback
    def callback_sent_filter(self, event_data: Mapping[str, Any]) -> bool:
        """Return True for events sent by the watched telegram config entry."""
        # ``or {}`` also covers an explicit ``"bot": None`` in the event data.
        bot_data = event_data.get("bot") or {}
        return bool(
            bot_data.get(CONF_CONFIG_ENTRY_ID) == self.telegram_config_entry_id
        )

    async def async_handle_sent(self, event: Event) -> None:
        """Record a sent message and release the waiter registered for it."""
        message = (
            int(event.data[ATTR_CHAT_ID]),
            int(event.data.get(ATTR_MESSAGE_ID, 0)),
        )
        self.sent_messages.append(message)
        future = self.watchers.pop(message, None)
        # The waiter may already be cancelled (e.g. by an ``asyncio.wait_for``
        # timeout); calling ``set_result`` on it would raise InvalidStateError.
        if future is not None and not future.done():
            future.set_result(None)

    def wait_message(self, chat_id: int, message_id: int) -> asyncio.Future[None]:
        """Return a future that completes when the given message was sent.

        If the message was already observed, the returned future is already
        resolved.
        """
        message = (chat_id, message_id)
        if message in self.sent_messages:
            future: asyncio.Future[None] = asyncio.Future()
            future.set_result(None)
            return future
        self.watchers[message] = asyncio.Future()
        return self.watchers[message]

    def async_cleanup(self) -> None:
        """Remove the bus listener; safe to call more than once."""
        if not self._unregister_listener:
            return
        self._unregister_listener()
        self._unregister_listener = None

    def __enter__(self) -> Self:
        """Enter the context."""
        return self

    def __exit__(
        self,
        exc_type: type[BaseException] | None,
        exc_val: BaseException | None,
        exc_tb: TracebackType | None,
    ) -> bool | None:
        """Exit the context, unsubscribing; exceptions are not suppressed."""
        self.async_cleanup()
        return None

    def __del__(self) -> None:
        """Ensure cleanup on deletion."""
        self.async_cleanup()
3✔
214

215

216
@dataclass
class ConversationConfig:
    """Mutable runtime state kept for one conversation across messages."""

    # Task currently processing the latest user message, if any.
    task: asyncio.Task[None] | None = field(default=None)
    # Latest assistant delta; send_message reads its "content" and
    # "thinking_content" entries when rendering a draft.
    draft: AssistantContentDeltaDict | None = field(default=None)
    # NOTE(review): the users of delta_lock and content_lock are outside this
    # view; presumably they serialize delta and content handling - confirm.
    delta_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    content_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    # Held by send_message while it talks to the telegram services.
    send_lock: asyncio.Lock = field(default_factory=asyncio.Lock, init=False)
    # Canceller of a scheduled draft re-send, or None when nothing is scheduled.
    draft_cancel: CALLBACK_TYPE | None = field(default=None, init=False)
    # message_id -> last text sent for it; None while no draft sequence is open.
    sent_drafts: dict[int, str] | None = field(default=None, init=False)
3✔
227

228

229
class TelegramChatHandler:
3✔
230
    """Handle conversation logic for a single Telegram chat."""
231

232
    def __init__(
        self,
        hass: HomeAssistant,
        entry: ConfigEntry,
        chat_id: int,
        notify_entity_id: str | None,
        subentry_id: str,
        user_id_map: dict[int, str],
        config: dict[str, Any],
    ) -> None:
        """Store the chat settings and assemble the extra agent prompt.

        Also registers an unload hook on ``entry`` that cancels every
        scheduled draft update of this chat.
        """
        self.hass = hass
        self.entry = entry
        self.chat_id = chat_id
        self.config = config
        self.telegram_entry_id = config[CONF_TELEGRAM_ENTRY]
        self.user_id = config.get(CONF_USER)
        self.user_id_map = user_id_map
        self.agent_id: str | None = config.get(CONF_CONVERSATION_AGENT)
        self.notify_entity_id = notify_entity_id
        self.subentry_id = subentry_id
        # Per-thread runtime state, keyed by thread id (0 is used when the
        # message carries no thread id).
        self.conversations: dict[int, ConversationConfig] = {}

        def _cancel_scheduled_drafts() -> None:
            """Invoke the canceller of every conversation that has one."""
            for state in self.conversations.values():
                if state.draft_cancel:
                    state.draft_cancel()

        self.entry.async_on_unload(_cancel_scheduled_drafts)

        # The prompt is built from fragments; negative chat ids are treated
        # as group chats.
        prompt_parts = [
            "The user is interacting through Telegram",
            " in a group chat" if chat_id < 0 else "",
            ". Markdown is fully supported. ",
        ]
        if config[CONF_LATEX]:
            prompt_parts.append("Inline LaTeX rendering is supported. ")
        if config[CONF_ATTACHMENTS]:
            prompt_parts.append("Long code blocks will be sent as files. ")
        if config[CONF_MERMAID]:
            prompt_parts.append(
                "Mermaid is supported as inline code blocks "
                "and will be rendered to an image. "
            )
        prompt_parts.append(
            f"If the response message starts with any of {REACTION_EMOJI}, "
            "it will be added as a reaction to the user message."
        )
        self.extra_prompt = "".join(prompt_parts)
280

281
    def _get_conversation_id(self, thread_id: int) -> str:
3✔
282
        """Build the Home Assistant conversation ID for a Telegram chat."""
283
        conversation_id = f"telegram_{self.chat_id}"
3✔
284
        if thread_id:
3✔
285
            conversation_id += f"_{thread_id}"
×
286
        return conversation_id
3✔
287

288
    def _get_context(
        self, context: Context | None = None, effective_user_id: int | None = None
    ) -> Context:
        """Return a context whose ``user_id`` is filled in when possible.

        A fresh ``Context`` is created when none is given. An existing
        ``user_id`` is kept; otherwise the id mapped to ``effective_user_id``
        in ``user_id_map`` is used, falling back to the configured user of
        this chat. The passed context object is modified in place.
        """
        ctx = Context() if context is None else context
        if ctx.user_id is not None:
            return ctx

        mapped_user = None
        if effective_user_id is not None:
            mapped_user = self.user_id_map.get(effective_user_id)
        ctx.user_id = self.user_id if mapped_user is None else mapped_user
        return ctx
3✔
303

304
    @callback
    def _async_schedule_update_draft(
        self,
        thread_id: int,
        delay: float,
        context: Context,
    ) -> CALLBACK_TYPE:
        """Arrange for the draft of ``thread_id`` to be re-sent after ``delay``.

        Returns the canceller produced by ``async_call_later``.
        """

        def _report_failure(
            finished: asyncio.Task[dict[str, list[dict[str, int]]]],
        ) -> None:
            """Log the exception of a finished send_message task, if any."""
            try:
                failure = finished.exception()
            except asyncio.CancelledError:
                # A cancelled task has nothing to report.
                return
            if failure:
                LOGGER.error(
                    "Error in send_message for chat_id=%s, thread_id=%s: %s",
                    self.chat_id,
                    thread_id,
                    failure,
                    exc_info=failure,
                )

        @callback
        def _start_draft_update(_: Any) -> None:
            """Start send_message in draft mode as a config entry task."""
            draft_task = self.entry.async_create_task(
                self.hass,
                self.send_message(thread_id=thread_id, context=context, draft=True),
                "send_message",
            )
            draft_task.add_done_callback(_report_failure)

        return async_call_later(self.hass, delay, _start_draft_update)
3✔
334

335
    async def send_message(  # noqa: C901
        self,
        context: Context,
        message: str = "",
        thread_id: int = 0,
        draft: bool = False,
    ) -> dict[str, list[dict[str, int]]]:
        """Send telegram message, taking care of formatting, length, and drafts.

        The content is split by ``telegramify`` into items. Items are matched
        in order against the messages already sent for the current draft
        sequence (``sent_drafts``): unchanged ones are skipped, changed ones
        are edited, missing ones are created, and surplus ones are deleted.

        Args:
            context: Context passed to every telegram service call.
            message: Markdown content to send. Ignored when ``draft`` is
                true; the conversation's current draft delta is used instead.
            thread_id: Message thread id; also the key of the conversation
                state (0 when there is no thread).
            draft: True for an intermediate update, False for the final
                message, which closes the draft sequence.

        Returns:
            ``{"chats": [...]}`` listing the messages edited or created by
            this call.

        A ``HomeAssistantError`` from the telegram services is not raised to
        the caller: a flood-control error during a draft update schedules a
        retry, any other error is logged. Temporary files are always removed.
        """
        current_conversation = self.conversations.setdefault(
            thread_id, ConversationConfig()
        )
        # This call supersedes any retry that is still scheduled.
        if current_conversation.draft_cancel is not None:
            current_conversation.draft_cancel()
        current_conversation.draft_cancel = None

        if draft:
            # ``.get`` is used because a delta dict is not guaranteed to carry
            # both keys; a missing key is treated like empty content.
            pending = current_conversation.draft
            message = ""
            if pending:
                message = pending.get("content") or ""
                if not message and self.config[CONF_THOUGHTS]:
                    message = pending.get("thinking_content") or ""

        messages: dict[str, list[dict[str, int]]] = {"chats": []}
        created_files: list[Path] = []

        def save_file(file_name: str, file_data: bytes) -> Path:
            """Write ``file_data`` to a temp file and remember it for cleanup."""
            with tempfile.NamedTemporaryFile(
                mode="wb",
                prefix=Path(file_name).stem,
                suffix=Path(file_name).suffix,
                dir=self.config[CONF_TMPDIR],
                delete=False,
            ) as temp_file:
                temp_file.write(file_data)
                filename = Path(temp_file.name)
                created_files.append(filename)
                return filename

        def cleanup_created_files() -> None:
            """Delete temporary files created while sending messages."""
            for file in created_files:
                file.unlink(missing_ok=True)

        def service_target() -> dict[str, str | int | list[str]]:
            """Build a fresh target payload for one service call."""
            return get_telegram_service_target(self.chat_id, self.notify_entity_id)

        async def delete_sent_message(stale_message_id: int) -> None:
            """Delete one previously sent message and forget it."""
            await self.hass.services.async_call(
                TELEGRAM_DOMAIN,
                SERVICE_DELETE_MESSAGE,
                {
                    ATTR_MESSAGE_ID: stale_message_id,
                    **service_target(),
                    CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                },
                blocking=True,
                context=context,
            )
            current_conversation.sent_drafts.pop(stale_message_id, None)

        items = await telegramify(
            content=message,
            latex_escape=self.config[CONF_LATEX],
            render_mermaid=False if draft else self.config[CONF_MERMAID],
            min_file_lines=0 if draft else self.config[CONF_ATTACHMENTS],
            max_message_length=MAX_TELEGRAM_LENGTH,
        )

        # The draft service is only used when it could be imported and the
        # chat id is positive.
        drafts_supported = bool(SERVICE_SEND_MESSAGE_DRAFT) and self.chat_id > 0
        native_draft = draft and drafts_supported

        try:
            async with current_conversation.send_lock:
                if (
                    not draft
                    and current_conversation.sent_drafts is not None
                    and drafts_supported
                ):
                    # Final message of an open draft sequence: first send the
                    # translated "thinking" text as draft 1.
                    thinking_message = async_translate_message(
                        self.hass, translation_key="thinking"
                    )
                    await self.hass.services.async_call(
                        TELEGRAM_DOMAIN,
                        SERVICE_SEND_MESSAGE_DRAFT,
                        {
                            ATTR_MESSAGE: markdownify(thinking_message),
                            **service_target(),
                            ATTR_MESSAGE_THREAD_ID: thread_id,
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                            ATTR_PARSER: "markdownv2",
                            ATTR_DRAFT_ID: 1,
                        },
                        blocking=True,
                        context=context,
                    )

                if current_conversation.sent_drafts is None:
                    current_conversation.sent_drafts = {}

                # Iterate over a snapshot: the dict is modified while matching.
                sent_drafts_iter = iter(current_conversation.sent_drafts.copy().items())

                with TelegramMessageWatcher(
                    self.hass, self.telegram_entry_id
                ) as watcher:
                    # All messages but the last are real messages
                    for item in items[:-1] if native_draft else items:
                        caption_md = ""
                        if item.content_type == ContentType.TEXT:
                            text = entities_to_markdownv2(item.text, item.entities)
                        elif item.content_type == ContentType.PHOTO:
                            caption_md = entities_to_markdownv2(
                                item.caption_text, item.caption_entities
                            )
                            text = "Drawing " + caption_md
                        elif item.content_type == ContentType.FILE:
                            caption_md = entities_to_markdownv2(
                                item.caption_text, item.caption_entities
                            )
                            text = "Writing " + caption_md
                        else:
                            continue

                        # Drafts are always sent as plain text messages; the
                        # only other types reaching this point are PHOTO/FILE.
                        send_as_text = draft or item.content_type == ContentType.TEXT
                        disable_notification = draft or item is not items[-1]
                        disable_web_prev = draft or self.config[CONF_DISABLE_WEB_PREV]

                        existing = next(sent_drafts_iter, None)
                        if existing is not None:
                            message_id, last_text = existing
                            # Message was already sent, edit it if the text has changed
                            if (
                                text == last_text
                                and disable_notification
                                and disable_web_prev
                            ):
                                continue

                            if disable_notification:
                                if send_as_text:
                                    await self.hass.services.async_call(
                                        TELEGRAM_DOMAIN,
                                        SERVICE_EDIT_MESSAGE,
                                        {
                                            ATTR_MESSAGE: text,
                                            **service_target(),
                                            ATTR_MESSAGE_ID: message_id,
                                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                                            ATTR_PARSER: "markdownv2",
                                            ATTR_DISABLE_WEB_PREV: disable_web_prev,
                                        },
                                        blocking=True,
                                        context=context,
                                    )
                                else:
                                    file_path = (
                                        await self.hass.async_add_executor_job(
                                            save_file,
                                            item.file_name,
                                            item.file_data,
                                        )
                                    ).as_posix()
                                    await self.hass.services.async_call(
                                        TELEGRAM_DOMAIN,
                                        SERVICE_EDIT_MESSAGE_MEDIA,
                                        {
                                            ATTR_FILE: file_path,
                                            ATTR_CAPTION: caption_md,
                                            **service_target(),
                                            ATTR_MESSAGE_ID: message_id,
                                            ATTR_MEDIA_TYPE: InputMediaType.PHOTO
                                            if item.content_type == ContentType.PHOTO
                                            else InputMediaType.DOCUMENT,
                                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                                            ATTR_PARSER: "markdownv2",
                                        },
                                        blocking=True,
                                        context=context,
                                    )
                                messages["chats"].append(
                                    {
                                        ATTR_CHAT_ID: self.chat_id,
                                        ATTR_MESSAGE_ID: message_id,
                                    }
                                )
                                current_conversation.sent_drafts.update(
                                    {message_id: text}
                                )
                                continue

                            # Cannot edit messages with a notification:
                            # delete this draft and all remaining ones to
                            # maintain sequence, then create a new message.
                            for stale_id, _ in itertools.chain(
                                [existing], sent_drafts_iter
                            ):
                                await delete_sent_message(stale_id)

                        # Message was not sent yet, create.
                        if send_as_text:
                            response = await self.hass.services.async_call(
                                TELEGRAM_DOMAIN,
                                SERVICE_SEND_MESSAGE,
                                {
                                    ATTR_MESSAGE: text,
                                    **service_target(),
                                    ATTR_MESSAGE_THREAD_ID: thread_id,
                                    CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                                    ATTR_PARSER: "markdownv2",
                                    ATTR_DISABLE_NOTIF: disable_notification,
                                    ATTR_DISABLE_WEB_PREV: disable_web_prev,
                                },
                                blocking=True,
                                context=context,
                                return_response=True,
                            )
                        else:
                            file_path = (
                                await self.hass.async_add_executor_job(
                                    save_file,
                                    item.file_name,
                                    item.file_data,
                                )
                            ).as_posix()
                            response = await self.hass.services.async_call(
                                TELEGRAM_DOMAIN,
                                SERVICE_SEND_PHOTO
                                if item.content_type == ContentType.PHOTO
                                else SERVICE_SEND_DOCUMENT,
                                {
                                    ATTR_FILE: file_path,
                                    ATTR_CAPTION: caption_md,
                                    **service_target(),
                                    ATTR_MESSAGE_THREAD_ID: thread_id,
                                    CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                                    ATTR_DISABLE_NOTIF: disable_notification,
                                    ATTR_PARSER: "markdownv2",
                                },
                                blocking=True,
                                context=context,
                                return_response=True,
                            )
                        item_messages: dict[str, list[dict[str, Any]]] = (
                            response or {"chats": []}  # type: ignore[assignment]
                        )

                        messages["chats"].extend(item_messages["chats"])
                        # Wait (bounded) until the sent event of every new
                        # message has been observed before continuing.
                        await asyncio.wait_for(
                            asyncio.gather(
                                *(
                                    watcher.wait_message(
                                        msg[ATTR_CHAT_ID], msg[ATTR_MESSAGE_ID]
                                    )
                                    for msg in item_messages["chats"]
                                )
                            ),
                            timeout=10.0,
                        )
                        current_conversation.sent_drafts.update(
                            {
                                msg[ATTR_MESSAGE_ID]: text
                                for msg in item_messages["chats"]
                                if msg[ATTR_CHAT_ID] == self.chat_id
                            }
                        )

                # Delete the remaining draft messages if there are more drafts than current content items
                for message_id, _ in sent_drafts_iter:
                    await delete_sent_message(message_id)

                if native_draft and items:
                    # The last item is streamed through the draft service.
                    item = items[-1]
                    text = entities_to_markdownv2(item.text, item.entities)

                    await self.hass.services.async_call(
                        TELEGRAM_DOMAIN,
                        SERVICE_SEND_MESSAGE_DRAFT,
                        {
                            ATTR_MESSAGE: text,
                            **service_target(),
                            ATTR_MESSAGE_THREAD_ID: thread_id,
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                            ATTR_PARSER: "markdownv2",
                            ATTR_DRAFT_ID: len(items) + 1,
                        },
                        blocking=True,
                        context=context,
                    )
                elif not draft:
                    # Final message delivered: close the draft sequence.
                    current_conversation.sent_drafts = None

        except HomeAssistantError as e:
            error_text = str(e)
            if draft and "Flood control exceeded. Retry in " in error_text:
                # Retry the draft after the delay named in the error text;
                # fall back to 3 seconds when it cannot be parsed.
                try:
                    if error_text.endswith(" seconds"):
                        retry_after = float(
                            error_text.split("Retry in ")[1].split(" seconds")[0]
                        )
                    elif error_text.endswith(" minutes"):
                        retry_after = (
                            float(
                                error_text.split("Retry in ")[1].split(" minutes")[0]
                            )
                            * 60
                        )
                    else:
                        raise ValueError(f"Unknown time unit in error message: {e}")  # noqa: TRY301
                except ValueError:
                    retry_after = 3.0
                current_conversation.draft_cancel = self._async_schedule_update_draft(
                    thread_id,
                    retry_after,
                    context,
                )
            elif draft:
                # A failed intermediate update is superseded by the next one.
                LOGGER.debug(
                    "Draft update failed for chat_id=%s, thread_id=%s: %s",
                    self.chat_id,
                    thread_id,
                    e,
                )
            else:
                # The error is still not raised to the caller, but a lost
                # final message must at least be visible in the log.
                LOGGER.error(
                    "Failed to send message for chat_id=%s, thread_id=%s: %s",
                    self.chat_id,
                    thread_id,
                    e,
                    exc_info=e,
                )
        finally:
            if created_files:
                await self.hass.async_add_executor_job(cleanup_created_files)

        return messages
3✔
688

689
    async def async_handle_text(self, event: Event) -> None:
        """Handle text and attachment events.

        Starts a background task running async_process_message for the thread
        the event belongs to (a missing or falsy thread id maps to thread 0).
        If a task is still running for that thread, it is cancelled and waited
        for before the new one is created.
        """
        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0

        current_conversation = self.conversations.setdefault(
            thread_id, ConversationConfig()
        )

        if (task := current_conversation.task) and not task.done():
            task.cancel("Conversation interrupted by new user message.")
            # Wait for the old task to unwind without consuming its outcome.
            # Awaiting the task directly would re-raise whatever it failed
            # with (dropping the new message), and suppressing CancelledError
            # around that await would also swallow a cancellation aimed at
            # this handler. The outcome is logged by the task's done callback.
            await asyncio.wait([task])

        task_name = f"telegram_conversation_{self.chat_id}_{thread_id}"
        task = self.entry.async_create_task(
            self.hass,
            self.async_process_message(event),
            task_name,
        )
        current_conversation.task = task

        def _clear_task(_task: asyncio.Task[None]) -> None:
            """Clear reference to completed task to avoid retaining tracebacks."""
            # Only clear the slot if it still points at this task; a newer
            # task may already have replaced it.
            if current_conversation.task is _task:
                current_conversation.task = None
            try:
                if err := _task.exception():
                    LOGGER.error(
                        "Error in conversation task for chat_id=%s, thread_id=%s: %s",
                        self.chat_id,
                        thread_id,
                        err,
                        exc_info=err,
                    )
                else:
                    LOGGER.debug(
                        "Conversation task for chat_id=%s, thread_id=%s completed successfully",
                        self.chat_id,
                        thread_id,
                    )
            except asyncio.CancelledError as err:
                # Task.exception() raises CancelledError for a cancelled task
                LOGGER.debug(
                    "Conversation task for chat_id=%s, thread_id=%s was cancelled: %s",
                    self.chat_id,
                    thread_id,
                    err,
                )

        task.add_done_callback(_clear_task)
738

739
    async def async_process_message(self, event: Event) -> None:
        """Run one conversation turn for an incoming Telegram message.

        The message text, plus the downloaded attachment when the event
        carries a file id, is passed to the conversation agent. Chat log
        deltas are forwarded to async_chat_log_delta_listener in background
        tasks. An exception raised by the agent call, cancellation included,
        is held back until the remaining delta has been flushed and the
        session timestamp has been adjusted, and is then re-raised. Errors
        other than cancellation are reported to the chat first.
        """
        context = self._get_context(event.context, event.data.get(ATTR_USER_ID))

        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0
        conversation_id = self._get_conversation_id(thread_id)

        def report_listener_failure(finished: asyncio.Task[None]) -> None:
            """Log an exception raised by a delta listener task."""
            with contextlib.suppress(asyncio.CancelledError):
                if failure_in_listener := finished.exception():
                    LOGGER.error(
                        "Error in chat log delta listener for chat_id=%s, thread_id=%s: %s",
                        self.chat_id,
                        thread_id,
                        failure_in_listener,
                        exc_info=failure_in_listener,
                    )

        @callback
        def chat_log_delta_listener(chat_log: ChatLog, delta: dict[str, Any]) -> None:
            """Process a chat log delta in a background task."""
            listener_task = self.entry.async_create_task(
                self.hass,
                self.async_chat_log_delta_listener(
                    chat_log,
                    delta,
                    thread_id,
                    event.data.get(ATTR_MSGID),
                    context,
                ),
                "async_chat_log_delta_listener",
            )
            listener_task.add_done_callback(report_listener_failure)

        failure: BaseException | None = None

        with (
            async_get_chat_session(self.hass, conversation_id) as session,
            async_get_chat_log(
                self.hass,
                session,
                chat_log_delta_listener=chat_log_delta_listener,
            ) as chat_log,
        ):
            if not event.data.get(ATTR_FILE_ID):
                input_text = event.data.get(ATTR_TEXT) or ""
            else:
                download = await self.hass.services.async_call(  # type: ignore[arg-type]
                    TELEGRAM_DOMAIN,
                    SERVICE_DOWNLOAD_FILE,
                    {
                        CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                        ATTR_FILE_ID: event.data[ATTR_FILE_ID],
                    },
                    blocking=True,
                    context=context,
                    return_response=True,
                )
                file_path = Path(download[ATTR_FILE_PATH])  # type: ignore[index]

                # The file name stands in for the text of a caption-less file
                input_text = event.data.get(ATTR_TEXT) or file_path.name
                attachment = Attachment(
                    media_content_id=f"media-source://{TELEGRAM_DOMAIN}/{event.data.get(ATTR_FILE_ID)}",
                    mime_type=event.data.get(ATTR_FILE_MIME_TYPE),  # type: ignore[arg-type]
                    path=file_path,
                )
                # The text must be exactly the same as the one given to
                # async_converse below
                chat_log.async_add_user_content(
                    UserContent(input_text, attachments=[attachment])
                )

                def remove_download() -> None:
                    """Delete the downloaded file."""
                    file_path.unlink(missing_ok=True)

                @callback
                def schedule_download_removal() -> None:
                    """Delete the downloaded file in the executor."""
                    self.hass.async_add_executor_job(remove_download)

                session.async_on_cleanup(schedule_download_removal)

            try:
                await async_converse(
                    self.hass,
                    text=input_text,
                    conversation_id=session.conversation_id,
                    context=context,
                    agent_id=self.agent_id,
                    extra_system_prompt=self.extra_prompt,
                )
            except (Exception, asyncio.CancelledError) as err:  # noqa: BLE001
                # Held back so the flush and bookkeeping below still run
                failure = err

            # Flush any remaining delta
            chat_log_delta_listener(chat_log, {"role": None})

        timeout = self.config[CONF_CONVERSATION_TIMEOUT]
        session.last_updated = (
            dt_util.utcnow() + timedelta(**timeout) - CONVERSATION_TIMEOUT
        )

        if not failure:
            return

        if not isinstance(failure, asyncio.CancelledError):
            message = async_translate_message(
                self.hass,
                translation_key="conversation_error",
                translation_placeholders={"error": str(failure)},
            )
            await self.async_handle_chat_log_event(
                thread_id=thread_id,
                event_type=ChatLogEventType.CONTENT_ADDED,
                data={"content": {"role": "assistant", "content": message}},
                context=context,
            )
        raise failure
862

863
    async def async_chat_log_delta_listener(
        self,
        chat_log: ChatLog,
        delta: dict[str, Any],
        thread_id: int,
        msg_id: int | None,
        context: Context,
    ) -> None:
        """Fold one chat log delta into the draft of the given thread.

        A delta with a "role" key ends the message being accumulated: the
        current draft is either committed as a chat log event or the drafts
        are cleared, and a new draft is started only for the "assistant" role.
        On a role of None, assistant tool calls at the end of the chat log
        that have no result yet are given an error result. "content",
        "thinking_content" and "tool_calls" deltas are accumulated into the
        current draft. A known reaction emoji at the start of the content is
        set as a reaction on message msg_id (or "last") and removed from the
        text.
        """
        conv = self.conversations[thread_id]

        def get_reaction(content: str) -> str | None:
            """Return the longest known reaction emoji that content starts with."""
            stripped = content.lstrip()
            candidates = [
                emoji for emoji in REACTION_EMOJI if stripped.startswith(emoji)
            ]
            if not candidates:
                return None
            return max(candidates, key=len)

        def should_commit() -> bool:
            """Tell whether the accumulated draft should be sent as a message."""
            draft = conv.draft
            if not (
                draft["content"] or draft["thinking_content"] or draft["tool_calls"]
            ):
                return False
            if delta["role"]:
                return True
            # For the last content, only commit if it exists in the chat log;
            # endswith is to cater for a possible reaction
            last_entry = chat_log.content[-1]
            return bool(
                last_entry.role == "assistant"
                and draft["content"]
                and (last_entry.content or "").endswith(draft["content"])
            )

        def ensure_draft_update_scheduled() -> None:
            """Schedule a draft update unless one is already pending."""
            if not conv.draft_cancel:
                conv.draft_cancel = self._async_schedule_update_draft(
                    thread_id,
                    1.0,
                    context,
                )

        async with conv.delta_lock:
            LOGGER.debug("Chat log delta: %s", delta)

            if "role" in delta:
                if conv.draft and should_commit():
                    await self.async_handle_chat_log_event(
                        thread_id=thread_id,
                        event_type=ChatLogEventType.CONTENT_ADDED,
                        data={"content": conv.draft},
                        context=context,
                    )
                elif conv.draft or delta["role"] is None:
                    # Nothing to commit: clear drafts
                    await self.send_message(
                        thread_id=thread_id,
                        context=context,
                    )

                conv.draft = (
                    AssistantContentDeltaDict(
                        role="assistant",
                        content="",
                        thinking_content="",
                        tool_calls=[],
                    )
                    if delta["role"] == "assistant"
                    else None
                )

                if delta["role"] is None:
                    answered_tool_calls: set[str] = set()
                    async with conv.content_lock:
                        # Walk back over the trailing tool results, then look
                        # at the first entry that is not a tool result.
                        for entry in reversed(chat_log.content):
                            if entry.role == "tool_result":
                                answered_tool_calls.add(entry.tool_call_id)
                                continue
                            if entry.role == "assistant":
                                unanswered = [
                                    tool_call
                                    for tool_call in entry.tool_calls or []
                                    if tool_call.id not in answered_tool_calls
                                ]
                                for tool_call in unanswered:
                                    chat_log.async_add_assistant_content_without_tools(
                                        ToolResultContent(
                                            agent_id=self.agent_id or "",
                                            tool_call_id=tool_call.id,
                                            tool_name=tool_call.tool_name,
                                            tool_result={
                                                "error": "asyncio.CancelledError: "
                                                "Conversation interrupted before "
                                                "tool call could be responded to. "
                                                "Please try again."
                                            },
                                        )
                                    )
                            break

                if conv.draft:
                    # Send typing action at the beginning of each assistant response
                    await self.hass.services.async_call(
                        TELEGRAM_DOMAIN,
                        SERVICE_SEND_CHAT_ACTION,
                        {
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                            **get_telegram_service_target(
                                self.chat_id,
                                self.notify_entity_id,
                            ),
                            ATTR_MESSAGE_THREAD_ID: thread_id,
                            ATTR_CHAT_ACTION: CHAT_ACTION_TYPING,
                        },
                        context=context,
                    )

            if not conv.draft:
                return

            if "content" in delta:
                # A reaction is only looked for at the very start of the content
                reaction = (
                    None if conv.draft["content"] else get_reaction(delta["content"])
                )
                if reaction:
                    await self.hass.services.async_call(
                        TELEGRAM_DOMAIN,
                        SERVICE_SET_MESSAGE_REACTION,
                        {
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                            ATTR_MESSAGE_ID: msg_id or "last",
                            ATTR_CHAT_ID: self.chat_id,
                            ATTR_REACTION: reaction,
                        },
                        context=context,
                    )
                    conv.draft["content"] = (
                        delta["content"].lstrip().removeprefix(reaction).lstrip()
                    )
                else:
                    conv.draft["content"] += delta["content"]
                ensure_draft_update_scheduled()

            if "thinking_content" in delta:
                conv.draft["thinking_content"] += delta["thinking_content"]
                ensure_draft_update_scheduled()

            if "tool_calls" in delta:
                conv.draft["tool_calls"].extend(delta["tool_calls"])  # type: ignore[union-attr]
1013

1014
    async def async_handle_chat_log_event(
        self,
        thread_id: int,
        event_type: ChatLogEventType,
        data: dict[str, Any],
        context: Context,
    ) -> None:
        """Send newly added assistant content to the chat.

        Only CONTENT_ADDED events whose content has the "assistant" role and a
        non-empty text are acted on; everything else is ignored. A pending
        draft update is cancelled before the message is sent.
        """
        # async_subscribe_chat_logs does not provide context, ensure user_id is set
        context = self._get_context(context)
        conv = self.conversations[thread_id]

        async with conv.content_lock:
            if event_type != ChatLogEventType.CONTENT_ADDED:
                return

            content = data.get("content")
            if not content or content.get("role") != "assistant":
                return

            message = content.get("content")
            if not message:
                return

            if conv.draft_cancel:
                conv.draft_cancel()
            await self.send_message(
                message=message,
                thread_id=thread_id,
                context=context,
            )
1039

1040
    @callback
    def _reset_conversation_history(self, thread_id: int) -> None:
        """Drop the stored chat session and chat log of a thread.

        The session, when one exists, is removed from hass.data and cleaned
        up; the chat log stored under the same conversation id is removed too.
        """
        conversation_id = self._get_conversation_id(thread_id)

        sessions = self.hass.data.get(DATA_CHAT_SESSION)
        if sessions:
            session = sessions.pop(conversation_id, None)
            if session:
                session.async_cleanup()

        chat_logs = self.hass.data.get(DATA_CHAT_LOGS)
        if chat_logs:
            chat_logs.pop(conversation_id, None)
1052

1053
    async def _async_change_agent(self, agent_id: str) -> bool:
        """Switch this chat to another conversation agent.

        The agent id is stored on the handler and written to the subentry
        data. Returns True when the subentry was updated and False when the
        update raised HomeAssistantError.
        """
        LOGGER.debug(
            "Change agent for chat_id=%s to agent_id=%s", self.chat_id, agent_id
        )

        # Setting the attribute might be redundant because we are reloading
        # the config entry
        self.agent_id = agent_id

        subentry = self.entry.subentries[self.subentry_id]
        new_data = dict(subentry.data)
        new_data[CONF_CONVERSATION_AGENT] = agent_id
        LOGGER.debug(
            "Updating subentry %s with data %s", subentry.subentry_id, new_data
        )

        try:
            self.hass.config_entries.async_update_subentry(
                self.entry, subentry, data=new_data
            )
        except HomeAssistantError as e:
            LOGGER.exception(
                "Failed to update subentry %s: %s",
                subentry.subentry_id,
                e,
                stack_info=True,
            )
            return False
        else:
            LOGGER.debug("Subentry %s updated", subentry.subentry_id)
            return True
1079

1080
    async def async_process_command(
3✔
1081
        self,
1082
        thread_id: int,
1083
        command: str,
1084
        args: list[str],
1085
        context: Context,
1086
    ) -> None:
1087
        """Process a bot command."""
1088
        LOGGER.debug("Received command: %s with args: %s", command, args)
3✔
1089
        match command:
3✔
1090
            case "/model":
3✔
1091
                selected_agent = args[0] if len(args) > 0 else None
×
1092
                current_agent = self.agent_id
×
1093

1094
                agents = {
×
1095
                    agent.id: agent.name
1096
                    for agent in get_agent_manager(self.hass).async_get_agent_info()
1097
                    if not isinstance(
1098
                        get_agent_manager(self.hass).async_get_agent(agent.id),
1099
                        ConversationEntity,
1100
                    )
1101
                } | {
1102
                    entity.entity_id: (
1103
                        self.hass.states.get(entity.entity_id).name  # type: ignore[union-attr]
1104
                        if self.hass.states.get(entity.entity_id)
1105
                        else entity.entity_id
1106
                    )
1107
                    for entity in self.hass.data[DATA_COMPONENT].entities
1108
                }
1109

1110
                LOGGER.debug(
×
1111
                    "Selected agent: %s, current agent: %s, available agents: %s",
1112
                    selected_agent,
1113
                    current_agent,
1114
                    agents,
1115
                )
1116
                if (
×
1117
                    selected_agent is not None
1118
                    and selected_agent in agents
1119
                    and await self._async_change_agent(selected_agent)
1120
                ):
1121
                    LOGGER.debug(
×
1122
                        "Agent switched to %s for chat_id=%s",
1123
                        selected_agent,
1124
                        self.chat_id,
1125
                    )
1126
                    message = async_translate_message(
×
1127
                        self.hass,
1128
                        translation_key="conversation_agent_updated",
1129
                        translation_placeholders={
1130
                            "agent_id": agents.get(selected_agent, selected_agent)
1131
                        },
1132
                    )
1133
                    await self.send_message(
×
1134
                        message=message,
1135
                        thread_id=thread_id,
1136
                        context=context,
1137
                    )
1138
                else:
1139
                    message = async_translate_message(
×
1140
                        self.hass,
1141
                        translation_key="current_conversation_agent",
1142
                        translation_placeholders={
1143
                            "agent_id": agents.get(current_agent, current_agent)
1144
                            if current_agent
1145
                            else "None"
1146
                        },
1147
                    )
1148
                    messages = await self.send_message(
×
1149
                        message=message,
1150
                        thread_id=thread_id,
1151
                        context=context,
1152
                    )
1153
                    if messages["chats"]:
×
1154
                        msg = messages["chats"][-1]
×
1155
                        await self.hass.services.async_call(
×
1156
                            TELEGRAM_DOMAIN,
1157
                            SERVICE_EDIT_REPLYMARKUP,
1158
                            {
1159
                                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1160
                                **get_telegram_service_target(
1161
                                    msg[ATTR_CHAT_ID],
1162
                                    self.notify_entity_id,
1163
                                ),
1164
                                ATTR_MESSAGE_ID: msg[ATTR_MESSAGE_ID],
1165
                                ATTR_KEYBOARD_INLINE: [
1166
                                    [(agent_name, f"/model {agent_id}")]
1167
                                    for agent_id, agent_name in agents.items()
1168
                                ],
1169
                            },
1170
                            context=context,
1171
                        )
1172
            case "/new":
3✔
1173
                if current_conversation := self.conversations.get(thread_id):
3✔
1174
                    if (task := current_conversation.task) and not task.done():
×
1175
                        task.cancel(
×
1176
                            "Conversation interrupted by new conversation command."
1177
                        )
1178
                        with contextlib.suppress(asyncio.CancelledError):
×
1179
                            await task
×
1180
                    if current_conversation.draft_cancel:
×
1181
                        current_conversation.draft_cancel()
×
1182
                        current_conversation.draft_cancel = None
×
1183

1184
                self._reset_conversation_history(thread_id)
3✔
1185

1186
                message = async_translate_message(
3✔
1187
                    self.hass, translation_key="new_conversation"
1188
                )
1189
                await self.send_message(
3✔
1190
                    message=message,
1191
                    thread_id=thread_id,
1192
                    context=context,
1193
                )
1194

1195
    async def async_handle_command(self, event: Event) -> None:
        """Pass a command event on to async_process_command.

        Anything from the first "@" onwards is dropped from the command, and a
        missing or falsy thread id maps to thread 0.
        """
        payload = event.data
        context = self._get_context(event.context, payload.get(ATTR_USER_ID))

        thread_id = payload.get(ATTR_MESSAGE_THREAD_ID) or 0
        command, _, _ = payload["command"].partition("@")
        arguments = payload.get("args", [])

        await self.async_process_command(thread_id, command, arguments, context)
1204

1205
    async def async_handle_callback(self, event: Event) -> None:
        """Handle callback query events.

        The callback data is split on spaces into a command and its arguments
        and run through async_process_command. The callback query is then
        answered and, when the event carries the originating message, that
        message's inline keyboard is removed.
        """
        LOGGER.debug("callback_event data: %s", event.data)
        context = self._get_context(event.context, event.data.get(ATTR_USER_ID))

        # Both values may be present but None, which a .get() default does not
        # cover; fall back explicitly.
        args = (event.data.get("data") or "").split(" ")
        source_message = event.data.get(ATTR_MESSAGE) or {}

        await self.async_process_command(
            source_message.get(ATTR_MESSAGE_THREAD_ID) or 0,
            args.pop(0),
            args,
            context,
        )
        await self.hass.services.async_call(
            TELEGRAM_DOMAIN,
            SERVICE_ANSWER_CALLBACK_QUERY,
            {
                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                ATTR_MESSAGE: "Done",
                ATTR_CALLBACK_QUERY_ID: event.data.get(ATTR_MSGID),
            },
            context=context,
        )
        if clicked_message := event.data.get(ATTR_MSG):
            # Remove the inline keyboard from the message it was attached to
            await self.hass.services.async_call(
                TELEGRAM_DOMAIN,
                SERVICE_EDIT_REPLYMARKUP,
                {
                    CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
                    **get_telegram_service_target(
                        self.chat_id,
                        self.notify_entity_id,
                    ),
                    ATTR_MESSAGE_ID: clicked_message[ATTR_MESSAGE_ID],
                    ATTR_KEYBOARD_INLINE: [],
                },
                context=context,
            )
1242

1243
    async def handle_generate_image_intent(
        self, event: Event, context: Context, prompt: str
    ) -> str:
        """Generate an image from a prompt and send it to the chat.

        An "upload photo" chat action is sent first, then the image is
        generated, resolved to a media file and sent as a photo captioned with
        the revised prompt, if any. Returns a confirmation text, which
        includes the revised prompt when it differs from the given one.
        """
        LOGGER.debug("generate_image_intent event data: %s", event.data)
        context = self._get_context(context, event.data.get(ATTR_USER_ID))

        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0

        chat_action = {
            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
            **get_telegram_service_target(
                self.chat_id,
                self.notify_entity_id,
            ),
            ATTR_MESSAGE_THREAD_ID: thread_id,
            ATTR_CHAT_ACTION: CHAT_ACTION_UPLOAD_PHOTO,
        }
        await self.hass.services.async_call(
            TELEGRAM_DOMAIN,
            SERVICE_SEND_CHAT_ACTION,
            chat_action,
            context=context,
        )

        result: dict[str, Any] = await async_generate_image(
            self.hass,
            task_name=DOMAIN,
            entity_id=self.config.get(CONF_AI_TASK),
            instructions=prompt,
        )  # type: ignore[assignment]
        revised_prompt = result.get("revised_prompt")

        media = await async_resolve_media(self.hass, result["media_source_id"], None)

        photo = {
            ATTR_FILE: media.path.as_posix(),  # type: ignore[union-attr]
            ATTR_CAPTION: markdownify(revised_prompt or ""),
            **get_telegram_service_target(self.chat_id, self.notify_entity_id),
            ATTR_MESSAGE_THREAD_ID: thread_id,
            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
            ATTR_PARSER: "markdownv2",
        }
        await self.hass.services.async_call(
            TELEGRAM_DOMAIN,
            SERVICE_SEND_PHOTO,
            photo,
            blocking=True,
            context=context,
            return_response=True,
        )

        message = "The image has been generated and sent to the user."
        if revised_prompt and revised_prompt != prompt:
            return message + f" Revised prompt: {revised_prompt}"
        return message
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc