• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3809

01 Apr 2025 03:03AM UTC coverage: 34.048% (+0.02%) from 34.033%
#3809

push

travis-ci

happyguoxy
test:alter gcda dir

148452 of 599532 branches covered (24.76%)

Branch coverage included in aggregate %.

222312 of 489411 relevant lines covered (45.42%)

761122.82 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

47.3
/source/libs/stream/test/streamCheckPointTest.cpp
1
#include <gtest/gtest.h>
2
#include "tstream.h"
3
#include "streamInt.h"
4
#include "tcs.h"
5
#include "tglobal.h"
6

7
#pragma GCC diagnostic push
8
#pragma GCC diagnostic ignored "-Wwrite-strings"
9
#pragma GCC diagnostic ignored "-Wunused-function"
10
#pragma GCC diagnostic ignored "-Wunused-variable"
11
#pragma GCC diagnostic ignored "-Wsign-compare"
12
#pragma GCC diagnostic ignored "-Wsign-compare"
13
#pragma GCC diagnostic ignored "-Wformat"
14
#pragma GCC diagnostic ignored "-Wint-to-pointer-cast"
15
#pragma GCC diagnostic ignored "-Wpointer-arith"
16

17
void initTaskLock(SStreamTask* pTask) {
10✔
18
  TdThreadMutexAttr attr = {0};
10✔
19
  int32_t code = taosThreadMutexAttrInit(&attr);
10!
20
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
10!
21

22
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
10!
23
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
10!
24

25
  code = taosThreadMutexInit(&pTask->lock, &attr);
10!
26
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
10!
27

28
  code = taosThreadMutexAttrDestroy(&attr);
10!
29
  ASSERT_EQ(code, TSDB_CODE_SUCCESS);
10!
30
}
31

32
TEST(streamCheckpointTest, StreamTaskProcessCheckpointTriggerRsp) {
4✔
33
    SStreamTask* pTask = NULL;
1✔
34
    int64_t uid = 1111111111111111;
1✔
35
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
36
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
37
                                       false, 1, false, &pTask);
1✔
38
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
39

40
    initTaskLock(pTask);
1!
41

42
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
43
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
44

45
    pTask->chkInfo.pActiveInfo->activeId = 123111;
1✔
46
    pTask->chkInfo.pActiveInfo->transId = 4561111;
1✔
47

48
    streamTaskSetStatusReady(pTask);
1!
49
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
1!
50
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
51

52
    SCheckpointTriggerRsp pRsp;
53
    memset(&pRsp, 0, sizeof(SCheckpointTriggerRsp));
1✔
54
    pRsp.rspCode = TSDB_CODE_SUCCESS;
1✔
55
    pRsp.checkpointId = 123;
1✔
56
    pRsp.transId = 456;
1✔
57
    pRsp.upstreamTaskId = 789;
1✔
58

59
    code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp);
1!
60
    ASSERT_NE(code, TSDB_CODE_SUCCESS);
1!
61

62
    pRsp.rspCode = TSDB_CODE_FAILED;
1✔
63
    code = streamTaskProcessCheckpointTriggerRsp(pTask, &pRsp);
1!
64
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
65

66
    tFreeStreamTask(pTask);
1!
67
    taosArrayDestroy(array);
1!
68
}
69

70
TEST(streamCheckpointTest, StreamTaskSetFailedCheckpointId) {
4✔
71
    SStreamTask* pTask = NULL;
1✔
72
    int64_t uid = 1111111111111111;
1✔
73
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
74
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
75
                                       false, 1, false, &pTask);
1✔
76
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
77

78
    initTaskLock(pTask);
1!
79

80
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
81
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
82

83
    SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
1✔
84
    pInfo->failedId = 0;
1✔
85

86
    int64_t failedCheckpointId = 123;
1✔
87

88
    streamTaskSetFailedCheckpointId(pTask, failedCheckpointId);
1!
89
    ASSERT_EQ(pInfo->failedId, failedCheckpointId);
1!
90

91
    streamTaskSetFailedCheckpointId(pTask, 0);
1!
92
    ASSERT_EQ(pInfo->failedId, failedCheckpointId);
1!
93

94
    streamTaskSetFailedCheckpointId(pTask, pInfo->failedId - 1);
1!
95
    ASSERT_EQ(pInfo->failedId, failedCheckpointId);
1!
96
    tFreeStreamTask(pTask);
1!
97
    taosArrayDestroy(array);
1!
98
}
99

100
TEST(UploadCheckpointDataTest, UploadSuccess) {
4✔
101
    streamMetaInit();
1!
102
    SStreamTask* pTask = NULL;
1✔
103
    int64_t uid = 1111111111111111;
1✔
104
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
105
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
106
                                       false, 1, false, &pTask);
1✔
107
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
108

109
    initTaskLock(pTask);
1!
110

111
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
112
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
113

114
    int64_t checkpointId = 123;
1✔
115
    int64_t dbRefId = 1;
1✔
116
    ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_S3;
1✔
117

118
    STaskDbWrapper* pBackend = NULL;
1✔
119
    int64_t processVer = -1;
1✔
120
    const char *path = "/tmp/backend3/stream";
1✔
121
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
122
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
123

124
    SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
1!
125
    ASSERT(pState != NULL);
1!
126

127
    pTask->pBackend = pState->pTdbState->pOwner->pBackend;
1✔
128

129
    SArray* pList = taosArrayInit(4, sizeof(int64_t));
1!
130
    code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0, pList);
1!
131
    ASSERT(code == 0);
1!
132

133
    char* pDir = NULL;
1✔
134
    int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type, &pDir);
1!
135
    taosMemoryFree(pDir);
1!
136

137
    EXPECT_EQ(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 on success";
1!
138
    tFreeStreamTask(pTask);
1!
139
    taosRemoveDir(path);
1!
140
    streamStateClose(pState, true);
1!
141
    taosArrayDestroy(array);
1!
142
}
143

144
TEST(UploadCheckpointDataTest, UploadDisabled) {
4✔
145
    SStreamTask* pTask = NULL;
1✔
146
    int64_t uid = 2222222222222;
1✔
147
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
148
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
149
                                       false, 1, false, &pTask);
1✔
150
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
151

152
    initTaskLock(pTask);
1!
153

154
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
155
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
156

157
    int64_t checkpointId = 123;
1✔
158
    int64_t dbRefId = 1;
1✔
159

160
    STaskDbWrapper* pBackend = NULL;
1✔
161
    int64_t processVer = -1;
1✔
162
    const char *path = "/tmp/backend4/stream";
1✔
163
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
164
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
165

166
    SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
1!
167
    ASSERT(pState != NULL);
1!
168

169
    pTask->pBackend = pState->pTdbState->pOwner->pBackend;
1✔
170

171
    SArray* pList = taosArrayInit(4, sizeof(int64_t));
1!
172
    code = taskDbDoCheckpoint(pTask->pBackend, checkpointId, 0, pList);
1!
173
    ASSERT(code == 0);
1!
174
    taosArrayDestroy(pList);
1!
175

176
    ECHECKPOINT_BACKUP_TYPE type = DATA_UPLOAD_DISABLE;
1✔
177
    char* pDir = NULL;
1✔
178
    int32_t result = uploadCheckpointData(pTask, checkpointId, dbRefId, type, &pDir);
1!
179
    taosMemoryFree(pDir);
1!
180

181
    EXPECT_NE(result, TSDB_CODE_SUCCESS) << "uploadCheckpointData should return 0 when backup type is disabled";
1!
182

183
    streamStateClose(pState, true);
1!
184
    tFreeStreamTask(pTask);
1!
185
    taosArrayDestroy(array);
1!
186
}
187

188
TEST(StreamTaskAlreadySendTriggerTest, AlreadySendTrigger) {
4✔
189
    SStreamTask* pTask = NULL;
1✔
190
    int64_t uid = 2222222222222;
1✔
191
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
192
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
193
                                       false, 1, false, &pTask);
1✔
194
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
195

196
    initTaskLock(pTask);
1!
197

198
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
199
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
200

201
    pTask->chkInfo.pActiveInfo->activeId = 123111;
1✔
202
    pTask->chkInfo.pActiveInfo->transId = 4561111;
1✔
203

204
    streamTaskSetStatusReady(pTask);
1!
205
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
1!
206
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
207

208
    int32_t downstreamNodeId = 1;
1✔
209
    int64_t sendingCheckpointId = 123;
1!
210
    TSKEY ts = taosGetTimestampMs();
1✔
211

212
    STaskTriggerSendInfo triggerInfo;
213
    triggerInfo.sendTs = ts;
1✔
214
    triggerInfo.recved = false;
1✔
215
    triggerInfo.nodeId = downstreamNodeId;
1✔
216

217
    taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo);
1!
218

219
    pTask->chkInfo.pActiveInfo->dispatchTrigger = true;
1✔
220
    bool result = streamTaskAlreadySendTrigger(pTask, downstreamNodeId);
1!
221

222
    EXPECT_TRUE(result) << "The trigger message should have been sent to the downstream node";
1!
223

224
    tFreeStreamTask(pTask);
1!
225
    taosArrayDestroy(array);
1!
226
}
227

228
int32_t sendReq1111(const SEpSet *pEpSet, SRpcMsg *pMsg) {
2✔
229
  return TSDB_CODE_SUCCESS;
2✔
230
}
231

232
TEST(ChkptTriggerRecvMonitorHelperTest, chkptTriggerRecvMonitorHelper) {
4✔
233
    SStreamTask* pTask = NULL;
1✔
234
    int64_t uid = 2222222222222;
1✔
235
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
236
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
237
                                       false, 1, false, &pTask);
1✔
238
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
239

240
    initTaskLock(pTask);
1!
241

242
    const char *path = "/tmp/backend5/stream";
1✔
243
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
244
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
245

246
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
247
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
248

249
    pTask->chkInfo.pActiveInfo->activeId = 123111;
1✔
250
    pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr.launchChkptId = pTask->chkInfo.pActiveInfo->activeId;
1✔
251
    pTask->chkInfo.pActiveInfo->transId = 4561111;
1✔
252
    pTask->chkInfo.startTs = 11111;
1✔
253

254
    SStreamTask upTask;
255
    upTask = *pTask;
1✔
256
    streamTaskSetUpstreamInfo(pTask, &upTask);
1!
257
    
258

259
    streamTaskSetStatusReady(pTask);
1!
260
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
1!
261
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
262

263
    int32_t downstreamNodeId = 1;
1✔
264
    int64_t sendingCheckpointId = 123;
1!
265
    TSKEY ts = taosGetTimestampMs();
1✔
266

267
    STaskTriggerSendInfo triggerInfo;
268
    triggerInfo.sendTs = ts;
1✔
269
    triggerInfo.recved = false;
1✔
270
    triggerInfo.nodeId = downstreamNodeId;
1✔
271

272
    taosArrayPush(pTask->chkInfo.pActiveInfo->pDispatchTriggerList, &triggerInfo);
1!
273

274
    STaskCheckpointReadyInfo readyInfo;
275
    readyInfo.upstreamNodeId = 789111;
1✔
276
    void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
1!
277

278
    initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
1!
279
    taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
1!
280

281

282
    pTask->chkInfo.pActiveInfo->dispatchTrigger = true;
1✔
283

284
    SMsgCb msgCb = {0};
1✔
285
    msgCb.sendReqFp = sendReq1111;
1✔
286
    msgCb.mgmt = (SMgmtWrapper*)(&msgCb);  // hack
1✔
287
    tmsgSetDefault(&msgCb);
1!
288

289
    SArray* array1 = NULL;
1✔
290
    code = chkptTriggerRecvMonitorHelper(pTask, NULL, &array1);
1!
291
    EXPECT_EQ(code, TSDB_CODE_SUCCESS);
1!
292

293
    pTask->pMeta->fatalInfo.code = TSDB_CODE_SUCCESS;
1✔
294
    streamSetFatalError(pTask->pMeta, code, __func__, __LINE__);
1!
295

296
    pTask->pMeta->fatalInfo.code = TSDB_CODE_FAILED;
1✔
297
    streamSetFatalError(pTask->pMeta, code, __func__, __LINE__);
1!
298
    tFreeStreamTask(pTask);
1!
299
    taosArrayDestroy(array);
1!
300
    taosArrayDestroy(array1);
1!
301
}
302

303
TEST(StreamTaskSendCheckpointTriggerMsgTest, SendCheckpointTriggerMsgSuccessTest) {
4✔
304
    SStreamTask* pTask = NULL;
1✔
305
    int64_t uid = 2222222222222;
1✔
306
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
307
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
308
                                       false, 1, false, &pTask);
1✔
309
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
310

311
    initTaskLock(pTask);
1!
312

313
    const char *path = "/tmp/SendCheckpointTriggerMsgSuccessTest/stream";
1✔
314
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
315
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
316

317
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
318
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
319

320
    SRpcHandleInfo rpcInfo;
321

322
    int32_t ret = streamTaskSendCheckpointTriggerMsg(pTask, 123, 456, &rpcInfo, code);
1!
323

324
    EXPECT_EQ(ret, TSDB_CODE_SUCCESS);
1!
325
}
326

327
TEST(streamTaskBuildCheckpointTest, streamTaskBuildCheckpointFnTest) {
4✔
328
    SStreamTask* pTask = NULL;
1✔
329
    int64_t uid = 2222222222222;
1✔
330
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
331
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
332
                                       false, 1, false, &pTask);
1✔
333
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
334

335
    initTaskLock(pTask);
1!
336

337
    const char *path = "/tmp/streamTaskBuildCheckpoinTest/stream";
1✔
338
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
339
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
340

341
    SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
1!
342
    ASSERT(pState != NULL);
1!
343

344
    pTask->pBackend = pState->pTdbState->pOwner->pBackend;
1✔
345

346
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
347
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
348

349
    char a[] = "localhost";
1✔
350
    memcpy(tsSnodeAddress, a, sizeof(a));
1✔
351

352
    int32_t ret = streamTaskBuildCheckpoint(pTask);
1!
353

354
    EXPECT_NE(ret, TSDB_CODE_SUCCESS);
1!
355
}
356

357
int32_t s3GetObjectToFileTest(const char *object_name, const char *fileName) {
×
358
  return TSDB_CODE_SUCCESS;
×
359
}
360

361
TEST(sstreamTaskGetTriggerRecvStatusTest, streamTaskGetTriggerRecvStatusFnTest) {
4✔
362
    SStreamTask* pTask = NULL;
1✔
363
    int64_t uid = 2222222222222;
1✔
364
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
365
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
366
                                       false, 1, false, &pTask);
1✔
367
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
368

369
    initTaskLock(pTask);
1!
370

371
    SStreamTask upTask;
372
    upTask = *pTask;
1✔
373
    code = streamTaskSetUpstreamInfo(pTask, &upTask);
1!
374
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
375

376
    code = streamTaskSetUpstreamInfo(pTask, &upTask);
1!
377
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
378

379
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
380
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
381

382
    int32_t recv = 0;
1✔
383
    int32_t total = 0;
1✔
384
    pTask->info.taskLevel = TASK_LEVEL__SOURCE;
1✔
385
    streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
1!
386
    EXPECT_EQ(total, 1);
1!
387

388
    pTask->info.taskLevel = TASK_LEVEL__AGG;
1✔
389
    streamTaskGetTriggerRecvStatus(pTask, &recv, &total);
1!
390
    EXPECT_EQ(total, 2);
1!
391

392
    code = streamTaskDownloadCheckpointData("123", "/root/download", 123);
1!
393
    EXPECT_NE(code, TSDB_CODE_SUCCESS);
1!
394

395
    tcsInit();
1!
396
    extern int8_t tsS3EpNum;
397
    tsS3EpNum = 1;
1✔
398

399
    code = uploadCheckpointToS3("123", "/tmp/backend5/stream/stream");
1!
400
    EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
1!
401

402
    code = downloadCheckpointByNameS3("123", "/root/download", "");
1!
403
    EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
1!
404

405
    code = deleteCheckpointRemoteBackup("aaa123", "bbb");
1!
406
    EXPECT_NE(code, TSDB_CODE_OUT_OF_RANGE);
1!
407
}
408

409
TEST(doCheckBeforeHandleChkptTriggerTest, doCheckBeforeHandleChkptTriggerFnTest) {
4✔
410
    SStreamTask* pTask = NULL;
1✔
411
    int64_t uid = 2222222222222;
1✔
412
    SArray* array = taosArrayInit(4, POINTER_BYTES);
1!
413
    int32_t code = tNewStreamTask(uid, TASK_LEVEL__SINK, NULL, STREAM_NORMAL_TASK, 0, 0, array,
1!
414
                                       false, 1, false, &pTask);
1✔
415
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
416

417
    initTaskLock(pTask);
1!
418

419
    const char *path = "/tmp/doCheckBeforeHandleChkptTriggerTest/stream";
1✔
420
    code = streamMetaOpen((path), NULL, NULL, NULL, 0, 0, NULL, &pTask->pMeta);
1!
421
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
422

423
    SStreamState *pState = streamStateOpen((char *)path, pTask, 0, 0);
1!
424
    ASSERT(pState != NULL);
1!
425

426
    pTask->pBackend = pState->pTdbState->pOwner->pBackend;
1✔
427

428
    code = streamTaskCreateActiveChkptInfo(&pTask->chkInfo.pActiveInfo);
1!
429
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
430

431
    pTask->chkInfo.checkpointId = 123;
1✔
432
    code = doCheckBeforeHandleChkptTrigger(pTask, 100, NULL, 0);
1!
433
    ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
1!
434

435
    pTask->chkInfo.pActiveInfo->failedId = 223;
1✔
436
    code = doCheckBeforeHandleChkptTrigger(pTask, 200, NULL, 0);
1!
437
    ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
1!
438

439
    SStreamDataBlock block;
440
    block.srcTaskId = 456;
1✔
441
    SStreamTask upTask;
442
    upTask = *pTask;
1✔
443
    upTask.id.taskId = 456;
1✔
444
    streamTaskSetUpstreamInfo(pTask, &upTask);
1!
445
    pTask->chkInfo.pActiveInfo->failedId = 23;
1✔
446
    code = doCheckBeforeHandleChkptTrigger(pTask, 123, &block, 0);
1!
447
    ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
1!
448

449
    streamTaskSetUpstreamInfo(pTask, &upTask);
1!
450
    streamTaskSetStatusReady(pTask);
1!
451
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
1!
452
    ASSERT_EQ(code, TSDB_CODE_SUCCESS);
1!
453

454
    pTask->chkInfo.pActiveInfo->activeId = 223;
1✔
455

456
    STaskCheckpointReadyInfo readyInfo;
457
    readyInfo.upstreamTaskId = 4567;
1✔
458
    block.srcTaskId = 4567;
1✔
459
    void* pBuf = rpcMallocCont(sizeof(SMsgHead) + 1);
1!
460

461
    initRpcMsg(&readyInfo.msg, 0, pBuf, sizeof(SMsgHead) + 1);
1!
462
    taosArrayPush(pTask->chkInfo.pActiveInfo->pReadyMsgList, &readyInfo);
1!
463
    code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
1!
464
    ASSERT_NE(code, TSDB_CODE_SUCCESS);
1!
465

466
    pTask->chkInfo.pActiveInfo->allUpstreamTriggerRecv = 1;
1✔
467
    code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
1!
468
    ASSERT_NE(code, TSDB_CODE_SUCCESS);
1!
469

470
    pTask->chkInfo.pActiveInfo->activeId = 1111;
1✔
471
    code = doCheckBeforeHandleChkptTrigger(pTask, 223, &block, 0);
1!
472
    ASSERT_EQ(code, TSDB_CODE_STREAM_INVLD_CHKPT);
1!
473
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc