• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

mozilla / fx-private-relay / 81bd7efa-7e2f-44be-91cc-76a322ca2564

28 May 2025 08:14PM UTC coverage: 85.374% (+0.02%) from 85.353%
81bd7efa-7e2f-44be-91cc-76a322ca2564

Pull #5572

circleci

groovecoder
for MPP-3439: add ensure_ascii_email and call before sending an email
Pull Request #5572: for MPP-3439: add IDNAEmailCleaner to clean email address domains with non-ASCII chars

2479 of 3627 branches covered (68.35%)

Branch coverage included in aggregate %.

105 of 107 new or added lines in 6 files covered. (98.13%)

10 existing lines in 2 files now uncovered.

17607 of 19900 relevant lines covered (88.48%)

9.56 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

89.74
/emails/views.py
1
import html
1✔
2
import json
1✔
3
import logging
1✔
4
import re
1✔
5
import shlex
1✔
6
from datetime import UTC, datetime
1✔
7
from email import message_from_bytes
1✔
8
from email.iterators import _structure
1✔
9
from email.message import EmailMessage
1✔
10
from email.utils import parseaddr
1✔
11
from io import StringIO
1✔
12
from json import JSONDecodeError
1✔
13
from textwrap import dedent
1✔
14
from typing import Any, Literal, NamedTuple, TypedDict, TypeVar
1✔
15
from urllib.parse import urlencode
1✔
16
from uuid import uuid4
1✔
17

18
from django.conf import settings
1✔
19
from django.contrib.auth.models import User
1✔
20
from django.core.exceptions import ObjectDoesNotExist
1✔
21
from django.db import transaction
1✔
22
from django.db.models import prefetch_related_objects
1✔
23
from django.http import HttpRequest, HttpResponse
1✔
24
from django.shortcuts import render
1✔
25
from django.template.loader import render_to_string
1✔
26
from django.utils.html import escape
1✔
27
from django.views.decorators.csrf import csrf_exempt
1✔
28

29
from botocore.exceptions import ClientError
1✔
30
from codetiming import Timer
1✔
31
from decouple import strtobool
1✔
32
from markus.utils import generate_tag
1✔
33
from sentry_sdk import capture_message
1✔
34
from waffle import get_waffle_flag_model, sample_is_active
1✔
35

36
from privaterelay.cleaners import IDNAEmailCleaner
1✔
37
from privaterelay.ftl_bundles import main as ftl_bundle
1✔
38
from privaterelay.models import Profile
1✔
39
from privaterelay.utils import (
1✔
40
    flag_is_active_in_task,
41
    get_subplat_upgrade_link_by_language,
42
    glean_logger,
43
)
44

45
from .exceptions import CannotMakeAddressException
1✔
46
from .models import (
1✔
47
    DeletedAddress,
48
    DomainAddress,
49
    RelayAddress,
50
    Reply,
51
    address_hash,
52
    get_domain_numerical,
53
)
54
from .policy import relay_policy
1✔
55
from .sns import SUPPORTED_SNS_TYPES, verify_from_sns
1✔
56
from .types import (
1✔
57
    AWS_MailJSON,
58
    AWS_SNSMessageJSON,
59
    EmailForwardingIssues,
60
    EmailHeaderIssues,
61
    OutgoingHeaders,
62
)
63
from .utils import (
1✔
64
    InvalidFromHeader,
65
    _get_bucket_and_key_from_s3_json,
66
    b64_lookup_key,
67
    count_all_trackers,
68
    decrypt_reply_metadata,
69
    derive_reply_keys,
70
    encode_dict_gza85,
71
    encrypt_reply_metadata,
72
    generate_from_header,
73
    get_domains_from_settings,
74
    get_message_content_from_s3,
75
    get_message_id_bytes,
76
    get_reply_to_address,
77
    histogram_if_enabled,
78
    incr_if_enabled,
79
    parse_email_header,
80
    remove_message_from_s3,
81
    remove_trackers,
82
    ses_send_raw_email,
83
    urlize_and_linebreaks,
84
)
85

86
logger = logging.getLogger("events")
1✔
87
info_logger = logging.getLogger("eventsinfo")
1✔
88

89

90
class ReplyHeadersNotFound(Exception):
    """Raised when an email has no In-Reply-To or References headers."""

    def __init__(self, message="No In-Reply-To or References headers."):
        # Pass the message to Exception so that str(exc), exc.args, and
        # tracebacks carry it even when the default message is used.
        super().__init__(message)
        self.message = message
1✔
93

94

95
def first_time_user_test(request):
    """
    Demonstrate rendering of the "First time Relay user" email.

    Querystring parameters read by this view:
    * in_bundle_country: parsed with strtobool, defaults to "yes"
    * format: "text" selects the plain-text template, any other value
      (default "html") selects the HTML template
    """
    query = request.GET
    context = {
        "in_bundle_country": strtobool(query.get("in_bundle_country", "yes")),
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    wants_text = query.get("format", "html") == "text"
    if not wants_text:
        return render(request, "emails/first_time_user.html", context)
    return render(
        request,
        "emails/first_time_user.txt",
        context,
        content_type="text/plain; charset=utf-8",
    )
×
114

115

116
def reply_requires_premium_test(request):
    """
    Demonstrate rendering of the "Reply requires premium" email.

    Every querystring parameter is copied into the template context on top of
    the defaults below. A "forwarded" value of exactly "True" is stored as the
    boolean True; any other value is stored as the raw string. Passing
    content-type=text/plain renders the plain-text template instead of HTML.
    """
    query = request.GET
    context = {
        "sender": "test@example.com",
        "forwarded": True,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    for name in query:
        raw_value = query.get(name)
        is_true_flag = name == "forwarded" and raw_value == "True"
        context[name] = True if is_true_flag else raw_value

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/reply_requires_premium.txt",
            context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/reply_requires_premium.html", context)
1✔
142

143

144
def disabled_mask_for_spam_test(request):
    """
    Demonstrate rendering of the "Disabled mask for spam" email.

    Every querystring parameter is copied into the template context on top of
    the default mask and SITE_ORIGIN. Passing content-type=text/plain renders
    the plain-text template instead of HTML.
    """
    query = request.GET
    context = {
        "mask": "abc123456@mozmail.com",
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    context |= {name: query.get(name) for name in query}

    if query.get("content-type") == "text/plain":
        return render(
            request,
            "emails/disabled_mask_for_spam.txt",
            context,
            content_type="text/plain; charset=utf-8",
        )
    return render(request, "emails/disabled_mask_for_spam.html", context)
×
168

169

170
def first_forwarded_email_test(request: HttpRequest) -> HttpResponse:
    """Demonstrate rendering of the "First forwarded email" inside the wrapper."""
    # TO DO: Update with correct context when trigger is created
    inner_html = render_to_string(
        "emails/first_forwarded_email.html",
        {"SITE_ORIGIN": settings.SITE_ORIGIN},
    )
    return HttpResponse(
        wrap_html_email(
            original_html=inner_html,
            language="en-us",
            has_premium=True,
            display_email="test@example.com",
            num_level_one_email_trackers_removed=0,
        )
    )
×
188

189

190
def wrap_html_email(
    original_html: str,
    language: str,
    has_premium: bool,
    display_email: str,
    num_level_one_email_trackers_removed: int | None = None,
    tracker_report_link: str | None = None,
) -> str:
    """
    Add Relay banners, surveys, etc. to an HTML email.

    Renders the emails/wrapped_email.html template with the given values plus
    the upgrade link for the language and SITE_ORIGIN, then drops every line
    that is empty or whitespace-only. The result always ends with a newline.
    """
    rendered = render_to_string(
        "emails/wrapped_email.html",
        {
            "original_html": original_html,
            "language": language,
            "has_premium": has_premium,
            "subplat_upgrade_link": get_subplat_upgrade_link_by_language(language),
            "display_email": display_email,
            "tracker_report_link": tracker_report_link,
            "num_level_one_email_trackers_removed": (
                num_level_one_email_trackers_removed
            ),
            "SITE_ORIGIN": settings.SITE_ORIGIN,
        },
    )
    # Remove empty lines
    kept_lines = filter(str.strip, rendered.splitlines())
    return "\n".join(kept_lines) + "\n"
1✔
214

215

216
def wrapped_email_test(request: HttpRequest) -> HttpResponse:
    """
    Demonstrate rendering of forwarded HTML emails.

    Querystring parameters:
    * language, has_premium: when either is missing, a randomly chosen
      profile supplies the missing value(s)
    * num_level_one_email_trackers_removed: integer, defaults to 0
    * has_tracker_report_link: parsed with strtobool, defaults to False

    Raises ValueError if a profile is needed but none was found.
    """
    params = request.GET

    # Only query for a profile when a setting has to come from one
    needs_profile = not ("language" in params and "has_premium" in params)
    user_profile = Profile.objects.order_by("?").first() if needs_profile else None

    if "language" in params:
        language = params["language"]
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        language = user_profile.language

    if "has_premium" in params:
        has_premium = strtobool(params["has_premium"])
    elif user_profile is None:
        raise ValueError("user_profile must not be None")
    else:
        has_premium = user_profile.has_premium

    num_level_one_email_trackers_removed = (
        int(params["num_level_one_email_trackers_removed"])
        if "num_level_one_email_trackers_removed" in params
        else 0
    )

    has_tracker_report_link = (
        strtobool(params["has_tracker_report_link"])
        if "has_tracker_report_link" in params
        else False
    )

    tracker_report_link = ""
    if has_tracker_report_link:
        trackers = (
            {"fake-tracker.example.com": num_level_one_email_trackers_removed}
            if num_level_one_email_trackers_removed
            else {}
        )
        # The report data is passed as JSON in the URL fragment
        tracker_report_link = "".join(
            (
                "/tracker-report/#{",
                '"sender": "sender@example.com", ',
                '"received_at": 1658434657, ',
                '"trackers": ',
                json.dumps(trackers),
                "}",
            )
        )

    path = "/emails/wrapped_email_test"
    premium_label = "Yes" if has_premium else "No"
    tracker_link_label = "Yes" if has_tracker_report_link else "No"
    old_query = {
        "language": language,
        "has_premium": premium_label,
        "has_tracker_report_link": tracker_link_label,
        "num_level_one_email_trackers_removed": str(
            num_level_one_email_trackers_removed
        ),
    }

    def switch_link(key, value):
        """Return value as plain text when it is the current setting, else a link."""
        if old_query[key] != value:
            new_query = {**old_query, key: value}
            return f'<a href="{path}?{urlencode(new_query)}">{value}</a>'
        return str(value)

    html_content = dedent(
        f"""\
    <p>
      <strong>Email rendering Test</strong>
    </p>
    <p>Settings: (<a href="{path}">clear all</a>)</p>
    <ul>
      <li>
        <strong>language</strong>:
        {escape(language)}
        (switch to
        {switch_link("language", "en-us")},
        {switch_link("language", "de")},
        {switch_link("language", "en-gb")},
        {switch_link("language", "fr")},
        {switch_link("language", "ru-ru")},
        {switch_link("language", "es-es")},
        {switch_link("language", "pt-br")},
        {switch_link("language", "it-it")},
        {switch_link("language", "en-ca")},
        {switch_link("language", "de-de")},
        {switch_link("language", "es-mx")})
      </li>
      <li>
        <strong>has_premium</strong>:
        {premium_label}
        (switch to
        {switch_link("has_premium", "Yes")},
        {switch_link("has_premium", "No")})
      </li>
      <li>
        <strong>has_tracker_report_link</strong>:
        {tracker_link_label}
        (switch to
        {switch_link("has_tracker_report_link", "Yes")},
        {switch_link("has_tracker_report_link", "No")})
      </li>
      <li>
        <strong>num_level_one_email_trackers_removed</strong>:
        {num_level_one_email_trackers_removed}
        (switch to
        {switch_link("num_level_one_email_trackers_removed", "0")},
        {switch_link("num_level_one_email_trackers_removed", "1")},
        {switch_link("num_level_one_email_trackers_removed", "2")})
      </li>
    </ul>
    """
    )

    return HttpResponse(
        wrap_html_email(
            original_html=html_content,
            language=language,
            has_premium=has_premium,
            tracker_report_link=tracker_report_link,
            display_email="test@relay.firefox.com",
            num_level_one_email_trackers_removed=num_level_one_email_trackers_removed,
        )
    )
1✔
346

347

348
def _store_reply_record(
    mail: AWS_MailJSON, message_id: str, address: RelayAddress | DomainAddress
) -> AWS_MailJSON:
    """
    Store a Reply record after relaying an email.

    The record holds a lookup key derived from message_id and the encrypted
    values of the Message-ID, From, and Reply-To headers found in mail. It is
    linked to the given address. Returns mail unchanged.

    Raises TypeError if address is neither a DomainAddress nor a RelayAddress.
    """
    kept_headers = ("message-id", "from", "reply-to")
    reply_metadata = {
        header["name"].lower(): header["value"]
        for header in mail["headers"]
        if header["name"].lower() in kept_headers
    }
    lookup_key, encryption_key = derive_reply_keys(get_message_id_bytes(message_id))
    reply_create_args: dict[str, Any] = {
        "lookup": b64_lookup_key(lookup_key),
        "encrypted_metadata": encrypt_reply_metadata(encryption_key, reply_metadata),
    }
    if isinstance(address, DomainAddress):
        owner_field = "domain_address"
    elif isinstance(address, RelayAddress):
        owner_field = "relay_address"
    else:
        raise TypeError("address must be type RelayAddress")
    reply_create_args[owner_field] = address
    Reply.objects.create(**reply_create_args)
    return mail
1✔
372

373

374
@csrf_exempt
def sns_inbound(request):
    """
    Entry point for inbound AWS SNS requests.

    The JSON body is passed through verify_from_sns, then the topic ARN and
    message type are validated. A validation failure is logged and answered
    with a 400; otherwise the message is handed to _sns_inbound_logic.
    """
    incr_if_enabled("sns_inbound", 1)
    # First thing we do is verify the signature
    verified_json_body = verify_from_sns(json.loads(request.body))

    # Validate ARN and message type
    topic_arn = verified_json_body.get("TopicArn")
    message_type = verified_json_body.get("Type")
    error_details = validate_sns_arn_and_type(topic_arn, message_type)
    if not error_details:
        return _sns_inbound_logic(topic_arn, message_type, verified_json_body)

    logger.error("validate_sns_arn_and_type_error", extra=error_details)
    return HttpResponse(error_details["error"], status=400)
1✔
390

391

392
def validate_sns_arn_and_type(
    topic_arn: str | None, message_type: str | None
) -> dict[str, Any] | None:
    """
    Check the Topic ARN and the SNS Message Type of a request.

    Returns None when both are acceptable. Otherwise returns a dictionary
    with the error text, the received values (shell-quoted when present),
    and the supported values. Only the first failing check is reported.
    """
    if not topic_arn:
        problem = "Received SNS request without Topic ARN."
    elif topic_arn not in settings.AWS_SNS_TOPIC:
        problem = "Received SNS message for wrong topic."
    elif not message_type:
        problem = "Received SNS request without Message Type."
    elif message_type not in SUPPORTED_SNS_TYPES:
        problem = "Received SNS message for unsupported Type."
    else:
        return None

    # Falsy values (None, "") are reported as-is rather than quoted
    quoted_arn = shlex.quote(topic_arn) if topic_arn else topic_arn
    quoted_type = shlex.quote(message_type) if message_type else message_type
    return {
        "error": problem,
        "received_topic_arn": quoted_arn,
        "supported_topic_arn": sorted(settings.AWS_SNS_TOPIC),
        "received_sns_type": quoted_type,
        "supported_sns_types": SUPPORTED_SNS_TYPES,
    }
1✔
423

424

425
def _sns_inbound_logic(topic_arn, message_type, json_body):
    """
    Dispatch a validated SNS message by its type.

    Notifications go to _sns_notification. A SubscriptionConfirmation has its
    SubscribeURL logged and gets a 200. Any other type is logged, reported
    through capture_message, and answered with a 400. topic_arn is not used.
    """
    if message_type == "Notification":
        incr_if_enabled("sns_inbound_Notification", 1)
        return _sns_notification(json_body)

    if message_type == "SubscriptionConfirmation":
        subscribe_details = {"SubscribeURL": json_body["SubscribeURL"]}
        info_logger.info("SNS SubscriptionConfirmation", extra=subscribe_details)
        return HttpResponse("Logged SubscribeURL", status=200)

    unhandled = "Received SNS message with type not handled in inbound log"
    logger.error(
        "SNS message type did not fall under the SNS inbound logic",
        extra={"message_type": shlex.quote(message_type)},
    )
    capture_message(unhandled, level="error", stack=True)
    return HttpResponse(unhandled, status=400)
448

449

450
def _sns_notification(json_body):
    """
    Handle an SNS Notification whose "Message" is a JSON document.

    Returns a 400 when the message is not JSON or is not a supported
    notification or event type. Otherwise the message is processed by
    _sns_message, and the stored email is removed from S3 unless the
    response status is 500 or higher.
    """
    raw_message = json_body["Message"]
    try:
        message_json = json.loads(raw_message)
    except JSONDecodeError:
        logger.error(
            "SNS notification has non-JSON message body",
            extra={"content": shlex.quote(raw_message)},
        )
        return HttpResponse("Received SNS notification with non-JSON body", status=400)

    event_type = message_json.get("eventType")
    notification_type = message_json.get("notificationType")
    is_supported = notification_type in {
        "Complaint",
        "Received",
        "Bounce",
    } or event_type in {"Complaint", "Bounce"}
    if not is_supported:
        quoted_notification_type = shlex.quote(notification_type)
        logger.error(
            "SNS notification for unsupported type",
            extra={
                "notification_type": quoted_notification_type,
                "event_type": shlex.quote(event_type),
                "keys": [shlex.quote(key) for key in message_json.keys()],
            },
        )
        return HttpResponse(
            "Received SNS notification for unsupported Type: "
            + html.escape(quoted_notification_type),
            status=400,
        )

    response = _sns_message(message_json)
    bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
    # Keep the stored message when the status is 500 or above
    if response.status_code < 500:
        remove_message_from_s3(bucket, object_key)

    return response
1✔
488

489

490
def _get_recipient_with_relay_domain(recipients):
    """Return the first recipient containing a configured domain, or None."""
    relay_domains = get_domains_from_settings().values()
    return next(
        (
            candidate
            for candidate in recipients
            if any(domain in candidate for domain in relay_domains)
        ),
        None,
    )
1✔
497

498

499
def _get_relay_recipient_from_message_json(message_json):
    """
    Find the recipient that has a Relay domain.

    The "to" and then "cc" common headers are searched first, and a match is
    returned as the bare address from parseaddr. Otherwise the receipt's
    recipients are searched, and that match (or None) is returned as found.
    """
    common_headers = message_json["mail"]["commonHeaders"]
    for header_name in ("to", "cc"):
        if header_name not in common_headers:
            continue
        match = _get_recipient_with_relay_domain(common_headers[header_name])
        if match is not None:
            _display_name, email_address = parseaddr(match)
            return email_address

    # SES-SNS sends bcc in a different part of the message
    return _get_recipient_with_relay_domain(message_json["receipt"]["recipients"])
1✔
515

516

517
def _sns_message(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Route a parsed SNS message to the bounce, complaint, or received handler.

    Raises ValueError when the message is neither a bounce nor a complaint
    and its notificationType is not "Received" or its eventType is set.
    """
    incr_if_enabled("sns_inbound_Notification_Received", 1)
    init_waffle_flags()
    notification_type = message_json.get("notificationType")
    event_type = message_json.get("eventType")
    type_values = (notification_type, event_type)

    if "Bounce" in type_values:
        return _handle_bounce(message_json)
    if "Complaint" in type_values:
        return _handle_complaint(message_json)

    if notification_type != "Received":
        raise ValueError('notification_type must be "Received"')
    if event_type is not None:
        raise ValueError("event_type must be None")
    return _handle_received(message_json)
1✔
531

532

533
# Enumerate the reasons that an email was not forwarded.
# This excludes emails dropped due to mask forwarding settings,
# such as "block all" and "block promotional". Those are logged
# as Glean email_blocked events.
EmailDroppedReason = Literal[
    "auto_block_spam",  # Email identified as spam, user has the auto_block_spam flag
    "dmarc_reject_failed",  # Email failed DMARC check with a reject policy
    "hard_bounce_pause",  # The user recently had a hard bounce
    "soft_bounce_pause",  # The user recently had a soft bounce
    "abuse_flag",  # The user exceeded an abuse limit, like mails forwarded
    "user_deactivated",  # The user account is deactivated
    "reply_requires_premium",  # The email is a reply from a free user
    "content_missing",  # Could not load the email from storage
    "error_from_header",  # Error generating the From: header, retryable
    "error_storage",  # Error fetching the email contents from storage (S3), retryable
    "error_sending",  # Error sending the forwarded email (SES), retryable
]
550

551

552
def log_email_dropped(
    reason: EmailDroppedReason,
    mask: RelayAddress | DomainAddress,
    is_reply: bool = False,
    can_retry: bool = False,
) -> None:
    """
    Log an "email_dropped" event for an email that was not forwarded.

    This is for drops caused by something other than the mask's blocking
    setting; those are recorded by glean_logger().log_email_blocked(), whose
    interface this mirrors.

    The FxA ID (when the profile has one) and the mask's metrics ID are only
    included when the mask owner's profile has metrics enabled.
    """
    profile = mask.user.profile
    extra: dict[str, str | int | bool] = {"reason": reason}
    if profile.metrics_enabled:
        fxa = profile.fxa
        if fxa is not None:
            extra["fxa_id"] = fxa.uid
        extra["mask_id"] = mask.metrics_id
    extra["is_random_mask"] = isinstance(mask, RelayAddress)
    extra["is_reply"] = is_reply
    extra["can_retry"] = can_retry
    info_logger.info("email_dropped", extra=extra)
1✔
575

576

577
def _handle_received(message_json: AWS_SNSMessageJSON) -> HttpResponse:
1✔
578
    """
579
    Handle an AWS SES received notification.
580

581
    For more information, see:
582
    https://docs.aws.amazon.com/ses/latest/dg/receiving-email-notifications-contents.html
583
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html
584

585
    Returns (may be incomplete):
586
    * 200 if the email was sent, the Relay address is disabled, the Relay user is
587
      flagged for abuse, the email is under a bounce pause, the email was suppressed
588
      for spam, the list email was blocked, or the noreply address was the recipient.
589
    * 400 if commonHeaders entry is missing, the Relay recipient address is malformed,
590
      the email failed DMARC with reject policy, or the email is a reply chain to a
591
      non-premium user.
592
    * 404 if an S3-stored email was not found, no Relay address was found in the "To",
593
      "CC", or "BCC" fields, or the Relay address is not in the database.
594
    * 503 if the "From" address is malformed, the S3 client returned an error different
595
      from "not found", or the SES client fails
596

597
    And many other returns conditions if the email is a reply. The HTTP returns are an
598
    artifact from an earlier time when emails were sent to a webhook. Currently,
599
    production instead pulls events from a queue.
600

601
    TODO: Return a more appropriate status object
602
    TODO: Document the metrics emitted
603
    """
604
    mail = message_json["mail"]
1✔
605
    if "commonHeaders" not in mail:
1✔
606
        logger.error("SNS message without commonHeaders")
1✔
607
        return HttpResponse(
1✔
608
            "Received SNS notification without commonHeaders.", status=400
609
        )
610
    common_headers = mail["commonHeaders"]
1✔
611
    receipt = message_json["receipt"]
1✔
612

613
    _record_receipt_verdicts(receipt, "all")
1✔
614
    to_address = _get_relay_recipient_from_message_json(message_json)
1✔
615
    if to_address is None:
1✔
616
        incr_if_enabled("no_relay_domain_in_recipient_fields", 1)
1✔
617
        return HttpResponse("Address does not exist", status=404)
1✔
618

619
    _record_receipt_verdicts(receipt, "relay_recipient")
1✔
620
    from_addresses = parse_email_header(common_headers["from"][0])
1✔
621
    if not from_addresses:
1✔
622
        info_logger.error(
1✔
623
            "_handle_received: no from address",
624
            extra={
625
                "source": mail["source"],
626
                "common_headers_from": common_headers["from"],
627
            },
628
        )
629
        return HttpResponse("Unable to parse From address", status=400)
1✔
630
    from_address = from_addresses[0][1]
1✔
631

632
    try:
1✔
633
        [to_local_portion, to_domain_portion] = to_address.split("@")
1✔
634
    except ValueError:
1✔
635
        # TODO: Add metric
636
        return HttpResponse("Malformed to field.", status=400)
1✔
637

638
    if to_local_portion.lower() == "noreply":
1✔
639
        incr_if_enabled("email_for_noreply_address", 1)
1✔
640
        return HttpResponse("noreply address is not supported.")
1✔
641
    try:
1✔
642
        # FIXME: this ambiguous return of either
643
        # RelayAddress or DomainAddress types makes the Rustacean in me throw
644
        # up a bit.
645
        address = _get_address(to_address)
1✔
646
        prefetch_related_objects([address.user], "socialaccount_set", "profile")
1✔
647
        user_profile = address.user.profile
1✔
648
    except (
1✔
649
        ObjectDoesNotExist,
650
        CannotMakeAddressException,
651
        DeletedAddress.MultipleObjectsReturned,
652
    ):
653
        if to_local_portion.lower() == "replies":
1✔
654
            response = _handle_reply(from_address, message_json, to_address)
1✔
655
        else:
656
            response = HttpResponse("Address does not exist", status=404)
1✔
657
        return response
1✔
658

659
    _record_receipt_verdicts(receipt, "valid_user")
1✔
660
    # if this is spam and the user is set to auto-block spam, early return
661
    if user_profile.auto_block_spam and _get_verdict(receipt, "spam") == "FAIL":
1✔
662
        incr_if_enabled("email_auto_suppressed_for_spam", 1)
1✔
663
        log_email_dropped(reason="auto_block_spam", mask=address)
1✔
664
        return HttpResponse("Address rejects spam.")
1✔
665

666
    if _get_verdict(receipt, "dmarc") == "FAIL":
1✔
667
        policy = receipt.get("dmarcPolicy", "none")
1✔
668
        # TODO: determine action on dmarcPolicy "quarantine"
669
        if policy == "reject":
1!
670
            log_email_dropped(reason="dmarc_reject_failed", mask=address)
1✔
671
            incr_if_enabled(
1✔
672
                "email_suppressed_for_dmarc_failure",
673
                tags=["dmarcPolicy:reject", "dmarcVerdict:FAIL"],
674
            )
675
            return HttpResponse("DMARC failure, policy is reject", status=400)
1✔
676

677
    # if this user is over bounce limits, early return
678
    bounce_paused, bounce_type = user_profile.check_bounce_pause()
1✔
679
    if bounce_paused:
1✔
680
        _record_receipt_verdicts(receipt, "user_bounce_paused")
1✔
681
        incr_if_enabled(f"email_suppressed_for_{bounce_type}_bounce", 1)
1✔
682
        reason: Literal["soft_bounce_pause", "hard_bounce_pause"] = (
1✔
683
            "soft_bounce_pause" if bounce_type == "soft" else "hard_bounce_pause"
684
        )
685
        log_email_dropped(reason=reason, mask=address)
1✔
686
        return HttpResponse("Address is temporarily disabled.")
1✔
687

688
    # check if this is a reply from an external sender to a Relay user
689
    try:
1✔
690
        (lookup_key, _) = _get_keys_from_headers(mail["headers"])
1✔
691
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
1✔
692
        user_address = address
1✔
693
        address = reply_record.address
1✔
694
        message_id = _get_message_id_from_headers(mail["headers"])
1✔
695
        # make sure the relay user is premium
696
        if not _reply_allowed(from_address, to_address, reply_record, message_id):
1!
697
            log_email_dropped(reason="reply_requires_premium", mask=user_address)
1✔
698
            return HttpResponse("Relay replies require a premium account", status=403)
1✔
699
    except (ReplyHeadersNotFound, Reply.DoesNotExist):
1✔
700
        # if there's no In-Reply-To header, or the In-Reply-To value doesn't
701
        # match a Reply record, continue to treat this as a regular email from
702
        # an external sender to a relay user
703
        pass
1✔
704

705
    # if account flagged for abuse, early return
706
    if user_profile.is_flagged:
1✔
707
        log_email_dropped(reason="abuse_flag", mask=address)
1✔
708
        return HttpResponse("Address is temporarily disabled.")
1✔
709

710
    if not user_profile.user.is_active:
1✔
711
        log_email_dropped(reason="user_deactivated", mask=address)
1✔
712
        return HttpResponse("Account is deactivated.")
1✔
713

714
    # if address is set to block, early return
715
    if not address.enabled:
1✔
716
        incr_if_enabled("email_for_disabled_address", 1)
1✔
717
        address.num_blocked += 1
1✔
718
        address.save(update_fields=["num_blocked"])
1✔
719
        _record_receipt_verdicts(receipt, "disabled_alias")
1✔
720
        user_profile.last_engagement = datetime.now(UTC)
1✔
721
        user_profile.save()
1✔
722
        glean_logger().log_email_blocked(mask=address, reason="block_all")
1✔
723
        return HttpResponse("Address is temporarily disabled.")
1✔
724

725
    _record_receipt_verdicts(receipt, "active_alias")
1✔
726
    incr_if_enabled("email_for_active_address", 1)
1✔
727

728
    # if address is blocking list emails, and email is from list, early return
729
    if (
1✔
730
        address
731
        and address.block_list_emails
732
        and user_profile.has_premium
733
        and _check_email_from_list(mail["headers"])
734
    ):
735
        incr_if_enabled("list_email_for_address_blocking_lists", 1)
1✔
736
        address.num_blocked += 1
1✔
737
        address.save(update_fields=["num_blocked"])
1✔
738
        user_profile.last_engagement = datetime.now(UTC)
1✔
739
        user_profile.save()
1✔
740
        glean_logger().log_email_blocked(mask=address, reason="block_promotional")
1✔
741
        return HttpResponse("Address is not accepting list emails.")
1✔
742

743
    # Collect new headers
744
    subject = common_headers.get("subject", "")
1✔
745
    destination_address = user_profile.user.email
1✔
746
    reply_address = get_reply_to_address()
1✔
747
    try:
1✔
748
        from_header = generate_from_header(from_address, to_address)
1✔
749
    except InvalidFromHeader:
1✔
750
        # TODO: MPP-3407, MPP-3417 - Determine how to handle these
751
        header_from = []
1✔
752
        for header in mail["headers"]:
1✔
753
            if header["name"].lower() == "from":
1✔
754
                header_from.append(header)
1✔
755
        info_logger.error(
1✔
756
            "generate_from_header",
757
            extra={
758
                "from_address": from_address,
759
                "source": mail["source"],
760
                "common_headers_from": common_headers["from"],
761
                "headers_from": header_from,
762
            },
763
        )
764
        log_email_dropped(reason="error_from_header", mask=address, can_retry=True)
1✔
765
        return HttpResponse("Cannot parse the From address", status=503)
1✔
766

767
    # Get incoming email
768
    try:
1✔
769
        (incoming_email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
1✔
770
    except ClientError as e:
1✔
771
        if e.response["Error"].get("Code", "") == "NoSuchKey":
1✔
772
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
1✔
773
            log_email_dropped(reason="content_missing", mask=address)
1✔
774
            return HttpResponse("Email not in S3", status=404)
1✔
775
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
1✔
776
        log_email_dropped(reason="error_storage", mask=address, can_retry=True)
1✔
777
        # we are returning a 503 so that SNS can retry the email processing
778
        return HttpResponse("Cannot fetch the message content from S3", status=503)
1✔
779

780
    # Handle developer overrides, logging
781
    dev_action = _get_developer_mode_action(address)
1✔
782
    if dev_action:
1✔
783
        if dev_action.new_destination_address:
1!
784
            destination_address = dev_action.new_destination_address
1✔
785
        _log_dev_notification(
1✔
786
            "_handle_received: developer_mode", dev_action, message_json
787
        )
788

789
    # Convert to new email
790
    headers: OutgoingHeaders = {
1✔
791
        "Subject": subject,
792
        "From": from_header,
793
        "To": destination_address,
794
        "Reply-To": reply_address,
795
        "Resent-From": from_address,
796
    }
797
    sample_trackers = bool(sample_is_active("tracker_sample"))
1✔
798
    tracker_removal_flag = flag_is_active_in_task("tracker_removal", address.user)
1✔
799
    remove_level_one_trackers = bool(
1✔
800
        tracker_removal_flag and user_profile.remove_level_one_email_trackers
801
    )
802
    (
1✔
803
        forwarded_email,
804
        issues,
805
        level_one_trackers_removed,
806
        has_html,
807
        has_text,
808
    ) = _convert_to_forwarded_email(
809
        incoming_email_bytes=incoming_email_bytes,
810
        headers=headers,
811
        to_address=to_address,
812
        from_address=from_address,
813
        language=user_profile.language,
814
        has_premium=user_profile.has_premium,
815
        sample_trackers=sample_trackers,
816
        remove_level_one_trackers=remove_level_one_trackers,
817
    )
818
    if has_html:
1✔
819
        incr_if_enabled("email_with_html_content", 1)
1✔
820
    if has_text:
1✔
821
        incr_if_enabled("email_with_text_content", 1)
1✔
822
    if issues:
1✔
823
        info_logger.info(
1✔
824
            "_handle_received: forwarding issues", extra={"issues": issues}
825
        )
826

827
    # Send new email
828
    destination_address = ensure_ascii_email(destination_address)
1✔
829
    try:
1✔
830
        ses_response = ses_send_raw_email(
1✔
831
            source_address=reply_address,
832
            destination_address=destination_address,
833
            message=forwarded_email,
834
        )
835
    except ClientError:
1✔
836
        # 503 service unavailable response to SNS so it can retry
837
        log_email_dropped(reason="error_sending", mask=address, can_retry=True)
1✔
838
        return HttpResponse("SES client error on Raw Email", status=503)
1✔
839

840
    message_id = ses_response["MessageId"]
1✔
841
    _store_reply_record(mail, message_id, address)
1✔
842

843
    user_profile.update_abuse_metric(
1✔
844
        email_forwarded=True, forwarded_email_size=len(incoming_email_bytes)
845
    )
846
    user_profile.last_engagement = datetime.now(UTC)
1✔
847
    user_profile.save()
1✔
848
    address.num_forwarded += 1
1✔
849
    address.last_used_at = datetime.now(UTC)
1✔
850
    if level_one_trackers_removed:
1!
851
        address.num_level_one_trackers_blocked = (
×
852
            address.num_level_one_trackers_blocked or 0
853
        ) + level_one_trackers_removed
854
    address.save(
1✔
855
        update_fields=[
856
            "num_forwarded",
857
            "last_used_at",
858
            "block_list_emails",
859
            "num_level_one_trackers_blocked",
860
        ]
861
    )
862
    glean_logger().log_email_forwarded(mask=address, is_reply=False)
1✔
863
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
864

865

866
class DeveloperModeAction(NamedTuple):
    """The developer-mode handling selected for a mask."""

    # Identifier of the mask; _get_developer_mode_action fills it from
    # mask.metrics_id
    mask_id: str
    # "log" (the default) or "simulate_complaint"
    action: Literal["log", "simulate_complaint"] = "log"
    # Replacement destination address for the outgoing email, or None to keep
    # the original destination
    new_destination_address: str | None = None
1✔
870

871

872
def _get_verdict(receipt, verdict_type):
1✔
873
    return receipt[f"{verdict_type}Verdict"]["status"]
1✔
874

875

876
def _check_email_from_list(headers):
1✔
877
    for header in headers:
1!
878
        if header["name"].lower().startswith("list-"):
1!
879
            return True
1✔
880
    return False
×
881

882

883
def _record_receipt_verdicts(receipt, state):
    """
    Emit counters for the verdicts in a receipt.

    Receipt keys are visited in sorted order. Each "*Verdict" key increments
    "relay.emails.verdicts.<key>" tagged with the state. A final
    "relay.emails.state.<state>" counter is tagged with every "<key>:<status>" pair,
    plus "dmarcPolicy:<value>" when the receipt has a dmarcPolicy key.
    """
    tags = []
    for name in sorted(receipt.keys()):
        if name.endswith("Verdict"):
            status = receipt[name]["status"]
            tags.append(f"{name}:{status}")
            incr_if_enabled(f"relay.emails.verdicts.{name}", 1, [f"state:{state}"])
            continue
        if name == "dmarcPolicy":
            policy = receipt[name]
            tags.append(f"{name}:{policy}")
    incr_if_enabled(f"relay.emails.state.{state}", 1, tags)
1✔
894

895

896
def _get_message_id_from_headers(headers):
1✔
897
    message_id = None
1✔
898
    for header in headers:
1✔
899
        if header["name"].lower() == "message-id":
1✔
900
            message_id = header["value"]
1✔
901
    return message_id
1✔
902

903

904
def _get_keys_from_headers(headers):
    """
    Derive the reply lookup key and encryption key from an email's headers.

    Headers are examined in order, and the first In-Reply-To or References header
    (matched case-insensitively) decides the result:

    * In-Reply-To - the keys are derived from the header value, without checking
      that a Reply record exists.
    * References - the value is split on whitespace, and the keys for the first
      message ID that has a Reply record are returned.

    Raises:
    * Reply.DoesNotExist - no message ID in the References header has a Reply record
    * ReplyHeadersNotFound - there is no In-Reply-To or References header
    """
    for header in headers:
        header_name = header["name"].lower()

        if header_name == "in-reply-to":
            message_id_bytes = get_message_id_bytes(header["value"])
            return derive_reply_keys(message_id_bytes)

        if header_name == "references":
            # Split on any whitespace run, not a single space. Message IDs may be
            # separated by tabs, folded lines, or repeated spaces, and splitting
            # on " " would yield empty or whitespace-polluted IDs.
            for message_id in header["value"].split():
                message_id_bytes = get_message_id_bytes(message_id)
                lookup_key, encryption_key = derive_reply_keys(message_id_bytes)
                try:
                    # FIXME: calling code is likely to duplicate this query
                    _get_reply_record_from_lookup_key(lookup_key)
                    return lookup_key, encryption_key
                except Reply.DoesNotExist:
                    # Not a message we forwarded; try the next message ID
                    pass
            raise Reply.DoesNotExist
    incr_if_enabled("mail_to_replies_without_reply_headers", 1)
    raise ReplyHeadersNotFound
1✔
926

927

928
def _get_reply_record_from_lookup_key(lookup_key):
    """
    Return the Reply record for a lookup key.

    The key is passed through b64_lookup_key to get the value matched against the
    Reply "lookup" field. Raises Reply.DoesNotExist when there is no such record.
    """
    return Reply.objects.get(lookup=b64_lookup_key(lookup_key))
1✔
931

932

933
def _strip_localpart_tag(address):
1✔
934
    [localpart, domain] = address.split("@")
1✔
935
    subaddress_parts = localpart.split("+")
1✔
936
    return f"{subaddress_parts[0]}@{domain}"
1✔
937

938

939
# Where the raw email was loaded from: inline in the SNS message ("sns") or
# fetched from S3 ("s3"). See _get_email_bytes.
_TransportType = Literal["sns", "s3"]
1✔
940

941

942
def _get_email_bytes(
    message_json: AWS_SNSMessageJSON,
) -> tuple[bytes, _TransportType, float]:
    """
    Load the raw email for a notification and time how long the load took.

    If the notification has a "content" key, the email is that string encoded as
    UTF-8. Otherwise the email is fetched from the S3 location described by the
    notification. The size of the email is recorded in the "relayed_email.size"
    histogram.

    Return is a tuple:
    - the raw email bytes
    - the transport, "sns" or "s3"
    - the load time reported by the timer, rounded to 3 decimal places
    """
    transport: _TransportType
    with Timer(logger=None) as fetch_timer:
        if "content" not in message_json:
            # No inline content, so assume the email is stored in S3
            bucket, object_key = _get_bucket_and_key_from_s3_json(message_json)
            raw_email = get_message_content_from_s3(bucket, object_key)
            transport = "s3"
        else:
            # The email content is inline in the SNS message
            raw_email = message_json["content"].encode("utf-8")
            transport = "sns"
        histogram_if_enabled("relayed_email.size", len(raw_email))
    elapsed = round(fetch_timer.last, 3)
    return (raw_email, transport, elapsed)
1✔
958

959

960
def _get_developer_mode_action(
    mask: RelayAddress | DomainAddress,
) -> DeveloperModeAction | None:
    """
    Get the developer mode action for this mask, if enabled.

    Developer mode applies when the "developer_mode" flag is active for the mask's
    user and the mask description contains "DEV:". A description containing
    "DEV:simulate_complaint" redirects the email to the SES complaint simulator
    address; any other "DEV:" description selects the "log" action.
    """
    if not flag_is_active_in_task("developer_mode", mask.user):
        return None
    if "DEV:" not in mask.description:
        return None

    if "DEV:simulate_complaint" not in mask.description:
        return DeveloperModeAction(mask_id=mask.metrics_id, action="log")

    simulator_address = f"complaint+{mask.metrics_id}@simulator.amazonses.com"
    return DeveloperModeAction(
        mask_id=mask.metrics_id,
        action="simulate_complaint",
        new_destination_address=simulator_address,
    )
1✔
980

981

982
def _log_dev_notification(
    log_message: str, dev_action: DeveloperModeAction, notification: dict[str, Any]
) -> None:
    """
    Log notification JSON

    This will log information beyond our privacy policy, so it should only be used on
    Relay staff accounts with prior permission.

    The notification JSON will be compressed, Ascii85-encoded with padding, and broken
    into 1024-bytes chunks. This will ensure it fits into GCP's log entry, which has a
    64KB limit per label value.

    One log entry is emitted per line of the encoded notification. Entries share a
    random "log_group_id", and carry a zero-based "part" index along with the total
    number of "parts", so the chunks can be reassembled.
    """

    notification_gza85 = encode_dict_gza85(notification)
    # Split once and derive the total from the same list that is iterated, so
    # "parts" always equals the number of entries logged (a trailing newline would
    # make a separate count("\n") + 1 disagree with splitlines()).
    parts = notification_gza85.splitlines()
    total_parts = len(parts)
    log_group_id = str(uuid4())
    for partnum, part in enumerate(parts):
        info_logger.info(
            log_message,
            extra={
                "mask_id": dev_action.mask_id,
                "dev_action": dev_action.action,
                "log_group_id": log_group_id,
                "part": partnum,
                "parts": total_parts,
                "notification_gza85": part,
            },
        )
1011

1012

1013
def _convert_to_forwarded_email(
    incoming_email_bytes: bytes,
    headers: OutgoingHeaders,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[EmailMessage, EmailForwardingIssues, int, bool, bool]:
    """
    Convert an email (as bytes) to a forwarded email.

    The headers are replaced with the passed headers, the plain text body (if any)
    is prefixed with the Relay notice, and the HTML body (if any) is wrapped in the
    Relay HTML template. When there is text content but no HTML body, an HTML
    alternative is generated from the text.

    The optional now is passed to _convert_html_content as the received time. When
    omitted, that function uses the current time.

    Return is a tuple:
    - email - The forwarded email
    - issues - Any detected issues in conversion
    - level_one_trackers_removed (int) - Number of trackers removed
    - has_html - True if the email has an HTML representation
    - has_text - True if the email has a plain text representation

    Raises TypeError if the parsed email or one of its bodies is not an EmailMessage.
    """

    def wrap_html(html_content: str) -> tuple[str, int]:
        """Wrap HTML content for forwarding, using the caller's settings."""
        return _convert_html_content(
            html_content,
            to_address,
            from_address,
            language,
            has_premium,
            sample_trackers,
            remove_level_one_trackers,
            now,
        )

    email = message_from_bytes(incoming_email_bytes, policy=relay_policy)
    # python/typeshed issue 2418
    # The Python 3.2 default was Message, 3.6 uses policy.message_factory, and
    # policy.default.message_factory is EmailMessage
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Replace headers in the original email
    header_issues = _replace_headers(email, headers)

    # Find and replace text content
    text_body = email.get_body("plain")
    text_content = None
    has_text = False
    if text_body:
        has_text = True
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        text_content = text_body.get_content()
        new_text_content = _convert_text_content(text_content, to_address)
        text_body.set_content(new_text_content)

    # Find and replace HTML content
    html_body = email.get_body("html")
    level_one_trackers_removed = 0
    has_html = False
    if html_body:
        has_html = True
        if not isinstance(html_body, EmailMessage):
            raise TypeError("html_body must be type EmailMessage")
        html_content = html_body.get_content()
        new_content, level_one_trackers_removed = wrap_html(html_content)
        html_body.set_content(new_content, subtype="html")
    elif text_content:
        # Try to use the text content to generate HTML content
        html_content = urlize_and_linebreaks(text_content)
        new_content, level_one_trackers_removed = wrap_html(html_content)
        if not isinstance(text_body, EmailMessage):
            raise TypeError("text_body must be type EmailMessage")
        try:
            text_body.add_alternative(new_content, subtype="html")
        except TypeError as e:
            # Best effort: forward without the HTML alternative, but log the
            # structure of the email so the failure can be investigated
            out = StringIO()
            _structure(email, fp=out)
            info_logger.error(
                "Adding HTML alternate failed",
                extra={"exception": str(e), "structure": out.getvalue()},
            )

    issues: EmailForwardingIssues = {}
    if header_issues:
        issues["headers"] = header_issues
    return (email, issues, level_one_trackers_removed, has_html, has_text)
1✔
1103

1104

1105
def _replace_headers(
1✔
1106
    email: EmailMessage, headers: OutgoingHeaders
1107
) -> EmailHeaderIssues:
1108
    """
1109
    Replace the headers in email with new headers.
1110

1111
    This replaces headers in the passed email object, rather than returns an altered
1112
    copy. The primary reason is that the Python email package can read an email with
1113
    non-compliant headers or content, but can't write it. A read/write is required to
1114
    create a copy that we then alter. This code instead alters the passed EmailMessage
1115
    object, making header-specific changes in try / except statements.
1116

1117
    The other reason is the object size. An Email can be up to 10 MB, and we hope to
1118
    support 40 MB emails someday. Modern servers may be OK with this, but it would be
1119
    nice to handle the non-compliant headers without crashing before we add a source of
1120
    memory-related crashes.
1121
    """
1122
    # Look for headers to drop
1123
    to_drop: list[str] = []
1✔
1124
    replacements: set[str] = {_k.lower() for _k in headers.keys()}
1✔
1125
    issues: EmailHeaderIssues = []
1✔
1126

1127
    # Detect non-compliant headers in incoming emails
1128
    for header in email.keys():
1✔
1129
        try:
1✔
1130
            value = email[header]
1✔
1131
        except Exception as e:
1✔
1132
            issues.append(
1✔
1133
                {"header": header, "direction": "in", "exception_on_read": repr(e)}
1134
            )
1135
            value = None
1✔
1136
        if getattr(value, "defects", None):
1✔
1137
            issues.append(
1✔
1138
                {
1139
                    "header": header,
1140
                    "direction": "in",
1141
                    "defect_count": len(value.defects),
1142
                    "parsed_value": str(value),
1143
                    "raw_value": str(value.as_raw),
1144
                }
1145
            )
1146
        elif getattr(getattr(value, "_parse_tree", None), "all_defects", []):
1✔
1147
            issues.append(
1✔
1148
                {
1149
                    "header": header,
1150
                    "direction": "in",
1151
                    "defect_count": len(value._parse_tree.all_defects),
1152
                    "parsed_value": str(value),
1153
                    "raw_value": str(value.as_raw),
1154
                }
1155
            )
1156

1157
    # Collect headers that will not be forwarded
1158
    for header in email.keys():
1✔
1159
        header_lower = header.lower()
1✔
1160
        if (
1✔
1161
            header_lower not in replacements
1162
            and header_lower != "mime-version"
1163
            and not header_lower.startswith("content-")
1164
        ):
1165
            to_drop.append(header)
1✔
1166

1167
    # Drop headers that should be dropped
1168
    for header in to_drop:
1✔
1169
        del email[header]
1✔
1170

1171
    # Replace the requested headers
1172
    for header, value in headers.items():
1✔
1173
        del email[header]
1✔
1174
        try:
1✔
1175
            email[header] = value.rstrip("\r\n")
1✔
1176
        except Exception as e:
×
1177
            issues.append(
×
1178
                {
1179
                    "header": header,
1180
                    "direction": "out",
1181
                    "exception_on_write": repr(e),
1182
                    "value": value,
1183
                }
1184
            )
1185
            continue
×
1186
        try:
1✔
1187
            parsed_value = email[header]
1✔
1188
        except Exception as e:
×
1189
            issues.append(
×
1190
                {
1191
                    "header": header,
1192
                    "direction": "out",
1193
                    "exception_on_write": repr(e),
1194
                    "value": value,
1195
                }
1196
            )
1197
            continue
×
1198
        if parsed_value.defects:
1!
1199
            issues.append(
×
1200
                {
1201
                    "header": header,
1202
                    "direction": "out",
1203
                    "defect_count": len(parsed_value.defects),
1204
                    "parsed_value": str(parsed_value),
1205
                    "raw_value": str(parsed_value.as_raw),
1206
                },
1207
            )
1208

1209
    return issues
1✔
1210

1211

1212
def _convert_html_content(
    html_content: str,
    to_address: str,
    from_address: str,
    language: str,
    has_premium: bool,
    sample_trackers: bool,
    remove_level_one_trackers: bool,
    now: datetime | None = None,
) -> tuple[str, int]:
    """
    Wrap HTML email content in the Relay HTML template.

    When sample_trackers is set, count_all_trackers is called on the content. When
    remove_level_one_trackers is set, trackers are removed and a tracker report link
    is built for the template. now is the received time, defaulting to the current
    UTC time.

    Return is a tuple of the wrapped HTML and the number of trackers removed.
    """
    # frontend expects a timestamp in milliseconds
    received_at = now or datetime.now(UTC)
    received_at_ms = int(received_at.timestamp() * 1000)

    # scramble alias so that clients don't recognize it
    # and apply default link styles
    display_email = re.sub("([@.:])", r"<span>\1</span>", to_address)

    # sample tracker numbers
    if sample_trackers:
        count_all_trackers(html_content)

    if not remove_level_one_trackers:
        report_link = ""
        trackers_removed = 0
    else:
        html_content, details = remove_trackers(
            html_content, from_address, received_at_ms
        )
        trackers_removed = details["tracker_removed"]
        report = {
            "sender": from_address,
            "received_at": received_at_ms,
            "trackers": details["level_one"]["trackers"],
        }
        report_link = f"{settings.SITE_ORIGIN}/tracker-report/#" + json.dumps(report)

    wrapped = wrap_html_email(
        original_html=html_content,
        language=language,
        has_premium=has_premium,
        display_email=display_email,
        tracker_report_link=report_link,
        num_level_one_email_trackers_removed=trackers_removed,
    )
    return wrapped, trackers_removed
1✔
1259

1260

1261
def _convert_text_content(text_content: str, to_address: str) -> str:
1✔
1262
    relay_header_text = (
1✔
1263
        "This email was sent to your alias "
1264
        f"{to_address}. To stop receiving emails sent to this alias, "
1265
        "update the forwarding settings in your dashboard.\n"
1266
        "---Begin Email---\n"
1267
    )
1268
    wrapped_text = relay_header_text + text_content
1✔
1269
    return wrapped_text
1✔
1270

1271

1272
def _build_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> EmailMessage:
    """
    Build the "replies require premium" email, addressed to from_address.

    The sender shown in the email is the "reply-to" or "from" value of the
    decrypted metadata, or an empty string. When message_id is given, the email is
    threaded with In-Reply-To and References headers. The email has a plain text
    body and an HTML alternative.
    """
    # If we haven't forwarded a first reply for this user yet, _reply_allowed
    # will forward.  So, tell the user we forwarded it.
    forwarded = not reply_record.address.user.profile.forwarded_first_reply

    original_sender = ""
    if decrypted_metadata is not None:
        original_sender = (
            decrypted_metadata.get("reply-to") or decrypted_metadata.get("from") or ""
        )
    template_context = {
        "sender": original_sender,
        "forwarded": forwarded,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    html_body = render_to_string("emails/reply_requires_premium.html", template_context)
    text_body = render_to_string("emails/reply_requires_premium.txt", template_context)

    # Create the message
    notice = EmailMessage()
    notice["Subject"] = ftl_bundle.format("replies-not-included-in-free-account-header")
    notice["From"] = get_reply_to_address()
    notice["To"] = from_address
    if message_id:
        for threading_header in ("In-Reply-To", "References"):
            notice[threading_header] = message_id
    notice.set_content(text_body)
    notice.add_alternative(html_body, subtype="html")
    return notice
1✔
1303

1304

1305
def _set_forwarded_first_reply(profile):
    """Set forwarded_first_reply to True on the profile and save it."""
    profile.forwarded_first_reply = True
    profile.save()
1✔
1308

1309

1310
def _send_reply_requires_premium_email(
    from_address: str,
    reply_record: Reply,
    message_id: str | None,
    decrypted_metadata: dict[str, Any] | None,
) -> None:
    """
    Send the "replies require premium" email to from_address.

    On success, the mask owner's profile is marked as having had a first reply
    forwarded. An SES client error is logged and otherwise ignored. The
    "free_user_reply_attempt" counter is incremented in both cases.
    """
    notice = _build_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    recipient = ensure_ascii_email(from_address)
    try:
        ses_send_raw_email(
            source_address=get_reply_to_address(premium=False),
            destination_address=recipient,
            message=notice,
        )
        # If we haven't forwarded a first reply for this user yet, _reply_allowed
        # will. So, update the DB.
        _set_forwarded_first_reply(reply_record.address.user.profile)
    except ClientError as e:
        logger.error("reply_not_allowed_ses_client_error", extra=e.response["Error"])
    incr_if_enabled("free_user_reply_attempt", 1)
×
1332

1333

1334
def _reply_allowed(
    from_address, to_address, reply_record, message_id=None, decrypted_metadata=None
):
    """
    Return True if this reply should be relayed.

    If the From address is the reply record owner's email (ignoring any "+tag" in
    the local part), the reply is allowed when the owner is active, not flagged,
    and has premium. A free owner gets the "replies require premium" email, and the
    reply is allowed only if no first reply has been forwarded for them yet.

    Otherwise, the reply is allowed only when to_address is a mask owned by a
    premium user.
    """
    stripped_from_address = _strip_localpart_tag(from_address)
    owner_email = reply_record.address.user.email
    stripped_owner_email = _strip_localpart_tag(owner_email)
    sent_by_owner = (
        from_address == owner_email or stripped_from_address == stripped_owner_email
    )

    if not sent_by_owner:
        # The From: is not a Relay user, so make sure this is a reply *TO* a
        # premium Relay user
        try:
            address = _get_address(to_address)
            if address.user.profile.has_premium:
                return True
        except ObjectDoesNotExist:
            return False
        incr_if_enabled("free_user_reply_attempt", 1)
        return False

    # This is a Relay user replying to an external sender
    if not reply_record.profile.user.is_active:
        return False
    if reply_record.profile.is_flagged:
        return False
    if reply_record.owner_has_premium:
        return True

    # if we haven't forwarded a first reply for this user, return True to allow
    # this first reply. Read the flag before sending the notice, since a
    # successful send sets it.
    allow_first_reply = not reply_record.address.user.profile.forwarded_first_reply
    _send_reply_requires_premium_email(
        from_address, reply_record, message_id, decrypted_metadata
    )
    return allow_first_reply
×
1372

1373

1374
def _handle_reply(
    from_address: str, message_json: AWS_SNSMessageJSON, to_address: str
) -> HttpResponse:
    """
    Handle a reply from a Relay user to an external email.

    Returns (may be incomplete):
    * 200 if the reply was sent
    * 400 if the In-Reply-To and References headers are missing, or the SES client
      raises an error
    * 403 if the Relay user is not allowed to reply
    * 404 if the S3-stored email is not found, or there is no matching Reply record in
      the database
    * 503 if the S3 client returns an error (other than not found)

    NOTE(review): Reply.DoesNotExist raised by _get_keys_from_headers (no References
    message ID matches a Reply record) is not caught in this function - presumably
    the caller handles it; confirm.

    TODO: Return a more appropriate status object (see _handle_received)
    TODO: Document metrics emitted
    """
    mail = message_json["mail"]
    try:
        (lookup_key, encryption_key) = _get_keys_from_headers(mail["headers"])
    except ReplyHeadersNotFound:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-header"])
        return HttpResponse("No In-Reply-To header", status=400)

    try:
        reply_record = _get_reply_record_from_lookup_key(lookup_key)
    except Reply.DoesNotExist:
        incr_if_enabled("reply_email_header_error", 1, tags=["detail:no-reply-record"])
        return HttpResponse("Unknown or stale In-Reply-To header", status=404)

    address = reply_record.address
    message_id = _get_message_id_from_headers(mail["headers"])
    # The metadata holds the original sender's "reply-to" and / or "from" values
    decrypted_metadata = json.loads(
        decrypt_reply_metadata(encryption_key, reply_record.encrypted_metadata)
    )
    if not _reply_allowed(
        from_address, to_address, reply_record, message_id, decrypted_metadata
    ):
        log_email_dropped(reason="reply_requires_premium", mask=address, is_reply=True)
        return HttpResponse("Relay replies require a premium account", status=403)

    # The reply is sent from the mask, to the original sender
    outbound_from_address = address.full_address
    incr_if_enabled("reply_email", 1)
    subject = mail["commonHeaders"].get("subject", "")
    to_address = decrypted_metadata.get("reply-to") or decrypted_metadata.get("from")
    headers: OutgoingHeaders = {
        "Subject": subject,
        "From": outbound_from_address,
        "To": to_address,
        "Reply-To": outbound_from_address,
    }
    # Converted after the headers are built, so the To header keeps the stored
    # address and only the SES destination uses the converted address
    to_address = ensure_ascii_email(to_address)
    try:
        (email_bytes, transport, load_time_s) = _get_email_bytes(message_json)
    except ClientError as e:
        if e.response["Error"].get("Code", "") == "NoSuchKey":
            logger.error("s3_object_does_not_exist", extra=e.response["Error"])
            log_email_dropped(reason="content_missing", mask=address, is_reply=True)
            return HttpResponse("Email not in S3", status=404)
        logger.error("s3_client_error_get_email", extra=e.response["Error"])
        log_email_dropped(
            reason="error_storage", mask=address, is_reply=True, can_retry=True
        )
        # we are returning a 503 so that SNS can retry the email processing
        return HttpResponse("Cannot fetch the message content from S3", status=503)

    email = message_from_bytes(email_bytes, policy=relay_policy)
    if not isinstance(email, EmailMessage):
        raise TypeError("email must be type EmailMessage")

    # Convert to a reply email
    # TODO: Issue #1747 - Remove wrapper / prefix in replies
    # The header issues returned by _replace_headers are not used here
    _replace_headers(email, headers)

    try:
        ses_send_raw_email(
            source_address=outbound_from_address,
            destination_address=to_address,
            message=email,
        )
    except ClientError:
        # Logged without can_retry, and returned as a 400 rather than a 503
        log_email_dropped(reason="error_sending", mask=address, is_reply=True)
        return HttpResponse("SES client error", status=400)

    # The reply was sent, so update the reply count and the owner's profile
    reply_record.increment_num_replied()
    profile = address.user.profile
    profile.update_abuse_metric(replied=True)
    profile.last_engagement = datetime.now(UTC)
    profile.save()
    glean_logger().log_email_forwarded(mask=address, is_reply=True)
    return HttpResponse("Sent email to final recipient.", status=200)
1✔
1467

1468

1469
def _get_domain_address(
    local_portion: str, domain_portion: str, create: bool = True
) -> DomainAddress:
    """
    Find or create the DomainAddress for the parts of an email address.

    If the domain_portion is for a valid subdomain, and create=True, a new DomainAddress
    will be created and returned. If create=False, DomainAddress.DoesNotExist is raised.

    If the domain_portion is for an unknown domain, ObjectDoesNotExist is raised. This
    includes a domain_portion without a ".", which has no subdomain part.

    If the domain_portion is for an unclaimed subdomain, Profile.DoesNotExist is raised.

    The profile row is locked for the lookup / creation, and last_used_at of the
    returned DomainAddress is set to the current time.
    """

    # partition() instead of split(".", 1), so that a domain without a "." gives an
    # empty address_domain and is reported as an unknown domain below, rather than
    # raising ValueError on unpacking
    address_subdomain, _, address_domain = domain_portion.partition(".")
    if address_domain != get_domains_from_settings()["MOZMAIL_DOMAIN"]:
        if create:
            incr_if_enabled("email_for_not_supported_domain", 1)
        raise ObjectDoesNotExist("Address does not exist")
    try:
        with transaction.atomic():
            locked_profile = Profile.objects.select_for_update().get(
                subdomain=address_subdomain
            )
            domain_numerical = get_domain_numerical(address_domain)
            # filter DomainAddress because it may not exist
            # which will throw an error with get()
            domain_address = DomainAddress.objects.filter(
                user=locked_profile.user, address=local_portion, domain=domain_numerical
            ).first()
            if domain_address is None:
                if not create:
                    raise DomainAddress.DoesNotExist()
                # TODO: Consider flows when a user generating alias on a fly
                # was unable to receive an email due to user no longer being a
                # premium user as seen in exception thrown on make_domain_address
                domain_address = DomainAddress.make_domain_address(
                    locked_profile.user, local_portion, True
                )
                glean_logger().log_email_mask_created(
                    mask=domain_address,
                    created_by_api=False,
                )
            domain_address.last_used_at = datetime.now(UTC)
            domain_address.save()
            return domain_address
    except Profile.DoesNotExist:
        if create:
            incr_if_enabled("email_for_dne_subdomain", 1)
        raise
1✔
1519

1520

1521
def _get_address(address: str, create: bool = True) -> RelayAddress | DomainAddress:
    """
    Look up (or, for subdomain addresses, possibly create) the mask for an address.

    The local part and domain are lower-cased before any lookup. If the domain
    is not one of the domains returned by get_domains_from_settings(), the
    address is treated as a user-subdomain address and handed to
    _get_domain_address(), which may create a new DomainAddress when create is
    True. Otherwise a RelayAddress is looked up on the matching relay domain.

    When a RelayAddress is missing and create is True, a counter is emitted
    that tells apart deleted, unknown, and multiply-deleted addresses before
    the exception is re-raised.

    On failure, raises an exception based on Django's ObjectDoesNotExist:
    * RelayAddress.DoesNotExist - looks like RelayAddress, deleted or does not exist
    * Profile.DoesNotExist - looks like DomainAddress, no subdomain match
    * DomainAddress.DoesNotExist - looks like unknown DomainAddress, create is False
    * ObjectDoesNotExist - Unknown domain

    Raises ValueError if the address does not contain exactly one "@".
    """
    raw_local, raw_domain = address.split("@")
    local_address, domain = raw_local.lower(), raw_domain.lower()

    # Anything that is not one of the site's own relay domains may be an
    # address on a user's subdomain.
    if domain not in get_domains_from_settings().values():
        return _get_domain_address(local_address, domain, create)

    # The domain is one of the site's relay domains: look up the RelayAddress.
    try:
        return RelayAddress.objects.get(
            address=local_address, domain=get_domain_numerical(domain)
        )
    except RelayAddress.DoesNotExist:
        if create:
            # Record why the lookup failed before re-raising.
            try:
                DeletedAddress.objects.get(
                    address_hash=address_hash(local_address, domain=domain)
                )
            except DeletedAddress.DoesNotExist:
                incr_if_enabled("email_for_unknown_address", 1)
            except DeletedAddress.MultipleObjectsReturned:
                # not sure why this happens on stage but let's handle it
                incr_if_enabled("email_for_deleted_address_multiple", 1)
            else:
                incr_if_enabled("email_for_deleted_address", 1)
                # TODO: create a hard bounce receipt rule in SES
        raise
1✔
1567

1568

1569
def _get_address_if_exists(address: str) -> RelayAddress | DomainAddress | None:
    """
    Get the matching RelayAddress or DomainAddress, or None if it doesn't exist.

    The lookup never creates a mask (create=False). An address that does not
    contain exactly one "@" (for example the empty string produced by
    email.utils.parseaddr() for unparseable input) cannot match a mask, so it
    returns None instead of letting _get_address() fail while splitting it.
    """
    if address.count("@") != 1:
        return None
    try:
        return _get_address(address, create=False)
    except ObjectDoesNotExist:
        # Also covers RelayAddress.DoesNotExist, DomainAddress.DoesNotExist and
        # Profile.DoesNotExist, which all derive from ObjectDoesNotExist.
        return None
1✔
1575

1576

1577
def _handle_bounce(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES bounce notification.

    For more information, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#bounce-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_bounce" with these tags:
    * bounce_type: 'permanent', 'transient', 'undetermined', 'none' if omitted
    * bounce_subtype: 'undetermined', 'general', etc., 'none' if omitted
    * user_match: 'found', 'missing', error states 'no_address' and 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', 'hard_bounce', 'soft_bounce'

    Emits an info log "bounce_notification", same data as metric, plus:
    * bounce_action: 'action' from bounced recipient data, or '' if omitted
    * bounce_status: 'status' from bounced recipient data, or '' if omitted
    * bounce_diagnostic: 'diagnosticCode' from bounced recipient data, or ''
    * bounce_extra: Extra data from bounce_recipient data, if any
    * domain: User's real email address domain, if an address was given
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the
      user, or '' if the user has no account or has metrics disabled

    A recipient whose address is missing, or has no "@" after parsing, is
    reported with user_match 'no_address' and no user lookup is attempted.
    """
    bounce = message_json.get("bounce", {})
    bounce_type = bounce.get("bounceType", "none")
    bounce_subtype = bounce.get("bounceSubType", "none")
    bounced_recipients = bounce.get("bouncedRecipients", [])

    now = datetime.now(UTC)
    bounce_data = []
    for recipient in bounced_recipients:
        # The known keys are popped off the recipient entry (in place), so that
        # whatever remains can be logged as "bounce_extra".
        recipient_address = recipient.pop("emailAddress", None)
        data = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "bounce_action": recipient.pop("action", ""),
            "bounce_status": recipient.pop("status", ""),
            "bounce_diagnostic": recipient.pop("diagnosticCode", ""),
            "user_match": "no_address",
            "relay_action": "no_action",
        }
        if recipient:
            data["bounce_extra"] = recipient.copy()
        # Appended now so every recipient is reported, even if skipped below;
        # later updates to `data` are visible through the list.
        bounce_data.append(data)

        if recipient_address is None:
            continue

        recipient_address = parseaddr(recipient_address)[1]
        if "@" not in recipient_address:
            # parseaddr() returns "" when it cannot parse the value. There is
            # no domain to report and no user to match: keep "no_address".
            continue
        recipient_domain = recipient_address.split("@")[1]
        data["domain"] = recipient_domain

        try:
            user = User.objects.get(email=recipient_address)
            profile = user.profile
            data["user_match"] = "found"
            if (fxa := profile.fxa) and profile.metrics_enabled:
                data["fxa_id"] = fxa.uid
            else:
                data["fxa_id"] = ""
        except User.DoesNotExist:
            # TODO: handle bounce for a user who no longer exists
            # add to SES account-wide suppression list?
            data["user_match"] = "missing"
            continue

        action = None
        if "spam" in data["bounce_diagnostic"].lower():
            # if an email bounced as spam, set to auto block spam for this user
            # and DON'T set them into bounce pause state
            action = "auto_block_spam"
            profile.auto_block_spam = True
        elif bounce_type == "Permanent":
            # TODO: handle sub-types: 'General', 'NoEmail', etc.
            action = "hard_bounce"
            profile.last_hard_bounce = now
        elif bounce_type == "Transient":
            # TODO: handle sub-types: 'MessageTooLarge', 'AttachmentRejected', etc.
            action = "soft_bounce"
            profile.last_soft_bounce = now
        if action:
            data["relay_action"] = action
            profile.save()

    if not bounce_data:
        # Data when there are no identified recipients
        bounce_data = [{"user_match": "no_recipients", "relay_action": "no_action"}]

    for data in bounce_data:
        tags = {
            "bounce_type": bounce_type,
            "bounce_subtype": bounce_subtype,
            "user_match": data["user_match"],
            "relay_action": data["relay_action"],
        }
        incr_if_enabled(
            "email_bounce",
            1,
            tags=[generate_tag(key, val) for key, val in tags.items()],
        )
        info_logger.info("bounce_notification", extra=data)

    if any(data["user_match"] == "missing" for data in bounce_data):
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1684

1685

1686
def _build_disabled_mask_for_spam_email(
    mask: RelayAddress | DomainAddress,
) -> EmailMessage:
    """
    Build the "your mask was disabled" notification for the mask's owner.

    Both the HTML and the plain-text template are rendered with the mask's
    full address and settings.SITE_ORIGIN. The returned message is addressed
    from settings.RELAY_FROM_ADDRESS to the mask user's email, with the text
    rendering as its content and the HTML rendering added as an alternative.
    """
    template_context = {
        "mask": mask.full_address,
        "SITE_ORIGIN": settings.SITE_ORIGIN,
    }
    rendered_html = render_to_string(
        "emails/disabled_mask_for_spam.html", template_context
    )
    rendered_text = render_to_string(
        "emails/disabled_mask_for_spam.txt", template_context
    )

    # Assemble the headers, then the text body and its HTML alternative
    message = EmailMessage()
    message["Subject"] = ftl_bundle.format("relay-deactivated-mask-email-subject")
    message["From"] = settings.RELAY_FROM_ADDRESS
    message["To"] = mask.user.email
    message.set_content(rendered_text)
    message.add_alternative(rendered_html, subtype="html")
    return message
1✔
1701

1702

1703
def _send_disabled_mask_for_spam_email(mask: RelayAddress | DomainAddress) -> None:
    """
    Email the mask's owner to tell them the mask was disabled for spam.

    The destination address is passed through ensure_ascii_email() before it
    is handed to SES. A ClientError from SES is logged and not re-raised. The
    "send_disabled_mask_email" counter is incremented only when SES accepted
    the message, so failed sends are not counted as sent.

    Raises ValueError if settings.RELAY_FROM_ADDRESS is not set. This is
    checked before the message is built, because the message uses that
    setting as its From header.
    """
    if not settings.RELAY_FROM_ADDRESS:
        raise ValueError(
            "Must set settings.RELAY_FROM_ADDRESS to send disabled_mask_for_spam email."
        )
    msg = _build_disabled_mask_for_spam_email(mask)
    user_email = ensure_ascii_email(mask.user.email)
    try:
        ses_send_raw_email(
            source_address=settings.RELAY_FROM_ADDRESS,
            destination_address=user_email,
            message=msg,
        )
    except ClientError as e:
        logger.error("send_disabled_mask_ses_client_error", extra=e.response["Error"])
    else:
        incr_if_enabled("send_disabled_mask_email", 1)
1✔
1719

1720

1721
def _handle_complaint(message_json: AWS_SNSMessageJSON) -> HttpResponse:
    """
    Handle an AWS SES complaint notification.

    Relay users are looked for in two places: the complainedRecipients (real
    email addresses) and the From: header (mask addresses). Both are expected
    to point at the same Relay user, giving a 200. If either fails to match,
    a 404 is returned, and errors may be logged.

    A user's first complaint turns on their auto_block_spam flag.

    A user's second complaint disables the mask through which the spam was
    forwarded, and emails the user that the mask is disabled and can be
    re-enabled on their dashboard.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object

    Returns:
    * 404 response if any email address does not match a user,
    * 200 response if all match or none are given

    Emits a counter metric "email_complaint" with these tags:
    * complaint_subtype: 'onaccountsuppressionlist', or 'none' if omitted
    * complaint_feedback - feedback enumeration from ISP (usually 'abuse') or 'none'
    * user_match: 'found' or 'no_recipients'
    * relay_action: 'no_action', 'auto_block_spam', or 'disable_mask'

    Emits an info log "complaint_notification", same data as metric, plus:
    * complaint_user_agent - identifies the client used to file the complaint
    * complaint_extra - Extra data from complainedRecipients data, if any
    * domain - User's domain, if an address was given
    * found_in - "complained_recipients" (real email), "from_header" (email mask),
      or "all" (matching records found in both)
    * fxa_id - The Mozilla account (previously known as Firefox Account) ID of the user
    * mask_match - "found" if "From" header contains an email mask, or "not_found"
    """
    complaint_data = _get_complaint_data(message_json)
    complainers, unknown_count = _gather_complainers(complaint_data)

    # Act on every complainer so they are less likely to complain again
    actions: list[ComplaintAction] = []
    for complainer in complainers:
        action = _reduce_future_complaints(complainer)
        actions.append(action)

        in_developer_mode = flag_is_active_in_task(
            "developer_mode", complainer["user"]
        )
        if in_developer_mode and action.mask_id:
            _log_dev_notification(
                "_handle_complaint: developer_mode",
                DeveloperModeAction(mask_id=action.mask_id, action="log"),
                message_json,
            )

    if not actions:
        # Nobody matched: still report the complaint, with no action taken
        actions = [ComplaintAction(user_match="no_recipients")]

    # Values that are the same for every reported action
    metric_subtype = complaint_data.subtype or "none"
    metric_feedback = complaint_data.feedback_type or "none"
    shared_log_fields = {
        "complaint_subtype": complaint_data.subtype or None,
        "complaint_user_agent": complaint_data.user_agent or None,
        "complaint_feedback": complaint_data.feedback_type or None,
    }

    for action in actions:
        tag_values = {
            "complaint_subtype": metric_subtype,
            "complaint_feedback": metric_feedback,
            "user_match": action.user_match,
            "relay_action": action.relay_action,
        }
        incr_if_enabled(
            "email_complaint",
            tags=[generate_tag(key, val) for key, val in tag_values.items()],
        )

        # The log carries every populated action field except the mask ID
        action_fields = {
            key: value
            for key, value in action._asdict().items()
            if key != "mask_id" and value is not None
        }
        info_logger.info(
            "complaint_notification", extra=shared_log_fields | action_fields
        )

    if unknown_count:
        return HttpResponse("Address does not exist", status=404)
    return HttpResponse("OK", status=200)
1✔
1810

1811

1812
class RawComplaintData(NamedTuple):
    """The parts of an AWS SES complaint that the complaint handling code uses."""

    # One (email address, remaining entry data) pair per complained recipient
    complained_recipients: list[tuple[str, dict[str, Any]]]
    # Addresses parsed out of the mail's "from" common header
    from_addresses: list[str]
    # "complaintSubType" from the complaint, "" when absent
    subtype: str
    # "userAgent" from the complaint, "" when absent
    user_agent: str
    # "complaintFeedbackType" from the complaint, "" when absent
    feedback_type: str
1✔
1818

1819

1820
def _get_complaint_data(message_json: AWS_SNSMessageJSON) -> RawComplaintData:
    """
    Pull the fields _handle_complaint() needs out of an SES Complaint Notification.

    Only the data used by _handle_complaint() is extracted. Complaint events
    have a similar structure and carry the same data, so they work as well.

    A missing "complainedRecipients", "emailAddress", "mail", "commonHeaders"
    or "from" key is logged as an error and treated as empty. An empty
    "complainedRecipients" list is logged as an error too. The top-level
    "complaint" key is required.

    For more information on the complaint notification, see:
    https://docs.aws.amazon.com/ses/latest/dg/notification-contents.html#complaint-object
    """
    complaint = message_json["complaint"]

    T = TypeVar("T")

    def get_or_log(
        key: str, source: dict[str, T], data_type: type[T]
    ) -> tuple[T, bool]:
        """Return (value, True), or log and return (empty data_type, False)."""
        if key not in source:
            logger.error(
                "_get_complaint_data: Unexpected message format",
                extra={
                    "missing_key": key,
                    "found_keys": ",".join(sorted(source.keys())),
                },
            )
            return data_type(), False
        return source[key], True

    # Complained recipients: keep the parsed address plus any other entry data
    raw_recipients, has_cr = get_or_log("complainedRecipients", complaint, list)
    complained_recipients = []
    for entry in raw_recipients:
        raw_email_address, has_email = get_or_log("emailAddress", entry, str)
        if not has_email:
            continue
        email_address = parseaddr(raw_email_address)[1]
        other_fields = {
            name: value for name, value in entry.items() if name != "emailAddress"
        }
        complained_recipients.append((email_address, other_fields))
    if has_cr and not raw_recipients:
        logger.error("_get_complaint_data: Empty complainedRecipients")

    # From: addresses live at mail -> commonHeaders -> from
    raw_from_addresses: list = []
    mail, has_mail = get_or_log("mail", message_json, dict)
    if has_mail:
        common_headers, has_headers = get_or_log("commonHeaders", mail, dict)
        if has_headers:
            raw_from_addresses, _ = get_or_log("from", common_headers, list)
    from_addresses = [parseaddr(addr)[1] for addr in raw_from_addresses]

    return RawComplaintData(
        complained_recipients=complained_recipients,
        from_addresses=from_addresses,
        # Only present when destination is on account suppression list
        subtype=complaint.get("complaintSubType", ""),
        # Only present for feedback reports
        user_agent=complaint.get("userAgent", ""),
        # Only present when set
        feedback_type=complaint.get("complaintFeedbackType", ""),
    )
1883

1884

1885
class Complainer(TypedDict):
    """A Relay user matched to a complaint, with the masks found for them."""

    # The matched Relay user
    user: User
    # Where the user was matched: the complained recipients (real email), the
    # From: header (mask), or both ("all")
    found_in: Literal["complained_recipients", "from_header", "all"]
    # Domain part of the user's email address
    domain: str
    # Extra data from the complained recipient entry, or None
    extra: dict[str, Any] | None
    # Masks from the From: header that belong to this user
    masks: list[RelayAddress | DomainAddress]
1✔
1891

1892

1893
def _gather_complainers(
    complaint_data: RawComplaintData,
) -> tuple[list[Complainer], int]:
    """
    Fetch Relay Users and masks from the complaint data.

    This matches data from an AWS SES Complaint Notification (as extracted by
    _get_complaint_data()) to the Relay database, and returns the Users,
    RelayAddresses, and DomainAddresses, as well as status and extra data.

    If the complaint came from the AWS SES complaint simulator, detect
    developer_mode and move forward with the developer's User data.

    Returns a pair: the list of matched complainers, and the number of
    addresses (complained recipients plus From: addresses) that could not be
    matched. A complained recipient without an "@" is counted as unmatched.
    """

    users: dict[int, Complainer] = {}
    unknown_complainer_count = 0
    for email_address, extra_data in complaint_data.complained_recipients:
        local, separator, domain = email_address.partition("@")
        if not separator:
            # Not a usable address (for example "" from a failed parse), so it
            # cannot belong to a user. Count it instead of failing on the split.
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        # If the complainer is the AWS SES complaint simulation, assume that
        # it was sent by a user with the developer_mode flag. Look for
        # a mask that matches the embedded mask metrics_id, and use
        # the related user's email instead of the AWS simulator address.
        # See docs/developer_mode.md
        if domain == "simulator.amazonses.com" and local.startswith("complaint+"):
            mask_metrics_id = local.removeprefix("complaint+")
            mask = _get_mask_by_metrics_id(mask_metrics_id)
            if mask:
                email_address = mask.user.email
                # partition() yields "" rather than failing if there is no "@"
                domain = mask.user.email.partition("@")[2]

        try:
            user = User.objects.get(email=email_address)
        except User.DoesNotExist:
            logger.error("_gather_complainers: unknown complainedRecipient")
            unknown_complainer_count += 1
            continue

        if user.id in users:
            logger.error("_gather_complainers: complainer appears twice")
            continue

        users[user.id] = {
            "user": user,
            "found_in": "complained_recipients",
            "domain": domain,
            "extra": extra_data or None,
            "masks": [],
        }

    # Collect From: addresses and their users
    unknown_sender_count = 0
    for email_address in complaint_data.from_addresses:
        mask = _get_address_if_exists(email_address)
        if not mask:
            logger.error("_gather_complainers: unknown mask, maybe deleted?")
            unknown_sender_count += 1
            continue

        if mask.user.id not in users:
            # Add mask-only entry to users
            users[mask.user.id] = {
                "user": mask.user,
                "found_in": "from_header",
                "domain": mask.user.email.partition("@")[2],
                "extra": None,
                "masks": [mask],
            }
            continue

        user_data = users[mask.user.id]
        if mask in user_data["masks"]:
            logger.error("_gather_complainers: mask appears twice")
            continue

        user_data["masks"].append(mask)
        if user_data["found_in"] in ("all", "complained_recipients"):
            user_data["found_in"] = "all"
        else:
            # The user was only ever seen through masks, and now has several
            logger.error("_gather_complainers: no complainer, multi-mask")

    return (list(users.values()), unknown_complainer_count + unknown_sender_count)
1✔
1975

1976

1977
def _get_mask_by_metrics_id(metrics_id: str) -> RelayAddress | DomainAddress | None:
    """
    Find the mask a metrics ID refers to, or return None.

    The first character selects the mask type ("R" for RelayAddress, "D" for
    DomainAddress) and the rest must be the integer ID of the mask. None is
    returned for an empty ID, any other prefix, a non-integer remainder, or
    an ID that matches no mask.
    """
    if not metrics_id:
        return None

    type_prefix, raw_id = metrics_id[0], metrics_id[1:]
    model: type[RelayAddress] | type[DomainAddress]
    if type_prefix == "R":
        model = RelayAddress
    elif type_prefix == "D":
        model = DomainAddress
    else:
        return None

    try:
        mask_id = int(raw_id)
    except ValueError:
        return None  # ID is not an int, do not try to match to Relay mask

    try:
        return model.objects.get(id=mask_id)
    except model.DoesNotExist:
        return None
1✔
1997

1998

1999
class ComplaintAction(NamedTuple):
1✔
2000
    user_match: Literal["found", "no_recipients"]
1✔
2001
    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
1✔
2002
    mask_match: Literal["found", "not_found"] = "not_found"
1✔
2003
    mask_id: str | None = None
1✔
2004
    found_in: Literal["complained_recipients", "from_header", "all"] | None = None
1✔
2005
    fxa_id: str | None = None
1✔
2006
    domain: str | None = None
1✔
2007
    complaint_extra: str | None = None
1✔
2008

2009

2010
def _reduce_future_complaints(complainer: Complainer) -> ComplaintAction:
    """
    Act on a complaining user so that they complain less in the future.

    If the user's profile does not block spam yet, auto_block_spam is turned
    on and saved. Otherwise, when the "disable_mask_on_complaint" flag is
    active for the user, each enabled mask of the complainer is disabled and
    saved, and the user is emailed about it.

    Returns a ComplaintAction describing the match and the action taken. Its
    mask_id is the metrics ID of the last mask of the complainer, if any.
    """
    user = complainer["user"]
    profile = user.profile
    extra = complainer["extra"]

    relay_action: Literal["no_action", "auto_block_spam", "disable_mask"] = "no_action"
    mask_match: Literal["found", "not_found"] = "not_found"
    mask_id = None

    # First response to a complaint: start blocking spam for this user
    if not profile.auto_block_spam:
        relay_action = "auto_block_spam"
        profile.auto_block_spam = True
        profile.save()

    for mask in complainer["masks"]:
        mask_match = "found"
        mask_id = mask.metrics_id
        should_disable = (
            flag_is_active_in_task("disable_mask_on_complaint", user)
            and mask.enabled
            and relay_action != "auto_block_spam"
        )
        if not should_disable:
            continue
        # The user already had spam blocking on: disable the mask and tell them
        relay_action = "disable_mask"
        mask.enabled = False
        mask.save()
        _send_disabled_mask_for_spam_email(mask)

    return ComplaintAction(
        user_match="found",
        relay_action=relay_action,
        mask_match=mask_match,
        mask_id=mask_id,
        fxa_id=profile.metrics_fxa_id,
        domain=complainer["domain"],
        found_in=complainer["found_in"],
        complaint_extra=json.dumps(extra) if extra else None,
    )
2048

2049

2050
# Set to True once init_waffle_flags() has run in this process
_WAFFLE_FLAGS_INITIALIZED = False


def init_waffle_flags() -> None:
    """
    Make sure the waffle flags used by the email tasks exist.

    Each flag is fetched or created by name; the note is only applied when
    the flag is created. After the first completed call, further calls in
    the same process return without touching the flag table.
    """
    global _WAFFLE_FLAGS_INITIALIZED
    if not _WAFFLE_FLAGS_INITIALIZED:
        # Flag name -> note stored with a newly created flag
        flag_notes: dict[str, str] = {
            "disable_mask_on_complaint": (
                "MPP-3119: When a Relay user marks an email as spam, disable the mask."
            ),
            "developer_mode": (
                "MPP-3932: Enable logging and overrides for Relay developers."
            ),
        }
        flag_manager = get_waffle_flag_model().objects
        for flag_name, flag_note in flag_notes.items():
            flag_manager.get_or_create(name=flag_name, defaults={"note": flag_note})
        _WAFFLE_FLAGS_INITIALIZED = True
1✔
2073

2074

2075
def ensure_ascii_email(email: str) -> str:
    """
    Ensure the given email has an ASCII-compatible domain (Punycode).

    If the email has a non-ASCII domain, the users stored with that email are
    cleaned in the DB and the clean value is returned. An empty value, a value
    without "@", or an email whose domain is already ASCII is returned
    unchanged.

    If no user is found with the Punycode email after cleaning (for example
    because the email did not belong to any user), the Punycode email itself
    is returned, so the caller still gets an address it can send to instead
    of a User.DoesNotExist error.
    """
    if not email or "@" not in email:
        return email
    idna_cleaner = IDNAEmailCleaner()
    if not idna_cleaner.has_non_ascii_domain(email):
        return email

    punycode_email = idna_cleaner.punycode_email(email)
    idna_cleaner.clean_users(User.objects.filter(email=email))
    # first() rather than get(): get() raises when zero or several users
    # match, and never returns a falsy value that could be tested.
    cleaned_user = User.objects.filter(email=punycode_email).first()
    if cleaned_user is None:
        return punycode_email
    return cleaned_user.email
1✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc