• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

popstas / telegram-mention-resender / 16632799616

30 Jul 2025 08:15PM UTC coverage: 86.827% (+0.3%) from 86.535%
16632799616

push

github

web-flow
feat: Add Langfuse tracing support (#32)

* Use langfuse openai integration

* Refactor Langfuse logging

* Fix Langfuse env setup and tracing

468 of 539 relevant lines covered (86.83%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

93.2
src/app.py
1
import asyncio
1✔
2
import logging
1✔
3
from typing import List
1✔
4

5
from telethon import TelegramClient, events, types
1✔
6

7
from . import langfuse_utils, prompts, telegram_utils
1✔
8
from .config import Instance, get_api_credentials, load_config, load_instances
1✔
9
from .prompts import Prompt, match_prompt
1✔
10
from .stats import stats as global_stats
1✔
11
from .telegram_utils import (
1✔
12
    find_word,
13
    get_chat_name,
14
    get_folders_chat_ids,
15
    get_forward_message_text,
16
    get_message_url,
17
    mute_chats_from_folders,
18
    normalize_chat_ids,
19
    resolve_entities,
20
    word_in_text,
21
)
22

23
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
1✔
24

25
# Telethon client; stays None until main() creates and starts it.
client: TelegramClient | None = None
1✔
26
# Application configuration; replaced by load_config() in main() and rescan_loop().
config: dict = {}
1✔
27
# Configured instances; populated by load_instances() in main().
instances: List[Instance] = []
1✔
28

29
# Langfuse handle; assigned from langfuse_utils.init_langfuse() in main().
langfuse = None
1✔
30

31
# Use shared stats tracker
32
stats = global_stats
1✔
33

34
# Reaction emoji that handle_reaction() maps to an instance's
# false_positive_entity (negative) or true_positive_entity (positive).
NEGATIVE_REACTIONS = {"👎"}  # thumbs down
1✔
35
POSITIVE_REACTIONS = {"👍"}  # thumbs up
1✔
36

37

38
def setup_logging(level: str = "info") -> None:
    """Configure logging for the application.

    Args:
        level: Case-insensitive name of a ``logging`` level, for example
            ``"debug"`` or ``"warning"``. Any value that does not resolve to
            a numeric level falls back to ``logging.INFO``.
    """
    # A plain getattr() default is not enough: names such as "basic_format"
    # resolve to non-numeric attributes of the logging module, which
    # basicConfig() rejects with an exception.
    numeric_level = getattr(logging, str(level).upper(), None)
    if not isinstance(numeric_level, int):
        numeric_level = logging.INFO
    logging.basicConfig(level=numeric_level, format="%(levelname)s - %(message)s")
    # Third-party loggers are capped at WARNING regardless of the app level.
    for noisy_logger in ("telethon", "httpx", "httpcore", "openai"):
        logging.getLogger(noisy_logger).setLevel(logging.WARNING)
46

47

48
async def update_instance_chat_ids(instance: Instance, first_run: bool = False) -> None:
    """Rebuild ``instance.chat_ids`` from its folders, explicit IDs and entities.

    The chat IDs found in the instance's folders are merged with the IDs
    already stored on the instance and with the IDs of its resolved entities;
    the normalized result is written back to ``instance.chat_ids``. When
    ``instance.folder_mute`` is set, the folders' chats are muted as well.

    Args:
        instance: Instance whose chat IDs are refreshed in place.
        first_run: When true the summary is logged at INFO, otherwise at DEBUG.
    """
    collected = await get_folders_chat_ids(instance.folders)
    collected.update(instance.chat_ids)
    resolved = await resolve_entities(instance.entities)
    collected.update(resolved)
    instance.chat_ids = await normalize_chat_ids(collected)

    if instance.folder_mute:
        await mute_chats_from_folders(instance.folders)

    if first_run:
        summary_level = logging.INFO
    else:
        summary_level = logging.DEBUG
    summary_args = (
        instance.name,
        len(instance.chat_ids),
        len(instance.folders),
        len(instance.entities),
    )
    logger.log(
        summary_level,
        "Instance '%s': listening to %d chats from %d folders and %d entities",
        *summary_args,
    )
65

66

67
async def rescan_loop(instance: Instance, interval: int = 3600) -> None:
    """Periodically reload the configuration and rescan chat IDs for *instance*.

    Runs forever. After each sleep the module-level ``config`` is replaced
    with a fresh ``load_config()`` result, ``prompts.config`` is updated from
    it, and the instance's chat IDs are refreshed.

    Args:
        instance: Instance whose chat IDs are refreshed on every cycle.
        interval: Seconds to sleep between rescans.
    """
    global config
    while True:
        # The sleep stays outside the try block so cancellation, or any error
        # raised while waiting, still ends the loop as before.
        await asyncio.sleep(interval)
        try:
            config = load_config()
            prompts.config.update(config)
            await update_instance_chat_ids(instance, False)
        except Exception:  # pylint: disable=broad-except
            # main() starts this coroutine as a background task and never
            # awaits it; an uncaught error would end every future rescan
            # without anyone noticing. Log it and retry on the next cycle.
            logger.exception(
                "Rescan failed for instance '%s'; retrying in %s seconds",
                instance.name,
                interval,
            )
75

76

77
async def process_message(inst: Instance, event: events.NewMessage.Event) -> None:
    """Handle a new message for a specific instance.

    The message is skipped when its text contains one of ``inst.ignore_words``.
    Otherwise it is counted in ``stats`` and forwarded when either a word from
    ``inst.words`` is found in the text or one of ``inst.prompts`` scores at
    least its threshold (a missing or zero threshold is treated as 4). Prompts
    are only evaluated when no word matched, and evaluation stops at the first
    prompt that reaches its threshold.

    A forwarded message is sent to ``inst.target_chat`` and/or
    ``inst.target_entity``: first a generated description, then the message
    itself. Errors raised while forwarding are logged, not propagated.

    Args:
        inst: Instance whose rules are applied.
        event: New-message event carrying the message and its chat ID.
    """
    message = event.message
    raw_text = message.raw_text
    if raw_text and word_in_text(inst.ignore_words, raw_text):
        logger.debug(
            "Ignoring message %s for %s due to ignore_words",
            message.id,
            inst.name,
        )
        return
    stats.increment(inst.name)
    chat_name = await get_chat_name(event.chat_id, safe=True)
    forward = False
    used_word: str | None = None
    used_prompt: Prompt | None = None
    used_score = 0
    used_fragment: str | None = None

    if raw_text:
        w = find_word(inst.words, raw_text)
        if w:
            forward = True
            used_word = w
        else:
            for p in inst.prompts:
                res = await match_prompt(p, raw_text, inst.name, chat_name)
                sc = res.similarity
                reached_threshold = sc >= (p.threshold or 4)
                # Always record the prompt that triggers the forward, even if
                # an earlier prompt scored higher without reaching its own
                # threshold; otherwise the forwarded text would credit the
                # wrong prompt, score and fragment.
                if reached_threshold or sc > used_score:
                    used_score = sc
                    used_prompt = p
                    used_fragment = res.main_fragment
                if reached_threshold:
                    forward = True
                    break

    if not forward:
        logger.debug(
            "Message %s from %s not forwarded for %s",
            message.id,
            chat_name,
            inst.name,
        )
        return

    try:
        text = await get_forward_message_text(
            message,
            prompt=used_prompt,
            score=used_score,
            word=used_word,
            fragment=used_fragment,
        )
        # Resolve every destination name before sending anything.
        targets = []
        if inst.target_chat is not None:
            targets.append(
                (inst.target_chat, await get_chat_name(inst.target_chat, safe=True))
            )
        if inst.target_entity:
            targets.append(
                (inst.target_entity, await get_chat_name(inst.target_entity, safe=True))
            )
        for dest, dname in targets:
            await client.send_message(dest, text)
            forwarded = await message.forward_to(dest)
            f_url = get_message_url(forwarded) if forwarded else None
            logger.info(
                "Forwarded message %s from %s to %s for %s (target url: %s)",
                message.id,
                chat_name,
                dname,
                inst.name,
                f_url,
            )
    except Exception as exc:  # pylint: disable=broad-except
        logger.error("Failed to forward message: %s", exc)
149

150

151
async def handle_reaction(update: "types.UpdateMessageReactions") -> None:
    """Forward a reacted message to an instance's feedback entity.

    Only emoji reactions listed in ``POSITIVE_REACTIONS`` or
    ``NEGATIVE_REACTIONS`` are considered. The first instance whose
    ``target_entity`` is the chat the reaction happened in, and which has a
    destination configured for that kind of reaction, receives the message:
    ``true_positive_entity`` for a positive reaction, ``false_positive_entity``
    otherwise. A positive reaction wins when both kinds are present.

    Args:
        update: Raw reactions update; ignored when falsy or lacking a
            ``reactions`` attribute.
    """
    if not update or not hasattr(update, "reactions"):
        return

    reaction_objs = (
        getattr(result, "reaction", None)
        for result in getattr(update.reactions, "results", [])
    )
    emojis: list[str] = [
        obj.emoticon for obj in reaction_objs if isinstance(obj, types.ReactionEmoji)
    ]

    is_positive = any(emoji in POSITIVE_REACTIONS for emoji in emojis)
    is_negative = any(emoji in NEGATIVE_REACTIONS for emoji in emojis)
    if not is_positive and not is_negative:
        return

    reacted_chat_id = await telegram_utils.to_event_chat_id(update.peer)
    for inst in instances:
        if not inst.target_entity:
            continue
        target = await telegram_utils.get_entity(inst.target_entity)
        if reacted_chat_id != await telegram_utils.to_event_chat_id(target):
            continue

        # At this point at least one of the two flags is set, so "not
        # positive" means negative.
        feedback_dest = (
            inst.true_positive_entity if is_positive else inst.false_positive_entity
        )
        if not feedback_dest:
            continue

        message = await client.get_messages(update.peer, ids=update.msg_id)
        if not message:
            return
        forwarded = await message.forward_to(feedback_dest)
        f_url = None
        if forwarded:
            f_url = get_message_url(forwarded)
        logger.info(
            "Forwarded message %s from %s to %s for %s (target url: %s)",
            message.id,
            inst.target_entity,
            feedback_dest,
            inst.name,
            f_url,
        )
        # Only the first matching instance handles the reaction.
        return
199

200

201
async def main() -> None:
    """Start the application and run until the Telegram client disconnects.

    Loads the configuration, initialises Langfuse and logging, creates and
    starts the Telegram client, loads the instances, performs an initial chat
    scan for each of them and schedules its periodic rescan. It then registers
    the reaction and new-message handlers and blocks until disconnection.
    """
    global client, instances, config, langfuse
    config = load_config()
    prompts.config.update(config)
    langfuse = langfuse_utils.init_langfuse(config)
    prompts.langfuse = langfuse

    setup_logging(config.get("log_level", "info"))

    api_id, api_hash, session_name = get_api_credentials(config)

    client = TelegramClient(session_name, api_id, api_hash)
    telegram_utils.client = client
    await client.start()

    prompts.stats = stats

    instances = await load_instances(config)
    # The event loop keeps only weak references to tasks, so an unreferenced
    # task may be garbage-collected before it finishes. Holding the tasks here
    # keeps the rescan loops alive for as long as main() runs.
    rescan_tasks = []
    for inst in instances:
        await update_instance_chat_ids(inst, True)
        rescan_tasks.append(asyncio.create_task(rescan_loop(inst)))

    @client.on(events.Raw(types.UpdateMessageReactions))
    async def reaction_event_handler(update) -> None:
        await handle_reaction(update)

    @client.on(events.NewMessage)
    async def handler(event: events.NewMessage.Event) -> None:
        username = getattr(getattr(event.message, "sender", None), "username", None)
        if username:
            # Read the module-level config on every message so a reload by
            # rescan_loop() takes effect; tolerate a missing or null list.
            ignored = {u.lower() for u in config.get("ignore_usernames") or []}
            if username.lower() in ignored:
                logger.debug("Ignoring message from @%s", username)
                return

        for inst in instances:
            if event.chat_id in inst.chat_ids:
                await process_message(inst, event)

    await client.run_until_disconnected()
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc