• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

testit-tms / adapters-python / 17790047561

17 Sep 2025 07:24AM UTC coverage: 43.237% (-0.2%) from 43.443%
17790047561

Pull #202

github

web-flow
Merge a14ece727 into b59427724
Pull Request #202: fix: TMS-35114: add threads for creating and updating autotests with …

61 of 177 new or added lines in 5 files covered. (34.46%)

117 existing lines in 3 files now uncovered.

1285 of 2972 relevant lines covered (43.24%)

0.87 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.96
/testit-python-commons/src/testit_python_commons/client/helpers/bulk_autotest_helper.py
1
import logging
2✔
2
import typing
2✔
3

4
from testit_api_client.apis import AutoTestsApi, TestRunsApi
2✔
5
from testit_api_client.models import (
2✔
6
    AutoTestPostModel,
7
    AutoTestPutModel,
8
    AutoTestResultsForTestRunModel,
9
    LinkAutoTestToWorkItemRequest,
10
    WorkItemIdentifierModel,
11
)
12

13
from testit_python_commons.client.client_configuration import ClientConfiguration
2✔
14
from testit_python_commons.client.helpers.threads_manager import ThreadsManager
2✔
15
from testit_python_commons.client.models import (
2✔
16
    ThreadForCreateAndResult,
17
    ThreadsForCreateAndResult,
18
    ThreadForUpdateAndResult,
19
    ThreadsForUpdateAndResult
20
)
21
from testit_python_commons.services.logger import adapter_logger
2✔
22
from testit_python_commons.services.retry import retry
2✔
23
from testit_python_commons.utils.html_escape_utils import HtmlEscapeUtils
2✔
24

25

26
class BulkAutotestHelper:
    """Buffer autotest create/update requests and send them in bulk.

    Models and their test results are accumulated in per-thread buffers
    obtained from ``ThreadsManager``. A buffer is sent as soon as it holds
    ``__max_tests_for_import`` autotests; whatever is still buffered is sent
    by :meth:`teardown`.
    """

    # Number of buffered autotests that triggers an immediate bulk request.
    __max_tests_for_import = 100

    def __init__(
            self,
            autotests_api: AutoTestsApi,
            test_runs_api: TestRunsApi,
            config: ClientConfiguration):
        """Store the API clients and read the settings used by bulk requests.

        :param autotests_api: client used to create, update and (un)link autotests.
        :param test_runs_api: client used to upload test results to the test run.
        :param config: source of the test run id and of the
            "automatic updation links to test cases" setting.
        """
        self.__autotests_api = autotests_api
        self.__test_runs_api = test_runs_api
        self.__test_run_id = config.get_test_run_id()
        self.__automatic_updation_links_to_test_cases = config.get_automatic_updation_links_to_test_cases()
        self.__threads_manager = ThreadsManager()

    @adapter_logger
    def add_for_create(
            self,
            create_model: AutoTestPostModel,
            result_model: AutoTestResultsForTestRunModel):
        """Buffer an autotest for creation together with its result.

        When the buffer reaches ``__max_tests_for_import`` autotests it is sent
        immediately and then removed from the threads manager.

        :param create_model: autotest to create; buffered under its ``external_id``.
        :param result_model: result to upload once the autotest is created.
        """
        thread_for_create_and_result: ThreadForCreateAndResult = (
            self.__threads_manager.get_thread_for_create_and_result(create_model.external_id))

        thread_for_create: typing.Dict[str, AutoTestPostModel] = (
            thread_for_create_and_result.get_thread_for_create())
        thread_for_create[create_model.external_id] = create_model

        thread_results_for_created_autotests: typing.List[AutoTestResultsForTestRunModel] = (
            thread_for_create_and_result.get_thread_results_for_created_autotests())
        thread_results_for_created_autotests.append(result_model)

        if len(thread_for_create) >= self.__max_tests_for_import:
            self.__bulk_create(thread_for_create, thread_results_for_created_autotests)

            self.__threads_manager.delete_thread_for_create_and_result(thread_for_create_and_result)

    @adapter_logger
    def add_for_update(
            self,
            update_model: AutoTestPutModel,
            result_model: AutoTestResultsForTestRunModel,
            autotest_links_to_wi_for_update: typing.Dict[str, typing.List[str]]):
        """Buffer an autotest for update together with its result and links.

        When the buffer reaches ``__max_tests_for_import`` autotests it is sent
        immediately and then removed from the threads manager.

        :param update_model: autotest to update; buffered under its ``external_id``.
        :param result_model: result to upload once the autotest is updated.
        :param autotest_links_to_wi_for_update: mapping of autotest id to the
            work item ids the autotest should be linked to.
        """
        thread_for_update_and_result: ThreadForUpdateAndResult = (
            self.__threads_manager.get_thread_for_update_and_result(update_model.external_id))

        thread_for_update: typing.Dict[str, AutoTestPutModel] = (
            thread_for_update_and_result.get_thread_for_update())
        thread_for_update[update_model.external_id] = update_model

        thread_results_for_updated_autotests: typing.List[AutoTestResultsForTestRunModel] = (
            thread_for_update_and_result.get_thread_results_for_updated_autotests())
        thread_results_for_updated_autotests.append(result_model)

        thread_for_autotest_links_to_wi_for_update: typing.Dict[str, typing.List[str]] = (
            thread_for_update_and_result.get_thread_for_autotest_links_to_wi_for_update())
        thread_for_autotest_links_to_wi_for_update.update(autotest_links_to_wi_for_update)

        if len(thread_for_update) >= self.__max_tests_for_import:
            self.__bulk_update(
                thread_for_update,
                thread_results_for_updated_autotests,
                thread_for_autotest_links_to_wi_for_update
            )

            self.__threads_manager.delete_thread_for_update_and_result(thread_for_update_and_result)

    @adapter_logger
    def teardown(self):
        """Send everything that is still buffered: creations first, then updates."""
        self.__teardown_for_create()
        self.__teardown_for_update()

    def __teardown_for_create(self):
        """Bulk-create every remaining buffer of autotests and upload their results."""
        all_threads_for_create_and_result: ThreadsForCreateAndResult = (
            self.__threads_manager.get_all_threads_for_create_and_result())
        threads_for_create: typing.List[typing.Dict[str, AutoTestPostModel]] = (
            all_threads_for_create_and_result.get_threads_for_create())
        threads_results_for_created_autotests: typing.List[typing.List[AutoTestResultsForTestRunModel]] = (
            all_threads_for_create_and_result.get_threads_results_for_created_autotests())

        # The two lists are parallel: entry i of each belongs to the same buffer.
        for thread_for_create, thread_results_for_created_autotests in zip(
                threads_for_create, threads_results_for_created_autotests):
            self.__bulk_create(thread_for_create, thread_results_for_created_autotests)

    def __teardown_for_update(self):
        """Bulk-update every remaining buffer of autotests, upload results and sync links."""
        all_threads_for_update_and_result: ThreadsForUpdateAndResult = (
            self.__threads_manager.get_all_threads_for_update_and_result())
        threads_for_update = all_threads_for_update_and_result.get_threads_for_update()
        threads_results_for_updated_autotests = (
            all_threads_for_update_and_result.get_threads_results_for_updated_autotests())
        threads_for_autotest_links_to_wi_for_update = (
            all_threads_for_update_and_result.get_threads_for_autotest_links_to_wi_for_update())

        # The three lists are parallel: entry i of each belongs to the same buffer.
        for thread_for_update, thread_results, thread_links in zip(
                threads_for_update,
                threads_results_for_updated_autotests,
                threads_for_autotest_links_to_wi_for_update):
            # Must be the update path: __bulk_create takes only two arguments
            # and would post these models to the create endpoint.
            self.__bulk_update(thread_for_update, thread_results, thread_links)

    @adapter_logger
    def __bulk_create(
            self,
            thread_for_create: typing.Dict[str, AutoTestPostModel],
            thread_results_for_created_autotests: typing.List[AutoTestResultsForTestRunModel]
    ):
        """Create the buffered autotests, then upload their results."""
        self.__create_tests(list(thread_for_create.values()))
        self.__load_test_results(thread_results_for_created_autotests)

    @adapter_logger
    def __bulk_update(
            self,
            thread_for_update: typing.Dict[str, AutoTestPutModel],
            thread_results_for_updated_autotests: typing.List[AutoTestResultsForTestRunModel],
            thread_for_autotest_links_to_wi_for_update: typing.Dict[str, typing.List[str]]
    ):
        """Update the buffered autotests, upload their results, then sync work item links."""
        self.__update_tests(list(thread_for_update.values()))
        self.__load_test_results(thread_results_for_updated_autotests)

        for autotest_id, work_item_ids in thread_for_autotest_links_to_wi_for_update.items():
            self.__update_autotest_link_from_work_items(autotest_id, work_item_ids)

    @adapter_logger
    def __create_tests(self, autotests_for_create: typing.List[AutoTestPostModel]):
        """HTML-escape the models and create them with a single API call."""
        logging.debug(f'Creating autotests: {autotests_for_create}')

        autotests_for_create = HtmlEscapeUtils.escape_html_in_object(autotests_for_create)
        self.__autotests_api.create_multiple(auto_test_post_model=autotests_for_create)

        logging.debug('Autotests were created')

    @adapter_logger
    def __update_tests(self, autotests_for_update: typing.List[AutoTestPutModel]):
        """HTML-escape the models and update them with a single API call."""
        logging.debug(f'Updating autotests: {autotests_for_update}')

        autotests_for_update = HtmlEscapeUtils.escape_html_in_object(autotests_for_update)
        self.__autotests_api.update_multiple(auto_test_put_model=autotests_for_update)

        logging.debug('Autotests were updated')

    @adapter_logger
    def __load_test_results(self, test_results: typing.List[AutoTestResultsForTestRunModel]):
        """HTML-escape the results and upload them to the configured test run."""
        logging.debug(f'Loading test results: {test_results}')

        test_results = HtmlEscapeUtils.escape_html_in_object(test_results)
        self.__test_runs_api.set_auto_test_results_for_test_run(
            id=self.__test_run_id,
            auto_test_results_for_test_run_model=test_results)

    # TODO: delete after fix PUT/api/v2/autoTests
    @adapter_logger
    def __get_work_items_linked_to_autotest(self, autotest_global_id: str) -> typing.List[WorkItemIdentifierModel]:
        """Return the work items currently linked to the given autotest."""
        return self.__autotests_api.get_work_items_linked_to_auto_test(id=autotest_global_id)

    # TODO: delete after fix PUT/api/v2/autoTests
    @adapter_logger
    @retry
    def __unlink_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
        """Remove the link between the autotest and the work item."""
        self.__autotests_api.delete_auto_test_link_from_work_item(
            id=autotest_global_id,
            work_item_id=work_item_id)

        logging.debug(f'Autotest was unlinked with workItem "{work_item_id}" by global id "{autotest_global_id}"')

    # TODO: delete after fix PUT/api/v2/autoTests
    @adapter_logger
    @retry
    def __link_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
        """Link the autotest to the work item."""
        self.__autotests_api.link_auto_test_to_work_item(
            autotest_global_id,
            link_auto_test_to_work_item_request=LinkAutoTestToWorkItemRequest(id=work_item_id))

        logging.debug(f'Autotest was linked with workItem "{work_item_id}" by global id "{autotest_global_id}"')

    # TODO: delete after fix PUT/api/v2/autoTests
    @adapter_logger
    def __update_autotest_link_from_work_items(self, autotest_global_id: str, work_item_ids: list):
        """Make the autotest's work item links match ``work_item_ids``.

        Links that already exist are kept. Existing links that are not
        requested are removed unless the "automatic updation links to test
        cases" setting equals ``'false'``. Requested links that do not exist
        yet are created.

        :param autotest_global_id: id of the autotest whose links are synced.
        :param work_item_ids: ids of the work items the autotest must be
            linked to; the list is not modified.
        """
        linked_work_items = self.__get_work_items_linked_to_autotest(autotest_global_id)
        linked_work_item_ids = [str(linked_work_item.global_id) for linked_work_item in linked_work_items]

        if self.__automatic_updation_links_to_test_cases != 'false':
            requested_ids = set(work_item_ids)

            for linked_work_item_id in linked_work_item_ids:
                if linked_work_item_id not in requested_ids:
                    self.__unlink_test_to_work_item(autotest_global_id, linked_work_item_id)

        # Work on a lookup set instead of removing from the caller's list.
        already_linked_ids = set(linked_work_item_ids)

        for work_item_id in work_item_ids:
            if work_item_id not in already_linked_ids:
                self.__link_test_to_work_item(autotest_global_id, work_item_id)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc