smartondev / gwbackupy
Build 7dfccd0c-e35b-4402-8ef2-fef38193a0b7 — pending completion

Pull Request #55: #24 storage add content hash property
CI: circleci · Committer: Márton Somogyi

38 of 38 new or added lines in 2 files covered (100.0%)
668 of 1249 relevant lines covered (53.48%)
0.53 hits per line

Source File: /gwbackupy/gmail.py (37.4% covered)
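As a side note on the summary arithmetic (not part of the report itself): 668 covered out of 1249 relevant lines is 668 / 1249 ≈ 53.48%, and the 0.53 hits-per-line figure is consistent with each covered line being hit roughly once on average. A tiny check, with the two values copied from the report above:

covered, relevant = 668, 1249
print(f"{covered / relevant:.2%}")        # -> 53.48%, the reported line coverage
print(f"{covered * 1.0 / relevant:.2f}")  # -> 0.53 hits per line if each covered line is hit once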
from __future__ import annotations

import collections
import concurrent.futures
import gzip
import json
import logging
import threading
from datetime import datetime, timedelta

from gwbackupy import global_properties
from gwbackupy.filters.filter_interface import FilterInterface
from gwbackupy.helpers import (
    decode_base64url,
    encode_base64url,
    str_trim,
    json_load,
)
from gwbackupy.process_helpers import is_killed, sleep_kc, await_all_futures
from gwbackupy.providers.gmail_service_wrapper_interface import (
    GmailServiceWrapperInterface,
)
from gwbackupy.storage.storage_interface import (
    StorageInterface,
    LinkList,
    LinkInterface,
    Data,
)

class Gmail:
    """Gmail service"""

    object_id_labels = "--gwbackupy-labels--"
    """Gmail's special object ID for storing labels"""

    def __init__(
        self,
        email: str,
        storage: StorageInterface,
        service_wrapper: GmailServiceWrapperInterface,
        batch_size: int = 10,
        labels: list[str] | None = None,
        dry_mode: bool = False,
    ):
        self.dry_mode = dry_mode
        self.email = email
        self.storage = storage
        if batch_size is None or batch_size < 1:
            batch_size = 5
        self.batch_size = batch_size
        self.__lock = threading.RLock()
        self.__services = {}
        self.__error_count = 0
        self.__service_wrapper = service_wrapper
        if labels is None:
            labels = []
        self.labels = labels

    def __get_local_messages_latest_mutations_only(self):
        pass

    def __get_message_from_server(self, message_id, message_format="raw", email=None):
        if email is None:
            email = self.email
        logging.debug(
            f"{message_id} download from server with format: {message_format}"
        )
        result = self.__service_wrapper.get_message(email, message_id, message_format)
        logging.debug(f"{message_id} successfully downloaded")
        return result

    def __store_message_file(
        self, message_id: str, raw_message: bytes, create_timestamp: float
    ):
        logging.debug("Store message {id}".format(id=message_id))
        link = self.storage.new_link(
            object_id=message_id, extension="eml.gz", created_timestamp=create_timestamp
        ).set_properties({LinkInterface.property_object: True})
        result = self.__storage_put(
            link, data=gzip.compress(raw_message, compresslevel=9)
        )
        if result:
            logging.info(f"{message_id} message is saved")
        else:
            raise Exception("Mail message save failed")

    def __backup_messages(
        self, message, stored_messages: dict[str, dict[int, LinkInterface]]
    ):
        message_id = message.get("id", "UNKNOWN")  # for logging
        try:
            # if message_id != '1853ee437c8ff302':
            #     raise Exception('SKIP')
            message_id = message["id"]
            latest_meta_link = None
            if message_id in stored_messages:
                latest_meta_link = stored_messages[message_id][0]
            is_new = latest_meta_link is None
            if is_new:
                logging.debug(f"{message_id} is new")
            # TODO: option for force raw mode
            message_format = "raw"
            if not is_new and stored_messages[message_id][1] is not None:
                message_format = "minimal"
            data = self.__get_message_from_server(message_id, message_format)
            if data is None:
                # (deleted)
                logging.info(f"{message_id} is not found on server")
                return

            subject = str_trim(data.get("snippet", ""), 64)
            if is_new:
                logging.info(f"{message_id} new message, snippet: {subject}")
            else:
                logging.debug(f"{message_id} Snippet: {subject}")

            create_timestamp = int(data["internalDate"]) / 1000.0
            if "raw" in data.keys():
                raw = decode_base64url(data.get("raw"))
                self.__store_message_file(message_id, raw, create_timestamp)
                data.pop("raw")

            write_meta = True  # if any failure then write it force
            if not is_new:
                logging.log(
                    global_properties.log_finest,
                    f"{message_id} load local version of meta data",
                )
                try:
                    with self.storage.get(latest_meta_link) as mf:
                        d = json.load(mf)
                        logging.log(
                            global_properties.log_finest,
                            f"{message_id} metadata is loaded from local",
                        )
                        if d == data:
                            write_meta = False
                except BaseException as e:
                    logging.exception(f"{message_id} metadata load as json failed: {e}")

            if write_meta:
                link = self.storage.new_link(
                    object_id=message_id,
                    extension="json",
                    created_timestamp=create_timestamp,
                ).set_properties({LinkInterface.property_metadata: True})
                success = self.__storage_put(link, data=json.dumps(data))
                if success:
                    logging.info(f"{message_id} meta data is saved")
                else:
                    raise Exception("Meta data put failed")
            else:
                logging.debug(f"{message_id} meta data is not changed, skip put")
            if message_id in stored_messages:
                del stored_messages[message_id]
        except Exception as e:
            with self.__lock:
                self.__error_count += 1
            if str(e) == "SKIP":
                return
            logging.exception(f"{message_id} {e}")

    def __get_all_messages_from_server(
        self, email: str | None = None, q: str | None = "label:all"
    ):
        if email is None:
            email = self.email
        logging.info("Get all message ids from server...")
        messages = self.__service_wrapper.get_messages(email, q)
        logging.info(f"Message(s) count: {len(messages)}")
        # print(count)
        # print(messages)
        return messages

    def __get_labels_from_server(self, email=None) -> list[dict[str, any]]:
        if email is None:
            email = self.email
        logging.info(f"Getting labels from server ({email})")
        return self.__service_wrapper.get_labels(email)

    def __backup_labels(self, link: LinkInterface | None) -> bool:
        logging.info("Backing up labels...")
        labels = self.__get_labels_from_server()
        if link is not None:
            logging.debug("labels already exist, checking for changes")
            try:
                with self.storage.get(link) as f:
                    d = json.load(f)
                    if d == labels:
                        logging.info("Labels are unchanged, not saving them")
                        return True
                    else:
                        logging.debug("labels changed")
            except BaseException as e:
                logging.exception(f"labels loading or parsing failed: {e}")
        link = self.storage.new_link(
            object_id=Gmail.object_id_labels, extension="json", created_timestamp=None
        ).set_properties({LinkInterface.property_metadata: True})
        result = self.__storage_put(link, data=json.dumps(labels))
        if not result:
            logging.error("Error while storing labels")
            return False
        logging.info("Labels backed up successfully")
        return True

    def __storage_remove(self, link: LinkInterface) -> bool:
        if self.dry_mode:
            logging.info(f"DRY MODE storage remove: {link}")
            return True
        return self.storage.remove(link)

    def __storage_put(self, link: LinkInterface, data: Data) -> bool:
        if self.dry_mode:
            logging.info(f"DRY MODE storage put: {link}")
            return True
        return self.storage.put(link, data)

    def backup(self, quick_sync_days: int | None = None) -> bool:
        logging.info(f"Starting backup for {self.email}")
        self.__error_count = 0

        logging.info("Scanning backup storage...")
        stored_data_all = self.storage.find()
        logging.info(f"Stored items: {len(stored_data_all)}")
        self.__service_wrapper.get_service_provider().storage_links(stored_data_all)

        labels_link = stored_data_all.find(
            f=lambda l: l.id() == Gmail.object_id_labels and l.is_metadata
        )
        if not self.__backup_labels(labels_link):
            logging.error("Backup failed: labels could not be stored")
            return False
        if quick_sync_days is not None and quick_sync_days < 1:
            quick_sync_days = None
        if quick_sync_days is not None:
            logging.info(f"Quick syncing, going back {quick_sync_days} days")

        stored_messages: dict[str, dict[int, LinkInterface]] = stored_data_all.find(
            f=lambda l: not l.is_special_id() and (l.is_metadata() or l.is_object()),
            g=lambda l: [l.id(), 0 if l.is_metadata() else 1],
        )
        del stored_data_all
        for message_id in list(stored_messages.keys()):
            link_metadata = stored_messages[message_id].get(0)
            if link_metadata is None:
                logging.error(f"{message_id} metadata is not found locally")
                del stored_messages[message_id]
            elif link_metadata.is_deleted():
                logging.debug(f"{message_id} metadata is already deleted")
                del stored_messages[message_id]
            else:
                logging.log(
                    global_properties.log_finest,
                    f"{message_id} is usable from backup storage",
                )
        logging.info(f"Stored messages: {len(stored_messages)}")

        q = "label:all"
        if quick_sync_days is not None:
            date = datetime.now() - timedelta(days=quick_sync_days)
            q = f"label:all after:{date.strftime('%Y/%m/%d')}"
        messages_from_server = self.__get_all_messages_from_server(q=q)
        logging.info("Processing...")
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
        futures = []
        # submit message download jobs
        for message_id in messages_from_server:
            futures.append(
                executor.submit(
                    self.__backup_messages,
                    messages_from_server[message_id],
                    stored_messages,
                )
            )
        # wait for jobs
        if not await_all_futures(futures):
            # cancel jobs
            executor.shutdown(cancel_futures=True)
            logging.warning("Process is killed")
            return False
        logging.info("Processed")

        if self.__error_count > 0:
            # if error then never delete!
            logging.error("Backup failed with " + str(self.__error_count) + " errors")
            return False

        if quick_sync_days is None:
            logging.info("Marking deleted messages...")
            for message_id in stored_messages:
                if is_killed():
                    logging.warning("Process is killed")
                    return False

                links = stored_messages[message_id]
                logging.debug(f"{message_id} mark as deleted in local storage...")
                meta_link = links.get(0)
                if meta_link is None:
                    continue
                logging.debug(f"{message_id} - {meta_link}")
                if self.__storage_remove(meta_link):
                    logging.debug(f"{message_id} metadata marked as deleted successfully")
                    message_link = links.get(1)
                    if message_link is None:
                        logging.info(f"{message_id} marked as deleted")
                    else:
                        if self.__storage_remove(message_link):
                            logging.debug(
                                f"{message_id} object marked as deleted successfully"
                            )
                            logging.info(f"{message_id} marked as deleted")
                        else:
                            logging.error(f"{message_id} object mark as deleted failed")
                else:
                    logging.error(f"{message_id} mark as deleted failed")
            logging.info("Mark as deleted: complete")
        else:
            logging.info("Quick sync mode, skipping deletion in local storage")
        logging.info(f"Backup finished for {self.email}")
        return True

    def __create_label_server(self, label_name, email) -> dict:
        if email is None:
            email = self.email
        logging.info(f"Restoring label if it does not exist: {label_name}")
        if self.dry_mode:
            logging.info("DRY MODE: create label if it does not exist")
            return {
                "id": f"Label_DRY{datetime.utcnow().timestamp():1.3f}",
                "type": "user",
                "name": label_name,
            }
        return self.__service_wrapper.create_label(email=email, name=label_name)

    def __get_restore_label_ids(
        self,
        to_email: str,
        labels_from_storage: dict[str, dict[str, any]],
        labels_form_server: dict[str, dict[str, any]],
        add_labels: [str],
        label_ids_from_message: [str],
    ) -> [str]:
        if self.email != to_email:
            raise NotImplementedError("Not implemented for different emails")
        with self.__lock:
            result = []
            for message_label_id in label_ids_from_message:
                if message_label_id == "CHAT":
                    # the CHAT label cannot be restored
                    continue
                if message_label_id in labels_form_server:
                    server_data = labels_form_server[message_label_id]
                    if server_data["type"] == "system" or server_data["type"] == "user":
                        # user and system labels are allowed directly if they exist on the server
                        result.append(server_data["id"])
                        continue
                    raise NotImplementedError(
                        f'Not implemented label type: {server_data["type"]}'
                    )
                # does not exist on the server
                if message_label_id not in labels_from_storage:
                    logging.warning(
                        f"Label with ID {message_label_id} cannot be restored because no data can be found."
                    )
                    continue
                label_data = labels_from_storage[message_label_id]
                created_label_data = self.__create_label_server(
                    label_data["name"], to_email
                )
                if created_label_data is None:
                    raise Exception(f"Label is created already? A ({label_data})")
                # label data is stored under the original label ID!
                labels_form_server[message_label_id] = created_label_data
                result.append(created_label_data["id"])
            for add_label in add_labels:
                found = False
                for key in labels_form_server:
                    server_data = labels_form_server[key]
                    if server_data["name"] == add_label:
                        result.append(server_data["id"])
                        found = True
                        break
                if found:
                    continue
                # create label...
                created_label_data = self.__create_label_server(add_label, to_email)
                if created_label_data is None:
                    raise Exception(f"Label is created already? B ({label_data})")
                # label data is stored under the original label ID!
                labels_form_server[created_label_data["id"]] = created_label_data
                result.append(created_label_data["id"])

            return result

    def __restore_message(
        self,
        restore_message_id: str,
        link: dict[int, LinkInterface],
        to_email: str,
        labels_from_storage: dict[str, dict[str, any]],
        labels_form_server: dict[str, dict[str, any]],
        add_labels: [str],
    ):
        try:
            logging.info(f"{restore_message_id} Restoring message...")
            if 0 not in link or 1 not in link:
                raise Exception("Metadata and/or message link not found in storage")
            with self.storage.get(link[0]) as mf:
                meta = json.load(mf)
            logging.debug(f"{restore_message_id} {meta}")
            with self.storage.get(link[1]) as mf:
                message_content = gzip.decompress(mf.read())
            label_ids_from_message: [str] = meta["labelIds"]
            try:
                label_ids_from_message.index("CHAT")
                # not restorable as message
                logging.info(
                    f"{restore_message_id} message with CHAT label is not supported."
                )
                return
            except ValueError:
                pass
            label_ids = self.__get_restore_label_ids(
                to_email=to_email,
                labels_from_storage=labels_from_storage,
                labels_form_server=labels_form_server,
                add_labels=add_labels,
                label_ids_from_message=label_ids_from_message,
            )
            label_names = []
            for label_id in meta["labelIds"]:
                label_names.append(labels_form_server[label_id]["name"])

            logging.info(
                f"{restore_message_id} snippet: {str_trim(meta['snippet'], 80)} / labels: {', '.join(label_names)}"
            )

            message_data = {
                "labelIds": label_ids,
                "raw": encode_base64url(message_content),
            }
            subject = str_trim(meta.get("snippet", ""), 64)
            if self.dry_mode:
                logging.info(f"DRY MODE {restore_message_id} message insert")
            result = self.__service_wrapper.insert_message(to_email, message_data)
            logging.debug(f"Message uploaded {result}")
            logging.info(
                f'{restore_message_id}->{result.get("id")} Message uploaded ({subject})'
            )
            return result
        except BaseException as e:
            with self.__lock:
                self.__error_count += 1
            logging.exception(f"{restore_message_id} Exception: {e}")

    def __load_labels_from_storage(
        self, links: LinkList[LinkInterface]
    ) -> dict[str, dict[str, any]] | None:
        logging.info(f"Loading labels...")
        labels_links: dict[str, LinkInterface] = links.find(
            f=lambda l: l.id() == Gmail.object_id_labels and l.is_metadata,
            g=lambda l: [l.mutation()],
        )
        labels_links = collections.OrderedDict(sorted(labels_links.items()))
        result = {}
        for key in labels_links:
            labels_link = labels_links[key]
            with self.storage.get(labels_link) as lf:
                json_data: list[dict[str, any]] = json_load(lf)
                if json_data is None:
                    logging.error(f"Stored labels read from JSON failed")
                    return None
                for item in json_data:
                    if item.get("id") is None:
                        logging.error(f"Invalid structure of stored labels")
                        return None
                    result[item.get("id")] = item
        logging.info(f"Labels loaded successfully ({len(result)})")
        return result

    def restore(
        self,
        item_filter: FilterInterface,
        to_email: str | None = None,
        restore_deleted: bool = False,
        add_labels: list[str] | None = None,
        restore_missing: bool = False,
    ):
        self.__error_count = 0
        if to_email is None:
            to_email = self.email

        if not restore_deleted and not restore_missing:
            logging.warning("No restore tasks selected, see e.g. --restore-deleted")
            return True

        logging.info("Scanning backup storage...")
        stored_data_all = self.storage.find()
        logging.info(f"Stored items: {len(stored_data_all)}")
        self.__service_wrapper.get_service_provider().storage_links(stored_data_all)

        latest_labels_from_storage = self.__load_labels_from_storage(stored_data_all)
        if latest_labels_from_storage is None:
            logging.error("Stored labels loading failed")
            return False
        _labels_from_server = self.__get_labels_from_server()
        if _labels_from_server is None:
            logging.error("Loading labels from server failed")
            return False
        labels_from_server = {}
        for label in _labels_from_server:
            if "id" not in label:
                raise Exception("Unsupported label structure")
            labels_from_server[label.get("id")] = label
        del _labels_from_server
        messages_from_server_dest_email = {}
        if self.email == to_email:
            messages_from_server_dest_email = self.__get_all_messages_from_server()

        logging.info("Filtering messages...")
        stored_messages: dict[str, dict[int, LinkInterface]] = stored_data_all.find(
            f=lambda l: not l.is_special_id()
            and (l.is_metadata() or l.is_object())
            and item_filter.match(
                {
                    "message-id": l.id(),
                    "link": l,
                    "server-data": messages_from_server_dest_email,
                }
            ),
            g=lambda l: [l.id(), 0 if l.is_metadata() else 1],
        )
        del stored_data_all
        for message_id in list(stored_messages.keys()):
            if (
                stored_messages[message_id].get(0) is None
                or stored_messages[message_id].get(1) is None
            ):
                # no metadata or no object
                del stored_messages[message_id]

        logging.info(f"Number of potentially affected messages: {len(stored_messages)}")
        logging.info("Uploading messages...")
        executor = concurrent.futures.ThreadPoolExecutor(max_workers=self.batch_size)
        futures = []
        for message_id in stored_messages:
            futures.append(
                executor.submit(
                    self.__restore_message,
                    message_id,
                    stored_messages[message_id],
                    to_email,
                    latest_labels_from_storage,
                    labels_from_server,
                    add_labels,
                )
            )
        if not await_all_futures(futures):
            logging.warning("Process killed")
            return False

        if self.__error_count > 0:
            logging.error(f"Messages uploaded with {self.__error_count} errors")
            return False
        logging.info("Messages uploaded successfully")

        return True
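
For orientation, here is a minimal usage sketch of the class listed above. It relies only on the constructor and backup() signatures visible in this file; the storage and wrapper arguments must be concrete StorageInterface and GmailServiceWrapperInterface implementations provided elsewhere in gwbackupy, and run_backup itself is a hypothetical helper written for this example, not part of the module.

from gwbackupy.gmail import Gmail
from gwbackupy.providers.gmail_service_wrapper_interface import GmailServiceWrapperInterface
from gwbackupy.storage.storage_interface import StorageInterface


def run_backup(storage: StorageInterface, wrapper: GmailServiceWrapperInterface) -> bool:
    # Assemble the Gmail helper exactly as its __init__ above expects.
    gmail = Gmail(
        email="user@example.com",
        storage=storage,
        service_wrapper=wrapper,
        batch_size=10,   # number of worker threads used by backup()/restore()
        dry_mode=True,   # log storage writes/removes instead of performing them
    )
    # Full backup; pass quick_sync_days=3 to fetch only the last 3 days of mail.
    return gmail.backup()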