• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

popstas / google-drive-access / 19720831600

27 Nov 2025 12:01AM UTC coverage: 62.208% (+6.6%) from 55.632%
19720831600

push

github

web-flow
refactor: extract server helpers into modules (#15)

134 of 313 new or added lines in 7 files covered. (42.81%)

1 existing line in 1 file now uncovered.

1037 of 1667 relevant lines covered (62.21%)

0.62 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

23.89
/src/drive_audit/http_handler.py
1
"""HTTP handler for managing Google Drive access."""
2

3
import ast
1✔
4
import json
1✔
5
import logging
1✔
6
import re
1✔
7
from http.server import BaseHTTPRequestHandler
1✔
8
from typing import Any, Dict, List, Union
1✔
9
from urllib.parse import parse_qs, urlparse
1✔
10

11
from .google_client import (
1✔
12
    add_user_permission,
13
    create_folder,
14
    ensure_public_subdir,
15
    find_child_folder,
16
    get_file_permissions,
17
)
18
from .model import DriveConfig, HttpConfig
1✔
19
from .planfix_client import PlanfixClient
1✔
20
from .translations import translate
1✔
21

22
logger = logging.getLogger(__name__)
1✔
23

24

25
class LocalizedError(Exception):
    """Exception identified by a translation key rather than a fixed message.

    The key is also passed to ``Exception`` as its only argument, and any
    keyword arguments are kept as ``context`` for the code that renders the
    message.
    """

    def __init__(self, key: str, **context: Any) -> None:
        super().__init__(key)
        self.key, self.context = key, context
1✔
30

31

32
def extract_folder_id(folder_url: str) -> str:
    """Pull the folder ID out of a folder URL.

    A non-empty ``id`` query parameter takes precedence; otherwise the ID is
    taken from a ``/folders/<id>`` segment of the URL path.

    Raises:
        LocalizedError: with key ``unable_extract_folder_id`` when neither
            form is present.
    """
    url_parts = urlparse(folder_url)

    id_values = parse_qs(url_parts.query).get("id")
    if id_values:
        return id_values[0]

    path_match = re.search(r"/folders/([A-Za-z0-9_-]+)", url_parts.path)
    if path_match is None:
        raise LocalizedError("unable_extract_folder_id")
    return path_match.group(1)
1✔
43

44

45
def _parse_list_text(text: str) -> Any:
    """Return the list encoded by *text* (Python literal or JSON), else None."""
    try:
        parsed = ast.literal_eval(text)
    except (ValueError, TypeError, SyntaxError, MemoryError, RecursionError):
        # literal_eval documents all of these for malformed input; any of
        # them just means "not a Python literal", so fall through to JSON.
        parsed = None

    if isinstance(parsed, str):
        # A quoted array such as "'[7, 1]'" evaluates to the inner string
        # first. Unwrap it once; text starting with "[" can never evaluate
        # to a str again, so this recursion is bounded.
        inner = parsed.strip()
        if inner.startswith("["):
            return _parse_list_text(inner)

    if isinstance(parsed, list):
        return parsed

    try:
        parsed = json.loads(text)
    except (json.JSONDecodeError, RecursionError):
        return None
    return parsed if isinstance(parsed, list) else None


def parse_assignee_ids(assignee_id: Any) -> List[str]:
    """Turn an ``assignee_id`` value into a list of ID strings.

    Accepted forms:
    - A single value (string or number)
    - A Python-style array string like "['7', '1']"
    - A JSON array string like '["7", "1"]'
    - A quoted array string like "'[7, 1]'"
    - An actual list

    Text that looks like an array but cannot be parsed is returned unchanged
    as a single ID instead of raising.
    """
    if isinstance(assignee_id, list):
        return [str(item) for item in assignee_id]

    if isinstance(assignee_id, (int, float)):
        return [str(assignee_id)]

    assignee_id_str = str(assignee_id).strip()

    looks_like_list = assignee_id_str.startswith("[") or (
        assignee_id_str.startswith("'") and "[" in assignee_id_str
    )
    if looks_like_list:
        parsed = _parse_list_text(assignee_id_str)
        if parsed is not None:
            return [str(item) for item in parsed]

    return [assignee_id_str]
1✔
79

80

81
def normalize_assignee_ids(assignee_ids: List[Union[str, Dict[str, Any]]]) -> List[str]:
    """Reduce assignee entries to bare ID strings.

    Each entry is either an ID value or a dict whose ``id`` key holds it.
    Values are stringified and stripped, and a leading ``user:`` prefix is
    removed. Entries that are missing, ``None``, or empty once the prefix is
    removed are dropped, so no blank or ``"None"`` ID reaches the caller.
    """
    normalized_ids: List[str] = []
    for assignee_id in assignee_ids:
        raw_value = (
            assignee_id.get("id") if isinstance(assignee_id, dict) else assignee_id
        )
        if raw_value is None:
            continue

        assignee_str = str(raw_value).strip()
        if assignee_str.startswith("user:"):
            # Strip again: "user: 9" would otherwise keep its leading space.
            assignee_str = assignee_str.split(":", 1)[1].strip()

        if assignee_str:
            normalized_ids.append(assignee_str)
    return normalized_ids
1✔
96

97

98
def collect_google_accounts(
    planfix_client: PlanfixClient, assignee_ids: List[str]
) -> List[str]:
    """Look up the ``google_account`` of each assignee's manager record.

    Calls ``planfix_client.get_manager`` once per assignee, in order. Records
    without a truthy ``google_account`` are skipped, and repeated accounts
    are returned once, in first-seen order.
    """
    # Dict keys serve as an insertion-ordered set of collected accounts.
    collected: Dict[str, None] = {}
    for assignee_id in assignee_ids:
        account = planfix_client.get_manager(assignee_id).get("google_account")
        if not account or account in collected:
            continue
        collected[account] = None
        logger.debug(
            "Collected google account %s for assignee %s",
            account,
            assignee_id,
        )
    return list(collected)
×
115

116

117
def set_permissions(
    service, folder_id: str, google_accounts: List[str], role: str
) -> List[Dict[str, Any]]:
    """Add a user permission with *role* on the folder for each account.

    Returns one dict per account, in input order, with the keys ``email``,
    ``permission_id`` (the ``id`` of the object returned by
    ``add_user_permission``) and ``role``.
    """

    def grant(email: str) -> Dict[str, Any]:
        created = add_user_permission(service, folder_id, email, role)
        return {
            "email": email,
            "permission_id": created.get("id"),
            "role": role,
        }

    return [grant(email) for email in google_accounts]
×
131

132

133
def collect_existing_user_accounts(service, folder_id: str) -> List[str]:
    """List the email addresses of user-type permissions on the folder.

    Entries returned by ``get_file_permissions`` whose ``type`` is not
    ``"user"``, or that have no ``emailAddress``, are left out.
    """
    emails: List[str] = []
    for entry in get_file_permissions(service, folder_id):
        if entry.get("type") != "user":
            continue
        if not entry.get("emailAddress"):
            continue
        emails.append(entry["emailAddress"])
    return emails
140

141

142
def create_handler(
    planfix_client: PlanfixClient,
    service,
    http_config: HttpConfig,
    drive_config: DriveConfig,
    role: str,
):
    """Build the request-handler class bound to the given clients and config.

    The returned ``BaseHTTPRequestHandler`` subclass serves two POST routes,
    ``/set_client_folder_access`` and ``/create_client_folder``. Both require
    an ``Authorization: Bearer <token>`` header matching ``http_config.token``
    and a JSON object body. Every reply, including errors, is sent with HTTP
    status 200 and a translated ``answer`` field.

    Args:
        planfix_client: Used to look up client tasks, child tasks and managers.
        service: Drive service object passed through to the Drive helpers.
        http_config: Supplies ``lang`` (reply language) and ``token``.
        drive_config: Supplies ``root_folder_id``, ``drive_id`` and
            ``public_subdir``.
        role: Role granted to accounts that do not yet have access.

    Returns:
        The handler class (not an instance).
    """
    # Local import keeps this factory self-contained.
    import hmac

    language = http_config.lang

    def folder_link(folder_id: str) -> str:
        return f"https://drive.google.com/drive/folders/{folder_id}"

    class AccessHandler(BaseHTTPRequestHandler):
        """Token-protected JSON endpoint for granting Drive folder access."""

        def log_message(self, format: str, *args: Any) -> None:  # noqa: A003
            # Send the base class's access log through the module logger.
            logger.info("%s - %s", self.address_string(), format % args)

        def _log_request(self, payload: Dict[str, Any]) -> None:
            logger.info(
                "%s request: %s", self.path, json.dumps(payload, ensure_ascii=False)
            )

        def _translate(self, key: str, **context: Any) -> str:
            return translate(language, key, **context)

        def _format_accounts(self, accounts: List[str]) -> str:
            return ", ".join(accounts) if accounts else self._translate("none")

        def _send_json(self, status_code: int, payload: Dict[str, Any]) -> None:
            """Log *payload* and write it as the JSON response body."""
            logger.info(
                "%s answer: %s", self.path, json.dumps(payload, ensure_ascii=False)
            )
            response = json.dumps(payload).encode("utf-8")
            self.send_response(status_code)
            self.send_header("Content-Type", "application/json")
            self.send_header("Content-Length", str(len(response)))
            self.end_headers()
            self.wfile.write(response)

        def _send_answer(self, key: str, **context: Any) -> None:
            """Reply with a translated message under the ``answer`` key."""
            self._send_json(200, {"answer": self._translate(key, **context)})

        def _authenticate(self) -> bool:
            """Check the bearer token; on failure a reply has already been sent."""
            auth_header = self.headers.get("Authorization", "")
            if not auth_header.startswith("Bearer "):
                self._send_answer("missing_or_invalid_auth_header")
                return False

            token = auth_header.split(" ", 1)[1]
            expected = http_config.token
            # Constant-time comparison so response timing does not leak how
            # much of the token matched. Compared as bytes because
            # compare_digest rejects non-ASCII str arguments.
            if not isinstance(expected, str) or not hmac.compare_digest(
                token.encode("utf-8"), expected.encode("utf-8")
            ):
                self._send_answer("invalid_token")
                return False
            return True

        def _parse_body(self) -> Dict[str, Any]:
            """Read and decode the JSON object body.

            Raises:
                LocalizedError: ``request_body_required`` when Content-Length
                    is missing, non-numeric or not positive; ``invalid_json``
                    when the body is not decodable JSON or not a JSON object.
            """
            try:
                content_length = int(self.headers.get("Content-Length", "0"))
            except ValueError:
                content_length = 0
            # A negative length would make rfile.read() read until EOF.
            if content_length <= 0:
                raise LocalizedError("request_body_required")

            body = self.rfile.read(content_length)
            try:
                payload = json.loads(body)
            except json.JSONDecodeError as exc:
                raise LocalizedError("invalid_json", detail=exc.msg) from exc
            except UnicodeDecodeError as exc:
                raise LocalizedError("invalid_json", detail=exc.reason) from exc

            # The route handlers index the payload by field name.
            if not isinstance(payload, dict):
                raise LocalizedError("invalid_json", detail="expected a JSON object")
            return payload

        def _require_fields(
            self, payload: Dict[str, Any], required_fields: List[str]
        ) -> bool:
            """Reply with ``missing_fields`` and return False if any are absent."""
            missing_fields = [
                field for field in required_fields if field not in payload
            ]
            if missing_fields:
                self._send_answer("missing_fields", fields=", ".join(missing_fields))
                return False
            return True

        def _lookup_client_task(self, contact_id: int):
            """Return ``(task_id, assignee_ids)`` for the contact's client task.

            Raises:
                LocalizedError: ``client_task_not_found`` when the lookup
                    result has no truthy ``found`` flag.
            """
            client_task = planfix_client.get_client_task(contact_id)
            if not client_task.get("found"):
                raise LocalizedError("client_task_not_found")

            task_id = int(client_task.get("taskId"))
            assignees = client_task.get("assignees", {}).get("users", [])
            return task_id, normalize_assignee_ids(assignees)

        def _grant_access(
            self, task_id: int, initial_assignee_ids: List[str], folder_id: str
        ) -> Dict[str, List[str]]:
            """Grant *role* on the folder to assignee accounts lacking access.

            Returns a dict with ``granted_accounts`` (newly added) and
            ``existing_accounts`` (assignee accounts that already had a user
            permission on the folder).
            """
            if drive_config.public_subdir:
                ensure_public_subdir(
                    service,
                    folder_id,
                    drive_config.public_subdir,
                    drive_config.drive_id,
                )

            tasks = planfix_client.get_child_tasks(task_id)
            assignee_ids = PlanfixClient.collect_assignee_ids(
                tasks, initial_assignee_ids
            )
            google_accounts = collect_google_accounts(
                planfix_client, sorted(assignee_ids)
            )
            existing_accounts_set = set(
                collect_existing_user_accounts(service, folder_id)
            )

            new_accounts = [
                account
                for account in google_accounts
                if account not in existing_accounts_set
            ]
            if new_accounts:
                set_permissions(service, folder_id, new_accounts, role)

            return {
                "granted_accounts": new_accounts,
                "existing_accounts": [
                    account
                    for account in google_accounts
                    if account in existing_accounts_set
                ],
            }

        def _handle_set_client_folder_access(self, payload: Dict[str, Any]) -> None:
            """Grant access to an existing folder given by ``folder_url``.

            ``task_id`` and ``assignee_id`` must be supplied together; when
            both are absent they are taken from the contact's client task.
            """
            if not self._require_fields(payload, ["contact_id", "folder_url"]):
                return

            try:
                contact_id = int(payload["contact_id"])
                folder_id = extract_folder_id(str(payload["folder_url"]))

                has_task_id = "task_id" in payload
                has_assignee_id = "assignee_id" in payload
                if has_task_id != has_assignee_id:
                    raise LocalizedError("task_and_assignee_together")

                if has_task_id:
                    task_id = int(payload["task_id"])
                    initial_assignee_ids = normalize_assignee_ids(
                        parse_assignee_ids(payload["assignee_id"])
                    )
                else:
                    task_id, initial_assignee_ids = self._lookup_client_task(
                        contact_id
                    )

                access_report = self._grant_access(
                    task_id, initial_assignee_ids, folder_id
                )
            except LocalizedError as exc:
                self._send_answer(exc.key, **exc.context)
                return
            except Exception as exc:  # pylint: disable=broad-except
                logger.exception("Failed to process request: %s", exc)
                self._send_answer("internal_server_error")
                return

            granted_accounts = access_report["granted_accounts"]
            existing_accounts = access_report["existing_accounts"]
            answer = self._translate(
                "granted_existing",
                granted=self._format_accounts(granted_accounts),
                existing=self._format_accounts(existing_accounts),
            )
            self._send_json(
                200,
                {
                    "answer": answer,
                    "granted_accounts": granted_accounts,
                    "existing_accounts": existing_accounts,
                },
            )

        def _handle_create_client_folder(self, payload: Dict[str, Any]) -> None:
            """Create ``folder_name`` under the root folder and grant access.

            Replies with ``client_folder_exists`` instead of creating a
            duplicate when a child folder of that name is already present.
            """
            if not self._require_fields(payload, ["contact_id", "folder_name"]):
                return

            try:
                contact_id = int(payload["contact_id"])
                folder_name = str(payload["folder_name"]).strip()
                if not folder_name:
                    raise LocalizedError("folder_name_empty")

                task_id, initial_assignee_ids = self._lookup_client_task(contact_id)

                existing_folder = find_child_folder(
                    service,
                    drive_config.root_folder_id,
                    folder_name,
                    drive_config.drive_id,
                )
                if existing_folder:
                    raise LocalizedError(
                        "client_folder_exists",
                        folder_url=folder_link(existing_folder["id"]),
                    )

                folder = create_folder(
                    service,
                    drive_config.root_folder_id,
                    folder_name,
                    drive_config.drive_id,
                )
                access_report = self._grant_access(
                    task_id, initial_assignee_ids, folder["id"]
                )
            except LocalizedError as exc:
                self._send_answer(exc.key, **exc.context)
                return
            except Exception as exc:  # pylint: disable=broad-except
                logger.exception("Failed to process request: %s", exc)
                self._send_answer("internal_server_error")
                return

            granted_accounts = access_report["granted_accounts"]
            existing_accounts = access_report["existing_accounts"]
            answer = self._translate(
                "granted_existing",
                granted=self._format_accounts(granted_accounts),
                existing=self._format_accounts(existing_accounts),
            )
            folder_url = folder_link(folder["id"])
            self._send_json(
                200,
                {
                    "answer": self._translate(
                        "folder_created",
                        folder_name=folder_name,
                        details=answer,
                        folder_url=folder_url,
                    ),
                    "folder_id": folder["id"],
                    "folder_url": folder_url,
                    "granted_accounts": granted_accounts,
                    "existing_accounts": existing_accounts,
                },
            )

        def do_POST(self) -> None:  # noqa: N802
            """Route the request by exact path, then authenticate and parse."""
            routes = {
                "/set_client_folder_access": self._handle_set_client_folder_access,
                "/create_client_folder": self._handle_create_client_folder,
            }
            route_handler = routes.get(self.path)
            if route_handler is None:
                self._send_answer("not_found")
                return

            if not self._authenticate():
                return

            try:
                payload = self._parse_body()
            except LocalizedError as exc:
                self._send_answer(exc.key, **exc.context)
                return

            self._log_request(payload)
            route_handler(payload)

    return AccessHandler
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc