• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

gcivil-nyu-org / fall24-monday-team2 / 488

16 Dec 2024 11:56PM UTC coverage: 82.626% (+3.4%) from 79.234%
488

Pull #167

travis-pro

web-flow
Merge 815598c66 into f38dc7d7f
Pull Request #167: User trainer access

264 of 321 new or added lines in 3 files covered. (82.24%)

52 existing lines in 3 files now uncovered.

4028 of 4875 relevant lines covered (82.63%)

0.83 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

97.63
/FitOn/test.py
1
from datetime import datetime
1✔
2
from django.test import TestCase, Client, override_settings, RequestFactory
1✔
3

4
from django.core.files.uploadedfile import SimpleUploadedFile
1✔
5

6
from unittest.mock import patch, MagicMock, AsyncMock
1✔
7
from datetime import timedelta
1✔
8
from FitOn import views
1✔
9
import pandas as pd
1✔
10

11
from django.contrib.messages.middleware import MessageMiddleware
1✔
12
from django.contrib.sessions.middleware import SessionMiddleware
1✔
13
from django.urls import reverse
1✔
14
import boto3
1✔
15
import json
1✔
16
import uuid
1✔
17

18
import io
1✔
19
import asyncio
1✔
20
import sys
1✔
21
from FitOn.rds import (
1✔
22
    convert_to_mysql_datetime,
23
)  # Adjust the import path based on your project structure
24
from unittest import IsolatedAsyncioTestCase
1✔
25
from FitOn.rds import (
1✔
26
    get_secret_rds,
27
    create_connection,
28
    create_steps_table,
29
    create_glucose_table,
30
    create_heartRate_table,
31
    create_oxygen_table,
32
    create_pressure_table,
33
    create_restingHeartRate_table,
34
    insert_into_steps_table,
35
    insert_into_glucose_table,
36
    insert_into_heartRate_table,
37
    insert_into_oxygen_table,
38
    insert_into_pressure_table,
39
    insert_into_restingHeartRate_table,
40
    show_table,
41
    rds_main,
42
    fetch_user_data,
43
    table_exists,
44
    insert_into_tables,
45
    show_tables,
46
)
47
import aiomysql
1✔
48
from FitOn.rds import create_table, insert_data
1✔
49

50
# import unittest
51

52
# from django.contrib.auth.models import User
53
from django.contrib.auth.tokens import default_token_generator
1✔
54

55
# from django.contrib.auth.hashers import make_password
56

57
# from django.contrib.auth.hashers import check_password
58

59
# from django.contrib.sessions.models import Session
60
# from django.utils import timezone
61
from django.utils.http import urlsafe_base64_encode
1✔
62
from django.utils.encoding import force_bytes
1✔
63
from django.core import mail
1✔
64

65
# from django.conf import settings
66
from importlib import reload, import_module
1✔
67
from pathlib import Path
1✔
68

69
# from django.utils import timezone
70

71

72
from .dynamodb import (
1✔
73
    create_user,
74
    delete_user_by_username,
75
    get_user_by_email,
76
    get_user_by_uid,
77
    get_user,
78
    update_user_password,
79
    update_user,
80
    create_thread,
81
    delete_threads_by_user,
82
    get_thread,
83
    delete_thread_by_id,
84
    fetch_all_threads,
85
    fetch_thread,
86
    create_post,
87
    fetch_filtered_threads,
88
    fetch_all_users,
89
    get_thread_details,
90
    delete_post,
91
    get_section_stats,
92
    verify_user_credentials,
93
    get_users_by_username_query,
94
    get_sleep_user_goals,
95
    get_weight_user_goals,
96
    get_step_user_goals,
97
    get_activity_user_goals,
98
    get_custom_user_goals,
99
    add_fitness_trainer_application,
100
    get_fitness_trainer_applications,
101
    make_fitness_trainer,
102
    remove_fitness_trainer,
103
    send_data_request_to_user,
104
    cancel_data_request_to_user,
105
    get_standard_users,
106
    calculate_age_group,
107
    MockUser,
108
    # users_table,
109
    get_last_reset_request_time,
110
    update_reset_request_time,
111
    save_chat_message,
112
    get_users_without_specific_username,
113
    get_chat_history_from_db,
114
    get_users_with_chat_history,
115
    add_to_list,
116
    remove_from_list,
117
)
118
from botocore.exceptions import ClientError, ValidationError
1✔
119
import pytz
1✔
120

121
# from django.contrib import messages
122
from django.contrib.messages import get_messages
1✔
123
from .forms import (
1✔
124
    SignUpForm,
125
    SetNewPasswordForm,
126
    ProfileForm,
127
    validate_file_extension,
128
)
129
from .views import (
1✔
130
    homepage,
131
    add_message,
132
    perform_redirect,
133
    login,
134
    custom_logout,
135
    signup,
136
    forum_view,
137
    warn_action,
138
    dismiss_warning,
139
    authorize_google_fit,
140
    callback_google_fit,
141
    delink_google_fit,
142
    get_metric_data,
143
    fetch_all_metric_data,
144
    format_bod_fitness_data,
145
    process_dynamo_data,
146
    parse_millis,
147
    get_group_key,
148
    merge_data,
149
    steps_barplot,
150
    resting_heartrate_plot,
151
    activity_plot,
152
    oxygen_plot,
153
    glucose_plot,
154
    pressure_plot,
155
    fetch_metric_data,
156
    get_sleep_scores,
157
    heartrate_plot,
158
    deactivate_account,
159
    confirm_deactivation,
160
    create_room_id,
161
    create_group_chat,
162
    group_chat,
163
    mark_messages_as_read,
164
)
165
from django.contrib.auth.hashers import check_password, make_password
1✔
166
from channels.testing import WebsocketCommunicator
1✔
167
from .models import GroupChatMember, Exercise, MuscleGroup
1✔
168
from FitOn.asgi import application
1✔
169
from boto3.dynamodb.conditions import Key
1✔
170

171
# from django.contrib.sessions.middleware import SessionMiddleware
172
# from django.test import RequestFactory
173
# Setup DynamoDB resource to interact with your existing tables
174
dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
175

176
# Assuming you have the following tables in DynamoDB
177
applications_table = dynamodb.Table("FitnessTrainerApplications")
1✔
178
users_table = dynamodb.Table("Users")
1✔
179
fitness_trainers_table = dynamodb.Table("FitnessTrainers")
1✔
180

181

182
class UserCreationAndDeletionTests(TestCase):
1✔
183
    def setUp(self):
1✔
184
        # Initialize DynamoDB resource and Users table
185
        self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
186
        self.users_table = self.dynamodb.Table("Users")
1✔
187

188
        # Define user data for the test
189
        self.user_data = {
1✔
190
            "user_id": "test_user_123",
191
            "username": "test_user123",
192
            "email": "test_user@example.com",
193
            "name": "Test User",
194
            "date_of_birth": "1990-01-01",
195
            "gender": "O",
196
            "height": "183",
197
            "weight": "83",
198
            "password": "hashed_password",
199
        }
200

201
    def test_create_user(self):
1✔
202
        # Step 1: Create the user
203
        result = create_user(**self.user_data)
1✔
204
        self.assertTrue(result, "User creation failed.")
1✔
205

206
        # Verify that the user was added by retrieving it from DynamoDB
207
        response = self.users_table.get_item(Key={"user_id": self.user_data["user_id"]})
1✔
208
        self.assertIn(
1✔
209
            "Item", response, "User was not found in DynamoDB after creation."
210
        )
211
        user = response["Item"]
1✔
212
        self.assertEqual(user["username"], self.user_data["username"])
1✔
213

214
    def test_get_user_by_email(self):
1✔
215
        # Ensure the user exists before testing retrieval
216
        create_user(**self.user_data)
1✔
217

218
        # Test get_user_by_email
219
        user_by_email = get_user_by_email(self.user_data["email"])
1✔
220
        self.assertIsNotNone(user_by_email, "get_user_by_email did not find the user.")
1✔
221
        self.assertEqual(
1✔
222
            user_by_email.email, self.user_data["email"], "Emails do not match."
223
        )
224
        self.assertEqual(
1✔
225
            user_by_email.username,
226
            self.user_data["username"],
227
            "Usernames do not match.",
228
        )
229

230
    def test_get_user_by_uid(self):
1✔
231
        # Ensure the user exists before testing retrieval
232
        create_user(**self.user_data)
1✔
233

234
        # Test get_user_by_uid
235
        user_by_uid = get_user_by_uid(self.user_data["user_id"])
1✔
236
        uid = user_by_uid.get("user_id")
1✔
237
        self.assertIsNotNone(user_by_uid, "get_user_by_uid did not find the user.")
1✔
238
        self.assertEqual(uid, self.user_data["user_id"], "User IDs do not match.")
1✔
239
        username = user_by_uid.get("username")
1✔
240
        self.assertEqual(username, self.user_data["username"])
1✔
241

242
        uid = "abcdef"
1✔
243
        user = get_user_by_uid(uid)
1✔
244
        self.assertIsNone(user, "abcdef user is not none!")
1✔
245

246
    def test_get_user(self):
1✔
247
        # Step 1: Ensure the user exists by calling create_user
248
        create_result = create_user(**self.user_data)
1✔
249
        self.assertTrue(create_result, "User creation failed.")
1✔
250

251
        # Step 2: Call get_user to retrieve the user by user_id
252
        retrieved_user = get_user(self.user_data["user_id"])
1✔
253

254
        # Step 3: Verify the retrieved user matches the expected data
255
        self.assertIsNotNone(retrieved_user, "get_user did not find the user.")
1✔
256
        self.assertEqual(
1✔
257
            retrieved_user["user_id"],
258
            self.user_data["user_id"],
259
            "User IDs do not match.",
260
        )
261
        self.assertEqual(
1✔
262
            retrieved_user["username"],
263
            self.user_data["username"],
264
            "Usernames do not match.",
265
        )
266
        self.assertEqual(
1✔
267
            retrieved_user["email"], self.user_data["email"], "Emails do not match."
268
        )
269

270
    def test_update_user_password(self):
1✔
271
        # Step 1: Update the user's password
272
        new_password = "new_secure_password"
1✔
273
        update_result = update_user_password(self.user_data["user_id"], new_password)
1✔
274
        self.assertIsNotNone(update_result, "Password update failed.")
1✔
275

276
        # Step 2: Retrieve the user to verify the password update
277
        updated_user = get_user(self.user_data["user_id"])
1✔
278
        self.assertIsNotNone(updated_user, "User not found after password update.")
1✔
279

280
        # Step 3: Verify the password was updated correctly
281
        is_password_correct = check_password(new_password, updated_user["password"])
1✔
282
        self.assertTrue(is_password_correct, "The password was not updated correctly.")
1✔
283

284
    def test_verify_false_credentials(self):
1✔
285
        username = "wasd"
1✔
286
        password = "wasd"
1✔
287
        result = verify_user_credentials(username, password)
1✔
288
        self.assertIsNone(result, "User was found. ")
1✔
289

290
    def test_verify_true_credentials(self):
1✔
291
        username = "sg8002"
1✔
292
        password = "sg8002"
1✔
293
        result = verify_user_credentials(username, password)
1✔
294
        self.assertIsNotNone(result, "Invalid credentials. ")
1✔
295

296
    def test_get_users_by_username_query(self):
1✔
297
        query = "sg8002"
1✔
298
        results = get_users_by_username_query(query)
1✔
UNCOV
299
        assert len(results) > 0, "Query should return 1 user"
×
300

301
    # def test_make_fitness_trainer(self):
302
    #     user = get_user_by_username("sg8002")
303
    #     uid = user.get("user_id")
304
    #     make_fitness_trainer(uid)
305

306
    #     is_FT = user.get("is_fitness_trainer")
307
    #     self.assertTrue(is_FT, "sg8002 is not a Fitness Trainer")
308

309
    #     remove_fitness_trainer(uid)
310
    #     is_FT = user.get("is_fitness_trainer")
311
    #     self.assertFalse(is_FT, "sg8002 is still a Fitness Trainer")
312

313
    def test_calculate_age_group(self):
1✔
314
        test_cases = [
1✔
315
            ("2015-06-15", "Child"),  # Age: 9
316
            ("2007-01-01", "Teenager"),  # Age: 17
317
            ("1995-11-20", "Young Adult"),  # Age: 29
318
            ("1980-05-10", "Middle-aged"),  # Age: 44
319
            ("1950-12-25", "Senior"),  # Age: 74
320
            ("1940-01-01", "Elderly"),  # Age: 84
321
        ]
322

323
        for date_of_birth, expected_group in test_cases:
1✔
324
            result = calculate_age_group(date_of_birth)
1✔
325
            assert (
1✔
326
                result == expected_group
327
            ), f"Expected {expected_group}, got {result} for DOB {date_of_birth}"
328

329
        invalid_cases = [
1✔
330
            ("invalid-date", "Unknown"),  # Invalid date format
331
            ("", "Unknown"),  # Empty string
332
            (None, "Unknown"),  # None as input
333
        ]
334

335
        for date_of_birth, expected_group in invalid_cases:
1✔
336
            result = calculate_age_group(date_of_birth)
1✔
337
            assert (
1✔
338
                result == expected_group
339
            ), f"Expected {expected_group}, got {result} for DOB {date_of_birth}"
340

341
    def test_update_user(self):
1✔
342
        # Step 1: Define the updates
343
        update_data = {
1✔
344
            "email": {"Value": "updated_user@example.com"},
345
            "name": {"Value": "Updated-Test User"},
346
            "gender": {"Value": "F"},
347
        }
348

349
        # Step 2: Call update_user
350
        update_result = update_user(self.user_data["user_id"], update_data)
1✔
351
        self.assertIsNotNone(update_result, "User update failed.")
1✔
352

353
        # Step 3: Retrieve the user to verify updates
354
        updated_user = get_user(self.user_data["user_id"])
1✔
355
        self.assertIsNotNone(updated_user, "User not found after update.")
1✔
356

357
        # Step 4: Check that the updated fields match the expected values
358
        self.assertEqual(
1✔
359
            updated_user["email"], update_data["email"]["Value"], "Email update failed."
360
        )
361
        self.assertEqual(
1✔
362
            updated_user["name"], update_data["name"]["Value"], "Name update failed."
363
        )
364
        self.assertEqual(
1✔
365
            updated_user["gender"],
366
            update_data["gender"]["Value"],
367
            "Gender update failed.",
368
        )
369

370
    def test_toggle_ban_user(self):
1✔
371
        # Step 1: Create the user
372
        create_user(**self.user_data)
1✔
373

374
        # Retrieve the user from DynamoDB to ensure the user is created
375
        user = get_user_by_uid("test_user_123")
1✔
376
        username = user.get("username")
1✔
377

378
        # Step 2: Ban the user by toggling `is_banned`
379
        response = self.client.post(
1✔
380
            "/ban_user/",
381
            data=json.dumps({"user_id": username}),
382
            content_type="application/json",
383
            HTTP_X_REQUESTED_WITH="XMLHttpRequest",
384
        )
385

386
        # Step 3: Manually retrieve the user from DynamoDB to verify `is_banned` is True
387
        response = self.users_table.get_item(Key={"user_id": self.user_data["user_id"]})
1✔
388
        self.assertIn("Item", response, "User was not found in DynamoDB after banning.")
1✔
389
        updated_user = response["Item"]
1✔
390

391
        # Check if `is_banned` is set to True
392
        # self.assertTrue(
393
        #     updated_user.get("is_banned") is True,  # Updated assertion
394
        #     "User should be banned (is_banned should be True).",
395
        # )
396

397
        # Step 4: Check that `punishment_date` is set
398
        self.assertIn(
1✔
399
            "punishment_date",
400
            updated_user,
401
            "punishment_date should be set when user is banned.",
402
        )
403

404
    def test_unban_user(self):
1✔
405
        # Ban the user first by directly setting is_banned to True and setting punishment_date
406
        self.users_table.update_item(
1✔
407
            Key={"user_id": self.user_data["user_id"]},
408
            UpdateExpression="set is_banned = :b, punishment_date = :d",
409
            ExpressionAttributeValues={
410
                ":b": True,
411
                ":d": datetime.now(pytz.timezone("US/Eastern")).isoformat(),
412
            },
413
        )
414

415
        create_user(**self.user_data)
1✔
416

417
        # Step 1: Unban the user
418
        self.client.post(
1✔
419
            "/unban_user/",
420
            data=json.dumps({"user_id": self.user_data["user_id"]}),
421
            content_type="application/json",
422
            HTTP_X_REQUESTED_WITH="XMLHttpRequest",
423
        )
424
        # self.assertEqual(response.status_code, 200)
425
        # data = response.json()
426
        # self.assertEqual(
427
        #     data["message"],
428
        #     "User has been unbanned",
429
        #     "Unban message should confirm unban success.",
430
        # )
431

432
        # Step 2: Verify the user is unbanned and punishment_date is removed
433
        unbanned_user = get_user(self.user_data["user_id"])
1✔
434
        self.assertFalse(
1✔
435
            unbanned_user.get("is_banned"),
436
            "User's is_banned should be False after unban.",
437
        )
438
        self.assertFalse(
1✔
439
            hasattr(unbanned_user, "punishment_date"),
440
            "punishment_date should be removed when user is unbanned.",
441
        )
442

443
    def test_toggle_mute_user(self):
1✔
444
        # Step 1: Create the user
445
        create_user(**self.user_data)
1✔
446

447
        # Retrieve the user from DynamoDB to ensure the user is created
448
        user = get_user_by_uid("test_user_123")
1✔
449
        username = user.get("username")
1✔
450

451
        # Step 2: Ban the user by toggling `is_muted`
452
        response = self.client.post(
1✔
453
            "/mute_user/",
454
            data=json.dumps({"user_id": username}),
455
            content_type="application/json",
456
            HTTP_X_REQUESTED_WITH="XMLHttpRequest",
457
        )
458

459
        # Step 3: Manually retrieve the user from DynamoDB to verify `is_muted` is True
460
        response = self.users_table.get_item(Key={"user_id": self.user_data["user_id"]})
1✔
461
        # self.assertIn("Item", response, "User was not found in DynamoDB after muting.")
462
        updated_user = response["Item"]
1✔
463

464
        # Check if `is_muted` is set to True
465
        # self.assertTrue(
466
        #     updated_user.get("is_muted", True),
467
        #     "User should be banned (is_muted should be True).",
468
        # )
469

470
        # Step 4: Check that `punishment_date` is set
471
        self.assertIn(
1✔
472
            "punishment_date",
473
            updated_user,
474
            "punishment_date should be set when user is banned.",
475
        )
476

477
    def test_unmute_user(self):
1✔
478
        # Unmute the user first by directly setting is_banned to True and setting punishment_date
479
        self.users_table.update_item(
1✔
480
            Key={"user_id": self.user_data["user_id"]},
481
            UpdateExpression="set is_muted = :b, punishment_date = :d",
482
            ExpressionAttributeValues={
483
                ":b": True,
484
                ":d": datetime.now(pytz.timezone("US/Eastern")).isoformat(),
485
            },
486
        )
487

488
        create_user(**self.user_data)
1✔
489

490
        # Step 1: Unmute the user
491
        self.client.post(
1✔
492
            "/unmute_user/",
493
            data=json.dumps({"user_id": self.user_data["user_id"]}),
494
            content_type="application/json",
495
            HTTP_X_REQUESTED_WITH="XMLHttpRequest",
496
        )
497
        # self.assertEqual(response.status_code, 200)
498
        # data = response.json()
499
        # #self.assertEqual(
500
        #     data["message"],
501
        #     "User has been unmuted",
502
        #     "Unmute message should confirm unmute success.",
503
        # #)
504

505
        # Step 2: Verify the user is unmuted and punishment_date is removed
506
        unmuted_user = get_user(self.user_data["user_id"])
1✔
507
        self.assertTrue(
1✔
508
            unmuted_user.get("is_muted") is not True,
509
            "User's is_muted should be False after unban.",
510
        )
511
        self.assertFalse(
1✔
512
            hasattr(unmuted_user, "punishment_date"),
513
            "punishment_date should be removed when user is unmuted.",
514
        )
515

516
    def test_delete_user(self):
1✔
517
        # Ensure the user exists before testing deletion
518
        self.users_table.put_item(Item=self.user_data)
1✔
519

520
        # Step 2: Delete the user by username
521
        delete_result = delete_user_by_username(self.user_data["username"])
1✔
522
        self.assertTrue(delete_result, "User deletion failed.")
1✔
523

524
        # Verify that the user was deleted
525
        response = self.users_table.get_item(Key={"user_id": self.user_data["user_id"]})
1✔
526
        self.assertNotIn("Item", response, "User was found in DynamoDB after deletion.")
1✔
527

528
    def tearDown(self):
1✔
529
        # Clean up: If the test fails to delete the user, remove it manually
530
        try:
1✔
531
            self.users_table.delete_item(Key={"user_id": self.user_data["user_id"]})
1✔
532
        except ClientError:
×
533
            pass  # Ignore if the item was already deleted
×
534

535

536
class WarnActionTest(TestCase):
1✔
537
    def setUp(self):
1✔
538
        # Mock user data
539
        self.user_data = {
1✔
540
            "user_id": "test_user_123",
541
            "username": "test_user123",
542
            "email": "test_user@example.com",
543
            "name": "Test User",
544
            "date_of_birth": "1990-01-01",
545
            "gender": "O",
546
            "height": "183",
547
            "weight": "83",
548
            "password": "hashed_password",
549
            "is_warned": False,
550
        }
551
        # Create the user in the database
552
        create_user(**self.user_data)
1✔
553

554
        self.factory = RequestFactory()
1✔
555

556
    def test_warn_user_for_thread(self):
1✔
557
        # Simulate POST request to warn a user for a thread
558
        data = {
1✔
559
            "action": "warn_thread",
560
            "thread_id": "thread_123",
561
            "user_id": self.user_data["username"],
562
        }
563
        request = self.factory.post(
1✔
564
            "/warn_action/",
565
            data=json.dumps(data),
566
            content_type="application/json",
567
        )
568
        response = warn_action(request)
1✔
569

570
        # Assert response and user's warning status
571
        self.assertEqual(response.status_code, 200)
1✔
572
        self.assertEqual(
1✔
573
            json.loads(response.content)["message"],
574
            "User warned for thread successfully.",
575
        )
576
        warned_user = get_user(self.user_data["user_id"])
1✔
577
        self.assertTrue(warned_user.get("is_warned"))
1✔
578

579
    def test_warn_user_for_comment(self):
1✔
580
        # Simulate POST request to warn a user for a comment
581
        data = {
1✔
582
            "action": "warn_comment",
583
            "post_id": "post_123",
584
            "user_id": self.user_data["username"],
585
        }
586
        request = self.factory.post(
1✔
587
            "/warn_action/",
588
            data=json.dumps(data),
589
            content_type="application/json",
590
        )
591
        response = warn_action(request)
1✔
592

593
        # Assert response and user's warning status
594
        self.assertEqual(response.status_code, 200)
1✔
UNCOV
595
        self.assertEqual(
×
596
            json.loads(response.content)["message"],
597
            "User warned for comment successfully.",
598
        )
UNCOV
599
        warned_user = get_user(self.user_data["user_id"])
×
UNCOV
600
        self.assertTrue(warned_user.get("is_warned"))
×
601

602
    def test_warn_user_invalid_action(self):
1✔
603
        # Simulate POST request with invalid action
604
        data = {
1✔
605
            "action": "invalid_action",
606
            "user_id": self.user_data["username"],
607
        }
608
        request = self.factory.post(
1✔
609
            "/warn_action/",
610
            data=json.dumps(data),
611
            content_type="application/json",
612
        )
613
        response = warn_action(request)
1✔
614

615
        # Assert response for invalid action
616
        self.assertEqual(response.status_code, 400)
1✔
617
        self.assertEqual(
1✔
618
            json.loads(response.content)["message"], "Invalid action or ID."
619
        )
620

621
    def tearDown(self):
1✔
622
        # Clean up the test user
623
        delete_user_by_username(self.user_data["username"])
1✔
624

625

626
class DismissWarningTest(TestCase):
1✔
627
    def setUp(self):
1✔
628
        # Mock user data
629
        self.user_data = {
1✔
630
            "user_id": "test_user_123",
631
            "username": "test_user123",
632
            "email": "test_user@example.com",
633
            "name": "Test User",
634
            "date_of_birth": "1990-01-01",
635
            "gender": "O",
636
            "height": "183",
637
            "weight": "83",
638
            "password": "hashed_password",
639
            "is_warned": True,
640
        }
641
        # Create the user in the database
642
        create_user(**self.user_data)
1✔
643

644
        self.factory = RequestFactory()
1✔
645

646
    def test_dismiss_warning(self):
1✔
647
        # Simulate user session
648
        request = self.factory.post("/dismiss_warning/")
1✔
649
        request.session = {"user_id": self.user_data["user_id"]}
1✔
650

651
        # Call the dismiss_warning view
652
        response = dismiss_warning(request)
1✔
653
        response_data = json.loads(response.content)
1✔
654
        # Assert response and user's warning status
655
        self.assertEqual(response.status_code, 200)
1✔
656
        self.assertEqual(
1✔
657
            response_data["message"],
658
            f"User {self.user_data['user_id']} warning dismissed.",
659
        )
660
        updated_user = get_user(self.user_data["user_id"])
1✔
661
        self.assertFalse(updated_user.get("is_warned"))
1✔
662

663
    def test_dismiss_warning_no_user_id(self):
1✔
664
        # Simulate request without a user ID in the session
665
        request = self.factory.post("/dismiss_warning/")
1✔
666
        request.session = {}
1✔
667

668
        # Call the dismiss_warning view
669
        response = dismiss_warning(request)
1✔
670

671
        # Assert response for missing user ID
672
        self.assertEqual(response.status_code, 400)
1✔
673
        self.assertEqual(json.loads(response.content)["message"], "User ID is missing.")
1✔
674

675
    def tearDown(self):
1✔
676
        # Clean up the test user
677
        delete_user_by_username(self.user_data["username"])
1✔
678

679

680
class DeactivateAccountTest(TestCase):
1✔
681
    def setUp(self):
1✔
682
        # Mock user data
683
        self.user_data = {
1✔
684
            "user_id": "test_user_123",
685
            "username": "test_user123",
686
            "email": "test_user@example.com",
687
            "name": "Test User",
688
            "date_of_birth": "1990-01-01",
689
            "gender": "O",
690
            "height": "183",
691
            "weight": "83",
692
            "password": "hashed_password",
693
        }
694

695
        # Create the user in the database
696
        create_user(**self.user_data)
1✔
697

698
        self.factory = RequestFactory()
1✔
699

700
    def add_session_to_request(self, request):
1✔
701
        """Helper function to attach session middleware to the request."""
702
        middleware = SessionMiddleware(lambda x: x)
1✔
703
        middleware.process_request(request)
1✔
704
        request.session.save()
1✔
705

706
    def test_deactivate_account_get(self):
1✔
707
        """Test that deactivate_account view renders the confirmation page."""
708
        request = self.factory.get("/deactivate/")
1✔
709
        self.add_session_to_request(request)
1✔
710

711
        response = deactivate_account(request)
1✔
712

713
        self.assertEqual(response.status_code, 200)
1✔
714
        self.assertContains(response, "Deactivate Your Account")
1✔
715
        self.assertContains(
1✔
716
            response, "Are you sure you want to deactivate your account?"
717
        )
718

719
    @patch("FitOn.views.delete_user_by_username", return_value=True)
1✔
720
    def test_confirm_deactivation_success(self, mock_delete_user):
1✔
721
        """Test successful account deactivation."""
722
        request = self.factory.post("/deactivate/confirm/")
1✔
723
        self.add_session_to_request(request)
1✔
724
        request.session["username"] = self.user_data["username"]
1✔
725

726
        # Call the confirm_deactivation view
727
        response = confirm_deactivation(request)
1✔
728

729
        # Assertions
730
        self.assertEqual(response.status_code, 302)  # Redirect after success
1✔
731
        self.assertEqual(response.url, reverse("homepage"))  # Redirects to homepage
1✔
732
        self.assertNotIn("username", request.session)  # Session flushed
1✔
733
        mock_delete_user.assert_called_once_with(self.user_data["username"])
1✔
734

735
    def test_confirm_deactivation_no_session_user(self):
1✔
736
        """Test deactivation attempt with no username in session."""
737
        request = self.factory.post("/deactivate/confirm/")
1✔
738
        self.add_session_to_request(request)
1✔
739

740
        response = confirm_deactivation(request)
1✔
741

742
        self.assertEqual(response.status_code, 302)  # Redirect to login page
1✔
743
        self.assertEqual(response.url, reverse("login"))
1✔
744

745
    def tearDown(self):
1✔
746
        """Clean up the test user."""
747
        delete_user_by_username(self.user_data["username"])
1✔
748

749

750
class ForumTests(TestCase):
1✔
751
    def setUp(self):
1✔
752
        # Initialize DynamoDB resource and Threads table
753
        self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
754

755
        self.users_table = self.dynamodb.Table("Users")
1✔
756
        self.posts_table = self.dynamodb.Table("ForumPosts")
1✔
757

758
        # Define user data for the test
759
        self.user_data = {
1✔
760
            "user_id": "test_user_123",
761
            "username": "test_user123",
762
            "email": "test_user@example.com",
763
            "name": "Test User",
764
            "date_of_birth": "1990-01-01",
765
            "gender": "O",
766
            "password": "hashed_password",
767
        }
768
        self.threads_table = self.dynamodb.Table("ForumThreads")
1✔
769

770
        # Define thread data for the test
771
        self.thread_data = {
1✔
772
            "title": "Test Thread",
773
            "user_id": "test_user_123",
774
            "content": "This is a test thread content",
775
        }
776

777
    def test_create_thread(self):
1✔
778
        # Step 1: Create the thread
779
        thread = create_thread(
1✔
780
            title=self.thread_data["title"],
781
            user_id=self.thread_data["user_id"],
782
            content=self.thread_data["content"],
783
        )
784

785
        # Verify that the thread has a ThreadID and CreatedAt
786
        self.assertIn("ThreadID", thread, "ThreadID should be generated and set.")
1✔
787
        self.assertIn("CreatedAt", thread, "CreatedAt should be generated and set.")
1✔
788

789
        # Step 2: Retrieve the thread from DynamoDB to verify it was added
790
        response = self.threads_table.get_item(Key={"ThreadID": thread["ThreadID"]})
1✔
791
        self.assertIn("Item", response, "Thread not found in DynamoDB after creation.")
1✔
792

793
        created_thread = response["Item"]
1✔
794
        self.assertEqual(
1✔
795
            created_thread["Title"],
796
            self.thread_data["title"],
797
            "Thread title does not match.",
798
        )
799
        self.assertEqual(
1✔
800
            created_thread["UserID"],
801
            self.thread_data["user_id"],
802
            "Thread user_id does not match.",
803
        )
804
        self.assertEqual(
1✔
805
            created_thread["Content"],
806
            self.thread_data["content"],
807
            "Thread content does not match.",
808
        )
809
        self.assertEqual(created_thread["Likes"], 0, "Initial likes count should be 0.")
1✔
810
        self.assertEqual(
1✔
811
            created_thread["LikedBy"], [], "Initial LikedBy list should be empty."
812
        )
813

814
    def test_get_thread(self):
1✔
815
        # Step 1: Create a sample thread
816
        thread = create_thread(
1✔
817
            title=self.thread_data["title"],
818
            user_id=self.thread_data["user_id"],
819
            content=self.thread_data["content"],
820
        )
821

822
        # Step 2: Retrieve the thread using get_thread with matching parameters
823
        retrieved_thread = get_thread(
1✔
824
            title=thread["Title"],
825
            user_id=thread["UserID"],
826
            content=thread["Content"],
827
            created_at=thread["CreatedAt"],
828
        )
829

830
        # Step 3: Verify the retrieved thread matches the created thread
831
        self.assertIsNotNone(retrieved_thread, "Thread should be found.")
1✔
832
        self.assertEqual(
1✔
833
            retrieved_thread["Title"], thread["Title"], "Thread title does not match."
834
        )
835
        self.assertEqual(
1✔
836
            retrieved_thread["UserID"],
837
            thread["UserID"],
838
            "Thread user ID does not match.",
839
        )
840
        self.assertEqual(
1✔
841
            retrieved_thread["Content"],
842
            thread["Content"],
843
            "Thread content does not match.",
844
        )
845

846
    def test_fetch_all_users(self):
1✔
847
        # Add threads to the DynamoDB table for testing
848
        test_threads = [
1✔
849
            {
850
                "ThreadID": "201",
851
                "Title": "Thread 1",
852
                "UserID": "user_123",
853
                "CreatedAt": "2024-11-01T10:00:00",
854
                "Content": "This is thread 1 content.",
855
            },
856
            {
857
                "ThreadID": "202",
858
                "Title": "Thread 2",
859
                "UserID": "user_456",
860
                "CreatedAt": "2024-11-02T11:00:00",
861
                "Content": "This is thread 2 content.",
862
            },
863
            {
864
                "ThreadID": "203",
865
                "Title": "Thread 3",
866
                "UserID": "user_123",  # Duplicate UserID
867
                "CreatedAt": "2024-11-03T12:00:00",
868
                "Content": "This is thread 3 content.",
869
            },
870
        ]
871
        for thread in test_threads:
1✔
872
            self.threads_table.put_item(Item=thread)
1✔
873

874
        try:
1✔
875
            # Fetch all unique users
876
            users = fetch_all_users()
1✔
877

878
            # Verify that the correct number of unique users is returned
879
            self.assertGreater(len(users), 2)
1✔
880

881
            # Verify the unique usernames
882
            usernames = [user["username"] for user in users]
1✔
883
            self.assertIn("user_123", usernames)
1✔
884
            self.assertIn("user_456", usernames)
1✔
885

886
        finally:
887
            # Cleanup the test data
888
            for thread in test_threads:
1✔
889
                self.threads_table.delete_item(Key={"ThreadID": thread["ThreadID"]})
1✔
890

891
    def test_get_thread_details(self):
1✔
892
        # Add posts to the DynamoDB table for testing
893
        test_posts = [
1✔
894
            {
895
                "PostID": "301",
896
                "ThreadID": "thread_123",
897
                "UserID": "user_123",
898
                "Content": "This is a post in thread_123.",
899
                "CreatedAt": "2024-11-01T10:00:00",
900
            },
901
            {
902
                "PostID": "302",
903
                "ThreadID": "thread_456",
904
                "UserID": "user_456",
905
                "Content": "This is a post in thread_456.",
906
                "CreatedAt": "2024-11-02T11:00:00",
907
            },
908
        ]
909
        for post in test_posts:
1✔
910
            self.posts_table.put_item(Item=post)
1✔
911

912
        try:
1✔
913
            # Fetch details for an existing thread
914
            thread_details = get_thread_details("thread_123")
1✔
915
            self.assertIsNotNone(thread_details, "Thread details should not be None.")
1✔
916
            self.assertEqual(thread_details["ThreadID"], "thread_123")
1✔
917
            self.assertEqual(thread_details["Content"], "This is a post in thread_123.")
1✔
918
            self.assertEqual(thread_details["UserID"], "user_123")
1✔
919

920
            # Fetch details for a non-existing thread
921
            non_existing_thread_details = get_thread_details("non_existing_thread")
1✔
922
            self.assertIsNone(
1✔
923
                non_existing_thread_details,
924
                "Thread details for a non-existing thread should be None.",
925
            )
926

927
        finally:
928
            # Cleanup the test data
929
            for post in test_posts:
1✔
930
                self.posts_table.delete_item(
1✔
931
                    Key={"PostID": post["PostID"], "ThreadID": post["ThreadID"]}
932
                )
933

934
    def test_create_post(self):
1✔
935
        # Setup: Create a thread for the post to be attached to
936
        thread = create_thread(
1✔
937
            title=self.thread_data["title"],
938
            user_id=self.thread_data["user_id"],
939
            content=self.thread_data["content"],
940
        )
941
        thread_id = thread["ThreadID"]
1✔
942

943
        # Create a post in the thread
944
        post = create_post(thread_id, "test_user_123", "This is a test post content.")
1✔
945
        post_id = post["PostID"]
1✔
946

947
        try:
1✔
948
            # Fetch the post from DynamoDB
949
            post_response = self.posts_table.get_item(
1✔
950
                Key={"PostID": post_id, "ThreadID": thread_id}
951
            )
952

953
            # Assertions for the post
954
            self.assertIn("Item", post_response)
1✔
955
            self.assertEqual(post_response["Item"]["ThreadID"], thread_id)
1✔
956
            self.assertEqual(post_response["Item"]["UserID"], "test_user_123")
1✔
957
            self.assertEqual(
1✔
958
                post_response["Item"]["Content"], "This is a test post content."
959
            )
960

961
        finally:
962
            # Cleanup: Delete the created thread and post from DynamoDB
963
            self.threads_table.delete_item(Key={"ThreadID": thread_id})
1✔
964
            self.posts_table.delete_item(Key={"PostID": post_id, "ThreadID": thread_id})
1✔
965

966
    def test_fetch_filtered_threads(self):
1✔
967
        # Add threads to the DynamoDB table for testing
968
        test_threads = [
1✔
969
            {
970
                "ThreadID": "101",
971
                "Title": "General Discussion",
972
                "UserID": "test_user_123",
973
                "CreatedAt": "2024-09-02T10:00:00",
974
                "Section": "general",
975
                "Content": "This is a general discussion thread.",
976
                "ReplyCount": 0,
977
            },
978
            {
979
                "ThreadID": "102",
980
                "Title": "Support Needed",
981
                "UserID": "test_user_456",
982
                "CreatedAt": "2024-11-02T11:00:00",
983
                "Section": "support",
984
                "Content": "This is a support thread.",
985
                "ReplyCount": 3,
986
            },
987
        ]
988
        for thread in test_threads:
1✔
989
            self.threads_table.put_item(Item=thread)
1✔
990

991
        try:
1✔
992
            # Test filtering by section
993
            threads = fetch_filtered_threads(section="general")
1✔
994
            self.assertEqual(len(threads), 1)
1✔
995
            self.assertEqual(threads[0]["ThreadID"], "101")
1✔
996
            self.assertEqual(threads[0]["Section"], "general")
1✔
997

998
            # Test filtering by username
999
            threads = fetch_filtered_threads(username="test_user_456")
1✔
1000
            self.assertEqual(len(threads), 1)
1✔
1001
            self.assertEqual(threads[0]["UserID"], "test_user_456")
1✔
1002

1003
            # Test filtering by date range
1004
            threads = fetch_filtered_threads(
1✔
1005
                start_date="2024-09-01", end_date="2024-09-02"
1006
            )
1007
            self.assertEqual(len(threads), 0)
1✔
1008

1009
            # Test filtering by text search
1010
            threads = fetch_filtered_threads(search_text="support")
1✔
1011
            self.assertEqual(len(threads), 1)
1✔
1012
            self.assertEqual(threads[0]["ThreadID"], "102")
1✔
1013

1014
            # Test fetching all threads
1015
            threads = fetch_filtered_threads()
1✔
1016
            self.assertGreater(len(threads), 0)
1✔
1017

1018
        finally:
1019
            # Cleanup the test data
1020
            for thread in test_threads:
1✔
1021
                self.threads_table.delete_item(Key={"ThreadID": thread["ThreadID"]})
1✔
1022

1023
    def test_delete_post(self):
1✔
1024
        # Add a post to the DynamoDB table for testing
1025
        test_thread = {
1✔
1026
            "ThreadID": "101",
1027
            "Title": "General Discussion",
1028
            "UserID": "test_user_123",
1029
            "CreatedAt": "2024-09-02T10:00:00",
1030
            "Section": "general",
1031
            "Content": "This is a general discussion thread.",
1032
            "ReplyCount": 0,
1033
        }
1034

1035
        test_post = {
1✔
1036
            "PostID": "401",
1037
            "ThreadID": "101",
1038
            "UserID": "user_123",
1039
            "Content": "This is a test post.",
1040
            "CreatedAt": "2024-11-01T10:00:00",
1041
        }
1042
        self.threads_table.put_item(Item=test_thread)
1✔
1043
        self.posts_table.put_item(Item=test_post)
1✔
1044

1045
        try:
1✔
1046
            # Verify the post exists before deletion
1047
            response = self.posts_table.get_item(
1✔
1048
                Key={"ThreadID": test_post["ThreadID"], "PostID": test_post["PostID"]}
1049
            )
1050
            self.assertIn(
1✔
1051
                "Item", response, "Post should exist in the table before deletion."
1052
            )
1053

1054
            # Call the delete_post function
1055
            delete_result = delete_post(test_post["PostID"], test_post["ThreadID"])
1✔
1056
            self.assertTrue(delete_result, "Post deletion should return True.")
1✔
1057

1058
            # Verify the post no longer exists after deletion
1059
            response = self.posts_table.get_item(
1✔
1060
                Key={"ThreadID": test_post["ThreadID"], "PostID": test_post["PostID"]}
1061
            )
1062
            self.assertNotIn(
1✔
1063
                "Item",
1064
                response,
1065
                "Post should no longer exist in the table after deletion.",
1066
            )
1067

1068
        finally:
1069
            # Cleanup: Ensure the post is deleted, in case the function failed
1070
            self.posts_table.delete_item(
1✔
1071
                Key={"ThreadID": test_post["ThreadID"], "PostID": test_post["PostID"]}
1072
            )
1073

1074
            self.threads_table.delete_item(Key={"ThreadID": test_post["ThreadID"]})
1✔
1075

1076
    def test_delete_thread_by_id(self):
1✔
1077
        # Step 1: Create a sample thread
1078
        thread = create_thread(
1✔
1079
            title=self.thread_data["title"],
1080
            user_id=self.thread_data["user_id"],
1081
            content=self.thread_data["content"],
1082
        )
1083
        thread_id = thread["ThreadID"]
1✔
1084

1085
        # Verify the thread exists in DynamoDB
1086
        response = self.threads_table.get_item(Key={"ThreadID": thread_id})
1✔
1087
        self.assertIn(
1✔
1088
            "Item", response, "Thread should exist in DynamoDB before deletion."
1089
        )
1090

1091
        # Step 2: Delete the thread
1092
        delete_result = delete_thread_by_id(thread_id)
1✔
1093
        self.assertTrue(delete_result, "Thread deletion failed.")
1✔
1094

1095
        # Step 3: Verify the thread is deleted
1096
        response = self.threads_table.get_item(Key={"ThreadID": thread_id})
1✔
1097
        self.assertNotIn("Item", response, "Thread should be deleted from DynamoDB.")
1✔
1098

1099
    def test_fetch_all_threads(self):
1✔
1100
        self.test_threads = [
1✔
1101
            {
1102
                "ThreadID": "1",
1103
                "Title": "Test",
1104
                "UserID": "test_user_123",
1105
                "CreatedAt": "2024-11-01T10:00:00",
1106
                "Content": "Test",
1107
            },
1108
        ]
1109
        for thread in self.test_threads:
1✔
1110
            self.threads_table.put_item(Item=thread)
1✔
1111

1112
        # Call the function
1113
        threads = fetch_all_threads()
1✔
1114

1115
        # Check the first thread of test user's threads' properties
1116
        thread = next((t for t in threads if t.get("UserID") == "test_user_123"), None)
1✔
1117
        self.assertEqual(thread["ThreadID"], "1")
1✔
1118
        self.assertEqual(thread["ReplyCount"], 0)
1✔
1119
        self.assertEqual(thread["LastPostUser"], "No replies yet")
1✔
1120
        self.assertEqual(
1✔
1121
            thread["CreatedAt"], datetime.fromisoformat("2024-11-01T10:00:00")
1122
        )
1123

1124
    def test_fetch_thread(self):
1✔
1125
        self.test_thread = {
1✔
1126
            "ThreadID": "123",
1127
            "Title": "Test Thread",
1128
            "UserID": "test_user_123",
1129
            "CreatedAt": "2024-11-14T10:00:00",
1130
        }
1131
        self.threads_table.put_item(Item=self.test_thread)
1✔
1132

1133
        thread = fetch_thread("123")
1✔
1134

1135
        # Assertions to verify the thread details
1136
        self.assertIsNotNone(thread)
1✔
1137
        self.assertEqual(thread["ThreadID"], "123")
1✔
1138
        self.assertEqual(thread["Title"], "Test Thread")
1✔
1139
        self.assertEqual(thread["CreatedAt"], "2024-11-14T10:00:00")
1✔
1140

1141
        # Call the function to fetch a non-existing thread
1142
        thread = fetch_thread("999")
1✔
1143

1144
        # Assertion to verify the function returns None
1145
        self.assertIsNone(thread)
1✔
1146

1147
    def test_get_section_stats(self):
1✔
1148
        # Add threads to the DynamoDB table for testing
1149
        test_threads = [
1✔
1150
            {
1151
                "ThreadID": "501",
1152
                "Title": "Thread 1",
1153
                "UserID": "user_123",
1154
                "Section": "general",
1155
                "CreatedAt": "2024-11-01T10:00:00",
1156
                "PostCount": 5,
1157
            },
1158
            {
1159
                "ThreadID": "502",
1160
                "Title": "Thread 2",
1161
                "UserID": "user_456",
1162
                "Section": "general",
1163
                "CreatedAt": "2024-11-02T11:00:00",
1164
                "PostCount": 2,
1165
            },
1166
            {
1167
                "ThreadID": "503",
1168
                "Title": "Thread 3",
1169
                "UserID": "user_789",
1170
                "Section": "support",
1171
                "CreatedAt": "2024-11-03T12:00:00",
1172
                "PostCount": 7,
1173
            },
1174
        ]
1175
        for thread in test_threads:
1✔
1176
            self.threads_table.put_item(Item=thread)
1✔
1177

1178
        try:
1✔
1179
            # Fetch stats for the "general" section
1180
            stats = get_section_stats("general")
1✔
1181

1182
            # Verify thread count is greater than 1
1183
            self.assertGreater(stats["thread_count"], 1)
1✔
1184

1185
            # Verify post count is greater than 6
1186
            self.assertGreater(stats["post_count"], 6)
1✔
1187

1188
            # Verify the latest thread details
1189
            latest_thread = stats["latest_thread"]
1✔
1190
            self.assertEqual(latest_thread["title"], "Thread 2")
1✔
1191
            self.assertEqual(latest_thread["author"], "user_456")
1✔
1192
            self.assertEqual(latest_thread["thread_id"], "502")
1✔
1193
            self.assertEqual(
1✔
1194
                latest_thread["created_at"],
1195
                datetime.fromisoformat("2024-11-02T11:00:00"),
1196
            )
1197

1198
            # Fetch stats for a section with no threads
1199
            empty_section_stats = get_section_stats("non_existing_section")
1✔
1200
            self.assertEqual(empty_section_stats["thread_count"], 0)
1✔
1201
            self.assertEqual(empty_section_stats["post_count"], 0)
1✔
1202
            self.assertEqual(
1✔
1203
                empty_section_stats["latest_thread"]["title"], "No threads"
1204
            )
1205
            self.assertEqual(empty_section_stats["latest_thread"]["author"], "N/A")
1✔
1206

1207
        finally:
1208
            # Cleanup the test data
1209
            for thread in test_threads:
1✔
1210
                self.threads_table.delete_item(Key={"ThreadID": thread["ThreadID"]})
1✔
1211

1212
    def tearDown(self):
1✔
1213
        delete_threads_by_user("test_user_123")
1✔
1214

1215

1216
# ##########################################################
1217
#       TEST CASE FOR GOOGLE AUTHENTICATION               #
1218
# ##########################################################
1219

1220

1221
def add_middleware(request):
1✔
1222
    """Helper function to add session and message middleware to the request."""
1223
    session_middleware = SessionMiddleware(lambda req: None)
1✔
1224
    session_middleware.process_request(request)
1✔
1225
    request.session.save()
1✔
1226

1227
    message_middleware = MessageMiddleware(lambda req: None)
1✔
1228
    message_middleware.process_request(request)
1✔
1229

1230

1231
class GoogleFitViewsTestCase(TestCase):
1✔
1232
    def setUp(self):
1✔
1233
        self.factory = RequestFactory()
1✔
1234

1235
    @patch("google_auth_oauthlib.flow.Flow.from_client_config")
1✔
1236
    @patch("django.shortcuts.redirect")
1✔
1237
    def test_authorize_google_fit_redirects_to_auth_url(
1✔
1238
        self, mock_redirect, mock_flow_from_client_config
1239
    ):
1240
        # Mock the Flow object
1241
        mock_flow = MagicMock()
1✔
1242
        mock_flow.authorization_url.return_value = (
1✔
1243
            "https://example.com/auth",
1244
            "state123",
1245
        )
1246
        mock_flow_from_client_config.return_value = mock_flow
1✔
1247

1248
        # Create a request with no credentials in session
1249
        request = self.factory.get(reverse("authorize_google_fit"))
1✔
1250
        add_middleware(request)
1✔
1251
        request.session["google_fit_credentials"] = None
1✔
1252

1253
        # Call the view
1254
        authorize_google_fit(request)
1✔
1255

1256
        # # Assertions
1257
        # mock_flow_from_client_config.assert_called_once_with(GOOGLEFIT_CLIENT_CONFIG, SCOPES)
1258
        # mock_flow.authorization_url.assert_called_once_with(
1259
        #     access_type="offline", include_granted_scopes="true"
1260
        # )
1261
        # mock_redirect.assert_called_once_with("https://example.com/auth")
1262
        # self.assertIn("google_fit_state", request.session)
1263
        # self.assertEqual(request.session["google_fit_state"], "state123")
1264

1265
    @patch("google_auth_oauthlib.flow.Flow.from_client_config")
1✔
1266
    @patch("django.shortcuts.render")
1✔
1267
    @patch("FitOn.views.get_user")
1✔
1268
    def test_callback_google_fit_success(
1✔
1269
        self, mock_get_user, mock_render, mock_flow_from_client_config
1270
    ):
1271
        # Mock the Flow object
1272
        mock_flow = MagicMock()
1✔
1273
        mock_flow.fetch_token.return_value = None
1✔
1274
        mock_flow.credentials = MagicMock(
1✔
1275
            token="test_token",
1276
            refresh_token="test_refresh_token",
1277
            token_uri="test_token_uri",
1278
            client_id="test_client_id",
1279
            client_secret="test_client_secret",
1280
            scopes=["scope1", "scope2"],
1281
        )
1282
        mock_flow_from_client_config.return_value = mock_flow
1✔
1283

1284
        # Mock user data
1285
        mock_user = {
1✔
1286
            "name": "Test User",
1287
            "date_of_birth": "1990-01-01",
1288
            "email": "test@example.com",
1289
            "gender": "male",
1290
            "phone_number": "1234567890",
1291
            "address": "123 Test Street",
1292
            "bio": "Test bio",
1293
            "country_code": "US",
1294
        }
1295
        mock_get_user.return_value = mock_user
1✔
1296

1297
        # Create a request with state in session
1298
        request = self.factory.get(reverse("callback_google_fit"))
1✔
1299
        add_middleware(request)
1✔
1300
        request.session["google_fit_state"] = "state123"
1✔
1301
        request.session["user_id"] = "user123"
1✔
1302

1303
        # Call the view
1304
        callback_google_fit(request)
1✔
1305

1306
        # # Assertions
1307
        # self.assertIn("credentials", request.session)
1308
        # messages = list(get_messages(request))
1309
        # self.assertTrue(any("Signed in Successfully" in str(m) for m in messages))
1310
        # mock_render.assert_called_once_with(
1311
        #     request,
1312
        #     "profile.html",
1313
        #     {"login_success": True, "form": mock.ANY, "user": mock_user},
1314
        # )
1315

1316
    @patch("google_auth_oauthlib.flow.Flow.from_client_config")
1✔
1317
    @patch("django.shortcuts.redirect")
1✔
1318
    def test_callback_google_fit_invalid_state(
1✔
1319
        self, mock_redirect, mock_flow_from_client_config
1320
    ):
1321
        # Create a request with no state in session
1322
        request = self.factory.get(reverse("callback_google_fit"))
1✔
1323
        add_middleware(request)
1✔
1324
        request.session["user_id"] = "user123"
1✔
1325

1326
        # Call the view
1327
        callback_google_fit(request)
1✔
1328

1329
        # # Assertions
1330
        # messages = list(get_messages(request))
1331
        # self.assertTrue(any("Sign-in failed. Please try again." in str(m) for m in messages))
1332
        # mock_redirect.assert_called_once_with(reverse("homepage"))
1333

1334
    @patch("requests.post")
1✔
1335
    def test_delink_google_fit_success(self, mock_post):
1✔
1336
        # Mock successful revoke response
1337
        mock_post.return_value.status_code = 200
1✔
1338

1339
        # Create a request with credentials in the session
1340
        request = self.factory.get(reverse("delink_google_fit"))
1✔
1341
        add_middleware(request)
1✔
1342
        request.session["credentials"] = {
1✔
1343
            "token": "test_token",
1344
            "refresh_token": "test_refresh_token",
1345
            "token_uri": "test_token_uri",
1346
            "client_id": "test_client_id",
1347
            "client_secret": "test_client_secret",
1348
            "scopes": ["scope1", "scope2"],
1349
        }
1350

1351
        # Call the view
1352
        delink_google_fit(request)
1✔
1353

1354

1355
###########################################################
1356
#       TEST CASEs FOR PASSWORD RESET              #
1357
###########################################################
1358

1359

1360
class PasswordResetTests(TestCase):
1✔
1361
    @classmethod
1✔
1362
    @override_settings(EMAIL_BACKEND="django.core.mail.backends.locmem.EmailBackend")
1✔
1363
    def setUpClass(cls):
1✔
1364
        super().setUpClass()
1✔
1365
        cls.client = Client()
1✔
1366

1367
        # Set up connection to actual DynamoDB tables
1368
        cls.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
1369
        cls.users_table = cls.dynamodb.Table("Users")
1✔
1370
        cls.password_reset_table = cls.dynamodb.Table("PasswordResetRequests")
1✔
1371

1372
    def setUp(self):
1✔
1373
        # Clear outbox for each test
1374
        mail.outbox = []
1✔
1375

1376
        # Create a mock user for testing in the actual Users table
1377
        self.mock_user = MockUser(
1✔
1378
            {
1379
                "user_id": "mock_user_id",
1380
                "username": "mockuser",
1381
                "email": "mockuser@example.com",
1382
                "password": make_password("mockpassword"),
1383
                "is_active": True,
1384
            }
1385
        )
1386

1387
        # Insert the mock user into the Users table
1388
        self.__class__.users_table.put_item(Item=self.mock_user.__dict__)
1✔
1389
        # print("Mock user inserted into DynamoDB for testing.")
1390

1391
    def tearDown(self):
1✔
1392
        # Delete the mock user from the Users and PasswordResetRequests tables
1393
        self.__class__.users_table.delete_item(Key={"user_id": self.mock_user.user_id})
1✔
1394
        self.__class__.password_reset_table.delete_item(
1✔
1395
            Key={"user_id": self.mock_user.user_id}
1396
        )
1397

1398
        # Clear the email outbox after each test
1399
        mail.outbox = []
1✔
1400
        super().tearDown()
1✔
1401

1402
    @classmethod
1✔
1403
    def tearDownClass(cls):
1✔
1404
        # Any additional cleanup at the class level can go here
1405
        super().tearDownClass()
1✔
1406

1407
    def test_password_reset_request_invalid_email(self):
1✔
1408
        # Test with an email that does not exist in the database
1409
        self.client.post(
1✔
1410
            reverse("password_reset_request"), {"email": "nonexistent@example.com"}
1411
        )
1412
        print("Testing password reset with a nonexistent email.")
1✔
1413

1414
        # Ensure no email was sent
1415
        self.assertEqual(
1✔
1416
            len(mail.outbox), 0, "Expected no email to be sent for non-existent email."
1417
        )
1418

1419
    def test_password_reset_request_valid_email(self):
1✔
1420
        # Test with the mock user's email
1421
        self.client.post(
1✔
1422
            reverse("password_reset_request"), {"email": self.mock_user.email}
1423
        )
1424

1425
        # Ensure an email was sent
1426
        self.assertEqual(len(mail.outbox), 1, "Expected exactly one email to be sent.")
1✔
1427
        email = mail.outbox[0]
1✔
1428
        self.assertEqual(email.to, [self.mock_user.email])
1✔
1429

1430
    def test_password_reset_link_in_email(self):
1✔
1431
        # Test if the password reset link is in the email
1432
        self.client.post(
1✔
1433
            reverse("password_reset_request"), {"email": self.mock_user.email}
1434
        )
1435
        self.assertEqual(len(mail.outbox), 1, "Expected one email in the outbox")
1✔
UNCOV
1436
        email = mail.outbox[0]
×
1437

1438
        # Check if the email contains a reset link
UNCOV
1439
        self.assertIn("reset your password", email.body.lower())
×
UNCOV
1440
        print(f"Password reset link sent to: {email.to}")
×
1441

1442
    def test_password_reset_confirm_with_valid_token(self):
1✔
1443
        # Generate a valid token for the mock user
1444
        token = default_token_generator.make_token(self.mock_user)
1✔
1445
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1446

1447
        # Test accessing the password reset confirm page with a valid token
1448
        response = self.client.get(reverse("password_reset_confirm", args=[uid, token]))
1✔
1449
        self.assertEqual(response.status_code, 200)
1✔
1450
        self.assertContains(response, "Set New Password")
1✔
1451

1452
    def test_password_reset_confirm_with_invalid_token(self):
1✔
1453
        # Generate an invalid token and test the reset confirm page
1454
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1455
        invalid_token = "invalid-token"
1✔
1456

1457
        response = self.client.get(
1✔
1458
            reverse("password_reset_confirm", args=[uid, invalid_token])
1459
        )
1460
        self.assertEqual(response.status_code, 200)
1✔
1461
        self.assertContains(
1✔
1462
            response, "The password reset link is invalid or has expired."
1463
        )
1464

1465
    def test_password_reset_mismatched_passwords(self):
1✔
1466
        # Generate a valid token
1467
        token = default_token_generator.make_token(self.mock_user)
1✔
1468
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1469

1470
        # Post mismatched passwords
1471
        response = self.client.post(
1✔
1472
            reverse("password_reset_confirm", args=[uid, token]),
1473
            {"new_password": "newpassword123", "confirm_password": "wrongpassword"},
1474
        )
1475
        self.assertContains(response, "Passwords do not match.", status_code=200)
1✔
1476

1477
    def test_successful_password_reset(self):
1✔
1478
        # Generate a valid token and UID
1479
        token = default_token_generator.make_token(self.mock_user)
1✔
1480
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1481

1482
        # Successfully reset the password
1483
        response = self.client.post(
1✔
1484
            reverse("password_reset_confirm", args=[uid, token]),
1485
            {"new_password": "newpassword123", "confirm_password": "newpassword123"},
1486
            follow=True,
1487
        )
1488
        self.assertRedirects(response, reverse("password_reset_complete"))
1✔
1489

1490
        # Verify new password by attempting to log in
1491
        updated_user = get_user_by_email(self.mock_user.email)
1✔
1492
        self.assertTrue(
1✔
1493
            updated_user and updated_user.password, "Password reset was not successful."
1494
        )
1495

1496
    def test_password_reset_complete_view(self):
1✔
1497
        # Test if the password reset complete page renders correctly
1498
        response = self.client.get(reverse("password_reset_complete"))
1✔
1499
        self.assertEqual(response.status_code, 200)
1✔
1500
        self.assertContains(response, "Your password has been successfully reset.")
1✔
1501

1502
    def test_password_reset_throttling(self):
1✔
1503
        # First password reset request
1504
        response = self.client.post(
1✔
1505
            reverse("password_reset_request"), {"email": self.mock_user.email}
1506
        )
1507
        self.assertEqual(len(mail.outbox), 1, "Expected exactly one email to be sent.")
1✔
1508

1509
        # Attempt a second request immediately, which should be throttled
1510
        response = self.client.post(
1✔
1511
            reverse("password_reset_request"), {"email": self.mock_user.email}
1512
        )
1513
        self.assertContains(response, "Please wait", status_code=200)
1✔
1514
        self.assertEqual(
1✔
1515
            len(mail.outbox), 1, "No additional email should be sent due to throttling."
1516
        )
1517

1518
    def test_password_reset_request_case_sensitive_email(self):
1✔
1519
        # Enter a valid email with incorrect casing
1520
        self.client.post(
1✔
1521
            reverse("password_reset_request"), {"email": "MockUser@example.com"}
1522
        )
1523
        # No email should be sent due to case sensitivity
1524
        self.assertEqual(
1✔
1525
            len(mail.outbox),
1526
            0,
1527
            "Expected no email to be sent due to case-sensitive mismatch.",
1528
        )
1529
        print(
1✔
1530
            "Tested case-sensitive email matching: no email sent for mismatched case."
1531
        )
1532

1533
    def test_password_reset_request_inactive_user(self):
1✔
1534
        self.mock_user.is_active = False
1✔
1535
        self.__class__.users_table.put_item(Item=self.mock_user.__dict__)
1✔
1536

1537
        response = self.client.post(
1✔
1538
            reverse("password_reset_request"), {"email": self.mock_user.email}
1539
        )
1540
        self.assertContains(
1✔
1541
            response, "The email you entered is not registered", status_code=200
1542
        )
1543
        self.assertEqual(
1✔
1544
            len(mail.outbox), 0, "No email should be sent for an inactive user."
1545
        )
1546

1547
    @patch(
1✔
1548
        "django.contrib.auth.tokens.default_token_generator.check_token",
1549
        return_value=False,
1550
    )
1551
    def test_expired_token_password_reset_confirm(self, mock_check_token):
1✔
1552
        # Generate a valid token with the current time
1553
        token = default_token_generator.make_token(self.mock_user)
1✔
1554
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1555

1556
        # Mock the token check to simulate an expired token
1557
        print("Simulating expired token by forcing check_token to return False.")
1✔
1558

1559
        # Attempt to reset password with the "expired" token
1560
        response = self.client.get(reverse("password_reset_confirm", args=[uid, token]))
1✔
1561
        self.assertContains(
1✔
1562
            response,
1563
            "The password reset link is invalid or has expired.",
1564
            status_code=200,
1565
        )
1566

1567
    def test_password_reset_email_content(self):
1✔
1568
        self.client.post(
1✔
1569
            reverse("password_reset_request"), {"email": self.mock_user.email}
1570
        )
1571
        self.assertEqual(len(mail.outbox), 1, "Expected one email to be sent.")
1✔
1572
        email = mail.outbox[0]
1✔
1573

1574
        # Check if email contains specific expected content
1575
        self.assertIn("reset your password", email.body.lower())
1✔
1576
        self.assertIn(
1✔
1577
            self.mock_user.username, email.body
1578
        )  # Username should be included in the email
1579
        reset_url_fragment = reverse(
1✔
1580
            "password_reset_confirm",
1581
            args=[
1582
                urlsafe_base64_encode(force_bytes(self.mock_user.user_id)),
1583
                default_token_generator.make_token(self.mock_user),
1584
            ],
1585
        )
1586
        self.assertIn(reset_url_fragment, email.body)
1✔
1587

1588
    def test_login_with_new_password_after_reset(self):
1✔
1589
        # Generate a valid token and reset the password
1590
        token = default_token_generator.make_token(self.mock_user)
1✔
1591
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1592
        new_password = "newpassword123"
1✔
1593

1594
        response = self.client.post(
1✔
1595
            reverse("password_reset_confirm", args=[uid, token]),
1596
            {"new_password": new_password, "confirm_password": new_password},
1597
            follow=True,
1598
        )
1599
        self.assertRedirects(response, reverse("password_reset_complete"))
1✔
1600

1601
        # Now attempt to log in with the new password
1602
        response = self.client.post(
1✔
1603
            reverse("login"),
1604
            {"username": self.mock_user.username, "password": new_password},
1605
        )
1606
        self.assertRedirects(response, reverse("homepage"))
1✔
1607

1608
    def test_password_reset_confirm_invalid_uid(self):
1✔
1609
        # Generate a valid token but use an invalid UID
1610
        invalid_uid = "invalid-uid"
1✔
1611
        token = default_token_generator.make_token(self.mock_user)
1✔
1612

1613
        response = self.client.get(
1✔
1614
            reverse("password_reset_confirm", args=[invalid_uid, token])
1615
        )
1616
        self.assertContains(
1✔
1617
            response,
1618
            "The password reset link is invalid or has expired.",
1619
            status_code=200,
1620
        )
1621

1622
    def test_single_use_token(self):
1✔
1623
        # Generate a valid token and UID
1624
        token = default_token_generator.make_token(self.mock_user)
1✔
1625
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1626

1627
        # First reset attempt with valid token
1628
        response = self.client.post(
1✔
1629
            reverse("password_reset_confirm", args=[uid, token]),
1630
            {"new_password": "newpassword123", "confirm_password": "newpassword123"},
1631
        )
1632
        self.assertRedirects(response, reverse("password_reset_complete"))
1✔
1633

1634
        # Second reset attempt with the same token should fail
1635
        response = self.client.get(reverse("password_reset_confirm", args=[uid, token]))
1✔
1636
        self.assertContains(
1✔
1637
            response,
1638
            "The password reset link is invalid or has expired.",
1639
            status_code=200,
1640
        )
1641

1642
    def test_password_reset_request_with_html_injection(self):
1✔
1643
        response = self.client.post(
1✔
1644
            reverse("password_reset_request"),
1645
            {"email": "<script>alert('xss')</script>@example.com"},
1646
        )
1647
        self.assertContains(response, "Enter a valid email address.", status_code=200)
1✔
1648
        self.assertEqual(
1✔
1649
            len(mail.outbox),
1650
            0,
1651
            "No email should be sent for an invalid email with HTML.",
1652
        )
1653

1654
    def test_password_reset_confirm_access_without_token(self):
1✔
1655
        response = self.client.get(
1✔
1656
            reverse("password_reset_confirm", args=["invalid-uid", "invalid-token"])
1657
        )
1658
        self.assertContains(
1✔
1659
            response,
1660
            "The password reset link is invalid or has expired.",
1661
            status_code=200,
1662
        )
1663

1664
    def test_password_reset_request_template(self):
1✔
1665
        response = self.client.get(reverse("password_reset_request"))
1✔
1666
        self.assertTemplateUsed(response, "password_reset_request.html")
1✔
1667
        self.assertContains(response, "Reset Password")
1✔
1668

1669
    def test_password_reset_confirm_template(self):
1✔
1670
        token = default_token_generator.make_token(self.mock_user)
1✔
1671
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1672
        response = self.client.get(reverse("password_reset_confirm", args=[uid, token]))
1✔
1673
        self.assertTemplateUsed(response, "password_reset_confirm.html")
1✔
1674
        self.assertContains(response, "Set New Password")
1✔
1675

1676
    def test_password_reset_request_with_malformed_email(self):
1✔
1677
        # Test with a malformed email address
1678
        response = self.client.post(
1✔
1679
            reverse("password_reset_request"),
1680
            {"email": "user@.com"},
1681
        )
1682
        self.assertContains(response, "Enter a valid email address.", status_code=200)
1✔
1683
        self.assertEqual(
1✔
1684
            len(mail.outbox),
1685
            0,
1686
            "No email should be sent for a malformed email address.",
1687
        )
1688

1689
    def test_password_reset_accessibility_without_login(self):
1✔
1690
        # Access the password reset request page
1691
        response = self.client.get(reverse("password_reset_request"))
1✔
1692
        self.assertEqual(response.status_code, 200)
1✔
1693
        self.assertContains(response, "Reset Password")
1✔
1694

1695
        # Generate valid token and UID
1696
        token = default_token_generator.make_token(self.mock_user)
1✔
1697
        uid = urlsafe_base64_encode(force_bytes(self.mock_user.user_id))
1✔
1698

1699
        # Access the password reset confirmation page with a valid token
1700
        response = self.client.get(reverse("password_reset_confirm", args=[uid, token]))
1✔
1701
        self.assertEqual(response.status_code, 200)
1✔
1702
        self.assertContains(response, "Set New Password")
1✔
1703

1704
    def test_password_reset_request_with_injection_attempt(self):
1✔
1705
        injection_email = "user@example.com'; DROP TABLE Users; --"
1✔
1706
        response = self.client.post(
1✔
1707
            reverse("password_reset_request"), {"email": injection_email}
1708
        )
1709
        self.assertEqual(
1✔
1710
            len(mail.outbox), 0, "No email should be sent for an injection attempt."
1711
        )
1712
        self.assertContains(response, "Enter a valid email address.", status_code=200)
1✔
1713

1714
    def test_get_last_reset_request_time_existing_item(self):
1✔
1715
        # Insert a mock item into the password reset table
1716
        self.__class__.password_reset_table.put_item(
1✔
1717
            Item={
1718
                "user_id": self.mock_user.user_id,
1719
                "last_request_time": "2024-12-03T00:00:00Z",
1720
            }
1721
        )
1722

1723
        # Call the function and verify it retrieves the correct time
1724
        last_request_time = get_last_reset_request_time(self.mock_user.user_id)
1✔
1725
        self.assertEqual(
1✔
1726
            last_request_time,
1727
            "2024-12-03T00:00:00Z",
1728
            "The function did not return the expected last reset request time.",
1729
        )
1730

1731
        # Clean up after the test
1732
        self.__class__.password_reset_table.delete_item(
1✔
1733
            Key={"user_id": self.mock_user.user_id}
1734
        )
1735

1736
    def test_get_last_reset_request_time_missing_item(self):
1✔
1737
        # Ensure the user_id does not exist in the password reset table
1738
        self.__class__.password_reset_table.delete_item(
1✔
1739
            Key={"user_id": self.mock_user.user_id}
1740
        )
1741

1742
        # Call the function and verify it returns None
1743
        last_request_time = get_last_reset_request_time(self.mock_user.user_id)
1✔
1744
        self.assertIsNone(
1✔
1745
            last_request_time,
1746
            "The function should return None when the item does not exist.",
1747
        )
1748

1749
    @patch("FitOn.dynamodb.password_reset_table.get_item")
1✔
1750
    def test_get_last_reset_request_time_handles_exception(self, mock_get_item):
1✔
1751
        # Simulate an exception when `get_item` is called
1752
        mock_get_item.side_effect = Exception("Simulated DynamoDB error")
1✔
1753

1754
        # Call the function and verify it handles the exception gracefully
1755
        with self.assertRaises(Exception):
1✔
1756
            get_last_reset_request_time(self.mock_user.user_id)
1✔
1757

1758
    def test_update_reset_request_time_new_entry(self):
1✔
1759
        # Call the function to insert a new reset request time
1760
        update_reset_request_time(self.mock_user.user_id)
1✔
1761

1762
        # Verify the item was created in the table
1763
        response = self.__class__.password_reset_table.get_item(
1✔
1764
            Key={"user_id": self.mock_user.user_id}
1765
        )
1766
        self.assertIn("Item", response, "Expected the item to be created in the table.")
1✔
1767
        self.assertIn(
1✔
1768
            "last_request_time",
1769
            response["Item"],
1770
            "Expected the 'last_request_time' field to exist in the item.",
1771
        )
1772

1773
    def test_update_reset_request_time_update_existing_entry(self):
1✔
1774
        # Insert an existing entry
1775
        self.__class__.password_reset_table.put_item(
1✔
1776
            Item={
1777
                "user_id": self.mock_user.user_id,
1778
                "last_request_time": "2024-12-01T10:00:00Z",
1779
            }
1780
        )
1781

1782
        # Call the function to update the reset request time
1783
        update_reset_request_time(self.mock_user.user_id)
1✔
1784

1785
        # Verify the item was updated in the table
1786
        response = self.__class__.password_reset_table.get_item(
1✔
1787
            Key={"user_id": self.mock_user.user_id}
1788
        )
1789
        self.assertIn("Item", response, "Expected the item to exist in the table.")
1✔
1790
        updated_time = response["Item"].get("last_request_time")
1✔
1791
        self.assertIsNotNone(
1✔
1792
            updated_time, "Expected 'last_request_time' to be updated."
1793
        )
1794
        self.assertNotEqual(
1✔
1795
            updated_time,
1796
            "2024-12-01T10:00:00Z",
1797
            "Expected 'last_request_time' to be updated to a new value.",
1798
        )
1799

1800
    def test_update_reset_request_time_invalid_user_id(self):
1✔
1801
        # Call the function with an invalid user ID
1802
        with self.assertRaises(Exception):
1✔
1803
            update_reset_request_time(None)
1✔
1804

1805
    @patch("FitOn.dynamodb.password_reset_table.put_item")
1✔
1806
    def test_update_reset_request_time_handles_exception(self, mock_put_item):
1✔
1807
        # Simulate an exception when `put_item` is called
1808
        mock_put_item.side_effect = Exception("Simulated DynamoDB error")
1✔
1809

1810
        # Call the function and ensure it raises an exception
1811
        with self.assertRaises(Exception):
1✔
1812
            update_reset_request_time(self.mock_user.user_id)
1✔
1813

1814
    def test_update_reset_request_time_datetime_format(self):
1✔
1815
        # Call the function to insert a new reset request time
1816
        update_reset_request_time(self.mock_user.user_id)
1✔
1817

1818
        # Verify the datetime format in the table
1819
        response = self.__class__.password_reset_table.get_item(
1✔
1820
            Key={"user_id": self.mock_user.user_id}
1821
        )
1822
        self.assertIn("Item", response, "Expected the item to be created in the table.")
1✔
1823
        last_request_time = response["Item"].get("last_request_time")
1✔
1824
        self.assertIsNotNone(
1✔
1825
            last_request_time, "Expected 'last_request_time' to be present."
1826
        )
1827

1828
        # Verify the format of the datetime string
1829
        datetime.fromisoformat(last_request_time)
1✔
1830

1831
    def test_password_reset_request_missing_email(self):
1✔
1832
        # Make a POST request without providing an email
1833
        response = self.client.post(reverse("password_reset_request"), {"email": ""})
1✔
1834

1835
        # Verify the form-specific error message for empty email
1836
        self.assertContains(
1✔
1837
            response,
1838
            "This field is required.",
1839
            status_code=200,
1840
            msg_prefix="Expected a form error when email is not provided.",
1841
        )
1842

1843
        # Verify that the rendered template is correct
1844
        self.assertTemplateUsed(response, "password_reset_request.html")
1✔
1845

1846
        # Verify that no email is sent
1847
        self.assertEqual(
1✔
1848
            len(mail.outbox), 0, "No email should be sent when no email is provided."
1849
        )
1850

1851
    def test_password_reset_done_view(self):
1✔
1852
        # Make a GET request to the password_reset_done URL
1853
        response = self.client.get(reverse("password_reset_done"))
1✔
1854

1855
        # Verify the response status code
1856
        self.assertEqual(
1✔
1857
            response.status_code,
1858
            200,
1859
            "Expected status code 200 when accessing the password reset done page.",
1860
        )
1861

1862
        # Verify that the correct template is used
1863
        self.assertTemplateUsed(
1✔
1864
            response,
1865
            "password_reset_done.html",
1866
            "Expected the password_reset_done.html template to be rendered.",
1867
        )
1868

1869
        # Verify that the response contains the correct heading
1870
        self.assertContains(
1✔
1871
            response,
1872
            "Password Reset Email Sent",
1873
            msg_prefix="Expected the heading 'Password Reset Email Sent' on the password reset done page.",
1874
        )
1875

1876
        # Verify that the response contains the correct paragraph
1877
        self.assertContains(
1✔
1878
            response,
1879
            "Please check your email for a link to reset your password.",
1880
            msg_prefix="Expected the message 'Please check your email for a link to reset your password.' on the password reset done page.",
1881
        )
1882

1883
    def test_password_reset_confirm_mismatched_passwords(self):
1✔
1884
        # Generate a valid token and UID
1885
        user_id = self.mock_user.user_id
1✔
1886
        token = default_token_generator.make_token(self.mock_user)
1✔
1887
        uidb64 = urlsafe_base64_encode(force_bytes(user_id))
1✔
1888

1889
        # Post mismatched passwords to the password reset confirm view
1890
        response = self.client.post(
1✔
1891
            reverse(
1892
                "password_reset_confirm", kwargs={"uidb64": uidb64, "token": token}
1893
            ),
1894
            {
1895
                "new_password": "newpassword123",
1896
                "confirm_password": "differentpassword456",
1897
            },
1898
        )
1899

1900
        # Verify that the response contains the form error
1901
        self.assertContains(
1✔
1902
            response,
1903
            "Passwords do not match.",
1904
            status_code=200,
1905
            msg_prefix="Expected an error message when passwords do not match.",
1906
        )
1907

1908
        # Verify that the correct template is used
1909
        self.assertTemplateUsed(response, "password_reset_confirm.html")
1✔
1910

1911
        # Verify that the password is not updated (since it's a mock object, check manually)
1912
        mock_user_data = self.__class__.users_table.get_item(Key={"user_id": user_id})
1✔
1913
        self.assertIn(
1✔
1914
            "Item",
1915
            mock_user_data,
1916
            "Expected the mock user to still exist in the database.",
1917
        )
1918
        stored_password = mock_user_data["Item"].get("password")
1✔
1919
        self.assertNotEqual(
1✔
1920
            stored_password,
1921
            "newpassword123",
1922
            "Password should not be updated when passwords do not match.",
1923
        )
1924

1925
    def test_password_reset_confirm_invalid_uid_or_token(self):
1✔
1926
        # Simulate an invalid uidb64 and token
1927
        invalid_uidb64 = "invalid-uid"
1✔
1928
        invalid_token = "invalid-token"
1✔
1929

1930
        # Make a GET request to the password_reset_confirm view with invalid data
1931
        response = self.client.get(
1✔
1932
            reverse(
1933
                "password_reset_confirm",
1934
                kwargs={"uidb64": invalid_uidb64, "token": invalid_token},
1935
            )
1936
        )
1937

1938
        # Verify the status code
1939
        self.assertEqual(
1✔
1940
            response.status_code,
1941
            200,
1942
            "Expected status code 200 when accessing with an invalid UID or token.",
1943
        )
1944

1945
        # Verify the correct template is used
1946
        self.assertTemplateUsed(response, "password_reset_invalid.html")
1✔
1947

1948
        # Verify that the error message is displayed in the response
1949
        self.assertContains(
1✔
1950
            response,
1951
            "The password reset link is invalid or has expired.",
1952
            msg_prefix="Expected an error message when UID or token is invalid.",
1953
        )
1954

1955
        # Verify that the HTML structure matches the invalid page
1956
        self.assertContains(
1✔
1957
            response,
1958
            "<h2>Password Reset Error</h2>",
1959
            msg_prefix="Expected the heading 'Password Reset Error' on the invalid password reset page.",
1960
        )
1961

1962

1963
class EmailBackendTests(TestCase):
1✔
1964
    def test_testing_flag_true_uses_locmem_backend(self):
1✔
1965
        # Mock sys.argv to simulate a testing environment
1966
        with patch("sys.argv", new=["manage.py", "test"]):
1✔
1967
            settings = import_module("FitOn.settings")
1✔
1968
            reload(settings)  # Reload the module to apply changes
1✔
1969

1970
            # Check that the testing email backend is applied
1971
            self.assertEqual(
1✔
1972
                settings.EMAIL_BACKEND,
1973
                "django.core.mail.backends.locmem.EmailBackend",
1974
                "Expected locmem email backend in testing mode.",
1975
            )
1976

1977
    def test_testing_flag_false_uses_smtp_backend(self):
1✔
1978
        # Mock sys.argv to simulate a production-like environment
1979
        with patch("sys.argv", new=["manage.py", "runserver"]):
1✔
1980
            settings = import_module("FitOn.settings")
1✔
1981
            reload(settings)  # Reload the module to apply changes
1✔
1982

1983
            # Check that the production SMTP email backend is applied
1984
            self.assertEqual(
1✔
1985
                settings.EMAIL_BACKEND,
1986
                "django.core.mail.backends.smtp.EmailBackend",
1987
                "Expected SMTP email backend in production mode.",
1988
            )
1989
            self.assertEqual(
1✔
1990
                settings.EMAIL_HOST, "smtp.gmail.com", "EMAIL_HOST is incorrect."
1991
            )
1992
            self.assertEqual(settings.EMAIL_PORT, 587, "EMAIL_PORT is incorrect.")
1✔
1993
            self.assertTrue(settings.EMAIL_USE_TLS, "EMAIL_USE_TLS should be True.")
1✔
1994
            self.assertEqual(
1✔
1995
                settings.EMAIL_HOST_USER,
1996
                "fiton.notifications@gmail.com",
1997
                "EMAIL_HOST_USER is incorrect.",
1998
            )
1999
            self.assertEqual(
1✔
2000
                settings.EMAIL_HOST_PASSWORD,
2001
                "usfb imrp rhyq npif",
2002
                "EMAIL_HOST_PASSWORD is incorrect.",
2003
            )
2004

2005

2006
###########################################################
2007
#       TEST CASEs FOR VARIOUS FORMS                      #
2008
###########################################################
2009

2010

2011
class SignUpFormTest(TestCase):
1✔
2012
    def test_passwords_match(self):
1✔
2013
        form_data = {
1✔
2014
            "username": "testuser",
2015
            "email": "testuser@example.com",
2016
            "name": "Test User",
2017
            "date_of_birth": "2000-01-01",
2018
            "gender": "M",
2019
            "height": "183",
2020
            "weight": "80",
2021
            "password": "strongpassword123",
2022
            "confirm_password": "strongpassword123",
2023
        }
2024
        form = SignUpForm(data=form_data)
1✔
2025
        self.assertTrue(form.is_valid())
1✔
2026

2027
    def test_passwords_do_not_match(self):
1✔
2028
        form_data = {
1✔
2029
            "username": "testuser",
2030
            "email": "testuser@example.com",
2031
            "name": "Test User",
2032
            "date_of_birth": "2000-01-01",
2033
            "gender": "M",
2034
            "height": "183",
2035
            "weight": "80",
2036
            "password": "strongpassword123",
2037
            "confirm_password": "differentpassword",
2038
        }
2039
        form = SignUpForm(data=form_data)
1✔
2040
        self.assertFalse(form.is_valid())
1✔
2041
        self.assertIn("Passwords do not match.", form.errors["__all__"])
1✔
2042

2043

2044
class SetNewPasswordFormTest(TestCase):
1✔
2045
    def test_passwords_match(self):
1✔
2046
        form_data = {
1✔
2047
            "new_password": "newstrongpassword123",
2048
            "confirm_password": "newstrongpassword123",
2049
        }
2050
        form = SetNewPasswordForm(data=form_data)
1✔
2051
        self.assertTrue(form.is_valid())
1✔
2052

2053
    def test_passwords_do_not_match(self):
1✔
2054
        form_data = {
1✔
2055
            "new_password": "newstrongpassword123",
2056
            "confirm_password": "differentpassword",
2057
        }
2058
        form = SetNewPasswordForm(data=form_data)
1✔
2059
        self.assertFalse(form.is_valid())
1✔
2060
        self.assertIn("Passwords do not match.", form.errors["__all__"])
1✔
2061

2062

2063
class ProfileFormTest(TestCase):
1✔
2064
    def test_valid_form_with_country_code_and_phone(self):
1✔
2065
        form_data = {
1✔
2066
            "name": "John Doe",
2067
            "date_of_birth": "1990-01-01",
2068
            "email": "johndoe@example.com",
2069
            "gender": "M",  # Use a valid value from GENDER_OPTIONS
2070
            "country_code": "+1",  # Replace with a valid choice from COUNTRY_CODES
2071
            "phone_number": "1234567890",
2072
        }
2073
        form = ProfileForm(data=form_data)
1✔
2074
        self.assertTrue(form.is_valid())
1✔
2075

2076
    def test_valid_form_without_phone_and_country_code(self):
1✔
2077
        form_data = {
1✔
2078
            "name": "John Doe",
2079
            "date_of_birth": "1990-01-01",
2080
            "email": "johndoe@example.com",
2081
            "gender": "M",  # Use a valid value from GENDER_OPTIONS
2082
        }
2083
        form = ProfileForm(data=form_data)
1✔
2084
        self.assertTrue(form.is_valid())
1✔
2085

2086
    def test_invalid_phone_number_non_digits(self):
1✔
2087
        form_data = {
1✔
2088
            "name": "John Doe",
2089
            "date_of_birth": "1990-01-01",
2090
            "email": "johndoe@example.com",
2091
            "gender": "M",  # Use a valid value from GENDER_OPTIONS
2092
            "country_code": "+1",  # Replace with a valid choice from COUNTRY_CODES
2093
            "phone_number": "abcd1234",
2094
        }
2095
        form = ProfileForm(data=form_data)
1✔
2096
        self.assertFalse(form.is_valid())
1✔
2097
        self.assertIn(
1✔
2098
            "Phone number should contain only digits.", form.errors["phone_number"]
2099
        )
2100

2101
    def test_country_code_without_phone_number(self):
1✔
2102
        form_data = {
1✔
2103
            "name": "John Doe",
2104
            "date_of_birth": "1990-01-01",
2105
            "email": "johndoe@example.com",
2106
            "gender": "M",  # Use a valid value from GENDER_OPTIONS
2107
            "country_code": "+1",  # Replace with a valid choice from COUNTRY_CODES
2108
        }
2109
        form = ProfileForm(data=form_data)
1✔
2110
        self.assertFalse(form.is_valid())
1✔
2111
        self.assertIn(
1✔
2112
            "Both country code and phone number must be provided together",
2113
            form.errors["phone_number"],
2114
        )
2115
        self.assertIn(
1✔
2116
            "Both country code and phone number must be provided together",
2117
            form.errors["country_code"],
2118
        )
2119

2120
    def test_phone_number_without_country_code(self):
1✔
2121
        form_data = {
1✔
2122
            "name": "John Doe",
2123
            "date_of_birth": "1990-01-01",
2124
            "email": "johndoe@example.com",
2125
            "gender": "M",  # Use a valid value from GENDER_OPTIONS
2126
            "phone_number": "1234567890",
2127
        }
2128
        form = ProfileForm(data=form_data)
1✔
2129
        self.assertFalse(form.is_valid())
1✔
2130
        self.assertIn(
1✔
2131
            "Both country code and phone number must be provided together",
2132
            form.errors["phone_number"],
2133
        )
2134
        self.assertIn(
1✔
2135
            "Both country code and phone number must be provided together",
2136
            form.errors["country_code"],
2137
        )
2138

2139

2140
class ValidateFileExtensionTest(TestCase):
1✔
2141
    def test_valid_pdf_file(self):
1✔
2142
        valid_file = SimpleUploadedFile("document.pdf", b"file_content")
1✔
2143
        try:
1✔
2144
            validate_file_extension(valid_file)
1✔
2145
        except ValidationError:
×
2146
            self.fail("validate_file_extension raised ValidationError unexpectedly!")
×
2147

2148
    # def test_invalid_file_extension(self):
2149
    #     invalid_file = SimpleUploadedFile("document.txt", b"file_content", content_type="text/plain")
2150
    #     # Attempt to call the validation function
2151
    #     try:
2152
    #         validate_file_extension(invalid_file)
2153
    #     except ValidationError as e:
2154
    #         # Check if the exception contains the expected message
2155
    #         self.assertListEqual(e.messages, ["Only PDF files are allowed."])
2156
    #         return  # Test passed
2157
    #     self.fail("validate_file_extension did not raise ValidationError")
2158

2159

2160
############################
2161
# Tests for views #
2162
############################
2163

2164

2165
class HomepageViewTest(TestCase):
1✔
2166
    def setUp(self):
1✔
2167
        self.factory = RequestFactory()
1✔
2168

2169
    def test_homepage_with_username(self):
1✔
2170
        request = self.factory.get("/")
1✔
2171
        request.session = {"username": "sg8002"}  # Directly set the session
1✔
2172

2173
        homepage(request)
1✔
2174
        # self.assertEqual(response.status_code, 200)
2175
        # self.assertContains(
2176
        #     response, "sg8002"
2177
        # )  # Check that "JohnDoe" is in the response content
2178

2179
    def test_homepage_without_username(self):
1✔
2180
        request = self.factory.get("/")
1✔
2181
        request.session = {}  # No username in the session
1✔
2182

2183
        homepage(request)
1✔
2184
        # self.assertEqual(response.status_code, 200)
2185
        # self.assertContains(
2186
        # response, "Guest"
2187
        # )  # Check that "Guest" is in the response content
2188

2189

2190
class AddMessageTest(TestCase):
1✔
2191
    def setUp(self):
1✔
2192
        self.factory = RequestFactory()
1✔
2193

2194
    def _attach_middlewares(self, request):
1✔
2195
        """Helper method to attach SessionMiddleware and MessageMiddleware to the request."""
2196
        session_middleware = SessionMiddleware(lambda req: None)
1✔
2197
        session_middleware.process_request(request)
1✔
2198
        request.session.save()  # Save the session to initialize it
1✔
2199

2200
        message_middleware = MessageMiddleware(lambda req: None)
1✔
2201
        message_middleware.process_request(request)
1✔
2202

2203
    def test_add_message(self):
1✔
2204
        # Create a mock request
2205
        request = self.factory.get("/")
1✔
2206
        self._attach_middlewares(request)  # Attach both middlewares
1✔
2207

2208
        # Call the add_message function
2209
        add_message(request, level=25, message="Test Message")
1✔
2210

2211
        # Retrieve the messages from the request
2212
        messages = list(get_messages(request))
1✔
2213

2214
        # Assertions
2215
        self.assertEqual(len(messages), 0)
1✔
2216

2217

2218
class PerformRedirectTest(TestCase):
1✔
2219
    def setUp(self):
1✔
2220
        self.factory = RequestFactory()
1✔
2221

2222
    async def test_perform_redirect(self):
1✔
2223
        # Call the async perform_redirect function
2224
        response = await perform_redirect(
1✔
2225
            "homepage"
2226
        )  # Use a valid URL name from your project
2227

2228
        # Assertions
2229
        self.assertEqual(
1✔
2230
            response.url, "/home/"
2231
        )  # Replace "/" with the actual URL for "homepage"
2232

2233

2234
class LoginViewTest(TestCase):
1✔
2235
    def setUp(self):
1✔
2236
        self.factory = RequestFactory()
1✔
2237
        # Create a user in DynamoDB for testing
2238
        self.username = "testuser"
1✔
2239
        self.password = "correctpassword"
1✔
2240
        hashed_password = make_password(self.password)
1✔
2241
        create_user(
1✔
2242
            self.username,
2243
            self.username,
2244
            "",
2245
            "Test User",
2246
            "",
2247
            "",
2248
            "",
2249
            "",
2250
            hashed_password,
2251
        )
2252

2253
    def tearDown(self):
1✔
2254
        # Clean up the test user from DynamoDB
2255
        delete_user_by_username(self.username)
1✔
2256

2257
    def test_login_valid_user_and_password(self):
1✔
2258
        # Create a POST request with valid credentials
2259
        request = self.factory.post(
1✔
2260
            "/", {"username": self.username, "password": self.password}
2261
        )
2262
        request.session = {}
1✔
2263

2264
        # Call the login view
2265
        response = login(request)
1✔
2266

2267
        # Check if the response is a redirect to the homepage
2268
        self.assertEqual(response.status_code, 302)
1✔
2269
        self.assertEqual(response["Location"], reverse("homepage"))
1✔
2270
        self.assertIn("username", request.session)
1✔
2271
        self.assertEqual(request.session["username"], self.username)
1✔
2272

2273
    def test_login_user_does_not_exist(self):
1✔
2274
        # Create a POST request with a non-existent username
2275
        request = self.factory.post(
1✔
2276
            "/", {"username": "nonexistentuser", "password": "password"}
2277
        )
2278
        request.session = {}
1✔
2279

2280
        # Call the login view
2281
        response = login(request)
1✔
2282

2283
        # Check for the correct error message in the response
2284
        self.assertEqual(response.status_code, 200)
1✔
2285
        self.assertContains(response, "User does not exist.")
1✔
2286

2287
    def test_login_invalid_password(self):
1✔
2288
        # Create a POST request with an incorrect password
2289
        request = self.factory.post(
1✔
2290
            "/", {"username": self.username, "password": "wrongpassword"}
2291
        )
2292
        request.session = {}
1✔
2293

2294
        # Call the login view
2295
        response = login(request)
1✔
2296

2297
        # Check for the correct error message in the response
2298
        self.assertEqual(response.status_code, 200)
1✔
2299
        self.assertContains(response, "Invalid password. Please try again.")
1✔
2300

2301

2302
class CustomLogoutViewTest(TestCase):
1✔
2303
    def setUp(self):
1✔
2304
        self.factory = RequestFactory()
1✔
2305

2306
    def _attach_session(self, request):
1✔
2307
        """Helper method to attach a session to the request using SessionMiddleware."""
2308
        middleware = SessionMiddleware(
1✔
2309
            lambda req: None
2310
        )  # Pass a dummy get_response function
2311
        middleware.process_request(request)
1✔
2312
        request.session.save()  # Save the session to initialize it
1✔
2313

2314
    def test_custom_logout(self):
1✔
2315
        # Create a mock request and attach a session
2316
        request = self.factory.get("/")
1✔
2317
        self._attach_session(request)
1✔
2318

2319
        # Set some session data
2320
        request.session["username"] = "testuser"
1✔
2321
        request.session["user_id"] = "123"
1✔
2322

2323
        # Call the custom_logout view
2324
        response = custom_logout(request)
1✔
2325

2326
        # Assertions
2327
        self.assertEqual(response.status_code, 302)  # Check for redirect
1✔
2328
        self.assertEqual(response["Location"], reverse("login"))  # Check redirect URL
1✔
2329

2330
        # Check that the session is flushed (no data should remain)
2331
        self.assertNotIn("username", request.session)
1✔
2332
        self.assertNotIn("user_id", request.session)
1✔
2333

2334
        # Check cache control headers
2335
        self.assertEqual(
1✔
2336
            response["Cache-Control"], "no-cache, no-store, must-revalidate"
2337
        )
2338
        self.assertEqual(response["Pragma"], "no-cache")
1✔
2339
        self.assertEqual(response["Expires"], "0")
1✔
2340

2341

2342
class SignUpViewTest(TestCase):
1✔
2343
    def setUp(self):
1✔
2344
        self.factory = RequestFactory()
1✔
2345
        # User data for testing
2346
        self.username = "newuser"
1✔
2347
        self.email = "newuser@example.com"
1✔
2348
        self.name = "New User"
1✔
2349
        self.date_of_birth = "2000-01-01"
1✔
2350
        self.gender = "M"
1✔
2351
        self.height = ("180",)
1✔
2352
        self.weight = "83"
1✔
2353
        self.password = "newpassword"
1✔
2354

2355
    def tearDown(self):
1✔
2356
        # Clean up the test user from DynamoDB
2357
        delete_user_by_username(self.username)
1✔
2358

2359
    def test_signup_valid_data(self):
1✔
2360
        # Create a POST request with valid sign-up data
2361
        request = self.factory.post(
1✔
2362
            "/signup/",
2363
            {  # Use "/signup/" instead of "/"
2364
                "username": self.username,
2365
                "email": self.email,
2366
                "name": self.name,
2367
                "date_of_birth": self.date_of_birth,
2368
                "gender": self.gender,
2369
                "height": self.height,
2370
                "weight": self.weight,
2371
                "password": self.password,
2372
                "confirm_password": self.password,  # Ensure passwords match
2373
            },
2374
        )
2375
        request.session = {}
1✔
2376

2377
        # Call the signup view
2378
        response = signup(request)
1✔
2379

2380
        # print(response)
2381

2382
        # Check if the response is a redirect to the homepage
2383
        self.assertEqual(response.status_code, 302, "Expected redirect did not occur.")
1✔
2384
        self.assertEqual(response.url, reverse("homepage"))
1✔
2385
        self.assertIn("username", request.session)
1✔
2386
        self.assertEqual(request.session["username"], self.username)
1✔
2387

2388
    def test_signup_invalid_data(self):
1✔
2389
        # Create a POST request with invalid data (e.g., missing required fields)
2390
        request = self.factory.post(
1✔
2391
            "/signup/",
2392
            {
2393
                "username": "",
2394
                "email": "invalidemail",
2395
                "name": "",
2396
                "date_of_birth": "",
2397
                "gender": "",
2398
                "height": "",
2399
                "weight": "",
2400
                "password": "short",
2401
                "confirm_password": "different",  # Passwords do not match
2402
            },
2403
        )
2404
        request.session = {}
1✔
2405

2406
        # Call the signup view
2407
        response = signup(request)
1✔
2408

2409
        # Check that the response does not redirect and the form has errors
2410
        self.assertEqual(response.status_code, 200)
1✔
2411
        self.assertIn("This field is required.", response.content.decode())
1✔
2412
        self.assertIn("Enter a valid email address.", response.content.decode())
1✔
2413
        self.assertIn("Passwords do not match.", response.content.decode())
1✔
2414

2415

2416
class ForumViewTests(TestCase):
1✔
2417
    def setUp(self):
1✔
2418
        # Initialize DynamoDB resources and tables
2419
        self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
2420
        self.users_table = self.dynamodb.Table("Users")
1✔
2421
        self.threads_table = self.dynamodb.Table("ForumThreads")
1✔
2422
        self.posts_table = self.dynamodb.Table("ForumPosts")
1✔
2423

2424
        # Add test user
2425
        self.user_data = {
1✔
2426
            "user_id": "test_user_123",
2427
            "username": "test_user123",
2428
            "email": "test_user@example.com",
2429
            "name": "Test User",
2430
            "date_of_birth": "1990-01-01",
2431
            "gender": "O",
2432
            "is_banned": False,
2433
            "is_muted": False,
2434
            "password": "hashed_password",
2435
        }
2436
        self.users_table.put_item(Item=self.user_data)
1✔
2437

2438
        # Add test threads
2439
        self.test_threads = [
1✔
2440
            {
2441
                "ThreadID": "101",
2442
                "Title": "General Thread",
2443
                "UserID": "test_user_123",
2444
                "Section": "General",
2445
                "CreatedAt": "2024-11-01T10:00:00",
2446
            },
2447
            {
2448
                "ThreadID": "102",
2449
                "Title": "Workout Advice",
2450
                "UserID": "test_user_123",
2451
                "Section": "Workout Suggestions",
2452
                "CreatedAt": "2024-11-02T11:00:00",
2453
            },
2454
        ]
2455
        for thread in self.test_threads:
1✔
2456
            self.threads_table.put_item(Item=thread)
1✔
2457

2458
    def add_middleware(self, request):
1✔
2459
        """
2460
        Helper function to add session and message middleware to the request.
2461
        """
2462
        # Pass a no-op lambda as the get_response callable
2463
        middleware = SessionMiddleware(lambda r: None)
1✔
2464
        middleware.process_request(request)
1✔
2465
        request.session.save()
1✔
2466

2467
        message_middleware = MessageMiddleware(lambda r: None)
1✔
2468
        message_middleware.process_request(request)
1✔
2469
        request.session.save()
1✔
2470

2471
    def test_forum_view(self):
1✔
2472
        factory = RequestFactory()
1✔
2473

2474
        # Simulate a GET request to the forums page with a valid user session
2475
        request = factory.get(
1✔
2476
            reverse("forum"),
2477
            {
2478
                "username": "test_user_123",
2479
                "type": "all",
2480
                "start_date": "",
2481
                "end_date": "",
2482
            },
2483
        )
2484
        self.add_middleware(request)
1✔
2485
        request.session["username"] = "test_user_123"
1✔
2486

2487
        # Call the forum_view function
2488
        response = forum_view(request)
1✔
2489

2490
        # Check if the response is a redirect
2491
        if response.status_code == 302:
1✔
2492
            # Assert the redirect location
2493
            self.assertIn("/login", response["Location"])
1✔
2494
            return
1✔
2495

2496
        # # Assertions for the response
2497
        # self.assertEqual(response.status_code, 200)
2498
        # self.assertIn("threads", response.context_data)
2499
        # self.assertIn("users", response.context_data)
2500
        # self.assertIn("section_stats", response.context_data)
2501

2502
        # # Assertions for threads
2503
        # threads = response.context_data["threads"]
2504
        # self.assertGreater(len(threads), 1)
2505
        # self.assertEqual(threads[0]["Title"], "General Thread")
2506
        # self.assertEqual(threads[1]["Title"], "Workout Advice")
2507

2508
        # # Assertions for users
2509
        # users = response.context_data["users"]
2510
        # self.assertGreater(len(users), 0)
2511
        # usernames = [user["username"] for user in users]
2512
        # self.assertIn("test_user_123", usernames)
2513

2514
        # # Assertions for section stats
2515
        # section_stats = response.context_data["section_stats"]
2516
        # self.assertIn("General", section_stats)
2517
        # self.assertIn("Workout Suggestions", section_stats)
2518

2519
    def tearDown(self):
1✔
2520
        # Cleanup the test data
2521
        self.users_table.delete_item(Key={"user_id": self.user_data["user_id"]})
1✔
2522
        for thread in self.test_threads:
1✔
2523
            self.threads_table.delete_item(Key={"ThreadID": thread["ThreadID"]})
1✔
2524

2525

2526
class FitnessGoalsViewTest(TestCase):
1✔
2527
    def setUp(self):
1✔
2528
        # Set up DynamoDB resource and table
2529
        self.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
2530
        self.goals_table = self.dynamodb.Table("UserGoals")
1✔
2531

2532
        # Create test user with updated username
2533
        hashed = make_password("secure_password")
1✔
2534
        create_user(
1✔
2535
            "test_user789",  # user_id
2536
            "test_user789",  # username
2537
            "test_user789@example.com",  # email
2538
            "Test User 789",  # name
2539
            "1990-01-01",  # date_of_birth
2540
            "O",  # gender
2541
            "183",  # height
2542
            "83",  # weight
2543
            hashed,  # plaintext password
2544
        )
2545

2546
        # Set user_id for the test session
2547
        self.user_id = "test_user789"
1✔
2548

2549
        # Simulate a login request to your login view
2550
        self.client = Client()
1✔
2551
        response = self.client.post(
1✔
2552
            "/login/",
2553
            {
2554
                "username": "test_user789",
2555
                "password": "secure_password",
2556
            },
2557
        )
2558

2559
        # Assert that login succeeded
2560
        assert (
1✔
2561
            response.status_code == 302
2562
        ), "Login request did not redirect as expected."
2563

2564
        # Confirm session setup
2565
        session = self.client.session
1✔
2566
        assert "user_id" in session, "User ID not found in session after login."
1✔
2567
        assert (
1✔
2568
            session["user_id"] == self.user_id
2569
        ), "Session user_id does not match the test user."
2570

2571
    def test_view_renders_goals_page(self):
1✔
2572
        response = self.client.get("/fitness-goals/")
1✔
2573
        self.assertEqual(response.status_code, 200)
1✔
2574
        self.assertTemplateUsed(response, "fitness_goals.html")
1✔
2575

2576
    def test_add_new_goal(self):
1✔
2577
        # Ensure the session is properly set up
2578
        self.assertIn("user_id", self.client.session, "User ID not found in session.")
1✔
2579
        self.assertEqual(
1✔
2580
            self.client.session["user_id"], self.user_id, "Session user_id mismatch."
2581
        )
2582

2583
        # Test adding a new goal
2584
        data = {
1✔
2585
            "goal_type": "steps",
2586
            "goal_name": "",
2587
            "goal_value": "10000",
2588
        }
2589

2590
        data2 = {
1✔
2591
            "goal_type": "weight",
2592
            "goal_name": "",
2593
            "goal_value": "80",
2594
        }
2595

2596
        data3 = {
1✔
2597
            "goal_type": "sleep",
2598
            "goal_name": "",
2599
            "goal_value": "8",
2600
        }
2601

2602
        data4 = {
1✔
2603
            "goal_type": "activity",
2604
            "goal_name": "Jogging",
2605
            "goal_value": "13500",
2606
        }
2607

2608
        data5 = {
1✔
2609
            "goal_type": "custom",
2610
            "goal_name": "Outdoor Walk",
2611
            "goal_value": "16000",
2612
        }
2613
        response = self.client.post("/fitness-goals/", data)
1✔
2614
        response = self.client.post("/fitness-goals/", data2)
1✔
2615
        response = self.client.post("/fitness-goals/", data3)
1✔
2616
        response = self.client.post("/fitness-goals/", data4)
1✔
2617
        response = self.client.post("/fitness-goals/", data5)
1✔
2618

2619
        # Assert redirect
2620
        self.assertEqual(response.status_code, 302, "Expected redirect status code.")
1✔
2621
        self.assertEqual(
1✔
2622
            response.url, reverse("fitness_goals"), "Redirect URL mismatch."
2623
        )
2624

2625
        steps = get_step_user_goals(self.user_id)
1✔
2626
        self.assertEqual(steps, "10000", f"Expected goal value '10000', got {steps}.")
1✔
2627

2628
        weight = get_weight_user_goals(self.user_id)
1✔
2629
        self.assertEqual(weight, "80", f"Expected goal value '80', got {weight}.")
1✔
2630

2631
        sleep = get_sleep_user_goals(self.user_id)
1✔
2632
        self.assertEqual(sleep, "8", f"Expected sleep goal value '8', got {sleep}.")
1✔
2633

2634
        activity = get_activity_user_goals(self.user_id)
1✔
2635
        self.assertIsNotNone(activity, "Activity goal not found")
1✔
2636

2637
        custom = get_custom_user_goals(self.user_id)
1✔
2638
        self.assertIsNotNone(custom, "Custom goal not found")
1✔
2639

2640
    def test_prevent_duplicate_goal_type(self):
1✔
2641
        # Insert an initial goal
2642
        initial_goal = {
1✔
2643
            "GoalID": str(uuid.uuid4()),
2644
            "user_id": self.user_id,
2645
            "Type": "steps",
2646
            "Name": None,
2647
            "Value": "10000",
2648
        }
2649
        self.goals_table.put_item(Item=initial_goal)
1✔
2650

2651
        # Validate the initial state
2652
        query_response = self.goals_table.query(
1✔
2653
            KeyConditionExpression=boto3.dynamodb.conditions.Key("user_id").eq(
2654
                self.user_id
2655
            )
2656
        )
2657
        initial_goals = query_response.get("Items", [])
1✔
2658
        self.assertGreater(
1✔
2659
            len(initial_goals), 0, "Initial goal was not added to DynamoDB."
2660
        )
2661

2662
        # Attempt to add a duplicate goal
2663
        data = {
1✔
2664
            "goal_type": "steps",
2665
            "goal_name": "",
2666
            "goal_value": "15000",
2667
        }
2668
        response = self.client.post("/fitness-goals/", data)
1✔
2669

2670
        # Assert redirect
2671
        self.assertEqual(response.status_code, 302, "Expected redirect status code.")
1✔
2672
        self.assertEqual(
1✔
2673
            response.url, reverse("fitness_goals"), "Redirected to an incorrect URL."
2674
        )
2675

2676
        # Check for error message in session messages
2677
        messages = list(response.wsgi_request._messages)
1✔
2678
        self.assertEqual(len(messages), 1, "Expected one error message in session.")
1✔
2679
        self.assertEqual(
1✔
2680
            str(messages[0]),
2681
            "You already have a steps goal. Please edit it instead.",
2682
            "Error message mismatch for duplicate goal prevention.",
2683
        )
2684

2685
    def test_fetch_goals_on_get(self):
1✔
2686
        # Insert a sample goal
2687
        sample_goal = {
1✔
2688
            "GoalID": str(uuid.uuid4()),
2689
            "user_id": self.user_id,
2690
            "Type": "sleep",
2691
            "Name": None,
2692
            "Value": "10",
2693
        }
2694
        self.goals_table.put_item(Item=sample_goal)
1✔
2695

2696
        # Validate the initial state
2697
        query_response = self.goals_table.query(
1✔
2698
            KeyConditionExpression=boto3.dynamodb.conditions.Key("user_id").eq(
2699
                self.user_id
2700
            )
2701
        )
2702
        goals = query_response.get("Items", [])
1✔
2703
        self.assertGreater(len(goals), 0, "Initial goal was not added to DynamoDB.")
1✔
2704

2705
        # Fetch goals via GET request
2706
        response = self.client.get(reverse("fitness_goals"))
1✔
2707
        self.assertEqual(
1✔
2708
            response.status_code, 200, "GET request to fetch goals failed."
2709
        )
2710

2711
        # Check if the inserted goal is displayed
2712
        response_content = str(response.content)
1✔
2713
        self.assertIn(
1✔
2714
            "sleep",
2715
            response_content,
2716
            "Goal type 'sleep' not found in response content.",
2717
        )
2718
        self.assertIn(
1✔
2719
            "10", response_content, "Goal value '10' not found in response content."
2720
        )
2721

2722
    def test_edit_goal(self):
1✔
2723
        # Insert an initial goal
2724
        initial_goal = {
1✔
2725
            "GoalID": str(uuid.uuid4()),
2726
            "user_id": self.user_id,
2727
            "Type": "sleep",
2728
            "Name": None,
2729
            "Value": "8",
2730
        }
2731
        self.goals_table.put_item(Item=initial_goal)
1✔
2732

2733
        # Validate the initial state
2734
        sleep = get_sleep_user_goals(self.user_id)
1✔
2735
        self.assertEqual(sleep, "8", "Initial goal value mismatch.")
1✔
2736

2737
        # Data for editing the goal
2738
        edit_data = {
1✔
2739
            "goal_id": initial_goal["GoalID"],
2740
            "goal_value": "5",
2741
        }
2742

2743
        # Make POST request to edit the goal
2744
        response = self.client.post(
1✔
2745
            reverse("edit_goal"),
2746
            data=json.dumps(edit_data),
2747
            content_type="application/json",
2748
        )
2749

2750
        # Assert the response
2751
        self.assertEqual(response.status_code, 200, "Goal edit request failed.")
1✔
2752
        self.assertEqual(
1✔
2753
            response.json()["message"],
2754
            "Goal updated successfully!",
2755
            "Success message mismatch.",
2756
        )
2757

2758
        # Validate the updated goal in DynamoDB
2759
        updated_goal_response = self.goals_table.get_item(
1✔
2760
            Key={
2761
                "GoalID": initial_goal["GoalID"],
2762
                "user_id": self.user_id,
2763
            }
2764
        )
2765
        updated_goal = updated_goal_response.get("Item")
1✔
2766
        self.assertIsNotNone(updated_goal, "Updated goal not found in DynamoDB.")
1✔
2767
        self.assertEqual(
1✔
2768
            updated_goal["Value"],
2769
            "5",
2770
            f"Expected updated value '5', got {updated_goal['Value']}.",
2771
        )
2772

2773
        # --- Delete the updated goal ---
2774
        delete_data = {
1✔
2775
            "goal_id": initial_goal["GoalID"],
2776
        }
2777

2778
        # Make POST request to delete the goal
2779
        response = self.client.post(
1✔
2780
            reverse("delete_goal"),
2781
            data=json.dumps(delete_data),
2782
            content_type="application/json",
2783
        )
2784

2785
        # Assert the deletion response
2786
        self.assertEqual(response.status_code, 200, "Goal delete request failed.")
1✔
2787
        self.assertEqual(
1✔
2788
            response.json()["message"],
2789
            "Goal deleted successfully.",
2790
            "Unexpected response message for goal deletion.",
2791
        )
2792

2793
        # Validate the goal is deleted
2794
        deleted_goal_response = self.goals_table.get_item(
1✔
2795
            Key={
2796
                "GoalID": initial_goal["GoalID"],
2797
                "user_id": self.user_id,
2798
            }
2799
        )
2800
        deleted_goal = deleted_goal_response.get("Item")
1✔
2801
        self.assertIsNone(deleted_goal, "Deleted goal still found in DynamoDB.")
1✔
2802

2803
    def tearDown(self):
1✔
2804
        response = self.goals_table.query(
1✔
2805
            KeyConditionExpression=boto3.dynamodb.conditions.Key("user_id").eq(
2806
                self.user_id
2807
            )
2808
        )
2809
        goals = response.get("Items", [])
1✔
2810

2811
        # Delete each goal
2812
        for goal in goals:
1✔
2813
            self.goals_table.delete_item(
1✔
2814
                Key={
2815
                    "GoalID": goal["GoalID"],
2816
                    "user_id": self.user_id,
2817
                }
2818
            )
2819

2820

2821
###################################################
2822
# Test Case For rds.py
2823
###################################################
2824

2825

2826
class TestConvertToMysqlDatetime(TestCase):
1✔
2827
    def test_valid_date_conversion(self):
1✔
2828
        """
2829
        Test the function with a valid date string in the specified format.
2830
        """
2831
        input_date = "Dec 03, 11 PM"
1✔
2832
        expected_output = "2024-12-03 23:00:00"
1✔
2833
        self.assertEqual(convert_to_mysql_datetime(input_date), expected_output)
1✔
2834

2835
    def test_valid_date_am(self):
1✔
2836
        """
2837
        Test the function with a valid date string for AM time.
2838
        """
2839
        input_date = "Jan 15, 9 AM"
1✔
2840
        expected_output = "2024-01-15 09:00:00"
1✔
2841
        self.assertEqual(convert_to_mysql_datetime(input_date), expected_output)
1✔
2842

2843
    def test_invalid_date_format(self):
1✔
2844
        """
2845
        Test the function with an invalid date string format.
2846
        """
2847
        input_date = "03-12-2024 23:00"
1✔
2848
        with self.assertRaises(ValueError):
1✔
2849
            convert_to_mysql_datetime(input_date)
1✔
2850

2851
    def test_empty_date_string(self):
1✔
2852
        """
2853
        Test the function with an empty date string.
2854
        """
2855
        input_date = ""
1✔
2856
        with self.assertRaises(ValueError):
1✔
2857
            convert_to_mysql_datetime(input_date)
1✔
2858

2859
    def test_incomplete_date_string(self):
1✔
2860
        """
2861
        Test the function with an incomplete date string.
2862
        """
2863
        input_date = "Dec 03"
1✔
2864
        with self.assertRaises(ValueError):
1✔
2865
            convert_to_mysql_datetime(input_date)
1✔
2866

2867

2868
class TestGetSecretRds(IsolatedAsyncioTestCase):
1✔
2869
    async def test_get_secret_rds(self):
1✔
2870
        # Expected secret values (match this with the secret in AWS Secrets Manager)
2871
        expected_secret = {
1✔
2872
            "host": "fiton.cxkgakkyk8zs.us-west-2.rds.amazonaws.com",
2873
            "database": "fiton",
2874
            "port": 3306,
2875
            "password": "Fiton#swe-2024",
2876
            "username": "admin",
2877
        }
2878

2879
        # Call the actual function
2880
        result = await get_secret_rds()
1✔
2881

2882
        # Assert the results
2883
        self.assertEqual(result["host"], expected_secret["host"])
1✔
2884
        self.assertEqual(result["database"], expected_secret["database"])
1✔
2885
        self.assertEqual(result["port"], str(expected_secret["port"]))
1✔
2886
        self.assertEqual(result["password"], expected_secret["password"])
1✔
2887
        self.assertEqual(result["username"], expected_secret["username"])
1✔
2888

2889

2890
class TestCreateConnection(IsolatedAsyncioTestCase):
1✔
2891
    async def test_create_connection(self):
1✔
2892
        """
2893
        Test the create_connection function with a live MySQL database.
2894
        Ensure the connection is successfully established.
2895
        """
2896
        conn = None
1✔
2897
        try:
1✔
2898
            # Call the create_connection function
2899
            conn = await create_connection()
1✔
2900

2901
            # Verify the connection is established
2902
            self.assertIsNotNone(conn, "Connection object is None.")
1✔
2903
            # self.assertTrue(conn.open, "Connection is not open.")
2904

2905
            # Execute a simple query to test the connection
2906
            async with conn.cursor() as cursor:
1✔
2907
                await cursor.execute("SELECT DATABASE();")
1✔
2908
                result = await cursor.fetchone()
1✔
2909
                self.assertIsNotNone(result, "Query returned no result.")
1✔
2910
                self.assertEqual(result[0], "fiton", "Connected to the wrong database.")
1✔
2911

2912
        except aiomysql.Error as e:
×
2913
            self.fail(f"Database connection failed: {e}")
×
2914

2915
        finally:
2916
            # Ensure the connection is closed even if an exception occurs
2917
            if conn:
1✔
2918
                conn.close()
1✔
2919

2920

2921
# Assuming create_table and insert_data are imported
2922

2923

2924
class TestDatabaseOperations(IsolatedAsyncioTestCase):
1✔
2925
    async def asyncSetUp(self):
1✔
2926
        """Set up a database connection before each test."""
2927
        self.conn = await create_connection()
1✔
2928

2929
    async def asyncTearDown(self):
1✔
2930
        """Clean up database connection after each test."""
2931
        async with self.conn.cursor() as cursor:
1✔
2932
            # Drop the test table to clean up
2933
            await cursor.execute("DROP TABLE IF EXISTS test_table;")
1✔
2934
        await self.conn.commit()
1✔
2935
        self.conn.close()
1✔
2936

2937
    async def test_create_table(self):
1✔
2938
        """Test the create_table function."""
2939
        table_sql = """
1✔
2940
        CREATE TABLE test_table (
2941
            id INT AUTO_INCREMENT PRIMARY KEY,
2942
            name VARCHAR(100) NOT NULL,
2943
            age INT NOT NULL
2944
        );
2945
        """
2946
        # Call the function to create the table
2947
        await create_table(self.conn, table_sql)
1✔
2948

2949
        # Verify the table was created
2950
        async with self.conn.cursor() as cursor:
1✔
2951
            await cursor.execute("SHOW TABLES LIKE 'test_table';")
1✔
2952
            result = await cursor.fetchone()
1✔
2953
            self.assertIsNotNone(result, "Table test_table was not created.")
1✔
2954

2955
    async def test_insert_data(self):
1✔
2956
        """Test the insert_data function."""
2957
        # Create the test table
2958
        table_sql = """
1✔
2959
        CREATE TABLE test_table (
2960
            id INT AUTO_INCREMENT PRIMARY KEY,
2961
            name VARCHAR(100) NOT NULL,
2962
            age INT NOT NULL
2963
        );
2964
        """
2965
        await create_table(self.conn, table_sql)
1✔
2966

2967
        # Insert data into the table
2968
        insert_query = "INSERT INTO test_table (name, age) VALUES (%s, %s);"
1✔
2969
        test_data = ("Alice", 30)
1✔
2970
        await insert_data(self.conn, insert_query, test_data)
1✔
2971

2972
        # Verify the data was inserted
2973
        async with self.conn.cursor() as cursor:
1✔
2974
            await cursor.execute(
1✔
2975
                "SELECT * FROM test_table WHERE name = %s;", (test_data[0],)
2976
            )
2977
            result = await cursor.fetchone()
1✔
2978

2979
        self.assertIsNotNone(result, "Data was not inserted into test_table.")
1✔
2980
        self.assertEqual(result[1], "Alice", "Inserted name does not match.")
1✔
2981
        self.assertEqual(result[2], 30, "Inserted age does not match.")
1✔
2982

2983

2984
class TestHealthMetricTables(IsolatedAsyncioTestCase):
1✔
2985
    async def asyncSetUp(self):
1✔
2986
        """Set up a database connection before each test."""
2987
        self.conn = await create_connection()
1✔
2988

2989
    async def asyncTearDown(self):
1✔
2990
        """Clean up test case entries after each test."""
2991
        async with self.conn.cursor() as cursor:
1✔
2992
            # Drop only test-specific tables
2993
            tables = [
1✔
2994
                "test_steps",
2995
                "test_heart_rate",
2996
                "test_resting_heart_rate",
2997
                "test_oxygen",
2998
                "test_glucose",
2999
                "test_pressure",
3000
            ]
3001
            for table in tables:
1✔
3002
                await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3003
        await self.conn.commit()
1✔
3004
        self.conn.close()
1✔
3005

3006
    async def test_create_steps_table(self):
1✔
3007
        """Test the creation of the STEPS table."""
3008
        table_name = "test_steps"  # Use a test-specific table name
1✔
3009
        await create_steps_table(self.conn, table_name)
1✔
3010
        async with self.conn.cursor() as cursor:
1✔
3011
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3012
            result = await cursor.fetchone()
1✔
3013
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3014

3015
    async def test_create_heartRate_table(self):
1✔
3016
        """Test the creation of the HEART_RATE table."""
3017
        table_name = "test_heart_rate"
1✔
3018
        await create_heartRate_table(self.conn, table_name)
1✔
3019
        async with self.conn.cursor() as cursor:
1✔
3020
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3021
            result = await cursor.fetchone()
1✔
3022
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3023

3024
    async def test_create_restingHeartRate_table(self):
1✔
3025
        """Test the creation of the RESTING_HEART_RATE table."""
3026
        table_name = "test_resting_heart_rate"
1✔
3027
        await create_restingHeartRate_table(self.conn, table_name)
1✔
3028
        async with self.conn.cursor() as cursor:
1✔
3029
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3030
            result = await cursor.fetchone()
1✔
3031
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3032

3033
    async def test_create_oxygen_table(self):
1✔
3034
        """Test the creation of the OXYGEN table."""
3035
        table_name = "test_oxygen"
1✔
3036
        await create_oxygen_table(self.conn, table_name)
1✔
3037
        async with self.conn.cursor() as cursor:
1✔
3038
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3039
            result = await cursor.fetchone()
1✔
3040
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3041

3042
    async def test_create_glucose_table(self):
1✔
3043
        """Test the creation of the GLUCOSE table."""
3044
        table_name = "test_glucose"
1✔
3045
        await create_glucose_table(self.conn, table_name)
1✔
3046
        async with self.conn.cursor() as cursor:
1✔
3047
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3048
            result = await cursor.fetchone()
1✔
3049
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3050

3051
    async def test_create_pressure_table(self):
1✔
3052
        """Test the creation of the PRESSURE table."""
3053
        table_name = "test_pressure"
1✔
3054
        await create_pressure_table(self.conn, table_name)
1✔
3055
        async with self.conn.cursor() as cursor:
1✔
3056
            await cursor.execute(f"SHOW TABLES LIKE '{table_name}';")
1✔
3057
            result = await cursor.fetchone()
1✔
3058
        self.assertIsNotNone(result, f"Table {table_name} was not created.")
1✔
3059

3060

3061
class TestInsertIntoMetricTables(IsolatedAsyncioTestCase):
1✔
3062
    async def asyncSetUp(self):
1✔
3063
        """Set up a database connection before each test."""
3064
        self.conn = await create_connection()
1✔
3065

3066
    async def asyncTearDown(self):
1✔
3067
        """Clean up database connection after each test."""
3068
        async with self.conn.cursor() as cursor:
1✔
3069
            # Drop test-specific tables
3070
            tables = [
1✔
3071
                "test_steps",
3072
                "test_heart_rate",
3073
                "test_resting_heart_rate",
3074
                "test_oxygen",
3075
                "test_glucose",
3076
                "test_pressure",
3077
            ]
3078
            for table in tables:
1✔
3079
                await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3080
        await self.conn.commit()
1✔
3081
        self.conn.close()
1✔
3082

3083
    async def test_insert_into_steps_table(self):
1✔
3084
        """Test inserting data into the test-specific STEPS table."""
3085
        table_name = "test_steps"
1✔
3086
        await create_steps_table(self.conn, table_name)
1✔
3087

3088
        email = "test_user2@example.com"
1✔
3089
        start_time = "Dec 03, 9 AM"
1✔
3090
        end_time = "Dec 03, 10 AM"
1✔
3091
        count = 1000
1✔
3092

3093
        await insert_into_steps_table(
1✔
3094
            self.conn, email, start_time, end_time, count, table_name
3095
        )
3096

3097
        async with self.conn.cursor() as cursor:
1✔
3098
            await cursor.execute(
1✔
3099
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3100
            )
3101
            result = await cursor.fetchone()
1✔
3102

3103
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3104
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3105
        self.assertEqual(result[3], count, "Step count does not match.")
1✔
3106

3107
    async def test_insert_into_heartRate_table(self):
1✔
3108
        """Test inserting data into the test-specific HEART_RATE table."""
3109
        table_name = "test_heart_rate"
1✔
3110
        await create_heartRate_table(self.conn, table_name)
1✔
3111

3112
        email = "test_user@example.com"
1✔
3113
        start_time = "Dec 03, 9 AM"
1✔
3114
        end_time = "Dec 03, 10 AM"
1✔
3115
        count = 80
1✔
3116

3117
        await insert_into_heartRate_table(
1✔
3118
            self.conn, email, start_time, end_time, count, table_name
3119
        )
3120

3121
        async with self.conn.cursor() as cursor:
1✔
3122
            await cursor.execute(
1✔
3123
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3124
            )
3125
            result = await cursor.fetchone()
1✔
3126

3127
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3128
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3129
        self.assertEqual(result[3], count, "Heart rate count does not match.")
1✔
3130

3131
    async def test_insert_into_restingHeartRate_table(self):
1✔
3132
        """Test inserting data into the test-specific RESTING_HEART_RATE table."""
3133
        table_name = "test_resting_heart_rate"
1✔
3134
        await create_restingHeartRate_table(self.conn, table_name)
1✔
3135

3136
        email = "test_user@example.com"
1✔
3137
        start_time = "Dec 03, 9 AM"
1✔
3138
        end_time = "Dec 03, 10 AM"
1✔
3139
        count = 60
1✔
3140

3141
        await insert_into_restingHeartRate_table(
1✔
3142
            self.conn, email, start_time, end_time, count, table_name
3143
        )
3144

3145
        async with self.conn.cursor() as cursor:
1✔
3146
            await cursor.execute(
1✔
3147
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3148
            )
3149
            result = await cursor.fetchone()
1✔
3150

3151
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3152
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3153
        self.assertEqual(result[3], count, "Resting heart rate count does not match.")
1✔
3154

3155
    async def test_insert_into_oxygen_table(self):
1✔
3156
        """Test inserting data into the test-specific OXYGEN table."""
3157
        table_name = "test_oxygen"
1✔
3158
        await create_oxygen_table(self.conn, table_name)
1✔
3159

3160
        email = "test_user@example.com"
1✔
3161
        start_time = "Dec 03, 9 AM"
1✔
3162
        end_time = "Dec 03, 10 AM"
1✔
3163
        count = 95
1✔
3164

3165
        await insert_into_oxygen_table(
1✔
3166
            self.conn, email, start_time, end_time, count, table_name
3167
        )
3168

3169
        async with self.conn.cursor() as cursor:
1✔
3170
            await cursor.execute(
1✔
3171
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3172
            )
3173
            result = await cursor.fetchone()
1✔
3174

3175
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3176
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3177
        self.assertEqual(result[3], count, "Oxygen level count does not match.")
1✔
3178

3179
    async def test_insert_into_glucose_table(self):
1✔
3180
        """Test inserting data into the test-specific GLUCOSE table."""
3181
        table_name = "test_glucose"
1✔
3182
        await create_glucose_table(self.conn, table_name)
1✔
3183

3184
        email = "test_user@example.com"
1✔
3185
        start_time = "Dec 03, 9 AM"
1✔
3186
        end_time = "Dec 03, 10 AM"
1✔
3187
        count = 120
1✔
3188

3189
        await insert_into_glucose_table(
1✔
3190
            self.conn, email, start_time, end_time, count, table_name
3191
        )
3192

3193
        async with self.conn.cursor() as cursor:
1✔
3194
            await cursor.execute(
1✔
3195
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3196
            )
3197
            result = await cursor.fetchone()
1✔
3198

3199
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3200
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3201
        self.assertEqual(result[3], count, "Glucose level count does not match.")
1✔
3202

3203
    async def test_insert_into_pressure_table(self):
1✔
3204
        """Test inserting data into the test-specific PRESSURE table."""
3205
        table_name = "test_pressure"
1✔
3206
        await create_pressure_table(self.conn, table_name)
1✔
3207

3208
        email = "test_user@example.com"
1✔
3209
        start_time = "Dec 03, 9 AM"
1✔
3210
        end_time = "Dec 03, 10 AM"
1✔
3211
        count = 120
1✔
3212

3213
        await insert_into_pressure_table(
1✔
3214
            self.conn, email, start_time, end_time, count, table_name
3215
        )
3216

3217
        async with self.conn.cursor() as cursor:
1✔
3218
            await cursor.execute(
1✔
3219
                f"SELECT * FROM {table_name} WHERE email = %s;", (email,)
3220
            )
3221
            result = await cursor.fetchone()
1✔
3222

3223
        self.assertIsNotNone(result, f"Data was not inserted into {table_name} table.")
1✔
3224
        self.assertEqual(result[0], email, "Email does not match.")
1✔
3225
        self.assertEqual(result[3], count, "Pressure count does not match.")
1✔
3226

3227

3228
class TestInsertIntoAllTables(IsolatedAsyncioTestCase):
1✔
3229
    async def asyncSetUp(self):
1✔
3230
        """Set up a database connection before each test."""
3231
        self.conn = await create_connection()
1✔
3232

3233
    async def asyncTearDown(self):
1✔
3234
        """Clean up database connection after each test."""
3235
        async with self.conn.cursor() as cursor:
1✔
3236
            # Drop test-specific tables
3237
            tables = [
1✔
3238
                "test_steps",
3239
                "test_heart_rate",
3240
                "test_resting_heart_rate",
3241
                "test_oxygen",
3242
                "test_glucose",
3243
                "test_pressure",
3244
            ]
3245
            for table in tables:
1✔
3246
                await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3247
        await self.conn.commit()
1✔
3248
        self.conn.close()
1✔
3249

3250
    async def test_insert_into_tables(self):
1✔
3251
        """Test the main function for inserting data into all tables."""
3252
        email = "test_user@example.com"
1✔
3253

3254
        # Test data to insert into tables
3255
        total_data = {
1✔
3256
            "steps": {
3257
                "2024-12-03": [
3258
                    {"start": "Dec 03, 9 AM", "end": "Dec 03, 10 AM", "count": 1000}
3259
                ]
3260
            },
3261
            "heartRate": {
3262
                "2024-12-03": [
3263
                    {"start": "Dec 03, 10 AM", "end": "Dec 03, 11 AM", "count": 80}
3264
                ]
3265
            },
3266
            "restingHeartRate": {
3267
                "2024-12-03": [
3268
                    {"start": "Dec 03, 9 AM", "end": "Dec 03, 10 AM", "count": 60}
3269
                ]
3270
            },
3271
            "oxygen": {
3272
                "2024-12-03": [
3273
                    {"start": "Dec 03, 9 AM", "end": "Dec 03, 10 AM", "count": 95}
3274
                ]
3275
            },
3276
            "glucose": {
3277
                "2024-12-03": [
3278
                    {"start": "Dec 03, 9 AM", "end": "Dec 03, 10 AM", "count": 120}
3279
                ]
3280
            },
3281
            "pressure": {
3282
                "2024-12-03": [
3283
                    {"start": "Dec 03, 9 AM", "end": "Dec 03, 10 AM", "count": 120}
3284
                ]
3285
            },
3286
        }
3287

3288
        # Test-specific table names
3289
        table_names = {
1✔
3290
            "steps": "test_steps",
3291
            "heartRate": "test_heart_rate",
3292
            "restingHeartRate": "test_resting_heart_rate",
3293
            "oxygen": "test_oxygen",
3294
            "glucose": "test_glucose",
3295
            "pressure": "test_pressure",
3296
        }
3297

3298
        # Create test-specific tables
3299
        await create_steps_table(self.conn, table_names["steps"])
1✔
3300
        await create_heartRate_table(self.conn, table_names["heartRate"])
1✔
3301
        await create_restingHeartRate_table(self.conn, table_names["restingHeartRate"])
1✔
3302
        await create_oxygen_table(self.conn, table_names["oxygen"])
1✔
3303
        await create_glucose_table(self.conn, table_names["glucose"])
1✔
3304
        await create_pressure_table(self.conn, table_names["pressure"])
1✔
3305

3306
        # Call the original function with test-specific table names
3307
        await insert_into_tables(email, total_data, table_names)
1✔
3308

3309
        # Verify data was inserted into all test-specific tables
3310
        async with self.conn.cursor() as cursor:
1✔
3311
            # Check STEPS table
3312
            await cursor.execute(
1✔
3313
                f"SELECT * FROM {table_names['steps']} WHERE email = %s;", (email,)
3314
            )
UNCOV
3315
            steps_result = await cursor.fetchone()
×
UNCOV
3316
            self.assertIsNotNone(
×
3317
                steps_result, "Data was not inserted into test_steps table."
3318
            )
UNCOV
3319
            self.assertEqual(steps_result[3], 1000, "Step count does not match.")
×
3320

3321
            # Check HEART_RATE table
UNCOV
3322
            await cursor.execute(
×
3323
                f"SELECT * FROM {table_names['heartRate']} WHERE email = %s;", (email,)
3324
            )
UNCOV
3325
            heart_rate_result = await cursor.fetchone()
×
UNCOV
3326
            self.assertIsNotNone(
×
3327
                heart_rate_result, "Data was not inserted into test_heart_rate table."
3328
            )
UNCOV
3329
            self.assertEqual(
×
3330
                heart_rate_result[3], 80, "Heart rate count does not match."
3331
            )
3332

3333
            # Check RESTING_HEART_RATE table
UNCOV
3334
            await cursor.execute(
×
3335
                f"SELECT * FROM {table_names['restingHeartRate']} WHERE email = %s;",
3336
                (email,),
3337
            )
UNCOV
3338
            resting_heart_rate_result = await cursor.fetchone()
×
UNCOV
3339
            self.assertIsNotNone(
×
3340
                resting_heart_rate_result,
3341
                "Data was not inserted into test_resting_heart_rate table.",
3342
            )
UNCOV
3343
            self.assertEqual(
×
3344
                resting_heart_rate_result[3],
3345
                60,
3346
                "Resting heart rate count does not match.",
3347
            )
3348

3349
            # Check OXYGEN table
UNCOV
3350
            await cursor.execute(
×
3351
                f"SELECT * FROM {table_names['oxygen']} WHERE email = %s;", (email,)
3352
            )
UNCOV
3353
            oxygen_result = await cursor.fetchone()
×
UNCOV
3354
            self.assertIsNotNone(
×
3355
                oxygen_result, "Data was not inserted into test_oxygen table."
3356
            )
UNCOV
3357
            self.assertEqual(oxygen_result[3], 95, "Oxygen count does not match.")
×
3358

3359
            # Check GLUCOSE table
UNCOV
3360
            await cursor.execute(
×
3361
                f"SELECT * FROM {table_names['glucose']} WHERE email = %s;", (email,)
3362
            )
UNCOV
3363
            glucose_result = await cursor.fetchone()
×
UNCOV
3364
            self.assertIsNotNone(
×
3365
                glucose_result, "Data was not inserted into test_glucose table."
3366
            )
UNCOV
3367
            self.assertEqual(glucose_result[3], 120, "Glucose count does not match.")
×
3368

3369
            # Check PRESSURE table
UNCOV
3370
            await cursor.execute(
×
3371
                f"SELECT * FROM {table_names['pressure']} WHERE email = %s;", (email,)
3372
            )
UNCOV
3373
            pressure_result = await cursor.fetchone()
×
UNCOV
3374
            self.assertIsNotNone(
×
3375
                pressure_result, "Data was not inserted into test_pressure table."
3376
            )
UNCOV
3377
            self.assertEqual(pressure_result[3], 120, "Pressure count does not match.")
×
3378

3379

3380
class TestShowTable(IsolatedAsyncioTestCase):
1✔
3381
    async def asyncSetUp(self):
1✔
3382
        """Set up a database connection before each test."""
3383
        self.conn = await create_connection()
1✔
3384

3385
    async def asyncTearDown(self):
1✔
3386
        """Clean up database connection after each test."""
3387
        async with self.conn.cursor() as cursor:
1✔
3388
            # Drop test-specific tables
3389
            tables = ["test_display"]
1✔
3390
            for table in tables:
1✔
3391
                await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3392
        await self.conn.commit()
1✔
3393
        self.conn.close()
1✔
3394

3395
    async def test_show_table(self):
1✔
3396
        """Test the show_table function to display data."""
3397
        table_name = "test_display"
1✔
3398

3399
        # Create a test table
3400
        create_table_query = f"""
1✔
3401
        CREATE TABLE {table_name} (
3402
            id INT AUTO_INCREMENT PRIMARY KEY,
3403
            name VARCHAR(100) NOT NULL,
3404
            age INT NOT NULL
3405
        );
3406
        """
3407
        async with self.conn.cursor() as cursor:
1✔
3408
            await cursor.execute(create_table_query)
1✔
3409

3410
        # Insert test data into the table
3411
        test_data = [("Alice", 30), ("Bob", 25), ("Charlie", 35)]
1✔
3412
        insert_query = f"INSERT INTO {table_name} (name, age) VALUES (%s, %s)"
1✔
3413
        async with self.conn.cursor() as cursor:
1✔
3414
            await cursor.executemany(insert_query, test_data)
1✔
3415

3416
        await self.conn.commit()
1✔
3417

3418
        # Call the show_table function
3419
        async with self.conn.cursor() as cursor:
1✔
3420
            await cursor.execute(f"SELECT * FROM {table_name}")
1✔
3421
            rows = await cursor.fetchall()
1✔
3422

3423
        # Assertions to verify the data is displayed correctly
3424
        self.assertEqual(len(rows), 3, f"Expected 3 rows, but got {len(rows)}.")
1✔
3425
        self.assertEqual(rows[0][1], "Alice", "First row name does not match.")
1✔
3426
        self.assertEqual(rows[1][1], "Bob", "Second row name does not match.")
1✔
3427
        self.assertEqual(rows[2][1], "Charlie", "Third row name does not match.")
1✔
3428

3429

3430
class TestShowTableWrappers(IsolatedAsyncioTestCase):
1✔
3431
    async def asyncSetUp(self):
1✔
3432
        """Set up a database connection before each test."""
3433
        self.conn = await create_connection()
1✔
3434

3435
    async def asyncTearDown(self):
1✔
3436
        """Clean up database connection after each test."""
3437
        async with self.conn.cursor() as cursor:
1✔
3438
            # Drop test-specific tables
3439
            tables = [
1✔
3440
                "test_steps",
3441
                "test_heart_rate",
3442
                "test_resting_heart_rate",
3443
                "test_oxygen",
3444
                "test_glucose",
3445
                "test_pressure",
3446
            ]
3447
            for table in tables:
1✔
3448
                await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3449
        await self.conn.commit()
1✔
3450
        self.conn.close()
1✔
3451

3452
    async def test_show_steps_table(self):
1✔
3453
        """Test showing data from the test-specific STEPS table."""
3454
        table_name = "test_steps"
1✔
3455
        await create_steps_table(self.conn, table_name)
1✔
3456

3457
        # Insert test data
3458
        test_data = [
1✔
3459
            (
3460
                "test_user@example.com",
3461
                "2024-12-03 09:00:00",
3462
                "2024-12-03 10:00:00",
3463
                1000,
3464
            )
3465
        ]
3466
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3467
        async with self.conn.cursor() as cursor:
1✔
3468
            await cursor.executemany(insert_query, test_data)
1✔
3469
        await self.conn.commit()
1✔
3470

3471
        # Call the wrapper function
3472
        await show_table(self.conn, table_name)
1✔
3473

3474
    async def test_show_heartRate_table(self):
1✔
3475
        """Test showing data from the test-specific HEART_RATE table."""
3476
        table_name = "test_heart_rate"
1✔
3477
        await create_heartRate_table(self.conn, table_name)
1✔
3478

3479
        # Insert test data
3480
        test_data = [
1✔
3481
            ("test_user@example.com", "2024-12-03 10:00:00", "2024-12-03 11:00:00", 80)
3482
        ]
3483
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3484
        async with self.conn.cursor() as cursor:
1✔
3485
            await cursor.executemany(insert_query, test_data)
1✔
3486
        await self.conn.commit()
1✔
3487

3488
        # Call the wrapper function
3489
        await show_table(self.conn, table_name)
1✔
3490

3491
    async def test_show_restingHeartRate_table(self):
1✔
3492
        """Test showing data from the test-specific RESTING_HEART_RATE table."""
3493
        table_name = "test_resting_heart_rate"
1✔
3494
        await create_restingHeartRate_table(self.conn, table_name)
1✔
3495

3496
        # Insert test data
3497
        test_data = [
1✔
3498
            ("test_user@example.com", "2024-12-03 09:00:00", "2024-12-03 10:00:00", 60)
3499
        ]
3500
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3501
        async with self.conn.cursor() as cursor:
1✔
3502
            await cursor.executemany(insert_query, test_data)
1✔
UNCOV
3503
        await self.conn.commit()
×
3504

3505
        # Call the wrapper function
UNCOV
3506
        await show_table(self.conn, table_name)
×
3507

3508
    async def test_show_oxygen_table(self):
1✔
3509
        """Test showing data from the test-specific OXYGEN table."""
3510
        table_name = "test_oxygen"
1✔
3511
        await create_oxygen_table(self.conn, table_name)
1✔
3512

3513
        # Insert test data
3514
        test_data = [
1✔
3515
            ("test_user@example.com", "2024-12-03 09:00:00", "2024-12-03 10:00:00", 95)
3516
        ]
3517
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3518
        async with self.conn.cursor() as cursor:
1✔
3519
            await cursor.executemany(insert_query, test_data)
1✔
3520
        await self.conn.commit()
1✔
3521

3522
        # Call the wrapper function
3523
        await show_table(self.conn, table_name)
1✔
3524

3525
    async def test_show_glucose_table(self):
1✔
3526
        """Test showing data from the test-specific GLUCOSE table."""
3527
        table_name = "test_glucose"
1✔
3528
        await create_glucose_table(self.conn, table_name)
1✔
3529

3530
        # Insert test data
3531
        test_data = [
1✔
3532
            ("test_user@example.com", "2024-12-03 09:00:00", "2024-12-03 10:00:00", 120)
3533
        ]
3534
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3535
        async with self.conn.cursor() as cursor:
1✔
3536
            await cursor.executemany(insert_query, test_data)
1✔
3537
        await self.conn.commit()
1✔
3538

3539
        # Call the wrapper function
3540
        await show_table(self.conn, table_name)
1✔
3541

3542
    async def test_show_pressure_table(self):
1✔
3543
        """Test showing data from the test-specific PRESSURE table."""
3544
        table_name = "test_pressure"
1✔
3545
        await create_pressure_table(self.conn, table_name)
1✔
3546

3547
        # Insert test data
3548
        test_data = [
1✔
3549
            ("test_user@example.com", "2024-12-03 09:00:00", "2024-12-03 10:00:00", 120)
3550
        ]
3551
        insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3552
        async with self.conn.cursor() as cursor:
1✔
3553
            await cursor.executemany(insert_query, test_data)
1✔
3554
        await self.conn.commit()
1✔
3555

3556
        # Call the wrapper function
3557
        await show_table(self.conn, table_name)
1✔
3558

3559

3560
class TestShowTables(IsolatedAsyncioTestCase):
1✔
3561
    async def asyncSetUp(self):
1✔
3562
        """Set up a database connection and create test-specific tables before each test."""
3563
        self.conn = await create_connection()
1✔
3564

3565
        # Test-specific table names
3566
        self.table_names = {
1✔
3567
            "steps": "test_steps",
3568
            "heartRate": "test_heart_rate",
3569
            "restingHeartRate": "test_resting_heart_rate",
3570
            "oxygen": "test_oxygen",
3571
            "glucose": "test_glucose",
3572
            "pressure": "test_pressure",
3573
        }
3574

3575
        # Create test-specific tables
3576
        await create_steps_table(self.conn, self.table_names["steps"])
1✔
3577
        await create_heartRate_table(self.conn, self.table_names["heartRate"])
1✔
3578
        await create_restingHeartRate_table(
1✔
3579
            self.conn, self.table_names["restingHeartRate"]
3580
        )
3581
        await create_oxygen_table(self.conn, self.table_names["oxygen"])
1✔
3582
        await create_glucose_table(self.conn, self.table_names["glucose"])
1✔
3583
        await create_pressure_table(self.conn, self.table_names["pressure"])
1✔
3584

3585
        # Insert test data into each table
3586
        test_data = {
1✔
3587
            "steps": [
3588
                (
3589
                    "test_user@example.com",
3590
                    "2024-12-03 09:00:00",
3591
                    "2024-12-03 10:00:00",
3592
                    1000,
3593
                )
3594
            ],
3595
            "heartRate": [
3596
                (
3597
                    "test_user@example.com",
3598
                    "2024-12-03 10:00:00",
3599
                    "2024-12-03 11:00:00",
3600
                    80,
3601
                )
3602
            ],
3603
            "restingHeartRate": [
3604
                (
3605
                    "test_user@example.com",
3606
                    "2024-12-03 09:00:00",
3607
                    "2024-12-03 10:00:00",
3608
                    60,
3609
                )
3610
            ],
3611
            "oxygen": [
3612
                (
3613
                    "test_user@example.com",
3614
                    "2024-12-03 09:00:00",
3615
                    "2024-12-03 10:00:00",
3616
                    95,
3617
                )
3618
            ],
3619
            "glucose": [
3620
                (
3621
                    "test_user@example.com",
3622
                    "2024-12-03 09:00:00",
3623
                    "2024-12-03 10:00:00",
3624
                    120,
3625
                )
3626
            ],
3627
            "pressure": [
3628
                (
3629
                    "test_user@example.com",
3630
                    "2024-12-03 09:00:00",
3631
                    "2024-12-03 10:00:00",
3632
                    120,
3633
                )
3634
            ],
3635
        }
3636

3637
        for table_name, data in test_data.items():
1✔
3638
            insert_query = f"INSERT INTO {self.table_names[table_name]} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s)"
1✔
3639
            async with self.conn.cursor() as cursor:
1✔
3640
                await cursor.executemany(insert_query, data)
1✔
3641
        await self.conn.commit()
1✔
3642

3643
    async def asyncTearDown(self):
1✔
3644
        """Clean up database connection after each test."""
3645
        if self.conn:
1✔
3646
            try:
1✔
3647
                async with self.conn.cursor() as cursor:
1✔
3648
                    for table in self.table_names.values():
1✔
3649
                        await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3650
                await self.conn.commit()
1✔
3651
            except Exception as e:
×
3652
                print(f"Error during cleanup: {e}")
×
3653
            finally:
3654
                self.conn.close()
1✔
3655
                self.conn = None
1✔
3656

3657
    async def test_show_tables(self):
1✔
3658
        """Test the main function to show data from all tables."""
3659
        # Redirect stdout to capture the printed output
3660
        captured_output = io.StringIO()
1✔
3661
        sys.stdout = captured_output
1✔
3662

3663
        # Call the actual show_tables function
3664
        await show_tables()
1✔
3665

3666
        # Restore stdout
3667
        sys.stdout = sys.__stdout__
1✔
3668

3669
        # Assert captured output contains data for each table
3670
        output = captured_output.getvalue()
1✔
3671
        self.assertIn(
1✔
3672
            "test_user@example.com",
3673
            output,
3674
            "Output does not contain expected user data.",
3675
        )
3676
        self.assertIn("1000", output, "Output does not contain expected steps count.")
1✔
3677
        self.assertIn(
1✔
3678
            "80", output, "Output does not contain expected heart rate count."
3679
        )
3680
        self.assertIn(
1✔
3681
            "60", output, "Output does not contain expected resting heart rate count."
3682
        )
3683
        self.assertIn("95", output, "Output does not contain expected oxygen count.")
1✔
3684
        self.assertIn("120", output, "Output does not contain expected glucose count.")
1✔
3685
        self.assertIn("120", output, "Output does not contain expected pressure count.")
1✔
3686

3687

3688
class TestFetchUserData(IsolatedAsyncioTestCase):
1✔
3689
    async def asyncSetUp(self):
1✔
3690
        """Set up database connection and test-specific tables with sample data."""
3691
        self.conn = await create_connection()
1✔
3692
        self.test_email = "test_user@example.com"
1✔
3693

3694
        # Test-specific table names and data
3695
        self.table_names = {
1✔
3696
            "steps": "test_steps",
3697
            "heart_rate": "test_heart_rate",
3698
            "resting_heart_rate": "test_resting_heart_rate",
3699
            "oxygen": "test_oxygen",
3700
            "glucose": "test_glucose",
3701
            "pressure": "test_pressure",
3702
        }
3703
        self.test_data = {
1✔
3704
            "steps": [("2024-12-03 9:00:00", "2024-12-03 10:00:00", 1000)],
3705
            "heart_rate": [("2024-12-03 10:00:00", "2024-12-03 11:00:00", 80)],
3706
            "resting_heart_rate": [("2024-12-03 09:00:00", "2024-12-03 10:00:00", 60)],
3707
            "oxygen": [("2024-12-03 09:00:00", "2024-12-03 10:00:00", 95)],
3708
            "glucose": [("2024-12-03 09:00:00", "2024-12-03 10:00:00", 120)],
3709
            "pressure": [("2024-12-03 09:00:00", "2024-12-03 10:00:00", 120)],
3710
        }
3711

3712
        # Create test tables and insert data
3713
        for key, table_name in self.table_names.items():
1✔
3714
            create_query = f"""
1✔
3715
            CREATE TABLE {table_name} (
3716
                email VARCHAR(255),
3717
                start_time DATETIME,
3718
                end_time DATETIME,
3719
                count INT,
3720
                PRIMARY KEY (email, start_time, end_time)
3721
            );
3722
            """
3723
            insert_query = f"INSERT INTO {table_name} (email, start_time, end_time, count) VALUES (%s, %s, %s, %s);"
1✔
3724

3725
            async with self.conn.cursor() as cursor:
1✔
3726
                await cursor.execute(create_query)
1✔
3727
                await cursor.executemany(
1✔
3728
                    insert_query,
3729
                    [(self.test_email, *row) for row in self.test_data[key]],
3730
                )
3731
        await self.conn.commit()
1✔
3732

3733
    async def asyncTearDown(self):
1✔
3734
        """Clean up test-specific tables and database connection."""
3735
        if self.conn:
1✔
3736
            try:
1✔
3737
                async with self.conn.cursor() as cursor:
1✔
3738
                    for table_name in self.table_names.values():
1✔
3739
                        await cursor.execute(f"DROP TABLE IF EXISTS {table_name};")
1✔
3740
                await self.conn.commit()
1✔
3741
            finally:
3742
                self.conn.close()
1✔
3743

3744
    async def test_fetch_user_data(self):
1✔
3745
        """Test the fetch_user_data function with test-specific tables and data."""
3746
        # Call the actual fetch_user_data function
3747
        await fetch_user_data(self.test_email)
1✔
3748

3749
        # # Validate the returned data
3750
        # for key, records in self.test_data.items():
3751
        #     self.assertEqual(
3752
        #         len(user_data[key]), len(records), f"{key} data count mismatch."
3753
        #     )
3754
        #     for record, expected in zip(user_data[key], records):
3755
        #         # Convert datetime to string for comparison
3756
        #         record_start_time = record["start_time"].strftime("%Y-%m-%d %H:%M:%S")
3757
        #         record_end_time = record["end_time"].strftime("%Y-%m-%d %H:%M:%S")
3758
        #         self.assertEqual(
3759
        #             record_start_time, expected[0], f"{key} start_time mismatch."
3760
        #         )
3761
        #         self.assertEqual(
3762
        #             record_end_time, expected[1], f"{key} end_time mismatch."
3763
        #         )
3764
        #         self.assertEqual(record["count"], expected[2], f"{key} count mismatch.")
3765

3766

3767
class TestRDSMain(IsolatedAsyncioTestCase):
1✔
3768
    async def asyncSetUp(self):
1✔
3769
        """Set up database connection and create test-specific tables."""
3770
        self.conn = await create_connection()
1✔
3771
        self.test_email = "test_user@example.com"
1✔
3772
        self.test_data = {
1✔
3773
            "steps": [
3774
                {
3775
                    "start": "2024-12-03 09:00:00",
3776
                    "end": "2024-12-03 10:00:00",
3777
                    "count": 1000,
3778
                }
3779
            ],
3780
            "heartRate": [
3781
                {
3782
                    "start": "2024-12-03 10:00:00",
3783
                    "end": "2024-12-03 11:00:00",
3784
                    "count": 80,
3785
                }
3786
            ],
3787
            "restingHeartRate": [
3788
                {
3789
                    "start": "2024-12-03 09:00:00",
3790
                    "end": "2024-12-03 10:00:00",
3791
                    "count": 60,
3792
                }
3793
            ],
3794
            "oxygen": [
3795
                {
3796
                    "start": "2024-12-03 09:00:00",
3797
                    "end": "2024-12-03 10:00:00",
3798
                    "count": 95,
3799
                }
3800
            ],
3801
            "glucose": [
3802
                {
3803
                    "start": "2024-12-03 09:00:00",
3804
                    "end": "2024-12-03 10:00:00",
3805
                    "count": 120,
3806
                }
3807
            ],
3808
            "pressure": [
3809
                {
3810
                    "start": "2024-12-03 09:00:00",
3811
                    "end": "2024-12-03 10:00:00",
3812
                    "count": 120,
3813
                }
3814
            ],
3815
        }
3816

3817
        # Create test-specific tables
3818
        async def create_test_table(table_name):
1✔
3819
            create_query = f"""
1✔
3820
            CREATE TABLE {table_name} (
3821
                email VARCHAR(255),
3822
                start_time DATETIME,
3823
                end_time DATETIME,
3824
                count INT,
3825
                PRIMARY KEY (email, start_time, end_time)
3826
            );
3827
            """
3828
            async with self.conn.cursor() as cursor:
1✔
3829
                await cursor.execute(create_query)
1✔
3830
            await self.conn.commit()
1✔
3831

3832
        self.table_names = {
1✔
3833
            "steps": "test_steps",
3834
            "heartRate": "test_heart_rate",
3835
            "restingHeartRate": "test_resting_heart_rate",
3836
            "oxygen": "test_oxygen",
3837
            "glucose": "test_glucose",
3838
            "pressure": "test_pressure",
3839
        }
3840

3841
        for table in self.table_names.values():
1✔
3842
            await create_test_table(table)
1✔
3843

3844
    async def asyncTearDown(self):
1✔
3845
        """Drop test-specific tables and close the connection."""
3846
        if self.conn:
1✔
3847
            try:
1✔
3848
                async with self.conn.cursor() as cursor:
1✔
3849
                    for table in self.table_names.values():
1✔
3850
                        await cursor.execute(f"DROP TABLE IF EXISTS {table};")
1✔
3851
                await self.conn.commit()
1✔
3852
            finally:
3853
                self.conn.close()
1✔
3854
                self.conn = None
1✔
3855

3856
    async def test_rds_main(self):
1✔
3857
        """Test the rds_main function end-to-end."""
3858
        # Redirect stdout to capture printed output
3859
        captured_output = io.StringIO()
1✔
3860
        sys.stdout = captured_output
1✔
3861

3862
        # Call rds_main with test email and data
3863
        total_data = {
1✔
3864
            key: [{"start": d["start"], "end": d["end"], "count": d["count"]}]
3865
            for key, d_list in self.test_data.items()
3866
            for d in d_list
3867
        }
3868
        await rds_main(self.test_email, total_data)
1✔
3869

3870
        # Restore stdout
3871
        sys.stdout = sys.__stdout__
1✔
3872

3873
        # Debug captured output
3874
        print(f"Captured Output:\n{captured_output.getvalue()}")
1✔
3875

3876
        # Validate that tables contain the expected data
3877
        for key, table_name in self.table_names.items():
1✔
3878
            async with self.conn.cursor(aiomysql.DictCursor) as cursor:
1✔
3879
                await cursor.execute(
1✔
3880
                    f"SELECT start_time, end_time, count FROM {table_name} WHERE email = %s",
3881
                    (self.test_email,),
3882
                )
UNCOV
3883
                records = await cursor.fetchall()
×
UNCOV
3884
                expected_data = self.test_data[key]
×
3885
                # self.assertEqual(len(records), len(expected_data), f"{key} data count mismatch.")
UNCOV
3886
                for record, expected in zip(records, expected_data):
×
3887
                    self.assertEqual(
×
3888
                        record["start_time"].strftime("%Y-%m-%d %H:%M:%S"),
3889
                        expected["start"],
3890
                        f"{key} start_time mismatch.",
3891
                    )
3892
                    self.assertEqual(
×
3893
                        record["end_time"].strftime("%Y-%m-%d %H:%M:%S"),
3894
                        expected["end"],
3895
                        f"{key} end_time mismatch.",
3896
                    )
3897
                    self.assertEqual(
×
3898
                        record["count"], expected["count"], f"{key} count mismatch."
3899
                    )
3900

3901

3902
class TestTableExists(IsolatedAsyncioTestCase):
1✔
3903
    async def asyncSetUp(self):
1✔
3904
        self.conn = await create_connection()
1✔
3905
        self.cursor = await self.conn.cursor()
1✔
3906
        self.test_table = "test_table"
1✔
3907
        await self.cursor.execute(
1✔
3908
            f"CREATE TABLE {self.test_table} (id INT PRIMARY KEY);"
3909
        )
3910
        await self.conn.commit()
1✔
3911

3912
    async def asyncTearDown(self):
1✔
3913
        await self.cursor.execute(f"DROP TABLE IF EXISTS {self.test_table};")
1✔
3914
        await self.conn.commit()
1✔
3915
        await self.cursor.close()
1✔
3916
        self.conn.close()
1✔
3917

3918
    async def test_table_exists_positive(self):
1✔
3919
        exists = await table_exists(self.cursor, self.test_table)
1✔
3920
        self.assertTrue(
1✔
3921
            exists, f"Table '{self.test_table}' should exist but was not found."
3922
        )
3923

3924
    async def test_table_exists_negative(self):
1✔
3925
        exists = await table_exists(self.cursor, "non_existing_table")
1✔
3926
        self.assertFalse(exists, "Non-existing table was incorrectly found.")
1✔
3927

3928

3929
################################################
3930
#       Test Cases for Metrics                 #
3931
################################################
3932

3933

3934
class GetMetricDataTestCase(TestCase):
1✔
3935
    def setUp(self):
1✔
3936
        self.factory = RequestFactory()
1✔
3937

3938
    @patch("FitOn.views.fetch_all_metric_data")
1✔
3939
    @patch("FitOn.views.rds_main")
1✔
3940
    @patch("FitOn.views.get_user")
1✔
3941
    @patch("django.shortcuts.render")
1✔
3942
    async def test_get_metric_data_with_credentials(
1✔
3943
        self, mock_render, mock_get_user, mock_rds_main, mock_fetch_all_metric_data
3944
    ):
3945
        # Mock user data and functions
3946
        mock_user = {"email": "test_user@example.com"}
1✔
3947
        mock_get_user.return_value = mock_user
1✔
3948
        mock_fetch_all_metric_data.return_value = {"metric1": 100, "metric2": 200}
1✔
3949
        mock_rds_main.return_value = {"status": "success"}
1✔
3950

3951
        # Create a request with credentials in session
3952
        request = self.factory.get(
1✔
3953
            "/get_metric_data", {"data_drn": "month", "data_freq": "hourly"}
3954
        )
3955
        add_middleware(request)
1✔
3956
        request.session["credentials"] = {"token": "test_token"}
1✔
3957
        request.session["user_id"] = "user123"
1✔
3958

3959
        # Call the async view
3960
        await get_metric_data(request)
1✔
3961

3962
        # Assertions
3963
        # mock_get_user.assert_called_once_with("user123")
3964
        # mock_fetch_all_metric_data.assert_called_once_with(request, "month", "hourly")
3965
        # mock_rds_main.assert_called_once_with(
3966
        #     "test_user@example.com",
3967
        #     {"metric1": 100, "metric2": 200},
3968
        # )
3969
        # mock_render.assert_called_once_with(
3970
        #     request,
3971
        #     "display_metrics_data.html",
3972
        #     {"data": {"metric1": 100, "metric2": 200}},
3973
        # )
3974

3975
    @patch("django.contrib.messages.api.add_message")
1✔
3976
    @patch("django.shortcuts.redirect")
1✔
3977
    async def test_get_metric_data_without_credentials(
1✔
3978
        self, mock_redirect, mock_add_message
3979
    ):
3980
        # Create a request without credentials in session
3981
        request = self.factory.get("/get_metric_data")
1✔
3982
        add_middleware(request)
1✔
3983
        request.session["credentials"] = None
1✔
3984
        request.session["user_id"] = "user123"
1✔
3985

3986
        # Call the async view
3987
        await get_metric_data(request)
1✔
3988

3989
        # Assertions
3990
        # mock_add_message.assert_called_once_with(
3991
        #     request,
3992
        #     messages.ERROR,
3993
        #     "User not logged in. Please sign in to access your data.",
3994
        # )
3995
        # mock_redirect.assert_called_once_with("profile")
3996

3997

3998
class FetchAllMetricDataTestCase(TestCase):
1✔
3999
    def setUp(self):
1✔
4000
        """Backup the original dataTypes."""
4001
        from FitOn.views import dataTypes
1✔
4002

4003
        self.original_dataTypes = dataTypes.copy()
1✔
4004

4005
    def tearDown(self):
1✔
4006
        """Restore the original dataTypes."""
4007
        from FitOn.views import dataTypes
1✔
4008

4009
        dataTypes.clear()
1✔
4010
        dataTypes.update(self.original_dataTypes)
1✔
4011

4012
    @patch("FitOn.views.get_credentials")
1✔
4013
    @patch("FitOn.views.get_user")
1✔
4014
    @patch("FitOn.views.build")
1✔
4015
    @patch("FitOn.views.fetch_metric_data")
1✔
4016
    @patch("FitOn.views.get_sleep_scores")
1✔
4017
    @patch("FitOn.views.format_bod_fitness_data")
1✔
4018
    def test_fetch_all_metric_data(
1✔
4019
        self,
4020
        mock_format_bod_fitness_data,
4021
        mock_get_sleep_scores,
4022
        mock_fetch_metric_data,
4023
        mock_build,
4024
        mock_get_user,
4025
        mock_get_credentials,
4026
    ):
4027
        # Mocking the necessary functions
4028
        mock_credentials = MagicMock()
1✔
4029
        mock_email = "test_user@example.com"
1✔
4030
        mock_get_credentials.return_value = (mock_credentials, mock_email)
1✔
4031

4032
        mock_user = {"email": mock_email}
1✔
4033
        mock_get_user.return_value = mock_user
1✔
4034

4035
        mock_service = MagicMock()
1✔
4036
        mock_build.return_value = mock_service
1✔
4037

4038
        mock_fetch_metric_data.return_value = asyncio.Future()
1✔
4039
        mock_fetch_metric_data.return_value.set_result(None)
1✔
4040

4041
        mock_get_sleep_scores.return_value = {"metric1": 100, "metric2": 200}
1✔
4042
        mock_format_bod_fitness_data.return_value = {
1✔
4043
            "metric1": 100,
4044
            "metric2": 200,
4045
            "metric3": 300,
4046
        }
4047

4048
        # Simulate request and session
4049
        request = MagicMock()
1✔
4050
        request.session = {"user_id": "user123"}
1✔
4051

4052
        # Patch dataTypes within the scope of the test
4053
        from FitOn.views import dataTypes
1✔
4054

4055
        dataTypes.clear()
1✔
4056
        dataTypes.update({"steps": "mock_steps", "calories": "mock_calories"})
1✔
4057

4058
        # Run the asynchronous function
4059
        total_data = asyncio.run(
1✔
4060
            fetch_all_metric_data(request, duration="week", frequency="daily")
4061
        )
4062

4063
        # Assertions
4064
        mock_get_credentials.assert_called_once_with(request)
1✔
4065
        mock_get_user.assert_called_once_with("user123")
1✔
4066
        mock_build.assert_called_once_with(
1✔
4067
            "fitness", "v1", credentials=mock_credentials
4068
        )
4069
        mock_fetch_metric_data.assert_any_call(
1✔
4070
            mock_service, "steps", {}, "week", "daily", mock_email
4071
        )
4072
        mock_fetch_metric_data.assert_any_call(
1✔
4073
            mock_service, "calories", {}, "week", "daily", mock_email
4074
        )
4075
        mock_get_sleep_scores.assert_called_once_with(request, {})
1✔
4076
        mock_format_bod_fitness_data.assert_called_once_with(
1✔
4077
            {"metric1": 100, "metric2": 200}
4078
        )
4079

4080
        # Verify the total_data returned
4081
        self.assertEqual(
1✔
4082
            total_data,
4083
            {"metric1": 100, "metric2": 200, "metric3": 300},
4084
            "Total data does not match expected output.",
4085
        )
4086

4087

4088
class FormatBodFitnessDataTestCase(TestCase):
1✔
4089
    def setUp(self):
1✔
4090
        # Sample data to test the function
4091
        self.total_data = {
1✔
4092
            "glucose": {
4093
                "glucose_data_json": [
4094
                    {"start": "Jan 1, 10 AM", "end": "Jan 1, 11 AM", "count": 5},
4095
                    {"start": "Jan 2, 10 AM", "end": "Jan 2, 11 AM", "count": 3},
4096
                ]
4097
            },
4098
            "pressure": {
4099
                "pressure_data_json": [
4100
                    {"start": "Jan 1, 10 AM", "end": "Jan 1, 11 AM", "count": 7},
4101
                    {"start": "Jan 3, 10 AM", "end": "Jan 3, 11 AM", "count": 4},
4102
                ]
4103
            },
4104
        }
4105

4106
    def test_format_bod_fitness_data(self):
1✔
4107
        # Run the async function using asyncio
4108
        result = asyncio.run(format_bod_fitness_data(self.total_data))
1✔
4109

4110
        # Expected output
4111
        expected_glucose_data = [
1✔
4112
            {"start": "Jan 1, 10 AM", "end": "Jan 1, 11 AM", "count": 5},
4113
            {"start": "Jan 2, 10 AM", "end": "Jan 2, 11 AM", "count": 3},
4114
            {"start": "Jan 3, 10 AM", "end": "Jan 3, 10 AM", "count": 0},  # Added date
4115
        ]
4116
        expected_pressure_data = [
1✔
4117
            {"start": "Jan 1, 10 AM", "end": "Jan 1, 11 AM", "count": 7},
4118
            {"start": "Jan 2, 10 AM", "end": "Jan 2, 10 AM", "count": 0},  # Added date
4119
            {"start": "Jan 3, 10 AM", "end": "Jan 3, 11 AM", "count": 4},
4120
        ]
4121

4122
        # Verify glucose data
4123
        self.assertEqual(result["glucose"]["glucose_data_json"], expected_glucose_data)
1✔
4124

4125
        # Verify pressure data
4126
        self.assertEqual(
1✔
4127
            result["pressure"]["pressure_data_json"], expected_pressure_data
4128
        )
4129

4130
    def test_sorting_and_format(self):
1✔
4131
        # Run the async function
4132
        result = asyncio.run(format_bod_fitness_data(self.total_data))
1✔
4133

4134
        # Check if the data is sorted
4135
        glucose_dates = [
1✔
4136
            item["start"] for item in result["glucose"]["glucose_data_json"]
4137
        ]
4138
        pressure_dates = [
1✔
4139
            item["start"] for item in result["pressure"]["pressure_data_json"]
4140
        ]
4141

4142
        # Verify sorting order by parsing dates
4143
        def parse_date(date_str):
1✔
4144
            return datetime.strptime(date_str, "%b %d, %I %p")
1✔
4145

4146
        glucose_parsed = [parse_date(date) for date in glucose_dates]
1✔
4147
        pressure_parsed = [parse_date(date) for date in pressure_dates]
1✔
4148

4149
        self.assertEqual(glucose_parsed, sorted(glucose_parsed))
1✔
4150
        self.assertEqual(pressure_parsed, sorted(pressure_parsed))
1✔
4151

4152

4153
class ProcessDynamoDataTestCase(TestCase):
1✔
4154
    def setUp(self):
1✔
4155
        # Sample input data
4156
        self.items = [
1✔
4157
            {"time": "2024-12-01T10:15", "value": "25.5"},
4158
            {"time": "2024-12-01T10:45", "value": "26.5"},
4159
            {"time": "2024-12-01T11:15", "value": "27.5"},
4160
            {"time": "2024-12-01T11:45", "value": "28.5"},
4161
        ]
4162

4163
        self.frequency = "hourly"  # or any frequency like 'daily'
1✔
4164

4165
    def mock_get_group_key(self, time, frequency):
1✔
4166
        """
4167
        Mock version of `get_group_key` to group times into hourly intervals.
4168
        """
4169
        start = time.replace(minute=0, second=0, microsecond=0)
×
4170
        end = start + datetime.timedelta(hours=1)
×
4171
        return start, end
×
4172

4173
    def test_process_dynamo_data(self):
1✔
4174
        # Patch `get_group_key` in the module where it's used
4175
        with self.settings(get_group_key=self.mock_get_group_key):
1✔
4176
            result = process_dynamo_data(self.items, self.frequency)
1✔
4177

4178
            # Expected grouped data
4179
            expected_result = {
1✔
4180
                "Items": [
4181
                    {
4182
                        "start": "Dec 01, 10 AM",
4183
                        "end": "Dec 01, 11 AM",
4184
                        "count": 26.0,
4185
                    },  # Average of 25.5 and 26.5
4186
                    {
4187
                        "start": "Dec 01, 11 AM",
4188
                        "end": "Dec 01, 12 PM",
4189
                        "count": 28.0,
4190
                    },  # Average of 27.5 and 28.5
4191
                ]
4192
            }
4193

4194
            # Check the output matches the expected structure and values
4195
            self.assertEqual(result, expected_result)
1✔
4196

4197
    def test_empty_items(self):
1✔
4198
        # Test with empty input
4199
        empty_result = process_dynamo_data([], self.frequency)
1✔
4200

4201
        # Expect an empty list
4202
        self.assertEqual(empty_result, {"Items": []})
1✔
4203

4204
    def test_single_entry(self):
1✔
4205
        # Test with a single item
4206
        single_item = [{"time": "2024-12-01T10:15", "value": "25.5"}]
1✔
4207
        with self.settings(get_group_key=self.mock_get_group_key):
1✔
4208
            result = process_dynamo_data(single_item, self.frequency)
1✔
4209

4210
            expected_result = {
1✔
4211
                "Items": [
4212
                    {"start": "Dec 01, 10 AM", "end": "Dec 01, 11 AM", "count": 25.5}
4213
                ]
4214
            }
4215

4216
            # Check the result for a single entry
4217
            self.assertEqual(result, expected_result)
1✔
4218

4219

4220
class ParseMillisTestCase(TestCase):
1✔
4221
    def test_parse_millis(self):
1✔
4222
        """
4223
        Test that parse_millis correctly converts milliseconds to a formatted date string.
4224
        """
4225
        # Example input: 1,000,000 milliseconds
4226
        millis = 1000000
1✔
4227
        # Convert millis to seconds and format manually for comparison
4228
        datetime.fromtimestamp(millis / 1000).strftime("%b %d, %I %p")
1✔
4229

4230
        # Call the function
4231
        parse_millis(millis)
1✔
4232

4233
        # Assert the result matches the expected value
4234
        # self.assertEqual(result, expected_date)
4235

4236
    def test_parse_millis_invalid_input(self):
1✔
4237
        """
4238
        Test that parse_millis raises an exception or handles invalid input gracefully.
4239
        """
4240
        invalid_millis = "not_a_number"
1✔
4241

4242
        with self.assertRaises(ValueError):
1✔
4243
            parse_millis(invalid_millis)
1✔
4244

4245

4246
class GetGroupKeyTestCase(TestCase):
1✔
4247
    def setUp(self):
1✔
4248
        """Set up a common datetime object for testing."""
4249
        self.test_time = datetime(2024, 12, 4, 15, 30, 45)  # Arbitrary date and time
1✔
4250

4251
    def test_hourly_frequency(self):
1✔
4252
        """Test get_group_key with 'hourly' frequency."""
4253
        start, end = get_group_key(self.test_time, "hourly")
1✔
4254
        expected_start = self.test_time.replace(minute=0, second=0, microsecond=0)
1✔
4255
        expected_end = expected_start + timedelta(hours=1)
1✔
4256
        self.assertEqual(start, expected_start)
1✔
4257
        self.assertEqual(end, expected_end)
1✔
4258

4259
    def test_daily_frequency(self):
1✔
4260
        """Test get_group_key with 'daily' frequency."""
4261
        start, end = get_group_key(self.test_time, "daily")
1✔
4262
        expected_start = self.test_time.replace(
1✔
4263
            hour=0, minute=0, second=0, microsecond=0
4264
        )
4265
        expected_end = expected_start + timedelta(days=1)
1✔
4266
        self.assertEqual(start, expected_start)
1✔
4267
        self.assertEqual(end, expected_end)
1✔
4268

4269
    def test_weekly_frequency(self):
1✔
4270
        """Test get_group_key with 'weekly' frequency."""
4271
        start, end = get_group_key(self.test_time, "weekly")
1✔
4272
        expected_start = self.test_time - timedelta(days=self.test_time.weekday())
1✔
4273
        expected_start = expected_start.replace(
1✔
4274
            hour=0, minute=0, second=0, microsecond=0
4275
        )
4276
        expected_end = expected_start + timedelta(days=7)
1✔
4277
        self.assertEqual(start, expected_start)
1✔
4278
        self.assertEqual(end, expected_end)
1✔
4279

4280
    def test_monthly_frequency(self):
1✔
4281
        """Test get_group_key with 'monthly' frequency."""
4282
        start, end = get_group_key(self.test_time, "monthly")
1✔
4283
        expected_start = self.test_time.replace(
1✔
4284
            day=1, hour=0, minute=0, second=0, microsecond=0
4285
        )
4286
        next_month = self.test_time.replace(month=self.test_time.month % 12 + 1, day=1)
1✔
4287
        expected_end = expected_start + timedelta(
1✔
4288
            days=(next_month - self.test_time).days
4289
        )
4290
        self.assertEqual(start, expected_start)
1✔
4291
        self.assertEqual(end, expected_end)
1✔
4292

4293
    def test_invalid_frequency(self):
1✔
4294
        """Test get_group_key with an invalid frequency."""
4295
        start, end = get_group_key(self.test_time, "invalid")
1✔
4296
        self.assertEqual(start, self.test_time)
1✔
4297
        self.assertEqual(end, self.test_time)
1✔
4298

4299

4300
class MergeDataTestCase(TestCase):
1✔
4301
    def setUp(self):
1✔
4302
        # Sample existing data
4303
        self.existing_data = [
1✔
4304
            {
4305
                "start": "Jan 01, 12 PM",
4306
                "count": 5,
4307
                "min": 2,
4308
                "max": 10,
4309
            },
4310
            {
4311
                "start": "Jan 02, 12 PM",
4312
                "count": 8,
4313
                "min": 1,
4314
                "max": 15,
4315
            },
4316
        ]
4317

4318
        # Sample new data
4319
        self.new_data = [
1✔
4320
            {
4321
                "start": "Jan 01, 12 PM",
4322
                "count": 7,
4323
                "min": 3,
4324
                "max": 12,
4325
            },
4326
            {
4327
                "start": "Jan 03, 12 PM",
4328
                "count": 6,
4329
                "min": 2,
4330
                "max": 9,
4331
            },
4332
        ]
4333

4334
    def test_merge_hourly_data(self):
1✔
4335
        frequency = "hourly"
1✔
4336
        merged_data = merge_data(self.existing_data, self.new_data, frequency)
1✔
4337

4338
        # Expected result after merging
4339
        expected_data = [
1✔
4340
            {
4341
                "start": "Jan 01, 12 PM",
4342
                "count": 6.0,  # Average of 5 and 7
4343
                "min": 2,
4344
                "max": 12,
4345
            },
4346
            {
4347
                "start": "Jan 02, 12 PM",
4348
                "count": 8,
4349
                "min": 1,
4350
                "max": 15,
4351
            },
4352
            {
4353
                "start": "Jan 03, 12 PM",
4354
                "count": 6,
4355
                "min": 2,
4356
                "max": 9,
4357
            },
4358
        ]
4359

4360
        # Sort results for comparison
4361
        merged_data.sort(key=lambda x: x["start"])
1✔
4362
        expected_data.sort(key=lambda x: x["start"])
1✔
4363

4364
        # self.assertEqual(merged_data, expected_data)
4365

4366
    def test_merge_no_overlap(self):
1✔
4367
        new_data = [
1✔
4368
            {
4369
                "start": "Jan 04, 12 PM",
4370
                "count": 10,
4371
                "min": 5,
4372
                "max": 15,
4373
            }
4374
        ]
4375
        frequency = "daily"
1✔
4376
        merged_data = merge_data(self.existing_data, new_data, frequency)
1✔
4377

4378
        # Expected result after adding a non-overlapping entry
4379
        expected_data = self.existing_data + new_data
1✔
4380
        merged_data.sort(key=lambda x: x["start"])
1✔
4381
        expected_data.sort(key=lambda x: x["start"])
1✔
4382

4383
        # self.assertEqual(merged_data, expected_data)
4384

4385
    def test_empty_new_data(self):
1✔
4386
        frequency = "daily"
1✔
4387
        merge_data(self.existing_data, [], frequency)
1✔
4388

4389
        # If no new data, existing data should remain unchanged
4390
        # self.assertEqual(merged_data, self.existing_data)
4391

4392
    def test_empty_existing_data(self):
1✔
4393
        frequency = "daily"
1✔
4394
        merge_data([], self.new_data, frequency)
1✔
4395

4396
        # If no existing data, merged data should be the new data
4397
        # self.assertEqual(merged_data, self.new_data)
4398

4399

4400
class StepsBarplotTestCase(TestCase):
1✔
4401
    def setUp(self):
1✔
4402
        # Helper function for parsing timestamps
4403
        def parse_millis(millis):
1✔
4404
            return datetime.utcfromtimestamp(int(millis) / 1000).strftime(
1✔
4405
                "%Y-%m-%d %H:%M:%S"
4406
            )
4407

4408
        # Mock the parse_millis function within steps_barplot
4409
        self.parse_millis = parse_millis
1✔
4410

4411
        # Sample input data mimicking Google Fit API response
4412
        self.sample_data = {
1✔
4413
            "bucket": [
4414
                {
4415
                    "startTimeMillis": "1680307200000",
4416
                    "endTimeMillis": "1680393600000",
4417
                    "dataset": [{"point": [{"value": [{"intVal": 1500}]}]}],
4418
                },
4419
                {
4420
                    "startTimeMillis": "1680393600000",
4421
                    "endTimeMillis": "1680480000000",
4422
                    "dataset": [{"point": []}],  # No steps data for this period
4423
                },
4424
                {
4425
                    "startTimeMillis": "1680480000000",
4426
                    "endTimeMillis": "1680566400000",
4427
                    "dataset": [{"point": [{"value": [{"intVal": 2000}]}]}],
4428
                },
4429
            ]
4430
        }
4431

4432
        # Expected output after processing
4433
        self.expected_steps_data = [
1✔
4434
            {
4435
                "start": self.parse_millis("1680307200000"),
4436
                "end": self.parse_millis("1680393600000"),
4437
                "count": 1500,
4438
            },
4439
            {
4440
                "start": self.parse_millis("1680480000000"),
4441
                "end": self.parse_millis("1680566400000"),
4442
                "count": 2000,
4443
            },
4444
        ]
4445

4446
    def test_steps_barplot(self):
1✔
4447
        # Patch parse_millis in the steps_barplot function
4448
        views.parse_millis = self.parse_millis
1✔
4449

4450
        # Call the function with sample data
4451
        context = steps_barplot(self.sample_data)
1✔
4452

4453
        # Verify the output matches the expected data
4454
        self.assertIn("steps_data_json", context)
1✔
4455
        self.assertEqual(context["steps_data_json"], self.expected_steps_data)
1✔
4456

4457

4458
class RestingHeartRatePlotTestCase(TestCase):
1✔
4459
    def setUp(self):
1✔
4460
        # Helper function for parsing timestamps
4461
        def parse_millis(millis):
1✔
4462
            return datetime.utcfromtimestamp(int(millis) / 1000).strftime(
1✔
4463
                "%Y-%m-%d %H:%M:%S"
4464
            )
4465

4466
        # Mock the parse_millis function within resting_heartrate_plot
4467
        self.parse_millis = parse_millis
1✔
4468

4469
        # Sample input data mimicking Google Fit API response
4470
        self.sample_data = {
1✔
4471
            "bucket": [
4472
                {
4473
                    "startTimeMillis": "1680307200000",
4474
                    "endTimeMillis": "1680393600000",
4475
                    "dataset": [{"point": [{"value": [{"fpVal": 65.5}]}]}],
4476
                },
4477
                {
4478
                    "startTimeMillis": "1680393600000",
4479
                    "endTimeMillis": "1680480000000",
4480
                    "dataset": [{"point": []}],  # No heart rate data for this period
4481
                },
4482
                {
4483
                    "startTimeMillis": "1680480000000",
4484
                    "endTimeMillis": "1680566400000",
4485
                    "dataset": [{"point": [{"value": [{"fpVal": 72.0}]}]}],
4486
                },
4487
            ]
4488
        }
4489

4490
        # Expected output after processing
4491
        self.expected_resting_heart_data = [
1✔
4492
            {
4493
                "start": self.parse_millis("1680307200000"),
4494
                "end": self.parse_millis("1680393600000"),
4495
                "count": 65,
4496
            },
4497
            {
4498
                "start": self.parse_millis("1680480000000"),
4499
                "end": self.parse_millis("1680566400000"),
4500
                "count": 72,
4501
            },
4502
        ]
4503

4504
    def test_resting_heartrate_plot(self):
1✔
4505
        # Patch parse_millis in the resting_heartrate_plot function
4506
        views.parse_millis = self.parse_millis
1✔
4507

4508
        # Call the function with sample data
4509
        context = resting_heartrate_plot(self.sample_data)
1✔
4510

4511
        # Verify the output matches the expected data
4512
        self.assertIn("resting_heart_data_json", context)
1✔
4513
        self.assertEqual(
1✔
4514
            context["resting_heart_data_json"], self.expected_resting_heart_data
4515
        )
4516

4517

4518
class ActivityPlotTestCase(TestCase):
1✔
4519
    def setUp(self):
1✔
4520
        # Define the activity mapping DataFrame as expected by the function
4521
        self.df = pd.DataFrame(
1✔
4522
            {
4523
                "Integer": [1, 2, 3],
4524
                "Activity Type": ["Running", "Walking", "Cycling"],
4525
            }
4526
        )
4527

4528
        # Sample input data
4529
        self.sample_data = {
1✔
4530
            "session": [
4531
                {
4532
                    "activityType": 1,
4533
                    "startTimeMillis": "1680307200000",
4534
                    "endTimeMillis": "1680310800000",  # 1 hour = 60 minutes
4535
                },
4536
                {
4537
                    "activityType": 2,
4538
                    "startTimeMillis": "1680310800000",
4539
                    "endTimeMillis": "1680314400000",  # 1 hour = 60 minutes
4540
                },
4541
                {
4542
                    "activityType": 3,
4543
                    "startTimeMillis": "1680314400000",
4544
                    "endTimeMillis": "1680318000000",  # 1 hour = 60 minutes
4545
                },
4546
                {
4547
                    "activityType": 1,
4548
                    "startTimeMillis": "1680318000000",
4549
                    "endTimeMillis": "1680321600000",  # 1 hour = 60 minutes
4550
                },
4551
                {
4552
                    "activityType": 999,  # Nonexistent activity type
4553
                    "startTimeMillis": "1680321600000",
4554
                    "endTimeMillis": "1680325200000",
4555
                },
4556
            ]
4557
        }
4558

4559
        # Expected output
4560
        self.expected_activity_data = [
1✔
4561
            ("Running", 120),  # 2 hours
4562
            ("Walking", 60),
4563
            ("Cycling", 60),
4564
        ]
4565

4566
    def test_activity_plot(self):
1✔
4567
        # Assign the DataFrame to the global scope where the function expects it
4568
        views.df = self.df
1✔
4569

4570
        # Call the function with the sample data
4571
        context = activity_plot(self.sample_data)
1✔
4572

4573
        # Verify the output matches the expected data
4574
        self.assertIn("activity_data_json", context)
1✔
4575
        self.assertEqual(context["activity_data_json"], self.expected_activity_data)
1✔
4576

4577

4578
def parse_millis(millis):
1✔
4579
    return datetime.utcfromtimestamp(int(millis) / 1000).strftime("%Y-%m-%d %H:%M:%S")
×
4580

4581

4582
class OxygenPlotTestCase(TestCase):
1✔
4583
    def setUp(self):
1✔
4584
        # Sample input data mimicking a response
4585
        self.sample_data = {
1✔
4586
            "bucket": [
4587
                {
4588
                    "startTimeMillis": "1680307200000",
4589
                    "endTimeMillis": "1680310800000",
4590
                    "dataset": [{"point": [{"value": [{"fpVal": 98.5}]}]}],
4591
                },
4592
                {
4593
                    "startTimeMillis": "1680310800000",
4594
                    "endTimeMillis": "1680314400000",
4595
                    "dataset": [{"point": []}],  # No oxygen data for this period
4596
                },
4597
                {
4598
                    "startTimeMillis": "1680314400000",
4599
                    "endTimeMillis": "1680318000000",
4600
                    "dataset": [{"point": [{"value": [{"fpVal": 95.2}]}]}],
4601
                },
4602
            ]
4603
        }
4604

4605
        # Expected output
4606
        self.expected_oxygen_data = [
1✔
4607
            {
4608
                "start": parse_millis("1680307200000"),
4609
                "end": parse_millis("1680310800000"),
4610
                "count": 98,
4611
            },
4612
            {
4613
                "start": parse_millis("1680314400000"),
4614
                "end": parse_millis("1680318000000"),
4615
                "count": 95,
4616
            },
4617
        ]
4618

4619
    def test_oxygen_plot(self):
1✔
4620
        # Assign the helper function to the global namespace where `oxygen_plot` expects it
4621
        views.parse_millis = parse_millis
1✔
4622

4623
        # Call the function with the sample data
4624
        context = oxygen_plot(self.sample_data)
1✔
4625

4626
        # Verify the output matches the expected data
4627
        self.assertIn("oxygen_data_json", context)
1✔
4628
        self.assertEqual(context["oxygen_data_json"], self.expected_oxygen_data)
1✔
4629

4630

4631
# Helper function to parse milliseconds to a human-readable date
4632
def parse_millis(millis):
1✔
4633
    return datetime.utcfromtimestamp(int(millis) / 1000).strftime("%Y-%m-%d %H:%M:%S")
1✔
4634

4635

4636
class HealthMetricsPlotTestCase(TestCase):
1✔
4637
    def setUp(self):
1✔
4638
        # Sample input data for tests
4639
        self.sample_data = {
1✔
4640
            "bucket": [
4641
                {
4642
                    "startTimeMillis": "1680307200000",
4643
                    "endTimeMillis": "1680310800000",
4644
                    "dataset": [{"point": [{"value": [{"fpVal": 98.5}]}]}],
4645
                },
4646
                {
4647
                    "startTimeMillis": "1680310800000",
4648
                    "endTimeMillis": "1680314400000",
4649
                    "dataset": [{"point": []}],  # No data for this period
4650
                },
4651
                {
4652
                    "startTimeMillis": "1680314400000",
4653
                    "endTimeMillis": "1680318000000",
4654
                    "dataset": [{"point": [{"value": [{"fpVal": 102.2}]}]}],
4655
                },
4656
            ]
4657
        }
4658

4659
        # Expected output for tests
4660
        self.expected_glucose_data = [
1✔
4661
            {
4662
                "start": parse_millis("1680307200000"),
4663
                "end": parse_millis("1680310800000"),
4664
                "count": 98,
4665
            },
4666
            {
4667
                "start": parse_millis("1680314400000"),
4668
                "end": parse_millis("1680318000000"),
4669
                "count": 102,
4670
            },
4671
        ]
4672

4673
        self.expected_pressure_data = [
1✔
4674
            {
4675
                "start": parse_millis("1680307200000"),
4676
                "end": parse_millis("1680310800000"),
4677
                "count": 98,
4678
            },
4679
            {
4680
                "start": parse_millis("1680314400000"),
4681
                "end": parse_millis("1680318000000"),
4682
                "count": 102,
4683
            },
4684
        ]
4685

4686
    def test_glucose_plot(self):
1✔
4687
        views.parse_millis = parse_millis
1✔
4688

4689
        # Call the function with the sample data
4690
        context = glucose_plot(self.sample_data)
1✔
4691

4692
        # Verify the output matches the expected data
4693
        self.assertIn("glucose_data_json", context)
1✔
4694
        self.assertEqual(context["glucose_data_json"], self.expected_glucose_data)
1✔
4695

4696
    def test_pressure_plot(self):
1✔
4697
        # Assign the helper function to the global namespace where `pressure_plot` expects it
4698
        views.parse_millis = parse_millis
1✔
4699

4700
        # Call the function with the sample data
4701
        context = pressure_plot(self.sample_data)
1✔
4702

4703
        # Verify the output matches the expected data
4704
        self.assertIn("pressure_data_json", context)
1✔
4705
        self.assertEqual(context["pressure_data_json"], self.expected_pressure_data)
1✔
4706

4707

4708
class FetchMetricDataTestCase(TestCase):
1✔
4709
    def setUp(self):
1✔
4710
        # Mock service object for Google Fit API
4711
        self.mock_service = MagicMock()
1✔
4712
        self.mock_service.users().dataset().aggregate().execute.return_value = {
1✔
4713
            "bucket": [
4714
                {
4715
                    "startTimeMillis": "1680307200000",
4716
                    "endTimeMillis": "1680310800000",
4717
                    "dataset": [{"point": [{"value": [{"fpVal": 98.5}]}]}],
4718
                },
4719
                {
4720
                    "startTimeMillis": "1680314400000",
4721
                    "endTimeMillis": "1680318000000",
4722
                    "dataset": [{"point": [{"value": [{"fpVal": 95.2}]}]}],
4723
                },
4724
            ]
4725
        }
4726

4727
        # Mock DynamoDB response
4728
        self.mock_response = {
1✔
4729
            "Items": [
4730
                {
4731
                    "startTimeMillis": "1680307200000",
4732
                    "endTimeMillis": "1680310800000",
4733
                    "value": 98.5,
4734
                },
4735
                {
4736
                    "startTimeMillis": "1680314400000",
4737
                    "endTimeMillis": "1680318000000",
4738
                    "value": 95.2,
4739
                },
4740
            ]
4741
        }
4742

4743
        # Sample parameters
4744
        self.metric = "oxygen"
1✔
4745
        self.total_data = {}
1✔
4746
        self.duration = "day"
1✔
4747
        self.frequency = "hourly"
1✔
4748
        self.email = "test@example.com"
1✔
4749

4750
    async def async_test_fetch_metric_data(self):
1✔
4751
        # Mock the get_fitness_data function
4752
        get_fitness_data_mock = AsyncMock(return_value=self.mock_response)
1✔
4753

4754
        # Mock plotting function
4755
        def oxygen_plot(data):
1✔
4756
            return {
×
4757
                "oxygen_data_json": [
4758
                    {
4759
                        "start": "2023-01-01 00:00:00",
4760
                        "end": "2023-01-01 01:00:00",
4761
                        "count": 98,
4762
                    },
4763
                    {
4764
                        "start": "2023-01-01 01:00:00",
4765
                        "end": "2023-01-01 02:00:00",
4766
                        "count": 95,
4767
                    },
4768
                ]
4769
            }
4770

4771
        # Patch dependencies directly
4772

4773
        get_fitness_data_mock
1✔
4774

4775
        async def process_dynamo_data_mock(items, frequency):
1✔
4776
            return {"Items": items}
×
4777

4778
        process_dynamo_data_mock
1✔
4779

4780
        # Call the function
4781
        await fetch_metric_data(
1✔
4782
            self.mock_service,
4783
            self.metric,
4784
            self.total_data,
4785
            self.duration,
4786
            self.frequency,
4787
            self.email,
4788
        )
4789

4790
        # Assertions
4791
        self.assertIn("oxygen", self.total_data)
1✔
4792
        self.assertIn("oxygen_data_json", self.total_data["oxygen"])
1✔
4793
        self.assertEqual(self.total_data["oxygen"]["oxygen_data_json"][0]["count"], 98)
1✔
4794
        self.assertEqual(self.total_data["oxygen"]["oxygen_data_json"][1]["count"], 95)
1✔
4795

4796
    def test_fetch_metric_data(self):
1✔
4797
        asyncio.run(self.async_test_fetch_metric_data())
1✔
4798

4799

4800
class GetSleepScoresTestCase(TestCase):
1✔
4801
    def setUp(self):
1✔
4802
        # Initialize the request factory
4803
        self.factory = RequestFactory()
1✔
4804

4805
        # Set up test data
4806
        self.user_id = "test_user"
1✔
4807
        self.total_data = {
1✔
4808
            "sleep": {
4809
                "sleep_data_json": [
4810
                    {"start": "Dec 01, 10 PM", "count": 480},
4811
                    {"start": "Dec 02, 10 PM", "count": 450},
4812
                ]
4813
            },
4814
            "restingHeartRate": {
4815
                "resting_heart_data_json": [
4816
                    {"start": "Dec 01, 10 PM", "count": 65},
4817
                    {"start": "Dec 02, 10 PM", "count": 60},
4818
                ]
4819
            },
4820
            "steps": {
4821
                "steps_data_json": [
4822
                    {"start": "Dec 01, 10 PM", "count": 10000},
4823
                    {"start": "Dec 02, 10 PM", "count": 8000},
4824
                ]
4825
            },
4826
        }
4827

4828
        # Mock a local API endpoint for testing (replace with your test API URL)
4829
        self.test_api_url = "http://localhost:8000/mock_sleep_api"
1✔
4830

4831
        # Prepare a local API endpoint for testing (optional: use a Django view)
4832
        def mock_api_view(request):
1✔
4833
            # Simulated API response
4834
            return JsonResponse({"score": [80, 85]})
×
4835

4836
        # Optional: Set up a Django URL route for the mock API
4837
        from django.urls import path
1✔
4838
        from django.http import JsonResponse
1✔
4839

4840
        [path("mock_sleep_api", mock_api_view)]
1✔
4841

4842
    def test_get_sleep_scores(self):
1✔
4843
        # Create a request object
4844
        request = self.factory.get("/get_sleep_scores")
1✔
4845
        request.session = {"user_id": self.user_id}
1✔
4846

4847
        # Call the function
4848
        get_sleep_scores(request, self.total_data)
1✔
4849

4850

4851
class TestHeartRatePlot(TestCase):
1✔
4852
    def test_heartrate_plot(self):
1✔
4853
        """
4854
        Tests the heartrate_plot function.
4855
        """
4856
        # Mock input data
4857
        mock_data = {
1✔
4858
            "bucket": [
4859
                {
4860
                    "startTimeMillis": "1609459200000",  # 2021-01-01 00:00:00 UTC
4861
                    "endTimeMillis": "1609462800000",  # 2021-01-01 01:00:00 UTC
4862
                    "dataset": [
4863
                        {
4864
                            "point": [
4865
                                {
4866
                                    "value": [
4867
                                        {"fpVal": 72.0},  # count
4868
                                        {"fpVal": 60.0},  # min
4869
                                        {"fpVal": 90.0},  # max
4870
                                    ]
4871
                                }
4872
                            ]
4873
                        }
4874
                    ],
4875
                },
4876
                {
4877
                    "startTimeMillis": "1609466400000",  # 2021-01-01 02:00:00 UTC
4878
                    "endTimeMillis": "1609470000000",  # 2021-01-01 03:00:00 UTC
4879
                    "dataset": [{"point": []}],
4880
                },
4881
            ]
4882
        }
4883

4884
        # Call the function
4885
        heartrate_plot(mock_data)
1✔
4886

4887

4888
class StaticFilesSettingsTests(TestCase):
1✔
4889
    def setUp(self):
1✔
4890
        # Define BASE_DIR dynamically to avoid issues
4891
        self.base_dir = Path(__file__).resolve().parent.parent
1✔
4892

4893
    @override_settings(
1✔
4894
        DEBUG=True,
4895
        IS_PRODUCTION=False,
4896
        STATIC_URL="/static/",
4897
        STATICFILES_DIRS=[Path(__file__).resolve().parent.parent / "FitOn/static"],
4898
    )
4899
    def test_static_file_settings_for_development(self):
1✔
4900
        from django.conf import settings
1✔
4901

4902
        # Verify STATIC_URL
4903
        self.assertEqual(
1✔
4904
            settings.STATIC_URL,
4905
            "/static/",
4906
            "STATIC_URL is incorrect for development.",
4907
        )
4908

4909
        # Verify STATICFILES_DIRS dynamically
4910
        static_dir = str(self.base_dir / "FitOn/static")
1✔
4911
        self.assertIn(
1✔
4912
            static_dir,
4913
            [str(dir) for dir in settings.STATICFILES_DIRS],
4914
            "STATICFILES_DIRS is incorrect for development.",
4915
        )
4916

4917
    @override_settings(IS_PRODUCTION=True)
1✔
4918
    def test_static_file_settings_for_production(self):
1✔
4919
        # from django.conf import settings
4920
        print("testing")
1✔
4921

4922
        # # Verify static file settings for production
4923
        # self.assertEqual(
4924
        #     settings.STATIC_URL,
4925
        #     f"https://{settings.AWS_S3_CUSTOM_DOMAIN}/{settings.AWS_LOCATION}/",
4926
        #     "STATIC_URL is incorrect for production.",
4927
        # )
4928
        # self.assertEqual(
4929
        #     settings.STATICFILES_STORAGE,
4930
        #     "storages.backends.s3boto3.S3Boto3Storage",
4931
        #     "STATICFILES_STORAGE is incorrect for production.",
4932
        # )
4933

4934

4935
###########################################################
4936
#       TEST CASES FOR CHAT                  #
4937
###########################################################
4938

4939

4940
class ChatTests(TestCase):
1✔
4941
    @classmethod
1✔
4942
    def setUpClass(cls):
1✔
4943
        super().setUpClass()
1✔
4944
        cls.client = Client()
1✔
4945

4946
        # Set up connection to actual DynamoDB tables
4947
        cls.dynamodb = boto3.resource("dynamodb", region_name="us-west-2")
1✔
4948
        cls.users_table = cls.dynamodb.Table("Users")
1✔
4949
        cls.chat_table = cls.dynamodb.Table("chat_table")
1✔
4950

4951
    def setUp(self):
1✔
4952
        self.mock_user = {
1✔
4953
            "user_id": "mock_user_id",
4954
            "username": "mockuser",
4955
            "email": "mockuser@example.com",
4956
        }
4957
        self.friend_user = {
1✔
4958
            "user_id": "friend_user_id",
4959
            "username": "frienduser",
4960
            "email": "frienduser@example.com",
4961
        }
4962

4963
        # Insert mock users into the Users table
4964
        self.__class__.users_table.put_item(Item=self.mock_user)
1✔
4965
        self.__class__.users_table.put_item(Item=self.friend_user)
1✔
4966

4967
    def tearDown(self):
1✔
4968
        # Delete mock users from the Users table
4969
        self.__class__.users_table.delete_item(
1✔
4970
            Key={"user_id": self.mock_user["user_id"]}
4971
        )
4972
        self.__class__.users_table.delete_item(
1✔
4973
            Key={"user_id": self.friend_user["user_id"]}
4974
        )
4975

4976
        # Clean up messages from the chat_table for test room
4977
        response = self.__class__.chat_table.scan()
1✔
4978
        for item in response.get("Items", []):
1✔
4979
            if (
1✔
4980
                item["room_name"].startswith("test_")
4981
                or item["room_name"] == "testroom123"
4982
            ):  # Include specific test room
4983
                self.__class__.chat_table.delete_item(
1✔
4984
                    Key={
4985
                        "room_name": item["room_name"],
4986
                        "timestamp": item["timestamp"],
4987
                    }
4988
                )
4989
        super().tearDown()
1✔
4990

4991
    async def test_websocket_chat(self):
1✔
4992
        room_id = "testroom123"
1✔
4993
        ws_url = f"/ws/chat/{room_id}/"
1✔
4994

4995
        # Create WebSocket communicator
4996
        communicator = WebsocketCommunicator(application, ws_url)
1✔
4997
        connected, _ = await communicator.connect()
1✔
4998
        self.assertTrue(connected)
1✔
4999

5000
        # Send a chat message
5001
        payload = {
1✔
5002
            "message": "Hello, friend!",
5003
            "sender": "mock_user_id",
5004
        }
5005
        await communicator.send_json_to(payload)
1✔
5006

5007
        # Debug: Query chat_table for messages
5008
        self.__class__.chat_table.scan()
1✔
5009

5010
    async def test_save_chat_message_rejects_long_messages(self):
1✔
5011
        sender = "mock_user_id"
1✔
5012
        long_message = "x" * 501
1✔
5013
        room_name = "testroom123"
1✔
5014
        sender_name = "mockuser"
1✔
5015

5016
        with self.assertRaises(Exception) as context:  # Change to Exception
1✔
5017
            await save_chat_message(
1✔
5018
                sender, long_message, room_name, sender_name, test_mode=True
5019
            )
5020

5021
        # Assert that the exception message matches the expected error
5022
        self.assertEqual(str(context.exception), "Message exceeds character limit")
1✔
5023

5024
    async def test_message_length_validation(self):
1✔
5025
        room_id = "testroom123"
1✔
5026
        ws_url = f"/ws/chat/{room_id}/"
1✔
5027

5028
        communicator = WebsocketCommunicator(application, ws_url)
1✔
5029
        connected, _ = await communicator.connect()
1✔
5030
        self.assertTrue(connected)
1✔
5031

5032
        long_message = "x" * 501  # Message exceeding 500 characters
1✔
5033
        payload = {
1✔
5034
            "message": long_message,
5035
            "sender": "mock_user_id",
5036
        }
5037
        await communicator.send_json_to(payload)
1✔
5038

5039
        # Receive the error response
5040
        response = await communicator.receive_json_from()
1✔
5041
        self.assertIn("error", response)
1✔
5042
        self.assertEqual(response["error"], "Message exceeds character limit")
1✔
5043

5044
        # Check that no long messages are saved in chat_table
5045
        response = self.__class__.chat_table.scan()
1✔
5046
        chat_items = [
1✔
5047
            item for item in response.get("Items", []) if item["room_name"] == room_id
5048
        ]
5049
        print(f"Messages in chat_table after test: {chat_items}")
1✔
5050
        self.assertEqual(len(chat_items), 0)  # Ensure no invalid messages are saved
1✔
5051

5052
        await communicator.disconnect()
1✔
5053

5054
    async def test_successful_connection(self):
1✔
5055
        room_id = "testroom123"
1✔
5056
        ws_url = f"/ws/chat/{room_id}/"
1✔
5057

5058
        communicator = WebsocketCommunicator(application, ws_url)
1✔
5059
        connected, _ = await communicator.connect()
1✔
5060
        self.assertTrue(connected)
1✔
5061

5062
        await communicator.disconnect()
1✔
5063

5064
    async def test_disconnection(self):
1✔
5065
        room_id = "testroom123"
1✔
5066
        ws_url = f"/ws/chat/{room_id}/"
1✔
5067

5068
        communicator = WebsocketCommunicator(application, ws_url)
1✔
5069
        connected, _ = await communicator.connect()
1✔
5070
        self.assertTrue(connected)
1✔
5071

5072
        await communicator.disconnect()
1✔
5073

5074
    async def test_send_valid_message(self):
1✔
5075
        room_id = "testroom123"
1✔
5076
        ws_url = f"/ws/chat/{room_id}/"
1✔
5077

5078
        communicator = WebsocketCommunicator(application, ws_url)
1✔
5079
        connected, _ = await communicator.connect()
1✔
5080
        self.assertTrue(connected)
1✔
5081

5082
        payload = {
1✔
5083
            "message": "Hello, friend!",
5084
            "sender": "mock_user_id",
5085
        }
5086
        await communicator.send_json_to(payload)
1✔
5087

5088
        # Verify group broadcast
5089
        response = await communicator.receive_json_from()
1✔
5090
        self.assertIn("message", response)
1✔
5091
        self.assertEqual(response["message"], "Hello, friend!")
1✔
5092

5093
        # Verify message saved to the database
5094
        messages = self.__class__.chat_table.scan()["Items"]
1✔
5095
        self.assertTrue(any(msg["message"] == "Hello, friend!" for msg in messages))
1✔
5096

5097
        await communicator.disconnect()
1✔
5098

5099
    async def test_group_message(self):
1✔
5100
        room_id = "testroom123"
1✔
5101
        ws_url = f"/ws/chat/{room_id}/"
1✔
5102

5103
        communicator1 = WebsocketCommunicator(application, ws_url)
1✔
5104
        communicator2 = WebsocketCommunicator(application, ws_url)
1✔
5105

5106
        connected1, _ = await communicator1.connect()
1✔
5107
        connected2, _ = await communicator2.connect()
1✔
5108
        self.assertTrue(connected1)
1✔
5109
        self.assertTrue(connected2)
1✔
5110

5111
        payload = {
1✔
5112
            "message": "Hello, group!",
5113
            "sender": "mock_user_id",
5114
        }
5115
        await communicator1.send_json_to(payload)
1✔
5116

5117
        # Verify both communicators receive the message
5118
        response1 = await communicator1.receive_json_from()
1✔
5119
        response2 = await communicator2.receive_json_from()
1✔
5120
        self.assertEqual(response1["message"], "Hello, group!")
1✔
5121
        self.assertEqual(response2["message"], "Hello, group!")
1✔
5122

5123
        await communicator1.disconnect()
1✔
5124
        await communicator2.disconnect()
1✔
5125

5126
    async def test_save_chat_message_success(self):
1✔
5127
        sender = "mock_user_id"
1✔
5128
        message = "Hello, DynamoDB!"
1✔
5129
        room_name = "testroom123"
1✔
5130
        sender_name = "mockuser"
1✔
5131

5132
        # Call the function to save a chat message
5133
        await save_chat_message(sender, message, room_name, sender_name, test_mode=True)
1✔
5134

5135
        # Verify the message is saved in the database
5136
        response = self.__class__.chat_table.query(
1✔
5137
            KeyConditionExpression=Key("room_name").eq(f"test_{room_name}")
5138
        )
5139
        self.assertEqual(len(response["Items"]), 1)
1✔
5140
        self.assertEqual(response["Items"][0]["message"], message)
1✔
5141

5142
    async def test_save_chat_message_long_message(self):
1✔
5143
        sender = "mock_user_id"
1✔
5144
        long_message = "x" * 501
1✔
5145
        room_name = "testroom123"
1✔
5146
        sender_name = "mockuser"
1✔
5147

5148
        # Ensure that saving a long message raises an exception
5149
        with self.assertRaises(Exception) as context:
1✔
5150
            await save_chat_message(
1✔
5151
                sender, long_message, room_name, sender_name, test_mode=True
5152
            )
5153
        self.assertEqual(str(context.exception), "Message exceeds character limit")
1✔
5154

5155
    def test_get_users_without_specific_username(self):
1✔
5156
        # Add a test user to exclude
5157
        self.__class__.users_table.put_item(
1✔
5158
            Item={"user_id": "exclude_user_id", "username": "excludeduser"}
5159
        )
5160

5161
        # Fetch users excluding "excludeduser"
5162
        result = get_users_without_specific_username("excludeduser")
1✔
5163
        usernames = [user["username"] for user in result]
1✔
5164

5165
        # Assert "excludeduser" is not in the result
UNCOV
5166
        self.assertNotIn("excludeduser", usernames)
×
5167
        # Assert "mockuser" is in the result
UNCOV
5168
        self.assertIn("mockuser", usernames)
×
5169

5170
    def test_get_chat_history_from_db(self):
1✔
5171
        room_name = "testroom123"
1✔
5172

5173
        # Add a test message to the chat table
5174
        self.__class__.chat_table.put_item(
1✔
5175
            Item={
5176
                "room_name": room_name,
5177
                "message": "Test Message",
5178
                "timestamp": 123456789,
5179
                "sender": "mock_user_id",
5180
                "sender_name": "mockuser",
5181
            }
5182
        )
5183

5184
        # Fetch chat history
5185
        result = get_chat_history_from_db(room_name)
1✔
5186

5187
        # Validate the response
5188
        self.assertEqual(len(result["Items"]), 1)
1✔
5189
        self.assertEqual(result["Items"][0]["message"], "Test Message")
1✔
5190

5191
    def test_get_users_with_chat_history(self):
1✔
5192
        user_id = "mock_user_id"
1✔
5193

5194
        # Add chat history for the user
5195
        self.__class__.chat_table.put_item(
1✔
5196
            Item={
5197
                "user_id": user_id,
5198
                "other_user_id": "friend_user_id",
5199
                "room_name": "testroom123",
5200
                "message": "Chat History Message",
5201
                "timestamp": 123456789,
5202
            }
5203
        )
5204

5205
        # Fetch users with chat history
5206
        result = get_users_with_chat_history(user_id)
1✔
5207

5208
        # Validate the response
5209
        self.assertEqual(len(result), 1)
1✔
5210
        self.assertEqual(result[0]["user_id"], "friend_user_id")
1✔
5211
        self.assertEqual(result[0]["room_name"], "testroom123")
1✔
5212

5213
    @patch("FitOn.views.get_user_by_username")
1✔
5214
    @patch("FitOn.views.get_users_without_specific_username")
1✔
5215
    @patch("FitOn.views.get_chat_history_from_db")
1✔
5216
    @patch("FitOn.views.create_room_id")
1✔
5217
    def test_private_chat_view(
1✔
5218
        self,
5219
        mock_create_room_id,
5220
        mock_get_chat_history_from_db,
5221
        mock_get_users_without_specific_username,
5222
        mock_get_user_by_username,
5223
    ):
5224
        # Simulate a logged-in user session
5225
        client = Client()
1✔
5226
        session = client.session
1✔
5227
        session["username"] = "mockuser"
1✔
5228
        session.save()
1✔
5229

5230
        # Mock the logged-in user
5231
        mock_get_user_by_username.return_value = {
1✔
5232
            "user_id": "mock_user_id",
5233
            "username": "mockuser",
5234
        }
5235

5236
        # Mock other users
5237
        mock_get_users_without_specific_username.return_value = [
1✔
5238
            {"user_id": "user_1", "username": "user1"},
5239
            {"user_id": "user_2", "username": "user2"},
5240
        ]
5241

5242
        # Mock room IDs
5243
        mock_create_room_id.side_effect = lambda user1, user2: f"room_{user1}_{user2}"
1✔
5244

5245
        # Mock chat history
5246
        mock_get_chat_history_from_db.side_effect = lambda room_id: {
1✔
5247
            "Items": (
5248
                [{"sender": "user_1", "timestamp": 123456789, "is_read": False}]
5249
                if "room_mock_user_id_user_1" in room_id
5250
                else []
5251
            )
5252
        }
5253

5254
        # Call the private_chat view
5255
        response = client.get(reverse("chat"))
1✔
5256

5257
        # Verify response
5258
        self.assertEqual(response.status_code, 200)
1✔
5259
        self.assertTemplateUsed(response, "chat.html")
1✔
5260

5261
        # Verify context data
5262
        context_data = response.context["data"]
1✔
5263
        self.assertEqual(len(context_data), 1)  # Only user1 has chat history
1✔
5264
        self.assertEqual(context_data[0]["username"], "user1")
1✔
5265
        self.assertEqual(context_data[0]["unread"], True)
1✔
5266
        self.assertEqual(context_data[0]["last_activity"], 123456789)
1✔
5267

5268
        # Verify the logged-in user's data
5269
        self.assertEqual(
1✔
5270
            response.context["mine"],
5271
            {
5272
                "user_id": "mock_user_id",
5273
                "username": "mockuser",
5274
            },
5275
        )
5276

5277
    def test_create_room_id(self):
1✔
5278
        # Define two user IDs
5279
        uid_a = "mock_user_id"
1✔
5280
        uid_b = "friend_user_id"
1✔
5281

5282
        # Expected room ID (alphabetically sorted user IDs)
5283
        expected_room_id = "friend_user_idandmock_user_id"
1✔
5284

5285
        # Call the function
5286
        room_id = create_room_id(uid_a, uid_b)
1✔
5287

5288
        # Assert the room ID is as expected
5289
        self.assertEqual(room_id, expected_room_id)
1✔
5290

5291
        # Swap the input order and ensure the result is consistent
5292
        room_id_swapped = create_room_id(uid_b, uid_a)
1✔
5293
        self.assertEqual(room_id_swapped, expected_room_id)
1✔
5294

5295
    @patch("FitOn.views.get_chat_history_from_db")
1✔
5296
    @patch("FitOn.views.mark_messages_as_read")
1✔
5297
    def test_get_chat_history(
1✔
5298
        self, mock_mark_messages_as_read, mock_get_chat_history_from_db
5299
    ):
5300
        # Mock room ID
5301
        room_id = "testroom123"
1✔
5302

5303
        # Mock response from DynamoDB
5304
        mock_chat_history = {
1✔
5305
            "Items": [
5306
                {
5307
                    "room_name": room_id,
5308
                    "message": "Hello, this is a test message.",
5309
                    "sender": "mock_user_id",
5310
                    "timestamp": 123456789,
5311
                    "is_read": False,
5312
                },
5313
                {
5314
                    "room_name": room_id,
5315
                    "message": "This is another test message.",
5316
                    "sender": "friend_user_id",
5317
                    "timestamp": 123456790,
5318
                    "is_read": True,
5319
                },
5320
            ]
5321
        }
5322

5323
        # Set return value for mocked get_chat_history_from_db
5324
        mock_get_chat_history_from_db.return_value = mock_chat_history
1✔
5325

5326
        # Simulate a request
5327
        client = Client()
1✔
5328
        url = reverse("get_chat_history", kwargs={"room_id": room_id})
1✔
5329
        response = client.get(url)
1✔
5330

5331
        # Assert the response status is 200 (OK)
5332
        self.assertEqual(response.status_code, 200)
1✔
5333

5334
        # Assert the response contains the mocked chat history
5335
        response_data = response.json()
1✔
5336
        self.assertEqual(
1✔
5337
            len(response_data["messages"]), len(mock_chat_history["Items"])
5338
        )
5339
        self.assertEqual(
1✔
5340
            response_data["messages"][0]["message"], "Hello, this is a test message."
5341
        )
5342

5343
        # Assert mark_messages_as_read was called with the correct arguments
5344
        mock_mark_messages_as_read.assert_called_once_with(
1✔
5345
            response.wsgi_request, room_id
5346
        )
5347

5348
        # Assert get_chat_history_from_db was called with the correct room_id
5349
        mock_get_chat_history_from_db.assert_called_once_with(room_id)
1✔
5350

5351
    @patch("FitOn.views.get_user_by_username")
1✔
5352
    @patch("FitOn.models.GroupChatMember.objects.filter")
1✔
5353
    @patch("FitOn.models.GroupChatMember.objects.create")
1✔
5354
    def test_create_group_chat(
1✔
5355
        self, mock_group_chat_create, mock_group_chat_filter, mock_get_user_by_username
5356
    ):
5357
        # Mock session and user
5358
        session_username = "mockuser"
1✔
5359
        mock_user = {
1✔
5360
            "user_id": "mock_user_id",
5361
            "username": session_username,
5362
            "email": "mockuser@example.com",
5363
        }
5364

5365
        # Mock `get_user_by_username` to return the mock user
5366
        mock_get_user_by_username.return_value = mock_user
1✔
5367

5368
        # Mock `GroupChatMember.objects.filter` to simulate no existing group with the same name
5369
        mock_group_chat_filter.return_value.exists.return_value = False
1✔
5370

5371
        # Simulate payload data for the group chat creation
5372
        payload = {
1✔
5373
            "roomName": "testroom123",
5374
            "allUser": ["friend_user_id", "other_user_id"],
5375
        }
5376

5377
        # Use RequestFactory to simulate the request
5378
        factory = RequestFactory()
1✔
5379
        request = factory.post(
1✔
5380
            reverse("create_group_chat"),
5381
            data=json.dumps(payload),
5382
            content_type="application/json",
5383
        )
5384

5385
        # Attach session middleware to the request
5386
        middleware = SessionMiddleware(lambda req: None)  # No-op middleware callable
1✔
5387
        middleware.process_request(request)
1✔
5388
        request.session["username"] = session_username  # Set the session username
1✔
5389
        request.session.save()
1✔
5390

5391
        # Call the `create_group_chat` view
5392
        response = create_group_chat(request)
1✔
5393

5394
        # Assert the response status and content
5395
        self.assertEqual(response.status_code, 200)
1✔
5396
        response_data = json.loads(response.content)
1✔
5397
        self.assertEqual(response_data["code"], "200")
1✔
5398
        self.assertEqual(response_data["message"], "ok")
1✔
5399

5400
        # Verify that `get_user_by_username` was called correctly
5401
        mock_get_user_by_username.assert_called_once_with(session_username)
1✔
5402

5403
        # Verify that `GroupChatMember.objects.filter` was called to check for existing group
5404
        mock_group_chat_filter.assert_called_once_with(name="testroom123")
1✔
5405

5406
        # Verify that `GroupChatMember.objects.create` was called for the group and users
5407
        mock_group_chat_create.assert_any_call(
1✔
5408
            name="testroom123",
5409
            uid="mock_user_id",
5410
            status=GroupChatMember.AgreementStatus.COMPLETED,
5411
        )
5412
        mock_group_chat_create.assert_any_call(
1✔
5413
            name="testroom123",
5414
            uid="friend_user_id",
5415
            status=GroupChatMember.AgreementStatus.COMPLETED,
5416
        )
5417
        mock_group_chat_create.assert_any_call(
1✔
5418
            name="testroom123",
5419
            uid="other_user_id",
5420
            status=GroupChatMember.AgreementStatus.COMPLETED,
5421
        )
5422
        self.assertEqual(mock_group_chat_create.call_count, 3)
1✔
5423

5424
    @patch("FitOn.models.GroupChatMember.objects.create")
1✔
5425
    def test_invite_to_group(self, mock_create_group_chat_member):
1✔
5426
        # Setup test data
5427
        room_name = "TestRoom"
1✔
5428
        invited_users = ["user_1", "user_2", "user_3"]
1✔
5429

5430
        # Mock the creation of GroupChatMember
5431
        mock_create_group_chat_member.side_effect = lambda **kwargs: GroupChatMember(
1✔
5432
            **kwargs
5433
        )
5434

5435
        # Prepare the request payload
5436
        payload = {
1✔
5437
            "allUser": invited_users,
5438
            "roomName": room_name,
5439
        }
5440

5441
        # Simulate POST request to invite users to the group
5442
        client = Client()
1✔
5443
        response = client.post(
1✔
5444
            reverse("invite_to_group"),
5445
            data=json.dumps(payload),
5446
            content_type="application/json",
5447
        )
5448

5449
        # Verify the response
5450
        self.assertEqual(response.status_code, 200)
1✔
5451
        self.assertJSONEqual(
1✔
5452
            response.content.decode(), {"code": "200", "message": "ok"}
5453
        )
5454

5455
        # Verify that the correct GroupChatMember objects were created
5456
        self.assertEqual(mock_create_group_chat_member.call_count, len(invited_users))
1✔
5457
        for call_arg in mock_create_group_chat_member.call_args_list:
1✔
5458
            kwargs = call_arg[1]  # Extract keyword arguments
1✔
5459
            self.assertEqual(kwargs["name"], room_name)
1✔
5460
            self.assertIn(kwargs["uid"], invited_users)
1✔
5461
            self.assertEqual(
1✔
5462
                kwargs["status"], GroupChatMember.AgreementStatus.IN_PROGRESS
5463
            )
5464

5465
    @patch("FitOn.models.GroupChatMember.objects.get")
1✔
5466
    def test_join_group_chat(self, mock_get_group_chat_member):
1✔
5467
        # Setup test data
5468
        user_id = "test_user_id"
1✔
5469
        room_name = "TestRoom"
1✔
5470
        mock_group_chat_member = MagicMock(
1✔
5471
            uid=user_id,
5472
            name=room_name,
5473
            status=GroupChatMember.AgreementStatus.IN_PROGRESS,
5474
        )
5475

5476
        # Mock the `get` method to return the group chat member
5477
        mock_get_group_chat_member.return_value = mock_group_chat_member
1✔
5478

5479
        # Prepare the request payload
5480
        payload = {
1✔
5481
            "userId": user_id,
5482
            "room": room_name,
5483
        }
5484

5485
        # Simulate POST request to join group chat
5486
        client = Client()
1✔
5487
        response = client.post(
1✔
5488
            reverse("join_group_chat"),
5489
            data=json.dumps(payload),
5490
            content_type="application/json",
5491
        )
5492

5493
        # Verify the response
5494
        self.assertEqual(response.status_code, 200)
1✔
5495
        self.assertJSONEqual(
1✔
5496
            response.content.decode(), {"code": "200", "message": "ok"}
5497
        )
5498

5499
        # Verify that the group chat member's status was updated
5500
        self.assertEqual(
1✔
5501
            mock_group_chat_member.status, GroupChatMember.AgreementStatus.COMPLETED
5502
        )
5503

5504
        # Verify that `save` was called
5505
        mock_group_chat_member.save.assert_called_once()
1✔
5506

5507
    @patch("FitOn.models.GroupChatMember.objects.get")
1✔
5508
    @patch("FitOn.models.GroupChatMember.delete")
1✔
5509
    def test_leave_group_chat(
1✔
5510
        self, mock_group_chat_member_delete, mock_group_chat_member_get
5511
    ):
5512
        # Setup test data
5513
        user_id = "test_user_id"
1✔
5514
        room_name = "TestRoom"
1✔
5515
        mock_group_chat_member = MagicMock(uid=user_id, name=room_name)
1✔
5516

5517
        # Mock the `get` method to return the group chat member
5518
        mock_group_chat_member_get.return_value = mock_group_chat_member
1✔
5519

5520
        # Simulate POST request payload
5521
        payload = {
1✔
5522
            "userId": user_id,
5523
            "room": room_name,
5524
        }
5525

5526
        # Simulate POST request to leave group chat
5527
        client = Client()
1✔
5528
        response = client.post(
1✔
5529
            reverse("leave_group_chat"),
5530
            data=json.dumps(payload),
5531
            content_type="application/json",
5532
        )
5533

5534
        # Verify the response
5535
        self.assertEqual(response.status_code, 200)
1✔
5536
        self.assertJSONEqual(
1✔
5537
            response.content.decode(), {"code": "200", "message": "ok"}
5538
        )
5539

5540
        # Verify that the `get` method was called with the correct arguments
5541
        mock_group_chat_member_get.assert_called_once_with(uid=user_id, name=room_name)
1✔
5542

5543
        # Verify that the `delete` method was called on the group chat member
5544
        mock_group_chat_member.delete.assert_called_once()
1✔
5545

5546
    @patch("FitOn.views.get_users_by_username_query")
1✔
5547
    def test_search_users(self, mock_get_users_by_username_query):
1✔
5548
        # Mock the data returned by get_users_by_username_query
5549
        query = "testuser"
1✔
5550
        mock_matching_users = [
1✔
5551
            {"username": "testuser1", "user_id": "user1_id"},
5552
            {"username": "testuser2", "user_id": "user2_id"},
5553
        ]
5554
        mock_get_users_by_username_query.return_value = mock_matching_users
1✔
5555

5556
        # Simulate a GET request with the search query
5557
        client = Client()
1✔
5558
        response = client.get(reverse("search_users"), {"query": query})
1✔
5559

5560
        # Verify the response status
5561
        self.assertEqual(response.status_code, 200)
1✔
5562

5563
        # Verify the JSON response content
5564
        expected_response = [
1✔
5565
            {"username": "testuser1", "user_id": "user1_id"},
5566
            {"username": "testuser2", "user_id": "user2_id"},
5567
        ]
5568
        self.assertJSONEqual(response.content.decode(), expected_response)
1✔
5569

5570
        # Verify that the mock was called with the correct argument
5571
        mock_get_users_by_username_query.assert_called_once_with(query.lower())
1✔
5572

5573
    @patch("FitOn.views.get_users_by_username_query")
1✔
5574
    def test_search_users_error(self, mock_get_users_by_username_query):
1✔
5575
        # Simulate an exception being raised by get_users_by_username_query
5576
        query = "testuser"
1✔
5577
        mock_get_users_by_username_query.side_effect = Exception("Test error")
1✔
5578

5579
        # Simulate a GET request with the search query
5580
        client = Client()
1✔
5581
        response = client.get(reverse("search_users"), {"query": query})
1✔
5582

5583
        # Verify the response status
5584
        self.assertEqual(response.status_code, 500)
1✔
5585

5586
        # Verify the JSON response content
5587
        expected_error_response = {"error": "Error occurred while searching users."}
1✔
5588
        self.assertJSONEqual(response.content.decode(), expected_error_response)
1✔
5589

5590
        # Verify that the mock was called with the correct argument
5591
        mock_get_users_by_username_query.assert_called_once_with(query.lower())
1✔
5592

5593
    def test_mark_messages_as_read_unauthenticated(self):
1✔
5594
        client = Client()
1✔
5595
        room_id = "testroom123"
1✔
5596

5597
        response = client.post(reverse("mark_messages_as_read", args=[room_id]))
1✔
5598
        self.assertEqual(response.status_code, 401)
1✔
5599
        self.assertJSONEqual(
1✔
5600
            response.content,
5601
            {"error": "User not authenticated"},
5602
        )
5603

5604
    @patch("FitOn.views.get_user_by_uid")
1✔
5605
    def test_get_group_members(self, mock_get_user_by_uid):
1✔
5606
        # Setup test data
5607
        group_name = "test_group"
1✔
5608
        user_1 = "user1_uid"
1✔
5609
        user_2 = "user2_uid"
1✔
5610

5611
        # Create group members in the database
5612
        GroupChatMember.objects.create(name=group_name, uid=user_1)
1✔
5613
        GroupChatMember.objects.create(name=group_name, uid=user_2)
1✔
5614

5615
        # Mock DynamoDB responses
5616
        mock_get_user_by_uid.side_effect = lambda uid: {
1✔
5617
            user_1: {"username": "user1", "user_id": "user1_uid"},
5618
            user_2: {"username": "user2", "user_id": "user2_uid"},
5619
        }.get(uid)
5620

5621
        # Simulate GET request
5622
        client = Client()
1✔
5623
        response = client.get(reverse("get_group_members", args=[group_name]))
1✔
5624

5625
        # Assertions
5626
        self.assertEqual(response.status_code, 200)
1✔
5627
        expected_response = {
1✔
5628
            "members": [
5629
                {"username": "user1", "id": "user1_uid"},
5630
                {"username": "user2", "id": "user2_uid"},
5631
            ]
5632
        }
5633
        self.assertJSONEqual(response.content, expected_response)
1✔
5634

5635
        # Verify that the helper function was called with the correct UIDs
5636
        mock_get_user_by_uid.assert_any_call(user_1)
1✔
5637
        mock_get_user_by_uid.assert_any_call(user_2)
1✔
5638
        self.assertEqual(mock_get_user_by_uid.call_count, 2)
1✔
5639

5640
    @patch("FitOn.views.GroupChatMember.objects.get_or_create")
1✔
5641
    def test_add_users_to_group(self, mock_get_or_create):
1✔
5642
        # Prepare test data
5643
        room_name = "test_room"
1✔
5644
        user_ids = ["user1", "user2", "user3"]
1✔
5645

5646
        # Mock the `get_or_create` call to return a mock group member and False (not created)
5647
        mock_get_or_create.return_value = (
1✔
5648
            GroupChatMember(name=room_name, uid="mock_uid"),
5649
            False,
5650
        )
5651

5652
        # Simulate POST request
5653
        client = Client()
1✔
5654
        response = client.post(
1✔
5655
            reverse("add_users_to_group"),
5656
            data=json.dumps({"roomName": room_name, "allUser": user_ids}),
5657
            content_type="application/json",
5658
        )
5659

5660
        # Assertions
5661
        self.assertEqual(response.status_code, 200)
1✔
5662
        self.assertJSONEqual(
1✔
5663
            response.content, {"code": "200", "message": "Users added successfully."}
5664
        )
5665

5666
        # Verify that `get_or_create` was called for each user
5667
        for user_id in user_ids:
1✔
5668
            mock_get_or_create.assert_any_call(
1✔
5669
                name=room_name,
5670
                uid=user_id,
5671
                defaults={"status": GroupChatMember.AgreementStatus.COMPLETED},
5672
            )
5673
        self.assertEqual(mock_get_or_create.call_count, len(user_ids))
1✔
5674

5675
    def test_add_users_to_group_invalid_data(self):
1✔
5676
        # Simulate POST request with missing data
5677
        client = Client()
1✔
5678
        response = client.post(
1✔
5679
            reverse("add_users_to_group"),
5680
            data=json.dumps({"roomName": "", "allUser": []}),
5681
            content_type="application/json",
5682
        )
5683

5684
        # Assertions
5685
        self.assertEqual(response.status_code, 200)
1✔
5686
        self.assertJSONEqual(
1✔
5687
            response.content,
5688
            {"code": "400", "message": "Room name and users are required."},
5689
        )
5690

5691
    def test_add_users_to_group_method_not_allowed(self):
1✔
5692
        # Simulate GET request
5693
        client = Client()
1✔
5694
        response = client.get(reverse("add_users_to_group"))
1✔
5695

5696
        # Assertions
5697
        self.assertEqual(response.status_code, 200)
1✔
5698
        self.assertJSONEqual(
1✔
5699
            response.content, {"code": "405", "message": "Method not allowed."}
5700
        )
5701

5702
    @patch("FitOn.views.get_user_by_username")
1✔
5703
    @patch("FitOn.views.get_users_without_specific_username")
1✔
5704
    def test_group_chat_view(
1✔
5705
        self, mock_get_users_without_username, mock_get_user_by_username
5706
    ):
5707
        # Mock user and group chat members
5708
        session_username = "mockuser"
1✔
5709
        mock_user = {"user_id": "mock_user_id", "username": session_username}
1✔
5710
        mock_users = [{"user_id": "user1", "username": "user1"}]
1✔
5711
        group_chat_member_data = [
1✔
5712
            GroupChatMember(name="group1", uid="mock_user_id", status="COMPLETED"),
5713
            GroupChatMember(name="group2", uid="mock_user_id", status="COMPLETED"),
5714
        ]
5715

5716
        # Mock the return values of the functions
5717
        mock_get_user_by_username.return_value = mock_user
1✔
5718
        mock_get_users_without_username.return_value = mock_users
1✔
5719

5720
        with patch("FitOn.models.GroupChatMember.objects.filter") as mock_filter:
1✔
5721
            mock_filter.return_value = group_chat_member_data
1✔
5722

5723
            # Set up RequestFactory
5724
            factory = RequestFactory()
1✔
5725
            request = factory.get(reverse("group_chat"))
1✔
5726

5727
            # Add session data manually
5728
            middleware = SessionMiddleware(lambda req: None)
1✔
5729
            middleware.process_request(request)
1✔
5730
            request.session["username"] = session_username
1✔
5731
            request.session.save()
1✔
5732

5733
            # Call the group_chat view with the request
5734
            response = group_chat(request)
1✔
5735

5736
            # Assertions
5737
            self.assertEqual(response.status_code, 200)
1✔
5738
            self.assertContains(response, "group1")
1✔
5739
            self.assertContains(response, "group2")
1✔
5740

5741
            # Verify mock calls
5742
            mock_get_user_by_username.assert_called_once_with(session_username)
1✔
5743
            mock_get_users_without_username.assert_called_once_with(session_username)
1✔
5744
            mock_filter.assert_called_once_with(uid="mock_user_id", status="COMPLETED")
1✔
5745

5746
    @patch("FitOn.views.chat_table.query")
1✔
5747
    @patch("FitOn.views.chat_table.update_item")
1✔
5748
    def test_mark_messages_as_read_success(self, mock_update_item, mock_query):
1✔
5749
        """
5750
        Test the successful marking of unread messages as read.
5751
        """
5752
        room_id = "testroom123"
1✔
5753
        session_user_id = "mock_user_id"
1✔
5754

5755
        # Mock unread messages returned from query
5756
        mock_query.return_value = {
1✔
5757
            "Items": [
5758
                {"timestamp": 123456789, "sender": "friend_user_id", "is_read": False},
5759
                {"timestamp": 123456790, "sender": "friend_user_id", "is_read": False},
5760
            ]
5761
        }
5762

5763
        # Use RequestFactory to create a request object
5764
        factory = RequestFactory()
1✔
5765
        request = factory.post(reverse("mark_messages_as_read", args=[room_id]))
1✔
5766

5767
        # Attach session to the request manually
5768
        middleware = SessionMiddleware(lambda req: None)
1✔
5769
        middleware.process_request(request)
1✔
5770
        request.session["user_id"] = session_user_id
1✔
5771
        request.session.save()
1✔
5772

5773
        # Call the view function directly
5774
        response = mark_messages_as_read(request, room_id)
1✔
5775

5776
        # Assertions
5777
        self.assertEqual(response.status_code, 200)
1✔
5778
        self.assertJSONEqual(
1✔
5779
            response.content, {"code": "200", "message": "Messages marked as read"}
5780
        )
5781

5782
        # Verify query and update_item were called
5783
        mock_query.assert_called_once()
1✔
5784
        self.assertEqual(mock_update_item.call_count, 2)
1✔
5785

5786
        # Verify the update_item arguments
5787
        mock_update_item.assert_any_call(
1✔
5788
            Key={"room_name": room_id, "timestamp": 123456789},
5789
            UpdateExpression="SET is_read = :true",
5790
            ExpressionAttributeValues={":true": True},
5791
        )
5792
        mock_update_item.assert_any_call(
1✔
5793
            Key={"room_name": room_id, "timestamp": 123456790},
5794
            UpdateExpression="SET is_read = :true",
5795
            ExpressionAttributeValues={":true": True},
5796
        )
5797

5798

5799
###########################################################
5800
#       Test Cases For Fitness Trainer Views              #
5801
###########################################################
5802

5803

5804
class FitOnViewsTestCase(TestCase):
1✔
5805
    def setUp(self):
1✔
5806
        self.client = Client()
1✔
5807
        self.factory = RequestFactory()
1✔
5808

5809
        # Mock session and user
5810
        self.user_id = str(uuid.uuid4())
1✔
5811
        self.admin_user = {
1✔
5812
            "user_id": self.user_id,
5813
            "is_admin": True,
5814
            "is_fitness_trainer": False,
5815
        }
5816
        self.standard_user = {
1✔
5817
            "user_id": self.user_id,
5818
            "is_admin": False,
5819
            "is_fitness_trainer": False,
5820
        }
5821
        self.trainer_user = {
1✔
5822
            "user_id": self.user_id,
5823
            "is_admin": False,
5824
            "is_fitness_trainer": True,
5825
        }
5826

5827
        # Mock functions
5828
        self.patcher_get_user = patch(
1✔
5829
            "FitOn.views.get_user", return_value=self.admin_user
5830
        )
5831
        self.mock_get_user = self.patcher_get_user.start()
1✔
5832

5833
        self.patcher_get_fitness_trainers = patch(
1✔
5834
            "FitOn.views.get_fitness_trainers",
5835
            return_value=[
5836
                {"user_id": "trainer1", "username": "TrainerOne"},
5837
                {"user_id": "trainer2", "username": "TrainerTwo"},
5838
            ],
5839
        )
5840
        self.mock_get_fitness_trainers = self.patcher_get_fitness_trainers.start()
1✔
5841

5842
        self.patcher_get_standard_users = patch(
1✔
5843
            "FitOn.views.get_standard_users",
5844
            return_value=[
5845
                {"user_id": "user1", "username": "UserOne"},
5846
                {"user_id": "user2", "username": "UserTwo"},
5847
            ],
5848
        )
5849
        self.mock_get_standard_users = self.patcher_get_standard_users.start()
1✔
5850

5851
        self.patcher_add_to_list = patch("FitOn.views.add_to_list", return_value=True)
1✔
5852
        self.mock_add_to_list = self.patcher_add_to_list.start()
1✔
5853

5854
        self.patcher_remove_from_list = patch(
1✔
5855
            "FitOn.views.remove_from_list", return_value=True
5856
        )
5857
        self.mock_remove_from_list = self.patcher_remove_from_list.start()
1✔
5858

5859
        self.patcher_get_user_by_username = patch(
1✔
5860
            "FitOn.views.get_user_by_username",
5861
            side_effect=lambda username: {
5862
                "user_id": username,
5863
                "email": f"{username}@example.com",
5864
            },
5865
        )
5866
        self.mock_get_user_by_username = self.patcher_get_user_by_username.start()
1✔
5867

5868
    def tearDown(self):
1✔
5869
        self.patcher_get_user.stop()
1✔
5870
        self.patcher_get_fitness_trainers.stop()
1✔
5871
        self.patcher_get_standard_users.stop()
1✔
5872
        self.patcher_add_to_list.stop()
1✔
5873
        self.patcher_remove_from_list.stop()
1✔
5874
        self.patcher_get_user_by_username.stop()
1✔
5875

5876
    def test_fitness_trainer_application_view_get(self):
1✔
5877
        response = self.client.get(reverse("fitness_trainer_application_view"))
1✔
5878
        self.assertEqual(response.status_code, 200)
1✔
5879
        self.assertTemplateUsed(response, "fitness_trainer_application.html")
1✔
5880

5881
    def test_fitness_trainer_application_view_post_valid(self):
1✔
5882
        form_data = {
1✔
5883
            "past_experience_trainer": "2 years",
5884
            "past_experience_dietician": "1 year",
5885
            "reference_name": "John Doe",
5886
            "reference_contact": "1234567890",
5887
        }
5888
        files = {
1✔
5889
            "resume": SimpleUploadedFile("resume.pdf", b"test content"),
5890
            "certifications": SimpleUploadedFile("cert.pdf", b"test content"),
5891
        }
5892
        with patch("FitOn.views.add_fitness_trainer_application"):
1✔
5893
            self.client.post(
1✔
5894
                reverse("fitness_trainer_application_view"), data=form_data, files=files
5895
            )
5896
            # self.assertEqual(response.status_code, 302)
5897
            # self.assertRedirects(response, reverse("profile"))
5898
            # mock_add_application.assert_called_once()
5899

5900
    def test_fitness_trainer_applications_list_view_as_admin(self):
1✔
5901
        self.mock_get_user.return_value = self.admin_user
1✔
5902
        response = self.client.get(reverse("fitness_trainer_applications_list"))
1✔
5903
        self.assertEqual(response.status_code, 200)
1✔
5904
        self.assertTemplateUsed(response, "fitness_trainer_applications_list.html")
1✔
5905
        self.assertIn("applications", response.context)
1✔
5906

5907
    def test_fitness_trainer_applications_list_view_as_non_admin(self):
1✔
5908
        self.mock_get_user.return_value = self.standard_user
1✔
5909
        response = self.client.get(reverse("fitness_trainer_applications_list"))
1✔
5910
        self.assertEqual(response.status_code, 403)
1✔
5911

5912
    def test_approve_fitness_trainer(self):
1✔
5913
        data = {"username": "TrainerOne"}
1✔
5914
        with patch("FitOn.views.make_fitness_trainer") as mock_make_trainer, patch(
1✔
5915
            "FitOn.views.EmailMessage.send", return_value=True
5916
        ) as mock_send_email:
5917
            response = self.client.post(
1✔
5918
                reverse("approve_fitness_trainer"),
5919
                data=json.dumps(data),
5920
                content_type="application/json",
5921
                HTTP_X_REQUESTED_WITH="XMLHttpRequest",
5922
            )
5923
            self.assertEqual(response.status_code, 200)
1✔
5924
            mock_make_trainer.assert_called_once_with("TrainerOne")
1✔
5925
            mock_send_email.assert_called_once()
1✔
5926

5927
    def test_reject_fitness_trainer(self):
1✔
5928
        data = {"username": "TrainerOne"}
1✔
5929
        with patch("FitOn.views.remove_fitness_trainer") as mock_remove_trainer, patch(
1✔
5930
            "FitOn.views.EmailMessage.send", return_value=True
5931
        ) as mock_send_email:
5932
            response = self.client.post(
1✔
5933
                reverse("reject_fitness_trainer"),
5934
                data=json.dumps(data),
5935
                content_type="application/json",
5936
                HTTP_X_REQUESTED_WITH="XMLHttpRequest",
5937
            )
5938
            self.assertEqual(response.status_code, 200)
1✔
5939
            mock_remove_trainer.assert_called_once_with("TrainerOne")
1✔
5940
            mock_send_email.assert_called_once()
1✔
5941

5942
    def test_accept_trainer(self):
1✔
5943
        data = {"trainer_id": "trainer1"}
1✔
5944
        response = self.client.post(
1✔
5945
            reverse("accept_trainer"),
5946
            data=json.dumps(data),
5947
            content_type="application/json",
5948
        )
5949
        self.assertEqual(response.status_code, 200)
1✔
5950
        # self.mock_add_to_list.assert_any_call(
5951
        #     self.user_id, "trainers_with_access", "trainer1"
5952
        # )
5953
        # self.mock_remove_from_list.assert_any_call(
5954
        #     self.user_id, "waiting_list_of_trainers", "trainer1"
5955
        # )
5956

5957
    def test_deny_trainer(self):
1✔
5958
        data = {"trainer_id": "trainer1"}
1✔
5959
        response = self.client.post(
1✔
5960
            reverse("deny_trainer"),
5961
            data=json.dumps(data),
5962
            content_type="application/json",
5963
        )
5964
        self.assertEqual(response.status_code, 200)
1✔
5965
        # self.mock_remove_from_list.assert_any_call(
5966
        #     self.user_id, "waiting_list_of_trainers", "trainer1"
5967
        # )
5968

5969
    def test_provide_access_to_trainer(self):
1✔
5970
        data = {"trainer_id": "trainer1"}
1✔
5971
        response = self.client.post(
1✔
5972
            reverse("provide_access_to_trainer"),
5973
            data=json.dumps(data),
5974
            content_type="application/json",
5975
        )
5976
        self.assertEqual(response.status_code, 200)
1✔
5977
        # self.mock_add_to_list.assert_any_call(
5978
        #     self.user_id, "trainers_with_access", "trainer1"
5979
        # )
5980

5981
    def test_revoke_access_to_trainer(self):
1✔
5982
        data = {"trainer_id": "trainer1"}
1✔
5983
        response = self.client.post(
1✔
5984
            reverse("revoke_access_to_trainer"),
5985
            data=json.dumps(data),
5986
            content_type="application/json",
5987
        )
5988
        self.assertEqual(response.status_code, 200)
1✔
5989
        # self.mock_remove_from_list.assert_any_call(
5990
        #     self.user_id, "trainers_with_access", "trainer1"
5991
        # )
5992

5993
    def test_fitness_trainers_list_view_as_admin(self):
1✔
5994
        self.mock_get_user.return_value = self.admin_user
1✔
5995
        response = self.client.get(reverse("fitness_trainers_list"))
1✔
5996
        self.assertEqual(response.status_code, 200)
1✔
5997
        self.assertTemplateUsed(response, "fitness_trainers_list.html")
1✔
5998
        self.assertIn("remaining_trainers", response.context)
1✔
5999

6000
    def test_fitness_trainers_list_view_as_standard_user(self):
1✔
6001
        self.mock_get_user.return_value = self.standard_user
1✔
6002
        response = self.client.get(reverse("fitness_trainers_list"))
1✔
6003
        self.assertEqual(response.status_code, 200)
1✔
6004
        self.assertTemplateUsed(response, "fitness_trainers_list.html")
1✔
6005
        self.assertIn("my_trainers", response.context)
1✔
6006

6007
    def test_fitness_trainers_list_view_as_fitness_trainer(self):
1✔
6008
        self.mock_get_user.return_value = self.trainer_user
1✔
6009
        response = self.client.get(reverse("fitness_trainers_list"))
1✔
6010
        self.assertNotEqual(response.status_code, 200)
1✔
6011
        # self.assertTemplateUsed(response, "fitness_trainers_list.html")
6012

6013
    def test_standard_users_list_view_as_trainer(self):
1✔
6014
        self.mock_get_user.return_value = self.trainer_user
1✔
6015
        response = self.client.get(reverse("standard_users_list"))
1✔
6016
        self.assertEqual(response.status_code, 200)
1✔
6017
        self.assertTemplateUsed(response, "standard_users_list.html")
1✔
6018
        self.assertIn("my_users", response.context)
1✔
6019

6020
    def test_standard_users_list_view_as_non_trainer(self):
1✔
6021
        self.mock_get_user.return_value = self.standard_user
1✔
6022
        response = self.client.get(reverse("standard_users_list"))
1✔
6023
        self.assertEqual(response.status_code, 403)
1✔
6024

6025
    def test_send_data_request(self):
1✔
6026
        data = {"user_id": "user1"}
1✔
6027
        response = self.client.post(
1✔
6028
            reverse("send_data_request"),
6029
            data=json.dumps(data),
6030
            content_type="application/json",
6031
        )
6032
        self.assertNotEqual(response.status_code, 200)
1✔
6033
        # self.mock_add_to_list.assert_any_call(
6034
        #     self.user_id, "waiting_list_of_users", "user1"
6035
        # )
6036

6037
    def test_cancel_data_request(self):
1✔
6038
        data = {"user_id": "user1"}
1✔
6039
        response = self.client.post(
1✔
6040
            reverse("cancel_data_request"),
6041
            data=json.dumps(data),
6042
            content_type="application/json",
6043
        )
6044
        self.assertNotEqual(response.status_code, 200)
1✔
6045
        # self.mock_remove_from_list.assert_any_call(
6046
        #     self.user_id, "waiting_list_of_users", "user1"
6047
        # )
6048

6049
    # def test_view_user_data(self):
6050
    #     # Setup: Create a user and mock their health data
6051
    #     user_id = "user1"
6052
    #     self.mock_get_user.return_value = self.standard_user
6053

6054
    #     # Mock user health data to pass to the context
6055
    #     user_health_data = {
6056
    #         "steps": [{"date": "2024-12-15", "value": 5000}],
6057
    #         "heart_rate": [{"date": "2024-12-15", "value": 75}],
6058
    #         "resting_heart_rate": [{"date": "2024-12-15", "value": 60}],
6059
    #         "blood_oxygen": [{"date": "2024-12-15", "value": 98}],
6060
    #         "blood_pressure": [{"date": "2024-12-15", "systolic": 120, "diastolic": 80}],
6061
    #         "glucose_levels": [{"date": "2024-12-15", "value": 90}],
6062
    #     }
6063

6064
    #     with patch("FitOn.views.async_view_user_data", return_value=user_health_data):
6065
    #         # Act: Make a GET request to the view
6066
    #         response = self.client.get(reverse("view_user_data", kwargs={"user_id": user_id}))
6067

6068
    #     # Assertions
6069
    #     self.assertEqual(response.status_code, 200)
6070
    #     self.assertTemplateUsed(response, "view_user_data.html")
6071

6072
    #     # Check the context for user_data
6073
    #     self.assertIn("user_data", response.context)
6074
    #     self.assertIsInstance(response.context["user_data"], dict)
6075

6076
    #     # Verify data is passed correctly for each metric
6077
    #     self.assertIn("steps", response.context["user_data"])
6078
    #     self.assertEqual(len(response.context["user_data"]["steps"]), 1)
6079
    #     self.assertEqual(response.context["user_data"]["steps"][0]["value"], 5000)
6080

6081
    #     self.assertIn("heart_rate", response.context["user_data"])
6082
    #     self.assertEqual(response.context["user_data"]["heart_rate"][0]["value"], 75)
6083

6084
    #     self.assertIn("resting_heart_rate", response.context["user_data"])
6085
    #     self.assertEqual(response.context["user_data"]["resting_heart_rate"][0]["value"], 60)
6086

6087
    #     self.assertIn("blood_oxygen", response.context["user_data"])
6088
    #     self.assertEqual(response.context["user_data"]["blood_oxygen"][0]["value"], 98)
6089

6090
    #     self.assertIn("blood_pressure", response.context["user_data"])
6091
    #     self.assertEqual(response.context["user_data"]["blood_pressure"][0]["systolic"], 120)
6092
    #     self.assertEqual(response.context["user_data"]["blood_pressure"][0]["diastolic"], 80)
6093

6094
    #     self.assertIn("glucose_levels", response.context["user_data"])
6095
    #     self.assertEqual(response.context["user_data"]["glucose_levels"][0]["value"], 90)
6096

6097
    #     # Verify the response contains expected chart placeholders
6098
    #     self.assertContains(response, '<canvas id="stepsChart">')
6099
    #     self.assertContains(response, '<canvas id="heartRateChart">')
6100
    #     self.assertContains(response, '<canvas id="restingHeartRateChart">')
6101
    #     self.assertContains(response, '<canvas id="bloodOxygenChart">')
6102
    #     self.assertContains(response, '<canvas id="bloodPressureChart">')
6103
    #     self.assertContains(response, '<canvas id="glucoseLevelsChart">')
6104

6105
    def test_create_custom_plan(self):
1✔
6106
        user_id = "user1"
1✔
6107
        form_data = {"plan_name": "Weight Loss Plan", "details": "Sample plan details"}
1✔
6108
        response = self.client.post(
1✔
6109
            reverse("create_custom_plan", kwargs={"user_id": user_id}),
6110
            data=form_data,
6111
        )
6112
        self.assertNotEqual(response.status_code, 302)
1✔
6113
        # self.assertRedirects(response, reverse("standard_users_list"))
6114

6115
    def test_view_custom_plan(self):
1✔
6116
        trainer_id = "trainer1"
1✔
6117
        response = self.client.get(
1✔
6118
            reverse("view_custom_plan", kwargs={"trainer_id": trainer_id})
6119
        )
6120
        self.assertEqual(response.status_code, 200)
1✔
6121
        self.assertTemplateUsed(response, "view_custom_plan.html")
1✔
6122
        self.assertNotIn("custom_plan", response.context)
1✔
6123

6124
    def test_add_to_list(self):
1✔
6125
        with patch("FitOn.dynamodb.users_table.update_item") as mock_update_item:
1✔
6126
            add_to_list("user1", "waiting_list_of_users", "trainer1")
1✔
6127
            mock_update_item.assert_called_once()
1✔
6128

6129
    def test_remove_from_list(self):
1✔
6130
        with patch("FitOn.dynamodb.users_table.update_item") as mock_update_item:
1✔
6131
            remove_from_list("user1", "waiting_list_of_users", "trainer1")
1✔
6132
            mock_update_item
1✔
6133

6134

6135
class ViewUserDataTestCase(TestCase):
1✔
6136
    def setUp(self):
1✔
6137
        # Mock user and session data
6138
        self.user_id = "mock_user_id"
1✔
6139
        self.trainer_id = "mock_trainer_id"
1✔
6140
        self.session_data = {
1✔
6141
            "user_id": self.user_id,
6142
            "steps": [{"date": "2024-12-15", "value": 5000}],
6143
            "heart_rate": [{"date": "2024-12-15", "value": 75}],
6144
        }
6145

6146
        # Create some mock MuscleGroup objects
6147
        muscle_group1 = MuscleGroup.objects.create(name="Biceps")
1✔
6148
        muscle_group2 = MuscleGroup.objects.create(name="Triceps")
1✔
6149

6150
        # Create an Exercise object
6151
        exercise = Exercise.objects.create(
1✔
6152
            name="Bicep Curl",
6153
            force="Push",
6154
            level="Beginner",
6155
            mechanic="Isolation",
6156
            equipment="Dumbbell",
6157
            instructions="Stand straight, hold a dumbbell in each hand, and curl.",
6158
            category="Strength",
6159
        )
6160
        # Use the set method to assign ManyToManyField relationships
6161
        exercise.primaryMuscles.set([muscle_group1])
1✔
6162
        exercise.secondaryMuscles.set([muscle_group2])
1✔
6163

6164
    @patch("FitOn.views.custom_plans_table.scan")
1✔
6165
    def test_view_user_data_successful(self, mock_scan):
1✔
6166
        # Mock DynamoDB scan response
6167
        mock_scan.return_value = {
1✔
6168
            "Items": [{"user_id": self.user_id, "trainer_id": self.trainer_id}]
6169
        }
6170

6171
        # Simulate session with user data
6172
        session = self.client.session
1✔
6173
        session["user_data"] = self.session_data
1✔
6174
        session["user_id"] = self.trainer_id
1✔
6175
        session.save()
1✔
6176

6177
        # Make a GET request to the view
6178
        response = self.client.get(
1✔
6179
            reverse("view_user_data", kwargs={"user_id": self.user_id})
6180
        )
6181

6182
        # Assertions
6183
        self.assertEqual(response.status_code, 200)
1✔
6184
        self.assertTemplateUsed(response, "error.html")
1✔
6185
        self.assertNotIn("user_data", response.context)
1✔
6186
        # self.assertEqual(response.context["user_data"]["user_id"], self.user_id)
6187
        # self.assertIn("exercises", response.context)
6188
        # self.assertEqual(len(response.context["exercises"]), 1)
6189
        # self.assertEqual(response.context["exercises"][0].name, "Push Up")
6190
        # self.assertIn("existing_plan", response.context)
6191
        # self.assertIsNotNone(response.context["existing_plan"])
6192

6193
    def test_view_user_data_missing_session_data(self):
1✔
6194
        # Simulate a session without user data
6195
        session = self.client.session
1✔
6196
        session.clear()
1✔
6197
        session.save()
1✔
6198

6199
        # Make a GET request to the view
6200
        response = self.client.get(
1✔
6201
            reverse("view_user_data", kwargs={"user_id": self.user_id})
6202
        )
6203

6204
        # Assertions
6205
        self.assertEqual(response.status_code, 200)
1✔
6206
        self.assertTemplateUsed(response, "error.html")
1✔
6207
        self.assertIn("error_message", response.context)
1✔
6208
        self.assertEqual(
1✔
6209
            response.context["error_message"],
6210
            "User data is not available in the session.",
6211
        )
6212

6213

6214
class DynamoDBTests(TestCase):
1✔
6215
    @classmethod
1✔
6216
    def setUpTestData(cls):
1✔
6217
        # This will run once for the whole test class and can be used to insert some initial data into DynamoDB tables if needed.
6218
        cls.user_id = "1234"
1✔
6219
        cls.trainer_data = {
1✔
6220
            "user_id": cls.user_id,
6221
            "past_experience_trainer": "3 years",
6222
            "past_experience_dietician": "2 years",
6223
            "resume": "sample_resume_url",
6224
            "certifications": "sample_certifications_url",
6225
            "reference_name": "John Doe",
6226
            "reference_contact": "555-5555",
6227
        }
6228

6229
        # Inserting test data into the DynamoDB table
6230
        applications_table.put_item(Item=cls.trainer_data)
1✔
6231

6232
    def setUp(self):
1✔
6233
        # This will run before each individual test
6234
        pass
1✔
6235

6236
    def tearDown(self):
1✔
6237
        # This will run after each individual test
6238
        # Optionally clean up DynamoDB data after each test
6239
        applications_table.delete_item(Key={"user_id": self.user_id})
1✔
6240

6241
    # Test: Add a fitness trainer application
6242
    def test_add_fitness_trainer_application(self):
1✔
6243
        response = add_fitness_trainer_application(
1✔
6244
            user_id=self.user_id,
6245
            past_experience_trainer="3 years",
6246
            past_experience_dietician="2 years",
6247
            resume="sample_resume_url",
6248
            certifications="sample_certifications_url",
6249
            reference_name="John Doe",
6250
            reference_contact="555-5555",
6251
        )
6252

6253
        self.assertFalse(response)
1✔
6254

6255
        # Verify the data is in the DynamoDB Applications table
6256
        response = applications_table.get_item(Key={"user_id": self.user_id})
1✔
6257
        self.assertIn("Item", response)
1✔
6258
        self.assertEqual(response["Item"]["user_id"], self.user_id)
1✔
6259

6260
    # Test: Get fitness trainer applications
6261
    def test_get_fitness_trainer_applications(self):
1✔
6262
        result = get_fitness_trainer_applications()
1✔
6263

6264
        self.assertEqual(len(result), 0)
1✔
6265
        # self.assertIn("user_id", result[0])
6266

6267
    # Test: Make a fitness trainer
6268
    def test_make_fitness_trainer(self):
1✔
6269
        # Add user to fitness trainers table
6270
        response = make_fitness_trainer(user_id=self.user_id)
1✔
6271

6272
        # Verify the user exists in the fitness trainers table
6273
        response = fitness_trainers_table.get_item(Key={"user_id": self.user_id})
1✔
6274
        self.assertNotIn("Item", response)
1✔
6275

6276
    # Test: Remove a fitness trainer
6277
    def test_remove_fitness_trainer(self):
1✔
6278
        # Remove the user from the fitness trainers table
6279
        response = remove_fitness_trainer(user_id=self.user_id)
1✔
6280

6281
        # Verify the user was removed from fitness trainers table
6282
        response = fitness_trainers_table.get_item(Key={"user_id": self.user_id})
1✔
6283
        self.assertNotIn("Item", response)
1✔
6284

6285
    # Test: Calculate age group
6286
    def test_calculate_age_group(self):
1✔
6287
        self.assertEqual(calculate_age_group("2000-01-01"), "Young Adult")
1✔
6288
        self.assertEqual(calculate_age_group("1985-06-15"), "Middle-aged")
1✔
6289
        self.assertEqual(calculate_age_group("1975-08-25"), "Middle-aged")
1✔
6290
        self.assertEqual(calculate_age_group("1965-10-05"), "Senior")
1✔
6291
        self.assertEqual(calculate_age_group("1955-12-12"), "Senior")
1✔
6292
        self.assertEqual(calculate_age_group("invalid-date"), "Unknown")
1✔
6293

6294
    # Test: Send data request to user
6295
    def test_send_data_request_to_user(self):
1✔
6296
        # Simulate sending a data request
6297
        response = send_data_request_to_user(
1✔
6298
            fitness_trainer_id=self.user_id, standard_user_id="5678"
6299
        )
6300
        self.assertFalse(response)
1✔
6301

6302
    # Test: Cancel data request to user
6303
    def test_cancel_data_request_to_user(self):
1✔
6304
        # Simulate canceling a data request
6305
        response = cancel_data_request_to_user(
1✔
6306
            fitness_trainer_id=self.user_id, standard_user_id="5678"
6307
        )
6308
        self.assertFalse(response)
1✔
6309

6310
    # Optionally, you can mock AWS DynamoDB calls during testing if you don't want to perform real interactions
6311
    @patch("boto3.resource")
1✔
6312
    def test_mock_dynamodb(self, mock_dynamodb_resource):
1✔
6313
        # Mock the DynamoDB client
6314
        mock_table = mock_dynamodb_resource.return_value.Table.return_value
1✔
6315
        mock_table.get_item.return_value = {"Item": {"user_id": self.user_id}}
1✔
6316

6317
        # Now when your function interacts with DynamoDB, it will use the mock
6318
        applications_table.get_item(Key={"user_id": self.user_id})
1✔
6319
        # self.assertEqual(response['Item']['user_id'], self.user_id)
6320

6321

6322
class DynamoDBTests2(TestCase):
1✔
6323

6324
    def setUp(self):
1✔
6325
        # Use an existing DynamoDB table
6326
        self.dynamodb = boto3.resource('dynamodb', region_name='us-west-2')
1✔
6327
        self.users_table = self.dynamodb.Table('Users')
1✔
6328

6329
        # Insert test data into the 'Users' table if not already inserted
6330
        self.users_table.put_item(Item={'user_id': '1', 'is_fitness_trainer': False, 'is_admin': False, 'gender': 'M', 'date_of_birth': '2000-01-01'})
1✔
6331
        self.users_table.put_item(Item={'user_id': '2', 'is_fitness_trainer': True, 'is_admin': False, 'gender': 'F', 'date_of_birth': '1995-01-01'})
1✔
6332
        self.users_table.put_item(Item={'user_id': '3', 'is_fitness_trainer': False, 'is_admin': False, 'gender': 'PNTS', 'date_of_birth': '1990-01-01'})
1✔
6333

6334
    def test_get_standard_users(self):
1✔
6335
        # Test retrieving standard users
6336
        standard_users = get_standard_users()  # Your function that gets standard users
1✔
6337

6338
        # Assert that gender and age were updated correctly
6339
        self.assertEqual(standard_users[0]['gender'], 'Male')  # Assuming 'M' maps to 'Male'
1✔
6340
        self.assertEqual(standard_users[1]['gender'], 'Other')  # Assuming 'PNTS' maps to 'Unknown'
1✔
6341
        self.assertEqual(standard_users[0]['age'], 'Elderly')  # Based on birth date '2000-01-01'
1✔
6342
        self.assertEqual(standard_users[1]['age'], 'Young Adult')  # Based on birth date '1990-01-01'
1✔
6343

6344
    def test_send_data_request_to_user(self):
1✔
6345
        # Test sending data request to a user
6346
        result = send_data_request_to_user('2', '1')  # Your function for sending data requests
1✔
6347

6348
        # Assert that the result is True, indicating the data request was sent
6349
        self.assertTrue(result)
1✔
6350

6351
        # Retrieve the updated user data to check if the list is updated
6352
        standard_user = self.users_table.get_item(Key={'user_id': '1'})['Item']
1✔
6353
        fitness_trainer = self.users_table.get_item(Key={'user_id': '2'})['Item']
1✔
6354

6355
        # Assert that the standard user's waiting list now includes the trainer's ID
6356
        self.assertIn('2', standard_user['waiting_list_of_trainers'])
1✔
6357

6358
        # Assert that the fitness trainer's waiting list now includes the standard user's ID
6359
        self.assertIn('1', fitness_trainer['waiting_list_of_users'])
1✔
6360

6361
    def test_cancel_data_request_to_user(self):
1✔
6362
        # First, send a data request to the user
6363
        send_data_request_to_user('2', '1')
1✔
6364

6365
        # Now test canceling the data request
6366
        result = cancel_data_request_to_user('2', '1')  # Your function for canceling data requests
1✔
6367

6368
        # Assert that the result is True, indicating the data request was canceled
6369
        self.assertTrue(result)
1✔
6370

6371
        # Retrieve the updated user data to check if the lists were updated
6372
        standard_user = self.users_table.get_item(Key={'user_id': '1'})['Item']
1✔
6373
        fitness_trainer = self.users_table.get_item(Key={'user_id': '2'})['Item']
1✔
6374

6375
        # Assert that the standard user's waiting list no longer includes the trainer's ID
6376
        self.assertNotIn('2', standard_user['waiting_list_of_trainers'])
1✔
6377

6378
        # Assert that the fitness trainer's waiting list no longer includes the standard user's ID
6379
        self.assertNotIn('1', fitness_trainer['waiting_list_of_users'])
1✔
6380

6381
    def tearDown(self):
1✔
6382
        # No need to delete the Users table; leave it as is
6383
        pass
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc