• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 18005d26-875b-4397-9529-ebc464e07316

16 May 2025 06:29PM UTC coverage: 85.352% (+0.03%) from 85.323%
18005d26-875b-4397-9529-ebc464e07316

Pull #5572

circleci

groovecoder
for MPP-3439: add IDNAEmailCleaner to clean email address domains with non-ASCII chars
Pull Request #5572: for MPP-3439: add IDNAEmailCleaner to clean email address domains with non-ASCII chars

2471 of 3617 branches covered (68.32%)

Branch coverage included in aggregate %.

58 of 59 new or added lines in 4 files covered. (98.31%)

61 existing lines in 2 files now uncovered.

17561 of 19853 relevant lines covered (88.46%)

9.58 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.71
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from datetime import UTC, datetime
1✔
7
from email import message_from_bytes
1✔
8
from email.iterators import _structure
1✔
9
from email.message import EmailMessage
1✔
10
from email.utils import parseaddr
1✔
11
from io import StringIO
1✔
12
from json import JSONDecodeError
1✔
13
from textwrap import dedent
1✔
14
from typing import Any, Literal, NamedTuple, TypedDict, TypeVar
1✔
15
from urllib.parse import urlencode
1✔
16
from uuid import uuid4
1✔
17

18
from django.conf import settings
1✔
19
from django.contrib.auth.models import User
1✔
20
from django.core.exceptions import ObjectDoesNotExist
1✔
21
from django.db import transaction
1✔
22
from django.db.models import prefetch_related_objects
1✔
23
from django.http import HttpRequest, HttpResponse
1✔
24
from django.shortcuts import render
1✔
25
from django.template.loader import render_to_string
1✔
26
from django.utils.html import escape
1✔
27
from django.views.decorators.csrf import csrf_exempt
1✔
28

29
from botocore.exceptions import ClientError
1✔
30
from codetiming import Timer
1✔
31
from decouple import strtobool
1✔
32
from markus.utils import generate_tag
1✔
33
from sentry_sdk import capture_message
1✔
34
from waffle import get_waffle_flag_model, sample_is_active
1✔
35

36
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
37
from privaterelay.models import Profile
1✔
38
from privaterelay.utils import (
1✔
39
    flag_is_active_in_task,
40
    get_subplat_upgrade_link_by_language,
41
    glean_logger,
42
)
43

44
from .exceptions import CannotMakeAddressException
1✔
45
from .models import (
1✔
46
    DeletedAddress,
47
    DomainAddress,
48
    RelayAddress,
49
    Reply,
50
    address_hash,
51
    get_domain_numerical,
52
)
53
from .policy import relay_policy
1✔
54
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
55
from .types import (
1✔
56
    AWS_MailJSON,
57
    AWS_SNSMessageJSON,
58
    EmailForwardingIssues,
59
    EmailHeaderIssues,
60
    OutgoingHeaders,
61
)
62
from .utils import (
1✔
63
    InvalidFromHeader,
64
    _get_bucket_and_key_from_s3_json,
65
    b64_lookup_key,
66
    count_all_trackers,
67
    decrypt_reply_metadata,
68
    derive_reply_keys,
69
    encode_dict_gza85,
70
    encrypt_reply_metadata,
71
    generate_from_header,
72
    get_domains_from_settings,
73
    get_message_content_from_s3,
74
    get_message_id_bytes,
75
    get_reply_to_address,
76
    histogram_if_enabled,
77
    incr_if_enabled,
78
    parse_email_header,
79
    remove_message_from_s3,
80
    remove_trackers,
81
    ses_send_raw_email,
82
    urlize_and_linebreaks,
83
)
84

85
logger = logging.getLogger("events")
1✔
86
info_logger = logging.getLogger("eventsinfo")
1✔
87

88

89
class ReplyHeadersNotFound(Exception):
    """Error carrying a message about missing In-Reply-To / References headers."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Hand the message to Exception as well, so that str(exc), exc.args,
        # and tracebacks show it even when the default message is used.
        super().__init__(message)
        self.message = message
1✔
92

93

94
def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.

    Query string options:
    * in_bundle_country: parsed with strtobool, defaults to "yes"
    * format: "text" renders the plain-text template, any other value
      (default "html") renders the HTML template
    """
    query = request.GET
    email_context = {
        "in_bundle_country": strtobool(query.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    wants_text = query.get("format", "html") == "text"
    if not wants_text:
        return render(request, "emails/first_time_user.html", email_context)
    return render(
        request,
        "emails/first_time_user.txt",
        email_context,
        content_type="text/plain; charset=utf-8",
    )
×
113

114

115
def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Every query string parameter is copied into the template context, on top
    of the defaults for "sender", "forwarded", and "SITE_ORIGIN". A
    "content-type" parameter of "text/plain" selects the plain-text template.
    """
    query = request.GET
    email_context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    # Query parameters override or extend the default context as strings.
    email_context.update({name: query.get(name) for name in query})
    # Only the literal string "True" is converted back to a boolean.
    if query.get("forwarded") == "True":
        email_context["forwarded"] = True

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            email_context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", email_context)
1✔
141

142

143
def disabled_mask_for_spam_test(request):
    """
    Demonstrate rendering of the "Disabled mask for spam" email.

    Every query string parameter is copied into the template context, on top
    of the defaults for "mask" and "SITE_ORIGIN". A "content-type" parameter
    of "text/plain" selects the plain-text template.
    """
    query = request.GET
    email_context = {
        "mask": "abc123456@mozmail.com",
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    # Query parameters override or extend the default context as strings.
    email_context.update({name: query.get(name) for name in query})

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/disabled_mask_for_spam.txt",
            email_context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/disabled_mask_for_spam.html", email_context)
×
167

168

169
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """Render the "first forwarded email" template inside the Relay wrapper."""
    # TO DO: Update with correct context when trigger is created
    template_context = {"SITE_ORIGIN": settings.SITE_ORIGIN}
    inner_html = render_to_string(
        "emails/first_forwarded_email.html", template_context
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
×
187

188

189
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Add Relay banners, surveys, etc. to an HTML email.

    Renders "emails/wrapped_email.html" with the given values plus the
    language-specific upgrade link and SITE_ORIGIN, then drops every line
    that is empty or whitespace-only. The result always ends with a newline.
    """
    rendered = render_to_string(
        "emails/wrapped_email.html",
        {
            "original_html": original_html,
            "language": language,
            "has_premium": has_premium,
            "subplat_upgrade_link": get_subplat_upgrade_link_by_language(language),
            "display_email": display_email,
            "tracker_report_link": tracker_report_link,
            "num_level_one_email_trackers_removed": (
                num_level_one_email_trackers_removed
            ),
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )
    # Remove empty lines
    kept_lines = (line for line in rendered.splitlines() if line.strip())
    return "\n".join(kept_lines) + "\n"
1✔
213

214

215
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    The query string may set "language", "has_premium",
    "num_level_one_email_trackers_removed", and "has_tracker_report_link".
    When "language" or "has_premium" is missing, its value is read from a
    randomly chosen profile; a ValueError is raised if no profile is found.
    """
    params = request.GET

    # A profile is only loaded when it has to supply a missing setting.
    if "language" in params and "has_premium" in params:
        user_profile = None
    else:
        user_profile = Profile.objects.order_by("?").first()

    def profile_or_raise():
        if user_profile is None:
            raise ValueError("user_profile must not be None")
        return user_profile

    language = (
        params["language"] if "language" in params else profile_or_raise().language
    )
    has_premium = (
        strtobool(params["has_premium"])
        if "has_premium" in params
        else profile_or_raise().has_premium
    )
    num_level_one_email_trackers_removed = (
        int(params["num_level_one_email_trackers_removed"])
        if "num_level_one_email_trackers_removed" in params
        else 0
    )
    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    tracker_report_link = ""
    if has_tracker_report_link:
        trackers = {}
        if num_level_one_email_trackers_removed:
            trackers["fake-tracker.example.com"] = (
                num_level_one_email_trackers_removed
            )
        # Hand-assembled URL fragment; only the trackers mapping is JSON-encoded.
        tracker_report_link = "".join(
            [
                "/tracker-report/#{",
                '"sender": "sender@example.com", ',
                '"received_at": 1658434657, ',
                f'"trackers": {json.dumps(trackers)}',
                "}",
            ]
        )

    path = "/emails/wrapped_email_test"
    premium_label = "Yes" if has_premium else "No"
    tracker_link_label = "Yes" if has_tracker_report_link else "No"
    old_query = {
        "language": language,
        "has_premium": premium_label,
        "has_tracker_report_link": tracker_link_label,
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return plain text for the active value, else a link selecting it."""
        if old_query[key] != value:
            target = urlencode({**old_query, key: value})
            return f'<a href="{path}?{target}">{value}</a>'
        return str(value)

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {premium_label}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {tracker_link_label}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        )
    )
1✔
345

346

347
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Store a Reply record for a relayed email and return ``mail`` unchanged.

    The Message-ID, From, and Reply-To headers of ``mail`` are encrypted with a
    key derived from ``message_id`` and saved with the derived lookup key,
    linked to ``address``.

    Raises TypeError if ``address`` is neither a DomainAddress nor a
    RelayAddress.
    """
    wanted_headers = ["message-id", "from", "reply-to"]
    reply_metadata = {
        header["name"].lower(): header["value"]
        for header in mail["headers"]
        if header["name"].lower() in wanted_headers
    }
    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    reply_create_args: dict[str, Any] = {
        "lookup": b64_lookup_key(lookup_key),
        "encrypted_metadata": encrypt_reply_metadata(encryption_key, reply_metadata),
    }
    # The Reply row points at the mask through a type-specific field.
    if isinstance(address, DomainAddress):
        owner_field = "domain_address"
    elif isinstance(address, RelayAddress):
        owner_field = "relay_address"
    else:
        raise TypeError("address must be type RelayAddress")
    reply_create_args[owner_field] = address
    Reply.objects.create(**reply_create_args)
    return mail
1✔
371

372

373
@csrf_exempt
def sns_inbound(request):
    """
    Handle an inbound SNS request.

    The body is parsed as JSON and passed through verify_from_sns, then the
    topic ARN and message type are validated. A validation failure is logged
    and answered with a 400; otherwise the message goes to _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn", None)
    message_type = verified_json_body.get("Type", None)
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
1✔
389

390

391
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Validate Topic ARN and SNS Message Type.

    Returns None when both values are acceptable. Otherwise returns a
    dictionary with the error message, the shell-quoted received values, and
    the supported topic ARNs and message types.
    """

    def first_error() -> str | None:
        # Checks run in order; the first failing one decides the message.
        if not topic_arn:
            return "Received SNS request without Topic ARN."
        if topic_arn not in settings.AWS_SNS_TOPIC:
            return "Received SNS message for wrong topic."
        if not message_type:
            return "Received SNS request without Message Type."
        if message_type not in SUPPORTED_SNS_TYPES:
            return "Received SNS message for unsupported Type."
        return None

    error = first_error()
    if not error:
        return None
    return {
        "error": error,
        # Falsy values (None, "") are reported as-is rather than quoted.
        "received_topic_arn": topic_arn and shlex.quote(topic_arn),
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": message_type and shlex.quote(message_type),
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
1✔
422

423

424
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch a validated SNS message by its type.

    "Notification" messages go to _sns_notification. For
    "SubscriptionConfirmation" the SubscribeURL is logged and a 200 returned.
    Any other type is logged, reported via capture_message, and answered with
    a 400. ``topic_arn`` is accepted but not used here.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        subscribe_details = {"SubscribeURL": json_body["SubscribeURL"]}
        info_logger.info("SNS SubscriptionConfirmation", extra=subscribe_details)
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
447

448

449
def _sns_notification(json_body):
    """
    Process the "Message" payload of an SNS notification.

    Returns a 400 when the payload is not JSON or when neither its
    notificationType nor its eventType is supported. Otherwise the payload is
    handled by _sns_message, and the stored message is removed from S3 unless
    the response status is 500 or above.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        quoted_notification_type = shlex.quote(notification_type)
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": quoted_notification_type,
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            "Received SNS notification for unsupported Type: "
            + html.escape(quoted_notification_type),
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # Keep the stored message when the response is 5xx.
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)
    return response
1✔
487

488

489
def _get_recipient_with_relay_domain(recipients):
    """
    Return the first recipient that contains a configured domain, else None.

    The check is a substring test of each domain from
    get_domains_from_settings() against the recipient string.
    """
    relay_domains = get_domains_from_settings().values()
    matching = (
        recipient
        for recipient in recipients
        if any(domain in recipient for domain in relay_domains)
    )
    return next(matching, None)
1✔
496

497

498
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the Relay recipient among the To, Cc, and Bcc recipients.

    The "to" and "cc" common headers are checked first, and a match is
    returned as the bare address parsed by parseaddr. Otherwise the receipt's
    recipients list is searched and a match is returned as-is. Returns None
    when nothing matches.
    """
    common_headers = message_json["mail"]["commonHeaders"]

    # First check common headers for to or cc match
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            _display_name, email_address = parseaddr(match)
            return email_address

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
1✔
514

515

516
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route a notification payload to the bounce, complaint, or received handler.

    Raises ValueError if the payload is neither a bounce nor a complaint and
    its notificationType is not "Received" or its eventType is set.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    init_waffle_flags()
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    reported_types = (notification_type, event_type)

    # Bounces and complaints may be reported through either field.
    if "Bounce" in reported_types:
        return _handle_bounce(message_json)
    if "Complaint" in reported_types:
        return _handle_complaint(message_json)

    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
1✔
530

531

532
# Enumerate the reasons that an email was not forwarded.
533
# This excludes emails dropped due to mask forwarding settings,
534
# such as "block all" and "block promotional". Those are logged
535
# as Glean email_blocked events.
536
EmailDroppedReason = Literal[
1✔
537
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
538
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
539
    "hard_bounce_pause",  # The user recently had a hard bounce
540
    "soft_bounce_pause",  # The user recently had a soft bounce
541
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
542
    "user_deactivated",  # The user account is deactivated
543
    "reply_requires_premium",  # The email is a reply from a free user
544
    "content_missing",  # Could not load the email from storage
545
    "error_from_header",  # Error generating the From: header, retryable
546
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
547
    "error_sending",  # Error sending the forwarded email (SES), retryable
548
]
549

550

551
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log that an email was dropped for a reason other than a mask blocking setting.

    Emits an "email_dropped" info log. The mask ID, and the FxA ID when the
    profile has one, are included only when the owner's profile has metrics
    enabled.

    This mirrors the interface of glean_logger().log_email_blocked(), which
    records emails dropped due to the mask's blocking setting.
    """
    profile = mask.user.profile
    extra: dict[str, str | int | bool] = {"reason": reason}
    if profile.metrics_enabled:
        fxa = profile.fxa
        if fxa is not None:
            extra["fxa_id"] = fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra.update(
        is_random_mask=isinstance(mask, RelayAddress),
        is_reply=is_reply,
        can_retry=can_retry,
    )
    info_logger.info("email_dropped", extra=extra)
1✔
574

575

576
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
577
    """
578
    Handle an AWS SES received notification.
579

580
    For more information, see:
581
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
582
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
583

584
    Returns (may be incomplete):
585
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
586
      flagged for abuse, the email is under a bounce pause, the email was suppressed
587
      for spam, the list email was blocked, or the noreply address was the recipient.
588
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
589
      the email failed DMARC with reject policy, or the email is a reply chain to a
590
      non-premium user.
591
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
592
      "CC", or "BCC" fields, or the Relay address is not in the database.
593
    * 503 if the "From" address is malformed, the S3 client returned an error different
594
      from "not found", or the SES client fails
595

596
    And many other returns conditions if the email is a reply. The HTTP returns are an
597
    artifact from an earlier time when emails were sent to a webhook. Currently,
598
    production instead pulls events from a queue.
599

600
    TODO: Return a more appropriate status object
601
    TODO: Document the metrics emitted
602
    """
603
    mail = message_json["mail"]
1✔
604
    if "commonHeaders" not in mail:
1✔
605
        logger.error("SNS message without commonHeaders")
1✔
606
        return HttpResponse(
1✔
607
            "Received SNS notification without commonHeaders.", status=400
608
        )
609
    common_headers = mail["commonHeaders"]
1✔
610
    receipt = message_json["receipt"]
1✔
611

612
    _record_receipt_verdicts(receipt, "all")
1✔
613
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
614
    if to_address is None:
1✔
615
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
616
        return HttpResponse("Address does not exist", status=404)
1✔
617

618
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
619
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
620
    if not from_addresses:
1✔
621
        info_logger.error(
1✔
622
            "_handle_received: no from address",
623
            extra={
624
                "source": mail["source"],
625
                "common_headers_from": common_headers["from"],
626
            },
627
        )
628
        return HttpResponse("Unable to parse From address", status=400)
1✔
629
    from_address = from_addresses[0][1]
1✔
630

631
    try:
1✔
632
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
633
    except ValueError:
1✔
634
        # TODO: Add metric
635
        return HttpResponse("Malformed to field.", status=400)
1✔
636

637
    if to_local_portion.lower() == "noreply":
1✔
638
        incr_if_enabled("email_for_noreply_address", 1)
1✔
639
        return HttpResponse("noreply address is not supported.")
1✔
640
    try:
1✔
641
        # FIXME: this ambiguous return of either
642
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
643
        # up a bit.
644
        address = _get_address(to_address)
1✔
645
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
646
        user_profile = address.user.profile
1✔
647
    except (
1✔
648
        ObjectDoesNotExist,
649
        CannotMakeAddressException,
650
        DeletedAddress.MultipleObjectsReturned,
651
    ):
652
        if to_local_portion.lower() == "replies":
1✔
653
            response = _handle_reply(from_address, message_json, to_address)
1✔
654
        else:
655
            response = HttpResponse("Address does not exist", status=404)
1✔
656
        return response
1✔
657

658
    _record_receipt_verdicts(receipt, "valid_user")
1✔
659
    # if this is spam and the user is set to auto-block spam, early return
660
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
661
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
662
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
663
        return HttpResponse("Address rejects spam.")
1✔
664

665
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
666
        policy = receipt.get("dmarcPolicy", "none")
1✔
667
        # TODO: determine action on dmarcPolicy "quarantine"
668
        if policy == "reject":
1!
669
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
670
            incr_if_enabled(
1✔
671
                "email_suppressed_for_dmarc_failure",
672
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
673
            )
674
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
675

676
    # if this user is over bounce limits, early return
677
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
678
    if bounce_paused:
1✔
679
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
680
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
681
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
682
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
683
        )
684
        log_email_dropped(reason=reason, mask=address)
1✔
685
        return HttpResponse("Address is temporarily disabled.")
1✔
686

687
    # check if this is a reply from an external sender to a Relay user
688
    try:
1✔
689
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
690
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
691
        user_address = address
1✔
692
        address = reply_record.address
1✔
693
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
694
        # make sure the relay user is premium
695
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
696
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
697
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
698
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
699
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
700
        # match a Reply record, continue to treat this as a regular email from
701
        # an external sender to a relay user
702
        pass
1✔
703

704
    # if account flagged for abuse, early return
705
    if user_profile.is_flagged:
1✔
706
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
707
        return HttpResponse("Address is temporarily disabled.")
1✔
708

709
    if not user_profile.user.is_active:
1✔
710
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
711
        return HttpResponse("Account is deactivated.")
1✔
712

713
    # if address is set to block, early return
714
    if not address.enabled:
1✔
715
        incr_if_enabled("email_for_disabled_address", 1)
1✔
716
        address.num_blocked += 1
1✔
717
        address.save(update_fields=["num_blocked"])
1✔
718
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
719
        user_profile.last_engagement = datetime.now(UTC)
1✔
720
        user_profile.save()
1✔
721
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
722
        return HttpResponse("Address is temporarily disabled.")
1✔
723

724
    _record_receipt_verdicts(receipt, "active_alias")
1✔
725
    incr_if_enabled("email_for_active_address", 1)
1✔
726

727
    # if address is blocking list emails, and email is from list, early return
728
    if (
1✔
729
        address
730
        and address.block_list_emails
731
        and user_profile.has_premium
732
        and _check_email_from_list(mail["headers"])
733
    ):
734
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
735
        address.num_blocked += 1
1✔
736
        address.save(update_fields=["num_blocked"])
1✔
737
        user_profile.last_engagement = datetime.now(UTC)
1✔
738
        user_profile.save()
1✔
739
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
740
        return HttpResponse("Address is not accepting list emails.")
1✔
741

742
    # Collect new headers
743
    subject = common_headers.get("subject", "")
1✔
744
    destination_address = user_profile.user.email
1✔
745
    reply_address = get_reply_to_address()
1✔
746
    try:
1✔
747
        from_header = generate_from_header(from_address, to_address)
1✔
748
    except InvalidFromHeader:
1✔
749
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
750
        header_from = []
1✔
751
        for header in mail["headers"]:
1✔
752
            if header["name"].lower() == "from":
1✔
753
                header_from.append(header)
1✔
754
        info_logger.error(
1✔
755
            "generate_from_header",
756
            extra={
757
                "from_address": from_address,
758
                "source": mail["source"],
759
                "common_headers_from": common_headers["from"],
760
                "headers_from": header_from,
761
            },
762
        )
763
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
764
        return HttpResponse("Cannot parse the From address", status=503)
1✔
765

766
    # Get incoming email
767
    try:
1✔
768
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
769
    except ClientError as e:
1✔
770
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
771
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
772
            log_email_dropped(reason="content_missing", mask=address)
1✔
773
            return HttpResponse("Email not in S3", status=404)
1✔
774
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
775
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
776
        # we are returning a 503 so that SNS can retry the email processing
777
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
778

779
    # Handle developer overrides, logging
780
    dev_action = _get_developer_mode_action(address)
1✔
781
    if dev_action:
1✔
782
        if dev_action.new_destination_address:
1!
783
            destination_address = dev_action.new_destination_address
1✔
784
        _log_dev_notification(
1✔
785
            "_handle_received: developer_mode", dev_action, message_json
786
        )
787

788
    # Convert to new email
789
    headers: OutgoingHeaders = {
1✔
790
        "Subject": subject,
791
        "From": from_header,
792
        "To": destination_address,
793
        "Reply-To": reply_address,
794
        "Resent-From": from_address,
795
    }
796
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
797
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
798
    remove_level_one_trackers = bool(
1✔
799
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
800
    )
801
    (
1✔
802
        forwarded_email,
803
        issues,
804
        level_one_trackers_removed,
805
        has_html,
806
        has_text,
807
    ) = _convert_to_forwarded_email(
808
        incoming_email_bytes=incoming_email_bytes,
809
        headers=headers,
810
        to_address=to_address,
811
        from_address=from_address,
812
        language=user_profile.language,
813
        has_premium=user_profile.has_premium,
814
        sample_trackers=sample_trackers,
815
        remove_level_one_trackers=remove_level_one_trackers,
816
    )
817
    if has_html:
1✔
818
        incr_if_enabled("email_with_html_content", 1)
1✔
819
    if has_text:
1✔
820
        incr_if_enabled("email_with_text_content", 1)
1✔
821
    if issues:
1✔
822
        info_logger.info(
1✔
823
            "_handle_received: forwarding issues", extra={"issues": issues}
824
        )
825

826
    # Send new email
827
    try:
1✔
828
        ses_response = ses_send_raw_email(
1✔
829
            source_address=reply_address,
830
            destination_address=destination_address,
831
            message=forwarded_email,
832
        )
833
    except ClientError:
1✔
834
        # 503 service unavailable response to SNS so it can retry
835
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
836
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
837

838
    message_id = ses_response["MessageId"]
1✔
839
    _store_reply_record(mail, message_id, address)
1✔
840

841
    user_profile.update_abuse_metric(
1✔
842
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
843
    )
844
    user_profile.last_engagement = datetime.now(UTC)
1✔
845
    user_profile.save()
1✔
846
    address.num_forwarded += 1
1✔
847
    address.last_used_at = datetime.now(UTC)
1✔
848
    if level_one_trackers_removed:
1!
UNCOV
849
        address.num_level_one_trackers_blocked = (
×
850
            address.num_level_one_trackers_blocked or 0
851
        ) + level_one_trackers_removed
852
    address.save(
1✔
853
        update_fields=[
854
            "num_forwarded",
855
            "last_used_at",
856
            "block_list_emails",
857
            "num_level_one_trackers_blocked",
858
        ]
859
    )
860
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
861
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
862

863

864
class DeveloperModeAction(NamedTuple):
    """Developer-mode handling for a mask, built by _get_developer_mode_action."""

    # Identifier of the mask, taken from the mask's metrics_id
    mask_id: str
    # Which developer action applies; "log" is the default
    action: Literal["log", "simulate_complaint"] = "log"
    # If set, the forwarded email is sent to this address instead of the user's
    new_destination_address: str | None = None
1✔
868

869

870
def _get_verdict(receipt, verdict_type):
    """Return the "status" of the receipt's "<verdict_type>Verdict" entry."""
    verdict_key = f"{verdict_type}Verdict"
    verdict = receipt[verdict_key]
    return verdict["status"]
1✔
872

873

874
def _check_email_from_list(headers):
    """Return True if any header name starts with "list-" (case-insensitive)."""
    return any(entry["name"].lower().startswith("list-") for entry in headers)
×
879

880

881
def _record_receipt_verdicts(receipt, state):
    """
    Emit counters for the verdicts found in a receipt.

    Keys are visited in sorted order. Each key ending in "Verdict" increments
    its own counter, tagged with the state, and contributes a "key:status" tag.
    A "dmarcPolicy" key contributes a "key:value" tag only. A final counter for
    the state is incremented with all collected tags.
    """
    collected_tags = []
    for name in sorted(receipt):
        if name.endswith("Verdict"):
            status = receipt[name]["status"]
            collected_tags.append(f"{name}:{status}")
            incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
            continue
        if name == "dmarcPolicy":
            policy = receipt[name]
            collected_tags.append(f"{name}:{policy}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, collected_tags)
1✔
892

893

894
def _get_message_id_from_headers(headers):
    """Return the value of the last Message-ID header, or None if there is none."""
    matches = [
        entry["value"] for entry in headers if entry["name"].lower() == "message-id"
    ]
    return matches[-1] if matches else None
1✔
900

901

902
def _get_keys_from_headers(headers):
    """
    Derive the reply lookup and encryption keys from an email's reply headers.

    Headers are scanned in order, and the first In-Reply-To or References header
    decides the result:

    * In-Reply-To: keys are derived from its value and returned, without
      checking that a Reply record exists.
    * References: each message ID is tried in turn, and the keys for the first
      one with a Reply record are returned.

    Returns a (lookup_key, encryption_key) tuple.

    Raises Reply.DoesNotExist if a References header is reached and none of its
    message IDs has a Reply record.
    Raises ReplyHeadersNotFound if neither header is present.
    """
    for header in headers:
        header_name = header["name"].lower()
        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # Message IDs can be separated by any run of whitespace (spaces,
            # tabs, folded newlines). Splitting on whitespace avoids the empty
            # tokens and joined IDs that splitting on a single space produces.
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
1✔
924

925

926
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Return the Reply record whose lookup field matches the encoded lookup key.

    Callers in this module catch Reply.DoesNotExist when there is no match.
    """
    encoded_lookup = b64_lookup_key(lookup_key)
    reply_record = Reply.objects.get(lookup=encoded_lookup)
    return reply_record
1✔
929

930

931
def _strip_localpart_tag(address):
    """
    Return the address with any "+tag" suffix removed from its local part.

    The address is split on its last "@", so a local part that itself contains
    "@" (for example, a quoted local part) is handled instead of raising.

    Raises ValueError if the address contains no "@".
    """
    localpart, domain = address.rsplit("@", 1)
    # Everything from the first "+" onward is the subaddress tag
    base_localpart = localpart.partition("+")[0]
    return f"{base_localpart}@{domain}"
1✔
935

936

937
# How the raw email content was obtained by _get_email_bytes: inline in the SNS
# notification ("sns") or fetched from S3 ("s3")
_TransportType = Literal["sns", "s3"]
1✔
938

939

940
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email for a notification and time how long it takes.

    Returns a tuple of (email content, transport, load time rounded to three
    decimal places). The transport is "sns" when the notification carries a
    "content" key, and "s3" when the content is fetched from S3 instead.
    The content size is also recorded in the "relayed_email.size" histogram.
    """
    transport: _TransportType
    with Timer(logger=None) as load_timer:
        if "content" not in message_json:
            # assume email content in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            message_content = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            # email content in sns message
            message_content = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(message_content))
    return (message_content, transport, round(load_timer.last, 3))
1✔
956

957

958
def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """
    Get the developer mode action for this mask, if enabled.

    Returns None unless the "developer_mode" flag is active for the mask's user
    and the mask description contains "DEV:". A description containing
    "DEV:simulate_complaint" redirects delivery to the SES complaint simulator
    address; any other "DEV:" description selects the "log" action.
    """

    developer_mode = flag_is_active_in_task("developer_mode", mask.user)
    if not developer_mode or "DEV:" not in mask.description:
        return None

    if "DEV:simulate_complaint" not in mask.description:
        return DeveloperModeAction(mask_id=mask.metrics_id, action="log")

    return DeveloperModeAction(
        mask_id=mask.metrics_id,
        action="simulate_complaint",
        new_destination_address=f"complaint+{mask.metrics_id}@simulator.amazonses.com",
    )
1✔
978

979

980
def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-byte chunks. This will ensure it fits into GCP's log entry, which has a
    64KB limit per label value.

    One log entry is written per chunk. Entries share a random "log_group_id" and
    carry a zero-based "part" index plus the total number of "parts".
    """

    notification_gza85 = encode_dict_gza85(notification)
    # Split once, so the reported total always matches the number of entries logged
    # (counting newlines separately would be off by one on a trailing newline)
    parts = notification_gza85.splitlines()
    total_parts = len(parts)
    log_group_id = str(uuid4())
    for partnum, part in enumerate(parts):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "log_group_id": log_group_id,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )
1009

1010

1011
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced with the given outgoing headers, the plain text body
    (if any) gets the Relay text prefix, and the HTML body (if any) is wrapped by
    _convert_html_content. If there is text content but no HTML body, an HTML
    alternative is generated from the text.

    The optional now is forwarded to _convert_html_content, which falls back to
    the current time when it is None.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed email or one of its bodies is not an
    EmailMessage.
    """
    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    # Compare with None: a message's length is its header count, so a body part
    # without headers is falsy even though it exists
    has_text = text_body is not None
    if has_text:
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = html_body is not None
    if has_html:
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now=now,
        )
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Best effort: forward without the HTML alternative, but log the
            # message structure to help diagnose why it could not be added
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1101

1102

1103
def _replace_headers(
    email: EmailMessage, headers: OutgoingHeaders
) -> EmailHeaderIssues:
    """
    Replace the headers in email with new headers.

    This replaces headers in the passed email object, rather than returns an altered
    copy. The primary reason is that the Python email package can read an email with
    non-compliant headers or content, but can't write it. A read/write is required to
    create a copy that we then alter. This code instead alters the passed EmailMessage
    object, making header-specific changes in try / except statements.

    The other reason is the object size. An Email can be up to 10 MB, and we hope to
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
    nice to handle the non-compliant headers without crashing before we add a source of
    memory-related crashes.

    Every existing header is dropped except MIME-Version, Content-* headers, and
    headers that are about to be replaced. The requested headers are then set.

    Returns a list of issue dictionaries. Issues with direction "in" describe
    problems reading the original headers, and issues with direction "out" describe
    problems setting the new headers. Problems are recorded rather than raised.
    """
    # Look for headers to drop
    to_drop: list[str] = []
    # Lower-cased names of the headers that will be replaced
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
    issues: EmailHeaderIssues = []

    # Detect non-compliant headers in incoming emails
    for header in email.keys():
        try:
            value = email[header]
        except Exception as e:
            # Reading the header failed; record it and continue without a value
            issues.append(
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
            )
            value = None
        if getattr(value, "defects", None):
            # Defects reported on the header value itself
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value.defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
            # No defects on the value, but some are recorded in its parse tree
            issues.append(
                {
                    "header": header,
                    "direction": "in",
                    "defect_count": len(value._parse_tree.all_defects),
                    "parsed_value": str(value),
                    "raw_value": str(value.as_raw),
                }
            )

    # Collect headers that will not be forwarded
    for header in email.keys():
        header_lower = header.lower()
        if (
            header_lower not in replacements
            and header_lower != "mime-version"
            and not header_lower.startswith("content-")
        ):
            to_drop.append(header)

    # Drop headers that should be dropped
    # (done after collecting, so the email is not modified while iterating its keys)
    for header in to_drop:
        del email[header]

    # Replace the requested headers
    for header, value in headers.items():
        # Remove any existing values first, so the header is replaced, not appended
        del email[header]
        try:
            # Strip trailing CR / LF characters before assigning the value
            email[header] = value.rstrip("\r\n")
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        # Read the header back to check that the new value parses
        try:
            parsed_value = email[header]
        except Exception as e:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "exception_on_write": repr(e),
                    "value": value,
                }
            )
            continue
        if parsed_value.defects:
            issues.append(
                {
                    "header": header,
                    "direction": "out",
                    "defect_count": len(parsed_value.defects),
                    "parsed_value": str(parsed_value),
                    "raw_value": str(parsed_value.as_raw),
                },
            )

    return issues
1✔
1208

1209

1210
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay wrapper, optionally removing trackers.

    When sample_trackers is set, the trackers in the content are counted first.
    When remove_level_one_trackers is set, trackers are removed and a tracker
    report link is built for the wrapper.

    Returns a tuple of (wrapped HTML, number of trackers removed). The count is 0
    unless remove_level_one_trackers is set.
    """
    # frontend expects a timestamp in milliseconds
    received_at = now if now else datetime.now(UTC)
    received_at_ms = int(received_at.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    if remove_level_one_trackers:
        html_content, tracker_details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        removed_count = tracker_details["tracker_removed"]
        report_details = {
            "sender": from_address,
            "received_at": received_at_ms,
            "trackers": tracker_details["level_one"]["trackers"],
        }
        # The report details travel as JSON in the URL fragment
        tracker_report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(
            report_details
        )
    else:
        removed_count = 0
        tracker_report_link = ""

    return (
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            display_email=display_email,
            tracker_report_link=tracker_report_link,
            num_level_one_email_trackers_removed=removed_count,
        ),
        removed_count,
    )
1✔
1257

1258

1259
def _convert_text_content(text_content: str, to_address: str) -> str:
    """Return the text content prefixed with the Relay notice naming the alias."""
    alias_notice = (
        f"This email was sent to your alias {to_address}. "
        "To stop receiving emails sent to this alias, "
        "update the forwarding settings in your dashboard.\n"
    )
    begin_marker = "---Begin Email---\n"
    return alias_notice + begin_marker + text_content
1✔
1268

1269

1270
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the "replies require premium" email addressed to from_address.

    The message has a plain text body and an HTML alternative, both rendered
    from templates. When message_id is given, it is set as the In-Reply-To and
    References headers.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    owner_profile = reply_record.address.user.profile
    forwarded = not owner_profile.forwarded_first_reply

    # Prefer the Reply-To from the metadata, then From; default to empty
    metadata = decrypted_metadata if decrypted_metadata is not None else {}
    sender = metadata.get("reply-to") or metadata.get("from") or ""

    ctx = {
        "sender": sender,
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", ctx)
    text_body = render_to_string("emails/reply_requires_premium.txt", ctx)

    # Create the message
    msg = EmailMessage()
    msg["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    msg["From"] = get_reply_to_address()
    msg["To"] = from_address
    if message_id:
        for threading_header in ("In-Reply-To", "References"):
            msg[threading_header] = message_id
    msg.set_content(text_body)
    msg.add_alternative(html_body, subtype="html")
    return msg
1✔
1301

1302

1303
def _set_forwarded_first_reply(profile):
    """Mark the profile as having had its first reply forwarded, and save it."""
    setattr(profile, "forwarded_first_reply", True)
    profile.save()
1✔
1306

1307

1308
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Send the "replies require premium" email to from_address.

    On success, the owner's profile is marked as having had a first reply
    forwarded. An SES ClientError is logged and not re-raised. The
    "free_user_reply_attempt" counter is incremented either way.
    """
    premium_required_msg = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=from_address,
            message=premium_required_msg,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed will.
        # So, update the DB.
        owner_profile = reply_record.address.user.profile
        _set_forwarded_first_reply(owner_profile)
    except ClientError as send_error:
        logger.error(
            "reply_not_allowed_ses_client_error", extra=send_error.response["Error"]
        )
    incr_if_enabled("free_user_reply_attempt", 1)
×
1329

1330

1331
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Return True if this reply may be sent.

    If the From address matches the reply record owner's email (ignoring any
    "+tag"), the owner must be active and not flagged. Premium owners are always
    allowed. Other owners are sent the "replies require premium" email, and are
    allowed only if no first reply has been forwarded for them yet.

    Otherwise, the reply is allowed only if to_address is a mask owned by a
    premium user.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner_email = _strip_localpart_tag(owner_email)
    from_relay_user = (
        from_address == owner_email or stripped_from_address == stripped_owner_email
    )

    if not from_relay_user:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # This is a Relay user replying to an external sender;
    if not reply_record.profile.user.is_active:
        return False

    if reply_record.profile.is_flagged:
        return False

    if reply_record.owner_has_premium:
        return True

    # if we haven't forwarded a first reply for this user, return True to allow
    # this first reply. Read the flag before sending, since sending can set it.
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
×
1369

1370

1371
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    # NOTE(review): _get_keys_from_headers also raises Reply.DoesNotExist when a
    # References header has no matching Reply record. That is not caught here -
    # confirm the caller handles it.
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    # The metadata is stored encrypted, and holds JSON once decrypted
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    # The reply is sent from the mask's address
    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    # From here on, to_address is the outgoing recipient taken from the stored
    # metadata, preferring its "reply-to" entry over its "from" entry
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }

    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    # The header issues returned by _replace_headers are not used here
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        # Unlike the storage error above, this drop is not logged with can_retry
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    # The reply was sent: update the reply count, abuse metrics, and engagement time
    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1464

1465

1466
def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, and create=True, a new DomainAddress
    will be created and returned. If create=False, DomainAddress.DoesNotExist is raised.

    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised.
    A domain_portion without a "." (no subdomain part) is treated as an unknown
    domain as well.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.

    When create is True, the "email_for_not_supported_domain" or
    "email_for_dne_subdomain" counter is incremented before the matching
    exception is raised.
    """

    # partition() always returns three parts, so a dot-less domain_portion is
    # reported as an unknown domain instead of failing to unpack.
    address_subdomain, dot, address_domain = domain_portion.partition(".")
    if not dot or address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        if create:
            incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            # Fetch the subdomain owner's profile with select_for_update() inside
            # the transaction, before looking up or creating the address.
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise DomainAddress.DoesNotExist()
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            # Found or freshly created: record the use either way.
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        if create:
            incr_if_enabled("email_for_dne_subdomain", 1)
        # Bare raise keeps the original traceback intact.
        raise
1✔
1516

1517

1518
def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Look up the mask (RelayAddress or DomainAddress) behind an email address.

    The address is split on "@" and both parts are lower-cased. When the domain
    is not one of the site's configured relay domains, the lookup is delegated
    to _get_domain_address(), which may create a new DomainAddress if create is
    True.

    All lookup failures raise a subclass of Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * DomainAddress.DoesNotExist - looks like unknown DomainAddress, create is False
    * ObjectDoesNotExist - Unknown domain
    """

    local_part, domain_part = address.split("@")
    local_address = local_part.lower()
    domain = domain_part.lower()

    # Anything outside the site's own relay domains may be a user's subdomain.
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain, create)

    # The domain is one of the site's relay domains: this is a RelayAddress lookup.
    try:
        return RelayAddress.objects.get(
            address=local_address, domain=get_domain_numerical(domain)
        )
    except RelayAddress.DoesNotExist:
        if create:
            # Pick the counter that explains why the address is missing.
            try:
                DeletedAddress.objects.get(
                    address_hash=address_hash(local_address, domain=domain)
                )
            except DeletedAddress.DoesNotExist:
                metric = "email_for_unknown_address"
            except DeletedAddress.MultipleObjectsReturned:
                # not sure why this happens on stage but let's handle it
                metric = "email_for_deleted_address_multiple"
            else:
                # TODO: create a hard bounce receipt rule in SES
                metric = "email_for_deleted_address"
            incr_if_enabled(metric, 1)
        raise
1✔
1564

1565

1566
def _get_address_if_exists(address: str) -> RelayAddress | DomainAddress | None:
    """
    Get the matching RelayAddress or DomainAddress, or None if it doesn't exist.

    No address is created by this lookup (create=False). An address that does
    not contain exactly one "@" cannot match a mask, so it returns None too.
    """
    # _get_address() unpacks address.split("@") into exactly two parts; without
    # this guard an empty or malformed address would raise ValueError there
    # instead of producing the None this function promises.
    if address.count("@") != 1:
        return None
    try:
        return _get_address(address, create=False)
    except ObjectDoesNotExist:
        # Covers RelayAddress.DoesNotExist, Profile.DoesNotExist and
        # DomainAddress.DoesNotExist, which are all based on ObjectDoesNotExist.
        return None
1✔
1572

1573

1574
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match, none are given, or an address is unparseable

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or '' if omitted
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user

    The message_json passed in is not modified.
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for raw_recipient in bounced_recipients:
        # Work on a shallow copy so the pop() calls below leave the caller's
        # message_json untouched. Whatever remains after the pops is "extra".
        recipient = dict(raw_recipient)
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            data["bounce_extra"] = recipient
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        _, at_sign, recipient_domain = recipient_address.rpartition("@")
        if not at_sign:
            # parseaddr() yields "" for input it cannot parse. Without an "@"
            # there is no usable address, so keep user_match="no_address"
            # rather than raising IndexError or querying for a bogus email.
            continue
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            # Only write the profile when one of the branches above changed it.
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    # One metric increment and one info log per recipient entry.
    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1681

1682

1683
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress,
) -> EmailMessage:
    """
    Build the "disabled mask for spam" email addressed to the mask's user.

    The message is sent from settings.RELAY_FROM_ADDRESS and carries a plain
    text body with an HTML alternative, both rendered from the
    emails/disabled_mask_for_spam templates.
    """
    template_context = {
        "mask": mask.full_address,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    rendered_html = render_to_string(
        "emails/disabled_mask_for_spam.html", template_context
    )
    rendered_text = render_to_string(
        "emails/disabled_mask_for_spam.txt", template_context
    )

    # Headers first, then the text part, then the HTML alternative
    message = EmailMessage()
    message["Subject"] = ftl_bundle.format("relay-deactivated-mask-email-subject")
    message["From"] = settings.RELAY_FROM_ADDRESS
    message["To"] = mask.user.email
    message.set_content(rendered_text)
    message.add_alternative(rendered_html, subtype="html")
    return message
1✔
1698

1699

1700
def _send_disabled_mask_for_spam_email(mask: RelayAddress | DomainAddress) -> None:
    """
    Email the mask's user that the mask was disabled after a spam complaint.

    Raises ValueError if settings.RELAY_FROM_ADDRESS is not set. An SES
    ClientError is logged and not re-raised. The "send_disabled_mask_email"
    counter is incremented after the send attempt, whether or not SES accepted
    the message.
    """
    # Check the configuration before rendering: the message builder uses
    # RELAY_FROM_ADDRESS as the From header, so building first would do the
    # template work (and set an unusable header) before reporting the problem.
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    msg = _build_disabled_mask_for_spam_email(mask)
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=mask.user.email,
            message=msg,
        )
    except ClientError as e:
        logger.error("send_disabled_mask_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("send_disabled_mask_email", 1)
1✔
1715

1716

1717
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Process an AWS SES complaint notification.

    Relay users are looked up from two places: the complainedRecipients (the
    user's real email address) and the From: header (the mask address). Both are
    expected to resolve to the same Relay user, giving a 200. When either lookup
    fails, a 404 is returned and errors may be logged.

    A user's first complaint turns on their auto_block_spam flag.

    A later complaint disables the mask that forwarded the spam, and emails the
    user that the mask is disabled and can be re-enabled on their dashboard.

    Details of the complaint notification format:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Counter metric "email_complaint" is emitted with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP (usually 'abuse') or 'none'
    * user_match: 'found' or 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', or 'disable_mask'

    Info log "complaint_notification" is emitted with the metric data, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * found_in - "complained_recipients" (real email), "from_header" (email mask),
      or "all" (matching records found in both)
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    * mask_match - "found" if "From" header contains an email mask, or "not_found"
    """
    complaint_data = _get_complaint_data(message_json)
    complainers, unknown_count = _gather_complainers(complaint_data)

    # Act on each complaining Relay user to reduce future complaints
    actions: list[ComplaintAction] = []
    for complainer in complainers:
        action = _reduce_future_complaints(complainer)
        actions.append(action)

        in_developer_mode = flag_is_active_in_task("developer_mode", complainer["user"])
        if in_developer_mode and action.mask_id:
            _log_dev_notification(
                "_handle_complaint: developer_mode",
                DeveloperModeAction(mask_id=action.mask_id, action="log"),
                message_json,
            )

    if not actions:
        # Nobody to act on: still record the complaint, with no action taken
        actions.append(ComplaintAction(user_match="no_recipients"))

    # Complaint-level values are the same for every action
    metric_subtype = complaint_data.subtype or "none"
    metric_feedback = complaint_data.feedback_type or "none"
    complaint_log_fields = {
        "complaint_subtype": complaint_data.subtype or None,
        "complaint_user_agent": complaint_data.user_agent or None,
        "complaint_feedback": complaint_data.feedback_type or None,
    }

    for action in actions:
        tags = [
            generate_tag("complaint_subtype", metric_subtype),
            generate_tag("complaint_feedback", metric_feedback),
            generate_tag("user_match", action.user_match),
            generate_tag("relay_action", action.relay_action),
        ]
        incr_if_enabled("email_complaint", tags=tags)

        # The log carries every populated action field except mask_id
        action_log_fields = {
            key: value
            for key, value in action._asdict().items()
            if key != "mask_id" and value is not None
        }
        info_logger.info(
            "complaint_notification",
            extra={**complaint_log_fields, **action_log_fields},
        )

    if unknown_count:
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1806

1807

1808
class RawComplaintData(NamedTuple):
    """Complaint fields pulled from an AWS SES message by _get_complaint_data()."""

    # One (email address, other recipient fields) pair per complainedRecipients
    # entry that had an "emailAddress"
    complained_recipients: list[tuple[str, dict[str, Any]]]
    # Addresses parsed from the "from" list of the mail's commonHeaders
    from_addresses: list[str]
    # The complaint's "complaintSubType", or "" when not present
    subtype: str
    # The complaint's "userAgent", or "" when not present
    user_agent: str
    # The complaint's "complaintFeedbackType", or "" when not present
    feedback_type: str
1✔
1814

1815

1816
def _get_complaint_data(message_json: AWS_SNSMessageJSON) -> RawComplaintData:
    """
    Pull the complaint details out of an AWS SES Complaint Notification.

    Only the data needed by _handle_complaint() is extracted. Complaint events
    have a similar structure and carry the same data, so they work here too.

    Missing expected keys are logged as errors and replaced with empty values.
    The top-level "complaint" key is required.

    Details of the complaint notification format:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
    """
    complaint = message_json["complaint"]

    T = TypeVar("T")

    def get_or_log(
        key: str, source: dict[str, T], data_type: type[T]
    ) -> tuple[T, bool]:
        """Return (value, True), or log and return (empty data_type, False)."""
        if key not in source:
            logger.error(
                "_get_complaint_data: Unexpected message format",
                extra={
                    "missing_key": key,
                    "found_keys": ",".join(sorted(source.keys())),
                },
            )
            return data_type(), False
        return source[key], True

    # Real email addresses of the complaining recipients, plus any other fields
    raw_recipients, has_cr = get_or_log("complainedRecipients", complaint, list)
    complained_recipients = []
    for entry in raw_recipients:
        raw_email_address, has_email = get_or_log("emailAddress", entry, str)
        if not has_email:
            continue
        other_fields = {
            key: value for key, value in entry.items() if key != "emailAddress"
        }
        complained_recipients.append((parseaddr(raw_email_address)[1], other_fields))
    if has_cr and not raw_recipients:
        logger.error("_get_complaint_data: Empty complainedRecipients")

    # From: addresses live at mail -> commonHeaders -> from. Stop descending at
    # the first missing level, so only that level is logged as missing.
    raw_from_addresses: list[str] = []
    mail, has_mail = get_or_log("mail", message_json, dict)
    if has_mail:
        common_headers, has_ch = get_or_log("commonHeaders", mail, dict)
        if has_ch:
            raw_from_addresses, _ = get_or_log("from", common_headers, list)
    from_addresses = [parseaddr(addr)[1] for addr in raw_from_addresses]

    return RawComplaintData(
        complained_recipients=complained_recipients,
        from_addresses=from_addresses,
        # Only present when destination is on account suppression list
        subtype=complaint.get("complaintSubType", ""),
        # Only present for feedback reports
        user_agent=complaint.get("userAgent", ""),
        # Only present when set
        feedback_type=complaint.get("complaintFeedbackType", ""),
    )
1879

1880

1881
class Complainer(TypedDict):
    """A Relay user tied to a complaint, as collected by _gather_complainers()."""

    # The matched Relay user
    user: User
    # Where the user was matched: the complained recipients (real email), the
    # From: header (mask address), or "all" for both
    found_in: Literal["complained_recipients", "from_header", "all"]
    # Domain part of the matched email address
    domain: str
    # Extra fields from the complained recipient entry, or None
    extra: dict[str, Any] | None
    # Masks of this user found in the From: addresses
    masks: list[RelayAddress | DomainAddress]
1✔
1887

1888

1889
def _gather_complainers(
    complaint_data: RawComplaintData,
) -> tuple[list[Complainer], int]:
    """
    Fetch Relay Users and masks from the complaint data.

    This matches data from an AWS SES Complaint Notification (as extracted by
    _get_complaint_data()) to the Relay database, and returns the Users,
    RelayAddresses, and DomainAddresses, as well as status and extra data.

    If the complaint came from the AWS SES complaint simulator, detect
    developer_mode and move forward with the developer's User data.

    Returns a tuple of:
    * the list of Complainer entries, one per matched user
    * the number of addresses (complained recipients plus From: addresses) that
      could not be matched. A complained recipient address without an "@" is
      counted as unmatched.
    """

    users: dict[int, Complainer] = {}
    unknown_complainer_count = 0
    for email_address, extra_data in complaint_data.complained_recipients:
        local, at_sign, domain = email_address.partition("@")
        if not at_sign:
            # An address without "@" (such as the "" that parseaddr() returns
            # for input it cannot parse) cannot belong to a user. Count it as
            # unknown instead of failing to unpack the split.
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        # If the complainer is the AWS SES complaint simulation, assume that
        # it was send by a user with the developer_mode flag. Look for
        # a mask that matches the embedded mask metrics_id, and use
        # the related user's email instead of the AWS simulator address.
        # See docs/developer_mode.md
        if domain == "simulator.amazonses.com" and local.startswith("complaint+"):
            mask_metrics_id = local.removeprefix("complaint+")
            mask = _get_mask_by_metrics_id(mask_metrics_id)
            if mask:
                email_address = mask.user.email
                domain = mask.user.email.split("@")[1]

        try:
            user = User.objects.get(email=email_address)
        except User.DoesNotExist:
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        if user.id in users:
            logger.error("_gather_complainers: complainer appears twice")
            continue

        users[user.id] = {
            "user": user,
            "found_in": "complained_recipients",
            "domain": domain,
            "extra": extra_data or None,
            "masks": [],
        }

    # Collect From: addresses and their users
    unknown_sender_count = 0
    for email_address in complaint_data.from_addresses:
        mask = _get_address_if_exists(email_address)
        if not mask:
            logger.error("_gather_complainers: unknown mask, maybe deleted?")
            unknown_sender_count += 1
            continue

        if mask.user.id not in users:
            # Add mask-only entry to users
            users[mask.user.id] = {
                "user": mask.user,
                "found_in": "from_header",
                "domain": mask.user.email.split("@")[1],
                "extra": None,
                "masks": [mask],
            }
            continue

        user_data = users[mask.user.id]
        if mask in user_data["masks"]:
            logger.error("_gather_complainers: mask appears twice")
            continue

        user_data["masks"].append(mask)
        if user_data["found_in"] in ("all", "complained_recipients"):
            # The user was also a complained recipient: matched in both places
            user_data["found_in"] = "all"
        else:
            # Entry came from an earlier From: address only
            logger.error("_gather_complainers: no complainer, multi-mask")

    return (list(users.values()), unknown_complainer_count + unknown_sender_count)
1✔
1971

1972

1973
def _get_mask_by_metrics_id(metrics_id: str) -> RelayAddress | DomainAddress | None:
    """
    Return the mask identified by a metrics ID, or None when nothing matches.

    A metrics ID is a type letter ("R" for RelayAddress, "D" for DomainAddress)
    followed by the integer ID of the mask.
    """
    models_by_prefix: dict[str, type[RelayAddress] | type[DomainAddress]] = {
        "R": RelayAddress,
        "D": DomainAddress,
    }
    mask_model = models_by_prefix.get(metrics_id[0]) if metrics_id else None
    if mask_model is None:
        return None

    try:
        mask_id = int(metrics_id[1:])
    except ValueError:
        return None  # ID is not an int, do not try to match to Relay mask

    try:
        return mask_model.objects.get(id=mask_id)
    except mask_model.DoesNotExist:
        return None
1✔
1993

1994

1995
class ComplaintAction(NamedTuple):
    """Outcome of handling a complaint for one user, used for metrics and logs."""

    # "found" when a Relay user was matched, "no_recipients" when none was
    user_match: Literal["found", "no_recipients"]
    # What Relay did in response to the complaint
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    # "found" when the complainer had at least one mask in the From: addresses
    mask_match: Literal["found", "not_found"] = "not_found"
    # metrics_id of a matched mask, if any; left out of the complaint log
    mask_id: str | None = None
    # Where the user was matched, copied from the Complainer
    found_in: Literal["complained_recipients", "from_header", "all"] | None = None
    # The user's profile.metrics_fxa_id
    fxa_id: str | None = None
    # Domain of the complainer's email address
    domain: str | None = None
    # JSON-encoded extra data from the complained recipient, if any
    complaint_extra: str | None = None
1✔
2004

2005

2006
def _reduce_future_complaints(complainer: Complainer) -> ComplaintAction:
    """
    Act on a complaint so the user is less likely to complain again.

    If the user's profile does not have auto_block_spam set yet, set it and
    save the profile. Otherwise, when the "disable_mask_on_complaint" flag is
    active for the user, disable every enabled mask listed for the complainer
    and send the user the disabled-mask email for each one.

    Returns a ComplaintAction describing what was matched and what was done.
    """

    user = complainer["user"]
    profile = user.profile
    mask_match: Literal["found", "not_found"] = "not_found"
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_id = None

    # Masks are only disabled when auto_block_spam was already on before this
    # complaint; turning it on is the whole response to a first complaint.
    was_auto_blocking = bool(profile.auto_block_spam)
    if not was_auto_blocking:
        relay_action = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    for mask in complainer["masks"]:
        mask_match = "found"
        mask_id = mask.metrics_id
        may_disable = (
            flag_is_active_in_task("disable_mask_on_complaint", user)
            and mask.enabled
            and was_auto_blocking
        )
        if may_disable:
            relay_action = "disable_mask"
            mask.enabled = False
            mask.save()
            _send_disabled_mask_for_spam_email(mask)

    extra = complainer["extra"]
    return ComplaintAction(
        user_match="found",
        relay_action=relay_action,
        mask_match=mask_match,
        mask_id=mask_id,
        fxa_id=profile.metrics_fxa_id,
        domain=complainer["domain"],
        found_in=complainer["found_in"],
        complaint_extra=json.dumps(extra) if extra else None,
    )
2044

2045

2046
# Set to True by init_waffle_flags() after the flags have been created
_WAFFLE_FLAGS_INITIALIZED = False


def init_waffle_flags() -> None:
    """
    Make sure the waffle flags used by email tasks exist.

    Each flag is created with its note if it does not exist yet. Once a call
    has completed, later calls return immediately.
    """
    global _WAFFLE_FLAGS_INITIALIZED
    if _WAFFLE_FLAGS_INITIALIZED:
        return

    # Flag name -> note stored on the flag when it is first created
    flag_notes: dict[str, str] = {
        "disable_mask_on_complaint": (
            "MPP-3119: When a Relay user marks an email as spam, disable the mask."
        ),
        "developer_mode": (
            "MPP-3932: Enable logging and overrides for Relay developers."
        ),
    }
    flag_manager = get_waffle_flag_model().objects
    for flag_name, flag_note in flag_notes.items():
        flag_manager.get_or_create(name=flag_name, defaults={"note": flag_note})
    _WAFFLE_FLAGS_INITIALIZED = True
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc