• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

Shulyaka / telegram_bot_conversation / 23544302582

25 Mar 2026 01:46PM UTC coverage: 70.702% (-0.1%) from 70.833%
23544302582

Pull #44

github

web-flow
Merge 13452fb86 into 2a80eda2c
Pull Request #44: Fix exception logging and partial message send error

0 of 3 new or added lines in 1 file covered. (0.0%)

1 existing line in 1 file now uncovered.

765 of 1082 relevant lines covered (70.7%)

2.12 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

62.83
/custom_components/telegram_bot_conversation/entity.py
1
"""Per-chat handler for telegram_bot_conversation."""
2

3
import asyncio
3✔
4
from collections.abc import Mapping
3✔
5
import contextlib
3✔
6
from dataclasses import dataclass, field
3✔
7
from datetime import timedelta
3✔
8
import itertools
3✔
9
from pathlib import Path
3✔
10
import tempfile
3✔
11
from types import TracebackType
3✔
12
from typing import Any, Self
3✔
13

14
from telegramify_markdown import entities_to_markdownv2, markdownify, telegramify
3✔
15
from telegramify_markdown.content import ContentType
3✔
16

17
from homeassistant.components.ai_task import async_generate_image
3✔
18
from homeassistant.components.conversation import (
3✔
19
    AssistantContentDeltaDict,
20
    Attachment,
21
    ChatLog,
22
    ConversationEntity,
23
    ToolResultContent,
24
    UserContent,
25
    async_converse,
26
    async_get_chat_log,
27
)
28
from homeassistant.components.conversation.agent_manager import get_agent_manager
3✔
29
from homeassistant.components.conversation.chat_log import DATA_CHAT_LOGS
3✔
30
from homeassistant.components.conversation.const import DATA_COMPONENT, ChatLogEventType
3✔
31
from homeassistant.components.media_source import async_resolve_media
3✔
32
from homeassistant.components.telegram_bot import (  # type: ignore[attr-defined]
3✔
33
    InputMediaType,
34
)
35
from homeassistant.components.telegram_bot.const import (
3✔
36
    ATTR_CALLBACK_QUERY_ID,
37
    ATTR_CAPTION,
38
    ATTR_CHAT_ACTION,
39
    ATTR_CHAT_ID,
40
    ATTR_DISABLE_NOTIF,
41
    ATTR_DISABLE_WEB_PREV,
42
    ATTR_FILE,
43
    ATTR_FILE_ID,
44
    ATTR_FILE_MIME_TYPE,
45
    ATTR_FILE_PATH,
46
    ATTR_KEYBOARD_INLINE,
47
    ATTR_MEDIA_TYPE,
48
    ATTR_MESSAGE,
49
    ATTR_MESSAGE_ID,
50
    ATTR_MESSAGE_THREAD_ID,
51
    ATTR_MSG,
52
    ATTR_MSGID,
53
    ATTR_PARSER,
54
    ATTR_REACTION,
55
    ATTR_TEXT,
56
    ATTR_USER_ID,
57
    CHAT_ACTION_TYPING,
58
    CHAT_ACTION_UPLOAD_PHOTO,
59
    CONF_CONFIG_ENTRY_ID,
60
    DOMAIN as TELEGRAM_DOMAIN,
61
    EVENT_TELEGRAM_SENT,
62
    SERVICE_ANSWER_CALLBACK_QUERY,
63
    SERVICE_DELETE_MESSAGE,
64
    SERVICE_DOWNLOAD_FILE,
65
    SERVICE_EDIT_MESSAGE,
66
    SERVICE_EDIT_MESSAGE_MEDIA,
67
    SERVICE_EDIT_REPLYMARKUP,
68
    SERVICE_SEND_CHAT_ACTION,
69
    SERVICE_SEND_DOCUMENT,
70
    SERVICE_SEND_MESSAGE,
71
    SERVICE_SEND_PHOTO,
72
    SERVICE_SET_MESSAGE_REACTION,
73
)
74
from homeassistant.config_entries import ConfigEntry
3✔
75
from homeassistant.const import ATTR_ENTITY_ID
3✔
76
from homeassistant.core import CALLBACK_TYPE, Context, Event, HomeAssistant, callback
3✔
77
from homeassistant.exceptions import HomeAssistantError
3✔
78
from homeassistant.helpers.chat_session import (
3✔
79
    CONVERSATION_TIMEOUT,
80
    DATA_CHAT_SESSION,
81
    async_get_chat_session,
82
)
83
from homeassistant.helpers.event import async_call_later
3✔
84
from homeassistant.helpers.translation import async_get_cached_translations
3✔
85
from homeassistant.util import dt as dt_util
3✔
86

87
from .const import (
3✔
88
    CONF_AI_TASK,
89
    CONF_ATTACHMENTS,
90
    CONF_CONVERSATION_AGENT,
91
    CONF_CONVERSATION_TIMEOUT,
92
    CONF_DISABLE_WEB_PREV,
93
    CONF_LATEX,
94
    CONF_MERMAID,
95
    CONF_TELEGRAM_ENTRY,
96
    CONF_THOUGHTS,
97
    CONF_TMPDIR,
98
    CONF_USER,
99
    DOMAIN,
100
    LOGGER,
101
    REACTION_EMOJI,
102
)
103

104
try:
3✔
105
    from homeassistant.components.telegram_bot.const import (  # type: ignore[attr-defined]
3✔
106
        ATTR_DRAFT_ID,
107
        SERVICE_SEND_MESSAGE_DRAFT,
108
    )
109
except ImportError:
3✔
110
    SERVICE_SEND_MESSAGE_DRAFT = ATTR_DRAFT_ID = None
3✔
111

112
MAX_TELEGRAM_LENGTH = 4096
3✔
113

114

115
@callback
3✔
116
def async_translate_message(
3✔
117
    hass: HomeAssistant,
118
    translation_key: str,
119
    translation_placeholders: dict[str, str] | None = None,
120
) -> str:
121
    """Return a translated message."""
122
    localize_key = f"component.{DOMAIN}.common.{translation_key}"
3✔
123
    for language in (hass.config.language, "en"):
3✔
124
        translations = async_get_cached_translations(
3✔
125
            hass, language, category="common", integration=DOMAIN
126
        )
127
        if localize_key in translations:
3✔
128
            message = translations[localize_key]
3✔
129
            if not translation_placeholders:
3✔
130
                return message
3✔
131
            with contextlib.suppress(KeyError):
×
132
                message = message.format(**translation_placeholders)
×
133
            return message
×
134
    return translation_key
×
135

136

137
def get_telegram_service_target(
3✔
138
    chat_id: int, notify_entity_id: str | None
139
) -> dict[str, str | int | list[str]]:
140
    """Build a telegram service target payload."""
141
    if notify_entity_id:
3✔
142
        return {ATTR_ENTITY_ID: [notify_entity_id]}
3✔
143
    return {ATTR_CHAT_ID: chat_id}
×
144

145

146
class TelegramMessageWatcher:
3✔
147
    """Context management class to track sent messages."""
148

149
    def __init__(self, hass: HomeAssistant, telegram_config_entry_id: str) -> None:
3✔
150
        """Initialize the watcher."""
151
        self.hass = hass
3✔
152
        self.telegram_config_entry_id = telegram_config_entry_id
3✔
153
        self._unregister_listener: CALLBACK_TYPE | None = self.hass.bus.async_listen(
3✔
154
            EVENT_TELEGRAM_SENT,
155
            self.async_handle_sent,
156
            self.callback_sent_filter,
157
        )
158
        self.sent_messages: list[tuple[int, int]] = []
3✔
159
        self.watchers: dict[tuple[int, int], asyncio.Future[None]] = {}
3✔
160

161
    @callback
3✔
162
    def callback_sent_filter(self, event_data: Mapping[str, Any]) -> bool:
3✔
163
        """Filter sent events."""
164
        return bool(
3✔
165
            event_data.get("bot", {}).get(CONF_CONFIG_ENTRY_ID)
166
            == self.telegram_config_entry_id
167
        )
168

169
    async def async_handle_sent(self, event: Event) -> None:
3✔
170
        """Handle sent events."""
171
        message = (
3✔
172
            int(event.data[ATTR_CHAT_ID]),
173
            int(event.data.get(ATTR_MESSAGE_ID, 0)),
174
        )
175
        self.sent_messages.append(message)
3✔
176
        if message in self.watchers:
3✔
177
            self.watchers[message].set_result(None)
×
178
            del self.watchers[message]
×
179

180
    def wait_message(self, chat_id: int, message_id: int) -> asyncio.Future[None]:
3✔
181
        """Watch for a specific message to be sent."""
182
        message = (chat_id, message_id)
3✔
183
        if message in self.sent_messages:
3✔
184
            future: asyncio.Future[None] = asyncio.Future()
3✔
185
            future.set_result(None)
3✔
186
            return future
3✔
187
        self.watchers[message] = asyncio.Future()
×
188
        return self.watchers[message]
×
189

190
    def async_cleanup(self) -> None:
3✔
191
        """Clean up the watcher."""
192
        if not self._unregister_listener:
3✔
193
            return
3✔
194
        self._unregister_listener()
3✔
195
        self._unregister_listener = None
3✔
196

197
    def __enter__(self) -> Self:
3✔
198
        """Enter the context."""
199
        return self
3✔
200

201
    def __exit__(
3✔
202
        self,
203
        exc_type: type[BaseException] | None,
204
        exc_val: BaseException | None,
205
        exc_tb: TracebackType | None,
206
    ) -> bool | None:
207
        """Exit the context."""
208
        self.async_cleanup()
3✔
209
        return None
3✔
210

211
    def __del__(self) -> None:
3✔
212
        """Ensure cleanup on deletion."""
213
        self.async_cleanup()
3✔
214

215

216
@dataclass
3✔
217
class ConversationConfig:
3✔
218
    """Per-conversation runtime state tracked across messages."""
219

220
    task: asyncio.Task[None] | None = None
3✔
221
    draft: AssistantContentDeltaDict | None = None
3✔
222
    delta_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
3✔
223
    content_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
3✔
224
    send_lock: asyncio.Lock = field(init=False, default_factory=asyncio.Lock)
3✔
225
    draft_cancel: CALLBACK_TYPE | None = field(init=False, default=None)
3✔
226
    sent_drafts: dict[int, str] | None = field(init=False, default=None)
3✔
227

228

229
class TelegramChatHandler:
3✔
230
    """Handle conversation logic for a single Telegram chat."""
231

232
    def __init__(
3✔
233
        self,
234
        hass: HomeAssistant,
235
        entry: ConfigEntry,
236
        chat_id: int,
237
        notify_entity_id: str | None,
238
        subentry_id: str,
239
        user_id_map: dict[int, str],
240
        config: dict[str, Any],
241
    ) -> None:
242
        """Initialize the per-chat handler."""
243
        self.hass = hass
3✔
244
        self.entry = entry
3✔
245
        self.chat_id = chat_id
3✔
246
        self.config = config
3✔
247
        self.telegram_entry_id = config[CONF_TELEGRAM_ENTRY]
3✔
248
        self.user_id = config.get(CONF_USER)
3✔
249
        self.user_id_map = user_id_map
3✔
250
        self.agent_id: str | None = config.get(CONF_CONVERSATION_AGENT)
3✔
251
        self.notify_entity_id = notify_entity_id
3✔
252
        self.subentry_id = subentry_id
3✔
253
        self.conversations: dict[int, ConversationConfig] = {}
3✔
254

255
        def cancel_drafts() -> None:
3✔
256
            """Cancel the current draft if it exists."""
257
            for conversation in self.conversations.values():
3✔
258
                if conversation.draft_cancel:
3✔
259
                    conversation.draft_cancel()
×
260

261
        self.entry.async_on_unload(cancel_drafts)
3✔
262

263
        self.extra_prompt = "The user is interacting through Telegram"
3✔
264
        if chat_id < 0:
3✔
265
            self.extra_prompt += " in a group chat"
3✔
266
        self.extra_prompt += ". Markdown is fully supported. "
3✔
267
        if config[CONF_LATEX]:
3✔
268
            self.extra_prompt += "Inline LaTeX rendering is supported. "
×
269
        if config[CONF_ATTACHMENTS]:
3✔
270
            self.extra_prompt += "Long code blocks will be sent as files. "
3✔
271
        if config[CONF_MERMAID]:
3✔
272
            self.extra_prompt += (
×
273
                "Mermaid is supported as inline code blocks "
274
                "and will be rendered to an image. "
275
            )
276
        self.extra_prompt += (
3✔
277
            f"If the response message starts with any of {REACTION_EMOJI}, "
278
            "it will be added as a reaction to the user message."
279
        )
280

281
    def _get_conversation_id(self, thread_id: int) -> str:
3✔
282
        """Build the Home Assistant conversation ID for a Telegram chat."""
283
        conversation_id = f"telegram_{self.chat_id}"
3✔
284
        if thread_id:
3✔
285
            conversation_id += f"_{thread_id}"
×
286
        return conversation_id
3✔
287

288
    def _get_context(
3✔
289
        self, context: Context | None = None, effective_user_id: int | None = None
290
    ) -> Context:
291
        """Get a context with the user_id set."""
292
        if context is None:
3✔
293
            context = Context()
×
294
        if (
3✔
295
            context.user_id is None
296
            and effective_user_id is not None
297
            and effective_user_id in self.user_id_map
298
        ):
299
            context.user_id = self.user_id_map[effective_user_id]
3✔
300
        if context.user_id is None:
3✔
301
            context.user_id = self.user_id
3✔
302
        return context
3✔
303

304
    @callback
3✔
305
    def _async_schedule_update_draft(
3✔
306
        self,
307
        thread_id: int,
308
        delay: float,
309
        context: Context,
310
    ) -> CALLBACK_TYPE:
311
        """Schedule a draft update on the Home Assistant event loop."""
312

313
        def log_exceptions(task: asyncio.Task[dict[str, list[dict[str, int]]]]) -> None:
3✔
314
            """Log exceptions from send_message."""
315
            with contextlib.suppress(asyncio.CancelledError):
×
316
                if err := task.exception():
×
317
                    LOGGER.error(
×
318
                        "Error in send_message for chat_id=%s, thread_id=%s: %s",
319
                        self.chat_id,
320
                        thread_id,
321
                        err,
322
                        exc_info=err,
323
                    )
324

325
        @callback
3✔
326
        def _run_update_draft(_: Any) -> None:
3✔
327
            self.entry.async_create_task(
×
328
                self.hass,
329
                self.send_message(thread_id=thread_id, context=context, draft=True),
330
                "send_message",
331
            ).add_done_callback(log_exceptions)
332

333
        return async_call_later(self.hass, delay, _run_update_draft)
3✔
334

335
    async def send_message(  # noqa: C901
3✔
336
        self,
337
        context: Context,
338
        message: str = "",
339
        thread_id: int = 0,
340
        draft: bool = False,
341
    ) -> dict[str, list[dict[str, int]]]:
342
        """Send telegram message, taking care of formatting, length, and drafts."""
343
        current_conversation = self.conversations.setdefault(
3✔
344
            thread_id, ConversationConfig()
345
        )
346
        if current_conversation.draft_cancel is not None:
3✔
347
            current_conversation.draft_cancel()
3✔
348
        current_conversation.draft_cancel = None
3✔
349

350
        if draft:
3✔
351
            message = (
×
352
                (
353
                    current_conversation.draft["content"]
354
                    or (
355
                        current_conversation.draft["thinking_content"]
356
                        if self.config[CONF_THOUGHTS]
357
                        else ""
358
                    )
359
                    or ""
360
                )
361
                if current_conversation.draft
362
                else ""
363
            )
364

365
        messages: dict[str, list[dict[str, int]]] = {"chats": []}
3✔
366
        created_files: list[Path] = []
3✔
367

368
        def save_file(file_name: str, file_data: bytes) -> Path:
3✔
369
            """Save temp file."""
370
            with tempfile.NamedTemporaryFile(
×
371
                mode="wb",
372
                prefix=Path(file_name).stem,
373
                suffix=Path(file_name).suffix,
374
                dir=self.config[CONF_TMPDIR],
375
                delete=False,
376
            ) as temp_file:
377
                temp_file.write(file_data)
×
378
                filename = Path(temp_file.name)
×
379
                created_files.append(filename)
×
380
                return filename
×
381

382
        def cleanup_created_files() -> None:
3✔
383
            """Delete temporary files created while sending messages."""
384
            for file in created_files:
×
385
                file.unlink(missing_ok=True)
×
386

387
        items = await telegramify(
3✔
388
            content=message,
389
            latex_escape=self.config[CONF_LATEX],
390
            render_mermaid=False if draft else self.config[CONF_MERMAID],
391
            min_file_lines=0 if draft else self.config[CONF_ATTACHMENTS],
392
            max_message_length=MAX_TELEGRAM_LENGTH,
393
        )
394

395
        try:
3✔
396
            async with current_conversation.send_lock:
3✔
397
                if (
3✔
398
                    not draft
399
                    and current_conversation.sent_drafts is not None
400
                    and SERVICE_SEND_MESSAGE_DRAFT
401
                    and self.chat_id > 0
402
                ):
403
                    thinking_message = async_translate_message(
×
404
                        self.hass, translation_key="thinking"
405
                    )
406
                    await self.hass.services.async_call(
×
407
                        TELEGRAM_DOMAIN,
408
                        SERVICE_SEND_MESSAGE_DRAFT,
409
                        {
410
                            ATTR_MESSAGE: markdownify(thinking_message),
411
                            **get_telegram_service_target(
412
                                self.chat_id, self.notify_entity_id
413
                            ),
414
                            ATTR_MESSAGE_THREAD_ID: thread_id,
415
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
416
                            ATTR_PARSER: "markdownv2",
417
                            ATTR_DRAFT_ID: 1,
418
                        },
419
                        blocking=True,
420
                        context=context,
421
                    )
422

423
                if current_conversation.sent_drafts is None:
3✔
424
                    current_conversation.sent_drafts = {}
3✔
425

426
                sent_drafts_iter = iter(current_conversation.sent_drafts.copy().items())
3✔
427

428
                with TelegramMessageWatcher(
3✔
429
                    self.hass, self.telegram_entry_id
430
                ) as watcher:
431
                    # All messages but the last are real messages
432
                    for item in (
3✔
433
                        items[:-1]
434
                        if draft and SERVICE_SEND_MESSAGE_DRAFT and self.chat_id > 0
435
                        else items
436
                    ):
437
                        if item.content_type == ContentType.TEXT:
3✔
438
                            text = entities_to_markdownv2(item.text, item.entities)
3✔
439
                        elif item.content_type == ContentType.PHOTO:
×
440
                            caption_md = entities_to_markdownv2(
×
441
                                item.caption_text, item.caption_entities
442
                            )
443
                            text = "Drawing " + caption_md
×
444
                        elif item.content_type == ContentType.FILE:
×
445
                            caption_md = entities_to_markdownv2(
×
446
                                item.caption_text, item.caption_entities
447
                            )
448
                            text = "Writing " + caption_md
×
449
                        else:
450
                            continue
×
451

452
                        disable_notification = draft or item is not items[-1]
3✔
453
                        disable_web_prev = draft or self.config[CONF_DISABLE_WEB_PREV]
3✔
454

455
                        try:
3✔
456
                            message_id, last_text = next(sent_drafts_iter)
3✔
457
                            # Message was already sent, edit it if the text has changed
458
                            if (
×
459
                                text == last_text
460
                                and disable_notification
461
                                and disable_web_prev
462
                            ):
463
                                continue
×
NEW
464
                            if text == last_text:
×
465
                                # Cannot edit messages with a notification
UNCOV
466
                                for message_id, _ in itertools.chain(
×
467
                                    [(message_id, last_text)], sent_drafts_iter
468
                                ):  # Delete this draft and all remaining ones to maintain sequence
469
                                    await self.hass.services.async_call(
×
470
                                        TELEGRAM_DOMAIN,
471
                                        SERVICE_DELETE_MESSAGE,
472
                                        {
473
                                            ATTR_MESSAGE_ID: message_id,
474
                                            **get_telegram_service_target(
475
                                                self.chat_id, self.notify_entity_id
476
                                            ),
477
                                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
478
                                        },
479
                                        blocking=True,
480
                                        context=context,
481
                                    )
482

483
                                    current_conversation.sent_drafts.pop(
×
484
                                        message_id, None
485
                                    )
486
                                raise StopIteration  # noqa: TRY301
×
487

488
                            if draft or item.content_type == ContentType.TEXT:
×
489
                                await self.hass.services.async_call(
×
490
                                    TELEGRAM_DOMAIN,
491
                                    SERVICE_EDIT_MESSAGE,
492
                                    {
493
                                        ATTR_MESSAGE: text,
494
                                        **get_telegram_service_target(
495
                                            self.chat_id, self.notify_entity_id
496
                                        ),
497
                                        ATTR_MESSAGE_ID: message_id,
498
                                        CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
499
                                        ATTR_PARSER: "markdownv2",
500
                                        ATTR_DISABLE_WEB_PREV: disable_web_prev,
501
                                    },
502
                                    blocking=True,
503
                                    context=context,
504
                                )
505
                            elif item.content_type in (
×
506
                                ContentType.PHOTO,
507
                                ContentType.FILE,
508
                            ):
509
                                await self.hass.services.async_call(
×
510
                                    TELEGRAM_DOMAIN,
511
                                    SERVICE_EDIT_MESSAGE_MEDIA,
512
                                    {
513
                                        ATTR_FILE: (
514
                                            await self.hass.async_add_executor_job(
515
                                                save_file,
516
                                                item.file_name,
517
                                                item.file_data,
518
                                            )
519
                                        ).as_posix(),
520
                                        ATTR_CAPTION: entities_to_markdownv2(
521
                                            item.caption_text, item.caption_entities
522
                                        ),
523
                                        **get_telegram_service_target(
524
                                            self.chat_id, self.notify_entity_id
525
                                        ),
526
                                        ATTR_MESSAGE_ID: message_id,
527
                                        ATTR_MEDIA_TYPE: InputMediaType.PHOTO
528
                                        if item.content_type == ContentType.PHOTO
529
                                        else InputMediaType.DOCUMENT,
530
                                        CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
531
                                        ATTR_PARSER: "markdownv2",
532
                                    },
533
                                    blocking=True,
534
                                    context=context,
535
                                )
536
                            messages["chats"].append(
×
537
                                {
538
                                    ATTR_CHAT_ID: self.chat_id,
539
                                    ATTR_MESSAGE_ID: message_id,
540
                                }
541
                            )
542
                            current_conversation.sent_drafts.update({message_id: text})
×
543
                        except StopIteration:
3✔
544
                            # Message was not sent yet, create.
545
                            if draft or item.content_type == ContentType.TEXT:
3✔
546
                                item_messages: dict[str, list[dict[str, Any]]] = (
3✔
547
                                    await self.hass.services.async_call(
548
                                        TELEGRAM_DOMAIN,
549
                                        SERVICE_SEND_MESSAGE,
550
                                        {
551
                                            ATTR_MESSAGE: text,
552
                                            **get_telegram_service_target(
553
                                                self.chat_id, self.notify_entity_id
554
                                            ),
555
                                            ATTR_MESSAGE_THREAD_ID: thread_id,
556
                                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
557
                                            ATTR_PARSER: "markdownv2",
558
                                            ATTR_DISABLE_NOTIF: disable_notification,
559
                                            ATTR_DISABLE_WEB_PREV: disable_web_prev,
560
                                        },
561
                                        blocking=True,
562
                                        context=context,
563
                                        return_response=True,
564
                                    )  # type: ignore[assignment]
565
                                    or {"chats": []}
566
                                )
567
                            elif item.content_type in (
×
568
                                ContentType.PHOTO,
569
                                ContentType.FILE,
570
                            ):
571
                                item_messages = await self.hass.services.async_call(
×
572
                                    TELEGRAM_DOMAIN,
573
                                    SERVICE_SEND_PHOTO
574
                                    if item.content_type == ContentType.PHOTO
575
                                    else SERVICE_SEND_DOCUMENT,
576
                                    {
577
                                        ATTR_FILE: (
578
                                            await self.hass.async_add_executor_job(
579
                                                save_file,
580
                                                item.file_name,
581
                                                item.file_data,
582
                                            )
583
                                        ).as_posix(),
584
                                        ATTR_CAPTION: entities_to_markdownv2(
585
                                            item.caption_text, item.caption_entities
586
                                        ),
587
                                        **get_telegram_service_target(
588
                                            self.chat_id, self.notify_entity_id
589
                                        ),
590
                                        ATTR_MESSAGE_THREAD_ID: thread_id,
591
                                        CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
592
                                        ATTR_DISABLE_NOTIF: disable_notification,
593
                                        ATTR_PARSER: "markdownv2",
594
                                    },
595
                                    blocking=True,
596
                                    context=context,
597
                                    return_response=True,
598
                                ) or {"chats": []}  # type: ignore[assignment]
599

600
                            messages["chats"].extend(item_messages["chats"])
3✔
601
                            await asyncio.wait_for(
3✔
602
                                asyncio.gather(
603
                                    *(
604
                                        watcher.wait_message(
605
                                            msg[ATTR_CHAT_ID], msg[ATTR_MESSAGE_ID]
606
                                        )
607
                                        for msg in item_messages["chats"]
608
                                    )
609
                                ),
610
                                timeout=10.0,
611
                            )
612
                            current_conversation.sent_drafts.update(
3✔
613
                                {
614
                                    msg[ATTR_MESSAGE_ID]: text
615
                                    for msg in item_messages["chats"]
616
                                    if msg[ATTR_CHAT_ID] == self.chat_id
617
                                }
618
                            )
619

620
                # Delete the remaining draft messages if there are more drafts than current content items
621
                for message_id, _ in sent_drafts_iter:
3✔
622
                    await self.hass.services.async_call(
×
623
                        TELEGRAM_DOMAIN,
624
                        SERVICE_DELETE_MESSAGE,
625
                        {
626
                            ATTR_MESSAGE_ID: message_id,
627
                            **get_telegram_service_target(
628
                                self.chat_id, self.notify_entity_id
629
                            ),
630
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
631
                        },
632
                        blocking=True,
633
                        context=context,
634
                    )
635

636
                    current_conversation.sent_drafts.pop(message_id, None)
×
637

638
                if draft and SERVICE_SEND_MESSAGE_DRAFT and self.chat_id > 0 and items:
3✔
639
                    item = items[-1]
×
640
                    text = entities_to_markdownv2(item.text, item.entities)
×
641

642
                    await self.hass.services.async_call(
×
643
                        TELEGRAM_DOMAIN,
644
                        SERVICE_SEND_MESSAGE_DRAFT,
645
                        {
646
                            ATTR_MESSAGE: text,
647
                            **get_telegram_service_target(
648
                                self.chat_id, self.notify_entity_id
649
                            ),
650
                            ATTR_MESSAGE_THREAD_ID: thread_id,
651
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
652
                            ATTR_PARSER: "markdownv2",
653
                            ATTR_DRAFT_ID: len(items) + 1,
654
                        },
655
                        blocking=True,
656
                        context=context,
657
                    )
658
                elif not draft:
3✔
659
                    current_conversation.sent_drafts = None
3✔
660

661
        except HomeAssistantError as e:
×
662
            if draft and "Flood control exceeded. Retry in " in str(e):
×
NEW
663
                LOGGER.warning(
×
664
                    "Telegram flood control hit for chat_id=%s, thread_id=%s: %s. "
665
                    "Scheduling draft update after retry period.",
666
                    self.chat_id,
667
                    thread_id,
668
                    e,
669
                )
670
                try:
×
671
                    if str(e).endswith(" seconds"):
×
672
                        retry_after = float(
×
673
                            str(e).split("Retry in ")[1].split(" seconds")[0]
674
                        )
675
                    elif str(e).endswith(" minutes"):
×
676
                        retry_after = (
×
677
                            float(str(e).split("Retry in ")[1].split(" minutes")[0])
678
                            * 60
679
                        )
680
                    else:
681
                        raise ValueError(f"Unknown time unit in error message: {e}")  # noqa: TRY301
×
682
                except ValueError:
×
683
                    retry_after = 3.0
×
684
                current_conversation.draft_cancel = self._async_schedule_update_draft(
×
685
                    thread_id,
686
                    retry_after,
687
                    context,
688
                )
689
            else:
NEW
690
                raise
×
691
        finally:
692
            if created_files:
3✔
693
                await self.hass.async_add_executor_job(cleanup_created_files)
×
694

695
        return messages
3✔
696

697
    async def async_handle_text(self, event: Event) -> None:
3✔
698
        """Handle text and attachment events."""
699
        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0
3✔
700

701
        current_conversation = self.conversations.setdefault(
3✔
702
            thread_id, ConversationConfig()
703
        )
704

705
        if (task := current_conversation.task) and not task.done():
3✔
706
            task.cancel("Conversation interrupted by new user message.")
×
707
            with contextlib.suppress(asyncio.CancelledError):
×
708
                await task
×
709

710
        task_name = f"telegram_conversation_{self.chat_id}_{thread_id}"
3✔
711
        task = self.entry.async_create_task(
3✔
712
            self.hass,
713
            self.async_process_message(event),
714
            task_name,
715
        )
716
        current_conversation.task = task
3✔
717

718
        def _clear_task(_task: asyncio.Task[None]) -> None:
3✔
719
            """Clear reference to completed task to avoid retaining tracebacks."""
720
            if current_conversation.task is _task:
3✔
721
                current_conversation.task = None
3✔
722
            try:
3✔
723
                if err := _task.exception():
3✔
724
                    LOGGER.error(
×
725
                        "Error in conversation task for chat_id=%s, thread_id=%s: %s",
726
                        self.chat_id,
727
                        thread_id,
728
                        err,
729
                        exc_info=err,
730
                    )
731
                else:
732
                    LOGGER.debug(
3✔
733
                        "Conversation task for chat_id=%s, thread_id=%s completed successfully",
734
                        self.chat_id,
735
                        thread_id,
736
                    )
737
            except asyncio.CancelledError as err:
×
738
                LOGGER.debug(
×
739
                    "Conversation task for chat_id=%s, thread_id=%s was cancelled: %s",
740
                    self.chat_id,
741
                    thread_id,
742
                    err,
743
                )
744

745
        task.add_done_callback(_clear_task)
3✔
746

747
    async def async_process_message(self, event: Event) -> None:
3✔
748
        """Handle conversation task."""
749
        context = self._get_context(event.context, event.data.get(ATTR_USER_ID))
3✔
750

751
        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0
3✔
752
        conversation_id = self._get_conversation_id(thread_id)
3✔
753

754
        error: BaseException | None = None
3✔
755

756
        @callback
3✔
757
        def chat_log_delta_listener(chat_log: ChatLog, delta: dict[str, Any]) -> None:
3✔
758
            """Handle chat log delta."""
759

760
            def log_exceptions(task: asyncio.Task[None]) -> None:
3✔
761
                """Log exceptions from the delta listener."""
762
                with contextlib.suppress(asyncio.CancelledError):
3✔
763
                    if err := task.exception():
3✔
764
                        LOGGER.error(
×
765
                            "Error in chat log delta listener for chat_id=%s, thread_id=%s: %s",
766
                            self.chat_id,
767
                            thread_id,
768
                            err,
769
                            exc_info=err,
770
                        )
771

772
            self.entry.async_create_task(
3✔
773
                self.hass,
774
                self.async_chat_log_delta_listener(
775
                    chat_log,
776
                    delta,
777
                    thread_id,
778
                    event.data.get(ATTR_MSGID),
779
                    context,
780
                ),
781
                "async_chat_log_delta_listener",
782
            ).add_done_callback(log_exceptions)
783

784
        with (
3✔
785
            async_get_chat_session(self.hass, conversation_id) as session,
786
            async_get_chat_log(
787
                self.hass,
788
                session,
789
                chat_log_delta_listener=chat_log_delta_listener,
790
            ) as chat_log,
791
        ):
792
            if event.data.get(ATTR_FILE_ID):
3✔
793
                file_path = Path(
×
794
                    (
795
                        await self.hass.services.async_call(  # type: ignore[arg-type]
796
                            TELEGRAM_DOMAIN,
797
                            SERVICE_DOWNLOAD_FILE,
798
                            {
799
                                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
800
                                ATTR_FILE_ID: event.data[ATTR_FILE_ID],
801
                            },
802
                            blocking=True,
803
                            context=context,
804
                            return_response=True,
805
                        )
806
                    )[ATTR_FILE_PATH]  # type: ignore[index]
807
                )
808

809
                input_text = event.data.get(ATTR_TEXT) or file_path.name
×
810
                chat_log.async_add_user_content(
×
811
                    UserContent(
812
                        input_text,  # Must be exactly same text as in async_converse
813
                        attachments=[
814
                            Attachment(
815
                                media_content_id=f"media-source://{TELEGRAM_DOMAIN}/{event.data.get(ATTR_FILE_ID)}",
816
                                mime_type=event.data.get(ATTR_FILE_MIME_TYPE),  # type: ignore[arg-type]
817
                                path=file_path,
818
                            )
819
                        ],
820
                    )
821
                )
822

823
                def cleanup_file() -> None:
×
824
                    """Cleanup temporary file."""
825
                    file_path.unlink(missing_ok=True)
×
826

827
                @callback
×
828
                def cleanup_file_callback() -> None:
×
829
                    """Cleanup temporary file."""
830
                    self.hass.async_add_executor_job(cleanup_file)
×
831

832
                session.async_on_cleanup(cleanup_file_callback)
×
833
            else:
834
                input_text = event.data.get(ATTR_TEXT) or ""
3✔
835

836
            try:
3✔
837
                await async_converse(
3✔
838
                    self.hass,
839
                    text=input_text,
840
                    conversation_id=session.conversation_id,
841
                    context=context,
842
                    agent_id=self.agent_id,
843
                    extra_system_prompt=self.extra_prompt,
844
                )
845
            except (Exception, asyncio.CancelledError) as e:  # noqa: BLE001
×
846
                error = e
×
847

848
            # Flush any remaining delta
849
            chat_log_delta_listener(chat_log, {"role": None})
3✔
850

851
        timeout = self.config[CONF_CONVERSATION_TIMEOUT]
3✔
852
        session.last_updated = (
3✔
853
            dt_util.utcnow() + timedelta(**timeout) - CONVERSATION_TIMEOUT
854
        )
855

856
        if error:
3✔
857
            if not isinstance(error, asyncio.CancelledError):
×
858
                message = async_translate_message(
×
859
                    self.hass,
860
                    translation_key="conversation_error",
861
                    translation_placeholders={"error": str(error)},
862
                )
863
                await self.async_handle_chat_log_event(
×
864
                    thread_id=thread_id,
865
                    event_type=ChatLogEventType.CONTENT_ADDED,
866
                    data={"content": {"role": "assistant", "content": message}},
867
                    context=context,
868
                )
869
            raise error
×
870

871
    async def async_chat_log_delta_listener(
3✔
872
        self,
873
        chat_log: ChatLog,
874
        delta: dict[str, Any],
875
        thread_id: int,
876
        msg_id: int | None,
877
        context: Context,
878
    ) -> None:
879
        """Handle chat log delta."""
880
        current_conversation = self.conversations[thread_id]
3✔
881

882
        def get_reaction(content: str) -> str | None:
3✔
883
            """Extract reaction from content if it starts with a known reaction emoji."""
884
            matches = [e for e in REACTION_EMOJI if content.lstrip().startswith(e)]
3✔
885
            return max(matches, key=len) if matches else None
3✔
886

887
        async with current_conversation.delta_lock:
3✔
888
            LOGGER.debug("Chat log delta: %s", delta)
3✔
889
            if "role" in delta:
3✔
890
                if current_conversation.draft:
3✔
891
                    if (
3✔
892
                        (
893
                            current_conversation.draft["content"]
894
                            or current_conversation.draft["thinking_content"]
895
                            or current_conversation.draft["tool_calls"]
896
                        )
897
                        and (
898
                            delta["role"]
899
                            or (  # for last content, only commit if it exists in the chat log
900
                                chat_log.content[-1].role == "assistant"
901
                                and current_conversation.draft["content"]
902
                                and (chat_log.content[-1].content or "").endswith(
903
                                    current_conversation.draft["content"]
904
                                )  # endswith is to cater for a possible reaction
905
                            )
906
                        )
907
                    ):
908
                        await self.async_handle_chat_log_event(
3✔
909
                            thread_id=thread_id,
910
                            event_type=ChatLogEventType.CONTENT_ADDED,
911
                            data={"content": current_conversation.draft},
912
                            context=context,
913
                        )
914
                    else:
915
                        # Clear drafts
916
                        await self.send_message(
×
917
                            thread_id=thread_id,
918
                            context=context,
919
                        )
920
                elif delta["role"] is None:
3✔
921
                    # Also clear drafts
922
                    await self.send_message(
3✔
923
                        thread_id=thread_id,
924
                        context=context,
925
                    )
926
                if delta["role"] == "assistant":
3✔
927
                    current_conversation.draft = AssistantContentDeltaDict(
3✔
928
                        role="assistant",
929
                        content="",
930
                        thinking_content="",
931
                        tool_calls=[],
932
                    )
933
                else:
934
                    current_conversation.draft = None
3✔
935

936
                if delta["role"] is None:
3✔
937
                    responded_tool_calls: set[str] = set()
3✔
938
                    async with current_conversation.content_lock:
3✔
939
                        for content in reversed(chat_log.content):
3✔
940
                            if content.role == "tool_result":
3✔
941
                                responded_tool_calls.add(content.tool_call_id)
×
942
                                continue
×
943
                            if content.role == "assistant":
3✔
944
                                for tool_call in content.tool_calls or []:
3✔
945
                                    if tool_call.id not in responded_tool_calls:
×
946
                                        chat_log.async_add_assistant_content_without_tools(
×
947
                                            ToolResultContent(
948
                                                agent_id=self.agent_id or "",
949
                                                tool_call_id=tool_call.id,
950
                                                tool_name=tool_call.tool_name,
951
                                                tool_result={
952
                                                    "error": "asyncio.CancelledError: "
953
                                                    "Conversation interrupted before "
954
                                                    "tool call could be responded to. "
955
                                                    "Please try again."
956
                                                },
957
                                            )
958
                                        )
959
                            break
3✔
960

961
                if current_conversation.draft:
3✔
962
                    # Send typing action at the beginning of each assistant response
963
                    await self.hass.services.async_call(
3✔
964
                        TELEGRAM_DOMAIN,
965
                        SERVICE_SEND_CHAT_ACTION,
966
                        {
967
                            CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
968
                            **get_telegram_service_target(
969
                                self.chat_id,
970
                                self.notify_entity_id,
971
                            ),
972
                            ATTR_MESSAGE_THREAD_ID: thread_id,
973
                            ATTR_CHAT_ACTION: CHAT_ACTION_TYPING,
974
                        },
975
                        context=context,
976
                    )
977

978
            if current_conversation.draft:
3✔
979
                if "content" in delta:
3✔
980
                    if not current_conversation.draft["content"] and (
3✔
981
                        reaction := get_reaction(delta["content"])
982
                    ):
983
                        await self.hass.services.async_call(
×
984
                            TELEGRAM_DOMAIN,
985
                            SERVICE_SET_MESSAGE_REACTION,
986
                            {
987
                                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
988
                                ATTR_MESSAGE_ID: msg_id or "last",
989
                                ATTR_CHAT_ID: self.chat_id,
990
                                ATTR_REACTION: reaction,
991
                            },
992
                            context=context,
993
                        )
994
                        current_conversation.draft["content"] = (
×
995
                            delta["content"].lstrip().removeprefix(reaction).lstrip()
996
                        )
997
                    else:
998
                        current_conversation.draft["content"] += delta["content"]
3✔
999
                    if not current_conversation.draft_cancel:
3✔
1000
                        current_conversation.draft_cancel = (
3✔
1001
                            self._async_schedule_update_draft(
1002
                                thread_id,
1003
                                1.0,
1004
                                context,
1005
                            )
1006
                        )
1007
                if "thinking_content" in delta:
3✔
1008
                    current_conversation.draft["thinking_content"] += delta[
×
1009
                        "thinking_content"
1010
                    ]
1011
                    if not current_conversation.draft_cancel:
×
1012
                        current_conversation.draft_cancel = (
×
1013
                            self._async_schedule_update_draft(
1014
                                thread_id,
1015
                                1.0,
1016
                                context,
1017
                            )
1018
                        )
1019
                if "tool_calls" in delta:
3✔
1020
                    current_conversation.draft["tool_calls"].extend(delta["tool_calls"])  # type: ignore[union-attr]
×
1021

1022
    async def async_handle_chat_log_event(
3✔
1023
        self,
1024
        thread_id: int,
1025
        event_type: ChatLogEventType,
1026
        data: dict[str, Any],
1027
        context: Context,
1028
    ) -> None:
1029
        """Handle chat log events."""
1030
        # async_subscribe_chat_logs does not provide context, ensure user_id is set
1031
        context = self._get_context(context)
3✔
1032
        current_conversation = self.conversations[thread_id]
3✔
1033
        async with current_conversation.content_lock:
3✔
1034
            if (
3✔
1035
                event_type == ChatLogEventType.CONTENT_ADDED
1036
                and (content := data.get("content"))
1037
                and content.get("role") == "assistant"
1038
                and (message := content.get("content"))
1039
            ):
1040
                if current_conversation.draft_cancel:
3✔
1041
                    current_conversation.draft_cancel()
3✔
1042
                await self.send_message(
3✔
1043
                    message=message,
1044
                    thread_id=thread_id,
1045
                    context=context,
1046
                )
1047

1048
    @callback
3✔
1049
    def _reset_conversation_history(self, thread_id: int) -> None:
3✔
1050
        """Remove the persisted Home Assistant session and chat log for a thread."""
1051
        conversation_id = self._get_conversation_id(thread_id)
3✔
1052

1053
        if (all_sessions := self.hass.data.get(DATA_CHAT_SESSION)) and (
3✔
1054
            session := all_sessions.pop(conversation_id, None)
1055
        ):
1056
            session.async_cleanup()
3✔
1057

1058
        if all_chat_logs := self.hass.data.get(DATA_CHAT_LOGS):
3✔
1059
            all_chat_logs.pop(conversation_id, None)
×
1060

1061
    async def _async_change_agent(self, agent_id: str) -> bool:
3✔
1062
        """Change the conversation agent for this chat."""
1063
        LOGGER.debug(
×
1064
            "Change agent for chat_id=%s to agent_id=%s", self.chat_id, agent_id
1065
        )
1066

1067
        self.agent_id = agent_id
×
1068
        # The above might be redundant because we are reloading the config entry
1069
        subentry = self.entry.subentries[self.subentry_id]
×
1070
        data = {**subentry.data, CONF_CONVERSATION_AGENT: agent_id}
×
1071
        LOGGER.debug("Updating subentry %s with data %s", subentry.subentry_id, data)
×
1072
        try:
×
1073
            self.hass.config_entries.async_update_subentry(
×
1074
                self.entry, subentry, data=data
1075
            )
1076
        except HomeAssistantError as e:
×
1077
            LOGGER.exception(
×
1078
                "Failed to update subentry %s: %s",
1079
                subentry.subentry_id,
1080
                e,
1081
                stack_info=True,
1082
            )
1083
            return False
×
1084
        LOGGER.debug("Subentry %s updated", subentry.subentry_id)
×
1085

1086
        return True
×
1087

1088
    async def async_process_command(
3✔
1089
        self,
1090
        thread_id: int,
1091
        command: str,
1092
        args: list[str],
1093
        context: Context,
1094
    ) -> None:
1095
        """Process a bot command."""
1096
        LOGGER.debug("Received command: %s with args: %s", command, args)
3✔
1097
        match command:
3✔
1098
            case "/model":
3✔
1099
                selected_agent = args[0] if len(args) > 0 else None
×
1100
                current_agent = self.agent_id
×
1101

1102
                agents = {
×
1103
                    agent.id: agent.name
1104
                    for agent in get_agent_manager(self.hass).async_get_agent_info()
1105
                    if not isinstance(
1106
                        get_agent_manager(self.hass).async_get_agent(agent.id),
1107
                        ConversationEntity,
1108
                    )
1109
                } | {
1110
                    entity.entity_id: (
1111
                        self.hass.states.get(entity.entity_id).name  # type: ignore[union-attr]
1112
                        if self.hass.states.get(entity.entity_id)
1113
                        else entity.entity_id
1114
                    )
1115
                    for entity in self.hass.data[DATA_COMPONENT].entities
1116
                }
1117

1118
                LOGGER.debug(
×
1119
                    "Selected agent: %s, current agent: %s, available agents: %s",
1120
                    selected_agent,
1121
                    current_agent,
1122
                    agents,
1123
                )
1124
                if (
×
1125
                    selected_agent is not None
1126
                    and selected_agent in agents
1127
                    and await self._async_change_agent(selected_agent)
1128
                ):
1129
                    LOGGER.debug(
×
1130
                        "Agent switched to %s for chat_id=%s",
1131
                        selected_agent,
1132
                        self.chat_id,
1133
                    )
1134
                    message = async_translate_message(
×
1135
                        self.hass,
1136
                        translation_key="conversation_agent_updated",
1137
                        translation_placeholders={
1138
                            "agent_id": agents.get(selected_agent, selected_agent)
1139
                        },
1140
                    )
1141
                    await self.send_message(
×
1142
                        message=message,
1143
                        thread_id=thread_id,
1144
                        context=context,
1145
                    )
1146
                else:
1147
                    message = async_translate_message(
×
1148
                        self.hass,
1149
                        translation_key="current_conversation_agent",
1150
                        translation_placeholders={
1151
                            "agent_id": agents.get(current_agent, current_agent)
1152
                            if current_agent
1153
                            else "None"
1154
                        },
1155
                    )
1156
                    messages = await self.send_message(
×
1157
                        message=message,
1158
                        thread_id=thread_id,
1159
                        context=context,
1160
                    )
1161
                    if messages["chats"]:
×
1162
                        msg = messages["chats"][-1]
×
1163
                        await self.hass.services.async_call(
×
1164
                            TELEGRAM_DOMAIN,
1165
                            SERVICE_EDIT_REPLYMARKUP,
1166
                            {
1167
                                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1168
                                **get_telegram_service_target(
1169
                                    msg[ATTR_CHAT_ID],
1170
                                    self.notify_entity_id,
1171
                                ),
1172
                                ATTR_MESSAGE_ID: msg[ATTR_MESSAGE_ID],
1173
                                ATTR_KEYBOARD_INLINE: [
1174
                                    [(agent_name, f"/model {agent_id}")]
1175
                                    for agent_id, agent_name in agents.items()
1176
                                ],
1177
                            },
1178
                            context=context,
1179
                        )
1180
            case "/new":
3✔
1181
                if current_conversation := self.conversations.get(thread_id):
3✔
1182
                    if (task := current_conversation.task) and not task.done():
×
1183
                        task.cancel(
×
1184
                            "Conversation interrupted by new conversation command."
1185
                        )
1186
                        with contextlib.suppress(asyncio.CancelledError):
×
1187
                            await task
×
1188
                    if current_conversation.draft_cancel:
×
1189
                        current_conversation.draft_cancel()
×
1190
                        current_conversation.draft_cancel = None
×
1191

1192
                self._reset_conversation_history(thread_id)
3✔
1193

1194
                message = async_translate_message(
3✔
1195
                    self.hass, translation_key="new_conversation"
1196
                )
1197
                await self.send_message(
3✔
1198
                    message=message,
1199
                    thread_id=thread_id,
1200
                    context=context,
1201
                )
1202

1203
    async def async_handle_command(self, event: Event) -> None:
3✔
1204
        """Handle command events."""
1205
        context = self._get_context(event.context, event.data.get(ATTR_USER_ID))
×
1206
        await self.async_process_command(
×
1207
            event.data.get(ATTR_MESSAGE_THREAD_ID) or 0,
1208
            event.data["command"].split("@")[0],
1209
            event.data.get("args", []),
1210
            context,
1211
        )
1212

1213
    async def async_handle_callback(self, event: Event) -> None:
3✔
1214
        """Handle callback query events."""
1215
        LOGGER.debug("callback_event data: %s", event.data)
×
1216
        context = self._get_context(event.context, event.data.get(ATTR_USER_ID))
×
1217
        args = event.data.get("data", "").split(" ")
×
1218

1219
        await self.async_process_command(
×
1220
            event.data.get(ATTR_MESSAGE, {}).get(ATTR_MESSAGE_THREAD_ID) or 0,
1221
            args.pop(0),
1222
            args,
1223
            context,
1224
        )
1225
        await self.hass.services.async_call(
×
1226
            TELEGRAM_DOMAIN,
1227
            SERVICE_ANSWER_CALLBACK_QUERY,
1228
            {
1229
                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1230
                ATTR_MESSAGE: "Done",
1231
                ATTR_CALLBACK_QUERY_ID: event.data.get(ATTR_MSGID),
1232
            },
1233
            context=context,
1234
        )
1235
        if event.data[ATTR_MSG]:
×
1236
            await self.hass.services.async_call(
×
1237
                TELEGRAM_DOMAIN,
1238
                SERVICE_EDIT_REPLYMARKUP,
1239
                {
1240
                    CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1241
                    **get_telegram_service_target(
1242
                        self.chat_id,
1243
                        self.notify_entity_id,
1244
                    ),
1245
                    ATTR_MESSAGE_ID: event.data[ATTR_MSG][ATTR_MESSAGE_ID],
1246
                    ATTR_KEYBOARD_INLINE: [],
1247
                },
1248
                context=context,
1249
            )
1250

1251
    async def handle_generate_image_intent(
3✔
1252
        self, event: Event, context: Context, prompt: str
1253
    ) -> str:
1254
        """Handle the generate image intent."""
1255
        LOGGER.debug("generate_image_intent event data: %s", event.data)
×
1256
        context = self._get_context(context, event.data.get(ATTR_USER_ID))
×
1257

1258
        thread_id = event.data.get(ATTR_MESSAGE_THREAD_ID) or 0
×
1259

1260
        await self.hass.services.async_call(
×
1261
            TELEGRAM_DOMAIN,
1262
            SERVICE_SEND_CHAT_ACTION,
1263
            {
1264
                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1265
                **get_telegram_service_target(
1266
                    self.chat_id,
1267
                    self.notify_entity_id,
1268
                ),
1269
                ATTR_MESSAGE_THREAD_ID: thread_id,
1270
                ATTR_CHAT_ACTION: CHAT_ACTION_UPLOAD_PHOTO,
1271
            },
1272
            context=context,
1273
        )
1274

1275
        result: dict[str, Any] = await async_generate_image(
×
1276
            self.hass,
1277
            task_name=DOMAIN,
1278
            entity_id=self.config.get(CONF_AI_TASK),
1279
            instructions=prompt,
1280
        )  # type: ignore[assignment]
1281

1282
        media = await async_resolve_media(self.hass, result["media_source_id"], None)
×
1283

1284
        await self.hass.services.async_call(
×
1285
            TELEGRAM_DOMAIN,
1286
            SERVICE_SEND_PHOTO,
1287
            {
1288
                ATTR_FILE: media.path.as_posix(),  # type: ignore[union-attr]
1289
                ATTR_CAPTION: markdownify(result.get("revised_prompt") or ""),
1290
                **get_telegram_service_target(self.chat_id, self.notify_entity_id),
1291
                ATTR_MESSAGE_THREAD_ID: thread_id,
1292
                CONF_CONFIG_ENTRY_ID: self.telegram_entry_id,
1293
                ATTR_PARSER: "markdownv2",
1294
            },
1295
            blocking=True,
1296
            context=context,
1297
            return_response=True,
1298
        )
1299

1300
        message = "The image has been generated and sent to the user."
×
1301
        if (
×
1302
            revised_prompt := result.get("revised_prompt")
1303
        ) and revised_prompt != prompt:
1304
            message += f" Revised prompt: {revised_prompt}"
×
1305

1306
        return message
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc