• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

testit-tms / adapters-python / 17798992833

17 Sep 2025 01:19PM UTC coverage: 36.943% (-6.5%) from 43.443%
17798992833

Pull #202

github

web-flow
Merge bfd5ede57 into b59427724
Pull Request #202: fix: TMS-35114: add threads for creating and updating autotests with …

102 of 224 new or added lines in 15 files covered. (45.54%)

1 existing line in 1 file now uncovered.

1293 of 3500 relevant lines covered (36.94%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.96
/testit-python-commons/src/testit_python_commons/client/helpers/bulk_autotest_helper.py
1
import logging
2✔
2

3
from testit_api_client.apis import AutoTestsApi, TestRunsApi
2✔
4
from testit_api_client.models import (
2✔
5
    AutoTestPostModel,
6
    AutoTestPutModel,
7
    AutoTestResultsForTestRunModel,
8
    LinkAutoTestToWorkItemRequest,
9
    WorkItemIdentifierModel
10
)
11

12
from testit_python_commons.client.client_configuration import ClientConfiguration
2✔
13
from testit_python_commons.client.helpers.threads_manager import ThreadsManager
2✔
14
from testit_python_commons.client.models import (
2✔
15
    ThreadForCreateAndResult,
16
    ThreadsForCreateAndResult,
17
    ThreadForUpdateAndResult,
18
    ThreadsForUpdateAndResult
19
)
20
from testit_python_commons.services.logger import adapter_logger
2✔
21
from testit_python_commons.services.retry import retry
2✔
22
from testit_python_commons.utils.html_escape_utils import HtmlEscapeUtils
2✔
23
from typing import Dict, List
2✔
24

25

26
class BulkAutotestHelper:
2✔
27
    __max_tests_for_import = 100
2✔
28

29
    def __init__(
2✔
30
            self,
31
            autotests_api: AutoTestsApi,
32
            test_runs_api: TestRunsApi,
33
            config: ClientConfiguration):
34
        self.__autotests_api = autotests_api
×
35
        self.__test_runs_api = test_runs_api
×
36
        self.__test_run_id = config.get_test_run_id()
×
37
        self.__automatic_updation_links_to_test_cases = config.get_automatic_updation_links_to_test_cases()
×
NEW
38
        self.__threads_manager = ThreadsManager()
×
39

40
    @adapter_logger
2✔
41
    def add_for_create(
2✔
42
            self,
43
            create_model: AutoTestPostModel,
44
            result_model: AutoTestResultsForTestRunModel):
NEW
45
        thread_for_create_and_result: ThreadForCreateAndResult = self.__threads_manager.\
×
46
            get_thread_for_create_and_result(create_model.external_id)
47

NEW
48
        thread_for_create: Dict[str, AutoTestPostModel] = thread_for_create_and_result.get_thread_for_create()
×
NEW
49
        thread_for_create[create_model.external_id] = create_model
×
50

NEW
51
        thread_results_for_created_autotests: List[AutoTestResultsForTestRunModel] = thread_for_create_and_result\
×
52
            .get_thread_results_for_created_autotests()
NEW
53
        thread_results_for_created_autotests.append(result_model)
×
54

NEW
55
        if len(thread_for_create) >= self.__max_tests_for_import:
×
NEW
56
            self.__bulk_create(thread_for_create, thread_results_for_created_autotests)
×
57

NEW
58
            self.__threads_manager.delete_thread_for_create_and_result(thread_for_create_and_result)
×
59

60
    @adapter_logger
2✔
61
    def add_for_update(
2✔
62
            self,
63
            update_model: AutoTestPutModel,
64
            result_model: AutoTestResultsForTestRunModel,
65
            autotest_links_to_wi_for_update: Dict[str, List[str]]):
NEW
66
        thread_for_update_and_result: ThreadForUpdateAndResult = self.__threads_manager.\
×
67
            get_thread_for_update_and_result(update_model.external_id)
68

NEW
69
        thread_for_update: Dict[str, AutoTestPutModel] = thread_for_update_and_result.get_thread_for_update()
×
NEW
70
        thread_for_update[update_model.external_id] = update_model
×
71

NEW
72
        thread_results_for_updated_autotests: List[AutoTestResultsForTestRunModel] = thread_for_update_and_result\
×
73
            .get_thread_results_for_updated_autotests()
NEW
74
        thread_results_for_updated_autotests.append(result_model)
×
75

NEW
76
        thread_for_autotest_links_to_wi_for_update: Dict[str, List[str]] = thread_for_update_and_result\
×
77
            .get_thread_for_autotest_links_to_wi_for_update()
NEW
78
        thread_for_autotest_links_to_wi_for_update.update(autotest_links_to_wi_for_update)
×
79

NEW
80
        if len(thread_for_update) >= self.__max_tests_for_import:
×
NEW
81
            self.__bulk_update(
×
82
                thread_for_update,
83
                thread_results_for_updated_autotests,
84
                thread_for_autotest_links_to_wi_for_update
85
            )
86

NEW
87
            self.__threads_manager.delete_thread_for_update_and_result(thread_for_update_and_result)
×
88

89
    @adapter_logger
2✔
90
    def teardown(self):
2✔
NEW
91
        self.__teardown_for_create()
×
NEW
92
        self.__teardown_for_update()
×
93

94
    def __teardown_for_create(self):
2✔
NEW
95
        all_threads_for_create_and_result: ThreadsForCreateAndResult = self.__threads_manager\
×
96
            .get_all_threads_for_create_and_result()
NEW
97
        threads_for_create: List[Dict[str, AutoTestPostModel]] = all_threads_for_create_and_result\
×
98
            .get_threads_for_create()
NEW
99
        threads_results_for_created_autotests: List[List[AutoTestResultsForTestRunModel]] = all_threads_for_create_and_result\
×
100
            .get_threads_results_for_created_autotests()
101

NEW
102
        for index in range(len(threads_for_create)):
×
NEW
103
            thread_for_create = threads_for_create[index]
×
NEW
104
            thread_results_for_created_autotests = threads_results_for_created_autotests[index]
×
105

NEW
106
            self.__bulk_create(thread_for_create, thread_results_for_created_autotests)
×
107

108
    def __teardown_for_update(self):
2✔
NEW
109
        all_threads_for_update_and_result: ThreadsForUpdateAndResult = self.__threads_manager\
×
110
            .get_all_threads_for_update_and_result()
NEW
111
        threads_for_update: List[Dict[str, AutoTestPutModel]] = all_threads_for_update_and_result\
×
112
            .get_threads_for_update()
NEW
113
        threads_results_for_updated_autotests: List[List[AutoTestResultsForTestRunModel]] = all_threads_for_update_and_result\
×
114
            .get_threads_results_for_updated_autotests()
NEW
115
        threads_for_autotest_links_to_wi_for_update: List[Dict[str, List[str]]] = all_threads_for_update_and_result\
×
116
            .get_threads_for_autotest_links_to_wi_for_update()
117

NEW
118
        for index in range(len(threads_for_update)):
×
NEW
119
            thread_for_update = threads_for_update[index]
×
NEW
120
            thread_results_for_updated_autotests = threads_results_for_updated_autotests[index]
×
NEW
121
            thread_for_autotest_links_to_wi_for_update = threads_for_autotest_links_to_wi_for_update[index]
×
122

NEW
123
            self.__bulk_update(
×
124
                thread_for_update,
125
                thread_results_for_updated_autotests,
126
                thread_for_autotest_links_to_wi_for_update
127
            )
128

129
    @adapter_logger
2✔
130
    def __bulk_create(
2✔
131
            self,
132
            thread_for_create: Dict[str, AutoTestPostModel],
133
            thread_results_for_created_autotests: List[AutoTestResultsForTestRunModel]
134
    ):
NEW
135
        self.__create_tests(list(thread_for_create.values()))
×
NEW
136
        self.__load_test_results(thread_results_for_created_autotests)
×
137

138
    @adapter_logger
2✔
139
    def __bulk_update(
2✔
140
            self,
141
            thread_for_update: Dict[str, AutoTestPutModel],
142
            thread_results_for_updated_autotests: List[AutoTestResultsForTestRunModel],
143
            thread_for_autotest_links_to_wi_for_update: Dict[str, List[str]]
144
    ):
NEW
145
        self.__update_tests(list(thread_for_update.values()))
×
NEW
146
        self.__load_test_results(thread_results_for_updated_autotests)
×
147

NEW
148
        for autotest_id, work_item_ids in thread_for_autotest_links_to_wi_for_update.items():
×
NEW
149
            self.__update_autotest_link_from_work_items(autotest_id, work_item_ids)
×
150

151
    @adapter_logger
2✔
152
    def __create_tests(self, autotests_for_create: List[AutoTestPostModel]):
2✔
153
        logging.debug(f'Creating autotests: "{autotests_for_create}')
×
154

155
        autotests_for_create = HtmlEscapeUtils.escape_html_in_object(autotests_for_create)
×
156
        self.__autotests_api.create_multiple(auto_test_post_model=autotests_for_create)
×
157

158
        logging.debug(f'Autotests were created')
×
159

160
    @adapter_logger
2✔
161
    def __update_tests(self, autotests_for_update: List[AutoTestPutModel]):
2✔
162
        logging.debug(f'Updating autotests: {autotests_for_update}')
×
163

164
        autotests_for_update = HtmlEscapeUtils.escape_html_in_object(autotests_for_update)
×
165
        self.__autotests_api.update_multiple(auto_test_put_model=autotests_for_update)
×
166

167
        logging.debug(f'Autotests were updated')
×
168

169
    @adapter_logger
2✔
170
    def __load_test_results(self, test_results: List[AutoTestResultsForTestRunModel]):
2✔
171
        logging.debug(f'Loading test results: {test_results}')
×
172

173
        test_results = HtmlEscapeUtils.escape_html_in_object(test_results)
×
174
        self.__test_runs_api.set_auto_test_results_for_test_run(
×
175
            id=self.__test_run_id,
176
            auto_test_results_for_test_run_model=test_results)
177

178
    # TODO: delete after fix PUT/api/v2/autoTests
179
    @adapter_logger
2✔
180
    def __get_work_items_linked_to_autotest(self, autotest_global_id: str) -> List[WorkItemIdentifierModel]:
2✔
181
        return self.__autotests_api.get_work_items_linked_to_auto_test(id=autotest_global_id)
×
182

183
    # TODO: delete after fix PUT/api/v2/autoTests
184
    @adapter_logger
2✔
185
    @retry
2✔
186
    def __unlink_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
2✔
187
        self.__autotests_api.delete_auto_test_link_from_work_item(
×
188
            id=autotest_global_id,
189
            work_item_id=work_item_id)
190

191
        logging.debug(f'Autotest was unlinked with workItem "{work_item_id}" by global id "{autotest_global_id}')
×
192

193
    # TODO: delete after fix PUT/api/v2/autoTests
194
    @adapter_logger
2✔
195
    @retry
2✔
196
    def __link_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
2✔
197
        self.__autotests_api.link_auto_test_to_work_item(
×
198
            autotest_global_id,
199
            link_auto_test_to_work_item_request=LinkAutoTestToWorkItemRequest(id=work_item_id))
200

201
        logging.debug(f'Autotest was linked with workItem "{work_item_id}" by global id "{autotest_global_id}')
×
202

203
    # TODO: delete after fix PUT/api/v2/autoTests
204
    @adapter_logger
2✔
205
    def __update_autotest_link_from_work_items(self, autotest_global_id: str, work_item_ids: list):
2✔
206
        linked_work_items = self.__get_work_items_linked_to_autotest(autotest_global_id)
×
207

208
        for linked_work_item in linked_work_items:
×
209
            linked_work_item_id = str(linked_work_item.global_id)
×
210

211
            if linked_work_item_id in work_item_ids:
×
212
                work_item_ids.remove(linked_work_item_id)
×
213

214
                continue
×
215

216
            if self.__automatic_updation_links_to_test_cases != 'false':
×
217
                self.__unlink_test_to_work_item(autotest_global_id, linked_work_item_id)
×
218

219
        for work_item_id in work_item_ids:
×
220
            self.__link_test_to_work_item(autotest_global_id, work_item_id)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc