• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3530

16 Nov 2024 07:44AM UTC coverage: 60.219% (-0.7%) from 60.888%
#3530

push

travis-ci

web-flow
Update 03-ad.md

118417 of 252124 branches covered (46.97%)

Branch coverage included in aggregate %.

198982 of 274951 relevant lines covered (72.37%)

6072359.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

37.24
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t deleteCheckpointFile(const char* id, const char* name);
23
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
24
static int32_t deleteCheckpoint(const char* id);
25
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
26
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
27
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
28
                                          int32_t transId, int32_t srcTaskId);
29
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
30
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
31

32
/**
 * Build a checkpoint queue item that wraps a single SSDataBlock carrying the checkpoint metadata.
 *
 * The data block stores checkpointId in info.version, transId in both info.window.skey and
 * info.window.ekey, and the task's selfChildId in info.childId. The block is copied into the
 * item's array, so the temporary allocation is released before returning.
 *
 * @param pTask          task that creates the block; its task level and selfChildId are read
 * @param checkpointType value stored in the type field of the new queue item
 * @param checkpointId   checkpoint id recorded in the data block
 * @param transId        transaction id recorded in the data block
 * @param srcTaskId      upstream task id; must be > 0 when a STREAM_INPUT__CHECKPOINT_TRIGGER item is
 *                       created for a non-source task, otherwise it is ignored
 * @param pRes           receives the new queue item on success; untouched on failure
 * @return TSDB_CODE_SUCCESS on success, TSDB_CODE_INVALID_PARA for an invalid srcTaskId, or the
 *         allocation error code
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
    pChkpoint->srcTaskId = srcTaskId;
    if (srcTaskId <= 0) {
      stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
      // the queue item is already allocated at this point, release it like every other failure path does
      taosFreeQitem(pChkpoint);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  pBlock->info.type = STREAM_CHECKPOINT;
  pBlock->info.version = checkpointId;
  pBlock->info.window.ekey = pBlock->info.window.skey = transId;  // NOTE: set the transId
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));  // pBlock;
  if (pChkpoint->blocks == NULL) {
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  void* p = taosArrayPush(pChkpoint->blocks, pBlock);
  if (p == NULL) {
    taosArrayDestroy(pChkpoint->blocks);
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  *pRes = pChkpoint;

  // the array holds its own copy of the block (element size is sizeof(SSDataBlock)), drop the temporary
  taosMemoryFree(pBlock);
  return TSDB_CODE_SUCCESS;
}
81

82
// this message must be put into inputq successfully, continue retrying until it succeeds
83
// todo must be success
84
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
2,954✔
85
                                   int32_t srcTaskId) {
86
  SStreamDataBlock* pCheckpoint = NULL;
2,954✔
87
  int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pCheckpoint);
2,954✔
88
  if (code != TSDB_CODE_SUCCESS) {
2,954!
89
    return code;
×
90
  }
91

92
  if (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pCheckpoint) < 0) {
2,954!
93
    return TSDB_CODE_OUT_OF_MEMORY;
×
94
  }
95

96
  return streamTrySchedExec(pTask);
2,954✔
97
}
98

99
/**
 * Start the checkpoint procedure on a source task in response to a checkpoint-source request.
 *
 * The task state machine is sent TASK_EVENT_GEN_CHECKPOINT, the active checkpoint id/transId are
 * recorded, and a checkpoint-trigger block is appended to the inputQ of the task.
 *
 * @return TSDB_CODE_INVALID_MSG if the task is not a source task, the state-machine error code if the
 *         event is rejected, otherwise the result of appending the trigger block
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return TSDB_CODE_INVALID_MSG;
  }

  // todo this status may not be set here.
  // step 1: move the task into the checkpoint state; no data are allowed to put into inputQ afterwards.
  int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (ret != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return ret;
  }

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // step 2: the checkpoint block goes to the tail of the inputQ, so every block with a smaller version
  // is handled by this task before the checkpoint block is reached.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
121

122
/**
 * Handle the response of a "retrieve checkpoint-trigger" request sent to an upstream task.
 *
 * When the response is valid and matches the active checkpoint of this task, a checkpoint-trigger
 * block attributed to the upstream task is appended to the inputQ.
 *
 * @return TSDB_CODE_INVALID_MSG when received by a source task, TSDB_CODE_SUCCESS when the upstream
 *         reported a failure (the response is only logged), TSDB_CODE_STREAM_TASK_IVLD_STATUS when the
 *         task is not in checkpoint status or the checkpointId/transId do not match the active ones,
 *         otherwise the result of appending the trigger block
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  bool                   unQualified = false;
  const char*            id = pTask->id.idStr;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // snapshot the active ids under the lock, so the values that are logged are the values that were compared
  int64_t rspChkptId = pRsp->checkpointId;
  int32_t rspTransId = pRsp->transId;

  streamMutexLock(&pInfo->lock);
  int64_t activeId = pInfo->activeId;
  int32_t activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  unQualified = (activeId != rspChkptId || activeTransId != rspTransId);
  if (unQualified) {
    // the task IS in checkpoint status here; the rsp is rejected because it belongs to another checkpoint
    stError("s-task:%s status:%s recv checkpoint-trigger rsp checkpointId:%" PRId64
            " transId:%d not match the active checkpointId:%" PRId64 " transId:%d, discard the checkpoint-trigger msg",
            id, status.name, rspChkptId, rspTransId, activeId, activeTransId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                            pRsp->upstreamTaskId);
  return code;
}
161

162
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
×
163
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
164
  int32_t size = sizeof(SMsgHead) + sizeof(SCheckpointTriggerRsp);
×
165
  void*   pBuf = rpcMallocCont(size);
×
166
  if (pBuf == NULL) {
×
167
    return terrno;
×
168
  }
169

170
  SCheckpointTriggerRsp* pRsp = POINTER_SHIFT(pBuf, sizeof(SMsgHead));
×
171

172
  ((SMsgHead*)pBuf)->vgId = htonl(downstreamNodeId);
×
173

174
  pRsp->streamId = pTask->id.streamId;
×
175
  pRsp->upstreamTaskId = pTask->id.taskId;
×
176
  pRsp->taskId = dstTaskId;
×
177
  pRsp->rspCode = code;
×
178

179
  if (code == TSDB_CODE_SUCCESS) {
×
180
    pRsp->checkpointId = pTask->chkInfo.pActiveInfo->activeId;
×
181
    pRsp->transId = pTask->chkInfo.pActiveInfo->transId;
×
182
  } else {
183
    pRsp->checkpointId = -1;
×
184
    pRsp->transId = -1;
×
185
  }
186

187
  SRpcMsg rspMsg = {.code = 0, .pCont = pBuf, .contLen = size, .info = *pRpcInfo};
×
188
  tmsgSendRsp(&rspMsg);
×
189

190
  return 0;
×
191
}
192

193
/**
 * Stamp the checkpoint-trigger block with this task as its source, put it into the outputQ and start
 * dispatching to the downstream tasks.
 *
 * If the trigger was already dispatched for the active checkpoint, nothing is done and 0 is returned.
 * If the block cannot be written into the outputQ it is freed here and the error code is returned.
 * NOTE(review): in the "already dispatched" case the block is neither queued nor freed here - confirm
 * who owns it on that path.
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", idStr);
    return 0;
  }

  int32_t ret = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (ret != 0) {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", idStr, tstrerror(ret));
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return ret;
  }

  return streamDispatchStreamBlock(pTask);
}
212

213
/**
 * Decide whether a received checkpoint-trigger block should be handled or discarded.
 *
 * The only caller in this file frees the block and stops when the return value is non-zero, so every
 * "discard" decision must be reported as an error. The original returned 0 on those paths, so the
 * logged discard never took effect and the block was processed anyway. Discarded blocks are now
 * reported with TSDB_CODE_INVALID_MSG, the same code streamProcessCheckpointReadyMsg uses for
 * expired messages.
 *
 * Checks, in order:
 *  - the task already holds a newer checkpoint than the one in the block (expired block)
 *  - the checkpoint was marked as failed
 *  - the checkpoint is already finished on this task: a checkpoint-ready msg is re-sent to the
 *    upstream task and its input is re-opened
 *  - the task is in checkpoint status, and the block is for another checkpoint, arrives after all
 *    upstream triggers were received, or (sink/agg tasks) repeats a trigger from the same upstream
 *
 * @return 0 when the block should be handled; TSDB_CODE_INVALID_MSG when it must be discarded;
 *         TSDB_CODE_STREAM_TASK_NOT_EXIST / TSDB_CODE_INVALID_PARA / the send error on failures
 */
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                               int32_t transId) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     taskLevel = pTask->info.taskLevel;
  const char* id = pTask->id.idStr;

  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
  if (pTask->chkInfo.checkpointId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pActiveInfo->failedId >= checkpointId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            id, vgId, checkpointId, transId, pActiveInfo->failedId);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pTask->chkInfo.checkpointId == checkpointId) {
    {  // send checkpoint-ready msg to upstream
      SRpcMsg                msg = {0};
      SStreamUpstreamEpInfo* pInfo = NULL;
      streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
      if (pInfo == NULL) {
        return TSDB_CODE_STREAM_TASK_NOT_EXIST;
      }

      code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
      if (code == TSDB_CODE_SUCCESS) {
        code = tmsgSendReq(&pInfo->epSet, &msg);
        if (code) {
          stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
        }
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);

    streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);

    // the checkpoint is already done on this task, the trigger block itself must not be handled again
    return (code != TSDB_CODE_SUCCESS) ? code : TSDB_CODE_INVALID_MSG;
  }

  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    if (pActiveInfo->activeId != checkpointId) {
      stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
              " discard",
              id, vgId, pActiveInfo->activeId, checkpointId);
      return TSDB_CODE_INVALID_MSG;
    } else {  // checkpointId == pActiveInfo->activeId
      if (pActiveInfo->allUpstreamTriggerRecv == 1) {
        stDebug(
            "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
            "checkpointId:%" PRId64 " transId:%d",
            id, vgId, checkpointId, transId);
        return TSDB_CODE_INVALID_MSG;
      }

      if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
        //  check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
        for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
          STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
          if (p == NULL) {
            return TSDB_CODE_INVALID_PARA;
          }

          if (p->upstreamTaskId == pBlock->srcTaskId) {
            stWarn("s-task:%s repeatly recv checkpoint-source msg from task:0x%x vgId:%d, checkpointId:%" PRId64
                   ", prev recvTs:%" PRId64 " discard",
                   pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
            return TSDB_CODE_INVALID_MSG;
          }
        }
      }
    }
  }

  return 0;
}
298

299
/**
 * Handle a checkpoint-trigger block taken from the inputQ of a task.
 *
 * Flow:
 *  1. read checkpointId (info.version) and transId (info.window.skey) from the first data block
 *  2. run doCheckBeforeHandleChkptTrigger() under the task lock; a non-zero result drops the block
 *  3. if the task is not yet in checkpoint status, record the active ids, send
 *     TASK_EVENT_GEN_CHECKPOINT to the state machine and start the trigger-recv monitor timer
 *  4. source task: flush executor state, then dispatch the trigger downstream, or, when the output
 *     is not a dispatch type, append a STREAM_INPUT__CHECKPOINT item to its own inputQ
 *     sink/agg task: record the checkpoint-ready msg for the upstream task; once all upstream triggers
 *     have arrived, a sink builds the checkpoint and an agg flushes state and forwards the trigger
 *
 * Ownership: pBlock is consumed by this function. It is either handed to
 * continueDispatchCheckpointTriggerBlock() or freed here on every other handled path, including the
 * error paths the original missed (NULL data block, flush failure on agg tasks).
 *
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int64_t                checkpointId = 0;
  int32_t                transId = 0;
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    // every other failure path releases the block, do the same here to avoid leaking it
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return TSDB_CODE_INVALID_PARA;
  }

  checkpointId = pDataBlock->info.version;
  transId = pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {                                    // source & agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        // same handling as the source-task branch above: the block is not forwarded, so release it
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
420

421
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
422
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
4,590✔
423
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
424
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
425
  *alreadyRecv = false;
4,590✔
426
  int32_t size = taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
4,590✔
427
  for (int32_t i = 0; i < size; ++i) {
10,409✔
428
    STaskDownstreamReadyInfo* p = taosArrayGet(pInfo->pCheckpointReadyRecvList, i);
5,819✔
429
    if (p == NULL) {
5,819!
430
      return TSDB_CODE_INVALID_PARA;
×
431
    }
432

433
    if (p->downstreamTaskId == downstreamTaskId) {
5,819!
434
      (*alreadyRecv) = true;
×
435
      break;
×
436
    }
437
  }
438

439
  if (*alreadyRecv) {
4,590!
440
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
×
441
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList)),
442
            numOfDownstream);
443
  } else {
444
    STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
4,590✔
445
                                     .downstreamTaskId = downstreamTaskId,
446
                                     .checkpointId = pInfo->activeId,
4,590✔
447
                                     .transId = pInfo->transId,
4,590✔
448
                                     .streamId = streamId,
449
                                     .downstreamNodeId = downstreamNodeId};
450
    void*                    p = taosArrayPush(pInfo->pCheckpointReadyRecvList, &info);
4,590✔
451
    if (p == NULL) {
4,590!
452
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
×
453
      return terrno;
×
454
    }
455
  }
456

457
  *pNotReady = numOfDownstream - taosArrayGetSize(pInfo->pCheckpointReadyRecvList);
4,590✔
458
  *pTransId = pInfo->transId;
4,590✔
459
  return 0;
4,590✔
460
}
461

462
/**
463
 * All down stream tasks have successfully completed the check point task.
464
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
465
 */
466
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
4,591✔
467
                                        int32_t downstreamTaskId) {
468
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
4,591✔
469

470
  const char* id = pTask->id.idStr;
4,591✔
471
  int32_t     total = streamTaskGetNumOfDownstream(pTask);
4,591✔
472
  int32_t     code = 0;
4,591✔
473
  int32_t     notReady = 0;
4,591✔
474
  int32_t     transId = 0;
4,591✔
475
  bool        alreadyHandled = false;
4,591✔
476

477
  // 1. not in checkpoint status now
478
  SStreamTaskState pStat = streamTaskGetStatus(pTask);
4,591✔
479
  if (pStat.state != TASK_STATUS__CK) {
4,591✔
480
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", id, pStat.name, downstreamTaskId);
1!
481
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
1✔
482
  }
483

484
  // 2. expired checkpoint-ready msg, invalid checkpoint-ready msg
485
  if (pTask->chkInfo.checkpointId > checkpointId || pInfo->activeId != checkpointId) {
4,590!
486
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
×
487
            ") from task:0x%x, expired and discard",
488
            id, pStat.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
489
    return TSDB_CODE_INVALID_MSG;
×
490
  }
491

492
  streamMutexLock(&pInfo->lock);
4,590✔
493
  code = processCheckpointReadyHelp(pInfo, total, downstreamNodeId, pTask->id.streamId, downstreamTaskId, id, &notReady,
4,590✔
494
                                    &transId, &alreadyHandled);
495
  streamMutexUnlock(&pInfo->lock);
4,590✔
496

497
  if (alreadyHandled) {
4,590!
498
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
×
499
            id, checkpointId, downstreamTaskId);
500
  } else {
501
    if ((notReady == 0) && (code == 0) && (!alreadyHandled)) {
4,590!
502
      stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", id);
1,450✔
503
      code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
1,450✔
504
    }
505
  }
506

507
  return code;
4,590✔
508
}
509

510
/**
 * Handle the confirmation (rsp) of a checkpoint-ready msg that was sent to an upstream task.
 *
 * Under the lock of the active checkpoint info, the first entry of pReadyMsgList that matches both
 * upstreamTaskId and checkpointId is flagged as sendCompleted, then the number of confirmed entries
 * is counted and logged. Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                ts = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  // pass 1: flag the entry that belongs to this rsp
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    if (pEntry->upstreamTaskId != upstreamTaskId || pEntry->checkpointId != checkpointId) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            pTask->id.idStr, upstreamTaskId, checkpointId, ts);
    break;
  }

  // pass 2: count all confirmed entries
  for (int32_t idx = 0; idx < taosArrayGetSize(pActive->pReadyMsgList); ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              pTask->id.idStr, idx);
      continue;
    }

    confirmed += (pEntry->sendCompleted == 1) ? 1 : 0;
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, pTask->id.idStr,
          confirmed, checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
551

552
/**
 * Reset the per-checkpoint bookkeeping of a task.
 *
 * Clears the recorded start time, re-opens the input of all upstream tasks and, under the lock of the
 * active checkpoint info, clears the active info and (optionally) the checkpoint-ready msg list.
 *
 * @param clearChkpReadyMsg when true, streamClearChkptReadyMsg() is invoked as well
 */
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  // forget the start time of the checkpoint
  pTask->chkInfo.startTs = 0;

  // inputQ is opened again for every upstream task
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", current checkpointId:%" PRId64,
          pTask->id.idStr, pActive->failedId, pTask->chkInfo.checkpointId);
}
2,479✔
568

569
/**
 * Apply a checkpoint-info update request to a task and persist the task.
 *
 * - A request whose checkpointId is not newer than the current one is ignored, except that the related
 *   fill-history task is still dropped when pReq->dropRelHTask is set.
 * - Otherwise checkpointId/checkpointVer/checkpointTime are updated when the task is in checkpoint
 *   status, when the update happens during restore (!restored), or when this node is a follower. On a
 *   restored leader TASK_EVENT_CHECKPOINT_DONE is sent to the state machine.
 * - The active checkpoint info is cleared, the persistent status is set to TASK_STATUS__READY, the
 *   task is saved and the meta is committed.
 *
 * Locking: pTask->lock is taken and released inside this function on every path. The meta write lock
 * is released and re-acquired around the drop of the fill-history task.
 * NOTE(review): this implies the caller holds the meta write lock on entry - confirm against callers.
 *
 * @return TSDB_CODE_STREAM_INTERNAL_ERROR when the request is inconsistent with the current
 *         checkpoint info, TSDB_CODE_SUCCESS otherwise (failures of save/unregister/commit are
 *         logged or ignored, as in the original)
 */
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  streamMutexLock(&pTask->lock);

  if (pReq->checkpointId <= pInfo->checkpointId) {
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
            pReq->transId);
    streamMutexUnlock(&pTask->lock);

    {  // destroy the related fill-history tasks
      // drop task should not in the meta-lock, and drop the related fill-history task now
      streamMetaWUnLock(pMeta);
      if (pReq->dropRelHTask) {
        code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
        int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
        // report the id of the dropped fill-history task (hTaskId), like the regular path below does
        stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
                id, vgId, (int32_t)pReq->hTaskId, numOfTasks);
      }

      streamMetaWLock(pMeta);
      if (pReq->dropRelHTask) {
        code = streamMetaCommit(pMeta);
      }
    }

    // always return true
    return TSDB_CODE_SUCCESS;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  if (!restored) {  // during restore procedure, do update checkpoint-info
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  } else {  // not in restore status, must be in checkpoint status
    if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
      stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
    } else {
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer);
    }
  }

  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
                pInfo->processedVer <= pReq->checkpointVer);

  if (!valid) {
    stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, pReq->checkpointVer);
    // the task lock is still held here; returning without releasing it would block every later user of the lock
    streamMutexUnlock(&pTask->lock);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
            pReq->taskId, vgId, pReq->hTaskId);
    CLEAR_RELATED_FILLHISTORY_TASK(pTask);
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTask(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, terrstr());
    return TSDB_CODE_SUCCESS;
  }

  streamMetaWUnLock(pMeta);

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
            (int32_t)pReq->hTaskId, numOfTasks);
  }

  streamMetaWLock(pMeta);
  code = streamMetaCommit(pMeta);

  return TSDB_CODE_SUCCESS;
}
684

685
// Record the currently active checkpoint id as the failed one.
// If no checkpoint is active (activeId <= 0) only a warning is logged and nothing is recorded.
// The visible callers hold pTask->lock around this call.
void streamTaskSetFailedCheckpointId(SStreamTask* pTask) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  if (pActive->activeId > 0) {
    pActive->failedId = pActive->activeId;
    stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d)", pTask->id.idStr,
            pActive->activeId, pActive->transId);
    return;
  }

  stWarn("s-task:%s checkpoint-info is cleared now, not set the failed checkpoint info", pTask->id.idStr);
}
×
696

697
// Under the task lock: if the task is currently in checkpoint status, mark the active
// checkpoint as failed; otherwise do nothing.
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    streamTaskSetFailedCheckpointId(pTask);
  }

  streamMutexUnlock(&pTask->lock);
}
450✔
705

706
// Download the remote "META" file into "<path>/META_TMP" and append the files it lists for
// deletion to `list`.
//
// id:   remote object prefix, also used as the log prefix
// path: local checkpoint directory
// list: output array that receives the names of the files to delete
//
// Returns terrno on allocation failure, TSDB_CODE_OUT_OF_RANGE if the temp path does not fit,
// or the download error code. A failure to parse the downloaded META file is logged but still
// reported as 0, exactly as before.
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    taosMemoryFree(filePath);
    return TSDB_CODE_OUT_OF_RANGE;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    taosMemoryFree(filePath);
    return code;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

  // release the temp path on every exit; previously it was leaked whenever parsing succeeded
  taosMemoryFree(filePath);
  return 0;
}
735

736
// Generate the data set to upload for `checkpointId`, push it to the remote store, delete the
// remote files that became redundant, and finally remove the local upload directory.
//
// The steps run strictly in sequence; the first failing step stops the remaining ones and its
// code is returned. On any failure the local checkpoint data is kept.
// `dbRefId` is accepted but not used in this function.
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  char*        path = NULL;
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      startMs = taosGetTimestampMs();

  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  if (toDelFiles == NULL) {
    return terrno;
  }

  // step 1: build the local upload directory and the initial to-delete list
  int32_t code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
                                         pTask->id.idStr);
  if (code != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  // step 2 (S3 only): extend the to-delete list from the remote META file
  if ((type == DATA_UPLOAD_S3) && (code == TSDB_CODE_SUCCESS)) {
    code = getCheckpointDataMeta(idStr, path, toDelFiles);
    if (code != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  // step 3: upload, then step 4: remove the redundant remote files
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, path, checkpointId);

    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
              tstrerror(code));
    } else {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);

      int32_t size = taosArrayGetSize(toDelFiles);
      stDebug("s-task:%s remove redundant %d files", idStr, size);

      int32_t idx = 0;
      while (idx < size) {
        char* pName = taosArrayGetP(toDelFiles, idx);

        code = deleteCheckpointFile(idStr, pName);
        if (code != 0) {  // the first failed removal aborts the loop and becomes the result
          stDebug("s-task:%s failed to remove file: %s", idStr, pName);
          break;
        }

        idx += 1;
      }

      stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
    }
  }

  taosArrayDestroyP(toDelFiles, taosMemoryFree);
  double el = (taosGetTimestampMs() - startMs) / 1000.0;

  if (code != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
            checkpointId, el);
  } else {
    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
            idStr, checkpointId, el, path);
    taosRemoveDir(path);
  }

  taosMemoryFree(path);
  return code;
}
802

803
// Upload the checkpoint identified by `checkpointId` to the configured remote backup target.
//
// Returns 0 when there is nothing to do (NULL task, backup disabled, or no backend),
// -1 when the backend db cannot be acquired, otherwise the result of uploadCheckpointData().
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  // validate the task before it is dereferenced by the log statement below
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  // hold a reference on the backend db for the duration of the upload
  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}
827

828
// Build the checkpoint for the active checkpoint id of `pTask`:
//   1. non-sink tasks generate the backend checkpoint;
//   2. a response is sent (source-rsp for source tasks, ready-msg otherwise);
//   3. the checkpoint is backed up remotely if a backup target is configured;
//   4. on success a checkpoint-report is sent (only when pMsgCb is set; it is NULL for rsma),
//      on failure the active checkpoint is marked failed and CHECKPOINT_DONE is fired.
// Each stage runs only if all previous stages succeeded.
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  SStreamMeta* pMeta = pTask->pMeta;
  const char*  id = pTask->id.idStr;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  int64_t      startTs = pTask->chkInfo.startTs;
  bool         dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  int32_t      code = TSDB_CODE_SUCCESS;

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, pTask->chkInfo.processedVer);
    if (code != TSDB_CODE_SUCCESS) {
      // NOTE(review): this reports terrno rather than the returned code - confirm which one is intended
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(terrno));
    }
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    code = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) ? streamTaskSendCheckpointSourceRsp(pTask)
                                                         : streamTaskSendCheckpointReadyMsg(pTask);
    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  } else {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code != TSDB_CODE_SUCCESS) {  // clear the checkpoint info if failed
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask);  // set failed checkpoint id before clear the checkpoint info
    streamMutexUnlock(&pTask->lock);

    // NOTE(review): the original failure code is replaced by the event-handling result here, so
    // the summary below can report "succ" after a failed checkpoint - confirm this is intended
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  } else if (pTask->pMsgCb != NULL) {
    code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
  }

  double      el = (taosGetTimestampMs() - startTs) / 1000.0;
  const char* result = (code == TSDB_CODE_SUCCESS) ? "succ" : "failed";
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el, result);

  return code;
}
894

895
// Decide whether the checkpoint-trigger monitor timer should keep running.
//
// Returns 0 to continue. Returns -1 (after calling streamCleanBeforeQuitTmr) when:
//   - all upstream checkpoint-triggers have been received, or
//   - the timer was launched for a different checkpoint id than the active one, or
//   - the active checkpoint info looks cleared (activeId, transId or startTs is 0).
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  int32_t                vgId = pTask->pMeta->vgId;
  const char*            idStr = pTask->id.idStr;

  // checkpoint-trigger recv flag is set, quit
  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, vgId);
    return -1;
  }

  // this timer belongs to an older checkpoint procedure
  if (pActive->activeId != pTmr->launchChkptId) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, vgId, pTmr->launchChkptId);
    return -1;
  }

  // active checkpoint info is cleared for now
  bool infoIntact = (pActive->activeId != 0) && (pActive->transId != 0) && (pTask->chkInfo.startTs != 0);
  if (!infoIntact) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
           vgId);
    return -1;
  }

  return 0;
}
926

927
// Collect the upstream tasks in `pList` for which no entry with a matching node id exists in the
// active checkpoint's pReadyMsgList. For each such upstream the input is (re)opened and a copy of
// its SStreamUpstreamEpInfo is appended to a newly allocated array.
//
// On success returns 0 and stores the new array in *ppNotSendList (the caller destroys it).
// On allocation failure returns terrno and leaves *ppNotSendList untouched.
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;

  SArray* pResult = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pResult == NULL) {
    stDebug("s-task:%s start to triggerMonitor, reason:%s", idStr, tstrerror(terrno));
    return terrno;
  }

  for (int32_t up = 0; up < taosArrayGetSize(pList); ++up) {
    SStreamUpstreamEpInfo* pUpstream = taosArrayGetP(pList, up);

    // look for a ready-msg entry coming from this upstream node
    bool    found = false;
    int32_t k = 0;
    while (!found && k < taosArrayGetSize(pActive->pReadyMsgList)) {
      STaskCheckpointReadyInfo* pReady = taosArrayGet(pActive->pReadyMsgList, k);
      if (pReady != NULL && pUpstream->nodeId == pReady->upstreamNodeId) {
        found = true;
      }
      k += 1;
    }

    if (found) {
      continue;
    }

    // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
    streamTaskOpenUpstreamInput(pTask, pUpstream->taskId);

    if (taosArrayPush(pResult, pUpstream) == NULL) {
      stError("s-task:%s failed to record not send info, code: out of memory", idStr);
      taosArrayDestroy(pResult);
      return terrno;
    }
  }

  *ppNotSendList = pResult;
  return 0;
}
967

968
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) {
×
969
  const char*            id = pTask->id.idStr;
×
970
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
×
971
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
×
972
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
×
973
  int32_t                vgId = pTask->pMeta->vgId;
×
974

975
  int32_t code = doChkptStatusCheck(pTask, param);
×
976
  if (code) {
×
977
    return code;
×
978
  }
979

980
  code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
×
981
  if (code) {
×
982
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
983
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
×
984
    return code;
×
985
  }
986

987
  // do send retrieve checkpoint trigger msg to upstream
988
  code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
×
989
  if (code) {
×
990
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
×
991
    code = 0;
×
992
  }
993

994
  return code;
×
995
}
996

997
static void doCleanup(SStreamTask* pTask, SArray* pList) {
44,704✔
998
  streamMetaReleaseTask(pTask->pMeta, pTask);
44,704✔
999
  taosArrayDestroy(pList);
44,704✔
1000
}
44,704✔
1001

1002
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
44,709✔
1003
  int32_t      code = 0;
44,709✔
1004
  int32_t      numOfNotSend = 0;
44,709✔
1005
  SArray*      pNotSendList = NULL;
44,709✔
1006
  int64_t      taskRefId = *(int64_t*)param;
44,709✔
1007
  int64_t      now = taosGetTimestampMs();
44,709✔
1008

1009
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
44,709✔
1010
  if (pTask == NULL) {
44,709✔
1011
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
5!
1012
    streamTaskFreeRefId(param);
5✔
1013
    return;
44,709✔
1014
  }
1015

1016
  int32_t                vgId = pTask->pMeta->vgId;
44,704✔
1017
  const char*            id = pTask->id.idStr;
44,704✔
1018
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
44,704✔
1019
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
44,704✔
1020
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
44,704✔
1021

1022
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
44,704!
1023
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1024
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1025
    doCleanup(pTask, pNotSendList);
×
1026
    return;
×
1027
  }
1028

1029
  // check the status every 100ms
1030
  if (streamTaskShouldStop(pTask)) {
44,704✔
1031
    streamCleanBeforeQuitTmr(pTmrInfo, param);
8✔
1032
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
8!
1033
    doCleanup(pTask, pNotSendList);
8✔
1034
    return;
8✔
1035
  }
1036

1037
  if (++pTmrInfo->activeCounter < 50) {
44,696✔
1038
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
44,243✔
1039
                   "trigger-recv-monitor");
1040
    doCleanup(pTask, pNotSendList);
44,243✔
1041
    return;
44,243✔
1042
  }
1043

1044
  pTmrInfo->activeCounter = 0;
453✔
1045
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
453✔
1046

1047
  streamMutexLock(&pTask->lock);
453✔
1048
  SStreamTaskState state = streamTaskGetStatus(pTask);
453✔
1049
  streamMutexUnlock(&pTask->lock);
453✔
1050

1051
  if (state.state != TASK_STATUS__CK) {
453!
1052
    streamCleanBeforeQuitTmr(pTmrInfo, param);
453✔
1053
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
453✔
1054
            vgId, state.name);
1055
    doCleanup(pTask, pNotSendList);
453✔
1056
    return;
453✔
1057
  }
1058

1059
  streamMutexLock(&pActiveInfo->lock);
×
1060
  code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList);
×
1061
  streamMutexUnlock(&pActiveInfo->lock);
×
1062

1063
  if (code != TSDB_CODE_SUCCESS) {
×
1064
    doCleanup(pTask, pNotSendList);
×
1065
    return;
×
1066
  }
1067

1068
  // check every 100ms
1069
  numOfNotSend = taosArrayGetSize(pNotSendList);
×
1070
  if (numOfNotSend > 0) {
×
1071
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
1072
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1073
                   "trigger-recv-monitor");
1074
  } else {
1075
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1076
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
×
1077
  }
1078

1079
  doCleanup(pTask, pNotSendList);
×
1080
}
1081

1082
// Send a TDMT_STREAM_RETRIEVE_TRIGGER request to every upstream task in `pNotSendList`
// (an array of SStreamUpstreamEpInfo) for the currently active checkpoint id.
//
// Returns 0 when the list is empty, TSDB_CODE_INVALID_PARA if a list slot cannot be read,
// otherwise the status left by the last processed entry (send result, or terrno when the
// message buffer could not be allocated).
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  const char* pId = pTask->id.idStr;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     numOfPending = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;
  int32_t     code = 0;

  if (numOfPending <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId,
          numOfPending, numOfUpstream);

  for (int32_t idx = 0; idx < numOfPending; ++idx) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, idx);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    SRetrieveChkptTriggerReq* pReq = rpcMallocCont(sizeof(SRetrieveChkptTriggerReq));
    if (pReq == NULL) {  // skip this upstream and try the next one
      code = terrno;
      stError("vgId:%d failed to create msg to retrieve trigger msg for task:%s exec, code:out of memory", vgId, pId);
      continue;
    }

    // fill in the request: who asks (downstream) and who should answer (upstream)
    pReq->head.vgId = htonl(pUpstreamTask->nodeId);
    pReq->streamId = pTask->id.streamId;
    pReq->checkpointId = checkpointId;
    pReq->downstreamNodeId = vgId;
    pReq->downstreamTaskId = pTask->id.taskId;
    pReq->upstreamNodeId = pUpstreamTask->nodeId;
    pReq->upstreamTaskId = pUpstreamTask->taskId;

    SRpcMsg rpcMsg = {0};
    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, pReq, sizeof(SRetrieveChkptTriggerReq));

    code = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    }
  }

  return code;
}
1134

1135
// Check whether a checkpoint-trigger has already been dispatched to `downstreamNodeId`.
// As the name says, no lock is taken here; the visible caller holds pInfo->lock around the call.
//
// Returns true (as int32_t) if an entry for that node exists in the dispatch-trigger list,
// false if trigger dispatch has not started or no entry matches.
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                now = taosGetTimestampMs();
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       pStatus = streamTaskGetStatus(pTask);

  if (!pInfo->dispatchTrigger) {
    return false;
  }

  int32_t total = taosArrayGetSize(pInfo->pDispatchTriggerList);
  for (int32_t idx = 0; idx < total; ++idx) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, idx,
              total);
      continue;
    }

    if (pEntry->nodeId == downstreamNodeId) {
      // has send trigger msg to downstream node,
      double before = (now - pEntry->sendTs) / 1000.0;

      if (!pEntry->recved) {
        stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
               ", transId:%d",
               id, pEntry->sendTs, before, pInfo->activeId, pInfo->transId);
      } else {
        stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
               "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
               id, pEntry->sendTs, before, pEntry->taskId, pInfo->activeId, pInfo->transId);
      }

      return true;
    }
  }

  return false;
}
1174

1175
// Locked wrapper around isAlreadySendTriggerNoLock().
// Returns false right away when the task is not in checkpoint status; otherwise takes the
// active-checkpoint lock and reports whether a trigger was already sent to `downstreamNodeId`.
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;

  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pActive->lock);
  bool alreadySent = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pActive->lock);

  return alreadySent;
}
1191

1192
// Report checkpoint-trigger receive progress.
// *pRecved: number of entries in the active checkpoint's pReadyMsgList
// *pTotal:  1 for a source task, otherwise the number of upstream tasks
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  *pRecved = taosArrayGetSize(pTask->chkInfo.pActiveInfo->pReadyMsgList);
  *pTotal = (pTask->info.taskLevel == TASK_LEVEL__SOURCE) ? 1 : streamTaskGetNumOfUpstream(pTask);
}
×
1201

1202
// record the dispatch checkpoint trigger info in the list
1203
// memory insufficient may cause the stream computing stopped
1204
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
1,496✔
1205
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
1,496✔
1206
  int64_t                now = taosGetTimestampMs();
1,496✔
1207
  int32_t                code = 0;
1,496✔
1208

1209
  streamMutexLock(&pInfo->lock);
1,496✔
1210

1211
  pInfo->dispatchTrigger = true;
1,496✔
1212
  if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
1,496✔
1213
    STaskDispatcherFixed* pDispatch = &pTask->outputInfo.fixedDispatcher;
106✔
1214

1215
    STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pDispatch->nodeId, .taskId = pDispatch->taskId};
106✔
1216
    void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
106✔
1217
    if (px == NULL) {  // pause the stream task, if memory not enough
106!
1218
      code = terrno;
×
1219
    }
1220
  } else {
1221
    for (int32_t i = 0; i < streamTaskGetNumOfDownstream(pTask); ++i) {
5,965✔
1222
      SVgroupInfo* pVgInfo = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, i);
4,575✔
1223
      if (pVgInfo == NULL) {
4,575!
1224
        continue;
×
1225
      }
1226

1227
      STaskTriggerSendInfo p = {.sendTs = now, .recved = false, .nodeId = pVgInfo->vgId, .taskId = pVgInfo->taskId};
4,575✔
1228
      void*                px = taosArrayPush(pInfo->pDispatchTriggerList, &p);
4,575✔
1229
      if (px == NULL) {  // pause the stream task, if memory not enough
4,575!
1230
        code = terrno;
×
1231
        break;
×
1232
      }
1233
    }
1234
  }
1235

1236
  streamMutexUnlock(&pInfo->lock);
1,496✔
1237

1238
  return code;
1,496✔
1239
}
1240

1241
// Count the entries of pInfo->pDispatchTriggerList whose `recved` flag is set.
// No lock is taken here; the visible caller holds pInfo->lock.
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;

  for (int32_t idx = 0; idx < taosArrayGetSize(pInfo->pDispatchTriggerList); ++idx) {
    STaskTriggerSendInfo* pEntry = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pEntry != NULL && pEntry->recved) {
      confirmed += 1;
    }
  }

  return confirmed;
}
1255

1256
// Mark the checkpoint-trigger dispatched to downstream node `vgId` as confirmed.
//
// The first dispatch-trigger entry whose nodeId equals `vgId` is looked up under the
// active-checkpoint lock:
//   - not yet confirmed: it is flagged as received, stamped with the current time, and the
//     confirmation progress is logged;
//   - already confirmed: the duplicate is logged as a warning and discarded;
//   - no matching entry:  an "invalid confirm" error is logged.
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    found = false;
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL) {
      continue;
    }

    if (p->nodeId == vgId) {
      found = true;
      taskId = p->taskId;

      if (p->recved) {
        stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
               " discard",
               pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
        alreadyRecv = true;
      } else {
        p->recved = true;
        p->recvTs = taosGetTimestampMs();
      }
      break;
    }
  }

  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  // A duplicate confirm has a matching entry, so it must not be reported as invalid as well;
  // tracking `found` separately keeps the two cases apart.
  if (!found) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
4,618✔
1299

1300
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
×
1301
  int32_t code = 0;
×
1302
  int32_t nBytes = 0;
×
1303
  /*
1304
  if (s3Init() != 0) {
1305
    return TSDB_CODE_THIRDPARTY_ERROR;
1306
  }
1307
  */
1308
  TdDirPtr pDir = taosOpenDir(path);
×
1309
  if (pDir == NULL) {
×
1310
    return terrno;
×
1311
  }
1312

1313
  TdDirEntryPtr de = NULL;
×
1314
  while ((de = taosReadDir(pDir)) != NULL) {
×
1315
    char* name = taosGetDirEntryName(de);
×
1316
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
×
1317

1318
    char filename[PATH_MAX] = {0};
×
1319
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
×
1320
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
1321
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1322
        code = TSDB_CODE_OUT_OF_RANGE;
×
1323
        break;
×
1324
      }
1325
    } else {
1326
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
×
1327
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1328
        code = TSDB_CODE_OUT_OF_RANGE;
×
1329
        break;
×
1330
      }
1331
    }
1332

1333
    char object[PATH_MAX] = {0};
×
1334
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
×
1335
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1336
      code = TSDB_CODE_OUT_OF_RANGE;
×
1337
      break;
×
1338
    }
1339

1340
    code = tcsPutObjectFromFile2(filename, object, 0);
×
1341
    if (code != 0) {
×
1342
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
×
1343
    } else {
1344
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1345
    }
1346
  }
1347

1348
  int32_t ret = taosCloseDir(&pDir);
×
1349
  if (code == 0 && ret != 0) {
×
1350
    code = ret;
×
1351
  }
1352

1353
  return code;
×
1354
}
1355

1356
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
×
1357
  int32_t nBytes;
1358
  int32_t cap = strlen(id) + strlen(dstName) + 16;
×
1359

1360
  char* buf = taosMemoryCalloc(1, cap);
×
1361
  if (buf == NULL) {
×
1362
    return terrno;
×
1363
  }
1364

1365
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
×
1366
  if (nBytes <= 0 || nBytes >= cap) {
×
1367
    taosMemoryFree(buf);
×
1368
    return TSDB_CODE_OUT_OF_RANGE;
×
1369
  }
1370
  int32_t code = tcsGetObjectToFile(buf, dstName);
×
1371
  if (code != 0) {
×
1372
    taosMemoryFree(buf);
×
1373
    return TAOS_SYSTEM_ERROR(errno);
×
1374
  }
1375
  taosMemoryFree(buf);
×
1376
  return 0;
×
1377
}
1378

1379
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
2,872✔
1380
  if (strlen(tsSnodeAddress) != 0) {
2,872!
1381
    return DATA_UPLOAD_RSYNC;
×
1382
  } else if (tsS3StreamEnabled) {
2,872!
1383
    return DATA_UPLOAD_S3;
×
1384
  } else {
1385
    return DATA_UPLOAD_DISABLE;
2,872✔
1386
  }
1387
}
1388

1389
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
×
1390
  int32_t code = 0;
×
1391
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
×
1392
    stError("invalid parameters in upload checkpoint, %s", id);
×
1393
    return TSDB_CODE_INVALID_CFG;
×
1394
  }
1395

1396
  if (strlen(tsSnodeAddress) != 0) {
×
1397
    code = uploadByRsync(id, path, checkpointId);
×
1398
    if (code != 0) {
×
1399
      return TAOS_SYSTEM_ERROR(errno);
×
1400
    }
1401
  } else if (tsS3StreamEnabled) {
×
1402
    return uploadCheckpointToS3(id, path);
×
1403
  }
1404

1405
  return 0;
×
1406
}
1407

1408
// fileName:  CURRENT
1409
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
×
1410
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
×
1411
    stError("down load checkpoint data parameters invalid");
×
1412
    return TSDB_CODE_INVALID_PARA;
×
1413
  }
1414

1415
  if (strlen(tsSnodeAddress) != 0) {
×
1416
    return 0;
×
1417
  } else if (tsS3StreamEnabled) {
×
1418
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1419
  }
1420

1421
  return 0;
×
1422
}
1423

1424
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
×
1425
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
×
1426
    stError("down checkpoint data parameters invalid");
×
1427
    return -1;
×
1428
  }
1429

1430
  if (strlen(tsSnodeAddress) != 0) {
×
1431
    return downloadByRsync(id, path, checkpointId);
×
1432
  } else if (tsS3StreamEnabled) {
×
1433
    return tcsGetObjectsByPrefix(id, path);
×
1434
  }
1435

1436
  return 0;
×
1437
}
1438

1439
int32_t deleteCheckpoint(const char* id) {
×
1440
  if (id == NULL || strlen(id) == 0) {
×
1441
    stError("deleteCheckpoint parameters invalid");
×
1442
    return TSDB_CODE_INVALID_PARA;
×
1443
  }
1444
  if (strlen(tsSnodeAddress) != 0) {
×
1445
    return deleteRsync(id);
×
1446
  } else if (tsS3StreamEnabled) {
×
1447
    tcsDeleteObjectsByPrefix(id);
×
1448
  }
1449
  return 0;
×
1450
}
1451

1452
int32_t deleteCheckpointFile(const char* id, const char* name) {
×
1453
  char object[128] = {0};
×
1454

1455
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
×
1456
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1457
    return TSDB_CODE_OUT_OF_RANGE;
×
1458
  }
1459

1460
  char*   tmp = object;
×
1461
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
×
1462
  if (code != 0) {
×
1463
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1464
  }
1465
  return code;
×
1466
}
1467

1468
// Mark the task as requesting a consensus checkpoint id and drop its state backend.
//
// Under pTask->lock the current task state is sampled and
// streamTaskSetReqConsenChkptId() is called with the current millisecond timestamp.
// No check is made for a request already in flight; every call records a new one.
// After the lock is released, a non-NULL pTask->pBackend is released via
// streamFreeTaskState() using the sampled state, and the pointer is cleared.
//
// Always returns 0.
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  ETaskStatus status;

  streamMutexLock(&pTask->lock);
  status = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  if (pTask->pBackend == NULL) {
    return 0;
  }

  streamFreeTaskState(pTask, status);
  pTask->pBackend = NULL;
  return 0;
}
1488

1489
// Send the checkpoint-source response for a source-level task that is in the
// TASK_STATUS__CK state.
//
// Tasks whose level is not TASK_LEVEL__SOURCE return 0 immediately without taking
// the lock. Otherwise the state check and the call to
// streamTaskSendCheckpointSourceRsp() both happen while pTask->lock is held.
//
// Returns the result of streamTaskSendCheckpointSourceRsp() when it is called, else 0.
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t rspCode = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    rspCode = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return rspCode;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc