• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3768

28 Mar 2025 10:15AM UTC coverage: 33.726% (-0.3%) from 33.993%
#3768

push

travis-ci

happyguoxy
test:alter lcov result

144891 of 592084 branches covered (24.47%)

Branch coverage included in aggregate %.

218795 of 486283 relevant lines covered (44.99%)

765715.29 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

31.61
/source/libs/stream/src/streamStartHistory.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "streamInt.h"
17
#include "streamsm.h"
18
#include "tref.h"
19
#include "trpc.h"
20
#include "ttimer.h"
21
#include "wal.h"
22

23
// Idle scheduling of scan-history: the idle wait is counted in timer ticks, each tick lasting
// SCANHISTORY_IDLE_TIME_SLICE milliseconds.
#define SCANHISTORY_IDLE_TIME_SLICE 100  // length of one idle timer tick: 100ms
24
#define SCANHISTORY_MAX_IDLE_TIME   10   // upper bound of the idle wait: 10 sec
25
// maximum number of idle ticks: the max idle time converted to ms, divided by the tick length
#define SCANHISTORY_IDLE_TICK       ((SCANHISTORY_MAX_IDLE_TIME * 1000) / SCANHISTORY_IDLE_TIME_SLICE)
26

27
// Parameter handed to the tryLaunchHistoryTask() timer callback; allocated by createHTaskLaunchInfo().
typedef struct SLaunchHTaskInfo {
28
  int64_t metaRid;  // ref id of the stream meta (copied from pMeta->rid), used to re-acquire the meta in the timer
29
  STaskId id;       // id of the stream task that wants to launch its related fill-history task
30
  STaskId hTaskId;  // id of the related fill-history task to be launched
31
} SLaunchHTaskInfo;
32

33
static int32_t streamSetParamForScanHistory(SStreamTask* pTask);
34
static int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask);
35
static void    initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated);
36
static int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId,
37
                                     SLaunchHTaskInfo** pInfo);
38
static void    tryLaunchHistoryTask(void* param, void* tmrId);
39
static void    doExecScanhistoryInFuture(void* param, void* tmrId);
40
static int32_t doStartScanHistoryTask(SStreamTask* pTask);
41
static int32_t streamTaskStartScanHistory(SStreamTask* pTask);
42
static void    checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask);
43
static int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock);
44
static void    doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
45
static void    notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now);
46

47
/**
 * Flag the task as "downstream ready" and stamp the time at which it became ready.
 *
 * When a non-source task is in scan-history status, a debug line reports how many upstream
 * tasks it is waiting for. The elapsed time between execInfo.checkTs and the new
 * execInfo.readyTs is logged as well.
 *
 * @param pTask the stream task to update
 * @return always TSDB_CODE_SUCCESS
 */
static int32_t streamTaskSetReady(SStreamTask* pTask) {
  const char*      idStr = pTask->id.idStr;
  int32_t          nDownstream = streamTaskGetNumOfDownstream(pTask);
  SStreamTaskState status = streamTaskGetStatus(pTask);

  bool waitForUpstream = (status.state == TASK_STATUS__SCAN_HISTORY) && (pTask->info.taskLevel != TASK_LEVEL__SOURCE);
  if (waitForUpstream) {
    int32_t nUpstream = taosArrayGetSize(pTask->upstreamInfo.pList);
    stDebug("s-task:%s level:%d task wait for %d upstream tasks complete scan-history procedure, status:%s",
            idStr, pTask->info.taskLevel, nUpstream, status.name);
  }

  pTask->status.downstreamReady = 1;
  pTask->execInfo.readyTs = taosGetTimestampMs();

  // time spent between the start of the downstream check and the ready point
  int64_t elapsed = pTask->execInfo.readyTs - pTask->execInfo.checkTs;
  stDebug("s-task:%s all %d downstream ready, init completed, elapsed time:%" PRId64 "ms, task status:%s",
          idStr, nDownstream, elapsed, status.name);

  return TSDB_CODE_SUCCESS;
}
65

66
/**
 * Build a scan-history request for the task and put it into the long-exec queue.
 *
 * The request struct is copied byte-for-byte into the rpc buffer, so it is zeroed first:
 * initScanHistoryReq() only fills msgHead.vgId, streamId, taskId and igUntreated, and any other
 * bytes of the struct would otherwise carry indeterminate stack content into the message.
 *
 * @param pTask        the task that should run scan-history
 * @param igUntreated  forwarded as-is into the request
 * @return terrno if the rpc buffer cannot be allocated, otherwise the result of tmsgPutToQueue()
 */
int32_t streamStartScanHistoryAsync(SStreamTask* pTask, int8_t igUntreated) {
  SStreamScanHistoryReq req;
  memset(&req, 0, sizeof(req));
  initScanHistoryReq(pTask, &req, igUntreated);

  int32_t len = sizeof(SStreamScanHistoryReq);
  void*   serializedReq = rpcMallocCont(len);
  if (serializedReq == NULL) {
    return terrno;
  }

  memcpy(serializedReq, &req, len);

  SRpcMsg rpcMsg = {.contLen = len, .pCont = serializedReq, .msgType = TDMT_VND_STREAM_SCAN_HISTORY};
  return tmsgPutToQueue(pTask->pMsgCb, STREAM_LONG_EXEC_QUEUE, &rpcMsg);
}
81

82
/**
 * Schedule the scan-history procedure of the task to resume after an idle period.
 *
 * The idle duration is converted into timer ticks of SCANHISTORY_IDLE_TIME_SLICE ms and clamped
 * to [1, SCANHISTORY_IDLE_TICK]. The tick count is stored in the task and counted down by
 * doExecScanhistoryInFuture().
 *
 * @param pTask         the task whose scan-history is postponed
 * @param idleDuration  requested idle time in milliseconds
 */
void streamExecScanHistoryInFuture(SStreamTask* pTask, int32_t idleDuration) {
  int32_t vgId = pTask->pMeta->vgId;
  int32_t numOfTicks = idleDuration / SCANHISTORY_IDLE_TIME_SLICE;
  if (numOfTicks <= 0) {
    numOfTicks = 1;
  } else if (numOfTicks > SCANHISTORY_IDLE_TICK) {
    numOfTicks = SCANHISTORY_IDLE_TICK;
  }

  pTask->schedHistoryInfo.numOfTicks = numOfTicks;

  // derive the seconds from the tick length instead of a hard-coded 0.1 factor
  stDebug("s-task:%s scan-history resumed in %.2fs", pTask->id.idStr,
          (numOfTicks * SCANHISTORY_IDLE_TIME_SLICE) / 1000.0);

  int64_t* pTaskRefId = NULL;
  int32_t  ret = streamTaskAllocRefId(pTask, &pTaskRefId);
  if (ret != 0) {
    // without the ref id the timer cannot be armed; report it instead of dropping it silently
    stError("s-task:%s failed to alloc task ref id, scan-history is not rescheduled, code:%s", pTask->id.idStr,
            tstrerror(ret));
    return;
  }

  streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer,
                 &pTask->schedHistoryInfo.pTimer, vgId, "history-task");
}
×
101

102
/**
 * Start the scan-history procedure of a fill-history task according to its level.
 *
 * The task must have its downstream ready, be in scan-history status and be a fill-history
 * task; otherwise a fatal log is written and an internal error is returned.
 *  - source task: configures and starts the scan via doStartScanHistoryTask()
 *  - agg task:    only sets the operator option for scan-history
 *  - sink task:   nothing to do
 *
 * @param pTask the fill-history task
 * @return 0 or the result of the level-specific handler, TSDB_CODE_STREAM_INTERNAL_ERROR on bad state
 */
int32_t streamTaskStartScanHistory(SStreamTask* pTask) {
  int32_t          taskLevel = pTask->info.taskLevel;
  SStreamTaskState st = streamTaskGetStatus(pTask);

  bool startable = (pTask->status.downstreamReady == 1) && (st.state == TASK_STATUS__SCAN_HISTORY) &&
                   (pTask->info.fillHistory == 1);
  if (!startable) {
    stFatal("s-task:%s invalid status:%s to start fill-history task, downReady:%d, is-fill-history task:%d",
            pTask->id.idStr, st.name, pTask->status.downstreamReady, pTask->info.fillHistory);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  if (taskLevel == TASK_LEVEL__SOURCE) {
    return doStartScanHistoryTask(pTask);
  }

  if (taskLevel == TASK_LEVEL__AGG) {
    return streamSetParamForScanHistory(pTask);
  }

  if (taskLevel == TASK_LEVEL__SINK) {
    stDebug("s-task:%s sink task do nothing to handle scan-history", pTask->id.idStr);
  }

  return 0;
}
122

123
/**
 * Invoked when a normal stream task gets ready: mark it ready, set the range used for the
 * stream calculation, and log the resulting state.
 *
 * For a source task the log carries the wal version the task starts from: the current version
 * of the wal reader, or chkInfo.nextProcessVer when the reader reports -1.
 *
 * @param pTask the stream task
 * @return 0 on success, otherwise the error code of the failed step
 */
int32_t streamTaskOnNormalTaskReady(SStreamTask* pTask) {
  const char* idStr = pTask->id.idStr;

  int32_t code = streamTaskSetReady(pTask);
  if (code != 0) {
    stError("s-task:%s failed to set task status ready", idStr);
    return code;
  }

  code = streamTaskSetRangeStreamCalc(pTask);
  if (code != 0) {
    stError("s-task:%s failed to set the time range for stream task", idStr);
    return code;
  }

  SStreamTaskState status = streamTaskGetStatus(pTask);
  int8_t           sched = pTask->status.schedStatus;

  if (pTask->info.taskLevel != TASK_LEVEL__SOURCE) {
    stDebug("s-task:%s level:%d status:%s sched-status:%d", idStr, pTask->info.taskLevel, status.name, sched);
    return code;
  }

  int64_t readerVer = walReaderGetCurrentVer(pTask->exec.pWalReader);
  int64_t startVer = (readerVer == -1) ? pTask->chkInfo.nextProcessVer : readerVer;

  stDebug("s-task:%s status:%s, sched-status:%d, ready for data from wal ver:%" PRId64, idStr, status.name, sched,
          startVer);
  return code;
}
156

157
/**
 * Invoked when a scan-history (fill-history) task gets ready.
 *
 * The task is always marked ready. A task used as the recalculate task stops there; any other
 * task additionally gets its calc range set and its scan-history procedure started, each step
 * running only if the previous one succeeded.
 *
 * NOTE: the related fill-history task must not be launched from here, it leads to a deadlock.
 *
 * @param pTask the scan-history task
 * @return 0 on success, otherwise the code of the first failed step
 */
int32_t streamTaskOnScanHistoryTaskReady(SStreamTask* pTask) {
  int32_t code = streamTaskSetReady(pTask);

  // a recalculate task has nothing more to do once it is flagged ready
  if (pTask->info.fillHistory == STREAM_RECALCUL_TASK) {
    return code;
  }

  if (code != 0) {
    return code;
  }

  code = streamTaskSetRangeStreamCalc(pTask);
  if (code != 0) {
    return code;
  }

  SStreamTaskState st = streamTaskGetStatus(pTask);
  stDebug("s-task:%s fill-history task enters into scan-history data stage, status:%s", pTask->id.idStr, st.name);

  return streamTaskStartScanHistory(pTask);
}
183

184
// common
185
/**
 * Set the operator option for scan-history on the executor of the task.
 *
 * @param pTask the task whose executor is configured
 * @return the result of qSetStreamOperatorOptionForScanHistory()
 */
int32_t streamSetParamForScanHistory(SStreamTask* pTask) {
  void* pExecutor = pTask->exec.pExecutor;

  stDebug("s-task:%s set operator option for scan-history data", pTask->id.idStr);

  int32_t code = qSetStreamOperatorOptionForScanHistory(pExecutor);
  return code;
}
189

190
// source
191
/**
 * Pass the version range and time window of step 1 of the history scan to the task's executor.
 *
 * @param pTask      the source task
 * @param pVerRange  version range forwarded to the executor
 * @param pWindow    time window forwarded to the executor
 * @return the result of qStreamSourceScanParamForHistoryScanStep1()
 */
int32_t streamSetParamForStreamScannerStep1(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
  void*   pExecutor = pTask->exec.pExecutor;
  int32_t code = qStreamSourceScanParamForHistoryScanStep1(pExecutor, pVerRange, pWindow);
  return code;
}
194

195
/**
 * Pass the version range and time window of step 2 of the history scan to the task's executor.
 *
 * @param pTask      the source task
 * @param pVerRange  version range forwarded to the executor
 * @param pWindow    time window forwarded to the executor
 * @return the result of qStreamSourceScanParamForHistoryScanStep2()
 */
int32_t streamSetParamForStreamScannerStep2(SStreamTask* pTask, SVersionRange* pVerRange, STimeWindow* pWindow) {
  void*   pExecutor = pTask->exec.pExecutor;
  int32_t code = qStreamSourceScanParamForHistoryScanStep2(pExecutor, pVerRange, pWindow);
  return code;
}
198

199
// a fill-history task needs to be started.
200
/**
 * Launch the fill-history task related to the given stream task.
 *
 * Flow:
 *  1. The stream task must be in ready/halt/pause status; otherwise a failed launch result is
 *     recorded for the fill-history task and the function returns.
 *  2. The fill-history task is looked up in the meta (under the read lock when `lock` is set).
 *     If it cannot be acquired, launchNotBuiltFillHistoryTask() takes over.
 *  3. If the fill-history task already has its downstream ready, a successful launch result is
 *     recorded. Otherwise the task is expanded when it has no backend yet, and on success
 *     checkFillhistoryTaskStatus() drives it further.
 *
 * @param pTask the stream task owning the related fill-history task
 * @param lock  selects the locking variants of the meta calls (true) or the NoLock variants (false)
 * @return 0 on success, otherwise an error code
 */
int32_t streamLaunchFillHistoryTask(SStreamTask* pTask, bool lock) {
  SStreamMeta*         pMeta = pTask->pMeta;
  STaskExecStatisInfo* pStat = &pTask->execInfo;
  const char*          id = pTask->id.idStr;
  int64_t              hStreamId = pTask->hTaskInfo.id.streamId;
  int32_t              hTaskId = pTask->hTaskInfo.id.taskId;
  int64_t              ts = taosGetTimestampMs();
  int32_t              code = 0;
  SStreamTask*         pHTask = NULL;

  // the stream task itself has to be in a status that allows launching
  SStreamTaskState st = streamTaskGetStatus(pTask);
  bool             launchable = (st.state == TASK_STATUS__READY) || (st.state == TASK_STATUS__HALT) ||
                                (st.state == TASK_STATUS__PAUSE);
  if (!launchable) {
    stDebug("s-task:%s not launch related fill-history task:0x%" PRIx64 "-0x%x, status:%s", id, hStreamId, hTaskId,
            st.name);
    return lock ? streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pStat->checkTs, pStat->readyTs, false)
                : streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pStat->checkTs, pStat->readyTs,
                                                      false);
  }

  stDebug("s-task:%s start to launch related fill-history task:0x%" PRIx64 "-0x%x", id, hStreamId, hTaskId);

  // look up the fill-history task, guarded by the read lock if requested
  if (lock) {
    streamMetaRLock(pMeta);
  }
  code = streamMetaAcquireTaskUnsafe(pMeta, &pTask->hTaskInfo.id, &pHTask);
  if (lock) {
    streamMetaRUnLock(pMeta);
  }

  if (code != 0) {
    return launchNotBuiltFillHistoryTask(pTask, lock);
  }

  // from here on the fill-history task exists in the stream meta store
  if (pHTask->status.downstreamReady == 1) {
    stDebug("s-task:%s fill-history task is ready, no need to check downstream", pHTask->id.idStr);

    code = lock ? streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pStat->checkTs, pStat->readyTs, true)
                : streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pStat->checkTs, pStat->readyTs,
                                                      true);
    if (code) {
      stError("s-task:%s failed to record start task status, code:%s", id, tstrerror(code));
    }
  } else {
    // exists but is not ready yet: expand it if needed, then go on with the status check
    if (pHTask->pBackend == NULL) {
      code = pMeta->expandTaskFn(pHTask);
      if (code != TSDB_CODE_SUCCESS) {
        streamMetaAddFailedTaskSelf(pHTask, ts, lock);
        stError("s-task:%s failed to expand fill-history task, code:%s", pHTask->id.idStr, tstrerror(code));
      }
    }

    if (code == TSDB_CODE_SUCCESS) {
      checkFillhistoryTaskStatus(pTask, pHTask);
    }
  }

  streamMetaReleaseTask(pMeta, pHTask);
  return code;
}
269

270
///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
271
/**
 * Fill a scan-history request from the identity of the task.
 *
 * Only msgHead.vgId, streamId, taskId and igUntreated are written; every other byte of the
 * request is left untouched.
 *
 * @param pTask        source of the node id, stream id and task id
 * @param pReq         request to fill
 * @param igUntreated  stored as-is in the request
 */
void initScanHistoryReq(SStreamTask* pTask, SStreamScanHistoryReq* pReq, int8_t igUntreated) {
  // routing information
  pReq->msgHead.vgId = pTask->info.nodeId;

  // identity of the task that should run scan-history
  pReq->taskId = pTask->id.taskId;
  pReq->streamId = pTask->id.streamId;

  pReq->igUntreated = igUntreated;
}
2✔
277

278
/**
 * Kick off the state machine of the task related to pTask.
 *
 * The check timestamp of the related task is refreshed first. A fill-history task then receives
 * TASK_EVENT_INIT_SCANHIST; any other related task (logged as the recalculate task) receives
 * TASK_EVENT_INIT. A failure to handle the event is only logged.
 *
 * @param pTask   the stream task
 * @param pHTask  the related fill-history / recalculate task
 */
void checkFillhistoryTaskStatus(SStreamTask* pTask, SStreamTask* pHTask) {
  const char* id = pTask->id.idStr;
  const char* hId = pHTask->id.idStr;

  pHTask->execInfo.checkTs = taosGetTimestampMs();

  if (pHTask->info.fillHistory != STREAM_HISTORY_TASK) {
    stDebug("s-task:%s start rel recalculate task:%s, handle event init", id, hId);
    if (streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT) != 0) {
      stError("s-task:%s rel hist task:%s handle event init failed", id, hId);
    }
    return;
  }

  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    SDataRange* pData = &pHTask->dataRange;
    stDebug("s-task:%s set the launch condition for fill-history s-task:%s, window:%" PRId64 " - %" PRId64
            " verRange:%" PRId64 " - %" PRId64 ", init:%" PRId64,
            id, hId, pData->window.skey, pData->window.ekey, pData->range.minVer, pData->range.maxVer,
            pHTask->execInfo.checkTs);
  } else {
    stDebug("s-task:%s no fill-history condition for non-source task:%s", id, hId);
  }

  if (streamTaskHandleEvent(pHTask->status.pSM, TASK_EVENT_INIT_SCANHIST) != 0) {
    stError("s-task:%s rel hist task:%s handle event init_scanhist failed", id, hId);
  }
}
5✔
306

307
/**
 * Give up launching the related fill-history task.
 *
 * A failed launch result is recorded for the fill-history task, the outcome is logged, and the
 * related fill-history task id stored in the stream task is cleared.
 *
 * @param pTask  the stream task
 * @param pInfo  launch info carrying the id of the fill-history task
 * @param now    timestamp recorded as the end time of the launch attempt
 */
void notRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
  SHistoryTaskInfo* pHist = &pTask->hTaskInfo;
  STaskId*          pHId = &pInfo->hTaskId;

  int32_t code = streamMetaAddTaskLaunchResult(pTask->pMeta, pHId->streamId, pHId->taskId, 0, now, false);
  if (code == 0) {
    stError("s-task:%s max retry:%d reached, quit from retrying launch related fill-history task:0x%x", pTask->id.idStr,
            MAX_RETRY_LAUNCH_HISTORY_TASK, (int32_t)pHist->id.taskId);
  } else {
    stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
  }

  // detach the fill-history task from the stream task
  pHist->id.streamId = 0;
  pHist->id.taskId = 0;
}
×
323

324
/**
 * Retry launching the related fill-history task, unless the stream task should stop.
 *
 * - Task should stop: a failed launch result is recorded and pInfo is freed here.
 * - Otherwise: the launch timer is re-armed with pInfo as its parameter, so pInfo stays alive
 *   and must not be freed by the caller.
 *
 * @param pTask  the stream task
 * @param pInfo  launch info, freed only on the stop path
 * @param now    timestamp recorded as the end time when the failure is recorded
 */
void doRetryLaunchFillHistoryTask(SStreamTask* pTask, SLaunchHTaskInfo* pInfo, int64_t now) {
  SHistoryTaskInfo* pHist = &pTask->hTaskInfo;

  if (!streamTaskShouldStop(pTask)) {
    char*   statusName = streamTaskGetStatus(pTask).name;
    int32_t hId = pHist->id.taskId;

    stDebug("s-task:%s status:%s failed to launch fill-history task:0x%x, retry launch:%dms, retryCount:%d",
            pTask->id.idStr, statusName, hId, pHist->waitInterval, pHist->retryTimes);

    streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHist->pTimer,
                   pTask->pMeta->vgId, " start-history-task-tmr");
    return;
  }

  // the task is about to stop: record the failure and drop the launch info
  stDebug("s-task:0x%" PRIx64 " stopped, not launch rel history task:0x%" PRIx64, pInfo->id.taskId,
          pInfo->hTaskId.taskId);

  int32_t code =
      streamMetaAddTaskLaunchResult(pTask->pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
  if (code != 0) {
    stError("s-task:%s failed to record the start task status, code:%s", pTask->id.idStr, tstrerror(code));
  }

  taosMemoryFree(pInfo);
}
×
348

349
/**
 * Release what tryLaunchHistoryTask() holds: the task, the meta reference, and optionally the
 * launch info.
 *
 * @param pTask    the acquired stream task, released through its meta
 * @param metaRid  ref id of the stream meta to release
 * @param pInfo    launch info to free, or NULL to leave it alone
 */
static void doCleanup(SStreamTask* pTask, int64_t metaRid, SLaunchHTaskInfo* pInfo) {
  SStreamMeta* pMeta = pTask->pMeta;
  int32_t      vgId = pMeta->vgId;  // read up front; it is used for logging after the releases below

  streamMetaReleaseTask(pMeta, pTask);

  if (taosReleaseRef(streamMetaRefPool, metaRid) != 0) {
    stError("vgId:%d failed to release meta refId:%" PRId64, vgId, metaRid);
  }

  if (pInfo == NULL) {
    return;
  }

  taosMemoryFree(pInfo);
}
×
363

364
void tryLaunchHistoryTask(void* param, void* tmrId) {
×
365
  SLaunchHTaskInfo* pInfo = param;
×
366
  int64_t           metaRid = pInfo->metaRid;
×
367
  int64_t           now = taosGetTimestampMs();
×
368
  int32_t           code = 0;
×
369
  SStreamTask*      pTask = NULL;
×
370
  int32_t           vgId = 0;
×
371

372
  SStreamMeta* pMeta = taosAcquireRef(streamMetaRefPool, metaRid);
×
373
  if (pMeta == NULL) {
×
374
    stError("invalid meta rid:%" PRId64 " failed to acquired stream-meta", metaRid);
×
375
    taosMemoryFree(pInfo);
×
376
    return;
×
377
  }
378

379
  vgId = pMeta->vgId;
×
380

381
  streamMetaWLock(pMeta);
×
382

383
  code = streamMetaAcquireTaskUnsafe(pMeta, &pInfo->id, &pTask);
×
384
  if (code != 0) {
×
385
    stError("s-task:0x%x and rel fill-history task:0x%" PRIx64 " all have been destroyed, not launch",
×
386
            (int32_t)pInfo->id.taskId, pInfo->hTaskId.taskId);
387
    streamMetaWUnLock(pMeta);
×
388

389
    int32_t ret = taosReleaseRef(streamMetaRefPool, metaRid);
×
390
    if (ret) {
×
391
      stError("vgId:%d failed to release meta refId:%" PRId64, vgId, metaRid);
×
392
    }
393

394
    // already dropped, no need to set the failure info into the stream task meta.
395
    taosMemoryFree(pInfo);
×
396
    return;
×
397
  }
398

399
  if (streamTaskShouldStop(pTask)) {
×
400
    char* p = streamTaskGetStatus(pTask).name;
×
401
    stDebug("s-task:%s status:%s should stop, quit launch fill-history task timer, retry:%d", pTask->id.idStr, p,
×
402
            pTask->hTaskInfo.retryTimes);
403

404
    streamMetaWUnLock(pMeta);
×
405

406
    // record the related fill-history task failed
407
    code = streamMetaAddTaskLaunchResult(pMeta, pInfo->hTaskId.streamId, pInfo->hTaskId.taskId, 0, now, false);
×
408
    if (code) {
×
409
      stError("s-task:0x%" PRId64 " failed to record the start task status, code:%s", pInfo->hTaskId.taskId,
×
410
              tstrerror(code));
411
    }
412

413
    doCleanup(pTask, metaRid, pInfo);
×
414
    return;
×
415
  }
416

417
  streamMetaWUnLock(pMeta);
×
418

419
  SHistoryTaskInfo* pHTaskInfo = &pTask->hTaskInfo;
×
420
  pHTaskInfo->tickCount -= 1;
×
421
  if (pHTaskInfo->tickCount > 0) {
×
422
    streamTmrStart(tryLaunchHistoryTask, LAUNCH_HTASK_INTERVAL, pInfo, streamTimer, &pHTaskInfo->pTimer,
×
423
                   pTask->pMeta->vgId, " start-history-task-tmr");
×
424
    doCleanup(pTask, metaRid, NULL);
×
425
    return;
×
426
  }
427

428
  if (pHTaskInfo->retryTimes > MAX_RETRY_LAUNCH_HISTORY_TASK) {
×
429
    notRetryLaunchFillHistoryTask(pTask, pInfo, now);
×
430
  } else {  // not reach the limitation yet, let's continue retrying launch related fill-history task.
431
    streamTaskSetRetryInfoForLaunch(pHTaskInfo);
×
432

433
    // abort the timer if intend to stop task
434
    SStreamTask* pHTask = NULL;
×
435
    code = streamMetaAcquireTask(pMeta, pHTaskInfo->id.streamId, pHTaskInfo->id.taskId, &pHTask);
×
436
    if (pHTask == NULL) {
×
437
      doRetryLaunchFillHistoryTask(pTask, pInfo, now);
×
438
      doCleanup(pTask, metaRid, NULL);
×
439
      return;
×
440
    } else {
441
      if (pHTask->pBackend == NULL) {
×
442
        code = pMeta->expandTaskFn(pHTask);
×
443
        if (code != TSDB_CODE_SUCCESS) {
×
444
          streamMetaAddFailedTaskSelf(pHTask, now, true);
×
445
          stError("failed to expand fill-history task:%s, code:%s", pHTask->id.idStr, tstrerror(code));
×
446
        }
447
      }
448

449
      if (code == TSDB_CODE_SUCCESS) {
×
450
        checkFillhistoryTaskStatus(pTask, pHTask);
×
451
        // not in timer anymore
452
        stDebug("s-task:0x%x fill-history task launch completed, retry times:%d", (int32_t)pInfo->id.taskId,
×
453
                pHTaskInfo->retryTimes);
454
      }
455
      streamMetaReleaseTask(pMeta, pHTask);
×
456
    }
457
  }
458

459
  doCleanup(pTask, metaRid, pInfo);
×
460
}
461

462
int32_t createHTaskLaunchInfo(SStreamMeta* pMeta, STaskId* pTaskId, int64_t hStreamId, int32_t hTaskId,
×
463
                              SLaunchHTaskInfo** pInfo) {
464
  *pInfo = taosMemoryCalloc(1, sizeof(SLaunchHTaskInfo));
×
465
  if ((*pInfo) == NULL) {
×
466
    return terrno;
×
467
  }
468

469
  (*pInfo)->id.streamId = pTaskId->streamId;
×
470
  (*pInfo)->id.taskId = pTaskId->taskId;
×
471

472
  (*pInfo)->hTaskId.streamId = hStreamId;
×
473
  (*pInfo)->hTaskId.taskId = hTaskId;
×
474

475
  (*pInfo)->metaRid = pMeta->rid;
×
476
  return TSDB_CODE_SUCCESS;
×
477
}
478

479
/**
 * Handle the case where the related fill-history task is not built yet: start a timer that
 * retries the launch through tryLaunchHistoryTask().
 *
 * On any failure (launch info allocation, timer start) a failed launch result is recorded for
 * the fill-history task. The error of the failing operation is captured right away, so the
 * bookkeeping that follows cannot overwrite it, and a failed timer start is never reported as
 * success.
 *
 * @param pTask the stream task owning the related fill-history task
 * @param lock  selects the locking (true) or NoLock (false) variant when recording the result
 * @return TSDB_CODE_SUCCESS if the timer is armed, otherwise an error code
 */
int32_t launchNotBuiltFillHistoryTask(SStreamTask* pTask, bool lock) {
  SStreamMeta*         pMeta = pTask->pMeta;
  STaskExecStatisInfo* pExecInfo = &pTask->execInfo;
  const char*          idStr = pTask->id.idStr;
  int64_t              hStreamId = pTask->hTaskInfo.id.streamId;
  int32_t              hTaskId = pTask->hTaskInfo.id.taskId;
  SLaunchHTaskInfo*    pInfo = NULL;
  int32_t              ret = 0;

  stWarn("s-task:%s vgId:%d failed to launch history task:0x%x, since not built yet", idStr, pMeta->vgId, hTaskId);

  STaskId id = streamTaskGetTaskId(pTask);
  int32_t code = createHTaskLaunchInfo(pMeta, &id, hStreamId, hTaskId, &pInfo);
  if (code) {
    stError("s-task:%s failed to launch related fill-history task, since Out Of Memory", idStr);
    if (lock) {
      ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
    } else {
      ret = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
    }

    if (ret) {
      stError("s-task:%s add task check downstream result failed, code:%s", idStr, tstrerror(ret));
    }

    return code;
  }

  // set the launch time info
  streamTaskInitForLaunchHTask(&pTask->hTaskInfo);

  // check for the timer
  if (pTask->hTaskInfo.pTimer == NULL) {
    pTask->hTaskInfo.pTimer = taosTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer);

    if (pTask->hTaskInfo.pTimer == NULL) {
      // capture the error before logging/free/bookkeeping get a chance to overwrite terrno
      code = terrno;
      if (code == TSDB_CODE_SUCCESS) {
        code = TSDB_CODE_STREAM_INTERNAL_ERROR;
      }

      stError("s-task:%s failed to start timer, related fill-history task not launched", idStr);

      taosMemoryFree(pInfo);

      if (lock) {
        ret = streamMetaAddTaskLaunchResult(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
      } else {
        ret = streamMetaAddTaskLaunchResultNoLock(pMeta, hStreamId, hTaskId, pExecInfo->checkTs, pExecInfo->readyTs, false);
      }

      if (ret) {
        stError("s-task:0x%x failed to record the start task status, code:%s", hTaskId, tstrerror(ret));
      }
      return code;
    }

    stDebug("s-task:%s set timer active flag", idStr);
  } else {  // timer exists
    stDebug("s-task:%s set timer active flag, task timer not null", idStr);
    streamTmrStart(tryLaunchHistoryTask, WAIT_FOR_MINIMAL_INTERVAL, pInfo, streamTimer, &pTask->hTaskInfo.pTimer,
                   pTask->pMeta->vgId, " start-history-task-tmr");
  }

  return TSDB_CODE_SUCCESS;
}
540

541
/**
 * Reset the time window filter held by the executor of the task.
 *
 * @param pTask the stream task
 * @return the result of qStreamInfoResetTimewindowFilter()
 */
int32_t streamTaskResetTimewindowFilter(SStreamTask* pTask) {
  int32_t code = qStreamInfoResetTimewindowFilter(pTask->exec.pExecutor);
  return code;
}
545

546
/**
 * Decide whether step 2 of the history scan is needed and, if so, set its version range.
 *
 * The step 1 range ends at dataRange.range.maxVer. Step 2 would cover
 * [maxVer + 1, nextProcessVer - 1]; when that range is empty there is nothing to scan.
 *
 * @param pTask           the fill-history task
 * @param nextProcessVer  the next version to be processed by the related stream task
 * @return true if step 2 can be skipped (also when nextProcessVer is less than the step 1 max
 *         version, which is logged as an error); false if step2Range has been set
 */
bool streamHistoryTaskSetVerRangeStep2(SStreamTask* pTask, int64_t nextProcessVer) {
  SVersionRange* pStep1 = &pTask->dataRange.range;
  const char*    id = pTask->id.idStr;

  if (nextProcessVer < pStep1->maxVer) {
    stError("s-task:%s next processdVer:%" PRId64 " is less than range max ver:%" PRId64, id, nextProcessVer,
            pStep1->maxVer);
    return true;
  }

  // step 1 scanned up to maxVer, so the wal scan of step 2 starts right behind it
  int64_t firstVer = pStep1->maxVer + 1;
  int64_t lastVer = nextProcessVer - 1;

  if (firstVer > lastVer) {
    stDebug(
        "s-task:%s no need to perform secondary scan-history data(step 2), since no data ingest during step1 scan, "
        "related stream task currentVer:%" PRId64,
        id, nextProcessVer);
    return true;
  }

  // the time window remains, only the version range moves on for the secondary scan
  pTask->step2Range.minVer = firstVer;
  pTask->step2Range.maxVer = lastVer;
  stDebug("s-task:%s set step2 verRange:%" PRId64 "-%" PRId64 ", step1 verRange:%" PRId64 "-%" PRId64, id,
          pTask->step2Range.minVer, pTask->step2Range.maxVer, pStep1->minVer, pStep1->maxVer);
  return false;
}
573

574
/**
 * Set the scan range used by the stream calculation of the task.
 *
 * - No related fill-history task: nothing is set, the current range is only logged.
 * - Related task exists but this task is not a normal stream task: internal error.
 * - Task level is agg or above: nothing to do.
 * - Trigger is continuous-window-close: the related task is a recalculate task, nothing to do.
 * - Otherwise: copies of the task's version range and time window are handed to the step 2
 *   scanner parameter setter.
 *
 * @param pTask the stream task
 * @return TSDB_CODE_SUCCESS / 0, TSDB_CODE_STREAM_INTERNAL_ERROR, or the setter's result
 */
int32_t streamTaskSetRangeStreamCalc(SStreamTask* pTask) {
  SDataRange* pData = &pTask->dataRange;
  const char* id = pTask->id.idStr;

  if (!HAS_RELATED_FILLHISTORY_TASK(pTask)) {
    if (pTask->info.fillHistory == STREAM_HISTORY_TASK) {
      stDebug("s-task:%s fill-history task, time window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64, id,
              pData->window.skey, pData->window.ekey, pData->range.minVer, pData->range.maxVer);
    } else {
      stDebug(
          "s-task:%s no related fill-history task, stream time window and verRange are not set. default stream time "
          "window:%" PRId64 "-%" PRId64 ", verRange:%" PRId64 "-%" PRId64,
          id, pData->window.skey, pData->window.ekey, pData->range.minVer, pData->range.maxVer);
    }
    return TSDB_CODE_SUCCESS;
  }

  // from here on the task has a related helper task
  if (pTask->info.fillHistory != STREAM_NORMAL_TASK) {
    stError("s-task:%s task should not be fill-history task, internal error", id);
    return TSDB_CODE_STREAM_INTERNAL_ERROR;
  }

  if (pTask->info.taskLevel >= TASK_LEVEL__AGG) {
    return TSDB_CODE_SUCCESS;
  }

  if (pTask->info.trigger == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
    stDebug("s-task:%s level:%d related recalculate task exist, do nothing", id, pTask->info.taskLevel);
    return 0;
  }

  stDebug("s-task:%s level:%d related fill-history task exists, stream task timeWindow:%" PRId64 " - %" PRId64
          ", verRang:%" PRId64 " - %" PRId64,
          id, pTask->info.taskLevel, pData->window.skey, pData->window.ekey, pData->range.minVer,
          pData->range.maxVer);

  // pass copies, so the setter works on its own values rather than on the task's range
  STimeWindow   window = pData->window;
  SVersionRange range = pData->range;
  return streamSetParamForStreamScannerStep2(pTask, &range, &window);
}
614

615
void doExecScanhistoryInFuture(void* param, void* tmrId) {
×
616
  int64_t taskRefId = *(int64_t*)param;
×
617

618
  SStreamTask* pTask = taosAcquireRef(streamTaskRefPool, taskRefId);
×
619
  if (pTask == NULL) {
×
620
    stError("invalid task rid:%" PRId64 " failed to acquired stream-task at %s", taskRefId, __func__);
×
621
    streamTaskFreeRefId(param);
×
622
    return;
×
623
  }
624

625
  pTask->schedHistoryInfo.numOfTicks -= 1;
×
626

627
  SStreamTaskState p = streamTaskGetStatus(pTask);
×
628
  if (p.state == TASK_STATUS__DROPPING || p.state == TASK_STATUS__STOP) {
×
629
    stDebug("s-task:%s status:%s not start scan-history again", pTask->id.idStr, p.name);
×
630
    streamMetaReleaseTask(pTask->pMeta, pTask);
×
631
    streamTaskFreeRefId(param);
×
632
    return;
×
633
  }
634

635
  if (pTask->schedHistoryInfo.numOfTicks <= 0) {
×
636
    int32_t code = streamStartScanHistoryAsync(pTask, 0);
×
637
    if (code) {
×
638
      stError("s-task:%s async start history task failed", pTask->id.idStr);
×
639
    }
640

641
    stDebug("s-task:%s fill-history:%d start scan-history data, out of tmr", pTask->id.idStr, pTask->info.fillHistory);
×
642
  } else {
643
    int64_t* pTaskRefId = NULL;
×
644
    int32_t  code = streamTaskAllocRefId(pTask, &pTaskRefId);
×
645
    if (code == 0) {
×
646
      streamTmrStart(doExecScanhistoryInFuture, SCANHISTORY_IDLE_TIME_SLICE, pTaskRefId, streamTimer,
×
647
                     &pTask->schedHistoryInfo.pTimer, pTask->pMeta->vgId, " start-history-task-tmr");
×
648
    }
649
  }
650

651
  streamMetaReleaseTask(pTask->pMeta, pTask);
×
652
  streamTaskFreeRefId(param);
×
653
}
654

655
/**
 * Configure the executor of a source task for the history scan and queue the scan request.
 *
 * For a fill-history task the scan-history operator option is set first. Then the step 1
 * scanner parameters are set from the task's data range, and finally the scan-history request
 * is put into the queue. The first failing step ends the sequence.
 *
 * @param pTask the source task
 * @return 0 on success, otherwise the code of the first failed step
 */
int32_t doStartScanHistoryTask(SStreamTask* pTask) {
  SDataRange* pData = &pTask->dataRange;

  int32_t code = pTask->info.fillHistory ? streamSetParamForScanHistory(pTask) : 0;
  if (code != 0) {
    return code;
  }

  code = streamSetParamForStreamScannerStep1(pTask, &pData->range, &pData->window);
  if (code != 0) {
    return code;
  }

  return streamStartScanHistoryAsync(pTask, 0);
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc