my8100 / logparser, build 1006 (push, built on circleci, committed by web-flow)
Commit: Release v0.8.3 and support Python 3.13 (#30)

Coverage at 01 Jan 2025 01:45PM UTC: 80.811% (down 6.6% from 87.405%)
1 of 1 new or added line in 1 file covered (100.0%).
57 existing lines in 4 files now uncovered.
737 of 912 relevant lines covered (80.81%); 0.81 hits per line.

Source file: /logparser/logparser.py (95.35% covered)
# coding: utf-8
from collections import OrderedDict
from datetime import datetime
import glob
import io
import json
import logging
import os
import re
import sys
import time
import traceback

try:
    from psutil import pid_exists
except ImportError:
    pid_exists = None

from .__version__ import __version__
from .common import Common
from .scrapylogparser import parse
from .telnet import MyTelnet
from .utils import custom_settings, get_logger


logger = get_logger(__name__)

SIMPLIFIED_KEYS = [
    'log_path',
    'json_path',
    'json_url',
    'size',
    'position',
    'status',
    '_head',

    'pages',
    'items',
    'first_log_time',
    'latest_log_time',
    'runtime',
    'shutdown_reason',
    'finish_reason',
    'last_update_time'
]


# noinspection PyBroadException
class LogParser(Common):
    # datas = {}  # Cause shared self.datas between test functions!
    logger = logger

    def __init__(self, scrapyd_server, scrapyd_logs_dir, parse_round_interval,
                 enable_telnet, override_telnet_console_host, log_encoding, log_extensions,
                 log_head_lines, log_tail_lines, log_categories_limit, jobs_to_keep, chunk_size,
                 delete_existing_json_files_at_startup, keep_data_in_memory, verbose,
                 main_pid=0, debug=False, exit_timeout=0):
        self.SCRAPYD_SERVER = scrapyd_server
        self.SCRAPYD_LOGS_DIR = scrapyd_logs_dir
        self.PARSE_ROUND_INTERVAL = parse_round_interval
        self.ENABLE_TELNET = enable_telnet

        self.OVERRIDE_TELNET_CONSOLE_HOST = override_telnet_console_host
        self.LOG_ENCODING = log_encoding
        self.LOG_EXTENSIONS = log_extensions
        self.LOG_HEAD_LINES = log_head_lines
        self.LOG_TAIL_LINES = log_tail_lines
        self.LOG_CATEGORIES_LIMIT = log_categories_limit
        self.JOBS_TO_KEEP = jobs_to_keep
        self.CHUNK_SIZE = chunk_size
        self.DELETE_EXISTING_JSON_FILES_AT_STARTUP = delete_existing_json_files_at_startup
        self.KEEP_DATA_IN_MEMORY = keep_data_in_memory

        self.verbose = verbose
        if self.verbose:
            self.logger.setLevel(logging.DEBUG)
        else:
            self.logger.setLevel(logging.INFO)
        self.DEBUG = debug
        self.EXIT_TIMEOUT = exit_timeout

        self.main_pid = main_pid
        self.logparser_pid = os.getpid()

        # TypeError: Object of type set is not JSON serializable
        self.logger.debug(self.json_dumps(vars(self)))

        self.stats_json_path = os.path.join(self.SCRAPYD_LOGS_DIR, 'stats.json')
        self.stats_json_url = 'http://%s/logs/stats.json' % self.SCRAPYD_SERVER
        self.logparser_version = __version__
        self.init_time = time.time()
        self.log_paths = []
        self.existing_file_keys = set()
        self.datas = {}

        if self.DELETE_EXISTING_JSON_FILES_AT_STARTUP:
            self.delete_existing_results()

        if not os.path.exists(self.stats_json_path):
            self.save_text_into_logs_dir('stats.json', self.json_dumps(self.get_default_stats()))

    def calc_runtime(self, start_string, end_string):
        try:
            start_datetime = datetime.strptime(start_string, '%Y-%m-%d %H:%M:%S')
            end_datetime = datetime.strptime(end_string, '%Y-%m-%d %H:%M:%S')
        except (TypeError, ValueError):  # 0 or ''
            return self.NA
        else:
            return str(end_datetime - start_datetime)

    # REF: /scrapydweb/scrapydweb/utils/poll.py
    def check_exit(self):
        exit_condition_1 = pid_exists is not None and not pid_exists(self.main_pid)
        exit_condition_2 = not self.ON_WINDOWS and not self.check_pid(self.main_pid)
        if exit_condition_1 or exit_condition_2:
            sys.exit("!!! LogParser subprocess (pid: %s) exits "
                     "since main_pid %s not exists" % (self.logparser_pid, self.main_pid))

    @staticmethod
    def check_pid(pid):
        """ Check For the existence of a unix pid. """
        try:
            os.kill(pid, 0)
        except OSError:
            return False
        else:
            return True

    def count_actual_lines(self, text):
        return len(re.split(self.LINESEP_BULK_PATTERN, text))

    def cut_text(self, text, lines_limit, keep_head=True):
        count = 0
        lines = []
        for line in re.split(self.LINESEP_PATTERN, text)[::1 if keep_head else -1]:
            lines.append(line)
            if line.strip():
                count += 1
            if count == lines_limit:
                break
        return '\n'.join(lines[::1 if keep_head else -1])

    def delete_existing_results(self):
        for path in glob.glob(os.path.join(self.SCRAPYD_LOGS_DIR, '*/*/*.json')):
            try:
                os.remove(path)
            except Exception as err:
                self.logger.error(err)
            else:
                self.logger.warning("Deleted %s", path)

    def find_text_to_ignore(self, text):
        lines = re.split(r'\n', text)  # KEEP the same '\n'
        m = re.search(self.PATTERN_LOG_ENDING, text)
        if m:
            self.logger.info("Found log ending:\n%s", self.format_log_block('log ending', m.group()))
            text_to_ignore = ''
        else:
            # To ensure the integrity of a log with multilines, e.g. error with traceback info,
            # the tail of the appended_log must be ignored
            # 2019-01-01 00:00:01 [test] WARNING: warning  # Would be parsed in this round
            # 123abc                                       # Would be parsed in this round
            # -------------------------------------------------------------------------
            # 2019-01-01 00:00:01 [test] ERROR: error      # Would be ignored for next round
            # 456abc                                       # Would be ignored for next round
            if len(re.findall(self.DATETIME_PATTERN + r'[ ].+?\n', text)) < 2:
                text_to_ignore = text
                self.logger.debug("Skip short appended log for next round: %s", repr(text_to_ignore))
            else:
                lines_to_ignore = []
                for line in lines[::-1]:
                    lines_to_ignore.append(line)
                    if re.match(self.DATETIME_PATTERN, line):
                        break
                text_to_ignore = '\n'.join(lines_to_ignore[::-1])  # KEEP the same '\n'
                self.logger.debug("Text to be ignored for next round: %s", repr(text_to_ignore))

        return text_to_ignore

    @staticmethod
    def format_log_block(title, log, lines_limit=0):
        if lines_limit:
            lines = re.split(r'\n', log)
            half = max(1, int(lines_limit / 2))
            if len(lines) > lines_limit:
                log = '\n'.join(lines[:half] + ['......'] + lines[-half:])
        return u'\n{title}:\n{sign}\n{log}\n{sign}\n'.format(title=title, log=log, sign='=' * 150)

    def get_default_stats(self):
        last_update_timestamp = int(time.time())
        last_update_time = datetime.fromtimestamp(last_update_timestamp).strftime('%Y-%m-%d %H:%M:%S')
        return dict(status='ok', datas={},
                    settings_py=self.handle_slash(self.SETTINGS_PY_PATH), settings=custom_settings,
                    last_update_timestamp=last_update_timestamp, last_update_time=last_update_time,
                    logparser_version=self.logparser_version)

    def handle_logfile(self, log_path):
        self.logger.debug("log_path: %s", log_path)
        project, spider, job, ext = self.parse_log_path(log_path)
        self.existing_file_keys.add(log_path)

        # The last modification time of Scrapy log does not change over time?!
        # mtime = os.path.getmtime(log_path)
        # datetime.fromtimestamp(os.path.getmtime(log_path)).strftime('%Y-%m-%dT%H_%M_%S')
        size = os.path.getsize(log_path)

        if log_path not in self.datas:
            self.logger.info("New logfile found: %s (%s bytes)", log_path, size)
            json_path = os.path.join(self.SCRAPYD_LOGS_DIR, project, spider, '%s.json' % job)
            json_url = 'http://%s/logs/%s/%s/%s.json' % (self.SCRAPYD_SERVER, project, spider, job)
            # NOTE: do not use handle_slash() on log_path since parse_log_path() use os.sep
            data = OrderedDict(log_path=log_path, json_path=json_path, json_url=json_url,
                               size=size, position=0, status='ok', _head='')
            self.datas[log_path] = data

            loaded_data = self.read_data(json_path)
            if loaded_data.get('size', -1) == size:
                data.update(loaded_data)  # AVOID using data =
                self.logger.info("New logfile and its data with same size found: %s (size: %s) -> skip",
                                 json_path, loaded_data['size'])
                return
            else:
                self.logger.info("New logfile: %s (%s bytes) -> parse", log_path, size)
        elif size < self.datas[log_path]['size']:
            self.logger.warning("Old logfile with smaller size: %s (before: %s, now: %s bytes) -> parse in next round",
                                log_path, self.datas[log_path]['size'], size)
            self.datas.pop(log_path)
            return
        elif size == self.datas[log_path]['size']:
            self.logger.debug("Old logfile with old size: %s (%s bytes) -> skip", log_path, size)
            return
        else:
            self.logger.info("Old logfile with new size: %s (%s bytes) -> parse", log_path, size)
            data = self.datas[log_path]

            if not self.KEEP_DATA_IN_MEMORY:
                # If the json file is broken, the logfile should be reparsed from position 0
                loaded_data = self.read_data(data['json_path'])
                if (loaded_data.get('size', -1) == data['size']
                   and loaded_data.get('position', -1) == data['position']):
                    data.update(loaded_data)
                else:
                    self.logger.warning("The logfile would be parsed from position 0: %s", log_path)
                    data['position'] = 0
                    data.pop('first_log_time', None)  # See parse_appended_log()
            data['size'] = size

        # f.read(1000000) => f.tell() 15868     # safe
        # f.seek(1000000) => f.tell() 1000000   # unexpected
        # Add max() for logfile with 0 size
        for __ in range(data['position'], max(1, data['size']), self.CHUNK_SIZE):
            self.logger.debug("Remaining size to be read: %s bytes", data['size'] - data['position'])
            appended_log = self.read_appended_log(data, size=self.CHUNK_SIZE)
            if self.DEBUG:
                self.save_text_into_logs_dir('appended_log.log', appended_log)
            self.parse_appended_log(data, appended_log)

        return data

    @staticmethod
    def handle_slash(string):
        if not string:
            return string
        else:
            return string.replace('\\', '/')

    def handle_telnet(self, data):
        data.setdefault('crawler_engine', {})
        if (self.ENABLE_TELNET
           and data['latest_matches']['telnet_console']
           and data['crawler_stats'].get('source') != 'log'):  # Do not telnet when the job is finished
            mytelnet = MyTelnet(data, self.OVERRIDE_TELNET_CONSOLE_HOST, self.verbose)
            crawler_stats, crawler_engine = mytelnet.main()
            if crawler_stats:
                # update_log_count=False to avoid wrong count in parse_appended_log() when the job is running
                self.update_data_with_crawler_stats(data, crawler_stats, update_log_count=False)

            data['crawler_stats'] = crawler_stats or data['crawler_stats']
            data['crawler_engine'] = crawler_engine or data['crawler_engine']
            self.logger.debug("crawler_stats:\n%s", self.json_dumps(data['crawler_stats']))
            self.logger.debug("crawler_engine:\n%s", self.json_dumps(data['crawler_engine']))

    @staticmethod
    def json_dumps(obj, sort_keys=False):
        return json.dumps(obj, ensure_ascii=False, indent=4, sort_keys=sort_keys)

    def main(self):
        while True:
            if self.main_pid:
                self.check_exit()
            start_time = time.time()
            try:
                self.run()
                end_time = time.time()
                self.logger.debug("Took %.1f seconds in this round", (end_time - start_time))
                if 0 < self.EXIT_TIMEOUT < end_time - self.init_time:
                    self.logger.critical("GoodBye, EXIT_TIMEOUT: %s", self.EXIT_TIMEOUT)
                    break
                else:
                    self.logger.info("Sleeping for %ss", self.PARSE_ROUND_INTERVAL)
                    time.sleep(self.PARSE_ROUND_INTERVAL)
            except KeyboardInterrupt:
                if self.main_pid:
                    self.logger.warning("LogParser subprocess (pid: %s) cancelled by KeyboardInterrupt",
                                        self.logparser_pid)
                else:
                    self.logger.warning("KeyboardInterrupt")
                sys.exit()
            except:
                self.logger.error(traceback.format_exc())

    def parse_appended_log(self, data, appended_log):
        tail_backup = data.get('tail', '')
        # Note that appended_log may be an empty string
        data_ = parse(appended_log, self.LOG_HEAD_LINES, self.LOG_TAIL_LINES)
        self.logger.debug("Parsed data_ from appended_log:\n%s", self.json_dumps(data_))

        if 'first_log_time' not in data:
            # To keep the order of keys in Python 2
            for k, v in data_.items():
                data[k] = v
        else:
            # data['head'] would be updated below
            data['tail'] = data_['tail']

            if data['first_log_time'] == self.NA:
                data['first_log_time'] = data_['first_log_time']
                data['first_log_timestamp'] = data_['first_log_timestamp']
            if data_['latest_log_time'] != self.NA:
                data['latest_log_time'] = data_['latest_log_time']
                data['latest_log_timestamp'] = data_['latest_log_timestamp']
            data['runtime'] = self.calc_runtime(data['first_log_time'], data['latest_log_time'])

            data['datas'].extend(data_['datas'])
            for k in ['pages', 'items']:
                if data[k] is None:
                    data[k] = data_[k]
                elif data_[k] is not None:
                    data[k] = max(data[k], data_[k])

            for k, v in data_['latest_matches'].items():
                data['latest_matches'][k] = v or data['latest_matches'][k]
            # latest_crawl_timestamp, latest_scrape_timestamp
            for k in ['latest_crawl', 'latest_scrape']:
                if data_['latest_matches'][k]:
                    data['%s_timestamp' % k] = data_['%s_timestamp' % k]

            # "log_categories": {"critical_logs": {"count": 0, "details": []}}
            for k, v in data_['log_categories'].items():
                if v['count'] > 0:
                    if data_['finish_reason'] != self.NA:
                        data['log_categories'][k]['count'] = v['count']
                    else:
                        data['log_categories'][k]['count'] += v['count']
                data['log_categories'][k]['details'].extend(v['details'])

            for k in ['shutdown_reason', 'finish_reason']:
                if data_[k] != self.NA:
                    data[k] = data_[k]
            data['crawler_stats'] = data_['crawler_stats'] or data['crawler_stats']
            data['last_update_timestamp'] = data_['last_update_timestamp']
            data['last_update_time'] = data_['last_update_time']

        # To ensure the actual length of headlines and taillines
        if data['_head'] != self.LOG_HEAD_LINES:
            if data['_head']:
                if appended_log:
                    data['head'] = '%s\n%s' % (data['_head'], appended_log)
                else:  # appended_log would be empty string for short appended log
                    data['head'] = data['_head']
            else:
                data['head'] = appended_log
            data['head'] = self.cut_text(data['head'], self.LOG_HEAD_LINES)
            if self.count_actual_lines(data['head']) < self.LOG_HEAD_LINES:
                data['_head'] = data['head']
            else:
                data['_head'] = self.LOG_HEAD_LINES

        if self.count_actual_lines(data['tail']) < self.LOG_TAIL_LINES:
            if tail_backup:
                if appended_log:
                    data['tail'] = '%s\n%s' % (tail_backup, appended_log)
                else:
                    data['tail'] = tail_backup
            else:
                data['tail'] = appended_log
        data['tail'] = self.cut_text(data['tail'], self.LOG_TAIL_LINES, keep_head=False)

        # TO limit each item e.g. critical_logs in log_categories
        # "log_categories": {"critical_logs": {"count": 0, "details": []}}
        for k, v in data['log_categories'].items():
            v.update(details=v['details'][-self.LOG_CATEGORIES_LIMIT:])

        self.logger.info("crawled_pages: %s, scraped_items: %s", data['pages'], data['items'])

    def read_appended_log(self, data, size=-1, backoff_times=10):
        # If the argument size is omitted, None, or negative, reads and returns all data until EOF.
        # https://stackoverflow.com/a/21533561/10517783
        # In text files (those opened without a b in the mode string),
        # only seeks relative to the beginning of the file are allowed
        # b'\x80abc'.decode('utf-8')
        # UnicodeDecodeError: 'utf-8' codec can't decode byte 0x80 in position 0: invalid start byte
        size_backup = size
        text = ''
        with io.open(data['log_path'], 'rb') as f:
            f.seek(data['position'])
            for count in range(1, backoff_times + 1):
                try:
                    text = f.read(size).decode(self.LOG_ENCODING, 'strict')
                except UnicodeDecodeError as err:
                    self.logger.error(err)
                    if count == backoff_times:
                        self.logger.critical("Use f.read().decode(%s, 'replace') instead.", self.LOG_ENCODING)
                        f.seek(data['position'])
                        text = f.read(size_backup).decode(self.LOG_ENCODING, 'replace')
                    else:
                        # A backoff of 1 byte every time
                        size = f.tell() - data['position'] - 1
                        if size > 0:
                            f.seek(data['position'])
                            self.logger.warning("Fail %s times, backoff to %s and read %s bytes", count, f.tell(), size)
                else:
                    break
            current_stream_position = f.tell()

        text_to_ignore = self.find_text_to_ignore(text)
        if text_to_ignore == text:
            return ''
        else:
            data['position'] = current_stream_position - len(text_to_ignore.encode(self.LOG_ENCODING))
            appended_log = text[:-len(text_to_ignore)] if text_to_ignore else text
            self.logger.debug("Found appended log:\n%s",
                              self.format_log_block('appended log', appended_log, lines_limit=10))
            return appended_log

    def read_data(self, json_path):
        data = {}
        self.logger.debug("Try to load json file: %s", json_path)
        if not os.path.exists(json_path):
            self.logger.warning("Json file not found: %s", json_path)
        else:
            try:
                with io.open(json_path, 'r', encoding='utf-8') as f:
                    data = json.loads(f.read(), object_pairs_hook=OrderedDict)
            except Exception as err:
                self.logger.error(err)
            else:
                self.logger.debug("Loaded json file: %s", json_path)
                logparser_version = data.get('logparser_version', '')
                if logparser_version != __version__:
                    data = {}
                    self.logger.warning("Ignore json file for mismatching version : %s", logparser_version)
        return data

    def run(self):
        self.log_paths = []
        for ext in self.LOG_EXTENSIONS:
            self.log_paths.extend(glob.glob(os.path.join(self.SCRAPYD_LOGS_DIR, '*/*/*%s' % ext)))
        if not self.log_paths:
            self.logger.error("No logfiles found in %s/*/*/, check and update "
                              "the `SCRAPYD_LOGS_DIR` option in %s", self.SCRAPYD_LOGS_DIR, self.SETTINGS_PY_PATH)
        else:
            self.logger.info("Found %s logfiles", len(self.log_paths))

        self.existing_file_keys = set()
        for log_path in self.log_paths:
            try:
                data = self.handle_logfile(log_path)
                if not data:
                    continue
                self.handle_telnet(data)
                self.save_data(data)
            except:
                self.logger.error(traceback.format_exc())
                self.logger.warning("Pop %s from self.datas", log_path)
                self.datas.pop(log_path, None)

        if self.DEBUG:
            self.save_text_into_logs_dir('datas_complete.json', self.json_dumps(self.datas))
        self.simplify_datas_in_memory()
        if self.DEBUG:
            self.save_text_into_logs_dir('datas_simplified.json', self.json_dumps(self.datas))
        self.save_datas()

    def save_data(self, data):
        with io.open(data['json_path'], 'wb') as f:
            f.write(self.json_dumps(data).encode('utf-8', 'replace'))
        self.logger.info("Saved to %s", data['json_path'])

    def save_datas(self):
        stats = self.get_default_stats()
        for log_path, data in self.datas.items():
            if self.KEEP_DATA_IN_MEMORY and log_path in self.existing_file_keys:
                data = self.simplify_data(dict(data))
            else:
                data = dict(data)
            data.pop('_head')  # To simplify data for 'List Stats' in the Overview page
            project, spider, job, ext = self.parse_log_path(log_path)
            stats['datas'].setdefault(project, {})
            stats['datas'][project].setdefault(spider, {})
            stats['datas'][project][spider][job] = data
        text = self.json_dumps(stats)
        self.logger.debug("stats.json:\n%s", text)
        self.save_text_into_logs_dir('stats.json', text)

    def save_text_into_logs_dir(self, filename, text):
        path = os.path.join(self.SCRAPYD_LOGS_DIR, filename)
        with io.open(path, 'wb') as f:
            content = text.encode('utf-8', 'replace')
            f.write(content)
            if filename == 'stats.json':
                self.logger.info("Saved to %s (%s bytes). Visit stats at: %s", self.stats_json_path,
                                 len(content), self.stats_json_url)
            else:
                self.logger.info("Saved to %s (%s bytes)", filename, len(content))

    @staticmethod
    def simplify_data(data):
        data_ = OrderedDict()
        for k in SIMPLIFIED_KEYS:
            data_[k] = data[k]
        return data_

    def simplify_datas_in_memory(self):
        all_keys = set(self.datas.keys())
        redundant_keys = all_keys.difference(self.existing_file_keys)
        self.logger.debug("all_keys: %s", len(all_keys))
        self.logger.debug("existing_file_keys: %s", len(self.existing_file_keys))
        self.logger.debug("redundant_keys: %s", len(redundant_keys))
        if self.KEEP_DATA_IN_MEMORY:
            keys_to_simplify = redundant_keys
        else:
            keys_to_simplify = all_keys
        for key in keys_to_simplify:
            if 'head' not in self.datas[key]:  # Has been simplified
                continue
            self.logger.debug("Simplify %s in memory", key)
            self.datas[key] = self.simplify_data(self.datas[key])
        self.logger.debug("Datas in memory: ")
        for key, value in self.datas.items():
            self.logger.debug("%s: %s keys, size %s", key, len(value), sys.getsizeof(value))

        # Remove data of deleted log to reduce the size of the stats.json file
        if len(all_keys) > self.JOBS_TO_KEEP and redundant_keys:
            self.logger.debug("JOBS_TO_KEEP: %s", self.JOBS_TO_KEEP)
            self.logger.debug("Limit the size of all_keys in memory: %s", len(all_keys))
            for key in redundant_keys:
                self.datas.pop(key)
                self.logger.debug("Pop key: %s", key)
            self.logger.debug("Now all_keys in memory: %s", len(self.datas))
        else:
            self.logger.debug("all_keys in memory: %s", len(self.datas))
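
For orientation, the sketch below shows one way the class above could be driven directly. It is a minimal, illustrative example only: every argument value is an assumption chosen for demonstration (in practice the package's own entry point supplies them from its settings), and the import path simply mirrors the file location shown in this report.

# Minimal driver sketch for LogParser: all values below are illustrative assumptions.
from logparser.logparser import LogParser

parser = LogParser(
    scrapyd_server='127.0.0.1:6800',           # host:port that serves /logs/
    scrapyd_logs_dir='/path/to/scrapyd/logs',  # logfiles live in <dir>/<project>/<spider>/
    parse_round_interval=10,                   # seconds slept between rounds in main()
    enable_telnet=True,                        # poll the telnet console of running jobs
    override_telnet_console_host='',
    log_encoding='utf-8',
    log_extensions=['.log', '.txt'],
    log_head_lines=50,
    log_tail_lines=50,
    log_categories_limit=10,
    jobs_to_keep=100,
    chunk_size=10 * 1024 * 1024,               # bytes read per call to read_appended_log()
    delete_existing_json_files_at_startup=False,
    keep_data_in_memory=False,
    verbose=False,
    exit_timeout=0,                            # 0 disables the EXIT_TIMEOUT check in main()
)

# main() repeats run() every PARSE_ROUND_INTERVAL seconds: it globs the logfiles,
# parses only the bytes appended since the previous round, optionally polls telnet,
# writes a <job>.json next to each logfile, and aggregates everything into
# <scrapyd_logs_dir>/stats.json (served at http://<scrapyd_server>/logs/stats.json).
parser.main()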