• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gcivil-nyu-org / team2-wed-spring25 / 859

23 Apr 2025 01:52PM UTC coverage: 87.903% (-3.0%) from 90.945%
859

Pull #360

travis-pro

web-flow
Merge 1161be77f into 804524f90
Pull Request #360: Develop yd debug

45 of 134 new or added lines in 7 files covered. (33.58%)

29 existing lines in 2 files now uncovered.

3001 of 3414 relevant lines covered (87.9%)

0.88 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

33.33
/backend/nightwalkers/notifications/services.py
1
import firebase_admin
1✔
2
from firebase_admin import messaging
1✔
3
from django.conf import settings
1✔
4
from django.contrib.auth import get_user_model
1✔
5
from asgiref.sync import sync_to_async
1✔
6
import asyncio
1✔
7
import os
1✔
8
import json
1✔
9
from pathlib import Path
1✔
10
import sys
1✔
11

12
BASE_DIR = Path(__file__).resolve().parent.parent
1✔
13

14
# Load from environment variables
15
FIREBASE_CONFIG = {
1✔
16
    "type": "service_account",
17
    "project_id": os.getenv("NEXT_PUBLIC_FIREBASE_PROJECT_ID"),
18
    "private_key_id": os.getenv("NEXT_PUBLIC_FIREBASE_PRIVATE_KEY_ID"),
19
    "private_key": os.getenv("NEXT_PUBLIC_FIREBASE_PRIVATE_KEY").replace("\\n", "\n"),
20
    "client_email": os.getenv("NEXT_PUBLIC_FIREBASE_CLIENT_EMAIL"),
21
    "client_id": os.getenv("NEXT_PUBLIC_FIREBASE_CLIENT_ID"),
22
    "auth_uri": "https://accounts.google.com/o/oauth2/auth",
23
    "token_uri": "https://oauth2.googleapis.com/token",
24
    "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
25
    "client_x509_cert_url": os.getenv("NEXT_PUBLIC_FIREBASE_CERT_URL"),
26
    "universe_domain": "googleapis.com",
27
}
28

29
CREDS_PATH = BASE_DIR / "firebase-credentials.json"
1✔
30
with open(CREDS_PATH, "w") as f:
1✔
31
    json.dump(FIREBASE_CONFIG, f)
1✔
32
FIREBASE_CREDENTIALS_PATH = CREDS_PATH
1✔
33

34

35
class NotificationService:
1✔
36
    _initialized = False
1✔
37

38
    def __init__(self):
1✔
NEW
39
        if not self._initialized:
×
NEW
40
            self._initialize()
×
41

42
    def _initialize(self):
1✔
NEW
43
        if not firebase_admin._apps:
×
NEW
44
            cred = firebase_admin.credentials.Certificate(
×
45
                settings.FIREBASE_CREDENTIALS_PATH
46
            )
NEW
47
            firebase_admin.initialize_app(cred)
×
NEW
48
        self.User = None  # Will be loaded lazily
×
NEW
49
        self._initialized = True
×
50

51
    def _get_user_model(self):
1✔
NEW
52
        if self.User is None:
×
NEW
53
            self.User = get_user_model()
×
NEW
54
        return self.User
×
55

56
    async def asend_to_user(self, user_id, title, body, data=None):
1✔
57
        """Async version of send_to_user"""
NEW
58
        try:
×
NEW
59
            User = self._get_user_model()
×
NEW
60
            user = await sync_to_async(User.objects.get)(id=user_id)
×
NEW
61
            if not user.fcm_token:
×
NEW
62
                return False
×
63

NEW
64
            message = messaging.Message(
×
65
                notification=messaging.Notification(title=title, body=body),
66
                data=data or {},
67
                token=user.fcm_token,
68
            )
69

70
            # Run synchronous FCM send in thread pool
NEW
71
            return await asyncio.to_thread(messaging.send, message)
×
NEW
72
        except Exception as e:
×
NEW
73
            print(f"Async notification error: {str(e)}")
×
NEW
74
            return False
×
75

76
    def send_to_user(self, user_id, title, body, data=None):
1✔
77
        """Synchronous version"""
NEW
78
        try:
×
NEW
79
            User = self._get_user_model()
×
NEW
80
            user = User.objects.get(id=user_id)
×
NEW
81
            if not user.fcm_token:
×
NEW
82
                return False
×
83

NEW
84
            message = messaging.Message(
×
85
                notification=messaging.Notification(title=title, body=body),
86
                data=data or {},
87
                token=user.fcm_token,
88
            )
NEW
89
            messaging.send(message)
×
NEW
90
            return True
×
NEW
91
        except Exception as e:
×
NEW
92
            print(f"Notification error: {str(e)}")
×
NEW
93
            return False
×
94

95
    async def abroadcast_to_users(self, user_ids, title, body, data=None):
1✔
96
        """Async version of broadcast"""
NEW
97
        try:
×
NEW
98
            User = self._get_user_model()
×
99

100
            # Async ORM query
NEW
101
            tokens = await sync_to_async(list)(
×
102
                User.objects.filter(
103
                    id__in=user_ids, fcm_token__isnull=False
104
                ).values_list("fcm_token", flat=True)
105
            )
106

NEW
107
            if not tokens:
×
NEW
108
                return 0
×
109

NEW
110
            message = messaging.MulticastMessage(
×
111
                notification=messaging.Notification(title=title, body=body),
112
                data=data or {},
113
                tokens=tokens,
114
            )
115

116
            # Run synchronous FCM send in thread pool
NEW
117
            response = await asyncio.to_thread(messaging.send_multicast, message)
×
NEW
118
            return response.success_count
×
NEW
119
        except Exception as e:
×
NEW
120
            print(f"Async broadcast error: {str(e)}")
×
NEW
121
            return 0
×
122

123
    def broadcast_to_users(self, user_ids, title, body, data=None):
1✔
124
        """Synchronous version"""
NEW
125
        try:
×
NEW
126
            User = self._get_user_model()
×
127

NEW
128
            tokens = list(
×
129
                User.objects.filter(
130
                    id__in=user_ids, fcm_token__isnull=False
131
                ).values_list("fcm_token", flat=True)
132
            )
133

NEW
134
            if not tokens:
×
NEW
135
                return 0
×
136

NEW
137
            message = messaging.MulticastMessage(
×
138
                notification=messaging.Notification(title=title, body=body),
139
                data=data or {},
140
                tokens=tokens,
141
            )
NEW
142
            response = messaging.send_multicast(message)
×
NEW
143
            return response.success_count
×
NEW
144
        except Exception as e:
×
NEW
145
            print(f"Broadcast error: {str(e)}")
×
NEW
146
            return 0
×
147

148

149
if "test" not in sys.argv:
1✔
NEW
150
    notification_service = NotificationService()
×
151
else:
152
    notification_service = None  # or a mock
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc