• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3627

02 Mar 2025 11:16PM UTC coverage: 63.596% (-0.2%) from 63.764%
#3627

push

travis-ci

GitHub
Merge pull request #29973 from taosdata/doc/internal

148665 of 299855 branches covered (49.58%)

Branch coverage included in aggregate %.

233076 of 300407 relevant lines covered (77.59%)

17543856.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

69.87
/source/libs/stream/src/streamStartTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
// Launch outcome of one task, as recorded by the start-all-tasks procedure in the ready/failed result sets.
typedef struct STaskInitTs {
  int64_t start;    // launch start timestamp supplied by the caller
  int64_t end;      // launch end timestamp supplied by the caller
  bool    success;  // true when stored in the ready set, false when stored in the failed set
} STaskInitTs;
31

32
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
33
static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
34
static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
35

36
// restore the checkpoint id by negotiating the latest consensus checkpoint id
37
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
9,773✔
38
  int32_t code = TSDB_CODE_SUCCESS;
9,773✔
39
  int32_t vgId = pMeta->vgId;
9,773✔
40
  int64_t now = taosGetTimestampMs();
9,772✔
41
  SArray* pTaskList = NULL;
9,772✔
42

43
  int32_t numOfTasks = taosArrayGetSize(pMeta->pTaskList);
9,772✔
44
  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
9,772!
45

46
  if (numOfTasks == 0) {
9,773✔
47
    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
9,631!
48

49
    streamMetaWLock(pMeta);
9,631✔
50
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
9,631✔
51
    streamMetaWUnLock(pMeta);
9,631✔
52
    return TSDB_CODE_SUCCESS;
9,630✔
53
  }
54

55
  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
142✔
56
  if (code != TSDB_CODE_SUCCESS) {
142!
57
    return TSDB_CODE_SUCCESS;  // ignore the error and return directly
×
58
  }
59

60
  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
61
  numOfTasks = taosArrayGetSize(pTaskList);
142✔
62

63
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
64
  // initialization, when the operation of check downstream tasks status is executed far quickly.
65
  for (int32_t i = 0; i < numOfTasks; ++i) {
651✔
66
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
509✔
67
    SStreamTask*   pTask = NULL;
509✔
68
    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
509✔
69
    if ((pTask == NULL) || (code != 0)) {
509!
70
      stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
×
71
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
×
72
      if (ret) {
×
73
        stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
74
      }
75
      continue;
×
76
    }
77

78
    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
509!
79
      code = pMeta->expandTaskFn(pTask);
90✔
80
      if (code != TSDB_CODE_SUCCESS) {
90!
81
        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
×
82
        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs);
×
83
      }
84
    }
85

86
    streamMetaReleaseTask(pMeta, pTask);
509✔
87
  }
88

89
  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
90
  for (int32_t i = 0; i < numOfTasks; ++i) {
651✔
91
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
509✔
92

93
    SStreamTask* pTask = NULL;
509✔
94
    code = streamMetaAcquireTask(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
509✔
95
    if ((pTask == NULL )|| (code != 0)) {
509!
96
      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
×
97
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId);
×
98
      if (ret) {
×
99
        stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
100
      }
101

102
      continue;
90✔
103
    }
104

105
    STaskExecStatisInfo* pInfo = &pTask->execInfo;
509✔
106

107
    // fill-history task can only be launched by related stream tasks.
108
    if (pTask->info.fillHistory == 1) {
509✔
109
      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
45!
110
      streamMetaReleaseTask(pMeta, pTask);
45✔
111
      continue;
45✔
112
    }
113

114
    // ready now, start the related fill-history task
115
    if (pTask->status.downstreamReady == 1) {
464!
116
      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
×
117
        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
×
118
                pTask->id.idStr);
119
        code = streamLaunchFillHistoryTask(pTask);  // todo: how about retry launch fill-history task?
×
120
        if (code) {
×
121
          stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
×
122
        }
123
      }
124

125
      code = streamMetaAddTaskLaunchResult(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs, pInfo->readyTs,
×
126
                                           true);
127
      streamMetaReleaseTask(pMeta, pTask);
×
128
      continue;
×
129
    }
130

131
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
464✔
132
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
45✔
133
      if (ret != TSDB_CODE_SUCCESS) {
45!
134
        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
×
135
        code = ret;
×
136

137
        // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
138
        if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
×
139
          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs);
×
140
        }
141
      }
142

143
      streamMetaReleaseTask(pMeta, pTask);
45✔
144
      continue;
45✔
145
    }
146

147
    // negotiate the consensus checkpoint id for current task
148
    code = streamTaskSendNegotiateChkptIdMsg(pTask);
419✔
149

150
    // this task may has no checkpoint, but others tasks may generate checkpoint already?
151
    streamMetaReleaseTask(pMeta, pTask);
419✔
152
  }
153

154
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
155
  // initialization, when the operation of check downstream tasks status is executed far quickly.
156
  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
142!
157
  taosArrayDestroy(pTaskList);
142✔
158
  return code;
142✔
159
}
160

161
/**
 * Snapshot the task list and reset the start-all-tasks bookkeeping, all under the meta write lock.
 *
 * The write lock is released on every return path.
 *
 * @param pMeta stream meta of the vnode.
 * @param pList [out] on success receives a copy of pMeta->pTaskList that the caller owns and must destroy with
 *              taosArrayDestroy(); set to NULL on every failure, so nothing is leaked to the caller.
 * @param now   timestamp stored as startInfo.startTs.
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_FAILED when the vnode is closed; terrno when the copy cannot be created;
 *         or the error returned by streamMetaResetTaskStatus().
 */
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  *pList = NULL;

  streamMetaWLock(pMeta);

  if (pMeta->closeFlag) {
    streamMetaWUnLock(pMeta);
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  SArray* pCopy = taosArrayDup(pMeta->pTaskList, NULL);
  if (pCopy == NULL) {
    int32_t err = terrno;      // capture the failure reason before calling anything else
    streamMetaWUnLock(pMeta);  // must not return with the meta write lock still held
    return err;
  }

  taosHashClear(pMeta->startInfo.pReadyTaskSet);
  taosHashClear(pMeta->startInfo.pFailedTaskSet);
  pMeta->startInfo.startTs = now;

  int32_t code = streamMetaResetTaskStatus(pMeta);
  streamMetaWUnLock(pMeta);

  if (code != TSDB_CODE_SUCCESS) {
    // Callers treat a failure as "no list", so do not hand back an allocated copy.
    taosArrayDestroy(pCopy);
    return code;
  }

  *pList = pCopy;
  return TSDB_CODE_SUCCESS;
}
184

185
/**
 * Clear the start-all-tasks bookkeeping held in pStartInfo.
 *
 * Empties the ready/failed result sets and zeroes the restart counter, ready timestamp, elapsed time and the
 * startAllTasks sentinel flag. startTs and restartCount are not touched here.
 *
 * @param pStartInfo bookkeeping to reset.
 * @param vgId       vnode id, used for logging only.
 */
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  // drop every per-task launch result collected so far
  taosHashClear(pStartInfo->pReadyTaskSet);
  taosHashClear(pStartInfo->pFailedTaskSet);

  // sentinel back to 0: no start-all-tasks procedure in progress
  pStartInfo->startAllTasks = 0;

  pStartInfo->elapsedTime = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->tasksWillRestart = 0;

  stDebug("vgId:%d clear start-all-task info", vgId);
}
196

197
/**
 * Record the launch (check-downstream) result of one task for the start-all-tasks procedure.
 *
 * Nothing is recorded, and 0 is returned, in two cases:
 *  - the task no longer exists in this vnode;
 *  - no start-all-tasks procedure is in progress (startInfo.startAllTasks != 1).
 *
 * Otherwise the result is stored in the ready or the failed set, depending on `ready`. When every task in
 * pMeta->pTaskList has a result, the following happens in order:
 *  1. the summary is logged and the start info is reset;
 *  2. the meta write lock is released;
 *  3. startInfo.completeFn is invoked.
 *
 * @return 0 when nothing is recorded or not all results have arrived yet; the taosHashPut() error when the
 *         result cannot be stored (TSDB_CODE_DUP_KEY if one already exists for this task); otherwise the value
 *         returned by completeFn.
 */
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  STaskStartInfo* pInfo = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  STaskId         key = {.streamId = streamId, .taskId = taskId};
  SStreamTask*    pTask = NULL;
  int32_t         code = 0;

  streamMetaWLock(pMeta);

  // existence check only: the reference is dropped immediately
  code = streamMetaAcquireTaskUnsafe(pMeta, &key, &pTask);
  if (code != 0) {
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    streamMetaWUnLock(pMeta);
    return 0;
  }
  streamMetaReleaseTask(pMeta, pTask);

  // outside a start-all-tasks procedure the result is only logged
  if (pInfo->startAllTasks != 1) {
    int64_t elapsed = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, elapsed);
    streamMetaWUnLock(pMeta);
    return 0;
  }

  STaskInitTs record = {.start = startTs, .end = endTs, .success = ready};
  SHashObj*   pTarget = pInfo->pFailedTaskSet;
  if (ready) {
    pTarget = pInfo->pReadyTaskSet;
  }

  code = taosHashPut(pTarget, &key, sizeof(key), &record, sizeof(STaskInitTs));
  if (code != 0) {
    if (code != TSDB_CODE_DUP_KEY) {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed", vgId, key.taskId);
    } else {
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, key.taskId);
    }
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);

  if (!allCheckDownstreamRsp(pMeta, pInfo, numOfTotal)) {
    // counted while the lock is still held; logged after releasing it
    int32_t numOfRecv = taosHashGetSize(pInfo->pReadyTaskSet) + taosHashGetSize(pInfo->pFailedTaskSet);
    streamMetaWUnLock(pMeta);
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
    return code;
  }

  // every task has reported: summarize, reset the bookkeeping, then notify
  pInfo->readyTs = taosGetTimestampMs();
  if (pInfo->startTs != 0) {
    pInfo->elapsedTime = pInfo->readyTs - pInfo->startTs;
  } else {
    pInfo->elapsedTime = 0;
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
              ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          vgId, numOfTotal, taskId, ready, pInfo->startTs, pInfo->readyTs, pInfo->elapsedTime / 1000.0);

  // print the initialization elapsed time and info
  displayStatusInfo(pMeta, pInfo->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pInfo->pFailedTaskSet, false);
  streamMetaResetStartInfo(pInfo, vgId);
  streamMetaWUnLock(pMeta);

  // the completion callback runs without the meta write lock held
  return pInfo->completeFn(pMeta);
}
268

269
// check all existed tasks are received rsp
270
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
312✔
271
  for (int32_t i = 0; i < numOfTotal; ++i) {
894✔
272
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
814✔
273
    if (pTaskId == NULL) {
814!
274
      continue;
×
275
    }
276

277
    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
814✔
278
    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
814✔
279
    if (px == NULL) {
814✔
280
      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
273✔
281
      if (px == NULL) {
273✔
282
        return false;
232✔
283
      }
284
    }
285
  }
286

287
  return true;
80✔
288
}
289

290
/**
 * Log the launch results held in one result set.
 *
 * First logs how many entries pTaskSet holds. Then, for each entry, logs the task's level and recorded init
 * start/end timestamps. Entries whose task can no longer be acquired are logged as dropped.
 *
 * @param pMeta    stream meta of the vnode.
 * @param pTaskSet result set (values are STaskInitTs, keys are task ids).
 * @param succ     selects the "success"/"failed" wording of the summary and dropped-task lines.
 */
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  int32_t     vgId = pMeta->vgId;
  const char* pDesc = succ ? "success" : "failed";
  size_t      len = 0;

  stInfo("vgId:%d %d tasks check-downstream completed, %s", vgId, taosHashGetSize(pTaskSet), pDesc);

  for (void* pEntry = taosHashIterate(pTaskSet, NULL); pEntry != NULL; pEntry = taosHashIterate(pTaskSet, pEntry)) {
    STaskInitTs* pTs = pEntry;
    void*        pKey = taosHashGetKey(pEntry, &len);
    SStreamTask* pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, pKey, &pTask) != 0) {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)pKey)->taskId, pDesc);
      continue;
    }

    stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
           pTask->info.taskLevel, vgId, pTs->start, pTs->end, pTs->success ? "success" : "failed");
    streamMetaReleaseTask(pMeta, pTask);
  }
}
312

313
/**
 * Create the ready and failed launch-result sets of pStartInfo.
 *
 * Both are hash tables created with HASH_NO_LOCK. In this file they are accessed under the meta lock.
 *
 * @param pStartInfo bookkeeping whose result sets are created.
 * @return 0 on success, otherwise terrno. If the failed set cannot be created, the already created ready set is
 *         released again and reset to NULL, so a failed call leaks nothing.
 */
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);

  pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pReadyTaskSet == NULL) {
    return terrno;
  }

  pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pFailedTaskSet == NULL) {
    int32_t code = terrno;  // save the reason before the cleanup below runs
    taosHashCleanup(pStartInfo->pReadyTaskSet);
    pStartInfo->pReadyTaskSet = NULL;
    return code;
  }

  return 0;
}
328

329
/**
 * Release the result sets of pStartInfo and zero all of its counters and timestamps.
 *
 * The set pointers are reset to NULL after being released. A repeated call, or a later access, then sees NULL
 * instead of a dangling pointer to freed memory.
 *
 * @param pStartInfo bookkeeping to tear down.
 */
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  pStartInfo->pReadyTaskSet = NULL;

  taosHashCleanup(pStartInfo->pFailedTaskSet);
  pStartInfo->pFailedTaskSet = NULL;

  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->startTs = 0;
  pStartInfo->startAllTasks = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->restartCount = 0;
}
339

340
/**
 * Start a single stream task: expand its backend if needed, then fire TASK_EVENT_INIT.
 *
 * Outcomes:
 *  - the task cannot be acquired: it is recorded as a failed launch and TSDB_CODE_STREAM_TASK_IVLD_STATUS is
 *    returned;
 *  - it is a fill-history task: it is not started here (its related stream task launches it) and
 *    TSDB_CODE_SUCCESS is returned;
 *  - it is not in TASK_STATUS__UNINIT: TSDB_CODE_STREAM_TASK_IVLD_STATUS is returned;
 *  - its downstream is already marked ready: TSDB_CODE_STREAM_INTERNAL_ERROR is returned;
 *  - otherwise: the result of expanding the backend / handling TASK_EVENT_INIT is returned.
 *
 * @return one of the codes listed above.
 */
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  int32_t code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if (code != 0 || pTask == NULL) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId);
    if (ret != 0) {
      stError("s-task:0x%x add check downstream failed, core:%s", taskId, tstrerror(ret));
    }
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  STaskExecStatisInfo* pExec = &pTask->execInfo;

  // a fill-history task is only ever launched by its related stream task
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // The start-all-tasks procedure may start this newly deployed task at the same time; only proceed from UNINIT.
  streamMutexLock(&pTask->lock);
  SStreamTaskState state = streamTaskGetStatus(pTask);
  bool             isUninit = (state.state == TASK_STATUS__UNINIT);
  if (!isUninit) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, state.name);
  }
  streamMutexUnlock(&pTask->lock);

  if (!isUninit) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (pTask->status.downstreamReady != 0) {
    stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  // backend initialization is done under the task lock so it cannot interleave with a destroy
  streamMutexLock(&pTask->lock);
  bool needExpand = (pTask->pBackend == NULL);
  if (needExpand) {
    code = pMeta->expandTaskFn(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  if (needExpand && code != TSDB_CODE_SUCCESS) {
    streamMetaAddFailedTaskSelf(pTask, pExec->readyTs);
  }

  // A concurrent start makes the later INIT fail with a conflict; that case is not recorded as a failed launch.
  if (code == TSDB_CODE_SUCCESS) {
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
              tstrerror(code));
      if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
        streamMetaAddFailedTaskSelf(pTask, pExec->readyTs);
      }
    }
  }

  streamMetaReleaseTask(pMeta, pTask);
  return code;
}
421

422
/**
 * Stop every stream task of the vnode while holding the meta read lock.
 *
 * Before stopping, streamMetaSendMsgBeforeCloseTasks() sends a heartbeat message to mnode and provides the list
 * of tasks to stop. Tasks that can no longer be acquired (for example, already dropped) are skipped. Failures of
 * streamTaskStop() are only logged.
 *
 * @param pMeta stream meta of the vnode.
 * @return TSDB_CODE_SUCCESS, or the error returned by streamMetaSendMsgBeforeCloseTasks(). Unlike before, the
 *         result no longer depends on whether the last task in the list happened to be acquirable.
 */
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaRLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);

  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaRUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    streamMetaRUnLock(pMeta);
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    // A task that cannot be acquired needs no stop; keep this out of `code` so it does not leak into the result.
    int32_t ret = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if (ret != TSDB_CODE_SUCCESS) {
      continue;
    }

    ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaRUnLock(pMeta);
  return code;
}
471

472
/**
 * Decide whether the heartbeat built at time `ts` must carry a consensus-checkpoint-id request for pTask.
 *
 * Cases:
 *  - status TASK_CONSEN_CHKPT_REQ: the status becomes TASK_CONSEN_CHKPT_SEND, statusTs becomes ts, returns 1;
 *  - status TASK_CONSEN_CHKPT_SEND and no response for more than 60 seconds since statusTs: statusTs is
 *    refreshed and 1 is returned, so the request is sent again;
 *  - any other situation: returns 0.
 *
 * @return 1 if the request must be included, otherwise 0.
 */
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pInfo = &pTask->status.consenChkptInfo;
  int32_t           vgId = pTask->pMeta->vgId;

  if (pInfo->status == TASK_CONSEN_CHKPT_REQ) {
    // first heartbeat after the flag was raised: mark the request as sent
    pInfo->status = TASK_CONSEN_CHKPT_SEND;
    pInfo->statusTs = ts;
    stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId,
            pInfo->statusTs);
    return 1;
  }

  if (pInfo->status != TASK_CONSEN_CHKPT_SEND) {
    return 0;
  }

  // already sent: only ask again when no response has arrived for more than 60 seconds
  int32_t elapsedSec = (ts - pInfo->statusTs) / 1000;
  if (elapsedSec <= 60) {
    return 0;
  }

  pInfo->statusTs = ts;
  stWarn("s-task:%s vgId:%d not recv consensus-chkptId for %ds(more than 60s), set requiring in Hb again, ts:%" PRId64,
         pTask->id.idStr, vgId, elapsedSec, pInfo->statusTs);
  return 1;
}
499

500
/**
 * Mark the consensus-checkpoint-id exchange of pTask as answered.
 *
 * Sets the status to TASK_CONSEN_CHKPT_RECV, stamps it with `ts` and remembers the transaction id `transId`.
 */
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
  SConsenChkptInfo* pConsen = &pTask->status.consenChkptInfo;

  pConsen->status = TASK_CONSEN_CHKPT_RECV;
  pConsen->statusTs = ts;
  pConsen->consenChkptTransId = transId;

  stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
508

509
/**
 * Raise the "request consensus checkpoint id" flag on pTask at time `ts`.
 *
 * The flag is picked up by streamTaskCheckIfReqConsenChkptId(). The transaction id of the previous exchange is
 * cleared; it is only reported in the debug log.
 */
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pConsen = &pTask->status.consenChkptInfo;
  int32_t           prevTransId = pConsen->consenChkptTransId;

  pConsen->consenChkptTransId = 0;
  pConsen->statusTs = ts;
  pConsen->status = TASK_CONSEN_CHKPT_REQ;

  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64, pTask->id.idStr, prevTransId, ts);
}
519

520
/**
 * Record the task (streamId, taskId) as failed to launch, together with its related fill-history task if it has
 * one.
 *
 * The task's taskCheckInfo.startTs is used as the start time and the current time as the end time.
 *
 * @return the result of the last streamMetaAddTaskLaunchResult() call, or TSDB_CODE_STREAM_TASK_NOT_EXIST when the
 *         task cannot be found in pMeta.
 */
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int64_t      now = taosGetTimestampMs();
  STaskId      id = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);

  // Copy what is needed under the read lock. The results are recorded only after releasing it, because
  // streamMetaAddTaskLaunchResult() acquires the meta write lock itself.
  streamMetaRLock(pMeta);

  if (streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask) != 0) {
    streamMetaRUnLock(pMeta);

    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  int64_t startTs = pTask->taskCheckInfo.startTs;
  bool    hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
  STaskId hId = pTask->hTaskInfo.id;
  streamMetaReleaseTask(pMeta, pTask);

  streamMetaRUnLock(pMeta);

  // record the failed task, then its related fill-history task
  int32_t code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
  if (hasFillhistoryTask) {
    code = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
  }

  return code;
}
557

558
/**
 * Record pTask itself as failed to launch, and its related fill-history task as well if it has one.
 *
 * The task's execInfo.checkTs is used as the start time of the failed launch.
 *
 * @param pTask    the task that failed to start.
 * @param failedTs timestamp used as the end time of the failed launch.
 */
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs) {
  // checkTs is a 64-bit timestamp; keep the full width (storing it in an int32_t truncated the recorded start time).
  int64_t      startTs = pTask->execInfo.checkTs;
  SStreamMeta* pMeta = pTask->pMeta;

  int32_t code = streamMetaAddTaskLaunchResult(pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
  if (code) {
    stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // automatically set the related fill-history task to be failed.
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId* pId = &pTask->hTaskInfo.id;
    code = streamMetaAddTaskLaunchResult(pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    if (code) {
      stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
    }
  }
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc