• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5053

13 May 2026 12:00PM UTC coverage: 73.397% (+0.06%) from 73.338%
#5053

push

travis-ci

web-flow
feat: taosdump support stream backup/restore (#35326)

139 of 170 new or added lines in 3 files covered. (81.76%)

627 existing lines in 131 files now uncovered.

281694 of 383795 relevant lines covered (73.4%)

132505311.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

82.83
/source/libs/executor/src/filloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "taoserror.h"
21
#include "tmsg.h"
22
#include "ttypes.h"
23

24
#include "executorInt.h"
25
#include "tcommon.h"
26
#include "thash.h"
27
#include "ttime.h"
28

29
#include "function.h"
30
#include "operator.h"
31
#include "querynodes.h"
32
#include "querytask.h"
33
#include "tdatablock.h"
34
#include "tfill.h"
35

36
typedef struct STimeRange {
37
  TSKEY    skey;
38
  TSKEY    ekey;
39
  uint64_t groupId;
40
} STimeRange;
41

42
typedef struct SFillOperatorInfo {
43
  struct SFillInfo* pFillInfo;
44
  SSDataBlock*      pRes;
45
  SSDataBlock*      pFinalRes;
46
  int64_t           totalInputRows;
47
  void**            p;
48
  SSDataBlock*      existNewGroupBlock;
49
  STimeWindow       win;
50
  SColMatchInfo     matchInfo;
51
  int32_t           primaryTsCol;
52
  int32_t           primarySrcSlotId;
53
  uint64_t          curGroupId;  // current handled group id
54
  SExprInfo*        pExprInfo;
55
  int32_t           numOfExpr;
56
  SExprSupp         noFillExprSupp;
57
  SExprSupp         fillNullExprSupp;
58
  SList*            pFillSavedBlockList;
59
  SNode*            pTimeRange;  // STimeRangeNode for stream fill
60
} SFillOperatorInfo;
61

62
static void destroyFillOperatorInfo(void* param);
63
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
64
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
65
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
66

67
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
534,942✔
68
                                               SResultInfo* pResultInfo, int32_t order) {
69
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
534,942✔
70
  SSDataBlock*   pResBlock = pInfo->pFinalRes;
534,942✔
71
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
534,942✔
72

73
  //  int32_t order = TSDB_ORDER_ASC;
74
  int32_t scanFlag = MAIN_SCAN;
534,942✔
75
  //  getTableScanInfo(pOperator, &order, &scanFlag, false);
76
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
534,942✔
77

78
  blockDataCleanup(pInfo->pRes);
534,942✔
79
  doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
534,942✔
80

81
  reviseFillStartAndEndKey(pOperator->info, order);
534,942✔
82

83
  int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
534,942✔
84
                                         : pInfo->existNewGroupBlock->info.window.skey;
1,069,884✔
85
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
534,942✔
86

87
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
534,942✔
88
  if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
534,942✔
89
    int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
170,847✔
90
    if (code != TSDB_CODE_SUCCESS) {
170,847✔
91
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
92
      T_LONG_JMP(pTaskInfo->env, code);
×
93
    }
94
  }
95

96
  int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
534,942✔
97
  int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows, NULL);
534,942✔
98
  if (code != TSDB_CODE_SUCCESS) {
534,942✔
99
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
100
    T_LONG_JMP(pTaskInfo->env, code);
×
101
  }
102

103
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
534,942✔
104
  pInfo->existNewGroupBlock = NULL;
534,942✔
105
}
534,942✔
106

107
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
2,649,458✔
108
                                            SResultInfo* pResultInfo, int32_t order) {
109
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
2,649,458✔
110
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
2,649,458✔
111
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
950,024✔
112
    int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows, NULL);
950,024✔
113
    if (code != TSDB_CODE_SUCCESS) {
950,024✔
114
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
115
      T_LONG_JMP(pTaskInfo->env, code);
×
116
    }
117
    pInfo->pRes->info.id.groupId = pInfo->curGroupId;
950,024✔
118
    return;
950,024✔
119
  }
120

121
  // handle the cached new group data block
122
  if (pInfo->existNewGroupBlock) {
1,699,434✔
123
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
534,942✔
124
  }
125
}
126

127
void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
2,451,944✔
128
  int32_t            code = TSDB_CODE_SUCCESS;
2,451,944✔
129
  int32_t            lino = 0;
2,451,944✔
130
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
2,451,944✔
131
  SFillOperatorInfo* pInfo = pOperator->info;
2,451,944✔
132
  SExprSupp*         pSup = &pOperator->exprSupp;
2,451,944✔
133
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
2,451,944✔
134
  QUERY_CHECK_CODE(code, lino, _end);
2,451,944✔
135
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
2,451,944✔
136
  QUERY_CHECK_CODE(code, lino, _end);
2,451,944✔
137

138
  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
139
  pInfo->pRes->info.rows = 0;
2,451,944✔
140
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
2,451,944✔
141
  code = setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
2,451,944✔
142
  QUERY_CHECK_CODE(code, lino, _end);
2,451,944✔
143

144
  code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
4,903,888✔
145
                               NULL, GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
2,451,944✔
146
  QUERY_CHECK_CODE(code, lino, _end);
2,451,944✔
147

148
  if (pInfo->fillNullExprSupp.pExprInfo) {
2,451,944✔
149
    pInfo->pRes->info.rows = 0;
125,784✔
150
    code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
125,784✔
151
    QUERY_CHECK_CODE(code, lino, _end);
125,784✔
152
    code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
251,568✔
153
        pInfo->fillNullExprSupp.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo), pOperator->pTaskInfo);
125,784✔
154
  }
155

156
  pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
2,451,944✔
157

158
_end:
2,451,944✔
159
  if (code != TSDB_CODE_SUCCESS) {
2,451,944✔
160
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
161
    T_LONG_JMP(pTaskInfo->env, code);
×
162
  }
163
}
2,451,944✔
164

165
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
170,847✔
166
  int32_t code = TSDB_CODE_SUCCESS;
170,847✔
167
  int32_t lino = 0;
170,847✔
168
  for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
1,375,862✔
169
    if (!pFillInfo->pFillCol[colIdx].notFillCol) {
1,205,015✔
170
      SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
870,915✔
171
      QUERY_CHECK_NULL(key, code, lino, _end, terrno);
870,915✔
172
      key->isNull = true;
870,915✔
173
    }
174
  }
175

176
_end:
170,847✔
177
  if (code != TSDB_CODE_SUCCESS) {
170,847✔
178
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
179
  }
180
  return code;
170,847✔
181
}
182

183
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
2,649,458✔
184
  int32_t            code = TSDB_CODE_SUCCESS;
2,649,458✔
185
  int32_t            lino = 0;
2,649,458✔
186
  SFillOperatorInfo* pInfo = pOperator->info;
2,649,458✔
187
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
2,649,458✔
188
  if (pInfo == NULL || pTaskInfo == NULL) {
2,649,458✔
189
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
×
190
    return NULL;
×
191
  }
192

193
  SResultInfo* pResultInfo = &pOperator->resultInfo;
2,649,458✔
194
  SSDataBlock* pResBlock = pInfo->pFinalRes;
2,649,458✔
195
  if (pResBlock == NULL) {
2,649,458✔
196
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
×
197
    return NULL;
×
198
  }
199
  blockDataCleanup(pResBlock);
2,649,458✔
200
  int32_t        order = pInfo->pFillInfo->order;
2,649,458✔
201

202
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
2,649,458✔
203
  if (pResBlock->info.rows > 0) {
2,649,458✔
204
    pResBlock->info.id.groupId = pInfo->curGroupId;
659,463✔
205
    return pResBlock;
659,463✔
206
  }
207

208
  while (1) {
1,069,378✔
209
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,059,373✔
210
    if (pBlock == NULL) {
3,058,882✔
211
      if (pInfo->totalInputRows == 0 &&
1,141,880✔
212
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
224,448✔
213
        setOperatorCompleted(pOperator);
162,686✔
214
        return NULL;
162,686✔
215
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
979,194✔
216
        reviseFillStartAndEndKey(pInfo, order);
30,881✔
217
      }
218

219
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
979,194✔
220
    } else {
221
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
1,917,002✔
222
      pBlock->info.dataLoad = 1;
1,917,002✔
223
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
1,917,002✔
224
      QUERY_CHECK_CODE(code, lino, _end);
1,917,002✔
225

226
      blockDataCleanup(pInfo->pRes);
1,917,002✔
227
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
1,917,002✔
228
      QUERY_CHECK_CODE(code, lino, _end);
1,917,002✔
229
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
1,917,002✔
230
      QUERY_CHECK_CODE(code, lino, _end);
1,917,002✔
231
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);
1,917,002✔
232

233
      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
3,299,062✔
234
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
1,382,060✔
235
          reviseFillStartAndEndKey(pInfo, order);
522,982✔
236
        }
237

238
        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
1,382,060✔
239
        pInfo->totalInputRows += pInfo->pRes->info.rows;
1,382,060✔
240

241
        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
1,382,060✔
242
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
1,382,060✔
243
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
1,382,060✔
244
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
534,942✔
245
        pInfo->existNewGroupBlock = pBlock;
534,942✔
246

247
        // Fill the previous group data block, before handle the data block of new group.
248
        // Close the fill operation for previous group data block
249
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
534,942✔
250
      }
251
    }
252

253
    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
2,896,196✔
254
    bool wantMoreBlock = false;
2,896,196✔
255
    code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows, &wantMoreBlock);
2,896,196✔
256
    QUERY_CHECK_CODE(code, lino, _end);
2,896,196✔
257

258
    // current group has no more result to return
259
    if (pResBlock->info.rows > 0) {
2,896,196✔
260
      // 1. The result in current group not reach the threshold of output result, continue
261
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
262
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
1,341,970✔
263
        pResBlock->info.id.groupId = pInfo->curGroupId;
1,341,970✔
264
        return pResBlock;
1,341,970✔
265
      }
266

267
      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
×
268
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
×
269
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
270
        return pResBlock;
×
271
      }
272
    } else if (pInfo->existNewGroupBlock) {  // try next group
1,554,226✔
273
      blockDataCleanup(pResBlock);
×
274

275
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
×
276
      if (pResBlock->info.rows > pResultInfo->threshold) {
×
277
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
278
        return pResBlock;
×
279
      }
280
    } else {
281
      if (wantMoreBlock) continue;
1,554,226✔
282
      return NULL;
484,848✔
283
    }
284
  }
285

286
_end:
×
287
  if (code != TSDB_CODE_SUCCESS) {
×
288
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
289
    T_LONG_JMP(pTaskInfo->env, code);
×
290
  }
291
  return NULL;
×
292
}
293

294
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
2,636,805✔
295
  int32_t            code = TSDB_CODE_SUCCESS;
2,636,805✔
296
  SFillOperatorInfo* pInfo = pOperator->info;
2,636,805✔
297
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
2,636,805✔
298

299
  if (pInfo->pTimeRange != NULL) {
2,636,805✔
300
    STimeWindow pWinRange = {0};
13,080✔
301
    bool        isWinRangeValid = false;
13,080✔
302
    code = streamCalcCurrWinTimeRange((STimeRangeNode*)pInfo->pTimeRange, &pTaskInfo->pStreamRuntimeInfo->funcInfo, &pWinRange,
13,080✔
303
                  &isWinRangeValid, 3);
304
    if (code != TSDB_CODE_SUCCESS || !isWinRangeValid) {
13,080✔
305
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
306
      pTaskInfo->code = code;
×
307
      T_LONG_JMP(pTaskInfo->env, code);
×
308
    }
309

310
    pInfo->win.skey = pWinRange.skey;
13,080✔
311
    /* ekey is inclusive in fill operator, so we need to subtract 1 */
312
    pInfo->win.ekey = pWinRange.ekey - 1;
13,080✔
313
  }
314

315
  if (pOperator->status == OP_EXEC_DONE) {
2,636,805✔
316
    (*ppRes) = NULL;
×
317
    return code;
×
318
  }
319

320
  SSDataBlock* fillResult = NULL;
2,636,805✔
321
  while (true) {
322
    fillResult = doFillImpl(pOperator);
2,649,458✔
323
    if (fillResult == NULL) {
2,648,967✔
324
      setOperatorCompleted(pOperator);
647,534✔
325
      break;
647,534✔
326
    }
327

328
    code = doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo, NULL);
2,001,433✔
329
    if (code != TSDB_CODE_SUCCESS) {
2,001,433✔
330
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
331
      pTaskInfo->code = code;
×
332
      T_LONG_JMP(pTaskInfo->env, code);
×
333
    }
334

335
    if (fillResult->info.rows > 0) {
2,001,433✔
336
      break;
1,988,780✔
337
    }
338
  }
339

340
  (*ppRes) = fillResult;
2,636,314✔
341
  return code;
2,636,314✔
342
}
343

344
void destroyFillOperatorInfo(void* param) {
674,818✔
345
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
674,818✔
346
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
674,818✔
347
  blockDataDestroy(pInfo->pRes);
674,818✔
348
  pInfo->pRes = NULL;
674,818✔
349
  blockDataDestroy(pInfo->pFinalRes);
674,818✔
350
  pInfo->pFinalRes = NULL;
674,818✔
351

352
  cleanupExprSupp(&pInfo->noFillExprSupp);
674,818✔
353
  cleanupExprSupp(&pInfo->fillNullExprSupp);
674,818✔
354

355
  taosMemoryFreeClear(pInfo->p);
674,818✔
356
  taosArrayDestroy(pInfo->matchInfo.pList);
674,818✔
357
  taosMemoryFreeClear(param);
674,818✔
358
}
674,818✔
359

360
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr,
674,818✔
361
                            int32_t numOfCols, SExprInfo* pNotFillExpr,
362
                            int32_t numOfNotFillCols, SExprInfo* pFillNullExpr,
363
                            int32_t numOfFillNullExprs, SNodeListNode* pValNode,
364
                            STimeWindow win, int32_t capacity, const char* id,
365
                            SInterval* pInterval, int32_t fillType,
366
                            int32_t order, SExecTaskInfo* pTaskInfo,
367
                            int64_t surroundingTime, bool indefRowsMode) {
368
  SFillColInfo* pColInfo =
369
      createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols,
674,818✔
370
                        pFillNullExpr, numOfFillNullExprs, pValNode);
371
  if (!pColInfo) {
674,818✔
UNCOV
372
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
UNCOV
373
    return terrno;
×
374
  }
375

376
  int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
674,818✔
377

378
  //  STimeWindow w = {0};
379
  //  getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
380
  pInfo->pFillInfo = NULL;
674,818✔
381
  int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols,
674,818✔
382
                                    numOfFillNullExprs, capacity, pInterval,
383
                                    fillType, pColInfo, pInfo->primaryTsCol,
384
                                    order, id, pTaskInfo, surroundingTime,
385
                                    indefRowsMode, &pInfo->pFillInfo);
674,818✔
386
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
387
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
388
    return code;
×
389
  }
390

391
  if (order == TSDB_ORDER_ASC) {
674,818✔
392
    pInfo->win.skey = win.skey;
586,207✔
393
    pInfo->win.ekey = win.ekey;
586,207✔
394
  } else {
395
    pInfo->win.skey = win.ekey;
88,611✔
396
    pInfo->win.ekey = win.skey;
88,611✔
397
  }
398
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
674,818✔
399
  if (!pInfo->p) {
674,349✔
400
    return terrno;
×
401
  }
402

403
  if (pInfo->pFillInfo == NULL) {
674,367✔
404
    taosMemoryFree(pInfo->pFillInfo);
×
405
    taosMemoryFree(pInfo->p);
×
406
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
407
  } else {
408
    return TSDB_CODE_SUCCESS;
674,818✔
409
  }
410
}
411

412
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
674,818✔
413
  if (pInfo->noFillExprSupp.numOfExprs == 0) {
674,818✔
414
    return false;
84,450✔
415
  }
416

417
  for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
886,316✔
418
    SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
856,113✔
419
    if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
856,113✔
420
        exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
856,113✔
421
      return true;
560,165✔
422
    }
423
  }
424
  return false;
29,752✔
425
}
426

427
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
674,818✔
428
                                           const char* idStr) {
429
  bool wstartExist = isWstartColumnExist(pInfo);
674,818✔
430

431
  if (wstartExist == false) {
674,367✔
432
    if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
114,202✔
433
      qError("pWStartTs of fill physical node is not a target node, %s", idStr);
×
434
      return TSDB_CODE_QRY_SYS_ERROR;
×
435
    }
436

437
    SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
114,202✔
438
    if (pExpr == NULL) {
114,202✔
439
      return terrno;
×
440
    }
441

442
    int32_t code = createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
114,202✔
443
    if (code != TSDB_CODE_SUCCESS) {
114,202✔
444
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
445
      pExprSupp->numOfExprs += 1;
×
446
      pExprSupp->pExprInfo = pExpr;
×
447
      return code;
×
448
    }
449

450
    pExprSupp->numOfExprs += 1;
114,202✔
451
    pExprSupp->pExprInfo = pExpr;
114,202✔
452
  }
453

454
  return TSDB_CODE_SUCCESS;
674,818✔
455
}
456

457
static int32_t resetFillOperState(SOperatorInfo* pOper) {
6,540✔
458
  SFillOperatorInfo* pFill = pOper->info;
6,540✔
459
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
6,540✔
460
  pOper->status = OP_NOT_OPENED;
6,540✔
461
  SFillPhysiNode* pPhyNode = (SFillPhysiNode*)pOper->pPhyNode;
6,540✔
462

463
  pFill->curGroupId = 0;
6,540✔
464
  pFill->totalInputRows = 0;
6,540✔
465
  blockDataCleanup(pFill->pRes);
6,540✔
466
  blockDataCleanup(pFill->pFinalRes);
6,540✔
467

468
  int64_t startKey = (pFill->pFillInfo->order == TSDB_ORDER_ASC) ? pPhyNode->timeRange.skey : pPhyNode->timeRange.ekey;
6,540✔
469
  pFill->pFillInfo->start = startKey;
6,540✔
470
  pFill->pFillInfo->currentKey = startKey;
6,540✔
471
  pFill->pFillInfo->end = startKey;
6,540✔
472

473
  pFill->pFillInfo->numOfRows = 0;
6,540✔
474
  pFill->pFillInfo->index = -1;
6,540✔
475
  pFill->pFillInfo->numOfTotal = 0;
6,540✔
476
  pFill->pFillInfo->numOfCurrent = 0;
6,540✔
477
  pFill->pFillInfo->isFilled = false;
6,540✔
478
  int32_t size = taosArrayGetSize(pFill->pFillInfo->prev.pRowVal);
6,540✔
479
  for (int32_t i = 0; i < size; ++i) {
39,240✔
480
    SGroupKeys* pKey = taosArrayGet(pFill->pFillInfo->prev.pRowVal, i);
32,700✔
481
    pKey->isNull = true;
32,700✔
482
  }
483
  size = taosArrayGetSize(pFill->pFillInfo->next.pRowVal);
6,540✔
484
  for (int32_t i = 0; i < size; ++i) {
39,240✔
485
    SGroupKeys* pKey = taosArrayGet(pFill->pFillInfo->next.pRowVal, i);
32,700✔
486
    pKey->isNull = true;
32,700✔
487
  }
488

489
  taosMemoryFreeClear(pFill->pFillInfo->pTags);
6,540✔
490
  taosArrayDestroy(pFill->pFillInfo->pColFillProgress);
6,540✔
491
  pFill->pFillInfo->pColFillProgress = NULL;
6,540✔
492

493
  tdListFreeP(pFill->pFillInfo->pFillSavedBlockList, destroyFillBlock);
6,540✔
494
  pFill->pFillInfo->pFillSavedBlockList = NULL;
6,540✔
495

496
  int32_t order = (pPhyNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
6,540✔
497
  if (order == TSDB_ORDER_ASC) {
6,540✔
498
    pFill->win.skey = pPhyNode->timeRange.skey;
6,540✔
499
    pFill->win.ekey = pPhyNode->timeRange.ekey;
6,540✔
500
  } else {
501
    pFill->win.skey = pPhyNode->timeRange.ekey;
×
502
    pFill->win.ekey = pPhyNode->timeRange.skey;
×
503
  }
504

505
  return TSDB_CODE_SUCCESS;
6,540✔
506
}
507

508
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
674,818✔
509
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
510
  QRY_PARAM_CHECK(pOptrInfo);
674,818✔
511
  int32_t code = 0;
674,818✔
512
  int32_t lino = 0;
674,818✔
513

514
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
674,818✔
515
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
674,818✔
516
  if (pInfo == NULL || pOperator == NULL) {
674,818✔
UNCOV
517
    code = terrno;
×
518
    goto _error;
×
519
  }
520
  initOperatorCostInfo(pOperator);
674,818✔
521

522
  pOperator->pPhyNode = pPhyFillNode;
674,818✔
523
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
674,818✔
524
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
674,818✔
525
  SExprInfo* pExprInfo = NULL;
674,818✔
526

527
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
674,818✔
528
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
529

530
  pOperator->exprSupp.pExprInfo = pExprInfo;
674,818✔
531
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
674,818✔
532

533
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
674,818✔
534
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
674,818✔
535
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
536

537
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
674,818✔
538
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
539

540
  code =
541
      initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
674,818✔
542
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
543

544
  code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
674,818✔
545
                        &pInfo->fillNullExprSupp.numOfExprs);
546
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
547
  code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
674,818✔
548
                      &pTaskInfo->storageAPI.functionStore);
549
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
550

551
  SInterval* pInterval =
674,818✔
552
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
674,818✔
553
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
196,895✔
554
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
871,713✔
555

556
  int32_t order = (pPhyFillNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
674,818✔
557
  int32_t type = convertFillType(pPhyFillNode->mode);
674,818✔
558

559
  SResultInfo* pResultInfo = &pOperator->resultInfo;
674,818✔
560

561
  initResultSizeInfo(&pOperator->resultInfo, 4096);
674,818✔
562
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
674,818✔
563
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
564
    goto _error;
×
565
  }
566
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
674,818✔
567
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
568
    goto _error;
×
569
  }
570

571
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
674,818✔
572
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
674,818✔
573

574
  int32_t numOfOutputCols = 0;
674,818✔
575
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
674,818✔
576
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
577

578
  QUERY_CHECK_CODE(code, lino, _error);
674,818✔
579

580
  // Extract surroundingTime from pSurroundingTime node
581
  int64_t surroundingTime = 0;
674,818✔
582
  if (pPhyFillNode->pSurroundingTime != NULL &&
674,818✔
583
      nodeType(pPhyFillNode->pSurroundingTime) == QUERY_NODE_VALUE) {
9,156✔
584
    surroundingTime = ((SValueNode*)pPhyFillNode->pSurroundingTime)->datum.i;
9,156✔
585
  }
586

587
  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr,
1,349,636✔
588
                      pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
589
                      pInfo->fillNullExprSupp.pExprInfo,
590
                      pInfo->fillNullExprSupp.numOfExprs,
591
                      (SNodeListNode*)pPhyFillNode->pValues,
674,818✔
592
                      pPhyFillNode->timeRange, pResultInfo->capacity,
593
                      pTaskInfo->id.str, pInterval, type, order, pTaskInfo,
674,818✔
594
                      surroundingTime, pPhyFillNode->indefRowsMode);
674,818✔
595
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
596
    goto _error;
×
597
  }
598
  TSWAP(pInfo->pTimeRange, pPhyFillNode->pTimeRange);
674,818✔
599
  pInfo->pFinalRes = NULL;
673,898✔
600

601
  code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
673,898✔
602
  if (code) {
674,818✔
603
    goto _error;
×
604
  }
605

606
  code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
674,818✔
607
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
608
    goto _error;
×
609
  }
610

611
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
674,818✔
612
                            pTaskInfo->pStreamRuntimeInfo);
674,818✔
613
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
614
    goto _error;
×
615
  }
616
  filterSetExecContext(pOperator->exprSupp.pFilterInfo, pTaskInfo, isTaskKilled);
674,818✔
617
  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
674,818✔
618
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
674,818✔
619
                                         optrDefaultGetNextExtFn, NULL);
620
  setOperatorResetStateFn(pOperator, resetFillOperState);
674,367✔
621

622
  code = appendDownstream(pOperator, &downstream, 1);
674,367✔
623
  if (code != TSDB_CODE_SUCCESS) {
674,818✔
624
    goto _error;
×
625
  }
626

627
  *pOptrInfo = pOperator;
674,818✔
628
  return TSDB_CODE_SUCCESS;
674,818✔
629

630
_error:
×
631
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
632

633
  if (pInfo != NULL) {
×
634
    destroyFillOperatorInfo(pInfo);
×
635
  }
636
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
637
  pTaskInfo->code = code;
×
638
  return code;
×
639
}
640

641
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
1,088,805✔
642
  int64_t skey, ekey, next;
643
  if (order == TSDB_ORDER_ASC) {
1,088,805✔
644
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
1,032,250✔
645
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
1,032,250✔
646

647
    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
1,032,250✔
648
    next = ekey;
1,032,250✔
649
    while (next < pInfo->win.ekey) {
1,347,356✔
650
      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
315,106✔
651
                         pInfo->pFillInfo->interval.precision, NULL);
315,106✔
652
      if (next == ekey) break;
315,106✔
653
      ekey = next > pInfo->win.ekey ? ekey : next;
315,106✔
654
    }
655
    pInfo->win.ekey = ekey;
1,032,250✔
656
  } else {
657
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
56,555✔
658
    next = skey;
56,555✔
659
    while (next < pInfo->win.skey) {
105,054✔
660
      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
48,499✔
661
                         pInfo->pFillInfo->interval.precision, NULL);
48,499✔
662
      if (next == skey) break;
48,499✔
663
      skey = next > pInfo->win.skey ? skey : next;
48,499✔
664
    }
665
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
56,555✔
666
    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
56,555✔
667
  }
668
}
1,088,805✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc