• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3528

13 Nov 2024 02:14AM UTC coverage: 60.905% (+0.09%) from 60.819%
#3528

push

travis-ci

web-flow
Merge pull request #28748 from taosdata/test/chr-3.0-TD14758

test:add docs ci in jenkinsfile2

118800 of 249004 branches covered (47.71%)

Branch coverage included in aggregate %.

199361 of 273386 relevant lines covered (72.92%)

14738389.65 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

73.17
/source/libs/executor/src/filloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "function.h"
29
#include "operator.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
typedef struct STimeRange {
36
  TSKEY    skey;
37
  TSKEY    ekey;
38
  uint64_t groupId;
39
} STimeRange;
40

41
typedef struct SFillOperatorInfo {
42
  struct SFillInfo* pFillInfo;
43
  SSDataBlock*      pRes;
44
  SSDataBlock*      pFinalRes;
45
  int64_t           totalInputRows;
46
  void**            p;
47
  SSDataBlock*      existNewGroupBlock;
48
  STimeWindow       win;
49
  SColMatchInfo     matchInfo;
50
  int32_t           primaryTsCol;
51
  int32_t           primarySrcSlotId;
52
  uint64_t          curGroupId;  // current handled group id
53
  SExprInfo*        pExprInfo;
54
  int32_t           numOfExpr;
55
  SExprSupp         noFillExprSupp;
56
  SExprSupp         fillNullExprSupp;
57
} SFillOperatorInfo;
58

59
static void destroyFillOperatorInfo(void* param);
60
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
61
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
62
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
63

64
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
6,743✔
65
                                               SResultInfo* pResultInfo, int32_t order) {
66
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
6,743✔
67
  SSDataBlock*   pResBlock = pInfo->pFinalRes;
6,743✔
68
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
6,743✔
69

70
  //  int32_t order = TSDB_ORDER_ASC;
71
  int32_t scanFlag = MAIN_SCAN;
6,743✔
72
  //  getTableScanInfo(pOperator, &order, &scanFlag, false);
73
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
6,743✔
74

75
  blockDataCleanup(pInfo->pRes);
6,743✔
76
  doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
6,743✔
77

78
  reviseFillStartAndEndKey(pOperator->info, order);
6,743✔
79

80
  int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
13,486✔
81
                                         : pInfo->existNewGroupBlock->info.window.skey;
6,743!
82
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
6,743✔
83

84
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
6,743✔
85
  if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
6,743✔
86
    int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
104✔
87
    if (code != TSDB_CODE_SUCCESS) {
104!
88
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
89
      T_LONG_JMP(pTaskInfo->env, code);
×
90
    }
91
  }
92

93
  int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
6,743✔
94
  int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
6,743✔
95
  if (code != TSDB_CODE_SUCCESS) {
6,743!
96
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
97
    T_LONG_JMP(pTaskInfo->env, code);
×
98
  }
99

100
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
6,743✔
101
  pInfo->existNewGroupBlock = NULL;
6,743✔
102
}
6,743✔
103

104
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
1,587,155✔
105
                                            SResultInfo* pResultInfo, int32_t order) {
106
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
1,587,155✔
107
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
1,587,155✔
108
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
441,782✔
109
    int32_t code = taosFillResultDataBlock(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows);
441,782✔
110
    if (code != TSDB_CODE_SUCCESS) {
441,787✔
111
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1!
112
      T_LONG_JMP(pTaskInfo->env, code);
1!
113
    }
114
    pInfo->pRes->info.id.groupId = pInfo->curGroupId;
441,786✔
115
    return;
441,786✔
116
  }
117

118
  // handle the cached new group data block
119
  if (pInfo->existNewGroupBlock) {
1,145,356✔
120
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
5,839✔
121
  }
122
}
123

124
void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
243,229✔
125
  int32_t            code = TSDB_CODE_SUCCESS;
243,229✔
126
  int32_t            lino = 0;
243,229✔
127
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
243,229✔
128
  SFillOperatorInfo* pInfo = pOperator->info;
243,229✔
129
  SExprSupp*         pSup = &pOperator->exprSupp;
243,229✔
130
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
243,229✔
131
  QUERY_CHECK_CODE(code, lino, _end);
243,226!
132
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
243,226✔
133
  QUERY_CHECK_CODE(code, lino, _end);
243,229!
134

135
  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
136
  pInfo->pRes->info.rows = 0;
243,229✔
137
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
243,229✔
138
  code = setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
243,229✔
139
  QUERY_CHECK_CODE(code, lino, _end);
243,227!
140

141
  code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
243,227✔
142
                               NULL);
143
  QUERY_CHECK_CODE(code, lino, _end);
243,227!
144

145
  if (pInfo->fillNullExprSupp.pExprInfo) {
243,227✔
146
    pInfo->pRes->info.rows = 0;
80,259✔
147
    code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
80,259✔
148
    QUERY_CHECK_CODE(code, lino, _end);
80,259!
149
    code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
80,259✔
150
        pInfo->fillNullExprSupp.numOfExprs, NULL);
151
  }
152

153
  pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
243,229✔
154

155
_end:
243,229✔
156
  if (code != TSDB_CODE_SUCCESS) {
243,229!
157
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
158
    T_LONG_JMP(pTaskInfo->env, code);
×
159
  }
160
}
243,229✔
161

162
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
104✔
163
  int32_t code = TSDB_CODE_SUCCESS;
104✔
164
  int32_t lino = 0;
104✔
165
  for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
863✔
166
    if (!pFillInfo->pFillCol[colIdx].notFillCol) {
759✔
167
      SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
571✔
168
      QUERY_CHECK_NULL(key, code, lino, _end, terrno);
571!
169
      key->isNull = true;
571✔
170
    }
171
  }
172

173
_end:
104✔
174
  if (code != TSDB_CODE_SUCCESS) {
104!
175
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
176
  }
177
  return code;
104✔
178
}
179

180
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
1,412,821✔
181
  int32_t            code = TSDB_CODE_SUCCESS;
1,412,821✔
182
  int32_t            lino = 0;
1,412,821✔
183
  SFillOperatorInfo* pInfo = pOperator->info;
1,412,821✔
184
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
1,412,821✔
185

186
  SResultInfo* pResultInfo = &pOperator->resultInfo;
1,412,821✔
187
  SSDataBlock* pResBlock = pInfo->pFinalRes;
1,412,821✔
188

189
  blockDataCleanup(pResBlock);
1,412,821✔
190

191
  int32_t order = pInfo->pFillInfo->order;
1,412,826✔
192

193
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
1,412,826✔
194
#if 0
195
  // the scan order may be different from the output result order for agg interval operator.
196
  if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) {
197
    order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder;
198
  } else {
199
    order = pInfo->pFillInfo->order;
200
  }
201
#endif
202

203
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
1,412,826✔
204
  if (pResBlock->info.rows > 0) {
1,412,825✔
205
    pResBlock->info.id.groupId = pInfo->curGroupId;
447,625✔
206
    return pResBlock;
447,625✔
207
  }
208

209
  while (1) {
175,194✔
210
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,140,394✔
211
    if (pBlock == NULL) {
1,140,386✔
212
      if (pInfo->totalInputRows == 0 &&
903,911✔
213
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
486,847✔
214
        setOperatorCompleted(pOperator);
62,822✔
215
        return NULL;
62,822✔
216
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
841,089✔
217
        reviseFillStartAndEndKey(pInfo, order);
213,249✔
218
      }
219

220
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
841,089✔
221
    } else {
222
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
236,475✔
223
      pBlock->info.dataLoad = 1;
236,475✔
224
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
236,475✔
225
      QUERY_CHECK_CODE(code, lino, _end);
236,484!
226

227
      blockDataCleanup(pInfo->pRes);
236,484✔
228
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
236,485✔
229
      QUERY_CHECK_CODE(code, lino, _end);
236,487!
230
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
236,487✔
231
      QUERY_CHECK_CODE(code, lino, _end);
236,487!
232
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);
236,487✔
233

234
      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
466,231✔
235
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
229,743✔
236
          reviseFillStartAndEndKey(pInfo, order);
211,057✔
237
        }
238

239
        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
229,745✔
240
        pInfo->totalInputRows += pInfo->pRes->info.rows;
229,745✔
241

242
        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
229,745✔
243
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
229,745✔
244
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
229,745✔
245
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
6,743!
246
        pInfo->existNewGroupBlock = pBlock;
6,743✔
247

248
        // Fill the previous group data block, before handle the data block of new group.
249
        // Close the fill operation for previous group data block
250
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
6,743✔
251
        pInfo->pFillInfo->prev.key = 0;
6,743✔
252
      }
253
    }
254

255
    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
1,077,575✔
256
    code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
1,077,575✔
257
    QUERY_CHECK_CODE(code, lino, _end);
1,077,562!
258

259
    // current group has no more result to return
260
    if (pResBlock->info.rows > 0) {
1,077,562✔
261
      // 1. The result in current group not reach the threshold of output result, continue
262
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
263
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
657,199✔
264
        pResBlock->info.id.groupId = pInfo->curGroupId;
482,865✔
265
        return pResBlock;
482,865✔
266
      }
267

268
      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
174,334✔
269
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
174,334!
270
        pResBlock->info.id.groupId = pInfo->curGroupId;
44✔
271
        return pResBlock;
44✔
272
      }
273
    } else if (pInfo->existNewGroupBlock) {  // try next group
420,363✔
274
      blockDataCleanup(pResBlock);
904✔
275

276
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
904✔
277
      if (pResBlock->info.rows > pResultInfo->threshold) {
904!
278
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
279
        return pResBlock;
×
280
      }
281
    } else {
282
      return NULL;
419,459✔
283
    }
284
  }
285

286
_end:
×
287
  if (code != TSDB_CODE_SUCCESS) {
×
288
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
289
    T_LONG_JMP(pTaskInfo->env, code);
×
290
  }
291
  return NULL;
×
292
}
293

294
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,412,787✔
295
  int32_t            code = TSDB_CODE_SUCCESS;
1,412,787✔
296
  SFillOperatorInfo* pInfo = pOperator->info;
1,412,787✔
297
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
1,412,787✔
298

299
  if (pOperator->status == OP_EXEC_DONE) {
1,412,787!
300
    (*ppRes) = NULL;
×
301
    return code;
×
302
  }
303

304
  SSDataBlock* fillResult = NULL;
1,412,787✔
305
  while (true) {
306
    fillResult = doFillImpl(pOperator);
1,412,825✔
307
    if (fillResult == NULL) {
1,412,818✔
308
      setOperatorCompleted(pOperator);
482,297✔
309
      break;
482,299✔
310
    }
311

312
    code = doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
930,521✔
313
    if (code != TSDB_CODE_SUCCESS) {
930,533✔
314
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1!
315
      pTaskInfo->code = code;
1✔
316
      T_LONG_JMP(pTaskInfo->env, code);
1!
317
    }
318
    if (fillResult->info.rows > 0) {
930,532✔
319
      break;
930,494✔
320
    }
321
  }
322

323
  if (fillResult != NULL) {
1,412,793✔
324
    pOperator->resultInfo.totalRows += fillResult->info.rows;
930,495✔
325
  }
326

327
  (*ppRes) = fillResult;
1,412,793✔
328
  return code;
1,412,793✔
329
}
330

331
void destroyFillOperatorInfo(void* param) {
487,127✔
332
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
487,127✔
333
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
487,127✔
334
  blockDataDestroy(pInfo->pRes);
487,127✔
335
  pInfo->pRes = NULL;
487,128✔
336
  blockDataDestroy(pInfo->pFinalRes);
487,128✔
337
  pInfo->pFinalRes = NULL;
487,127✔
338

339
  cleanupExprSupp(&pInfo->noFillExprSupp);
487,127✔
340
  cleanupExprSupp(&pInfo->fillNullExprSupp);
487,127✔
341

342
  taosMemoryFreeClear(pInfo->p);
487,126!
343
  taosArrayDestroy(pInfo->matchInfo.pList);
487,129✔
344
  taosMemoryFreeClear(param);
487,125!
345
}
487,130✔
346

347
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
487,123✔
348
                            int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
349
                            SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
350
                            SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
351
  SFillColInfo* pColInfo =
352
      createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
487,123✔
353
  if (!pColInfo) {
487,123✔
354
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
3!
355
    return terrno;
3✔
356
  }
357

358
  int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
487,120✔
359

360
  //  STimeWindow w = {0};
361
  //  getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
362
  pInfo->pFillInfo = NULL;
487,120✔
363
  int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
487,120✔
364
                                    fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
487,120✔
365
  if (code != TSDB_CODE_SUCCESS) {
487,122✔
366
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1!
367
    return code;
×
368
  }
369

370
  if (order == TSDB_ORDER_ASC) {
487,121✔
371
    pInfo->win.skey = win.skey;
453,463✔
372
    pInfo->win.ekey = win.ekey;
453,463✔
373
  } else {
374
    pInfo->win.skey = win.ekey;
33,658✔
375
    pInfo->win.ekey = win.skey;
33,658✔
376
  }
377
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
487,121✔
378
  if (!pInfo->p) {
487,129!
379
    return terrno;
×
380
  }
381

382
  if (pInfo->pFillInfo == NULL || pInfo->p == NULL) {
487,129!
383
    taosMemoryFree(pInfo->pFillInfo);
8✔
384
    taosMemoryFree(pInfo->p);
×
385
    return TSDB_CODE_OUT_OF_MEMORY;
×
386
  } else {
387
    return TSDB_CODE_SUCCESS;
487,121✔
388
  }
389
}
390

391
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
487,121✔
392
  if (pInfo->noFillExprSupp.numOfExprs == 0) {
487,121✔
393
    return false;
390,549✔
394
  }
395

396
  for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
97,893✔
397
    SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
97,826✔
398
    if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
97,826!
399
        exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
97,827✔
400
      return true;
96,505✔
401
    }
402
  }
403
  return false;
67✔
404
}
405

406
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
487,118✔
407
                                           const char* idStr) {
408
  bool wstartExist = isWstartColumnExist(pInfo);
487,118✔
409

410
  if (wstartExist == false) {
487,124✔
411
    if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
390,618!
412
      qError("pWStartTs of fill physical node is not a target node, %s", idStr);
×
413
      return TSDB_CODE_QRY_SYS_ERROR;
×
414
    }
415

416
    SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
390,618✔
417
    if (pExpr == NULL) {
390,619!
418
      return terrno;
×
419
    }
420

421
    int32_t code = createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
390,619✔
422
    if (code != TSDB_CODE_SUCCESS) {
390,623!
423
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
424
      pExprSupp->numOfExprs += 1;
×
425
      pExprSupp->pExprInfo = pExpr;
×
426
      return code;
×
427
    }
428

429
    pExprSupp->numOfExprs += 1;
390,623✔
430
    pExprSupp->pExprInfo = pExpr;
390,623✔
431
  }
432

433
  return TSDB_CODE_SUCCESS;
487,129✔
434
}
435

436
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
487,116✔
437
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
438
  QRY_PARAM_CHECK(pOptrInfo);
487,116!
439
  int32_t code = 0;
487,116✔
440
  int32_t lino = 0;
487,116✔
441

442
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
487,116✔
443
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
487,119✔
444
  if (pInfo == NULL || pOperator == NULL) {
487,121!
445
    code = terrno;
×
446
    goto _error;
×
447
  }
448

449
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
487,123✔
450
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
487,128!
451
  SExprInfo* pExprInfo = NULL;
487,128✔
452

453
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
487,128✔
454
  QUERY_CHECK_CODE(code, lino, _error);
487,125!
455

456
  pOperator->exprSupp.pExprInfo = pExprInfo;
487,125✔
457
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
487,125✔
458

459
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
487,125✔
460
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
487,125✔
461
  QUERY_CHECK_CODE(code, lino, _error);
487,127!
462

463
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
487,127✔
464
  QUERY_CHECK_CODE(code, lino, _error);
487,129!
465

466
  code =
467
      initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
487,129✔
468
  QUERY_CHECK_CODE(code, lino, _error);
487,125!
469

470
  code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
487,125✔
471
                        &pInfo->fillNullExprSupp.numOfExprs);
472
  QUERY_CHECK_CODE(code, lino, _error);
487,124!
473
  code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
487,124✔
474
                      &pTaskInfo->storageAPI.functionStore);
475
  QUERY_CHECK_CODE(code, lino, _error);
487,123!
476

477
  SInterval* pInterval =
487,123✔
478
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
487,123✔
479
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
309,121✔
480
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
487,123✔
481

482
  int32_t order = (pPhyFillNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
487,123✔
483
  int32_t type = convertFillType(pPhyFillNode->mode);
487,123✔
484

485
  SResultInfo* pResultInfo = &pOperator->resultInfo;
487,119✔
486

487
  initResultSizeInfo(&pOperator->resultInfo, 4096);
487,119✔
488
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
487,121✔
489
  if (code != TSDB_CODE_SUCCESS) {
487,130!
490
    goto _error;
×
491
  }
492
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
487,130✔
493
  if (code != TSDB_CODE_SUCCESS) {
487,125!
494
    goto _error;
×
495
  }
496

497
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
487,125✔
498
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
487,125✔
499

500
  int32_t numOfOutputCols = 0;
487,125✔
501
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
487,125✔
502
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
503

504
  QUERY_CHECK_CODE(code, lino, _error);
487,122!
505
  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
487,122✔
506
                      pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
507
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
487,122✔
508
                      pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
487,122✔
509
  if (code != TSDB_CODE_SUCCESS) {
487,122!
510
    goto _error;
×
511
  }
512

513
  pInfo->pFinalRes = NULL;
487,122✔
514

515
  code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
487,122✔
516
  if (code) {
487,122!
517
    goto _error;
×
518
  }
519

520
  code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
487,122✔
521
  if (code != TSDB_CODE_SUCCESS) {
487,128!
522
    goto _error;
×
523
  }
524

525
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
487,128✔
526
  if (code != TSDB_CODE_SUCCESS) {
487,128!
527
    goto _error;
×
528
  }
529

530
  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
487,128✔
531
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
487,128✔
532
                                         optrDefaultGetNextExtFn, NULL);
533

534
  code = appendDownstream(pOperator, &downstream, 1);
487,127✔
535
  if (code != TSDB_CODE_SUCCESS) {
487,130!
536
    goto _error;
×
537
  }
538

539
  *pOptrInfo = pOperator;
487,130✔
540
  return TSDB_CODE_SUCCESS;
487,130✔
541

542
_error:
×
543
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
544

545
  if (pInfo != NULL) {
×
546
    destroyFillOperatorInfo(pInfo);
×
547
  }
548
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
549
  pTaskInfo->code = code;
×
550
  return code;
×
551
}
552

553
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
431,045✔
554
  int64_t skey, ekey, next;
555
  if (order == TSDB_ORDER_ASC) {
431,045✔
556
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
399,759✔
557
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
399,760✔
558

559
    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
399,760✔
560
    next = ekey;
399,760✔
561
    while (next < pInfo->win.ekey) {
1,662,485✔
562
      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
1,262,724✔
563
                         pInfo->pFillInfo->interval.precision);
1,262,724✔
564
      ekey = next > pInfo->win.ekey ? ekey : next;
1,262,725✔
565
    }
566
    pInfo->win.ekey = ekey;
399,761✔
567
  } else {
568
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
31,286✔
569
    next = skey;
31,287✔
570
    while (next < pInfo->win.skey) {
128,232✔
571
      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
96,945✔
572
                         pInfo->pFillInfo->interval.precision);
96,945✔
573
      skey = next > pInfo->win.skey ? skey : next;
96,945✔
574
    }
575
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
31,287✔
576
    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
31,287✔
577
  }
578
}
431,048✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc