• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3831

02 Apr 2025 01:14AM UTC coverage: 34.081% (-0.02%) from 34.097%
#3831

push

travis-ci

happyguoxy
test:alter gcda dir

148596 of 599532 branches covered (24.79%)

Branch coverage included in aggregate %.

222550 of 489473 relevant lines covered (45.47%)

1589752.67 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

57.07
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
// Forward declarations of file-local (static) helpers, so they can be referenced before their definitions.
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
23
static int32_t deleteRemoteCheckpointBackup(const char* pTaskId, int64_t checkpointId);
24

25
// Helpers used while propagating the checkpoint-trigger through the task's queues.
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
26
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
27
                                          int32_t transId, int32_t srcTaskId);
28
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
29
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
30

31
/*
 * Build a checkpoint queue item that wraps a single STREAM_CHECKPOINT data block.
 *
 * @param pTask          owning task; its level and selfChildId are read.
 * @param checkpointType queue item type (e.g. STREAM_INPUT__CHECKPOINT_TRIGGER).
 * @param checkpointId   stored in the data block's info.version.
 * @param transId        stored in both info.window.skey and info.window.ekey.
 * @param srcTaskId      upstream task id; must be > 0 for a trigger block on a non-source task.
 * @param pRes           [out] receives the new item on success; left untouched on failure.
 * @return TSDB_CODE_SUCCESS, or an error code. Every error path releases what was allocated so far.
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
    pChkpoint->srcTaskId = srcTaskId;
    if (srcTaskId <= 0) {
      stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
      taosFreeQitem(pChkpoint);  // the queue item was already allocated above; do not leak it on this path
      return TSDB_CODE_INVALID_PARA;
    }
  }

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  pBlock->info.type = STREAM_CHECKPOINT;
  pBlock->info.version = checkpointId;
  pBlock->info.window.ekey = pBlock->info.window.skey = transId;  // NOTE: set the transId
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pChkpoint->blocks == NULL) {
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  // The array stores SSDataBlock by value, so the temporary pBlock is copied in and freed below.
  void* p = taosArrayPush(pChkpoint->blocks, pBlock);
  if (p == NULL) {
    taosArrayDestroy(pChkpoint->blocks);
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  *pRes = pChkpoint;

  taosMemoryFree(pBlock);
  return TSDB_CODE_SUCCESS;
}
80

81
// this message must be put into the inputQ successfully; retrying until it succeeds is not implemented yet
82
// todo must be success
83
/*
 * Create a checkpoint queue item, push it into the task's inputQ and try to schedule the task.
 * A failure to enqueue is reported as TSDB_CODE_OUT_OF_MEMORY; no retry is attempted here.
 */
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                   int32_t srcTaskId) {
  SStreamDataBlock* pItem = NULL;

  int32_t ret = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pItem);
  if (ret != TSDB_CODE_SUCCESS) {
    return ret;
  }

  bool enqueued = (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pItem) >= 0);
  return enqueued ? streamTrySchedExec(pTask, true) : TSDB_CODE_OUT_OF_MEMORY;
}
97

98
/*
 * Start a checkpoint on a source task: feed TASK_EVENT_GEN_CHECKPOINT to the task state machine, record the
 * active checkpoint/trans ids and the start time, and append a checkpoint-trigger block to the inputQ.
 * Non-source tasks reject the request with TSDB_CODE_INVALID_MSG.
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_INVALID_MSG;
  }

  // Step 1: switch into checkpoint status; afterwards no data are expected to be put into the inputQ.
  int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return ret;
  }

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // Step 2: the trigger becomes the last item in the inputQ, so every block with a smaller version is handled
  // first. srcTaskId is -1 because createChkptTriggerBlock only uses it for non-source tasks.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
120

121
/*
 * Handle a checkpoint-trigger rsp received from an upstream task (non-source tasks only).
 *
 * - A rsp carrying a failure rspCode is logged and ignored (returns TSDB_CODE_SUCCESS).
 * - The task must be in checkpoint status, and the rsp must match the active checkpointId/transId;
 *   otherwise TSDB_CODE_STREAM_TASK_IVLD_STATUS is returned.
 * - On success, a checkpoint-trigger block attributed to the upstream task is appended into the inputQ.
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  const char*            id = pTask->id.idStr;
  bool                   unQualified = false;
  int64_t                activeId = 0;
  int32_t                activeTransId = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trigger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // Snapshot the active ids under the lock so the comparison and the log below use the same values.
  streamMutexLock(&pInfo->lock);
  activeId = pInfo->activeId;
  activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  unQualified = (activeId != pRsp->checkpointId || activeTransId != pRsp->transId);
  if (unQualified) {
    // The task IS in checkpoint status here; the rsp belongs to another checkpoint/transaction.
    stError("s-task:%s active checkpointId:%" PRId64 " transId:%d, checkpoint-trigger rsp from upstream:0x%x "
            "has mismatched checkpointId:%" PRId64 " transId:%d, discard",
            id, activeId, activeTransId, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                            pRsp->upstreamTaskId);
  return code;
}
160

161
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
3✔
162
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
163
  int32_t  ret = 0;
3✔
164
  int32_t  tlen = 0;
3✔
165
  void*    buf = NULL;
3✔
166
  SEncoder encoder;
167

168
  SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
3✔
169
                               .upstreamTaskId = pTask->id.taskId,
3✔
170
                               .taskId = dstTaskId,
171
                               .rspCode = code};
172

173
  if (code == TSDB_CODE_SUCCESS) {
3!
174
    req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
3✔
175
    req.transId = pTask->chkInfo.pActiveInfo->transId;
3✔
176
  } else {
177
    req.checkpointId = -1;
×
178
    req.transId = -1;
×
179
  }
180

181
  tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
3!
182
  if (ret < 0) {
3!
183
    stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
×
184
    return ret;
×
185
  }
186

187
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
3✔
188
  if (buf == NULL) {
3!
189
    stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
×
190
    return terrno;
×
191
  }
192

193
  ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
3✔
194
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
3✔
195

196
  tEncoderInit(&encoder, abuf, tlen);
3✔
197
  if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
3!
198
    rpcFreeCont(buf);
×
199
    tEncoderClear(&encoder);
×
200
    stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
×
201
    return ret;
×
202
  }
203
  tEncoderClear(&encoder);
3✔
204

205
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
3✔
206
  tmsgSendRsp(&rspMsg);
3✔
207

208
  return ret;
3✔
209
}
210

211
/*
 * Stamp pBlock with this task as its source, put it into the outputQ and dispatch it downstream.
 *
 * Ownership: pBlock is consumed on every path. It is queued, or it is freed when it is not queued
 * (trigger already dispatched, or the write to the outputQ failed).
 *
 * @return 0 if the trigger was already dispatched; otherwise the outputQ write or dispatch result.
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
    // the block is neither queued nor returned to the caller; release it to avoid a leak
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return 0;
  }

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code == 0) {
    code = streamDispatchStreamBlock(pTask);
  } else {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
  }

  return code;
}
230

231
/*
 * Decide whether a checkpoint-trigger block (checkpointId/transId) may be processed by pTask.
 *
 * Returns TSDB_CODE_SUCCESS to accept it. Otherwise it returns an error and the block should be discarded:
 *  - TSDB_CODE_STREAM_INVLD_CHKPT when the checkpoint is expired, has been marked failed, or is already
 *    completed. In the completed case a checkpoint-ready msg is re-sent to the upstream task first.
 *    The same code is returned when it does not match the active checkpoint, when all upstream triggers
 *    were already received, or when it duplicates a trigger from the same upstream (sink/agg tasks).
 *  - TSDB_CODE_STREAM_TASK_NOT_EXIST when the upstream ep info of a completed checkpoint cannot be found.
 *  - TSDB_CODE_INVALID_PARA when the ready-msg list yields a NULL entry.
 * The caller in this file invokes it while holding pTask->lock.
 */
int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                        int32_t transId) {
  const char*            id = pTask->id.idStr;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                level = pTask->info.taskLevel;
  int64_t                lastChkptId = pTask->chkInfo.checkpointId;
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // older than the latest completed checkpoint
  if (lastChkptId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, lastChkptId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // already marked as failed
  if (pActive->failedId >= checkpointId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            id, vgId, checkpointId, transId, pActive->failedId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // already completed: re-send checkpoint-ready so the upstream can resume the interrupted checkpoint
  if (lastChkptId == checkpointId) {
    SStreamUpstreamEpInfo* pUpstream = NULL;
    streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pUpstream);
    if (pUpstream == NULL) {
      return TSDB_CODE_STREAM_TASK_NOT_EXIST;
    }

    SRpcMsg readyMsg = {0};
    int32_t ret = initCheckpointReadyMsg(pTask, pUpstream->nodeId, pBlock->srcTaskId, pUpstream->childId,
                                         checkpointId, &readyMsg);
    if (ret == TSDB_CODE_SUCCESS) {
      ret = tmsgSendReq(&pUpstream->epSet, &readyMsg);
      if (ret != 0) {
        stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(ret));
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // the remaining checks only apply while a checkpoint is in progress
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return TSDB_CODE_SUCCESS;
  }

  if (pActive->activeId != checkpointId) {
    stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
            " discard",
            id, vgId, pActive->activeId, checkpointId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  if (pActive->allUpstreamTriggerRecv == 1) {
    stDebug(
        "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
        "checkpointId:%" PRId64 " transId:%d",
        id, vgId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  if (level != TASK_LEVEL__SINK && level != TASK_LEVEL__AGG) {
    return TSDB_CODE_SUCCESS;
  }

  // sink/agg: reject a duplicated trigger from an upstream task that has already been recorded
  int32_t numOfReady = (int32_t)taosArrayGetSize(pActive->pReadyMsgList);
  for (int32_t i = 0; i < numOfReady; ++i) {
    STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, i);
    if (pReady == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    if (pReady->upstreamTaskId != pBlock->srcTaskId) {
      continue;
    }

    stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
           ", prev recvTs:%" PRId64 " discard",
           id, pReady->upstreamTaskId, pReady->upstreamNodeId, pReady->checkpointId, pReady->recvTs);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  return TSDB_CODE_SUCCESS;
}
315

316
/*
 * Handle a checkpoint-trigger block for pTask.
 *
 * Ownership: pBlock is consumed by this function. Every path frees it here or passes it to
 * continueDispatchCheckpointTriggerBlock(), which takes it over.
 *
 * Flow:
 *   1. Read checkpointId (info.version) and transId (info.window.skey) from the first data block.
 *   2. Validate the block with doCheckBeforeHandleChkptTrigger() under pTask->lock. If it is rejected,
 *      re-open the upstream input (non-source tasks) and discard the block.
 *   3. If the task is not in checkpoint status yet, record the active ids, handle
 *      TASK_EVENT_GEN_CHECKPOINT and launch the checkpoint-trigger monitor timer (one at a time).
 *   4. Source tasks flush executor state, then dispatch the trigger downstream. Without a dispatcher they
 *      instead append a STREAM_INPUT__CHECKPOINT item.
 *      Sink/agg tasks record the checkpoint-ready info. Once all upstream triggers have arrived, a sink task
 *      builds the checkpoint and an agg task flushes state and forwards the trigger downstream.
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int64_t                checkpointId = 0;
  int32_t                transId = 0;
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    // consistent with every other error path below: the block is consumed here
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return TSDB_CODE_INVALID_PARA;
  }

  checkpointId = pDataBlock->info.version;
  transId = pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    if (taskLevel != TASK_LEVEL__SOURCE) {  // the checkpoint-trigger is discarded, open the inputQ for upstream tasks
      streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    }
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      } else {
        // No timer was launched, so the monitor callback (presumably what normally clears isActive) will not
        // run. Undo the CAS above, otherwise later checkpoints could never start a new monitor.
        stError("s-task:%s failed to alloc ref id for checkpoint-trigger monitor, code:%s", id, tstrerror(code));
        (void)atomic_val_compare_exchange_8(&pTmrInfo->isActive, 1, 0);
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

#if 0
  taosMsleep(20*1000);
#endif

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

#if 0
    chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH ||
        type == TASK_OUTPUT__VTABLE_MAP) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {                                    // agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        // same handling as the source-task branch: the block is not forwarded, so release it here
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
449

450
// only when all downstream tasks have sent the checkpoint-ready msg can the checkpoint procedure start for the agg task
451
/*
 * Record that downstream task downstreamTaskId reported checkpoint-ready, unless it was already recorded.
 * Outputs: *alreadyRecv (duplicate report), *pNotReady (downstream tasks still missing), *pTransId.
 * Returns 0, TSDB_CODE_INVALID_PARA on a NULL list entry, or terrno if the push fails.
 * The caller in this file holds pInfo->lock.
 */
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
  bool found = false;
  *alreadyRecv = false;

  int32_t numOfRecv = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
  for (int32_t idx = 0; idx < numOfRecv && !found; ++idx) {
    STaskDownstreamReadyInfo* pEntry = taosArrayGet(pInfo->pCheckpointReadyRecvList, idx);
    if (pEntry == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
    found = (pEntry->downstreamTaskId == downstreamTaskId);
  }

  *alreadyRecv = found;

  if (!found) {
    STaskDownstreamReadyInfo entry = {.recvTs = taosGetTimestampMs(),
                                      .downstreamTaskId = downstreamTaskId,
                                      .checkpointId = pInfo->activeId,
                                      .transId = pInfo->transId,
                                      .streamId = streamId,
                                      .downstreamNodeId = downstreamNodeId};
    if (taosArrayPush(pInfo->pCheckpointReadyRecvList, &entry) == NULL) {
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
      return terrno;
    }
  } else {
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
            numOfDownstream);
  }

  *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
  *pTransId = pInfo->transId;
  return 0;
}
490

491
/**
492
 * All down stream tasks have successfully completed the check point task.
493
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
494
 */
495
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
                                        int32_t downstreamTaskId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            id = pTask->id.idStr;
  int32_t                numOfDownstream = streamTaskGetNumOfDownstream(pTask);
  int32_t                notReady = 0;
  int32_t                transId = 0;
  bool                   handledBefore = false;

  // 1. the task must currently be in checkpoint status
  SStreamTaskState state = streamTaskGetStatus(pTask);
  if (state.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, state.name, downstreamTaskId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // 2. reject expired msgs and msgs that do not belong to the active checkpoint
  bool expired = (pTask->chkInfo.checkpointId > checkpointId);
  bool mismatched = (pActive->activeId != checkpointId);
  if (expired || mismatched) {
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
            ") from task:0x%x, expired and discard",
            id, state.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  // 3. record the ready msg under the active-info lock
  streamMutexLock(&pActive->lock);
  int32_t code = processCheckpointReadyHelp(pActive, numOfDownstream, downstreamNodeId, pTask->id.streamId,
                                            downstreamTaskId, id, &notReady, &transId, &handledBefore);
  streamMutexUnlock(&pActive->lock);

  if (handledBefore) {
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
            id, checkpointId, downstreamTaskId);
    return code;
  }

  // 4. once every downstream task is ready, queue the checkpoint for this task
  if (code == 0 && notReady == 0) {
    stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
    code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
  }

  return code;
}
538

539
/*
 * Handle the rsp to a checkpoint-ready msg sent to upstream task upstreamTaskId.
 * Marks the first pReadyMsgList entry matching (upstreamTaskId, checkpointId) as sendCompleted and logs how
 * many entries are confirmed. The list is accessed under pActiveInfo->lock. Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                ts = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  // mark the matching entry as confirmed
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    bool matched = (pEntry->upstreamTaskId == upstreamTaskId) && (pEntry->checkpointId == checkpointId);
    if (!matched) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            pTask->id.idStr, upstreamTaskId, checkpointId, ts);
    break;
  }

  // count all confirmed entries
  int32_t numOfEntries = (int32_t)taosArrayGetSize(pActive->pReadyMsgList);
  for (int32_t idx = 0; idx < numOfEntries; ++idx) {
    const STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    if (pEntry->sendCompleted == 1) {
      confirmed++;
    }
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
          confirmed, checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
580

581
/*
 * Reset the per-checkpoint state of pTask: clear the recorded start time, re-open the inputQ of all upstream
 * tasks and clear the active checkpoint info (plus the checkpoint-ready msgs when clearChkpReadyMsg is true)
 * under the active-info lock.
 */
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  pTask->chkInfo.startTs = 0;
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
          pTask->id.idStr, pActive->failedId, pTask->chkInfo.checkpointId);
}
15✔
597

598
// The checkpointInfo can be updated in the following three cases:
599
// 1. follower tasks; 2. leader task with status of TASK_STATUS__CK; 3. restore not completed
600
static int32_t doUpdateCheckpointInfoCheck(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq,
15✔
601
                                           bool* pContinue) {
602
  SStreamMeta*     pMeta = pTask->pMeta;
15✔
603
  int32_t          vgId = pMeta->vgId;
15✔
604
  int32_t          code = 0;
15✔
605
  const char*      id = pTask->id.idStr;
15✔
606
  SCheckpointInfo* pInfo = &pTask->chkInfo;
15✔
607

608
  *pContinue = true;
15✔
609

610
  // not update the checkpoint info if the checkpointId is less than the failed checkpointId
611
  if (pReq->checkpointId < pInfo->pActiveInfo->failedId) {
15!
612
    stWarn("s-task:%s vgId:%d not update the checkpoint-info, since update checkpointId:%" PRId64
×
613
           " is less than the failed checkpointId:%" PRId64 ", discard",
614
           id, vgId, pReq->checkpointId, pInfo->pActiveInfo->failedId);
615

616
    *pContinue = false;
×
617
    return TSDB_CODE_SUCCESS;
×
618
  }
619

620
  // it's an expired checkpointInfo update msg, we still try to drop the required drop fill-history task.
621
  if (pReq->checkpointId <= pInfo->checkpointId) {
15!
622
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
×
623
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
624
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
625
            pReq->transId);
626

627
    { // destroy the related fill-history tasks
628
      if (pReq->dropRelHTask) {
×
629
        if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
×
630
          code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
×
631

632
          int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
×
633
          stDebug("s-task:%s vgId:%d related fill-history task:0x%" PRIx64
×
634
                  " dropped in update checkpointInfo, remain tasks:%d",
635
                  id, vgId, pReq->hTaskId, numOfTasks);
636

637
          // todo: task may not exist, commit anyway, optimize this later
638
          code = streamMetaCommit(pMeta);
×
639
        }
640
      }
641
    }
642

643
    *pContinue = false;
×
644
    // always return true
645
    return TSDB_CODE_SUCCESS;
×
646
  }
647

648
  SStreamTaskState status = streamTaskGetStatus(pTask);
15✔
649

650
  if (!restored) {  // during restore procedure, do update checkpoint-info
15!
651
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
×
652
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
653
            id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
654
            pInfo->checkpointTime, pReq->checkpointTs);
655
  } else {  // not in restore status, must be in checkpoint status
656
    if (((status.state == TASK_STATUS__CK) && (pMeta->role == NODE_ROLE_LEADER)) ||
15!
657
        (pMeta->role == NODE_ROLE_FOLLOWER)) {
×
658
      stDebug("s-task:%s vgId:%d status:%s role:%d start to update the checkpoint-info, checkpointId:%" PRId64
15!
659
              "->%" PRId64 " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
660
              id, vgId, status.name, pMeta->role, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
661
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
662
    } else {
663
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
×
664
              " checkpointVer:%" PRId64 "->%" PRId64,
665
              id, vgId, status.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
666
              pReq->checkpointVer);
667
    }
668
  }
669

670
  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
30!
671
                pInfo->processedVer <= pReq->checkpointVer);
15!
672

673
  if (!valid) {
15!
674
    // invalid update checkpoint info for leader, since the processedVer is greater than the checkpointVer
675
    // It is possible for follower tasks that the processedVer is greater than the checkpointVer, and the processed info
676
    // in follower tasks will be discarded, since the leader/follower switch happens before the checkpoint of the
677
    // processedVer being generated.
678
    if (pMeta->role == NODE_ROLE_LEADER) {
×
679

680
      stFatal("s-task:%s checkpointId update info recv, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
×
681
              " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64 " discard it",
682
              id, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
683
              pReq->checkpointVer);
684

685
      *pContinue = false;
×
686
      return TSDB_CODE_STREAM_INTERNAL_ERROR;
×
687
    } else {
688
      stInfo("s-task:%s vgId:%d follower recv checkpointId update info, current checkpointId:%" PRId64
×
689
             " checkpointVer:%" PRId64 " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
690
             id, pMeta->vgId, pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId,
691
             pReq->checkpointVer);
692
    }
693
  }
694

695
  return TSDB_CODE_SUCCESS;
15✔
696
}
697

698
// Apply a checkpoint-info update request to pTask.
//
// Steps:
//  1. The request is validated by doUpdateCheckpointInfoCheck (under pTask->lock). A discarded request returns the
//     check's code.
//  2. When the task is in checkpoint status, restore is not completed, or this node is a follower, the
//     checkpointId/checkpointVer/checkpointTime are taken from the request. A restored leader also fires
//     TASK_EVENT_CHECKPOINT_DONE.
//  3. The active checkpoint info is cleared, the persisted status is set to ready and the task is saved to meta.
//  4. If requested, the related fill-history task is dropped. For STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE streams
//     it is instead turned into a recalculate task.
//  5. The meta is committed.
//
// Failures after validation are logged but not propagated: the function returns TSDB_CODE_SUCCESS.
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;
  bool             continueUpdate = true;

  streamMutexLock(&pTask->lock);
  code = doUpdateCheckpointInfoCheck(pTask, restored, pReq, &continueUpdate);

  if (!continueUpdate) {
    streamMutexUnlock(&pTask->lock);
    return code;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s vgId:%d failed to handle checkpoint-done event in update checkpointInfo, code:%s", id,
                vgId, tstrerror(code));
      }
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      stInfo("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
              pReq->taskId, vgId, pReq->hTaskId);
      CLEAR_RELATED_FILLHISTORY_TASK(pTask);
    } else {
      stInfo("s-task:0x%x vgId:%d update the related fill-history task:0x%" PRIx64" to be recalculate task",
             pReq->taskId, vgId, pReq->hTaskId);
    }
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTaskInMeta(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, tstrerror(code));
    return TSDB_CODE_SUCCESS;
  }

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    if (pTask->info.trigger != STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
      code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
      if (code != TSDB_CODE_SUCCESS) {
        stWarn("s-task:%s vgId:%d failed to drop related fill-history task:0x%x, code:%s", id, vgId,
               (int32_t)pReq->hTaskId, tstrerror(code));
      }

      int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
      stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
              (int32_t)pReq->hTaskId, numOfTasks);
    } else {
      STaskId      hTaskId = {.streamId = pReq->hStreamId, .taskId = pReq->hTaskId};
      SStreamTask* pHTask = NULL;

      code = streamMetaAcquireTaskUnsafe(pMeta, &hTaskId, &pHTask);
      if (code == 0 && pHTask != NULL) {
        stInfo("s-task:0x%x fill-history updated to recalculate task, reset step2Start ts, stream task:0x%x",
               (int32_t)hTaskId.taskId, pReq->taskId);

        streamMutexLock(&pHTask->lock);

        pHTask->info.fillHistory = STREAM_RECALCUL_TASK;
        pHTask->execInfo.step2Start = 0;  // clear the step2start timestamp

        SStreamTaskState status = streamTaskGetStatus(pHTask);
        if (status.state == TASK_STATUS__SCAN_HISTORY) {
          code = streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_SCANHIST_DONE);
          if (code != TSDB_CODE_SUCCESS) {
            stError("s-task:0x%x failed to handle scan-history-done event, code:%s", (int32_t)hTaskId.taskId,
                    tstrerror(code));
          }
        }

        if (pHTask->pBackend != NULL) {
          streamFreeTaskState(pHTask, TASK_STATUS__READY);
          pHTask->pBackend = NULL;
        }

        if (pHTask->exec.pExecutor != NULL) {
          qDestroyTask(pHTask->exec.pExecutor);
          pHTask->exec.pExecutor = NULL;
        }

        pMeta->expandTaskFn(pHTask);

        streamMutexUnlock(&pHTask->lock);

        code = streamMetaSaveTaskInMeta(pMeta, pHTask);
        if (code != TSDB_CODE_SUCCESS) {
          stError("s-task:0x%x failed to save recalculate task in meta, code:%s", (int32_t)hTaskId.taskId,
                  tstrerror(code));
        }
        streamMetaReleaseTask(pMeta, pHTask);
      } else {
        stWarn("s-task:%s vgId:%d failed to acquire related fill-history task:0x%x, not update it to be recalculate "
               "task, code:%s",
               id, vgId, (int32_t)hTaskId.taskId, tstrerror(code));
      }
    }
  }

  code = streamMetaCommit(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to commit meta after update checkpointInfo, checkpointId:%" PRId64 ", code:%s",
            id, vgId, pReq->checkpointId, tstrerror(code));
  }

  return TSDB_CODE_SUCCESS;
}
803

804
// Record failedId as the latest failed checkpoint id of pTask.
//
// A non-positive failedId is rejected with a warning. The stored failedId only moves forward: a value not greater
// than the current one is ignored with a debug log. The callers visible in this file hold pTask->lock around it.
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*                   idStr = pTask->id.idStr;

  if (failedId <= 0) {
    stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
           " activeId:%" PRId64,
           idStr, pActive->failedId, pActive->activeId);
    return;
  }

  if (failedId <= pActive->failedId) {
    stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, idStr, pActive->failedId, failedId);
    return;
  }

  stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
          " prev failedId:%" PRId64,
          idStr, failedId, pActive->transId, pActive->activeId, pActive->failedId);
  pActive->failedId = failedId;
}
822

823
// Mark the active checkpoint of pTask as failed, but only while the task is in checkpoint status.
// The status check and the update are both done under pTask->lock.
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    int64_t activeId = pTask->chkInfo.pActiveInfo->activeId;
    streamTaskSetFailedCheckpointId(pTask, activeId);
  }

  streamMutexUnlock(&pTask->lock);
}
831

832
// Download the remote "META" file of the checkpoint into "<path>/META_TMP" and append the names of the files it
// marks for deletion to list (via remoteChkpGetDelFile).
//
// Returns an error if the temporary path cannot be built or the download fails. Failing to extract the deletion
// list is logged but treated as non-fatal (returns 0), as before. The temporary path buffer is released on every
// path; it was previously leaked on success.
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _end;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    goto _end;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    // best effort: without the deletion list only the redundant remote files are kept
    stError("%s chkp failed to get to del:%s", id, filePath);
    code = 0;
  }

_end:
  taosMemoryFree(filePath);
  return code;
}
861

862
// Generate the upload data of checkpointId from the task backend and upload it to the remote backup storage of the
// given type.
//
// On return *chkptDir is set by taskDbGenChkpUploadData to the local checkpoint directory; the caller in this file
// frees it.
//
// Remote files collected in toDelFiles are removed after a successful upload. They are collected by
// taskDbGenChkpUploadData and, for DATA_UPLOAD_S3, also from the remote META file. That removal is best effort: a
// failure is logged and no longer turns the (already successful) upload into a failure. A failure to measure the
// local directory size is handled the same way.
//
// dbRefId is not used by this function.
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type,
                             char** chkptDir) {
  int32_t      code = 0;
  int64_t      chkptSize = 0;
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      now = taosGetTimestampMs();

  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  if (toDelFiles == NULL) {
    code = terrno;
    stError("s-task:%s failed to prepare array list during upload checkpoint, code:%s", idStr, tstrerror(code));
    return code;
  }

  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, chkptDir, toDelFiles,
                                      idStr)) != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  if (type == DATA_UPLOAD_S3 && code == TSDB_CODE_SUCCESS) {
    code = getCheckpointDataMeta(idStr, *chkptDir, toDelFiles);
    if (code != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, *chkptDir, checkpointId);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    } else {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, *chkptDir,
              tstrerror(code));
    }
  }

  int32_t num = taosArrayGetSize(toDelFiles);
  if (code == TSDB_CODE_SUCCESS && num > 0) {
    stDebug("s-task:%s remove redundant %d files", idStr, num);

    for (int32_t i = 0; i < num; i++) {
      char*   pName = taosArrayGetP(toDelFiles, i);
      int32_t ret = deleteCheckpointRemoteBackup(idStr, pName);
      if (ret != 0) {
        // only garbage is left behind in the remote storage; the uploaded checkpoint itself is complete
        stWarn("s-task:%s failed to remove file: %s, code:%s", idStr, pName, tstrerror(ret));
        break;
      }
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(toDelFiles, NULL);
  double el = (taosGetTimestampMs() - now) / 1000.0;

  if (code == TSDB_CODE_SUCCESS) {
    int32_t ret = taosGetDirSize(*chkptDir, &chkptSize);
    if (ret != 0) {
      stWarn("s-task:%s failed to get the size of local checkpoint dir:%s, code:%s", idStr, *chkptDir,
             tstrerror(ret));
    }

    stDebug("s-task:%s complete upload checkpointId:%" PRId64
            ", elapsed time:%.2fs, checkpointSize:%.2fKiB local dir:%s",
            idStr, checkpointId, el, SIZE_IN_KiB(chkptSize), *chkptDir);
  } else {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " elapsed time:%.2fs, checkpointSize:%.2fKiB", idStr,
            checkpointId, el, SIZE_IN_KiB(chkptSize));
  }

  return code;
}
930

931
// Back up the local checkpoint data of checkpointId to the configured remote storage.
//
// Returns 0 without doing anything when backup is disabled, pTask is NULL, or the task has no backend. Returns -1
// if the backend db cannot be acquired.
//
// If the upload fails, the generated local checkpoint directory (if any) is removed. For DATA_UPLOAD_RSYNC, a
// successful upload also removes the remote backups of the checkpoint ids in pList (an array of int64_t); failures
// there are logged only.
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId, SArray* pList) {
  char*                   pChkptDir = NULL;
  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();

  // check pTask before it is dereferenced below (the debug log used to read pTask->id first)
  if (pTask == NULL) {
    return 0;
  }

  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not config to backup checkpoint data at snode, checkpointId:%"PRId64, pTask->id.idStr, checkpointId);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type, &pChkptDir);
  taskReleaseDb(dbRefId);

  if (code != 0) {
    if ((pChkptDir != NULL) && (pChkptDir[0] != '\0')) {  // failed, drop the latest checkpoint data in local directory
      stError("s-task:%s upload checkpoint data failed, generated checkpointId:%" PRId64
              " failed, remove local checkpoint data:%s",
              pTask->id.idStr, checkpointId, pChkptDir);
      taosRemoveDir(pChkptDir);
    }
  } else if (type == DATA_UPLOAD_RSYNC) {
    // we only keep the latest five checkpoint data in the local directory,
    // the expired checkpoint data will be removed. Remove the remote backup checkpoint Data here.
    // check the existed number of checkpoint data, according to the directory name
    int32_t num = taosArrayGetSize(pList);
    for (int32_t i = 0; i < num; ++i) {
      int64_t* pCheckpointId = (int64_t*)taosArrayGet(pList, i);
      if (pCheckpointId == NULL) {
        continue;
      }

      int32_t ret = deleteRemoteCheckpointBackup(pTask->id.idStr, *pCheckpointId);
      if (ret != 0) {
        stWarn("s-task:%s failed to remove remote backup of expired checkpointId:%" PRId64 ", code:%s",
               pTask->id.idStr, *pCheckpointId, tstrerror(ret));
      }
    }
  }

  taosMemoryFree(pChkptDir);
  return code;
}
977

978
// Generate the checkpoint of pTask for its active checkpointId and drive the follow-up steps:
//  1. Non-sink tasks persist their state through streamBackendDoCheckpoint.
//  2. A source task sends the checkpoint-source rsp; other tasks send the checkpoint-ready msg.
//  3. The checkpoint data is backed up remotely (when configured).
//  4. The checkpoint report is sent (when pMsgCb is set).
// If any step fails, the checkpointId is recorded as failed and TASK_EVENT_CHECKPOINT_DONE is fired.
//
// Returns the code of the last step executed. On the failure path this is the result of handling the
// checkpoint-done event, so a handled failure may still return TSDB_CODE_SUCCESS (kept for compatibility). The
// final summary log reports whether the checkpoint itself succeeded.
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  SStreamMeta* pMeta = pTask->pMeta;
  bool         succ = false;
  SArray*      pList = taosArrayInit(4, sizeof(int64_t));

  if (pList == NULL) {
    code = terrno;
    stError("s-task:%s failed to prepare list during build checkpoint, code:%s", id, tstrerror(code));
    return code;
  }

  streamMutexLock(&pTask->lock);
  bool dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  streamMutexUnlock(&pTask->lock);

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver, pList);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
    }

    int64_t et = taosGetTimestampMs();
    stDebug("s-task:%s gen local checkpoint completed, elapsed time:%.2fs", id, (et - startTs) / 1000.0);
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId, pList);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
    succ = (code == TSDB_CODE_SUCCESS);
  } else {  // clear the checkpoint info if failed
    // set failed checkpoint id before clear the checkpoint info
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask, ckId);
    streamMutexUnlock(&pTask->lock);

    // NOTE: code now holds the event-handling result, not the checkpoint failure; succ stays false
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, succ ? "succ" : "failed");

  taosArrayDestroy(pList);
  return code;
}
1058

1059
// Decide whether the checkpoint-trigger monitor timer of pTask should keep running.
//
// Returns -1, after calling streamCleanBeforeQuitTmr, in three cases:
//  - all upstream checkpoint-triggers have been received;
//  - the timer was launched for a checkpointId other than the active one;
//  - the active checkpoint info looks cleared (activeId, transId or startTs is 0).
// Returns 0 when monitoring should continue.
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  const char*            idStr = pTask->id.idStr;
  int32_t                nodeId = pTask->pMeta->vgId;
  bool                   infoCleared = false;

  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, nodeId);
    return -1;
  }

  if (pTmr->launchChkptId != pActive->activeId) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, nodeId, pTmr->launchChkptId);
    return -1;
  }

  infoCleared = (pActive->activeId == 0) || (pActive->transId == 0) || (pTask->chkInfo.startTs == 0);
  if (infoCleared) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
           nodeId);
    return -1;
  }

  return 0;
}
1090

1091
// Collect the upstream tasks of pList (an array of SStreamUpstreamEpInfo pointers) that have no entry with the same
// upstream nodeId in the active checkpoint's pReadyMsgList, and re-open the upstream input of each of them.
//
// On success *ppNotSendList receives a new array of SStreamUpstreamEpInfo copies; the caller destroys it. On
// failure *ppNotSendList is left untouched and the error code is returned.
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pNotSendList == NULL) {
    int32_t code = terrno;  // capture before logging may touch it
    stError("s-task:%s failed to prepare the not-send upstream list in checkpoint-trigger monitor, code:%s", id,
            tstrerror(code));
    return code;
  }

  int32_t numOfUpstream = taosArrayGetSize(pList);
  for (int32_t i = 0; i < numOfUpstream; ++i) {
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
    if (pInfo == NULL) {
      continue;
    }

    bool recved = false;
    for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
      STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
      if (pReady == NULL) {
        continue;
      }

      if (pInfo->nodeId == pReady->upstreamNodeId) {
        recved = true;
        break;
      }
    }

    if (!recved) {  // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
      streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
      void* px = taosArrayPush(pNotSendList, pInfo);
      if (px == NULL) {
        int32_t code = terrno;
        stError("s-task:%s failed to record not send info, code: out of memory", id);
        taosArrayDestroy(pNotSendList);
        return code;
      }
    }
  }

  *ppNotSendList = pNotSendList;
  return 0;
}
1131

1132
// One round of the checkpoint-trigger monitor; called with the active-info lock held by checkpointTriggerMonitorFn.
//
// Returns the non-zero code of doChkptStatusCheck when the monitor should quit. If the not-send upstream list
// cannot be built, the timer is cleaned and that error is returned. Otherwise *ppNotSendList is filled and a
// retrieve-trigger request is sent to each entry; a send failure is logged only, and 0 is returned.
int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray** ppNotSendList) {
  int32_t rc = doChkptStatusCheck(pTask, param);
  if (rc != 0) {
    return rc;
  }

  rc = doFindNotSendUpstream(pTask, pTask->upstreamInfo.pList, ppNotSendList);
  if (rc != 0) {
    streamCleanBeforeQuitTmr(&pTask->chkInfo.pActiveInfo->chkptTriggerMsgTmr, param);
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", pTask->id.idStr, tstrerror(rc));
    return rc;
  }

  // do send retrieve checkpoint trigger msg to upstream; failures are not propagated
  rc = doSendRetrieveTriggerMsg(pTask, *ppNotSendList);
  if (rc != 0) {
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, pTask->pMeta->vgId,
            tstrerror(rc));
  }

  return 0;
}
1160

1161
// Exit helper of checkpointTriggerMonitorFn: free pList (may be NULL) and release pTask via streamMetaReleaseTask.
static void doCleanup(SStreamTask* pTask, SArray* pList) {
  SStreamMeta* pMeta = pTask->pMeta;

  taosArrayDestroy(pList);
  streamMetaReleaseTask(pMeta, pTask);
}
1165

1166
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
450✔
1167
  int32_t      code = 0;
450✔
1168
  int32_t      numOfNotSend = 0;
450✔
1169
  SArray*      pNotSendList = NULL;
450✔
1170
  int64_t      taskRefId = *(int64_t*)param;
450✔
1171
  int64_t      now = taosGetTimestampMs();
450✔
1172

1173
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
450✔
1174
  if (pTask == NULL) {
450!
1175
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
1176
    streamTaskFreeRefId(param);
×
1177
    return;
450✔
1178
  }
1179

1180
  int32_t                vgId = pTask->pMeta->vgId;
450✔
1181
  const char*            id = pTask->id.idStr;
450✔
1182
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
450✔
1183
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
450✔
1184
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
450✔
1185

1186
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
450!
1187
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1188
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1189
    doCleanup(pTask, pNotSendList);
×
1190
    return;
×
1191
  }
1192

1193
  // check the status every 100ms
1194
  if (streamTaskShouldStop(pTask)) {
450!
1195
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1196
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
×
1197
    doCleanup(pTask, pNotSendList);
×
1198
    return;
×
1199
  }
1200

1201
  if (++pTmrInfo->activeCounter < 50) {
450✔
1202
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
441✔
1203
                   "trigger-recv-monitor");
1204
    doCleanup(pTask, pNotSendList);
441✔
1205
    return;
441✔
1206
  }
1207

1208
  pTmrInfo->activeCounter = 0;
9✔
1209
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
9!
1210

1211
  streamMutexLock(&pTask->lock);
9✔
1212
  SStreamTaskState state = streamTaskGetStatus(pTask);
9✔
1213
  streamMutexUnlock(&pTask->lock);
9✔
1214

1215
  if (state.state != TASK_STATUS__CK) {
9!
1216
    streamCleanBeforeQuitTmr(pTmrInfo, param);
9✔
1217
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
9!
1218
            vgId, state.name);
1219
    doCleanup(pTask, pNotSendList);
9✔
1220
    return;
9✔
1221
  }
1222

1223
  streamMutexLock(&pActiveInfo->lock);
×
1224
  code = chkptTriggerRecvMonitorHelper(pTask, param, &pNotSendList);
×
1225
  streamMutexUnlock(&pActiveInfo->lock);
×
1226

1227
  if (code != TSDB_CODE_SUCCESS) {
×
1228
    doCleanup(pTask, pNotSendList);
×
1229
    return;
×
1230
  }
1231

1232
  // check every 100ms
1233
  numOfNotSend = taosArrayGetSize(pNotSendList);
×
1234
  if (numOfNotSend > 0) {
×
1235
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
1236
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1237
                   "trigger-recv-monitor");
1238
  } else {
1239
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1240
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
×
1241
  }
1242

1243
  doCleanup(pTask, pNotSendList);
×
1244
}
1245

1246
// Send a retrieve-checkpoint-trigger request for the active checkpointId to every upstream task in pNotSendList
// (an array of SStreamUpstreamEpInfo).
//
// A failure for one upstream task (size calculation, allocation, encoding or sending) is logged, and the remaining
// tasks are still processed.
//
// Returns:
//  - TSDB_CODE_INVALID_PARA if an element of pNotSendList cannot be fetched;
//  - otherwise the code of the last failure encountered, or 0 when every request was sent.
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     lastErr = TSDB_CODE_SUCCESS;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return lastErr;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t  code = 0;
    int32_t  ret = 0;
    int32_t  tlen = 0;
    void*    buf = NULL;
    SRpcMsg  rpcMsg = {0};
    SEncoder encoder;

    SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
                                    .downstreamTaskId = pTask->id.taskId,
                                    .downstreamNodeId = vgId,
                                    .upstreamTaskId = pUpstreamTask->taskId,
                                    .upstreamNodeId = pUpstreamTask->nodeId,
                                    .checkpointId = checkpointId};

    tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
    if (ret < 0) {
      // tlen is not valid when the size calculation fails, so skip this upstream task
      stError("s-task:%s encode retrieve checkpoint-trigger msg failed, code:%s", pId, tstrerror(ret));
      lastErr = ret;
      continue;
    }

    buf = rpcMallocCont(tlen + sizeof(SMsgHead));
    if (buf == NULL) {
      stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
      lastErr = TSDB_CODE_OUT_OF_MEMORY;
      continue;
    }

    // the buffer starts with the message head, followed by the encoded request body
    ((SMsgHead*)buf)->vgId = htonl(pUpstreamTask->nodeId);
    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

    tEncoderInit(&encoder, abuf, tlen);
    if ((code = tEncodeRetrieveChkptTriggerReq(&encoder, &req)) < 0) {
      rpcFreeCont(buf);
      tEncoderClear(&encoder);
      stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(code));
      lastErr = code;
      continue;
    }
    tEncoderClear(&encoder);

    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
      lastErr = code;
    }
  }

  return lastErr;
}
1318

1319
/*
 * Check whether a checkpoint-trigger msg has already been recorded as dispatched to `downstreamNodeId`.
 *
 * The caller must hold pTask->chkInfo.pActiveInfo->lock (streamTaskAlreadySendTrigger takes it before calling).
 *
 * Returns true (non-zero) when pDispatchTriggerList contains an entry for the node; a warning describing when it was
 * sent, and whether the downstream already confirmed it, is logged in that case. Returns false when no trigger
 * dispatch is in progress (dispatchTrigger unset) or no entry matches.
 */
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                now = taosGetTimestampMs();
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  if (!pInfo->dispatchTrigger) {
    return false;
  }

  int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList);
  for (int32_t i = 0; i < num; ++i) {
    STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (pSendInfo == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num);
      continue;
    }

    if (pSendInfo->nodeId != downstreamNodeId) {
      continue;
    }

    // has send trigger msg to downstream node; report how long ago (in seconds)
    double before = (now - pSendInfo->sendTs) / 1000.0;
    if (pSendInfo->recved) {
      stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
             "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
             id, pSendInfo->sendTs, before, pSendInfo->taskId, pInfo->activeId, pInfo->transId);
    } else {
      stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
             ", transId:%d",
             id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
    }

    return true;
  }

  return false;
}
1358

1359
/*
 * Thread-safe check of whether the checkpoint-trigger msg has already been dispatched to `downstreamNodeId`.
 *
 * Returns false immediately unless the task is in TASK_STATUS__CK; otherwise consults the dispatch-trigger list
 * while holding the active checkpoint info lock.
 */
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       pStatus = streamTaskGetStatus(pTask);

  if (pStatus.state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pInfo->lock);
  bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pInfo->lock);

  return send;
}
1375

1376
/*
 * Report checkpoint-trigger receive progress.
 *
 * *pRecved: number of entries currently in the active checkpoint's pReadyMsgList.
 * *pTotal:  number expected — 1 for a source task, otherwise the number of upstream tasks.
 */
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  bool                   isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  *pRecved = taosArrayGetSize(pActive->pReadyMsgList);
  *pTotal = isSource ? 1 : streamTaskGetNumOfUpstream(pTask);
}
1385

1386
// record the dispatch checkpoint trigger info in the list
1387
// memory insufficient may cause the stream computing stopped
1388
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask, int64_t sendingChkptId) {
9✔
1389
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
9✔
1390
  int64_t                now = taosGetTimestampMs();
9✔
1391
  int32_t                code = 0;
9✔
1392

1393
  streamMutexLock(&pInfo->lock);
9✔
1394

1395
  if (sendingChkptId > pInfo->failedId) {
9!
1396
    pInfo->dispatchTrigger = true;
9✔
1397
    if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
9✔
1398
      STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
6✔
1399

1400
      STaskTriggerSendInfo p = {
6✔
1401
          .sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
6✔
1402
      void* px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
6✔
1403
      if (px == NULL) {  // pause the stream task, if memory not enough
6!
1404
        code = terrno;
×
1405
      }
1406
    } else if (pTask->outputInfo.type == TASK_OUTPUT__VTABLE_MAP) {
3!
1407
      for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
×
1408
        STaskDispatcherFixed* pAddr = taosArrayGet(pTask->outputInfo.vtableMapDispatcher.taskInfos, i);
×
1409
        if (pAddr == NULL) {
×
1410
          continue;
×
1411
        }
1412

1413
        STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pAddr->nodeId, .taskId = pAddr->taskId};
×
1414
        void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
×
1415
        if (px == NULL) {  // pause the stream task, if memory not enough
×
1416
          code = terrno;
×
1417
          break;
×
1418
        }
1419
      }
1420
    } else {
1421
      for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
9✔
1422
        SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
6✔
1423
        if (pVgInfo == NULL) {
6!
1424
          continue;
×
1425
        }
1426

1427
        STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
6✔
1428
        void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
6✔
1429
        if (px == NULL) {  // pause the stream task, if memory not enough
6!
1430
          code = terrno;
×
1431
          break;
×
1432
        }
1433
      }
1434
    }
1435
  }
1436

1437
  streamMutexUnlock(&pInfo->lock);
9✔
1438

1439
  return code;
9✔
1440
}
1441

1442
/*
 * Count the dispatch-trigger entries whose receipt has been confirmed by the downstream (recved == true).
 * NULL entries are skipped. The caller is expected to hold pInfo->lock when the list may change concurrently.
 */
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;
  size_t  size = taosArrayGetSize(pInfo->pDispatchTriggerList);

  for (int32_t idx = 0; idx < size; ++idx) {
    STaskTriggerSendInfo* pSend = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pSend != NULL && pSend->recved) {
      confirmed += 1;
    }
  }

  return confirmed;
}
1456

1457
/*
 * Mark the checkpoint-trigger dispatch to downstream node `vgId` as confirmed.
 *
 * Under pInfo->lock, the entry for `vgId` in pDispatchTriggerList is looked up:
 *  - first confirmation: recved is set and recvTs is recorded;
 *  - duplicate confirmation: a warning is logged and nothing changes;
 *  - no matching entry: an error is logged after the lock is released.
 */
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    found = false;
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL || p->nodeId != vgId) {
      continue;
    }

    // record the match independently of taskId, so a duplicate rsp is not misreported as an invalid one
    found = true;
    taskId = p->taskId;

    if (p->recved) {
      stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
             " discard",
             pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
      alreadyRecv = true;
    } else {
      p->recved = true;
      p->recvTs = taosGetTimestampMs();
    }
    break;
  }

  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  if (!found) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
1500

1501
int32_t uploadCheckpointToS3(const char* id, const char* path) {
3✔
1502
  int32_t code = 0;
3✔
1503
  int32_t nBytes = 0;
3✔
1504
  /*
1505
  if (s3Init() != 0) {
1506
    return TSDB_CODE_THIRDPARTY_ERROR;
1507
  }
1508
  */
1509
  TdDirPtr pDir = taosOpenDir(path);
3✔
1510
  if (pDir == NULL) {
3!
1511
    return terrno;
×
1512
  }
1513

1514
  TdDirEntryPtr de = NULL;
3✔
1515
  while ((de = taosReadDir(pDir)) != NULL) {
15✔
1516
    char* name = taosGetDirEntryName(de);
12✔
1517
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
12!
1518

1519
    char filename[PATH_MAX] = {0};
6✔
1520
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
6!
1521
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
1522
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1523
        code = TSDB_CODE_OUT_OF_RANGE;
×
1524
        break;
×
1525
      }
1526
    } else {
1527
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
6✔
1528
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
6!
1529
        code = TSDB_CODE_OUT_OF_RANGE;
×
1530
        break;
×
1531
      }
1532
    }
1533

1534
    char object[PATH_MAX] = {0};
6✔
1535
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
6✔
1536
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
6!
1537
      code = TSDB_CODE_OUT_OF_RANGE;
×
1538
      break;
×
1539
    }
1540

1541
    code = tcsPutObjectFromFile2(filename, object, 0);
6✔
1542
    if (code != 0) {
6!
1543
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
6!
1544
    } else {
1545
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1546
    }
1547
  }
1548

1549
  int32_t ret = taosCloseDir(&pDir);
3✔
1550
  if (code == 0 && ret != 0) {
3!
1551
    code = ret;
×
1552
  }
1553

1554
  return code;
3✔
1555
}
1556

1557
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
3✔
1558
  int32_t nBytes;
1559
  int32_t cap = strlen(id) + strlen(dstName) + 16;
3✔
1560

1561
  char* buf = taosMemoryCalloc(1, cap);
3!
1562
  if (buf == NULL) {
3!
1563
    return terrno;
×
1564
  }
1565

1566
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
3✔
1567
  if (nBytes <= 0 || nBytes >= cap) {
3!
1568
    taosMemoryFree(buf);
×
1569
    return TSDB_CODE_OUT_OF_RANGE;
×
1570
  }
1571
  int32_t code = tcsGetObjectToFile(buf, dstName);
3✔
1572
  if (code != 0) {
3!
1573
    taosMemoryFree(buf);
3!
1574
    return TAOS_SYSTEM_ERROR(ERRNO);
3✔
1575
  }
1576
  taosMemoryFree(buf);
×
1577
  return 0;
×
1578
}
1579

1580
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
18✔
1581
  if (strlen(tsSnodeAddress) != 0) {
18✔
1582
    return DATA_UPLOAD_RSYNC;
3✔
1583
  } else if (tsS3StreamEnabled) {
15!
1584
    return DATA_UPLOAD_S3;
×
1585
  } else {
1586
    return DATA_UPLOAD_DISABLE;
15✔
1587
  }
1588
}
1589

1590
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
6✔
1591
  int32_t code = 0;
6✔
1592
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
6!
1593
    stError("invalid parameters in upload checkpoint, %s", id);
3!
1594
    return TSDB_CODE_INVALID_CFG;
3✔
1595
  }
1596

1597
  if (strlen(tsSnodeAddress) != 0) {
3!
1598
    code = uploadByRsync(id, path, checkpointId);
×
1599
    if (code != 0) {
×
1600
      return TAOS_SYSTEM_ERROR(ERRNO);
×
1601
    }
1602
  } else if (tsS3StreamEnabled) {
3!
1603
    return uploadCheckpointToS3(id, path);
×
1604
  }
1605

1606
  return 0;
3✔
1607
}
1608

1609
// fileName:  CURRENT
1610
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
3✔
1611
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
3!
1612
    stError("down load checkpoint data parameters invalid");
×
1613
    return TSDB_CODE_INVALID_PARA;
×
1614
  }
1615

1616
  if (strlen(tsSnodeAddress) != 0) {
3!
1617
    return 0;
×
1618
  } else if (tsS3StreamEnabled) {
3!
1619
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1620
  }
1621

1622
  return 0;
3✔
1623
}
1624

1625
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
3✔
1626
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
3!
1627
    stError("down checkpoint data parameters invalid");
×
1628
    return -1;
×
1629
  }
1630

1631
  if (strlen(tsSnodeAddress) != 0) {
3!
1632
    return downloadByRsync(id, path, checkpointId);
3✔
1633
  } else if (tsS3StreamEnabled) {
×
1634
    return tcsGetObjectsByPrefix(id, path);
×
1635
  }
1636

1637
  return 0;
×
1638
}
1639

1640
int32_t deleteRemoteCheckpointBackup(const char* pTaskId, int64_t checkpointId) {
×
1641
  if (pTaskId == NULL || strlen(pTaskId) == 0) {
×
1642
    stError("s-task:%s deleteRemoteCheckpointBackup parameters invalid, checkpointId:%"PRId64, pTaskId, checkpointId);
×
1643
    return TSDB_CODE_INVALID_PARA;
×
1644
  }
1645

1646
  if (strlen(tsSnodeAddress) != 0) {
×
1647
    return deleteRsync(pTaskId, checkpointId);
×
1648
  } else if (tsS3StreamEnabled) {
×
1649
    tcsDeleteObjectsByPrefix(pTaskId);
×
1650
  }
1651
  return 0;
×
1652
}
1653

1654
int32_t deleteCheckpointRemoteBackup(const char* id, const char* name) {
3✔
1655
  char object[128] = {0};
3✔
1656

1657
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
3✔
1658
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
3!
1659
    return TSDB_CODE_OUT_OF_RANGE;
×
1660
  }
1661

1662
  char*   tmp = object;
3✔
1663
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
3✔
1664
  if (code != 0) {
3!
1665
    return TSDB_CODE_THIRDPARTY_ERROR;
3✔
1666
  }
1667
  return code;
×
1668
}
1669

1670
/*
 * Prepare the task for checkpoint-id negotiation.
 *
 * Under pTask->lock, captures the current task state and calls streamTaskSetReqConsenChkptId with the current time.
 * Outside the lock it then stops and destroys the executor (if any), and finally frees the state backend (if any).
 * NOTE(review): despite its name, this function does not send a message itself — presumably the request is sent
 * elsewhere once the consensus flag is set; confirm.
 * Always returns 0.
 */
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  // upper bound for waiting on the underlying query to stop: 10 min
  const int64_t killTaskWaitMs = 600000;

  streamMutexLock(&pTask->lock);
  ETaskStatus p = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  // 1. stop the executor at first
  if (pTask->exec.pExecutor != NULL) {
    // we need to make sure the underlying operator is stopped right, otherwise, SIGSEGV may occur
    if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
      int32_t code = qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS, killTaskWaitMs);
      if (code != TSDB_CODE_SUCCESS) {
        stError("s-task:%s failed to kill task related query handle, code:%s", pTask->id.idStr, tstrerror(code));
      }
    }

    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

  // 2. destroy backend after stop executor
  if (pTask->pBackend != NULL) {
    streamFreeTaskState(pTask, p);
    pTask->pBackend = NULL;
  }

  return 0;
}
1699

1700
/*
 * For a source task that is currently in TASK_STATUS__CK, send the checkpoint-source rsp while holding pTask->lock.
 * Tasks of any other level, or a source task in a different state, are left untouched and 0 is returned;
 * otherwise the result of streamTaskSendCheckpointSourceRsp is returned.
 */
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t ret = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    ret = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return ret;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc