• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3533

20 Nov 2024 07:11AM UTC coverage: 58.848% (-1.9%) from 60.78%
#3533

push

travis-ci

web-flow
Merge pull request #28823 from taosdata/fix/3.0/TD-32587

fix:[TD-32587]fix stmt segmentation fault

115578 of 252434 branches covered (45.79%)

Branch coverage included in aggregate %.

1 of 4 new or added lines in 1 file covered. (25.0%)

8038 existing lines in 233 files now uncovered.

194926 of 275199 relevant lines covered (70.83%)

1494459.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

43.08
/source/libs/stream/src/streamCheckpoint.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "rsync.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tcs.h"
20

21
static int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName);
22
static int32_t deleteCheckpointFile(const char* id, const char* name);
23
static int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId);
24
static int32_t deleteCheckpoint(const char* id);
25
static int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName);
26
static int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask);
27
static int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId,
28
                                          int32_t transId, int32_t srcTaskId);
29
static int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList);
30
static void    checkpointTriggerMonitorFn(void* param, void* tmrId);
31

32
/*
 * Build a checkpoint-related queue item (checkpoint-trigger or checkpoint) for pTask.
 *
 * The item wraps exactly one SSDataBlock whose info carries the checkpoint metadata:
 *   info.type            = STREAM_CHECKPOINT
 *   info.version         = checkpointId
 *   info.window.skey/ekey = transId
 *   info.rows            = 1
 *   info.childId         = pTask->info.selfChildId
 *
 * For a STREAM_INPUT__CHECKPOINT_TRIGGER item created on a non-source task, srcTaskId (the upstream task that sent the
 * trigger) is recorded in the item and must be positive; otherwise TSDB_CODE_INVALID_PARA is returned.
 *
 * On success *pRes receives the new item (owned by the caller). On failure everything allocated here is released and
 * *pRes is left untouched.
 */
int32_t createChkptTriggerBlock(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                int32_t srcTaskId, SStreamDataBlock** pRes) {
  SStreamDataBlock* pChkpoint = NULL;
  int32_t code = taosAllocateQitem(sizeof(SStreamDataBlock), DEF_QITEM, sizeof(SSDataBlock), (void**)&pChkpoint);
  if (code) {
    return code;
  }

  pChkpoint->type = checkpointType;
  if (checkpointType == STREAM_INPUT__CHECKPOINT_TRIGGER && (pTask->info.taskLevel != TASK_LEVEL__SOURCE)) {
    pChkpoint->srcTaskId = srcTaskId;
    if (srcTaskId <= 0) {
      stDebug("s-task:%s invalid src task id:%d for creating checkpoint trigger block", pTask->id.idStr, srcTaskId);
      // release the queue item allocated above; it was leaked on this early return before
      taosFreeQitem(pChkpoint);
      return TSDB_CODE_INVALID_PARA;
    }
  }

  SSDataBlock* pBlock = taosMemoryCalloc(1, sizeof(SSDataBlock));
  if (pBlock == NULL) {
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  pBlock->info.type = STREAM_CHECKPOINT;
  pBlock->info.version = checkpointId;
  pBlock->info.window.ekey = pBlock->info.window.skey = transId;  // NOTE: set the transId
  pBlock->info.rows = 1;
  pBlock->info.childId = pTask->info.selfChildId;

  pChkpoint->blocks = taosArrayInit(4, sizeof(SSDataBlock));
  if (pChkpoint->blocks == NULL) {
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  // the array stores a by-value copy of *pBlock, so the temporary heap block is released below in every path
  void* p = taosArrayPush(pChkpoint->blocks, pBlock);
  if (p == NULL) {
    taosArrayDestroy(pChkpoint->blocks);
    taosMemoryFree(pBlock);
    taosFreeQitem(pChkpoint);
    return terrno;
  }

  *pRes = pChkpoint;

  taosMemoryFree(pBlock);
  return TSDB_CODE_SUCCESS;
}
81

82
// this message must be put into inputq successfully, continue retrying until it succeeds
83
// todo must be success
84
/*
 * Create a checkpoint-related queue item (see createChkptTriggerBlock) and put it into the task's inputQ, then try to
 * schedule the task for execution.
 *
 * Returns the error of createChkptTriggerBlock, TSDB_CODE_OUT_OF_MEMORY when the inputQ rejects the item, or the
 * result of streamTrySchedExec.
 * NOTE(review): no retry is performed here although the surrounding comment asks for guaranteed delivery.
 */
int32_t appendCheckpointIntoInputQ(SStreamTask* pTask, int32_t checkpointType, int64_t checkpointId, int32_t transId,
                                   int32_t srcTaskId) {
  SStreamDataBlock* pItem = NULL;

  int32_t code = createChkptTriggerBlock(pTask, checkpointType, checkpointId, transId, srcTaskId, &pItem);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  bool rejected = (streamTaskPutDataIntoInputQ(pTask, (SStreamQueueItem*)pItem) < 0);
  if (rejected) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return streamTrySchedExec(pTask);
}
98

99
/*
 * Start a checkpoint on a source task in response to a checkpoint-source request.
 *
 * Rejects non-source tasks with TSDB_CODE_INVALID_MSG. Otherwise it moves the task state machine into checkpoint
 * status, records the active checkpointId/transId and the start time, and enqueues a checkpoint-trigger block.
 */
int32_t streamProcessCheckpointSourceReq(SStreamTask* pTask, SStreamCheckpointSourceReq* pReq) {
  bool isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);
  if (!isSource) {
    return TSDB_CODE_INVALID_MSG;
  }

  // Step 1: switch the task into checkpoint status; from now on no data are allowed to be put into the inputQ.
  // todo this status may not be set here.
  int32_t code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s failed to handle gen-checkpoint event, failed to start checkpoint procedure", pTask->id.idStr);
    return code;
  }

  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  pActive->transId = pReq->transId;
  pActive->activeId = pReq->checkpointId;

  pTask->chkInfo.startTs = taosGetTimestampMs();
  pTask->execInfo.checkpoint += 1;

  // Step 2: the checkpoint-trigger block becomes the last item of the inputQ, so every block with a smaller version
  // is handled by this task before it.
  return appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pReq->checkpointId, pReq->transId, -1);
}
121

122
/*
 * Handle a checkpoint-trigger response received from an upstream task (non-source tasks only).
 *
 * - A response with a non-success rspCode is logged and ignored (returns TSDB_CODE_SUCCESS).
 * - The task must be in checkpoint status, and the response's checkpointId/transId must match the active ones;
 *   otherwise TSDB_CODE_STREAM_TASK_IVLD_STATUS is returned.
 * - A valid response is turned into a checkpoint-trigger block in the inputQ, tagged with the upstream task id.
 */
int32_t streamTaskProcessCheckpointTriggerRsp(SStreamTask* pTask, SCheckpointTriggerRsp* pRsp) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  bool                   unQualified = false;
  const char*            id = pTask->id.idStr;
  int64_t                activeId = 0;
  int32_t                activeTransId = 0;

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    stError("s-task:%s invalid msg recv, checkpoint-trigger rsp not handled", id);
    return TSDB_CODE_INVALID_MSG;
  }

  if (pRsp->rspCode != TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s retrieve checkpoint-trgger rsp from upstream:0x%x invalid, code:%s", id, pRsp->upstreamTaskId,
            tstrerror(pRsp->rspCode));
    return TSDB_CODE_SUCCESS;
  }

  streamMutexLock(&pTask->lock);
  SStreamTaskState status = streamTaskGetStatus(pTask);
  streamMutexUnlock(&pTask->lock);

  if (status.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s not in checkpoint status, discard the checkpoint-trigger msg", id, status.name);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // snapshot the active ids under the lock so the comparison and the log below see a consistent pair
  streamMutexLock(&pInfo->lock);
  activeId = pInfo->activeId;
  activeTransId = pInfo->transId;
  streamMutexUnlock(&pInfo->lock);

  unQualified = (activeId != pRsp->checkpointId || activeTransId != pRsp->transId);
  if (unQualified) {
    // the task is in checkpoint status here; the rejection reason is the checkpointId/transId mismatch
    stError("s-task:%s active checkpointId:%" PRId64 " transId:%d, recv checkpoint-trigger rsp from upstream:0x%x "
            "with checkpointId:%" PRId64 " transId:%d, mismatch, discard the checkpoint-trigger msg",
            id, activeId, activeTransId, pRsp->upstreamTaskId, pRsp->checkpointId, pRsp->transId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // NOTE: here we do not do the duplicated checkpoint-trigger msg check, since it will be done by following functions.
  int32_t code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT_TRIGGER, pRsp->checkpointId, pRsp->transId,
                                            pRsp->upstreamTaskId);
  return code;
}
161

162
int32_t streamTaskSendCheckpointTriggerMsg(SStreamTask* pTask, int32_t dstTaskId, int32_t downstreamNodeId,
×
163
                                           SRpcHandleInfo* pRpcInfo, int32_t code) {
164
  int32_t  ret = 0;
×
165
  int32_t  tlen = 0;
×
166
  void*    buf = NULL;
×
167
  SEncoder encoder;
168

169
  SCheckpointTriggerRsp req = {.streamId = pTask->id.streamId,
×
170
                               .upstreamTaskId = pTask->id.taskId,
×
171
                               .taskId = dstTaskId,
172
                               .rspCode = code};
173

174
  if (code == TSDB_CODE_SUCCESS) {
×
175
    req.checkpointId = pTask->chkInfo.pActiveInfo->activeId;
×
176
    req.transId = pTask->chkInfo.pActiveInfo->transId;
×
177
  } else {
178
    req.checkpointId = -1;
×
179
    req.transId = -1;
×
180
  }
181

182
  tEncodeSize(tEncodeCheckpointTriggerRsp, &req, tlen, ret);
×
183
  if (ret < 0) {
×
184
    stError("s-task:%s encode checkpoint-trigger rsp msg failed, code:%s", pTask->id.idStr, tstrerror(code));
×
185
    return ret;
×
186
  }
187

188
  buf = rpcMallocCont(tlen + sizeof(SMsgHead));
×
189
  if (buf == NULL) {
×
190
    stError("s-task:%s malloc chkpt-trigger rsp failed for task:0x%x, since out of memory", pTask->id.idStr, dstTaskId);
×
191
    return terrno;
×
192
  }
193

194
  ((SMsgHead*)buf)->vgId = htonl(downstreamNodeId);
×
195
  void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));
×
196

197
  tEncoderInit(&encoder, abuf, tlen);
×
198
  if ((ret = tEncodeCheckpointTriggerRsp(&encoder, &req)) < 0) {
×
199
    rpcFreeCont(buf);
×
200
    tEncoderClear(&encoder);
×
201
    stError("encode checkpoint-trigger rsp failed, code:%s", tstrerror(code));
×
202
    return ret;
×
203
  }
204
  tEncoderClear(&encoder);
×
205

206
  SRpcMsg rspMsg = {.code = 0, .pCont = buf, .contLen = tlen + sizeof(SMsgHead), .info = *pRpcInfo};
×
207
  tmsgSendRsp(&rspMsg);
×
208

209
  return ret;
×
210
}
211

212
/*
 * Stamp pBlock with this task's id/vgId as source, put it into the outputQ and dispatch it downstream.
 *
 * Ownership: the caller hands pBlock over to this function. It ends up either in the outputQ or freed here. This
 * includes the case where the checkpoint-trigger has already been dispatched, in which the block is dropped.
 *
 * Returns 0 when the trigger was already dispatched. Otherwise it returns the result of taosWriteQitem or
 * streamDispatchStreamBlock.
 */
int32_t continueDispatchCheckpointTriggerBlock(SStreamDataBlock* pBlock, SStreamTask* pTask) {
  pBlock->srcTaskId = pTask->id.taskId;
  pBlock->srcVgId = pTask->pMeta->vgId;

  if (pTask->chkInfo.pActiveInfo->dispatchTrigger == true) {
    stError("s-task:%s already dispatch checkpoint-trigger, not dispatch again", pTask->id.idStr);
    // the block is not queued anywhere and callers do not free it afterwards: release it to avoid a leak
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return 0;
  }

  int32_t code = taosWriteQitem(pTask->outputq.queue->pQueue, pBlock);
  if (code == 0) {
    code = streamDispatchStreamBlock(pTask);
  } else {
    stError("s-task:%s failed to put checkpoint into outputQ, code:%s", pTask->id.idStr, tstrerror(code));
    streamFreeQitem((SStreamQueueItem*)pBlock);
  }

  return code;
}
231

232
/*
 * Decide whether a checkpoint-trigger block (checkpointId/transId, sent by task pBlock->srcTaskId) should be handled.
 *
 * Returns TSDB_CODE_SUCCESS when the trigger should be processed. Any other value means the trigger must be discarded:
 *   - TSDB_CODE_STREAM_INVLD_CHKPT returned when the trigger:
 *       - is older than the last completed checkpoint;
 *       - belongs to a checkpoint already marked failed;
 *       - refers to the checkpoint this task already completed (a checkpoint-ready msg is re-sent to the upstream);
 *       - does not match the active checkpoint while in checkpoint status;
 *       - arrives after all upstream triggers were received;
 *       - is a duplicate from the same upstream (sink/agg tasks).
 *   - TSDB_CODE_STREAM_TASK_NOT_EXIST: upstream ep info of the sender could not be found when re-sending ready msg;
 *   - TSDB_CODE_INVALID_PARA: an entry of pActiveInfo->pReadyMsgList could not be read.
 *
 * The caller in this file (streamProcessCheckpointTriggerBlock) invokes it with pTask->lock held.
 */
static int32_t doCheckBeforeHandleChkptTrigger(SStreamTask* pTask, int64_t checkpointId, SStreamDataBlock* pBlock,
                                               int32_t transId) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  int32_t     taskLevel = pTask->info.taskLevel;
  const char* id = pTask->id.idStr;

  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
  // trigger older than the last completed checkpoint: expired
  if (pTask->chkInfo.checkpointId > checkpointId) {
    stError("s-task:%s vgId:%d current checkpointId:%" PRId64
            " recv expired checkpoint-trigger block, checkpointId:%" PRId64 " transId:%d, discard",
            id, vgId, pTask->chkInfo.checkpointId, checkpointId, transId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // this checkpoint (or a later one) has already been marked failed
  if (pActiveInfo->failedId >= checkpointId) {
    stError("s-task:%s vgId:%d checkpointId:%" PRId64 " transId:%d, has been marked failed, failedId:%" PRId64
            " discard the checkpoint-trigger block",
            id, vgId, checkpointId, transId, pActiveInfo->failedId);
    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // this task already finished this checkpoint: re-send checkpoint-ready so the upstream can resume an interrupted
  // checkpoint, then discard the trigger. The send result is only logged; the return value is INVLD_CHKPT either way.
  if (pTask->chkInfo.checkpointId == checkpointId) {
    {  // send checkpoint-ready msg to upstream
      SRpcMsg                msg = {0};
      SStreamUpstreamEpInfo* pInfo = NULL;
      // return value is not checked here; a missing upstream is detected through pInfo staying NULL
      streamTaskGetUpstreamTaskEpInfo(pTask, pBlock->srcTaskId, &pInfo);
      if (pInfo == NULL) {
        return TSDB_CODE_STREAM_TASK_NOT_EXIST;
      }

      code = initCheckpointReadyMsg(pTask, pInfo->nodeId, pBlock->srcTaskId, pInfo->childId, checkpointId, &msg);
      if (code == TSDB_CODE_SUCCESS) {
        code = tmsgSendReq(&pInfo->epSet, &msg);
        if (code) {
          stError("s-task:%s vgId:%d failed send chkpt-ready msg to upstream, code:%s", id, vgId, tstrerror(code));
        }
      }
    }

    stWarn(
        "s-task:%s vgId:%d recv already finished checkpoint-trigger, send checkpoint-ready to upstream:0x%x to resume "
        "the interrupted checkpoint",
        id, vgId, pBlock->srcTaskId);

    return TSDB_CODE_STREAM_INVLD_CHKPT;
  }

  // extra checks only apply while a checkpoint is in progress
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    if (pActiveInfo->activeId != checkpointId) {
      stError("s-task:%s vgId:%d active checkpointId:%" PRId64 ", recv invalid checkpoint-trigger checkpointId:%" PRId64
              " discard",
              id, vgId, pActiveInfo->activeId, checkpointId);
      return TSDB_CODE_STREAM_INVLD_CHKPT;
    } else {  // checkpointId == pActiveInfo->activeId
      if (pActiveInfo->allUpstreamTriggerRecv == 1) {
        stDebug(
            "s-task:%s vgId:%d all upstream checkpoint-trigger recv, discard this checkpoint-trigger, "
            "checkpointId:%" PRId64 " transId:%d",
            id, vgId, checkpointId, transId);
        return TSDB_CODE_STREAM_INVLD_CHKPT;
      }

      if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
        //  check if already recv or not, and duplicated checkpoint-trigger msg recv, discard it
        // NOTE(review): presumably pReadyMsgList holds one entry per upstream whose trigger was already received
        for (int32_t i = 0; i < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++i) {
          STaskCheckpointReadyInfo* p = taosArrayGet(pActiveInfo->pReadyMsgList, i);
          if (p == NULL) {
            return TSDB_CODE_INVALID_PARA;
          }

          if (p->upstreamTaskId == pBlock->srcTaskId) {
            stWarn("s-task:%s repeatly recv checkpoint-trigger msg from task:0x%x vgId:%d, checkpointId:%" PRId64
                   ", prev recvTs:%" PRId64 " discard",
                   pTask->id.idStr, p->upstreamTaskId, p->upstreamNodeId, p->checkpointId, p->recvTs);
            return TSDB_CODE_STREAM_INVLD_CHKPT;
          }
        }
      }
    }
  }

  return TSDB_CODE_SUCCESS;
}
316

317
/*
 * Process a checkpoint-trigger block taken from the task's inputQ.
 *
 * Ownership: this function takes ownership of pBlock. On every path it is either freed here or handed over to
 * continueDispatchCheckpointTriggerBlock (which queues or frees it). Callers must not touch pBlock afterwards.
 *
 * Steps:
 *  1. Validate the trigger with doCheckBeforeHandleChkptTrigger under pTask->lock. A discarded trigger re-opens the
 *     upstream input on non-source tasks.
 *  2. If the task is not yet in checkpoint status: record the active checkpointId/transId, switch the state machine
 *     into checkpoint status, and start the checkpoint-trigger monitor timer if none is active.
 *  3. Source task: flush executor state, then dispatch the trigger downstream. With no dispatch output, it instead
 *     enqueues a STREAM_INPUT__CHECKPOINT item directly.
 *  4. Sink/agg task: record a checkpoint-ready msg for the upstream. Once all upstream triggers are received, a sink
 *     builds the checkpoint; an agg flushes state and forwards the trigger downstream.
 */
int32_t streamProcessCheckpointTriggerBlock(SStreamTask* pTask, SStreamDataBlock* pBlock) {
  int64_t                checkpointId = 0;
  int32_t                transId = 0;
  const char*            id = pTask->id.idStr;
  int32_t                code = TSDB_CODE_SUCCESS;
  int32_t                vgId = pTask->pMeta->vgId;
  int32_t                taskLevel = pTask->info.taskLevel;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SSDataBlock* pDataBlock = taosArrayGet(pBlock->blocks, 0);
  if (pDataBlock == NULL) {
    // this function owns pBlock on every return path; release it here like the other error paths do
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return TSDB_CODE_INVALID_PARA;
  }

  // checkpointId and transId are encoded in the data block (see createChkptTriggerBlock)
  checkpointId = pDataBlock->info.version;
  transId = pDataBlock->info.window.skey;

  streamMutexLock(&pTask->lock);
  code = doCheckBeforeHandleChkptTrigger(pTask, checkpointId, pBlock, transId);
  streamMutexUnlock(&pTask->lock);
  if (code) {
    if (taskLevel != TASK_LEVEL__SOURCE) {  // the checkpoint-trigger is discard, open the inputQ for upstream tasks
      streamTaskOpenUpstreamInput(pTask, pBlock->srcTaskId);
    }
    streamFreeQitem((SStreamQueueItem*)pBlock);
    return code;
  }

  stDebug("s-task:%s vgId:%d start to handle the checkpoint-trigger block, checkpointId:%" PRId64 " ver:%" PRId64
          ", transId:%d current active checkpointId:%" PRId64,
          id, vgId, pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, transId, checkpointId);

  // set task status
  if (streamTaskGetStatus(pTask).state != TASK_STATUS__CK) {
    pActiveInfo->activeId = checkpointId;
    pActiveInfo->transId = transId;

    if (pTask->chkInfo.startTs == 0) {
      pTask->chkInfo.startTs = taosGetTimestampMs();
      pTask->execInfo.checkpoint += 1;
    }

    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_GEN_CHECKPOINT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s handle checkpoint-trigger block failed, code:%s", id, tstrerror(code));
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    // if previous launched timer not started yet, not start a new timer
    // todo: fix this bug: previous set checkpoint-trigger check tmr is running, while we happen to try to launch
    //  a new checkpoint-trigger timer right now.
    //  And if we don't start a new timer, and the lost of checkpoint-trigger message may cause the whole checkpoint
    //  procedure to be stucked.
    SStreamTmrInfo* pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
    int8_t          old = atomic_val_compare_exchange_8(&pTmrInfo->isActive, 0, 1);
    if (old == 0) {
      stDebug("s-task:%s start checkpoint-trigger monitor in 10s", pTask->id.idStr);

      int64_t* pTaskRefId = NULL;
      code = streamTaskAllocRefId(pTask, &pTaskRefId);
      if (code == 0) {
        streamTmrStart(checkpointTriggerMonitorFn, 200, pTaskRefId, streamTimer, &pTmrInfo->tmrHandle, vgId,
                       "trigger-recv-monitor");
        pTmrInfo->launchChkptId = pActiveInfo->activeId;
      }
    } else {  // already launched, do nothing
      stError("s-task:%s previous checkpoint-trigger monitor tmr is set, not start new one", pTask->id.idStr);
    }
  }

  if (taskLevel == TASK_LEVEL__SOURCE) {
    int8_t type = pTask->outputInfo.type;
    pActiveInfo->allUpstreamTriggerRecv = 1;

    // We need to transfer state here, before dispatching checkpoint-trigger to downstream tasks.
    // The transfer of state may generate new data that need to dispatch to downstream tasks,
    // Otherwise, those new generated data by executors that is kept in outputQ, may be lost if this program crashed
    // before the next checkpoint.
    code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
    if (code) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

#if 0
    chkptFailedByRetrieveReqToSource(pTask, checkpointId);
#endif

    if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
      stDebug("s-task:%s set childIdx:%d, and add checkpoint-trigger block into outputQ", id, pTask->info.selfChildId);
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);  // todo handle this failure
    } else {  // only one task exists, no need to dispatch downstream info
      code =
          appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, pActiveInfo->activeId, pActiveInfo->transId, -1);
      streamFreeQitem((SStreamQueueItem*)pBlock);
    }
  } else if (taskLevel == TASK_LEVEL__SINK || taskLevel == TASK_LEVEL__AGG) {
    // todo: handle this
    // update the child Id for downstream tasks
    code = streamAddCheckpointReadyMsg(pTask, pBlock->srcTaskId, pTask->info.selfChildId, checkpointId);

    // there are still some upstream tasks not send checkpoint request, do nothing and wait for then
    if (pActiveInfo->allUpstreamTriggerRecv != 1) {
      streamFreeQitem((SStreamQueueItem*)pBlock);
      return code;
    }

    int32_t num = streamTaskGetNumOfUpstream(pTask);
    if (taskLevel == TASK_LEVEL__SINK) {
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, send ready msg to upstream", id, num);
      streamFreeQitem((SStreamQueueItem*)pBlock);
      code = streamTaskBuildCheckpoint(pTask);  // todo: not handle error yet
    } else {                                    // source & agg tasks need to forward the checkpoint msg downwards
      stDebug("s-task:%s process checkpoint-trigger block, all %d upstreams sent, forwards to downstream", id, num);
      code = flushStateDataInExecutor(pTask, (SStreamQueueItem*)pBlock);
      if (code) {
        // same as the source-task branch: the block is still owned here and must be released on failure
        streamFreeQitem((SStreamQueueItem*)pBlock);
        return code;
      }

      // Put the checkpoint-trigger block into outputQ, to make sure all blocks with less version have been handled by
      // this task already. And then, dispatch check point msg to all downstream tasks
      code = continueDispatchCheckpointTriggerBlock(pBlock, pTask);
    }
  }

  return code;
}
445

446
// only when all downstream tasks are send checkpoint rsp, we can start the checkpoint procedure for the agg task
447
static int32_t processCheckpointReadyHelp(SActiveCheckpointInfo* pInfo, int32_t numOfDownstream,
                                          int32_t downstreamNodeId, int64_t streamId, int32_t downstreamTaskId,
                                          const char* id, int32_t* pNotReady, int32_t* pTransId, bool* alreadyRecv) {
  // Record that downstreamTaskId has reported checkpoint-ready, unless it is already in the recv list.
  // Outputs: *alreadyRecv (duplicate report), *pNotReady (downstreams still missing), *pTransId (active transId).
  SArray* pRecvList = pInfo->pCheckpointReadyRecvList;
  int32_t numOfRecv = taosArrayGetSize(pRecvList);
  bool    found = false;

  *alreadyRecv = false;

  for (int32_t k = 0; (k < numOfRecv) && !found; ++k) {
    STaskDownstreamReadyInfo* pRecv = taosArrayGet(pRecvList, k);
    if (pRecv == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }
    found = (pRecv->downstreamTaskId == downstreamTaskId);
  }

  *alreadyRecv = found;

  if (!found) {
    STaskDownstreamReadyInfo info = {.recvTs = taosGetTimestampMs(),
                                     .downstreamTaskId = downstreamTaskId,
                                     .checkpointId = pInfo->activeId,
                                     .transId = pInfo->transId,
                                     .streamId = streamId,
                                     .downstreamNodeId = downstreamNodeId};
    if (taosArrayPush(pRecvList, &info) == NULL) {
      stError("s-task:%s failed to set checkpoint ready recv msg, code:%s", id, tstrerror(terrno));
      return terrno;
    }
  } else {
    stDebug("s-task:%s already recv checkpoint-ready msg from downstream:0x%x, ignore. %d/%d downstream not ready", id,
            downstreamTaskId, (int32_t)(numOfDownstream - taosArrayGetSize(pRecvList)), numOfDownstream);
  }

  *pNotReady = numOfDownstream - taosArrayGetSize(pRecvList);
  *pTransId = pInfo->transId;
  return 0;
}
486

487
/**
488
 * All down stream tasks have successfully completed the check point task.
489
 * Current stream task is allowed to start to do checkpoint things in ASYNC model.
490
 */
491
int32_t streamProcessCheckpointReadyMsg(SStreamTask* pTask, int64_t checkpointId, int32_t downstreamNodeId,
                                        int32_t downstreamTaskId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;
  int32_t                numOfDownstream = streamTaskGetNumOfDownstream(pTask);
  int32_t                numOfNotReady = 0;
  int32_t                transId = 0;
  bool                   duplicated = false;

  // Guard 1: only a task in checkpoint status accepts checkpoint-ready msgs.
  SStreamTaskState state = streamTaskGetStatus(pTask);
  if (state.state != TASK_STATUS__CK) {
    stError("s-task:%s status:%s discard checkpoint-ready msg from task:0x%x", idStr, state.name, downstreamTaskId);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // Guard 2: the msg must belong to the active checkpoint and must not be older than the completed one.
  if (pTask->chkInfo.checkpointId > checkpointId || pActive->activeId != checkpointId) {
    stError("s-task:%s status:%s checkpointId:%" PRId64 " new arrival checkpoint-ready msg (checkpointId:%" PRId64
            ") from task:0x%x, expired and discard",
            idStr, state.name, pTask->chkInfo.checkpointId, checkpointId, downstreamTaskId);
    return TSDB_CODE_INVALID_MSG;
  }

  streamMutexLock(&pActive->lock);
  int32_t code = processCheckpointReadyHelp(pActive, numOfDownstream, downstreamNodeId, pTask->id.streamId,
                                            downstreamTaskId, idStr, &numOfNotReady, &transId, &duplicated);
  streamMutexUnlock(&pActive->lock);

  if (duplicated) {
    stDebug("s-task:%s checkpoint-ready msg checkpointId:%" PRId64 " from task:0x%x already handled, not handle again",
            idStr, checkpointId, downstreamTaskId);
    return code;
  }

  // once every downstream task has reported ready, this task can start its own checkpoint
  if (code == 0 && numOfNotReady == 0) {
    stDebug("s-task:%s all downstream tasks have completed build checkpoint, do checkpoint for current task", idStr);
    code = appendCheckpointIntoInputQ(pTask, STREAM_INPUT__CHECKPOINT, checkpointId, transId, -1);
  }

  return code;
}
534

535
/*
 * Mark the checkpoint-ready msg sent to upstreamTaskId for checkpointId as confirmed. Only the first matching entry of
 * pReadyMsgList is marked. Then count how many entries are confirmed for logging. Always returns TSDB_CODE_SUCCESS.
 */
int32_t streamTaskProcessCheckpointReadyRsp(SStreamTask* pTask, int32_t upstreamTaskId, int64_t checkpointId) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*            idStr = pTask->id.idStr;
  int64_t                now = taosGetTimestampMs();
  int32_t                confirmed = 0;

  streamMutexLock(&pActive->lock);

  size_t numOfMsg = taosArrayGetSize(pActive->pReadyMsgList);

  // pass 1: flag the first entry matching (upstreamTaskId, checkpointId)
  for (int32_t idx = 0; idx < numOfMsg; ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              idStr, idx);
      continue;
    }

    if (pEntry->upstreamTaskId != upstreamTaskId || pEntry->checkpointId != checkpointId) {
      continue;
    }

    pEntry->sendCompleted = 1;
    stDebug("s-task:%s send checkpoint-ready msg to upstream:0x%x confirmed, checkpointId:%" PRId64 " ts:%" PRId64,
            idStr, upstreamTaskId, checkpointId, now);
    break;
  }

  // pass 2: count the confirmed entries
  for (int32_t idx = 0; idx < numOfMsg; ++idx) {
    STaskCheckpointReadyInfo* pEntry = taosArrayGet(pActive->pReadyMsgList, idx);
    if (pEntry == NULL) {
      stError("s-task:%s invalid index during iterate the checkpoint-ready msg list, index:%d, ignore and continue",
              idStr, idx);
      continue;
    }

    confirmed += (pEntry->sendCompleted == 1) ? 1 : 0;
  }

  stDebug("s-task:%s send checkpoint-ready msg to %d upstream confirmed, checkpointId:%" PRId64, idStr, confirmed,
          checkpointId);

  streamMutexUnlock(&pActive->lock);
  return TSDB_CODE_SUCCESS;
}
576

577
/*
 * Reset the per-checkpoint bookkeeping of pTask:
 * - zero the recorded start time;
 * - re-open the inputQ for every upstream task;
 * - clear the active-checkpoint info and, when clearChkpReadyMsg is set, the checkpoint-ready msgs.
 * The last step runs under pActiveInfo->lock.
 */
void streamTaskClearCheckInfo(SStreamTask* pTask, bool clearChkpReadyMsg) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SCheckpointInfo*       pChk = &pTask->chkInfo;

  pChk->startTs = 0;
  streamTaskOpenAllUpstreamInput(pTask);

  streamMutexLock(&pActive->lock);
  {
    streamTaskClearActiveInfo(pActive);
    if (clearChkpReadyMsg) {
      streamClearChkptReadyMsg(pActive);
    }
  }
  streamMutexUnlock(&pActive->lock);

  stDebug("s-task:%s clear active checkpointInfo, failed checkpointId:%" PRId64 ", latest checkpointId:%" PRId64,
          pTask->id.idStr, pActive->failedId, pChk->checkpointId);
}
430✔
593

594
/*
 * Apply an update-checkpoint-info request to pTask. The request carries the new checkpointId, checkpointVer and
 * checkpointTs, plus an optional drop of the related fill-history task.
 *
 * The meta write lock is expected to be held by the caller: it is temporarily released (streamMetaWUnLock /
 * streamMetaWLock) around unregistering the fill-history task.
 *
 * - A request whose checkpointId is not newer than the current one is ignored. The related fill-history task is still
 *   dropped if requested, and TSDB_CODE_SUCCESS is returned.
 * - A request that would move checkpointVer backwards, or below processedVer, is rejected with
 *   TSDB_CODE_STREAM_INTERNAL_ERROR.
 * - Otherwise the checkpoint info is updated when the task is in checkpoint status, during restore, or on a follower.
 *   The active checkpoint state is then cleared, the task is saved with READY status, and the meta is committed.
 */
int32_t streamTaskUpdateTaskCheckpointInfo(SStreamTask* pTask, bool restored, SVUpdateCheckpointInfoReq* pReq) {
  SStreamMeta*     pMeta = pTask->pMeta;
  int32_t          vgId = pMeta->vgId;
  int32_t          code = 0;
  const char*      id = pTask->id.idStr;
  SCheckpointInfo* pInfo = &pTask->chkInfo;

  streamMutexLock(&pTask->lock);

  if (pReq->checkpointId <= pInfo->checkpointId) {
    stDebug("s-task:%s vgId:%d latest checkpointId:%" PRId64 " Ver:%" PRId64
            " no need to update checkpoint info, updated checkpointId:%" PRId64 " Ver:%" PRId64 " transId:%d ignored",
            id, vgId, pInfo->checkpointId, pInfo->checkpointVer, pReq->checkpointId, pReq->checkpointVer,
            pReq->transId);
    streamMutexUnlock(&pTask->lock);

    {  // destroy the related fill-history tasks
      // drop task should not in the meta-lock, and drop the related fill-history task now
      streamMetaWUnLock(pMeta);
      if (pReq->dropRelHTask) {
        code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
        int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
        // report the dropped fill-history task id (hTaskId), not the id of this stream task
        stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped in update checkpointInfo, remain tasks:%d",
                id, vgId, (int32_t)pReq->hTaskId, numOfTasks);
      }

      streamMetaWLock(pMeta);
      if (pReq->dropRelHTask) {
        code = streamMetaCommit(pMeta);
      }
    }

    // an outdated request is ignored, it is not treated as an error
    return TSDB_CODE_SUCCESS;
  }

  SStreamTaskState pStatus = streamTaskGetStatus(pTask);

  if (!restored) {  // during restore procedure, do update checkpoint-info
    stDebug("s-task:%s vgId:%d status:%s update the checkpoint-info during restore, checkpointId:%" PRId64 "->%" PRId64
            " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
            id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer, pReq->checkpointVer,
            pInfo->checkpointTime, pReq->checkpointTs);
  } else {  // not in restore status, must be in checkpoint status
    if ((pStatus.state == TASK_STATUS__CK) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
      stDebug("s-task:%s vgId:%d status:%s start to update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64 " checkpointTs:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer, pInfo->checkpointTime, pReq->checkpointTs);
    } else {
      stDebug("s-task:%s vgId:%d status:%s NOT update the checkpoint-info, checkpointId:%" PRId64 "->%" PRId64
              " checkpointVer:%" PRId64 "->%" PRId64,
              id, vgId, pStatus.name, pInfo->checkpointId, pReq->checkpointId, pInfo->checkpointVer,
              pReq->checkpointVer);
    }
  }

  bool valid = (pInfo->checkpointId <= pReq->checkpointId && pInfo->checkpointVer <= pReq->checkpointVer &&
                pInfo->processedVer <= pReq->checkpointVer);

  if (!valid) {
    stFatal("invalid checkpoint id check, current checkpointId:%" PRId64 " checkpointVer:%" PRId64
            " processedVer:%" PRId64 " req checkpointId:%" PRId64 " checkpointVer:%" PRId64,
            pInfo->checkpointId, pInfo->checkpointVer, pInfo->processedVer, pReq->checkpointId, pReq->checkpointVer);
    // pTask->lock was acquired above; it must be released on this early return or later lockers deadlock
    streamMutexUnlock(&pTask->lock);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // update only it is in checkpoint status, or during restore procedure.
  if ((pStatus.state == TASK_STATUS__CK) || (!restored) || (pMeta->role == NODE_ROLE_FOLLOWER)) {
    pInfo->checkpointId = pReq->checkpointId;
    pInfo->checkpointVer = pReq->checkpointVer;
    pInfo->checkpointTime = pReq->checkpointTs;

    if (restored && (pMeta->role == NODE_ROLE_LEADER)) {
      code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    }
  }

  streamTaskClearCheckInfo(pTask, true);

  if (pReq->dropRelHTask) {
    stDebug("s-task:0x%x vgId:%d drop the related fill-history task:0x%" PRIx64 " after update checkpoint",
            pReq->taskId, vgId, pReq->hTaskId);
    CLEAR_RELATED_FILLHISTORY_TASK(pTask);
  }

  stDebug("s-task:0x%x set the persistent status attr to be ready, prev:%s, status in sm:%s", pReq->taskId,
          streamTaskGetStatusStr(pTask->status.taskStatus), streamTaskGetStatus(pTask).name);

  pTask->status.taskStatus = TASK_STATUS__READY;

  code = streamMetaSaveTask(pMeta, pTask);
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    stError("s-task:%s vgId:%d failed to save task info after do checkpoint, checkpointId:%" PRId64 ", since %s", id,
            vgId, pReq->checkpointId, terrstr());
    return TSDB_CODE_SUCCESS;
  }

  streamMetaWUnLock(pMeta);

  // drop task should not in the meta-lock, and drop the related fill-history task now
  if (pReq->dropRelHTask) {
    code = streamMetaUnregisterTask(pMeta, pReq->hStreamId, pReq->hTaskId);
    int32_t numOfTasks = streamMetaGetNumOfTasks(pMeta);
    stDebug("s-task:%s vgId:%d related fill-history task:0x%x dropped, remain tasks:%d", id, vgId,
            (int32_t)pReq->hTaskId, numOfTasks);
  }

  streamMetaWLock(pMeta);
  code = streamMetaCommit(pMeta);

  return TSDB_CODE_SUCCESS;
}
709

710
/*
 * Record `failedId` as the failed checkpoint id in the task's active checkpoint info.
 * A non-positive id is rejected with a warning, and the stored failed id only moves forward:
 * an id that is not larger than the current failed id is ignored (debug log only).
 * Callers in this file invoke it with pTask->lock held.
 */
void streamTaskSetFailedCheckpointId(SStreamTask* pTask, int64_t failedId) {
  struct SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  const char*                   idStr = pTask->id.idStr;

  // guard: an invalid id never overwrites the recorded one
  if (failedId <= 0) {
    stWarn("s-task:%s failedId is 0, not update the failed checkpoint info, current failedId:%" PRId64
           " activeId:%" PRId64,
           idStr, pActive->failedId, pActive->activeId);
    return;
  }

  // guard: keep the larger (newer) failed id
  if (failedId <= pActive->failedId) {
    stDebug("s-task:%s failedId:%" PRId64 " not update to:%" PRId64, idStr, pActive->failedId, failedId);
    return;
  }

  stDebug("s-task:%s mark and set the failed checkpointId:%" PRId64 " (transId:%d) activeId:%" PRId64
          " prev failedId:%" PRId64,
          idStr, failedId, pActive->transId, pActive->activeId, pActive->failedId);
  pActive->failedId = failedId;
}
×
728

729
/*
 * If the task is currently in checkpoint status, mark its active checkpoint id as failed.
 * Both the status check and the update are done while holding pTask->lock.
 */
void streamTaskSetCheckpointFailed(SStreamTask* pTask) {
  streamMutexLock(&pTask->lock);

  bool inCheckpoint = (streamTaskGetStatus(pTask).state == TASK_STATUS__CK);
  if (inCheckpoint) {
    int64_t activeId = pTask->chkInfo.pActiveInfo->activeId;
    streamTaskSetFailedCheckpointId(pTask, activeId);
  }

  streamMutexUnlock(&pTask->lock);
}
300✔
737

738
/*
 * Download the remote "META" file of stream `id` into "<path>/META_TMP", then let remoteChkpGetDelFile
 * append the names of the remote files to be deleted to `list`.
 *
 * The temporary path buffer is released on every path, success included.
 * Returns 0 on success; otherwise the error of the step that failed (allocation, path formatting
 * -> TSDB_CODE_OUT_OF_RANGE, download, or parsing of the downloaded META file).
 */
static int32_t getCheckpointDataMeta(const char* id, const char* path, SArray* list) {
  int32_t code = 0;
  int32_t cap = strlen(path) + 64;

  char* filePath = taosMemoryCalloc(1, cap);
  if (filePath == NULL) {
    return terrno;
  }

  int32_t nBytes = snprintf(filePath, cap, "%s%s%s", path, TD_DIRSEP, "META_TMP");
  if (nBytes <= 0 || nBytes >= cap) {
    code = TSDB_CODE_OUT_OF_RANGE;
    goto _end;
  }

  code = downloadCheckpointDataByName(id, "META", filePath);
  if (code != 0) {
    stError("%s chkp failed to download meta file:%s", id, filePath);
    goto _end;
  }

  code = remoteChkpGetDelFile(filePath, list);
  if (code != 0) {
    // report the failure like the download failure above, so the caller does not go on with an
    // unreliable to-delete list
    stError("%s chkp failed to get to del:%s", id, filePath);
  }

_end:
  taosMemoryFree(filePath);
  return code;
}
767

768
/*
 * Generate the data of checkpoint `checkpointId` for uploading (taskDbGenChkpUploadData), upload it with
 * streamTaskUploadCheckpoint, and then remove the redundant remote files collected in the to-delete list
 * (from the backend and, for DATA_UPLOAD_S3, from the remote META file).
 * On success the local upload directory `path` is removed; on failure it is kept.
 *
 * `dbRefId` is not used by this function.
 * Returns 0 when the data was generated and uploaded. A failure to remove a redundant remote file is
 * logged but does not turn an already successful upload into a failure.
 */
int32_t uploadCheckpointData(SStreamTask* pTask, int64_t checkpointId, int64_t dbRefId, ECHECKPOINT_BACKUP_TYPE type) {
  int32_t code = 0;
  char*   path = NULL;

  SStreamMeta* pMeta = pTask->pMeta;
  const char*  idStr = pTask->id.idStr;
  int64_t      now = taosGetTimestampMs();

  SArray* toDelFiles = taosArrayInit(4, POINTER_BYTES);
  if (toDelFiles == NULL) {
    return terrno;
  }

  if ((code = taskDbGenChkpUploadData(pTask->pBackend, pMeta->bkdChkptMgt, checkpointId, type, &path, toDelFiles,
                                      pTask->id.idStr)) != 0) {
    stError("s-task:%s failed to gen upload checkpoint:%" PRId64 ", reason:%s", idStr, checkpointId, tstrerror(code));
  }

  if (type == DATA_UPLOAD_S3) {
    if (code == TSDB_CODE_SUCCESS && (code = getCheckpointDataMeta(idStr, path, toDelFiles)) != 0) {
      stError("s-task:%s failed to get checkpointData for checkpointId:%" PRId64 ", reason:%s", idStr, checkpointId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskUploadCheckpoint(idStr, path, checkpointId);
    if (code == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s upload checkpointId:%" PRId64 " to remote succ", idStr, checkpointId);
    } else {
      stError("s-task:%s failed to upload checkpointId:%" PRId64 " path:%s,reason:%s", idStr, checkpointId, path,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    int32_t size = taosArrayGetSize(toDelFiles);
    stDebug("s-task:%s remove redundant %d files", idStr, size);

    for (int32_t i = 0; i < size; i++) {
      char* pName = taosArrayGetP(toDelFiles, i);

      // use a separate result: the checkpoint data is already uploaded, and a leftover redundant remote
      // file must not make the whole upload (and hence the checkpoint) fail
      int32_t ret = deleteCheckpointFile(idStr, pName);
      if (ret != 0) {
        stError("s-task:%s failed to remove file: %s, reason:%s", idStr, pName, tstrerror(ret));
        break;
      }
    }

    stDebug("s-task:%s remove redundant files in uploading checkpointId:%" PRId64 " data", idStr, checkpointId);
  }

  taosArrayDestroyP(toDelFiles, taosMemoryFree);
  double el = (taosGetTimestampMs() - now) / 1000.0;

  if (code == TSDB_CODE_SUCCESS) {
    stDebug("s-task:%s complete update checkpointId:%" PRId64 ", elapsed time:%.2fs remove local checkpoint data %s",
            idStr, checkpointId, el, path);
    taosRemoveDir(path);
  } else {
    stDebug("s-task:%s failed to upload checkpointId:%" PRId64 " keep local checkpoint data, elapsed time:%.2fs", idStr,
            checkpointId, el);
  }

  taosMemoryFree(path);
  return code;
}
834

835
/*
 * Back up the data of checkpoint `checkpointId` of `pTask` to the remote storage selected by
 * streamGetCheckpointBackupType() (rsync or S3).
 *
 * Returns 0 when there is no task, when uploading is disabled, when the task has no backend, or when the
 * upload succeeds; -1 when the backend db cannot be acquired; otherwise the error of uploadCheckpointData.
 */
int32_t streamTaskRemoteBackupCheckpoint(SStreamTask* pTask, int64_t checkpointId) {
  // check the task pointer before anything dereferences it (including the log below)
  if (pTask == NULL) {
    return 0;
  }

  ECHECKPOINT_BACKUP_TYPE type = streamGetCheckpointBackupType();
  if (type == DATA_UPLOAD_DISABLE) {
    stDebug("s-task:%s not allowed to upload checkpoint data", pTask->id.idStr);
    return 0;
  }

  if (pTask->pBackend == NULL) {
    return 0;
  }

  int64_t dbRefId = taskGetDBRef(pTask->pBackend);
  void*   pBackend = taskAcquireDb(dbRefId);
  if (pBackend == NULL) {
    stError("s-task:%s failed to acquire db during update checkpoint data, failed to upload checkpointData",
            pTask->id.idStr);
    return -1;
  }

  // hold the acquired db reference for the whole upload
  int32_t code = uploadCheckpointData(pTask, checkpointId, dbRefId, type);
  taskReleaseDb(dbRefId);

  return code;
}
859

860
/*
 * Complete the active checkpoint (pTask->chkInfo.pActiveInfo->activeId) of this task:
 *  1. non-sink tasks generate the local checkpoint with streamBackendDoCheckpoint (sink tasks skip it);
 *  2. a source task sends the checkpoint-source rsp, other tasks send the checkpoint-ready msg;
 *  3. the checkpoint data is backed up to the remote storage (a no-op when uploading is disabled);
 *  4. the checkpoint report msg is sent, unless pTask->pMsgCb is NULL (e.g. for rsma).
 * If step 1, 2 or 3 fails, the checkpoint id is recorded as failed and TASK_EVENT_CHECKPOINT_DONE is handled
 * to clear the checkpoint flag.
 *
 * Returns 0 on success, otherwise the error code of the step that failed.
 */
int32_t streamTaskBuildCheckpoint(SStreamTask* pTask) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      startTs = pTask->chkInfo.startTs;
  int64_t      ckId = pTask->chkInfo.pActiveInfo->activeId;
  const char*  id = pTask->id.idStr;
  bool         dropRelHTask = (streamTaskGetPrevStatus(pTask) == TASK_STATUS__HALT);
  SStreamMeta* pMeta = pTask->pMeta;

  // sink task does not need to save the status, and generated the checkpoint
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    stDebug("s-task:%s level:%d start gen checkpoint, checkpointId:%" PRId64, id, pTask->info.taskLevel, ckId);

    int64_t ver = pTask->chkInfo.processedVer;
    code = streamBackendDoCheckpoint(pTask->pBackend, ckId, ver);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s gen checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
    }
  }

  // TODO: monitoring the checkpoint-source msg
  // send check point response to upstream task
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
      code = streamTaskSendCheckpointSourceRsp(pTask);
    } else {
      code = streamTaskSendCheckpointReadyMsg(pTask);
    }

    if (code != TSDB_CODE_SUCCESS) {
      // todo: let's retry send rsp to mnode, checkpoint-ready has monitor now
      stError("s-task:%s failed to send checkpoint rsp to upstream, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(code));
    }
  }

  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskRemoteBackupCheckpoint(pTask, ckId);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s upload checkpointId:%" PRId64 " data failed, code:%s", id, ckId, tstrerror(code));
    }
  } else {
    stError("s-task:%s taskInfo failed, checkpoint:%" PRId64 " failed, code:%s", id, ckId, tstrerror(code));
  }

  // TODO: monitoring the checkpoint-report msg
  // update the latest checkpoint info if all works are done successfully, for rsma, the pMsgCb is null.
  if (code == TSDB_CODE_SUCCESS) {
    if (pTask->pMsgCb != NULL) {
      code = streamSendChkptReportMsg(pTask, &pTask->chkInfo, dropRelHTask);
    }
  } else {  // clear the checkpoint info if failed
    // set failed checkpoint id before clear the checkpoint info
    streamMutexLock(&pTask->lock);
    streamTaskSetFailedCheckpointId(pTask, ckId);
    streamMutexUnlock(&pTask->lock);

    // keep `code` as the original failure: overwriting it with the result of the state transition would make
    // a failed checkpoint be logged and returned as succeeded
    int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_CHECKPOINT_DONE);
    if (ret != TSDB_CODE_SUCCESS) {
      stError("s-task:%s failed to clear checkpoint flag, checkpointId:%" PRId64 ", code:%s", id, ckId,
              tstrerror(ret));
    }
    stDebug("s-task:%s clear checkpoint flag since gen checkpoint failed, checkpointId:%" PRId64, id, ckId);
  }

  double el = (taosGetTimestampMs() - startTs) / 1000.0;
  stInfo("s-task:%s vgId:%d level:%d, checkpointId:%" PRId64 " ver:%" PRId64 " elapsed time:%.2fs, %s ", id,
         pMeta->vgId, pTask->info.taskLevel, ckId, pTask->chkInfo.checkpointVer, el,
         (code == TSDB_CODE_SUCCESS) ? "succ" : "failed");

  return code;
}
927

928
/*
 * Decide whether the checkpoint-trigger retrieve monitor of `pTask` may keep running.
 * Returns 0 to continue. Returns -1, after calling streamCleanBeforeQuitTmr on the monitor timer, when every
 * upstream checkpoint-trigger has been received, when the timer was launched for a different checkpoint id,
 * or when the active checkpoint info (activeId / transId / startTs) looks cleared.
 */
static int32_t doChkptStatusCheck(SStreamTask* pTask, void* param) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  SStreamTmrInfo*        pTmr = &pActive->chkptTriggerMsgTmr;
  const char*            idStr = pTask->id.idStr;
  int32_t                nodeId = pTask->pMeta->vgId;

  // checkpoint-trigger recv flag is set: nothing left to monitor
  if (pActive->allUpstreamTriggerRecv) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stDebug("s-task:%s vgId:%d all checkpoint-trigger recv, quit from monitor checkpoint-trigger", idStr, nodeId);
    return -1;
  }

  // this timer belongs to an earlier checkpoint procedure
  if (pTmr->launchChkptId != pActive->activeId) {
    streamCleanBeforeQuitTmr(pTmr, param);
    stWarn("s-task:%s vgId:%d checkpoint-trigger retrieve by previous checkpoint procedure, checkpointId:%" PRId64
           ", quit",
           idStr, nodeId, pTmr->launchChkptId);
    return -1;
  }

  bool cleared = (pActive->activeId == 0) || (pActive->transId == 0) || (pTask->chkInfo.startTs == 0);
  if (!cleared) {
    return 0;
  }

  // active checkpoint info is cleared for now
  streamCleanBeforeQuitTmr(pTmr, param);
  stWarn("s-task:%s vgId:%d active checkpoint may be cleared, quit from retrieve checkpoint-trigger send tmr", idStr,
         nodeId);
  return -1;
}
959

960
/*
 * Build a new array (elements of SStreamUpstreamEpInfo, copied by value) of the upstream tasks in `pList`
 * whose node id does not appear in pActiveInfo->pReadyMsgList. The upstream input of each such task is
 * opened with streamTaskOpenUpstreamInput.
 *
 * On success *ppNotSendList receives the array and the caller owns it (must destroy it).
 * On failure *ppNotSendList is left untouched and the error code is returned.
 */
static int32_t doFindNotSendUpstream(SStreamTask* pTask, SArray* pList, SArray** ppNotSendList) {
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;

  SArray* pNotSendList = taosArrayInit(4, sizeof(SStreamUpstreamEpInfo));
  if (pNotSendList == NULL) {
    int32_t code = terrno;
    stError("s-task:%s failed to init not send upstream list, code:%s", id, tstrerror(code));
    return code;
  }

  for (int32_t i = 0; i < taosArrayGetSize(pList); ++i) {
    SStreamUpstreamEpInfo* pInfo = taosArrayGetP(pList, i);
    if (pInfo == NULL) {
      continue;
    }

    bool recved = false;
    for (int32_t j = 0; j < taosArrayGetSize(pActiveInfo->pReadyMsgList); ++j) {
      STaskCheckpointReadyInfo* pReady = taosArrayGet(pActiveInfo->pReadyMsgList, j);
      if (pReady == NULL) {
        continue;
      }

      if (pInfo->nodeId == pReady->upstreamNodeId) {
        recved = true;
        break;
      }
    }

    if (!recved) {  // make sure the inputQ is opened for not recv upstream checkpoint-trigger message
      streamTaskOpenUpstreamInput(pTask, pInfo->taskId);
      void* px = taosArrayPush(pNotSendList, pInfo);
      if (px == NULL) {
        // capture the error before the cleanup call below can touch terrno
        int32_t code = terrno;
        stError("s-task:%s failed to record not send info, code: out of memory", id);
        taosArrayDestroy(pNotSendList);
        return code;
      }
    }
  }

  *ppNotSendList = pNotSendList;
  return 0;
}
1000

1001
static int32_t chkptTriggerRecvMonitorHelper(SStreamTask* pTask, void* param, SArray* pNotSendList) {
1✔
1002
  const char*            id = pTask->id.idStr;
1✔
1003
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
1✔
1004
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
1✔
1005
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
1✔
1006
  int32_t                vgId = pTask->pMeta->vgId;
1✔
1007

1008
  int32_t code = doChkptStatusCheck(pTask, param);
1✔
1009
  if (code) {
1!
UNCOV
1010
    return code;
×
1011
  }
1012

1013
  code = doFindNotSendUpstream(pTask, pList, &pNotSendList);
1✔
1014
  if (code) {
1!
1015
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1016
    stDebug("s-task:%s failed to find not send upstream, code:%s, out of tmr", id, tstrerror(code));
×
1017
    return code;
×
1018
  }
1019

1020
  // do send retrieve checkpoint trigger msg to upstream
1021
  code = doSendRetrieveTriggerMsg(pTask, pNotSendList);
1✔
1022
  if (code) {
1!
1023
    stError("s-task:%s vgId:%d failed to retrieve trigger msg, code:%s", pTask->id.idStr, vgId, tstrerror(code));
×
1024
    code = 0;
×
1025
  }
1026

1027
  return code;
1✔
1028
}
1029

1030
static void doCleanup(SStreamTask* pTask, SArray* pList) {
9,942✔
1031
  streamMetaReleaseTask(pTask->pMeta, pTask);
9,942✔
1032
  taosArrayDestroy(pList);
9,942✔
1033
}
9,942✔
1034

1035
void checkpointTriggerMonitorFn(void* param, void* tmrId) {
9,942✔
1036
  int32_t      code = 0;
9,942✔
1037
  int32_t      numOfNotSend = 0;
9,942✔
1038
  SArray*      pNotSendList = NULL;
9,942✔
1039
  int64_t      taskRefId = *(int64_t*)param;
9,942✔
1040
  int64_t      now = taosGetTimestampMs();
9,942✔
1041

1042
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
9,942✔
1043
  if (pTask == NULL) {
9,942!
UNCOV
1044
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
UNCOV
1045
    streamTaskFreeRefId(param);
×
1046
    return;
9,941✔
1047
  }
1048

1049
  int32_t                vgId = pTask->pMeta->vgId;
9,942✔
1050
  const char*            id = pTask->id.idStr;
9,942✔
1051
  SArray*                pList = pTask->upstreamInfo.pList;  // send msg to retrieve checkpoint trigger msg
9,942✔
1052
  SActiveCheckpointInfo* pActiveInfo = pTask->chkInfo.pActiveInfo;
9,942✔
1053
  SStreamTmrInfo*        pTmrInfo = &pActiveInfo->chkptTriggerMsgTmr;
9,942✔
1054

1055
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
9,942!
1056
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
1057
    stError("s-task:%s source task should not start the checkpoint-trigger monitor fn, quit", id);
×
1058
    doCleanup(pTask, pNotSendList);
×
1059
    return;
×
1060
  }
1061

1062
  // check the status every 100ms
1063
  if (streamTaskShouldStop(pTask)) {
9,942!
UNCOV
1064
    streamCleanBeforeQuitTmr(pTmrInfo, param);
×
UNCOV
1065
    stDebug("s-task:%s vgId:%d quit from monitor checkpoint-trigger", id, vgId);
×
UNCOV
1066
    doCleanup(pTask, pNotSendList);
×
UNCOV
1067
    return;
×
1068
  }
1069

1070
  if (++pTmrInfo->activeCounter < 50) {
9,942✔
1071
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
9,829✔
1072
                   "trigger-recv-monitor");
1073
    doCleanup(pTask, pNotSendList);
9,829✔
1074
    return;
9,829✔
1075
  }
1076

1077
  pTmrInfo->activeCounter = 0;
113✔
1078
  stDebug("s-task:%s vgId:%d checkpoint-trigger monitor in tmr, ts:%" PRId64, id, vgId, now);
113✔
1079

1080
  streamMutexLock(&pTask->lock);
113✔
1081
  SStreamTaskState state = streamTaskGetStatus(pTask);
113✔
1082
  streamMutexUnlock(&pTask->lock);
113✔
1083

1084
  if (state.state != TASK_STATUS__CK) {
113✔
1085
    streamCleanBeforeQuitTmr(pTmrInfo, param);
112✔
1086
    stDebug("s-task:%s vgId:%d status:%s not in checkpoint status, quit from monitor checkpoint-trigger", id,
112✔
1087
            vgId, state.name);
1088
    doCleanup(pTask, pNotSendList);
112✔
1089
    return;
112✔
1090
  }
1091

1092
  streamMutexLock(&pActiveInfo->lock);
1✔
1093
  code = chkptTriggerRecvMonitorHelper(pTask, param, pNotSendList);
1✔
1094
  streamMutexUnlock(&pActiveInfo->lock);
1✔
1095

1096
  if (code != TSDB_CODE_SUCCESS) {
1!
UNCOV
1097
    doCleanup(pTask, pNotSendList);
×
UNCOV
1098
    return;
×
1099
  }
1100

1101
  // check every 100ms
1102
  numOfNotSend = taosArrayGetSize(pNotSendList);
1✔
1103
  if (numOfNotSend > 0) {
1!
1104
    stDebug("s-task:%s start to monitor checkpoint-trigger in 10s", id);
×
1105
    streamTmrStart(checkpointTriggerMonitorFn, 200, param, streamTimer, &pTmrInfo->tmrHandle, vgId,
×
1106
                   "trigger-recv-monitor");
1107
  } else {
1108
    streamCleanBeforeQuitTmr(pTmrInfo, param);
1✔
1109
    stDebug("s-task:%s all checkpoint-trigger recved, quit from monitor checkpoint-trigger tmr", id);
1!
1110
  }
1111

1112
  doCleanup(pTask, pNotSendList);
1✔
1113
}
1114

1115
/*
 * Send a TDMT_STREAM_RETRIEVE_TRIGGER request for the active checkpoint to each upstream task in
 * `pNotSendList` (elements are SStreamUpstreamEpInfo).
 * A failure for one upstream task is logged and the remaining ones are still tried.
 *
 * Returns 0 if every request was encoded and handed to tmsgSendReq successfully. Otherwise it returns the
 * last error met, so a failure is not masked by later successes. A NULL element in the list makes it
 * return TSDB_CODE_INVALID_PARA immediately.
 */
int32_t doSendRetrieveTriggerMsg(SStreamTask* pTask, SArray* pNotSendList) {
  int32_t     code = 0;
  int32_t     vgId = pTask->pMeta->vgId;
  const char* pId = pTask->id.idStr;
  int32_t     size = taosArrayGetSize(pNotSendList);
  int32_t     numOfUpstream = streamTaskGetNumOfUpstream(pTask);
  int64_t     checkpointId = pTask->chkInfo.pActiveInfo->activeId;

  if (size <= 0) {
    stDebug("s-task:%s all upstream checkpoint trigger recved, no need to send retrieve", pId);
    return code;
  }

  stDebug("s-task:%s %d/%d not recv checkpoint-trigger from upstream(s), start to send trigger-retrieve", pId, size,
          numOfUpstream);

  for (int32_t i = 0; i < size; i++) {
    SStreamUpstreamEpInfo* pUpstreamTask = taosArrayGet(pNotSendList, i);
    if (pUpstreamTask == NULL) {
      return TSDB_CODE_INVALID_PARA;
    }

    int32_t  ret = 0;
    int32_t  tlen = 0;
    void*    buf = NULL;
    SRpcMsg  rpcMsg = {0};
    SEncoder encoder;

    SRetrieveChkptTriggerReq req = {.streamId = pTask->id.streamId,
                                    .downstreamTaskId = pTask->id.taskId,
                                    .downstreamNodeId = vgId,
                                    .upstreamTaskId = pUpstreamTask->taskId,
                                    .upstreamNodeId = pUpstreamTask->nodeId,
                                    .checkpointId = checkpointId};

    tEncodeSize(tEncodeRetrieveChkptTriggerReq, &req, tlen, ret);
    if (ret < 0) {
      // tlen is not valid when the size calculation fails: do not build a message with it
      stError("encode retrieve checkpoint-trigger msg failed, code:%s", tstrerror(ret));
      code = ret;
      continue;
    }

    buf = rpcMallocCont(tlen + sizeof(SMsgHead));
    if (buf == NULL) {
      stError("vgId:%d failed to create retrieve checkpoint-trigger msg for task:%s exec, code:out of memory", vgId, pId);
      code = TSDB_CODE_OUT_OF_MEMORY;
      continue;
    }

    // destination vgId in network byte order in the msg head; the encoded body follows the SMsgHead
    ((SRetrieveChkptTriggerReq*)buf)->head.vgId = htonl(pUpstreamTask->nodeId);
    void* abuf = POINTER_SHIFT(buf, sizeof(SMsgHead));

    tEncoderInit(&encoder, abuf, tlen);
    ret = tEncodeRetrieveChkptTriggerReq(&encoder, &req);
    tEncoderClear(&encoder);
    if (ret < 0) {
      rpcFreeCont(buf);
      stError("encode retrieve checkpoint-trigger req failed, code:%s", tstrerror(ret));
      code = ret;
      continue;
    }

    initRpcMsg(&rpcMsg, TDMT_STREAM_RETRIEVE_TRIGGER, buf, tlen + sizeof(SMsgHead));

    ret = tmsgSendReq(&pUpstreamTask->epSet, &rpcMsg);
    if (ret == TSDB_CODE_SUCCESS) {
      stDebug("s-task:%s vgId:%d send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64, pId,
              vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
    } else {
      stError("s-task:%s vgId:%d failed to send checkpoint-trigger retrieve msg to 0x%x(vgId:%d) checkpointId:%" PRId64,
              pId, vgId, pUpstreamTask->taskId, pUpstreamTask->nodeId, checkpointId);
      code = ret;
    }
  }

  return code;
}
1187

1188
/*
 * Check, without locking (the caller holds pTask->chkInfo.pActiveInfo->lock), whether the checkpoint-trigger
 * has already been dispatched to downstream node `downstreamNodeId`.
 * Returns false when no trigger dispatch is in progress (dispatchTrigger unset) or when the node has no entry
 * in pDispatchTriggerList. Otherwise it logs a warning with the send time (and whether the downstream
 * confirmed the receipt) and returns true.
 */
static int32_t isAlreadySendTriggerNoLock(SStreamTask* pTask, int32_t downstreamNodeId) {
  int64_t                now = taosGetTimestampMs();
  const char*            id = pTask->id.idStr;
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  if (!pInfo->dispatchTrigger) {
    return false;
  }

  int32_t num = taosArrayGetSize(pInfo->pDispatchTriggerList);
  for (int32_t i = 0; i < num; ++i) {
    STaskTriggerSendInfo* pSendInfo = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (pSendInfo == NULL) {
      stError("s-task:%s invalid index in dispatch-trigger list, index:%d, size:%d, ignore and continue", id, i, num);
      continue;
    }

    if (pSendInfo->nodeId != downstreamNodeId) {
      continue;
    }

    // has send trigger msg to downstream node,
    double before = (now - pSendInfo->sendTs) / 1000.0;
    if (pSendInfo->recved) {
      stWarn("s-task:%s checkpoint-trigger msg already send at:%" PRId64
             "(%.2fs before) and recv confirmed by downstream:0x%x, checkpointId:%" PRId64 ", transId:%d",
             id, pSendInfo->sendTs, before, pSendInfo->taskId, pInfo->activeId, pInfo->transId);
    } else {
      stWarn("s-task:%s checkpoint-trigger already send at:%" PRId64 "(%.2fs before), checkpointId:%" PRId64
             ", transId:%d",
             id, pSendInfo->sendTs, before, pInfo->activeId, pInfo->transId);
    }

    return true;
  }

  return false;
}
1227

1228
/*
 * Return whether the checkpoint-trigger has already been dispatched to downstream node `downstreamNodeId`.
 * A task that is not in checkpoint status reports false. Otherwise the dispatch list is inspected under
 * pTask->chkInfo.pActiveInfo->lock.
 */
bool streamTaskAlreadySendTrigger(SStreamTask* pTask, int32_t downstreamNodeId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;
  SStreamTaskState       pStatus = streamTaskGetStatus(pTask);

  if (pStatus.state != TASK_STATUS__CK) {
    return false;
  }

  streamMutexLock(&pInfo->lock);
  bool send = isAlreadySendTriggerNoLock(pTask, downstreamNodeId);
  streamMutexUnlock(&pInfo->lock);

  return send;
}
1244

1245
/*
 * Report the number of entries in the active checkpoint's pReadyMsgList (*pRecved) and the number expected
 * (*pTotal): 1 for a source task, otherwise the number of upstream tasks.
 */
void streamTaskGetTriggerRecvStatus(SStreamTask* pTask, int32_t* pRecved, int32_t* pTotal) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  bool                   isSource = (pTask->info.taskLevel == TASK_LEVEL__SOURCE);

  *pRecved = taosArrayGetSize(pActive->pReadyMsgList);
  *pTotal = isSource ? 1 : streamTaskGetNumOfUpstream(pTask);
}
×
1254

1255
// record the dispatch checkpoint trigger info in the list
1256
// memory insufficient may cause the stream computing stopped
1257
/*
 * Under pActiveInfo->lock, set dispatchTrigger and append to pDispatchTriggerList one STaskTriggerSendInfo per
 * downstream target. Each entry has send time = now and is not yet confirmed. A fixed dispatcher has a single
 * target; otherwise there is one entry per vgroup of the shuffle dispatcher (NULL vgroup entries are skipped).
 *
 * Returns 0, or terrno when an entry cannot be appended; in that case no further targets are recorded.
 */
int32_t streamTaskInitTriggerDispatchInfo(SStreamTask* pTask) {
  SActiveCheckpointInfo* pActive = pTask->chkInfo.pActiveInfo;
  int64_t                sendTs = taosGetTimestampMs();
  int32_t                ret = 0;

  streamMutexLock(&pActive->lock);
  pActive->dispatchTrigger = true;

  if (pTask->outputInfo.type != TASK_OUTPUT__FIXED_DISPATCH) {
    for (int32_t idx = 0; idx < streamTaskGetNumOfDownstream(pTask); ++idx) {
      SVgroupInfo* pVg = taosArrayGet(pTask->outputInfo.shuffleDispatcher.dbInfo.pVgroupInfos, idx);
      if (pVg == NULL) {
        continue;
      }

      STaskTriggerSendInfo entry = {.sendTs = sendTs, .recved = false, .nodeId = pVg->vgId, .taskId = pVg->taskId};
      if (taosArrayPush(pActive->pDispatchTriggerList, &entry) == NULL) {
        ret = terrno;  // memory insufficient: stop recording the remaining targets
        break;
      }
    }
  } else {
    STaskDispatcherFixed* pFixed = &pTask->outputInfo.fixedDispatcher;
    STaskTriggerSendInfo  entry = {.sendTs = sendTs, .recved = false, .nodeId = pFixed->nodeId, .taskId = pFixed->taskId};
    if (taosArrayPush(pActive->pDispatchTriggerList, &entry) == NULL) {
      ret = terrno;  // memory insufficient
    }
  }

  streamMutexUnlock(&pActive->lock);
  return ret;
}
1293

1294
/*
 * Count the entries of pInfo->pDispatchTriggerList whose `recved` flag is set (NULL entries are skipped).
 * No lock is taken here; streamTaskSetTriggerDispatchConfirmed calls it with pInfo->lock held.
 */
int32_t streamTaskGetNumOfConfirmed(SActiveCheckpointInfo* pInfo) {
  int32_t confirmed = 0;
  int32_t idx = 0;

  while (idx < taosArrayGetSize(pInfo->pDispatchTriggerList)) {
    STaskTriggerSendInfo* pSend = taosArrayGet(pInfo->pDispatchTriggerList, idx);
    if (pSend != NULL && pSend->recved) {
      confirmed += 1;
    }
    ++idx;
  }

  return confirmed;
}
1308

1309
/*
 * Mark the checkpoint-trigger dispatched to downstream node `vgId` as confirmed (recved flag + recvTs), under
 * pInfo->lock.
 * A confirm for an entry that is already confirmed is logged as a warning and discarded.
 * A confirm for a node that has no entry in the dispatch list is logged as an error.
 */
void streamTaskSetTriggerDispatchConfirmed(SStreamTask* pTask, int32_t vgId) {
  SActiveCheckpointInfo* pInfo = pTask->chkInfo.pActiveInfo;

  int64_t now = taosGetTimestampMs();
  int32_t taskId = 0;
  int32_t total = streamTaskGetNumOfDownstream(pTask);
  bool    alreadyRecv = false;

  streamMutexLock(&pInfo->lock);

  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pDispatchTriggerList); ++i) {
    STaskTriggerSendInfo* p = taosArrayGet(pInfo->pDispatchTriggerList, i);
    if (p == NULL) {
      continue;
    }

    if (p->nodeId == vgId) {
      // remember the matched downstream task in both branches, so that a duplicated confirm is not
      // reported as an invalid one below
      taskId = p->taskId;
      if (p->recved) {
        stWarn("s-task:%s already recv checkpoint-trigger msg rsp from vgId:%d down:0x%x %.2fs ago, req send:%" PRId64
               " discard",
               pTask->id.idStr, vgId, p->taskId, (now - p->recvTs) / 1000.0, p->sendTs);
        alreadyRecv = true;
      } else {
        p->recved = true;
        p->recvTs = taosGetTimestampMs();
      }
      break;
    }
  }

  int32_t numOfConfirmed = streamTaskGetNumOfConfirmed(pInfo);
  streamMutexUnlock(&pInfo->lock);

  if (taskId == 0) {
    stError("s-task:%s recv invalid trigger-dispatch confirm, vgId:%d", pTask->id.idStr, vgId);
  } else if (!alreadyRecv) {
    stDebug("s-task:%s set downstream:0x%x(vgId:%d) checkpoint-trigger dispatch confirmed, total confirmed:%d/%d",
            pTask->id.idStr, taskId, vgId, numOfConfirmed, total);
  }
}
823✔
1352

1353
static int32_t uploadCheckpointToS3(const char* id, const char* path) {
×
1354
  int32_t code = 0;
×
1355
  int32_t nBytes = 0;
×
1356
  /*
1357
  if (s3Init() != 0) {
1358
    return TSDB_CODE_THIRDPARTY_ERROR;
1359
  }
1360
  */
1361
  TdDirPtr pDir = taosOpenDir(path);
×
1362
  if (pDir == NULL) {
×
1363
    return terrno;
×
1364
  }
1365

1366
  TdDirEntryPtr de = NULL;
×
1367
  while ((de = taosReadDir(pDir)) != NULL) {
×
1368
    char* name = taosGetDirEntryName(de);
×
1369
    if (strcmp(name, ".") == 0 || strcmp(name, "..") == 0 || taosDirEntryIsDir(de)) continue;
×
1370

1371
    char filename[PATH_MAX] = {0};
×
1372
    if (path[strlen(path) - 1] == TD_DIRSEP_CHAR) {
×
1373
      nBytes = snprintf(filename, sizeof(filename), "%s%s", path, name);
×
1374
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1375
        code = TSDB_CODE_OUT_OF_RANGE;
×
1376
        break;
×
1377
      }
1378
    } else {
1379
      nBytes = snprintf(filename, sizeof(filename), "%s%s%s", path, TD_DIRSEP, name);
×
1380
      if (nBytes <= 0 || nBytes >= sizeof(filename)) {
×
1381
        code = TSDB_CODE_OUT_OF_RANGE;
×
1382
        break;
×
1383
      }
1384
    }
1385

1386
    char object[PATH_MAX] = {0};
×
1387
    nBytes = snprintf(object, sizeof(object), "%s%s%s", id, TD_DIRSEP, name);
×
1388
    if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1389
      code = TSDB_CODE_OUT_OF_RANGE;
×
1390
      break;
×
1391
    }
1392

1393
    code = tcsPutObjectFromFile2(filename, object, 0);
×
1394
    if (code != 0) {
×
1395
      stError("[tcs] failed to upload checkpoint:%s, reason:%s", filename, tstrerror(code));
×
1396
    } else {
1397
      stDebug("[tcs] upload checkpoint:%s", filename);
×
1398
    }
1399
  }
1400

1401
  int32_t ret = taosCloseDir(&pDir);
×
1402
  if (code == 0 && ret != 0) {
×
1403
    code = ret;
×
1404
  }
1405

1406
  return code;
×
1407
}
1408

1409
int32_t downloadCheckpointByNameS3(const char* id, const char* fname, const char* dstName) {
×
1410
  int32_t nBytes;
1411
  int32_t cap = strlen(id) + strlen(dstName) + 16;
×
1412

1413
  char* buf = taosMemoryCalloc(1, cap);
×
1414
  if (buf == NULL) {
×
1415
    return terrno;
×
1416
  }
1417

1418
  nBytes = snprintf(buf, cap, "%s/%s", id, fname);
×
1419
  if (nBytes <= 0 || nBytes >= cap) {
×
1420
    taosMemoryFree(buf);
×
1421
    return TSDB_CODE_OUT_OF_RANGE;
×
1422
  }
1423
  int32_t code = tcsGetObjectToFile(buf, dstName);
×
1424
  if (code != 0) {
×
1425
    taosMemoryFree(buf);
×
1426
    return TAOS_SYSTEM_ERROR(errno);
×
1427
  }
1428
  taosMemoryFree(buf);
×
1429
  return 0;
×
1430
}
1431

1432
ECHECKPOINT_BACKUP_TYPE streamGetCheckpointBackupType() {
857✔
1433
  if (strlen(tsSnodeAddress) != 0) {
857!
1434
    return DATA_UPLOAD_RSYNC;
×
1435
  } else if (tsS3StreamEnabled) {
857!
1436
    return DATA_UPLOAD_S3;
×
1437
  } else {
1438
    return DATA_UPLOAD_DISABLE;
857✔
1439
  }
1440
}
1441

1442
int32_t streamTaskUploadCheckpoint(const char* id, const char* path, int64_t checkpointId) {
×
1443
  int32_t code = 0;
×
1444
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
×
1445
    stError("invalid parameters in upload checkpoint, %s", id);
×
1446
    return TSDB_CODE_INVALID_CFG;
×
1447
  }
1448

1449
  if (strlen(tsSnodeAddress) != 0) {
×
1450
    code = uploadByRsync(id, path, checkpointId);
×
1451
    if (code != 0) {
×
1452
      return TAOS_SYSTEM_ERROR(errno);
×
1453
    }
1454
  } else if (tsS3StreamEnabled) {
×
1455
    return uploadCheckpointToS3(id, path);
×
1456
  }
1457

1458
  return 0;
×
1459
}
1460

1461
// fileName:  CURRENT
1462
int32_t downloadCheckpointDataByName(const char* id, const char* fname, const char* dstName) {
×
1463
  if (id == NULL || fname == NULL || strlen(id) == 0 || strlen(fname) == 0 || strlen(fname) >= PATH_MAX) {
×
1464
    stError("down load checkpoint data parameters invalid");
×
1465
    return TSDB_CODE_INVALID_PARA;
×
1466
  }
1467

1468
  if (strlen(tsSnodeAddress) != 0) {
×
1469
    return 0;
×
1470
  } else if (tsS3StreamEnabled) {
×
1471
    return downloadCheckpointByNameS3(id, fname, dstName);
×
1472
  }
1473

1474
  return 0;
×
1475
}
1476

1477
/*
 * Fetch the checkpoint data of task `id` into the local directory `path`.
 *
 * Uses rsync when an snode address is configured, otherwise S3 (by object-key
 * prefix `id`) when enabled. With neither configured nothing is downloaded
 * and 0 is returned.
 *
 * @return TSDB_CODE_INVALID_PARA for NULL/empty arguments or a path of PATH_MAX
 *         or more characters; otherwise the result of the selected backend.
 */
int32_t streamTaskDownloadCheckpointData(const char* id, char* path, int64_t checkpointId) {
  if (id == NULL || path == NULL || strlen(id) == 0 || strlen(path) == 0 || strlen(path) >= PATH_MAX) {
    stError("down checkpoint data parameters invalid");
    // Return a real error code, as the sibling validators in this file do,
    // rather than a bare -1 that does not map to any TSDB error message.
    return TSDB_CODE_INVALID_PARA;
  }

  if (strlen(tsSnodeAddress) != 0) {
    return downloadByRsync(id, path, checkpointId);
  } else if (tsS3StreamEnabled) {
    return tcsGetObjectsByPrefix(id, path);
  }

  return 0;
}
1491

1492
int32_t deleteCheckpoint(const char* id) {
×
1493
  if (id == NULL || strlen(id) == 0) {
×
1494
    stError("deleteCheckpoint parameters invalid");
×
1495
    return TSDB_CODE_INVALID_PARA;
×
1496
  }
1497
  if (strlen(tsSnodeAddress) != 0) {
×
1498
    return deleteRsync(id);
×
1499
  } else if (tsS3StreamEnabled) {
×
1500
    tcsDeleteObjectsByPrefix(id);
×
1501
  }
1502
  return 0;
×
1503
}
1504

1505
int32_t deleteCheckpointFile(const char* id, const char* name) {
×
1506
  char object[128] = {0};
×
1507

1508
  int32_t nBytes = snprintf(object, sizeof(object), "%s/%s", id, name);
×
1509
  if (nBytes <= 0 || nBytes >= sizeof(object)) {
×
1510
    return TSDB_CODE_OUT_OF_RANGE;
×
1511
  }
1512

1513
  char*   tmp = object;
×
1514
  int32_t code = tcsDeleteObjects((const char**)&tmp, 1);
×
1515
  if (code != 0) {
×
1516
    return TSDB_CODE_THIRDPARTY_ERROR;
×
1517
  }
1518
  return code;
×
1519
}
1520

1521
/*
 * Mark the task as requesting checkpoint-id consensus, using the current
 * timestamp.
 *
 * The task status is sampled under pTask->lock. If a backend is attached, its
 * state is then released and pTask->pBackend is cleared. Always returns 0.
 */
int32_t streamTaskSendNegotiateChkptIdMsg(SStreamTask* pTask) {
  ETaskStatus status;

  streamMutexLock(&pTask->lock);
  status = streamTaskGetStatus(pTask).state;
  streamTaskSetReqConsenChkptId(pTask, taosGetTimestampMs());
  streamMutexUnlock(&pTask->lock);

  if (pTask->pBackend == NULL) {
    return 0;
  }

  streamFreeTaskState(pTask, status);
  pTask->pBackend = NULL;
  return 0;
}
1541

1542
/*
 * For source-level tasks only: while holding pTask->lock, send the
 * checkpoint-source response if the task is in TASK_STATUS__CK.
 *
 * @return 0 for non-source tasks or when no response is sent; otherwise the
 *         result of streamTaskSendCheckpointSourceRsp.
 */
int32_t streamTaskSendCheckpointsourceRsp(SStreamTask* pTask) {
  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    return 0;
  }

  int32_t code = 0;

  streamMutexLock(&pTask->lock);
  if (streamTaskGetStatus(pTask).state == TASK_STATUS__CK) {
    code = streamTaskSendCheckpointSourceRsp(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  return code;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc