• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

my8100 / scrapydweb / 374f7459-4bbd-421d-9b8d-94fbf13e60d3

12 Jan 2025 10:13AM UTC coverage: 42.723% (-43.1%) from 85.817%
374f7459-4bbd-421d-9b8d-94fbf13e60d3

Pull #250

circleci

my8100
Update test_schedule.py
Pull Request #250: Fix bug for scrapyd v1.5.0

1 of 3 new or added lines in 2 files covered. (33.33%)

1746 existing lines in 29 files now uncovered.

1726 of 4040 relevant lines covered (42.72%)

4.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

15.83
/scrapydweb/views/operations/execute_task.py
1
# coding: utf-8
2
import json
12✔
3
import logging
12✔
4
import re
12✔
5
import time
12✔
6
import traceback
12✔
7

8
from ...common import get_now_string, get_response_from_view, handle_metadata
12✔
9
from ...models import Task, TaskResult, TaskJobResult, db
12✔
10
from ...utils.scheduler import scheduler
12✔
11

12

13
# Patterns are compiled once at import time and shared by every TaskExecutor.
# Leading '/<node>/' segment of a view path, e.g. '/1/' in '/1/schedule/task/'.
REPLACE_URL_NODE_PATTERN = re.compile(r'^/(\d+)/')
# 'host:port' following '//' in a URL, e.g. '127.0.0.1:6800' in 'http://127.0.0.1:6800/schedule.json'.
EXTRACT_URL_SERVER_PATTERN = re.compile(r'//(.+?:\d+)')

# Task-execution messages in this module are emitted through the 'apscheduler' logger.
apscheduler_logger = logging.getLogger(name='apscheduler')
17

18

19
class TaskExecutor(object):
    """Execute one run of a scheduled Task on its selected nodes and persist the outcome.

    A run does the following, in order:
    - creates a new TaskResult row;
    - asks the scrapydweb schedule view to start the job on every selected node;
    - retries each node that failed once, after a short pause;
    - stores every final per-node outcome as a TaskJobResult row;
    - writes the pass/fail totals back to the TaskResult row.
    """

    def __init__(self, task_id, task_name, url_scrapydweb, url_schedule_task, url_delete_task_result,
                 auth, selected_nodes):
        """Store the configuration of this run.

        :param task_id: id of the Task being executed.
        :param task_name: name of the Task; used only in log messages.
        :param url_scrapydweb: scrapydweb base URL. It is recorded as the 'url' of a result
            when the request itself failed and returned no URL.
        :param url_schedule_task: path of the schedule view, e.g. '/1/schedule/task/'. Its
            leading '/<node>/' segment is rewritten for each node.
        :param url_delete_task_result: path of the delete view, e.g. '/1/tasks/xhr/delete/1/1/'.
            Its trailing '/<task_id>/<task_result_id>/' is rewritten if the Task was deleted.
        :param auth: (username, password) tuple or None, forwarded to get_response_from_view().
        :param selected_nodes: the nodes to run the task on (as decoded from Task.selected_nodes
            by the caller -- presumably a list of node indices; TODO confirm).
        """
        self.task_id = task_id
        self.task_name = task_name
        self.url_scrapydweb = url_scrapydweb
        self.url_schedule_task = url_schedule_task
        self.url_delete_task_result = url_delete_task_result
        self.auth = auth
        # Request data sent to the scrapydweb views. One jobid is shared by every node of this run.
        self.data = dict(
            task_id=task_id,
            jobid='task_%s_%s' % (task_id, get_now_string(allow_space=False))
        )
        self.selected_nodes = selected_nodes
        self.task_result_id = None  # Set in get_task_result_id()
        self.pass_count = 0
        self.fail_count = 0

        self.sleep_seconds_before_retry = 3
        # Nodes that failed on the first pass. Filled by schedule_task() and retried once by main().
        self.nodes_to_retry = []
        self.logger = logging.getLogger(self.__class__.__name__)

    def main(self):
        """Run the task on all selected nodes, retry failed nodes once, then save the totals."""
        self.get_task_result_id()
        # The second entry is the same list object that schedule_task() appends to during the
        # first pass. By the time it is reached, it holds every node that needs a retry.
        for index, nodes in enumerate([self.selected_nodes, self.nodes_to_retry]):
            if not nodes:
                continue
            if index == 1:
                # https://apscheduler.readthedocs.io/en/latest/userguide.html#shutting-down-the-scheduler
                self.logger.warning("Retry task #%s (%s) on nodes %s in %s seconds",
                                    self.task_id, self.task_name, nodes, self.sleep_seconds_before_retry)
                time.sleep(self.sleep_seconds_before_retry)
                self.logger.warning("Retrying task #%s (%s) on nodes %s", self.task_id, self.task_name, nodes)
            for node in nodes:
                result = self.schedule_task(node)
                # An empty result means the node was queued for a retry; there is nothing to record yet.
                if result:
                    if result['status'] == 'ok':
                        self.pass_count += 1
                    else:
                        self.fail_count += 1
                    self.db_insert_task_job_result(result)
        self.db_update_task_result()

    def get_task_result_id(self):
        """Create the TaskResult row for this run and remember its id in self.task_result_id."""
        # SQLite objects created in a thread can only be used in that same thread
        with db.app.app_context():
            task_result = TaskResult()
            task_result.task_id = self.task_id
            db.session.add(task_result)
            # db.session.flush()  # Get task_result.id before committing, flush() is part of commit()
            db.session.commit()
            # Read the id now. Using task_result.id later fails with:
            # "Instance <TaskResult at 0x123> is not bound to a Session"
            self.task_result_id = task_result.id
            self.logger.debug("Get new task_result_id %s for task #%s", self.task_result_id, self.task_id)

    def schedule_task(self, node):
        """Ask the schedule view to start this run's job on ``node``.

        :returns: the view's JSON response with 'node' added, on success or on a final failure.
            Returns ``{}`` when this was the node's first failure and the node was queued for
            a retry. On a final failure, missing 'url', 'status_code', 'status' and 'exception'
            keys are filled in so the result can still be stored.
        """
        # TODO: url_for('schedule.task', node=node) fails here with "Application was not able to
        # create a URL adapter for request independent URL generation" unless SERVER_NAME is set.
        # Instead, rewrite the node segment of the stored path:
        # '/1/schedule/task/' -> '/<node>/schedule/task/'
        url_schedule_task = re.sub(REPLACE_URL_NODE_PATTERN, r'/%s/' % node, self.url_schedule_task)
        js = {}
        try:
            js = get_response_from_view(url_schedule_task, auth=self.auth, data=self.data, as_json=True)
            # Raise explicitly rather than with an `assert` statement: asserts are stripped under
            # `python -O`, which would silently disable the retry path below.
            if not (js['status_code'] == 200 and js['status'] == 'ok'):
                raise AssertionError("Request got %s" % js)
        except Exception as err:
            if node not in self.nodes_to_retry:
                apscheduler_logger.warning("Fail to execute task #%s (%s) on node %s, would retry later: %s",
                                           self.task_id, self.task_name, node, err)
                self.nodes_to_retry.append(node)
                return {}
            exc_text = traceback.format_exc()
            apscheduler_logger.error("Fail to execute task #%s (%s) on node %s, no more retries: %s",
                                     self.task_id, self.task_name, node, exc_text)
            js.setdefault('url', self.url_scrapydweb)  # '127.0.0.1:5000'
            js.setdefault('status_code', -1)
            js.setdefault('status', 'exception')
            js.setdefault('exception', exc_text)
        js.update(node=node)
        return js

    def db_insert_task_job_result(self, js):
        """Store one final per-node result (as returned by schedule_task) as a TaskJobResult row.

        If the TaskResult row of this run no longer exists, the result is logged and discarded.
        """
        with db.app.app_context():
            if not TaskResult.query.get(self.task_result_id):
                apscheduler_logger.error("task_result #%s of task #%s not found", self.task_result_id, self.task_id)
                apscheduler_logger.warning("Discard task_job_result of task_result #%s of task #%s: %s",
                                           self.task_result_id, self.task_id, js)
                return
            url = js.get('url', '')
            match = re.search(EXTRACT_URL_SERVER_PATTERN, url)
            # A URL without an explicit port (e.g. a scrapydweb URL behind a reverse proxy) does
            # not match. Keep the raw URL instead of crashing on None.group(), because that crash
            # would abort the rest of main().
            server = match.group(1) if match else url  # '127.0.0.1:6800'
            task_job_result = TaskJobResult()
            task_job_result.task_result_id = self.task_result_id
            task_job_result.node = js['node']
            task_job_result.server = server
            task_job_result.status_code = js['status_code']
            task_job_result.status = js['status']
            task_job_result.result = js.get('jobid', '') or js.get('message', '') or js.get('exception', '')
            db.session.add(task_job_result)
            db.session.commit()
            self.logger.info("Inserted task_job_result: %s", task_job_result)

    # https://stackoverflow.com/questions/13895176/sqlalchemy-and-sqlite-database-is-locked
    def db_update_task_result(self):
        """Write the pass/fail totals of this run to its TaskResult row.

        If the Task itself has been deleted in the meantime, this run's TaskResult is deleted
        through the delete view instead.
        """
        with db.app.app_context():
            task = Task.query.get(self.task_id)
            task_result = TaskResult.query.get(self.task_result_id)
            if not task:
                apscheduler_logger.error("Task #%s not found", self.task_id)
                # '/1/tasks/xhr/delete/1/1/' -> '/1/tasks/xhr/delete/<task_id>/<task_result_id>/'
                url_delete_task_result = re.sub(r'/\d+/\d+/$', '/%s/%s/' % (self.task_id, self.task_result_id),
                                                self.url_delete_task_result)
                js = get_response_from_view(url_delete_task_result, auth=self.auth, data=self.data, as_json=True)
                apscheduler_logger.warning("Deleted task_result #%s [FAIL %s, PASS %s] of task #%s: %s",
                                           self.task_result_id, self.fail_count, self.pass_count, self.task_id, js)
                return
            if not task_result:
                apscheduler_logger.error("task_result #%s of task #%s not found", self.task_result_id, self.task_id)
                apscheduler_logger.warning("Failed to update task_result #%s [FAIL %s, PASS %s] of task #%s",
                                           self.task_result_id, self.fail_count, self.pass_count, self.task_id)
                return
            task_result.fail_count = self.fail_count
            task_result.pass_count = self.pass_count
            db.session.commit()
            self.logger.info("Inserted task_result: %s", task_result)
148

149

150
def execute_task(task_id):
    """Job function for APScheduler: run Task ``task_id`` once on its selected nodes.

    If the Task no longer exists, the APScheduler job registered under ``str(task_id)`` is
    removed instead (if it is still registered). Exceptions raised while running the task are
    logged to the 'apscheduler' logger and not re-raised.

    :param task_id: id of the Task to execute.
    """
    with db.app.app_context():
        task = Task.query.get(task_id)
        if not task:
            apscheduler_job = scheduler.get_job(str(task_id))
            # get_job() returns None when no such job is registered. Calling .remove() on None
            # would raise AttributeError and hide this condition behind an unrelated traceback.
            if apscheduler_job is None:
                apscheduler_logger.error("apscheduler_job #%s not found and task #%s does not exist.",
                                         task_id, task_id)
                return
            apscheduler_job.remove()
            apscheduler_logger.error("apscheduler_job #{id} removed since task #{id} not exist. ".format(id=task_id))
            return

        metadata = handle_metadata()
        username = metadata.get('username', '')
        password = metadata.get('password', '')
        url_delete_task_result = metadata.get('url_delete_task_result', '/1/tasks/xhr/delete/1/1/')
        task_executor = TaskExecutor(task_id=task_id,
                                     task_name=task.name,
                                     url_scrapydweb=metadata.get('url_scrapydweb', 'http://127.0.0.1:5000'),
                                     url_schedule_task=metadata.get('url_schedule_task', '/1/schedule/task/'),
                                     url_delete_task_result=url_delete_task_result,
                                     # Only send credentials when both parts are configured.
                                     auth=(username, password) if username and password else None,
                                     selected_nodes=json.loads(task.selected_nodes))
        try:
            task_executor.main()
        except Exception:
            apscheduler_logger.error(traceback.format_exc())
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc