• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3829

28 Mar 2025 06:51AM UTC coverage: 63.162% (+0.5%) from 62.668%
#3829

push

travis-ci

web-flow
fix(tdb): disable page recycling (#30529)

155078 of 313582 branches covered (49.45%)

Branch coverage included in aggregate %.

240930 of 313390 relevant lines covered (76.88%)

20393545.98 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

70.42
/source/libs/executor/src/filloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "function.h"
29
#include "operator.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
typedef struct STimeRange {
36
  TSKEY    skey;
37
  TSKEY    ekey;
38
  uint64_t groupId;
39
} STimeRange;
40

41
typedef struct SFillOperatorInfo {
42
  struct SFillInfo* pFillInfo;
43
  SSDataBlock*      pRes;
44
  SSDataBlock*      pFinalRes;
45
  int64_t           totalInputRows;
46
  void**            p;
47
  SSDataBlock*      existNewGroupBlock;
48
  STimeWindow       win;
49
  SColMatchInfo     matchInfo;
50
  int32_t           primaryTsCol;
51
  int32_t           primarySrcSlotId;
52
  uint64_t          curGroupId;  // current handled group id
53
  SExprInfo*        pExprInfo;
54
  int32_t           numOfExpr;
55
  SExprSupp         noFillExprSupp;
56
  SExprSupp         fillNullExprSupp;
57
} SFillOperatorInfo;
58

59
static void destroyFillOperatorInfo(void* param);
60
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
61
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
62
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
63

64
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
5,195✔
65
                                               SResultInfo* pResultInfo, int32_t order) {
66
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
5,195✔
67
  SSDataBlock*   pResBlock = pInfo->pFinalRes;
5,195✔
68
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5,195✔
69

70
  //  int32_t order = TSDB_ORDER_ASC;
71
  int32_t scanFlag = MAIN_SCAN;
5,195✔
72
  //  getTableScanInfo(pOperator, &order, &scanFlag, false);
73
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
5,195✔
74

75
  blockDataCleanup(pInfo->pRes);
5,195✔
76
  doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
5,195✔
77

78
  reviseFillStartAndEndKey(pOperator->info, order);
5,195✔
79

80
  int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
10,390✔
81
                                         : pInfo->existNewGroupBlock->info.window.skey;
5,195!
82
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
5,195✔
83

84
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
5,195✔
85
  if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
5,195✔
86
    int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
144✔
87
    if (code != TSDB_CODE_SUCCESS) {
144!
88
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
89
      T_LONG_JMP(pTaskInfo->env, code);
×
90
    }
91
  }
92

93
  int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
5,195✔
94
  int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
5,195✔
95
  if (code != TSDB_CODE_SUCCESS) {
5,195!
96
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99

100
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
5,195✔
101
  pInfo->existNewGroupBlock = NULL;
5,195✔
102
}
5,195✔
103

104
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
1,535,045✔
105
                                            SResultInfo* pResultInfo, int32_t order) {
106
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,535,045✔
107
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
1,535,045✔
108
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
408,629✔
109
    int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
408,629✔
110
    if (code != TSDB_CODE_SUCCESS) {
408,631✔
111
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1!
112
      T_LONG_JMP(pTaskInfo->env, code);
1!
113
    }
114
    pInfo->pRes->info.id.groupId = pInfo->curGroupId;
408,630✔
115
    return;
408,630✔
116
  }
117

118
  // handle the cached new group data block
119
  if (pInfo->existNewGroupBlock) {
1,126,369✔
120
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
4,431✔
121
  }
122
}
123

124
/*
 * Project an input block into pInfo->pRes.
 *
 * Three expression sets are applied to pBlock in turn, each writing into pInfo->pRes:
 * the operator's own expressions, the no-fill expressions and, when present, the fill-null
 * expressions. The row count of pInfo->pRes is reset before the second and third pass.
 * Finally pInfo->pRes inherits the group id of pBlock.
 *
 * @param pOperator fill operator; its info must be an SFillOperatorInfo
 * @param pBlock    input block to project
 * @param order     order passed through to setInputDataBlock()
 * @param scanFlag  scan flag passed through to setInputDataBlock()
 *
 * Does not return an error code: on failure the failing line is logged and the function
 * long-jumps to pTaskInfo->env.
 */
void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*         pSup = &pOperator->exprSupp;
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
  pInfo->pRes->info.rows = 0;
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
  code = setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
  QUERY_CHECK_CODE(code, lino, _end);

  code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
                               NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->fillNullExprSupp.pExprInfo) {
    pInfo->pRes->info.rows = 0;
    code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
    QUERY_CHECK_CODE(code, lino, _end);
    code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
                                 pInfo->fillNullExprSupp.numOfExprs, NULL);
    // checked like every other step so that the failing line is recorded in the log
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
234,316✔
161

162
/*
 * Mark the remembered "previous row" value of every filled column as NULL.
 *
 * Columns flagged as notFillCol are left untouched.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when an entry of prev.pRowVal cannot be fetched
 *         (the failure is logged).
 */
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
    if (pFillInfo->pFillCol[i].notFillCol) {
      continue;
    }

    SGroupKeys* pPrevVal = taosArrayGet(pFillInfo->prev.pRowVal, i);
    QUERY_CHECK_NULL(pPrevVal, code, lino, _end, terrno);
    pPrevVal->isNull = true;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
179

180
/*
 * Produce the next block of filled results.
 *
 * Pending output (left-over results of the current group, or a cached block of a new group)
 * is drained first. After that, downstream blocks are pulled one at a time, projected into
 * pInfo->pRes and fed to the fill state until pInfo->pFinalRes holds something to return.
 *
 * @return pInfo->pFinalRes tagged with the current group id when it contains rows, or NULL
 *         when there is nothing (more) to return or the operator state is invalid.
 *
 * Errors from the helpers are logged and reported by long-jumping to pTaskInfo->env.
 */
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  if (pInfo == NULL || pTaskInfo == NULL) {
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
    return NULL;
  }

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  SSDataBlock* pResBlock = pInfo->pFinalRes;
  if (pResBlock == NULL) {
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
    return NULL;
  }

  blockDataCleanup(pResBlock);

  int32_t order = pInfo->pFillInfo->order;

  // drain whatever a previous call left behind before touching the downstream operator
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
  if (pResBlock->info.rows > 0) {
    pResBlock->info.id.groupId = pInfo->curGroupId;
    return pResBlock;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // no more input: without any input rows only the TSDB_FILL_NULL_F and
      // TSDB_FILL_SET_VALUE_F fill types go on to produce output
      if (pInfo->totalInputRows == 0 &&
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
        setOperatorCompleted(pOperator);
        return NULL;
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
        reviseFillStartAndEndKey(pInfo, order);
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
      pBlock->info.dataLoad = 1;
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      blockDataCleanup(pInfo->pRes);
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);

      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
          reviseFillStartAndEndKey(pInfo, order);
        }

        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
        pInfo->pFillInfo->prev.key = 0;
      }
    }

    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
    code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
    QUERY_CHECK_CODE(code, lino, _end);

    // current group has no more result to return
    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else if (pInfo->existNewGroupBlock) {  // try next group
      blockDataCleanup(pResBlock);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows > pResultInfo->threshold) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else {
      return NULL;
    }
  }

  // reached only through QUERY_CHECK_CODE
_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return NULL;
}
301

302
/*
 * "Get next" entry point of the fill operator.
 *
 * Keeps asking doFillImpl() for blocks and runs the operator's filter over each of them until
 * a block with at least one surviving row is found or doFillImpl() returns NULL, in which case
 * the operator is marked completed.
 *
 * @param pOperator the fill operator
 * @param ppRes     receives the result block, or NULL when there is no more data
 * @return TSDB_CODE_SUCCESS; a filter failure is logged, stored in pTaskInfo->code and
 *         reported by long-jumping to pTaskInfo->env.
 */
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t            code = TSDB_CODE_SUCCESS;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  SSDataBlock* pOut = doFillImpl(pOperator);
  while (pOut != NULL) {
    code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    if (pOut->info.rows > 0) {
      // rows survived the filter: account for them and hand the block out
      pOperator->resultInfo.totalRows += pOut->info.rows;
      break;
    }

    // everything was filtered away, try the next block
    pOut = doFillImpl(pOperator);
  }

  if (pOut == NULL) {
    setOperatorCompleted(pOperator);
  }

  (*ppRes) = pOut;
  return code;
}
338

339
void destroyFillOperatorInfo(void* param) {
478,389✔
340
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
478,389✔
341
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
478,389✔
342
  blockDataDestroy(pInfo->pRes);
478,389✔
343
  pInfo->pRes = NULL;
478,393✔
344
  blockDataDestroy(pInfo->pFinalRes);
478,393✔
345
  pInfo->pFinalRes = NULL;
478,394✔
346

347
  cleanupExprSupp(&pInfo->noFillExprSupp);
478,394✔
348
  cleanupExprSupp(&pInfo->fillNullExprSupp);
478,390✔
349

350
  taosMemoryFreeClear(pInfo->p);
478,390!
351
  taosArrayDestroy(pInfo->matchInfo.pList);
478,391✔
352
  taosMemoryFreeClear(param);
478,394!
353
}
478,397✔
354

355
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
478,382✔
356
                            int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
357
                            SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
358
                            SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
359
  SFillColInfo* pColInfo =
360
      createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
478,382✔
361
  if (!pColInfo) {
478,378!
362
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
363
    return terrno;
×
364
  }
365

366
  int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
478,379✔
367

368
  //  STimeWindow w = {0};
369
  //  getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
370
  pInfo->pFillInfo = NULL;
478,379✔
371
  int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
478,379✔
372
                                    fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
478,379✔
373
  if (code != TSDB_CODE_SUCCESS) {
478,375!
374
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
375
    return code;
×
376
  }
377

378
  if (order == TSDB_ORDER_ASC) {
478,381✔
379
    pInfo->win.skey = win.skey;
444,016✔
380
    pInfo->win.ekey = win.ekey;
444,016✔
381
  } else {
382
    pInfo->win.skey = win.ekey;
34,365✔
383
    pInfo->win.ekey = win.skey;
34,365✔
384
  }
385
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
478,381!
386
  if (!pInfo->p) {
478,378!
387
    return terrno;
×
388
  }
389

390
  if (pInfo->pFillInfo == NULL) {
478,378!
391
    taosMemoryFree(pInfo->pFillInfo);
×
392
    taosMemoryFree(pInfo->p);
×
393
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
394
  } else {
395
    return TSDB_CODE_SUCCESS;
478,378✔
396
  }
397
}
398

399
/*
 * Tell whether the no-fill expressions already contain the window-start column, i.e. a plain
 * column expression with exactly one parameter whose column type is COLUMN_TYPE_WINDOW_START.
 */
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  SExprSupp* pSupp = &pInfo->noFillExprSupp;

  // an empty expression list simply never enters the loop
  for (int32_t idx = 0; idx < pSupp->numOfExprs; ++idx) {
    SExprInfo* pCandidate = &pSupp->pExprInfo[idx];

    if (pCandidate->pExpr->nodeType != QUERY_NODE_COLUMN) {
      continue;
    }
    if (pCandidate->base.numOfParams != 1) {
      continue;
    }
    if (pCandidate->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}
413

414
/*
 * Make sure the expression list in pExprSupp contains an expression for the window-start
 * timestamp.
 *
 * When the no-fill expressions of pInfo do not contain the window-start column, the list in
 * pExprSupp is grown by one entry that is built from pPhyFillNode->pWStartTs.
 *
 * @return TSDB_CODE_SUCCESS; TSDB_CODE_QRY_SYS_ERROR when pWStartTs is not a target node;
 *         terrno when the list cannot be grown; or the error of createExprFromTargetNode().
 */
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
                                           const char* idStr) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node, %s", idStr);
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  SExprInfo* pGrown = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
  if (pGrown == NULL) {
    return terrno;
  }

  int32_t code = createExprFromTargetNode(&pGrown[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  // the grown list and its new entry are published on failure as well as on success
  pExprSupp->numOfExprs += 1;
  pExprSupp->pExprInfo = pGrown;

  return code;
}
443

444
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
478,353✔
445
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
446
  QRY_PARAM_CHECK(pOptrInfo);
478,353!
447
  int32_t code = 0;
478,353✔
448
  int32_t lino = 0;
478,353✔
449

450
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
478,353!
451
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
478,355!
452
  if (pInfo == NULL || pOperator == NULL) {
478,369!
453
    code = terrno;
×
454
    goto _error;
×
455
  }
456

457
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
478,370✔
458
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
478,385!
459
  SExprInfo* pExprInfo = NULL;
478,385✔
460

461
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
478,385✔
462
  QUERY_CHECK_CODE(code, lino, _error);
478,384!
463

464
  pOperator->exprSupp.pExprInfo = pExprInfo;
478,384✔
465
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
478,384✔
466

467
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
478,384✔
468
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
478,384✔
469
  QUERY_CHECK_CODE(code, lino, _error);
478,389!
470

471
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
478,389✔
472
  QUERY_CHECK_CODE(code, lino, _error);
478,374!
473

474
  code =
475
      initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
478,374✔
476
  QUERY_CHECK_CODE(code, lino, _error);
478,380!
477

478
  code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
478,380✔
479
                        &pInfo->fillNullExprSupp.numOfExprs);
480
  QUERY_CHECK_CODE(code, lino, _error);
478,386!
481
  code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
478,386✔
482
                      &pTaskInfo->storageAPI.functionStore);
483
  QUERY_CHECK_CODE(code, lino, _error);
478,381!
484

485
  SInterval* pInterval =
478,381✔
486
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
478,381✔
487
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
306,001✔
488
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
478,381✔
489

490
  int32_t order = (pPhyFillNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
478,381✔
491
  int32_t type = convertFillType(pPhyFillNode->mode);
478,381✔
492

493
  SResultInfo* pResultInfo = &pOperator->resultInfo;
478,370✔
494

495
  initResultSizeInfo(&pOperator->resultInfo, 4096);
478,370✔
496
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
478,369✔
497
  if (code != TSDB_CODE_SUCCESS) {
478,393!
498
    goto _error;
×
499
  }
500
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
478,393✔
501
  if (code != TSDB_CODE_SUCCESS) {
478,386!
502
    goto _error;
×
503
  }
504

505
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
478,386✔
506
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
478,386✔
507

508
  int32_t numOfOutputCols = 0;
478,386✔
509
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
478,386✔
510
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
511

512
  QUERY_CHECK_CODE(code, lino, _error);
478,384!
513
  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
478,384✔
514
                      pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
515
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
478,384✔
516
                      pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
478,384✔
517
  if (code != TSDB_CODE_SUCCESS) {
478,379!
518
    goto _error;
×
519
  }
520

521
  pInfo->pFinalRes = NULL;
478,379✔
522

523
  code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
478,379✔
524
  if (code) {
478,379!
525
    goto _error;
×
526
  }
527

528
  code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
478,379✔
529
  if (code != TSDB_CODE_SUCCESS) {
478,387!
530
    goto _error;
×
531
  }
532

533
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
478,387✔
534
  if (code != TSDB_CODE_SUCCESS) {
478,388!
535
    goto _error;
×
536
  }
537

538
  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
478,388✔
539
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
478,389✔
540
                                         optrDefaultGetNextExtFn, NULL);
541

542
  code = appendDownstream(pOperator, &downstream, 1);
478,382✔
543
  if (code != TSDB_CODE_SUCCESS) {
478,389!
544
    goto _error;
×
545
  }
546

547
  *pOptrInfo = pOperator;
478,389✔
548
  return TSDB_CODE_SUCCESS;
478,389✔
549

550
_error:
×
551
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
552

553
  if (pInfo != NULL) {
×
554
    destroyFillOperatorInfo(pInfo);
×
555
  }
556
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
557
  pTaskInfo->code = code;
×
558
  return code;
×
559
}
560

561
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
427,501✔
562
  int64_t skey, ekey, next;
563
  if (order == TSDB_ORDER_ASC) {
427,501✔
564
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
394,847✔
565
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
394,844✔
566

567
    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
394,844✔
568
    next = ekey;
394,847✔
569
    while (next < pInfo->win.ekey) {
1,644,422✔
570
      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
1,249,576✔
571
                         pInfo->pFillInfo->interval.precision, NULL);
1,249,576✔
572
      ekey = next > pInfo->win.ekey ? ekey : next;
1,249,575✔
573
    }
574
    pInfo->win.ekey = ekey;
394,846✔
575
  } else {
576
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
32,654✔
577
    next = skey;
32,656✔
578
    while (next < pInfo->win.skey) {
135,021✔
579
      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
102,365✔
580
                         pInfo->pFillInfo->interval.precision, NULL);
102,365✔
581
      skey = next > pInfo->win.skey ? skey : next;
102,365✔
582
    }
583
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
32,656✔
584
    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
32,656✔
585
  }
586
}
427,502✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc