• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

testit-tms / adapters-python / 19481362404

18 Nov 2025 09:43PM UTC coverage: 36.992%. Remained the same
19481362404

Pull #217

github

web-flow
Merge d7772ee46 into 7d331b3fc
Pull Request #217: fix: TMS-36027: fix the parsing of bool cli-parameters.

2 of 8 new or added lines in 4 files covered. (25.0%)

1 existing line in 1 file now uncovered.

1301 of 3517 relevant lines covered (36.99%)

0.74 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

35.04
/testit-python-commons/src/testit_python_commons/client/helpers/bulk_autotest_helper.py
1
import logging
2✔
2

3
from testit_api_client.apis import AutoTestsApi, TestRunsApi
2✔
4
from testit_api_client.models import (
2✔
5
    AutoTestPostModel,
6
    AutoTestPutModel,
7
    AutoTestResultsForTestRunModel,
8
    LinkAutoTestToWorkItemRequest,
9
    WorkItemIdentifierModel
10
)
11

12
from testit_python_commons.client.client_configuration import ClientConfiguration
2✔
13
from testit_python_commons.client.helpers.threads_manager import ThreadsManager
2✔
14
from testit_python_commons.client.models import (
2✔
15
    ThreadForCreateAndResult,
16
    ThreadsForCreateAndResult,
17
    ThreadForUpdateAndResult,
18
    ThreadsForUpdateAndResult
19
)
20
from testit_python_commons.services.logger import adapter_logger
2✔
21
from testit_python_commons.services.retry import retry
2✔
22
from testit_python_commons.utils.html_escape_utils import HtmlEscapeUtils
2✔
23
from typing import Dict, List
2✔
24

25

26
class BulkAutotestHelper:
2✔
27
    __max_tests_for_import = 100
2✔
28

29
    def __init__(
2✔
30
            self,
31
            autotests_api: AutoTestsApi,
32
            test_runs_api: TestRunsApi,
33
            config: ClientConfiguration):
34
        self.__autotests_api = autotests_api
×
35
        self.__test_runs_api = test_runs_api
×
36
        self.__test_run_id = config.get_test_run_id()
×
37
        self.__automatic_updation_links_to_test_cases = config.get_automatic_updation_links_to_test_cases()
×
38
        self.__threads_manager = ThreadsManager()
×
39

40
    @adapter_logger
2✔
41
    def add_for_create(
2✔
42
            self,
43
            create_model: AutoTestPostModel,
44
            result_model: AutoTestResultsForTestRunModel):
45
        thread_for_create_and_result: ThreadForCreateAndResult = self.__threads_manager.\
×
46
            get_thread_for_create_and_result(create_model.external_id)
47

48
        thread_for_create: Dict[str, AutoTestPostModel] = thread_for_create_and_result.get_thread_for_create()
×
49
        thread_for_create[create_model.external_id] = create_model
×
50

51
        thread_results_for_created_autotests: Dict[str, AutoTestResultsForTestRunModel] = thread_for_create_and_result\
×
52
            .get_thread_results_for_created_autotests()
53
        thread_results_for_created_autotests[result_model.auto_test_external_id] = result_model
×
54

55
        if len(thread_for_create) >= self.__max_tests_for_import:
×
56
            self.__teardown_for_create()
×
57

58
    @adapter_logger
2✔
59
    def add_for_update(
2✔
60
            self,
61
            update_model: AutoTestPutModel,
62
            result_model: AutoTestResultsForTestRunModel,
63
            autotest_links_to_wi_for_update: Dict[str, List[str]]):
64
        thread_for_update_and_result: ThreadForUpdateAndResult = self.__threads_manager.\
×
65
            get_thread_for_update_and_result(update_model.external_id)
66

67
        thread_for_update: Dict[str, AutoTestPutModel] = thread_for_update_and_result.get_thread_for_update()
×
68
        thread_for_update[update_model.external_id] = update_model
×
69

70
        thread_results_for_updated_autotests: Dict[str, AutoTestResultsForTestRunModel] = thread_for_update_and_result\
×
71
            .get_thread_results_for_updated_autotests()
72
        thread_results_for_updated_autotests[result_model.auto_test_external_id] = result_model
×
73

74
        thread_for_autotest_links_to_wi_for_update: Dict[str, List[str]] = thread_for_update_and_result\
×
75
            .get_thread_for_autotest_links_to_wi_for_update()
76
        thread_for_autotest_links_to_wi_for_update.update(autotest_links_to_wi_for_update)
×
77

78
        if len(thread_for_update) >= self.__max_tests_for_import:
×
79
            self.__teardown_for_update()
×
80

81
    @adapter_logger
2✔
82
    def teardown(self):
2✔
83
        self.__teardown_for_create()
×
84
        self.__teardown_for_update()
×
85

86
    def __teardown_for_create(self):
2✔
87
        all_threads_for_create_and_result: ThreadsForCreateAndResult = self.__threads_manager\
×
88
            .get_all_threads_for_create_and_result()
89

90
        thread_for_create: Dict[str, AutoTestPostModel] = all_threads_for_create_and_result\
×
91
            .get_threads_for_create()
92
        autotests_for_create = list(thread_for_create.values())
×
93

94
        if autotests_for_create:
×
95
            self.__create_tests(autotests_for_create)
×
96

97
        threads_results_for_created_autotests: List[Dict[str, AutoTestResultsForTestRunModel]] = all_threads_for_create_and_result\
×
98
            .get_threads_results_for_created_autotests()
99

100
        for thread_results_for_created_autotests in threads_results_for_created_autotests:
×
101
            self.__load_test_results(list(thread_results_for_created_autotests.values()))
×
102

103
        self.__threads_manager.delete_threads_for_create_and_result()
×
104

105
    def __teardown_for_update(self):
2✔
106
        all_threads_for_update_and_result: ThreadsForUpdateAndResult = self.__threads_manager\
×
107
            .get_all_threads_for_update_and_result()
108

109
        thread_for_update: Dict[str, AutoTestPutModel] = all_threads_for_update_and_result\
×
110
            .get_threads_for_update()
111

112
        autotests_for_update = list(thread_for_update.values())
×
113

114
        if autotests_for_update:
×
115
            self.__update_tests(autotests_for_update)
×
116

117
        thread_for_autotest_links_to_wi_for_update: Dict[str, List[str]] = all_threads_for_update_and_result\
×
118
            .get_threads_for_autotest_links_to_wi_for_update()
119

120
        for autotest_id, work_item_ids in thread_for_autotest_links_to_wi_for_update.items():
×
121
            self.__update_autotest_link_from_work_items(autotest_id, work_item_ids)
×
122

123
        threads_results_for_updated_autotests: List[Dict[str, AutoTestResultsForTestRunModel]] = all_threads_for_update_and_result\
×
124
            .get_threads_results_for_updated_autotests()
125

126
        for thread_results_for_updated_autotests in threads_results_for_updated_autotests:
×
127
            self.__load_test_results(list(thread_results_for_updated_autotests.values()))
×
128

129
        self.__threads_manager.delete_threads_for_update_and_result()
×
130

131
    @adapter_logger
2✔
132
    def __bulk_create(
2✔
133
            self,
134
            thread_for_create: List[AutoTestPostModel],
135
            thread_results_for_created_autotests: List[AutoTestResultsForTestRunModel]
136
    ):
137
        self.__create_tests(thread_for_create)
×
138
        self.__load_test_results(thread_results_for_created_autotests)
×
139

140
    @adapter_logger
2✔
141
    def __bulk_update(
2✔
142
            self,
143
            thread_for_update: Dict[str, AutoTestPutModel],
144
            thread_results_for_updated_autotests: List[AutoTestResultsForTestRunModel],
145
            thread_for_autotest_links_to_wi_for_update: Dict[str, List[str]]
146
    ):
147
        self.__update_tests(list(thread_for_update.values()))
×
148
        self.__load_test_results(thread_results_for_updated_autotests)
×
149

150
        for autotest_id, work_item_ids in thread_for_autotest_links_to_wi_for_update.items():
×
151
            self.__update_autotest_link_from_work_items(autotest_id, work_item_ids)
×
152

153
    @adapter_logger
2✔
154
    def __create_tests(self, autotests_for_create: List[AutoTestPostModel]):
2✔
155
        logging.debug(f'Creating autotests: "{autotests_for_create}')
×
156

157
        autotests_for_create = HtmlEscapeUtils.escape_html_in_object(autotests_for_create)
×
158
        self.__autotests_api.create_multiple(auto_test_post_model=autotests_for_create)
×
159

160
        logging.debug(f'Autotests were created')
×
161

162
    @adapter_logger
2✔
163
    def __update_tests(self, autotests_for_update: List[AutoTestPutModel]):
2✔
164
        logging.debug(f'Updating autotests: {autotests_for_update}')
×
165

166
        autotests_for_update = HtmlEscapeUtils.escape_html_in_object(autotests_for_update)
×
167
        self.__autotests_api.update_multiple(auto_test_put_model=autotests_for_update)
×
168

169
        logging.debug(f'Autotests were updated')
×
170

171
    @adapter_logger
2✔
172
    def __load_test_results(self, test_results: List[AutoTestResultsForTestRunModel]):
2✔
173
        logging.debug(f'Loading test results: {test_results}')
×
174

175
        test_results = HtmlEscapeUtils.escape_html_in_object(test_results)
×
176
        self.__test_runs_api.set_auto_test_results_for_test_run(
×
177
            id=self.__test_run_id,
178
            auto_test_results_for_test_run_model=test_results)
179

180
    # TODO: delete after fix PUT/api/v2/autoTests
181
    @adapter_logger
2✔
182
    def __get_work_items_linked_to_autotest(self, autotest_global_id: str) -> List[WorkItemIdentifierModel]:
2✔
183
        return self.__autotests_api.get_work_items_linked_to_auto_test(id=autotest_global_id)
×
184

185
    # TODO: delete after fix PUT/api/v2/autoTests
186
    @adapter_logger
2✔
187
    @retry
2✔
188
    def __unlink_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
2✔
189
        self.__autotests_api.delete_auto_test_link_from_work_item(
×
190
            id=autotest_global_id,
191
            work_item_id=work_item_id)
192

193
        logging.debug(f'Autotest was unlinked with workItem "{work_item_id}" by global id "{autotest_global_id}')
×
194

195
    # TODO: delete after fix PUT/api/v2/autoTests
196
    @adapter_logger
2✔
197
    @retry
2✔
198
    def __link_test_to_work_item(self, autotest_global_id: str, work_item_id: str):
2✔
199
        self.__autotests_api.link_auto_test_to_work_item(
×
200
            autotest_global_id,
201
            link_auto_test_to_work_item_request=LinkAutoTestToWorkItemRequest(id=work_item_id))
202

203
        logging.debug(f'Autotest was linked with workItem "{work_item_id}" by global id "{autotest_global_id}')
×
204

205
    # TODO: delete after fix PUT/api/v2/autoTests
206
    @adapter_logger
2✔
207
    def __update_autotest_link_from_work_items(self, autotest_global_id: str, work_item_ids: list):
2✔
208
        linked_work_items = self.__get_work_items_linked_to_autotest(autotest_global_id)
×
209

210
        for linked_work_item in linked_work_items:
×
211
            linked_work_item_id = str(linked_work_item.global_id)
×
212

213
            if linked_work_item_id in work_item_ids:
×
214
                work_item_ids.remove(linked_work_item_id)
×
215

216
                continue
×
217

NEW
218
            if self.__automatic_updation_links_to_test_cases:
×
219
                self.__unlink_test_to_work_item(autotest_global_id, linked_work_item_id)
×
220

221
        for work_item_id in work_item_ids:
×
222
            self.__link_test_to_work_item(autotest_global_id, work_item_id)
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc