• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

chift-oneapi / chift-python-sdk / 14660830381

25 Apr 2025 08:56AM UTC coverage: 94.82% (-3.7%) from 98.536%
14660830381

push

github

web-flow
Merge pull request #39 from chift-oneapi/dima/pydatic-v2-migration

feature(sdk): migrate from pydantic.v1 to v2

356 of 361 new or added lines in 26 files covered. (98.61%)

190 existing lines in 13 files now uncovered.

3844 of 4054 relevant lines covered (94.82%)

0.95 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

27.27
/tests/test_sync.py
1
import uuid
1✔
2
from datetime import datetime
1✔
3

4
import pytest
1✔
5

6
from chift.openapi.models import Sync
1✔
7
from tests.fixtures import sync
1✔
8

9
today_date = datetime.now().strftime("%d/%m/%Y")
1✔
10

11

12
def _get_flow_data(
1✔
13
    flow_name=None, datastore_name=None, trigger_type="event", execution_type="code"
14
):
UNCOV
15
    if not flow_name:
×
UNCOV
16
        flow_name = str(uuid.uuid1())
×
UNCOV
17
    if not datastore_name:
×
UNCOV
18
        datastore_name = str(uuid.uuid1())
×
19

UNCOV
20
    data = {
×
21
        "name": flow_name,
22
        "description": "test",
23
        "triggers": [{"id": "trigger-1", "type": trigger_type}],
24
        "execution": {
25
            "type": execution_type,
26
            "data": {},
27
        },
28
        "config": {
29
            "datastores": [
30
                {
31
                    "name": "INVOICES",
32
                    "definition": {
33
                        "columns": [
34
                            {
35
                                "name": "source_id",
36
                                "type": "text",
37
                                "title": "Invoice source id",
38
                            },
39
                            {
40
                                "name": "target_id",
41
                                "type": "text",
42
                                "title": "Invoice target id",
43
                            },
44
                            {
45
                                "name": "reference",
46
                                "type": "text",
47
                                "title": "Invoice code/ref",
48
                            },
49
                        ],
50
                        "search_column": "source_id",
51
                    },
52
                },
53
                {
54
                    "name": "CONTACTS",
55
                    "definition": {
56
                        "columns": [
57
                            {
58
                                "name": "source_id",
59
                                "type": "text",
60
                                "title": "Contact source id",
61
                            },
62
                            {
63
                                "name": "target_id",
64
                                "type": "text",
65
                                "title": "Contact target id",
66
                            },
67
                        ],
68
                        "search_column": "source_id",
69
                    },
70
                },
71
                {
72
                    "name": "PRODUCTS",
73
                    "definition": {
74
                        "columns": [
75
                            {
76
                                "name": "source_id",
77
                                "type": "text",
78
                                "title": "Product source id",
79
                            },
80
                            {
81
                                "name": "target_id",
82
                                "type": "text",
83
                                "title": "Product target id",
84
                            },
85
                            {
86
                                "name": "reference",
87
                                "type": "text",
88
                                "title": "Product code/ref",
89
                            },
90
                        ],
91
                        "search_column": "source_id",
92
                    },
93
                },
94
                {
95
                    "name": "ERRORS",
96
                    "definition": {
97
                        "columns": [
98
                            {
99
                                "name": "message",
100
                                "type": "text",
101
                                "title": "Error message",
102
                            },
103
                            {
104
                                "name": "source_id",
105
                                "type": "text",
106
                                "title": "Invoice source id",
107
                            },
108
                        ]
109
                    },
110
                },
111
            ],
112
            "definitionFields": [
113
                {
114
                    "type": "date",
115
                    "title": "Début de la synchronisation? (dd/mm/yyyy)",
116
                    "name": "from_date",
117
                    "optional": False,
118
                },
119
                {
120
                    "type": "text",
121
                    "title": "Valeur par défault pour le pays (ISO2)",
122
                    "name": "default_country",
123
                    "default": "FR",
124
                    "optional": True,
125
                },
126
            ],
127
            "customFields": [
128
                {
129
                    "type": "source_node_id",
130
                    "value": 7012,
131
                },
132
                {
133
                    "type": "target_node_id",
134
                    "value": 7010,
135
                },
136
                {
137
                    "type": "tax_mapping_name",
138
                    "value": "Tax Rates Mapping",
139
                },
140
            ],
141
        },
142
    }
143

UNCOV
144
    if trigger_type == "timer":
×
UNCOV
145
        data["triggers"][0]["cronschedules"] = ["0/30 * * * *"]
×
146

UNCOV
147
    if execution_type == "code":
×
UNCOV
148
        data["execution"]["data"]["code"] = "console.log('This is fucking awesome!')"
×
149

UNCOV
150
    if execution_type == "module":
×
UNCOV
151
        data["execution"]["data"]["name"] = "Invoice to invoice"
×
152

UNCOV
153
    return data
×
154

155

156
def get_sync(chift):
1✔
157
    # TODO: improve this once sync creation endpoint is there
UNCOV
158
    sync = chift.Sync.get("76d8b868-4900-4302-9eee-c3d62bb79b2d")
×
UNCOV
159
    return sync
×
160

161

162
@pytest.mark.mock_chift_response(sync.READ_SYNC_ALL, sync.READ_SYNC_ALL[0])
1✔
163
def test_sync(chift):
1✔
164
    syncs = chift.Sync.all()
1✔
165

166
    assert syncs
1✔
167

168
    expected_sync = syncs[0]
1✔
169

170
    actual_sync = chift.Sync.get(expected_sync.syncid)
1✔
171

172
    assert expected_sync.name == actual_sync.name
1✔
173

174

175
@pytest.mark.skip(
1✔
176
    reason="The test is too complex for unit testing. it should be split into smaller tests."
177
)
178
def test_flow_update(chift):
1✔
UNCOV
179
    syncs = chift.Sync.all()
×
180

UNCOV
181
    sync: Sync = syncs[0]
×
182

183
    # create flow
UNCOV
184
    datastore_name = str(uuid.uuid1())
×
UNCOV
185
    data = _get_flow_data(datastore_name=datastore_name)
×
UNCOV
186
    flow_created = chift.Flow.create(sync.syncid, data)
×
187

UNCOV
188
    assert flow_created.id in [flow.id for flow in chift.Sync.get(sync.syncid).flows]
×
189

190
    # update flow
UNCOV
191
    data["description"] = "test updated"
×
UNCOV
192
    data["config"]["datastores"] = [
×
193
        {
194
            "name": datastore_name,
195
            "definition": {
196
                "columns": [{"name": "test_x", "title": "title", "type": "text"}]
197
            },
198
        },
199
        {
200
            "name": str(uuid.uuid1()),
201
            "definition": {
202
                "columns": [{"name": "test_y", "title": "title", "type": "text"}]
203
            },
204
        },
205
    ]
UNCOV
206
    flow_updated = chift.Flow.create(sync.syncid, data)
×
207

UNCOV
208
    assert flow_updated.id == flow_created.id
×
209

UNCOV
210
    for flow in chift.Sync.get(sync.syncid).flows:
×
UNCOV
211
        if flow.id == flow_created.id:
×
UNCOV
212
            assert flow.description == "test updated"
×
UNCOV
213
            assert len(flow.config.datastores) == 2
×
214

UNCOV
215
    chift.Flow.delete(sync.syncid, flow_created.id)
×
216

217

218
@pytest.mark.skip(
1✔
219
    reason="The test is too complex for unit testing. it should be split into smaller tests."
220
)
221
def test_flow_create_trigger_code(chift):
1✔
UNCOV
222
    syncs = chift.Sync.all()
×
223

UNCOV
224
    sync: Sync = syncs[0]
×
225

226
    # create flow
UNCOV
227
    flow_created = chift.Flow.create(sync.syncid, _get_flow_data(execution_type="code"))
×
228

229
    # test trigger flow of type code (AWS lambda) should not raise
UNCOV
230
    chift.Flow.trigger(
×
231
        sync.syncid, flow_created.id, {"data": {"from_date": today_date}}
232
    )
233

234

235
@pytest.mark.skip(
1✔
236
    reason="The test is too complex for unit testing. it should be split into smaller tests."
237
)
238
def test_flow_create_trigger_module(chift):
1✔
UNCOV
239
    syncs = chift.Sync.all()
×
UNCOV
240
    sync: Sync = syncs[0]
×
241

242
    # create flow
UNCOV
243
    flow_created = chift.Flow.create(
×
244
        sync.syncid, _get_flow_data(execution_type="module")
245
    )
246

247
    # test trigger flow of type chain should not raise
UNCOV
248
    chift.Flow.trigger(
×
249
        sync.syncid, flow_created.id, {"data": {"from_date": today_date}}
250
    )
251

252

253
@pytest.mark.skip(
1✔
254
    reason="The test is too complex for unit testing. it should be split into smaller tests."
255
)
256
def test_flow_create_trigger_timer(chift):
1✔
UNCOV
257
    syncs = chift.Sync.all()
×
258

UNCOV
259
    sync: Sync = syncs[0]
×
260

261
    # create flow with timer should not raise
UNCOV
262
    flow_created = chift.Flow.create(sync.syncid, _get_flow_data(trigger_type="timer"))
×
263

UNCOV
264
    assert flow_created
×
265

266

267
@pytest.mark.skip(
1✔
268
    reason="The test is too complex for unit testing. it should be split into smaller tests."
269
)
270
def test_create_flow(chift):
1✔
UNCOV
271
    sync = get_sync(chift)
×
272

UNCOV
273
    datastore_name = str(uuid.uuid1())
×
UNCOV
274
    data = _get_flow_data(
×
275
        flow_name="Migration de factures",
276
        datastore_name=datastore_name,
277
        execution_type="module",
278
    )
UNCOV
279
    flow_created = chift.Flow.create(sync.syncid, data)
×
280

UNCOV
281
    assert flow_created.id in [flow.id for flow in chift.Sync.get(sync.syncid).flows]
×
282

283
    # let's trigger it
UNCOV
284
    res = chift.Flow.trigger(
×
285
        sync.syncid,
286
        flow_created.id,
287
        {"data": {"from_date": today_date}},
288
    )
UNCOV
289
    status = chift.Flow.chainexecution(
×
290
        sync.syncid, flow_created.id, res["data"]["executionid"]
291
    )
UNCOV
292
    assert status.get("status")
×
293

294
    # let's trigger it for one consumer
UNCOV
295
    chift.Flow.trigger(
×
296
        sync.syncid,
297
        flow_created.id,
298
        {"consumers": sync.consumers[:1], "data": {"from_date": today_date}},
299
    )
300

301
    # test getting sync for consumer info
UNCOV
302
    consumer = chift.Consumer.get(sync.consumers[0])
×
UNCOV
303
    info = consumer.Sync.get(sync.syncid)
×
UNCOV
304
    assert info
×
305

306

307
@pytest.mark.skip(
1✔
308
    reason="The test is too complex for unit testing. it should be split into smaller tests."
309
)
310
def test_datastore(chift):
1✔
UNCOV
311
    sync = get_sync(chift)
×
312

UNCOV
313
    consumer = chift.Consumer.get(sync.consumers[0])
×
UNCOV
314
    datastore = sync.flows[0].config.datastores[0]
×
315

316
    # test create
UNCOV
317
    value = {}
×
318

UNCOV
319
    str_column_name = None
×
320

UNCOV
321
    for column in datastore.definition.columns:
×
UNCOV
322
        if column.type == "json":
×
323
            value[column.name] = {"1": "1"}
×
UNCOV
324
        elif column.type == "number":
×
325
            value[column.name] = 1
×
326
        else:
UNCOV
327
            str_column_name = column.name
×
UNCOV
328
            value[column.name] = "1"
×
329

UNCOV
330
    data = consumer.Data.create(datastore.id, [{"data": value}])[0]
×
331

332
    # test update
UNCOV
333
    updated_value = data.data
×
UNCOV
334
    if str_column_name in updated_value:
×
UNCOV
335
        updated_value[str_column_name] = "2"
×
336

UNCOV
337
    updated_data = consumer.Data.update(datastore.id, data.id, {"data": updated_value})
×
338

UNCOV
339
    if str_column_name in updated_data.data:
×
UNCOV
340
        assert updated_data.data[str_column_name] == "2"
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc