• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3720

26 Mar 2025 06:20AM UTC coverage: 30.242% (-31.7%) from 61.936%
#3720

push

travis-ci

web-flow
feat(taosBenchmark): supports decimal data type (#30456)

* feat: taosBenchmark supports decimal data type

* build: decimal script not use pytest.sh

* fix: fix typo for decimal script

* test: insertBasic.py debug

71234 of 313946 branches covered (22.69%)

Branch coverage included in aggregate %.

38 of 423 new or added lines in 8 files covered. (8.98%)

120240 existing lines in 447 files now uncovered.

118188 of 312400 relevant lines covered (37.83%)

1450220.33 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

10.42
/source/libs/stream/src/streamStartTask.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "executor.h"
17
#include "streamBackendRocksdb.h"
18
#include "streamInt.h"
19
#include "tmisce.h"
20
#include "tref.h"
21
#include "tsched.h"
22
#include "tstream.h"
23
#include "ttimer.h"
24
#include "wal.h"
25

26
// Launch record of one task; stored as the value in the ready/failed task hash sets of the start info.
typedef struct STaskInitTs {
  int64_t start;    // start timestamp handed over when the launch result was recorded
  int64_t end;      // end timestamp handed over when the launch result was recorded
  bool    success;  // true when the task was recorded as ready, false when recorded as failed
} STaskInitTs;
31

32
static int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now);
33
static bool    allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal);
34
static void    displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ);
35

36
// restore the checkpoint id by negotiating the latest consensus checkpoint id
37
int32_t streamMetaStartAllTasks(SStreamMeta* pMeta) {
208✔
38
  int32_t code = TSDB_CODE_SUCCESS;
208✔
39
  int32_t vgId = pMeta->vgId;
208✔
40
  int64_t now = taosGetTimestampMs();
208✔
41
  SArray* pTaskList = NULL;
208✔
42
  int32_t numOfConsensusChkptIdTasks = 0;
208✔
43
  int32_t numOfTasks = 0;
208✔
44

45
  numOfTasks = taosArrayGetSize(pMeta->pTaskList);
208✔
46
  if (numOfTasks == 0) {
208!
47
    stInfo("vgId:%d no tasks exist, quit from consensus checkpointId", pMeta->vgId);
208!
48
    streamMetaResetStartInfo(&pMeta->startInfo, vgId);
208✔
49
    return TSDB_CODE_SUCCESS;
208✔
50
  }
51

UNCOV
52
  stInfo("vgId:%d start to consensus checkpointId for all %d task(s), start ts:%" PRId64, vgId, numOfTasks, now);
×
53

UNCOV
54
  code = prepareBeforeStartTasks(pMeta, &pTaskList, now);
×
UNCOV
55
  if (code != TSDB_CODE_SUCCESS) {
×
56
    return TSDB_CODE_SUCCESS;  // ignore the error and return directly
×
57
  }
58

59
  // broadcast the check downstream tasks msg only for tasks with related fill-history tasks.
UNCOV
60
  numOfTasks = taosArrayGetSize(pTaskList);
×
61

62
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
63
  // initialization, when the operation of check downstream tasks status is executed far quickly.
UNCOV
64
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
UNCOV
65
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
×
UNCOV
66
    SStreamTask*   pTask = NULL;
×
67

UNCOV
68
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
×
UNCOV
69
    if ((pTask == NULL) || (code != 0)) {
×
70
      stError("vgId:%d failed to acquire task:0x%x during start task, it may be dropped", pMeta->vgId, pTaskId->taskId);
×
71
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
×
72
      if (ret) {
×
73
        stError("s-task:0x%x add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
74
      }
75
      continue;
×
76
    }
77

UNCOV
78
    if ((pTask->pBackend == NULL) && ((pTask->info.fillHistory == 1) || HAS_RELATED_FILLHISTORY_TASK(pTask))) {
×
UNCOV
79
      code = pMeta->expandTaskFn(pTask);
×
UNCOV
80
      if (code != TSDB_CODE_SUCCESS) {
×
81
        stError("s-task:0x%x vgId:%d failed to expand stream backend", pTaskId->taskId, vgId);
×
82
        streamMetaAddFailedTaskSelf(pTask, pTask->execInfo.readyTs, false);
×
83
      }
84
    }
85

UNCOV
86
    streamMetaReleaseTask(pMeta, pTask);
×
87
  }
88

89
  // Tasks, with related fill-history task or without any checkpoint yet, can be started directly here.
UNCOV
90
  for (int32_t i = 0; i < numOfTasks; ++i) {
×
UNCOV
91
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
×
92

UNCOV
93
    SStreamTask* pTask = NULL;
×
UNCOV
94
    code = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
×
UNCOV
95
    if ((pTask == NULL) || (code != 0)) {
×
96
      stError("vgId:%d failed to acquire task:0x%x during start tasks", pMeta->vgId, pTaskId->taskId);
×
97
      int32_t ret = streamMetaAddFailedTask(pMeta, pTaskId->streamId, pTaskId->taskId, false);
×
98
      if (ret) {
×
99
        stError("s-task:0x%x failed add check downstream failed, core:%s", pTaskId->taskId, tstrerror(ret));
×
100
      }
101

UNCOV
102
      continue;
×
103
    }
104

UNCOV
105
    STaskExecStatisInfo* pInfo = &pTask->execInfo;
×
106

107
    // fill-history task can only be launched by related stream tasks.
UNCOV
108
    if (pTask->info.fillHistory == 1) {
×
UNCOV
109
      stDebug("s-task:%s fill-history task wait related stream task start", pTask->id.idStr);
×
UNCOV
110
      streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
111
      continue;
×
112
    }
113

114
    // ready now, start the related fill-history task
UNCOV
115
    if (pTask->status.downstreamReady == 1) {
×
UNCOV
116
      if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
×
117
        stDebug("s-task:%s downstream ready, no need to check downstream, check only related fill-history task",
×
118
                pTask->id.idStr);
119
        code = streamLaunchFillHistoryTask(pTask, false);  // todo: how about retry launch fill-history task?
×
120
        if (code) {
×
121
          stError("s-task:%s failed to launch history task, code:%s", pTask->id.idStr, tstrerror(code));
×
122
        }
123
      }
124

UNCOV
125
      code = streamMetaAddTaskLaunchResultNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, pInfo->checkTs,
×
126
                                                 pInfo->readyTs, true);
UNCOV
127
      streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
128
      continue;
×
129
    }
130

UNCOV
131
    if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
×
UNCOV
132
      int32_t ret = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
×
UNCOV
133
      if (ret != TSDB_CODE_SUCCESS) {
×
134
        stError("vgId:%d failed to handle event:%d", pMeta->vgId, TASK_EVENT_INIT);
×
135
        code = ret;
×
136

137
        // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
138
        if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
×
139
          streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
×
140
        }
141
      }
142

UNCOV
143
      streamMetaReleaseTask(pMeta, pTask);
×
UNCOV
144
      continue;
×
145
    }
146

147
    // negotiate the consensus checkpoint id for current task
UNCOV
148
    code = streamTaskSendNegotiateChkptIdMsg(pTask);
×
UNCOV
149
    if (code == 0) {
×
UNCOV
150
      numOfConsensusChkptIdTasks += 1;
×
151
    }
152

153
    // this task may have no checkpoint, but others tasks may generate checkpoint already?
UNCOV
154
    streamMetaReleaseTask(pMeta, pTask);
×
155
  }
156

UNCOV
157
  if (numOfConsensusChkptIdTasks > 0) {
×
UNCOV
158
    pMeta->startInfo.curStage = START_MARK_REQ_CHKPID;
×
UNCOV
159
    SStartTaskStageInfo info = {.stage = pMeta->startInfo.curStage, .ts = now};
×
160

UNCOV
161
    void*   p = taosArrayPush(pMeta->startInfo.pStagesList, &info);
×
UNCOV
162
    int32_t num = (int32_t)taosArrayGetSize(pMeta->startInfo.pStagesList);
×
163

UNCOV
164
    if (p != NULL) {
×
UNCOV
165
      stDebug("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64 " numOfStageHist:%d", pMeta->vgId,
×
166
              numOfConsensusChkptIdTasks, info.ts, num);
167
    } else {
UNCOV
168
      stError("vgId:%d %d task(s) 0 stage -> mark_req stage, reqTs:%" PRId64
×
169
              " numOfStageHist:%d, FAILED, out of memory",
170
              pMeta->vgId, numOfConsensusChkptIdTasks, info.ts, num);
171
    }
172
  }
173

174
  // prepare the fill-history task before starting all stream tasks, to avoid fill-history tasks are started without
175
  // initialization, when the operation of check downstream tasks status is executed far quickly.
176
  stInfo("vgId:%d start all task(s) completed", pMeta->vgId);
×
177
  taosArrayDestroy(pTaskList);
×
UNCOV
178
  return code;
×
179
}
180

UNCOV
181
// Prepare the meta for a start-all-tasks round.
//
// Duplicates pMeta->pTaskList into *pList, empties the ready/failed task sets and the stage history kept in
// pMeta->startInfo, resets curStage to 0, stamps startTs with `now`, and finally calls streamMetaResetTaskStatus().
//
// Ownership: on success *pList holds the duplicated list and the caller releases it with taosArrayDestroy().
// On every failure path *pList is NULL, so the caller has nothing to release.
//
// Returns TSDB_CODE_FAILED when pMeta->closeFlag is set, terrno when the duplication fails, otherwise the code
// returned by streamMetaResetTaskStatus().
int32_t prepareBeforeStartTasks(SStreamMeta* pMeta, SArray** pList, int64_t now) {
  STaskStartInfo* pInfo = &pMeta->startInfo;

  *pList = NULL;

  if (pMeta->closeFlag) {
    stError("vgId:%d vnode is closed, not start check task(s) downstream status", pMeta->vgId);
    return TSDB_CODE_FAILED;
  }

  *pList = taosArrayDup(pMeta->pTaskList, NULL);
  if (*pList == NULL) {
    int32_t dupCode = terrno;  // read once, so that the logged and the returned value cannot diverge
    stError("vgId:%d failed to dup tasklist, before restart tasks, code:%s", pMeta->vgId, tstrerror(dupCode));
    return dupCode;
  }

  taosHashClear(pInfo->pReadyTaskSet);
  taosHashClear(pInfo->pFailedTaskSet);
  taosArrayClear(pInfo->pStagesList);

  pInfo->curStage = 0;
  pInfo->startTs = now;

  int32_t code = streamMetaResetTaskStatus(pMeta);
  if (code != TSDB_CODE_SUCCESS) {
    // the caller gives up on failure, so the duplicated list would leak if it were handed back
    taosArrayDestroy(*pList);
    *pList = NULL;
  }

  return code;
}
204

205
// Bring the start-all-tasks bookkeeping back to its idle state: the progress scalars are zeroed and the ready/failed
// task sets plus the stage history are emptied (the containers themselves stay allocated).
// startTs and restartCount are not modified by this function.
void streamMetaResetStartInfo(STaskStartInfo* pStartInfo, int32_t vgId) {
  // progress scalars
  pStartInfo->curStage = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->readyTs = 0;
  pStartInfo->tasksWillRestart = 0;

  // collected results and stage history
  taosArrayClear(pStartInfo->pStagesList);
  taosHashClear(pStartInfo->pFailedTaskSet);
  taosHashClear(pStartInfo->pReadyTaskSet);

  // the sentinel flag goes back to 0 last, once everything else has been cleared
  pStartInfo->startAllTasks = 0;
  stDebug("vgId:%d clear start-all-task info", vgId);
}
1,014✔
219

UNCOV
220
// Stamp the completion time of the start procedure into pMeta->startInfo and write the collected statistics to the
// log: one line per recorded stage, one summary line, then the content of the ready and of the failed task set.
//
// numOfTotal, taskId and ready are only used for the summary line.
static void streamMetaLogLaunchTasksInfo(SStreamMeta* pMeta, int32_t numOfTotal, int32_t taskId, bool ready) {
  STaskStartInfo* pStart = &pMeta->startInfo;

  pStart->readyTs = taosGetTimestampMs();

  // without a recorded start timestamp no elapsed time can be derived
  if (pStart->startTs != 0) {
    pStart->elapsedTime = pStart->readyTs - pStart->startTs;
  } else {
    pStart->elapsedTime = 0;
  }

  int32_t idx = 0;
  while (idx < taosArrayGetSize(pStart->pStagesList)) {
    SStartTaskStageInfo* pStage = taosArrayGet(pStart->pStagesList, idx);
    stDebug("vgId:%d start task procedure, stage:%d, ts:%" PRId64, pMeta->vgId, pStage->stage, pStage->ts);
    idx += 1;
  }

  stDebug("vgId:%d all %d task(s) check downstream completed, last completed task:0x%x (succ:%d) startTs:%" PRId64
          ", readyTs:%" PRId64 " total elapsed time:%.2fs",
          pMeta->vgId, numOfTotal, taskId, ready, pStart->startTs, pStart->readyTs, pStart->elapsedTime / 1000.0);

  // dump both result sets, the successful one first
  displayStatusInfo(pMeta, pStart->pReadyTaskSet, true);
  displayStatusInfo(pMeta, pStart->pFailedTaskSet, false);
}
×
240

UNCOV
241
// Record the launch result of one task into pMeta->startInfo; does not take the meta lock itself.
//
// Nothing is recorded (and 0 is returned) when the task cannot be found in pMeta or when no start-all-tasks round is
// in progress (startInfo.startAllTasks != 1). Otherwise the {startTs, endTs, ready} triple is stored in the ready or
// the failed set, keyed by {streamId, taskId}. Once every task of pMeta->pTaskList has an entry in one of the two
// sets, the statistics are logged, the start info is reset and startInfo.completeFn is invoked.
//
// Returns the completeFn result when the round just completed; otherwise the taosHashPut() result, with
// TSDB_CODE_DUP_KEY mapped to 0.
int32_t streamMetaAddTaskLaunchResultNoLock(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                            int64_t endTs, bool ready) {
  STaskStartInfo* pStart = &pMeta->startInfo;
  int32_t         vgId = pMeta->vgId;
  STaskId         id = {.streamId = streamId, .taskId = taskId};
  SStreamTask*    pExisting = NULL;

  // existence probe only: the reference is handed back right away
  int32_t code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pExisting);
  if (code != 0) {  // task does not exist in current vnode, not record the complete info
    stError("vgId:%d s-task:0x%x not exists discard the check downstream info", vgId, taskId);
    return 0;
  }

  streamMetaReleaseTask(pMeta, pExisting);

  if (pStart->startAllTasks != 1) {
    int64_t el = endTs - startTs;
    stDebug(
        "vgId:%d not in start all task(s) process, not record launch result status, s-task:0x%x launch succ:%d elapsed "
        "time:%" PRId64 "ms",
        vgId, taskId, ready, el);
    return 0;
  }

  STaskInitTs initTs = {.start = startTs, .end = endTs, .success = ready};

  SHashObj* pDst = NULL;
  if (ready) {
    pDst = pStart->pReadyTaskSet;
  } else {
    pDst = pStart->pFailedTaskSet;
  }

  code = taosHashPut(pDst, &id, sizeof(id), &initTs, sizeof(STaskInitTs));
  if (code != 0) {
    if (code != TSDB_CODE_DUP_KEY) {
      stError("vgId:%d failed to record start task:0x%" PRIx64 " results, start all tasks failed, code:%s", vgId,
              id.taskId, tstrerror(code));
    } else {
      // a result is already present for this task: report it, but do not treat it as a failure
      stError("vgId:%d record start task result failed, s-task:0x%" PRIx64
              " already exist start results in meta start task result hashmap",
              vgId, id.taskId);
      code = 0;
    }
  }

  int32_t numOfTotal = streamMetaGetNumOfTasks(pMeta);
  int32_t numOfSucc = taosHashGetSize(pStart->pReadyTaskSet);
  int32_t numOfRecv = numOfSucc + taosHashGetSize(pStart->pFailedTaskSet);

  if (!allCheckDownstreamRsp(pMeta, pStart, numOfTotal)) {
    stDebug("vgId:%d recv check downstream results, s-task:0x%x succ:%d, received:%d, total:%d", vgId, taskId, ready,
            numOfRecv, numOfTotal);
    return code;
  }

  // every task has reported: log the statistics, reset the bookkeeping, then notify via completeFn
  streamMetaLogLaunchTasksInfo(pMeta, numOfTotal, taskId, ready);
  streamMetaResetStartInfo(pStart, vgId);

  return pStart->completeFn(pMeta);
}
298

UNCOV
299
// Locked variant of streamMetaAddTaskLaunchResultNoLock(): takes the meta write lock around the call and returns
// its result unchanged.
int32_t streamMetaAddTaskLaunchResult(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, int64_t startTs,
                                      int64_t endTs, bool ready) {
  streamMetaWLock(pMeta);
  int32_t ret = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, endTs, ready);
  streamMetaWUnLock(pMeta);

  return ret;
}
309

310
// check all existed tasks are received rsp
UNCOV
311
bool allCheckDownstreamRsp(SStreamMeta* pMeta, STaskStartInfo* pStartInfo, int32_t numOfTotal) {
×
UNCOV
312
  for (int32_t i = 0; i < numOfTotal; ++i) {
×
UNCOV
313
    SStreamTaskId* pTaskId = taosArrayGet(pMeta->pTaskList, i);
×
UNCOV
314
    if (pTaskId == NULL) {
×
UNCOV
315
      continue;
×
316
    }
317

UNCOV
318
    STaskId idx = {.streamId = pTaskId->streamId, .taskId = pTaskId->taskId};
×
UNCOV
319
    void*   px = taosHashGet(pStartInfo->pReadyTaskSet, &idx, sizeof(idx));
×
UNCOV
320
    if (px == NULL) {
×
UNCOV
321
      px = taosHashGet(pStartInfo->pFailedTaskSet, &idx, sizeof(idx));
×
UNCOV
322
      if (px == NULL) {
×
UNCOV
323
        stDebug("vgId:%d s-task:0x%x start result not rsp yet", pMeta->vgId, (int32_t)idx.taskId);
×
UNCOV
324
        return false;
×
325
      }
326
    }
327
  }
328

UNCOV
329
  return true;
×
330
}
331

UNCOV
332
// Log the content of one launch-result set: a header line with the number of entries, followed by one line per
// entry. Entries whose task can no longer be acquired from pMeta are reported as dropped.
// `succ` selects the wording of the header line and of the "dropped" line.
void displayStatusInfo(SStreamMeta* pMeta, SHashObj* pTaskSet, bool succ) {
  const char* setLabel = succ ? "success" : "failed";
  int32_t     vgId = pMeta->vgId;
  size_t      keyLen = 0;

  stInfo("vgId:%d %d tasks complete check-downstream, %s", vgId, taosHashGetSize(pTaskSet), setLabel);

  for (void* pIter = taosHashIterate(pTaskSet, NULL); pIter != NULL; pIter = taosHashIterate(pTaskSet, pIter)) {
    STaskInitTs* pRecord = pIter;
    void*        key = taosHashGetKey(pIter, &keyLen);
    SStreamTask* pTask = NULL;

    if (streamMetaAcquireTaskUnsafe(pMeta, key, &pTask) != 0) {
      stInfo("s-task:0x%x is dropped already, %s", (int32_t)((STaskId*)key)->taskId, setLabel);
      continue;
    }

    stInfo("s-task:%s level:%d vgId:%d, init:%" PRId64 ", initEnd:%" PRId64 ", %s", pTask->id.idStr,
           pTask->info.taskLevel, vgId, pRecord->start, pRecord->end, pRecord->success ? "success" : "failed");
    streamMetaReleaseTask(pMeta, pTask);
  }
}
×
354

355
// Allocate the containers of a start info: the ready task set, the failed task set and the stage history list.
//
// Returns 0 on success, terrno when one of the allocations fails. On failure everything this call allocated is
// released again and the matching members are set to NULL, so a partially initialized start info never leaks and
// never keeps a stale pointer.
// NOTE(review): assumes the caller's error path tolerates NULL members -- the first failure branch already left
// pReadyTaskSet NULL before this change; confirm against the caller.
int32_t streamMetaInitStartInfo(STaskStartInfo* pStartInfo) {
  _hash_fn_t fp = taosGetDefaultHashFunction(TSDB_DATA_TYPE_VARCHAR);
  int32_t    code = 0;

  pStartInfo->pReadyTaskSet = taosHashInit(64, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pReadyTaskSet == NULL) {
    return terrno;
  }

  pStartInfo->pFailedTaskSet = taosHashInit(4, fp, false, HASH_NO_LOCK);
  if (pStartInfo->pFailedTaskSet == NULL) {
    code = terrno;  // capture before the cleanup below can modify terrno
    goto _err;
  }

  pStartInfo->pStagesList = taosArrayInit(4, sizeof(SStartTaskStageInfo));
  if (pStartInfo->pStagesList == NULL) {
    code = terrno;
    goto _err;
  }

  return 0;

_err:
  // undo the allocations made so far; pStagesList is NULL whenever this label is reached
  if (pStartInfo->pFailedTaskSet != NULL) {
    taosHashCleanup(pStartInfo->pFailedTaskSet);
    pStartInfo->pFailedTaskSet = NULL;
  }

  if (pStartInfo->pReadyTaskSet != NULL) {
    taosHashCleanup(pStartInfo->pReadyTaskSet);
    pStartInfo->pReadyTaskSet = NULL;
  }

  return code;
}
375

376
// Release the containers owned by a start info and zero its bookkeeping scalars.
// The container pointers are set to NULL after being released, so no stale pointer is left behind in the struct.
void streamMetaClearStartInfo(STaskStartInfo* pStartInfo) {
  taosHashCleanup(pStartInfo->pReadyTaskSet);
  pStartInfo->pReadyTaskSet = NULL;

  taosHashCleanup(pStartInfo->pFailedTaskSet);
  pStartInfo->pFailedTaskSet = NULL;

  taosArrayDestroy(pStartInfo->pStagesList);
  pStartInfo->pStagesList = NULL;

  pStartInfo->readyTs = 0;
  pStartInfo->elapsedTime = 0;
  pStartInfo->startTs = 0;
  pStartInfo->startAllTasks = 0;
  pStartInfo->tasksWillRestart = 0;
  pStartInfo->restartCount = 0;
}
588✔
388

UNCOV
389
// Start a single stream task by sending it TASK_EVENT_INIT, expanding its backend first when it has none.
//
// Returns:
//   TSDB_CODE_STREAM_TASK_IVLD_STATUS  the task cannot be acquired, or it is not in TASK_STATUS__UNINIT
//   TSDB_CODE_SUCCESS                  the task is a fill-history task (nothing is started here), or the init event
//                                      was handled
//   TSDB_CODE_STREAM_INTERNAL_ERROR    the downstream is already flagged as ready
//   otherwise                          the error of pMeta->expandTaskFn or of streamTaskHandleEvent
//
// Failures are recorded through streamMetaAddFailedTask()/streamMetaAddFailedTaskSelf(), except for
// TSDB_CODE_STREAM_CONFLICT_EVENT, which is not recorded.
int32_t streamMetaStartOneTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId) {
  int32_t      code = 0;
  int32_t      vgId = pMeta->vgId;
  SStreamTask* pTask = NULL;

  stInfo("vgId:%d start task:0x%x by checking it's downstream status", vgId, taskId);

  code = streamMetaAcquireTask(pMeta, streamId, taskId, &pTask);
  if ((pTask == NULL) || (code != 0)) {
    stError("vgId:%d failed to acquire task:0x%x when starting task", vgId, taskId);
    int32_t ret = streamMetaAddFailedTask(pMeta, streamId, taskId, true);
    if (ret) {
      stError("s-task:0x%x add check downstream failed, code:%s", taskId, tstrerror(ret));
    }

    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  // fill-history task can only be launched by related stream tasks.
  STaskExecStatisInfo* pInfo = &pTask->execInfo;
  if (pTask->info.fillHistory == 1) {
    stError("s-task:0x%x vgId:%d fill-history task, not start here", taskId, vgId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_SUCCESS;
  }

  // the start all tasks procedure may happen to start the newly deployed stream task, and results in the
  // concurrent start this task by two threads.
  streamMutexLock(&pTask->lock);

  SStreamTaskState status = streamTaskGetStatus(pTask);
  bool             isUninit = (status.state == TASK_STATUS__UNINIT);
  if (!isUninit) {
    stError("s-task:0x%x vgId:%d status:%s not uninit status, not start stream task", taskId, vgId, status.name);
  }

  streamMutexUnlock(&pTask->lock);

  if (!isUninit) {
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_TASK_IVLD_STATUS;
  }

  if (pTask->status.downstreamReady != 0) {
    stFatal("s-task:0x%x downstream should be not ready, but it ready here, internal error happens", taskId);
    streamMetaReleaseTask(pMeta, pTask);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  streamMetaWLock(pMeta);

  // avoid initialization and destroy running concurrently.
  streamMutexLock(&pTask->lock);
  if (pTask->pBackend == NULL) {
    code = pMeta->expandTaskFn(pTask);
  }
  streamMutexUnlock(&pTask->lock);

  if (code != TSDB_CODE_SUCCESS) {
    // same message as the expansion failure path of streamMetaStartAllTasks(); it used to go unlogged here
    stError("s-task:0x%x vgId:%d failed to expand stream backend", taskId, vgId);
    streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
  } else {
    // concurrently start task may cause the latter started task be failed, and also failed to added into meta result.
    code = streamTaskHandleEvent(pTask->status.pSM, TASK_EVENT_INIT);
    if (code != TSDB_CODE_SUCCESS) {
      stError("s-task:%s vgId:%d failed to handle event:init-task, code:%s", pTask->id.idStr, pMeta->vgId,
              tstrerror(code));

      // do no added into result hashmap if it is failed due to concurrently starting of this stream task.
      if (code != TSDB_CODE_STREAM_CONFLICT_EVENT) {
        streamMetaAddFailedTaskSelf(pTask, pInfo->readyTs, false);
      }
    }
  }

  streamMetaWUnLock(pMeta);
  streamMetaReleaseTask(pMeta, pTask);

  return code;
}
474

475
// Stop every stream task of pMeta.
//
// The meta write lock is taken on entry and released on every return path. Before stopping anything,
// streamMetaSendMsgBeforeCloseTasks() is called; it also provides the list of tasks to stop.
//
// Returns TSDB_CODE_SUCCESS when there is no task, or the code of streamMetaSendMsgBeforeCloseTasks(). A task that
// cannot be acquired is skipped and a task that fails to stop is only logged; neither changes the return value.
// (Previously the acquire result of the last task visited leaked into the return value.)
int32_t streamMetaStopAllTasks(SStreamMeta* pMeta) {
  streamMetaWLock(pMeta);

  SArray* pTaskList = NULL;
  int32_t num = taosArrayGetSize(pMeta->pTaskList);
  stDebug("vgId:%d stop all %d stream task(s)", pMeta->vgId, num);

  if (num == 0) {
    stDebug("vgId:%d stop all %d task(s) completed, elapsed time:0 Sec.", pMeta->vgId, num);
    streamMetaWUnLock(pMeta);
    return TSDB_CODE_SUCCESS;
  }

  int64_t st = taosGetTimestampMs();

  // send hb msg to mnode before closing all tasks.
  int32_t code = streamMetaSendMsgBeforeCloseTasks(pMeta, &pTaskList);
  if (code != TSDB_CODE_SUCCESS) {
    streamMetaWUnLock(pMeta);
    return code;
  }

  int32_t numOfTasks = taosArrayGetSize(pTaskList);
  for (int32_t i = 0; i < numOfTasks; ++i) {
    SStreamTaskId* pTaskId = taosArrayGet(pTaskList, i);
    SStreamTask*   pTask = NULL;

    if (pTaskId == NULL) {
      continue;
    }

    // per-task results live in `ret`, so a skipped task does not decide what this function returns
    int32_t ret = streamMetaAcquireTaskNoLock(pMeta, pTaskId->streamId, pTaskId->taskId, &pTask);
    if ((ret != TSDB_CODE_SUCCESS) || (pTask == NULL)) {
      continue;
    }

    ret = streamTaskStop(pTask);
    if (ret) {
      stError("s-task:0x%x failed to stop task, code:%s", pTaskId->taskId, tstrerror(ret));
    }

    streamMetaReleaseTask(pMeta, pTask);
  }

  taosArrayDestroy(pTaskList);

  double el = (taosGetTimestampMs() - st) / 1000.0;
  stDebug("vgId:%d stop all %d task(s) completed, elapsed time:%.2f Sec.", pMeta->vgId, num, el);

  streamMetaWUnLock(pMeta);
  return code;
}
523

UNCOV
524
// Decide whether the consensus-checkpoint-id request of this task has to be flagged now.
//
// Returns 1 only when the meta is in the START_MARK_REQ_CHKPID stage and the task status is TASK_CONSEN_CHKPT_REQ;
// in that case the status is advanced to TASK_CONSEN_CHKPT_SEND. Returns 0 in every other case.
// The `ts` argument is not used by this function.
int32_t streamTaskCheckIfReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;
  int32_t           vgId = pTask->pMeta->vgId;

  // outside of the mark-req stage there is nothing to decide
  if (pTask->pMeta->startInfo.curStage != START_MARK_REQ_CHKPID) {
    return 0;
  }

  if (pChkpt->status == TASK_CONSEN_CHKPT_REQ) {
    // mark the sending of req consensus checkpoint request.
    pChkpt->status = TASK_CONSEN_CHKPT_SEND;
    stDebug("s-task:%s vgId:%d set requiring consensus-chkptId in hbMsg, ts:%" PRId64, pTask->id.idStr, vgId,
            pChkpt->statusTs);
    return 1;
  }

  if (pChkpt->status == 0) {
    stDebug("vgId:%d s-task:%s not need to set the req checkpointId, current stage:%d", vgId, pTask->id.idStr,
            pChkpt->status);
  } else {
    stWarn("vgId:%d, s-task:%s restart procedure expired, start stage:%d", vgId, pTask->id.idStr, pChkpt->status);
  }

  return 0;
}
546

UNCOV
547
// Mark the task as having received its consensus checkpoint id: stores the transaction id, switches the status to
// TASK_CONSEN_CHKPT_RECV and stamps the status with `ts`.
void streamTaskSetConsenChkptIdRecv(SStreamTask* pTask, int32_t transId, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;

  pChkpt->status = TASK_CONSEN_CHKPT_RECV;
  pChkpt->statusTs = ts;
  pChkpt->consenChkptTransId = transId;

  stInfo("s-task:%s set recv consen-checkpointId, transId:%d", pTask->id.idStr, transId);
}
×
555

UNCOV
556
// Flag the task as requiring a consensus checkpoint id: switches the status to TASK_CONSEN_CHKPT_REQ, stamps it with
// `ts` and clears the stored transaction id. The previous transaction id only shows up in the debug log.
void streamTaskSetReqConsenChkptId(SStreamTask* pTask, int64_t ts) {
  SConsenChkptInfo* pChkpt = &pTask->status.consenChkptInfo;

  // remember the old transaction id for the log line before it is wiped
  int32_t oldTransId = pChkpt->consenChkptTransId;

  pChkpt->consenChkptTransId = 0;
  pChkpt->statusTs = ts;
  pChkpt->status = TASK_CONSEN_CHKPT_REQ;

  stDebug("s-task:%s set req consen-checkpointId flag, prev transId:%d, ts:%" PRId64 ", task created ts:%" PRId64,
          pTask->id.idStr, oldTransId, ts, pTask->execInfo.created);
}
×
567

UNCOV
568
// Record the given task, and its related fill-history task if it has one, as failed to launch.
//
// lock == true : the meta read lock is taken while the task is looked up, and the results are recorded through the
//                locking variant streamMetaAddTaskLaunchResult().
// lock == false: no lock is taken here and streamMetaAddTaskLaunchResultNoLock() is used.
//                NOTE(review): presumably the caller already holds the meta lock in this mode -- confirm.
//
// Returns TSDB_CODE_STREAM_TASK_NOT_EXIST when the task cannot be located; otherwise the first non-zero code
// produced while recording the task and its fill-history task, or TSDB_CODE_SUCCESS.
// (Previously the code of the stream task was overwritten by the one of the fill-history task.)
int32_t streamMetaAddFailedTask(SStreamMeta* pMeta, int64_t streamId, int32_t taskId, bool lock) {
  int32_t      code = TSDB_CODE_SUCCESS;
  int64_t      now = taosGetTimestampMs();
  int64_t      startTs = 0;
  bool         hasFillhistoryTask = false;
  STaskId      hId = {0};
  STaskId      id = {.streamId = streamId, .taskId = taskId};
  SStreamTask* pTask = NULL;

  stDebug("vgId:%d add start failed task:0x%x", pMeta->vgId, taskId);

  if (lock) {
    streamMetaRLock(pMeta);
  }

  // copy out what is needed from the task while it is referenced, then hand the reference back
  code = streamMetaAcquireTaskUnsafe(pMeta, &id, &pTask);
  if (code == 0) {
    startTs = pTask->taskCheckInfo.startTs;
    hasFillhistoryTask = HAS_RELATED_FILLHISTORY_TASK(pTask);
    hId = pTask->hTaskInfo.id;
    streamMetaReleaseTask(pMeta, pTask);
  }

  if (lock) {
    streamMetaRUnLock(pMeta);
  }

  if (code != 0) {
    stError("failed to locate the stream task:0x%" PRIx64 "-0x%x (vgId:%d), it may have been destroyed or stopped",
            streamId, taskId, pMeta->vgId);
    return TSDB_CODE_STREAM_TASK_NOT_EXIST;
  }

  // add the failed task info, along with the related fill-history task info into tasks list.
  if (lock) {
    code = streamMetaAddTaskLaunchResult(pMeta, streamId, taskId, startTs, now, false);
  } else {
    code = streamMetaAddTaskLaunchResultNoLock(pMeta, streamId, taskId, startTs, now, false);
  }

  if (hasFillhistoryTask) {
    int32_t hCode = TSDB_CODE_SUCCESS;
    if (lock) {
      hCode = streamMetaAddTaskLaunchResult(pMeta, hId.streamId, hId.taskId, startTs, now, false);
    } else {
      hCode = streamMetaAddTaskLaunchResultNoLock(pMeta, hId.streamId, hId.taskId, startTs, now, false);
    }

    // keep the first failure instead of letting the second call hide it
    if (code == TSDB_CODE_SUCCESS) {
      code = hCode;
    }
  }

  return code;
}
618

UNCOV
619
// Record pTask, and its related fill-history task if it has one, as failed to launch.
//
// The start timestamp is taken from pTask->execInfo.checkTs, the end timestamp is `failedTs`.
// lock selects between streamMetaAddTaskLaunchResult() (true) and streamMetaAddTaskLaunchResultNoLock() (false).
// Failures while recording are only logged; nothing is returned to the caller.
void streamMetaAddFailedTaskSelf(SStreamTask* pTask, int64_t failedTs, bool lock) {
  // must be 64-bit wide: the value is forwarded as an int64_t timestamp, an int32_t local would truncate it
  int64_t startTs = pTask->execInfo.checkTs;
  int32_t code = 0;

  if (lock) {
    code = streamMetaAddTaskLaunchResult(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs, false);
  } else {
    code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pTask->id.streamId, pTask->id.taskId, startTs, failedTs,
                                               false);
  }

  if (code) {
    stError("s-task:%s failed to add self task failed to start, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // automatically set the related fill-history task to be failed.
  if (HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    STaskId* pId = &pTask->hTaskInfo.id;

    if (lock) {
      code = streamMetaAddTaskLaunchResult(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    } else {
      code = streamMetaAddTaskLaunchResultNoLock(pTask->pMeta, pId->streamId, pId->taskId, startTs, failedTs, false);
    }

    if (code) {
      stError("s-task:0x%" PRIx64 " failed to add self task failed to start, code:%s", pId->taskId, tstrerror(code));
    }
  }
}
×
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc