• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4700

29 Aug 2025 06:36AM UTC coverage: 58.335% (+0.008%) from 58.327%
#4700

push

travis-ci

web-flow
fix(gpt): fix race-condition in preparing tmp files (#32800)

133694 of 291873 branches covered (45.81%)

Branch coverage included in aggregate %.

5 of 34 new or added lines in 6 files covered. (14.71%)

242 existing lines in 22 files now uncovered.

201983 of 283561 relevant lines covered (71.23%)

29993718.15 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

51.2
/source/libs/executor/src/filloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "taoserror.h"
21
#include "tmsg.h"
22
#include "ttypes.h"
23

24
#include "executorInt.h"
25
#include "tcommon.h"
26
#include "thash.h"
27
#include "ttime.h"
28

29
#include "function.h"
30
#include "operator.h"
31
#include "querynodes.h"
32
#include "querytask.h"
33
#include "tdatablock.h"
34
#include "tfill.h"
35

36
// A start/end timestamp key pair tagged with a group id.
// NOTE(review): not referenced anywhere else in the visible part of this file.
typedef struct STimeRange {
37
  TSKEY    skey;  // start key of the range
38
  TSKEY    ekey;  // end key of the range
39
  uint64_t groupId;  // group the range belongs to
40
} STimeRange;
41

42
// Runtime state of the fill operator; allocated in createFillOperatorInfo() and released by
// destroyFillOperatorInfo().
typedef struct SFillOperatorInfo {
43
  struct SFillInfo* pFillInfo;  // fill engine state, created by taosCreateFillInfo() in initFillInfo()
44
  SSDataBlock*      pRes;  // intermediate block: receives the projected input and is handed to the fill engine
45
  SSDataBlock*      pFinalRes;  // output block returned to the caller by doFillImpl2()
46
  int64_t           totalInputRows;  // rows fed into the fill engine so far for the current run
47
  void**            p;  // array of numOfCols pointers allocated in initFillInfo(); not otherwise used in this view
48
  SSDataBlock*      existNewGroupBlock;  // downstream block of the next group, cached until the current group is flushed
49
  STimeWindow       win;  // fill window; skey/ekey are stored swapped when the order is descending
50
  SColMatchInfo     matchInfo;  // column match info passed to doFilter()
51
  int32_t           primaryTsCol;  // slot id of the pWStartTs target node
52
  int32_t           primarySrcSlotId;  // slot id of the column under pWStartTs; used to refresh the input block ts window
53
  uint64_t          curGroupId;  // current handled group id
54
  SExprInfo*        pExprInfo;  // NOTE(review): never assigned in the visible code
55
  int32_t           numOfExpr;  // number of fill expressions (from pFillExprs)
56
  SExprSupp         noFillExprSupp;  // expressions for columns that are not filled (from pNotFillExprs)
57
  SExprSupp         fillNullExprSupp;  // expressions from pFillNullExprs; applied only when pExprInfo is set
58
  SList*            pFillSavedBlockList;  // NOTE(review): not referenced on this struct in the visible code
59
  SNode*            pTimeRange;  // STimeRangeNode for stream fill
60
} SFillOperatorInfo;
61

62
// Forward declarations of helpers that are defined later in this file but used before their definition.
static void destroyFillOperatorInfo(void* param);
63
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
64
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
65
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
66

67
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
3,350✔
68
                                               SResultInfo* pResultInfo, int32_t order) {
69
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
3,350✔
70
  SSDataBlock*   pResBlock = pInfo->pFinalRes;
3,350✔
71
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,350✔
72

73
  //  int32_t order = TSDB_ORDER_ASC;
74
  int32_t scanFlag = MAIN_SCAN;
3,350✔
75
  //  getTableScanInfo(pOperator, &order, &scanFlag, false);
76
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
3,350✔
77

78
  blockDataCleanup(pInfo->pRes);
3,350✔
79
  doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
3,350✔
80

81
  reviseFillStartAndEndKey(pOperator->info, order);
3,350✔
82

83
  int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
6,700✔
84
                                         : pInfo->existNewGroupBlock->info.window.skey;
3,350!
85
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
3,350✔
86

87
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
3,350✔
88
  if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
3,350✔
89
    int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
787✔
90
    if (code != TSDB_CODE_SUCCESS) {
787!
91
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
92
      T_LONG_JMP(pTaskInfo->env, code);
×
93
    }
94
  }
95

96
  int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
3,350✔
97
  int32_t code = taosFillResultDataBlock2(pInfo->pFillInfo, pResBlock, numOfResultRows, NULL);
3,350✔
98
  if (code != TSDB_CODE_SUCCESS) {
3,350!
99
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
100
    T_LONG_JMP(pTaskInfo->env, code);
×
101
  }
102

103
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
3,350✔
104
  pInfo->existNewGroupBlock = NULL;
3,350✔
105
}
3,350✔
106

107
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
3,927,111✔
108
                                            SResultInfo* pResultInfo, int32_t order) {
109
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
3,927,111✔
110
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
3,927,111✔
111
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
1,284,870✔
112
    int32_t code = taosFillResultDataBlock2(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows, NULL);
1,284,870✔
113
    if (code != TSDB_CODE_SUCCESS) {
1,284,878✔
114
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
7!
115
      T_LONG_JMP(pTaskInfo->env, code);
7!
116
    }
117
    pInfo->pRes->info.id.groupId = pInfo->curGroupId;
1,284,871✔
118
    return;
1,284,871✔
119
  }
120

121
  // handle the cached new group data block
122
  if (pInfo->existNewGroupBlock) {
2,642,212✔
123
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
3,350✔
124
  }
125
}
126

127
// Projects the input block pBlock into pInfo->pRes using, in order:
//   1. the operator's fill expressions (pOperator->exprSupp),
//   2. the no-fill expressions (pInfo->noFillExprSupp),
//   3. the fill-null expressions (pInfo->fillNullExprSupp), only when that set has expressions.
// pRes->info.rows is reset to 0 before steps 2 and 3 so every expression set writes from row 0.
// On success pRes carries the group id of pBlock.
//
// Every step is checked with QUERY_CHECK_CODE so that a failure is logged with the line of the
// failing call; the error is then reported through T_LONG_JMP on the task's jump environment.
void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*         pSup = &pOperator->exprSupp;
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL,
                               GET_STM_RTINFO(pOperator->pTaskInfo));
  QUERY_CHECK_CODE(code, lino, _end);

  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
  pInfo->pRes->info.rows = 0;
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
  code = setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
  QUERY_CHECK_CODE(code, lino, _end);

  code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
                               NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
  QUERY_CHECK_CODE(code, lino, _end);

  if (pInfo->fillNullExprSupp.pExprInfo) {
    pInfo->pRes->info.rows = 0;
    code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
    QUERY_CHECK_CODE(code, lino, _end);
    code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
                                 pInfo->fillNullExprSupp.numOfExprs, NULL, GET_STM_RTINFO(pOperator->pTaskInfo));
    // previously unchecked here: a failure fell through with lino == 0 and was logged as "line 0"
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
644,520✔
164

165
// Marks the saved "previous row" value of every filled column as NULL so that a new group does not
// inherit values from the group before it. Columns flagged notFillCol are left untouched.
//
// Returns TSDB_CODE_SUCCESS, or the error recorded by QUERY_CHECK_NULL (terrno) when an entry of
// prev.pRowVal cannot be fetched; the failure is logged before returning.
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
    if (pFillInfo->pFillCol[i].notFillCol) {
      continue;
    }

    SGroupKeys* pPrevVal = taosArrayGet(pFillInfo->prev.pRowVal, i);
    QUERY_CHECK_NULL(pPrevVal, code, lino, _end, terrno);
    pPrevVal->isNull = true;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
182

183
// Produces the next filled result block for the fill operator.
//
// Flow:
//   1. Flush results still pending in the fill engine (or start the cached new-group block).
//   2. Otherwise pull blocks from the downstream operator, project each into pInfo->pRes and feed it
//      to the fill engine. A block whose group id differs from curGroupId is cached in
//      existNewGroupBlock and the current group is closed first.
//   3. Ask the fill engine for results via taosFillResultDataBlock2(); keep pulling input when it
//      reports (through wantMoreBlock) that it needs more.
//
// Returns pInfo->pFinalRes (stamped with curGroupId) when it holds rows, or NULL when nothing more
// can be produced. Errors from the checked calls are logged and reported through T_LONG_JMP.
static SSDataBlock* doFillImpl2(SOperatorInfo* pOperator) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  if (pInfo == NULL || pTaskInfo == NULL) {
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
    return NULL;
  }

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  SSDataBlock* pResBlock = pInfo->pFinalRes;
  if (pResBlock == NULL) {
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
    return NULL;
  }
  blockDataCleanup(pResBlock);
  int32_t order = pInfo->pFillInfo->order;

  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
  if (pResBlock->info.rows > 0) {
    pResBlock->info.id.groupId = pInfo->curGroupId;
    return pResBlock;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // downstream is exhausted: without any input only the NULL_F / SET_VALUE_F modes still emit rows
      if (pInfo->totalInputRows == 0 &&
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
        setOperatorCompleted(pOperator);
        return NULL;
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
        reviseFillStartAndEndKey(pInfo, order);
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
      pBlock->info.dataLoad = 1;
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      blockDataCleanup(pInfo->pRes);
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);

      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
          reviseFillStartAndEndKey(pInfo, order);
        }

        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
        pInfo->pFillInfo->prev.key = 0;
      }
    }

    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
    bool    wantMoreBlock = false;
    code = taosFillResultDataBlock2(pInfo->pFillInfo, pResBlock, numOfResultRows, &wantMoreBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    // current group has no more result to return
    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else if (pInfo->existNewGroupBlock) {  // try next group
      blockDataCleanup(pResBlock);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows > pResultInfo->threshold) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else {
      // no rows and no pending group: either the fill engine asked for more input, or we are done
      if (wantMoreBlock) continue;
      return NULL;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return NULL;
}
295

296
// Older variant of doFillImpl2(): identical control flow, but it asks the fill engine for results
// with taosFillResultDataBlock() and therefore never re-pulls input on an empty result.
//
// NOTE(review): no caller is visible in this file (doFillNext() uses doFillImpl2()); confirm whether
// this function can be retired.
//
// Returns pInfo->pFinalRes (stamped with curGroupId) when it holds rows, or NULL otherwise. Errors
// from the checked calls are logged and reported through T_LONG_JMP.
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
  int32_t            code = TSDB_CODE_SUCCESS;
  int32_t            lino = 0;
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  if (pInfo == NULL || pTaskInfo == NULL) {
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
    return NULL;
  }

  SResultInfo* pResultInfo = &pOperator->resultInfo;
  SSDataBlock* pResBlock = pInfo->pFinalRes;
  if (pResBlock == NULL) {
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
    return NULL;
  }

  blockDataCleanup(pResBlock);

  // The output order always comes from the fill info. A disabled (#if 0) branch used to take it
  // from a hash-interval downstream's resultTsOrder, because the scan order may differ from the
  // output result order for the agg interval operator.
  int32_t order = pInfo->pFillInfo->order;

  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
  if (pResBlock->info.rows > 0) {
    pResBlock->info.id.groupId = pInfo->curGroupId;
    return pResBlock;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      // downstream is exhausted: without any input only the NULL_F / SET_VALUE_F modes still emit rows
      if (pInfo->totalInputRows == 0 &&
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
        setOperatorCompleted(pOperator);
        return NULL;
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
        reviseFillStartAndEndKey(pInfo, order);
      }

      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
    } else {
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
      pBlock->info.dataLoad = 1;
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
      QUERY_CHECK_CODE(code, lino, _end);

      blockDataCleanup(pInfo->pRes);
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
      QUERY_CHECK_CODE(code, lino, _end);
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);

      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
          reviseFillStartAndEndKey(pInfo, order);
        }

        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
        pInfo->totalInputRows += pInfo->pRes->info.rows;

        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
        pInfo->existNewGroupBlock = pBlock;

        // Fill the previous group data block, before handle the data block of new group.
        // Close the fill operation for previous group data block
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
        pInfo->pFillInfo->prev.key = 0;
      }
    }

    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
    code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
    QUERY_CHECK_CODE(code, lino, _end);

    // current group has no more result to return
    if (pResBlock->info.rows > 0) {
      // 1. The result in current group not reach the threshold of output result, continue
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }

      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else if (pInfo->existNewGroupBlock) {  // try next group
      blockDataCleanup(pResBlock);

      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
      if (pResBlock->info.rows > pResultInfo->threshold) {
        pResBlock->info.id.groupId = pInfo->curGroupId;
        return pResBlock;
      }
    } else {
      return NULL;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return NULL;
}
417

418
// "Get next" entry point of the fill operator.
//
// When a stream time-range node is attached, the fill window is refreshed from calcTimeRange()
// first (only if the computed range is reported valid). Then filled blocks are produced with
// doFillImpl2() and run through the operator's filter until a block with at least one row
// survives, or until no more blocks are available (the operator is then marked completed).
//
// *ppRes receives the resulting block or NULL. Returns TSDB_CODE_SUCCESS; a filter failure is
// logged, stored in pTaskInfo->code and reported through T_LONG_JMP.
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  SFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
  int32_t            code = TSDB_CODE_SUCCESS;

  if (pInfo->pTimeRange != NULL) {
    STimeWindow range = {0};
    bool        rangeValid = false;
    calcTimeRange((STimeRangeNode*)pInfo->pTimeRange, &pTaskInfo->pStreamRuntimeInfo->funcInfo, &range, &rangeValid);

    if (rangeValid) {
      pInfo->win.skey = range.skey;
      pInfo->win.ekey = range.ekey;
    }
  }

  if (pOperator->status == OP_EXEC_DONE) {
    *ppRes = NULL;
    return code;
  }

  SSDataBlock* pOut = NULL;
  for (;;) {
    pOut = doFillImpl2(pOperator);
    if (pOut == NULL) {
      setOperatorCompleted(pOperator);
      break;
    }

    code = doFilter(pOut, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
    if (code != TSDB_CODE_SUCCESS) {
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      pTaskInfo->code = code;
      T_LONG_JMP(pTaskInfo->env, code);
    }

    // a block emptied by the filter is skipped; try the next one
    if (pOut->info.rows > 0) {
      pOperator->resultInfo.totalRows += pOut->info.rows;
      break;
    }
  }

  *ppRes = pOut;
  return code;
}
466

467
void destroyFillOperatorInfo(void* param) {
1,402,033✔
468
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
1,402,033✔
469
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
1,402,033✔
470
  blockDataDestroy(pInfo->pRes);
1,402,036✔
471
  pInfo->pRes = NULL;
1,402,038✔
472
  blockDataDestroy(pInfo->pFinalRes);
1,402,038✔
473
  pInfo->pFinalRes = NULL;
1,402,036✔
474

475
  cleanupExprSupp(&pInfo->noFillExprSupp);
1,402,036✔
476
  cleanupExprSupp(&pInfo->fillNullExprSupp);
1,402,037✔
477

478
  taosMemoryFreeClear(pInfo->p);
1,402,033!
479
  taosArrayDestroy(pInfo->matchInfo.pList);
1,402,036✔
480
  taosMemoryFreeClear(param);
1,402,034!
481
}
1,402,032✔
482

483
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
1,402,009✔
484
                            int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
485
                            SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
486
                            SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
487
  SFillColInfo* pColInfo =
488
      createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
1,402,009✔
489
  if (!pColInfo) {
1,402,015!
490
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
×
491
    return terrno;
×
492
  }
493

494
  int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
1,402,018✔
495

496
  //  STimeWindow w = {0};
497
  //  getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
498
  pInfo->pFillInfo = NULL;
1,402,018✔
499
  int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
1,402,018✔
500
                                    fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
1,402,018✔
501
  if (code != TSDB_CODE_SUCCESS) {
1,401,998!
502
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
503
    return code;
×
504
  }
505

506
  if (order == TSDB_ORDER_ASC) {
1,402,001✔
507
    pInfo->win.skey = win.skey;
1,304,694✔
508
    pInfo->win.ekey = win.ekey;
1,304,694✔
509
  } else {
510
    pInfo->win.skey = win.ekey;
97,307✔
511
    pInfo->win.ekey = win.skey;
97,307✔
512
  }
513
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
1,402,001!
514
  if (!pInfo->p) {
1,402,022!
515
    return terrno;
×
516
  }
517

518
  if (pInfo->pFillInfo == NULL) {
1,402,022!
519
    taosMemoryFree(pInfo->pFillInfo);
×
520
    taosMemoryFree(pInfo->p);
×
521
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
522
  } else {
523
    return TSDB_CODE_SUCCESS;
1,402,022✔
524
  }
525
}
526

527
// Reports whether the no-fill expression list already contains a plain column expression whose
// single parameter is a window-start column (COLUMN_TYPE_WINDOW_START).
// Returns false for an empty list.
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;

  for (int32_t idx = 0; idx < pNoFillSupp->numOfExprs; ++idx) {
    SExprInfo* pCandidate = &pNoFillSupp->pExprInfo[idx];

    if (pCandidate->pExpr->nodeType != QUERY_NODE_COLUMN) {
      continue;
    }
    if (pCandidate->base.numOfParams != 1) {
      continue;
    }
    if (pCandidate->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
      return true;
    }
  }

  return false;
}
541

542
// Makes sure the expression list in pExprSupp contains the window-start timestamp expression.
//
// When no window-start column is present among the no-fill expressions, the list is grown by one
// entry which is built from pPhyFillNode->pWStartTs. The grown array and the incremented count are
// stored in pExprSupp even when building the entry fails, so the caller's cleanup sees the new
// array.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_QRY_SYS_ERROR when pWStartTs is not a target node, terrno
// when the list cannot be grown, or the code returned by createExprFromTargetNode().
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
                                           const char* idStr) {
  if (isWstartColumnExist(pInfo)) {
    return TSDB_CODE_SUCCESS;
  }

  if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
    qError("pWStartTs of fill physical node is not a target node, %s", idStr);
    return TSDB_CODE_QRY_SYS_ERROR;
  }

  // grow through a temporary so the original array stays valid if the reallocation fails
  SExprInfo* pGrown = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
  if (pGrown == NULL) {
    return terrno;
  }

  int32_t code = createExprFromTargetNode(&pGrown[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
  }

  // publish the grown array on both the success and the failure path
  pExprSupp->numOfExprs += 1;
  pExprSupp->pExprInfo = pGrown;
  return code;
}
571

572
// Reset-state callback of the fill operator: returns the operator to its "not opened" state so it
// can be executed again.
//
// Clears the group/row bookkeeping and both result blocks, rewinds the fill engine to the start key
// of the plan node's time range, marks all saved previous/next row values as NULL, drops the
// engine's tag buffer, column progress array and saved block list, and restores pFill->win from
// the plan node's time range (swapped for descending order).
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t resetFillOperState(SOperatorInfo* pOper) {
  SFillOperatorInfo* pFill = pOper->info;
  SFillPhysiNode*    pPhyNode = (SFillPhysiNode*)pOper->pPhyNode;
  pOper->status = OP_NOT_OPENED;

  pFill->curGroupId = 0;
  pFill->totalInputRows = 0;
  blockDataCleanup(pFill->pRes);
  blockDataCleanup(pFill->pFinalRes);

  int64_t startKey = (pFill->pFillInfo->order == TSDB_ORDER_ASC) ? pPhyNode->timeRange.skey : pPhyNode->timeRange.ekey;
  pFill->pFillInfo->start = startKey;
  pFill->pFillInfo->currentKey = startKey;
  pFill->pFillInfo->end = startKey;

  pFill->pFillInfo->numOfRows = 0;
  pFill->pFillInfo->index = -1;
  pFill->pFillInfo->numOfTotal = 0;
  pFill->pFillInfo->numOfCurrent = 0;
  pFill->pFillInfo->isFilled = false;
  pFill->pFillInfo->prev.key = 0;
  pFill->pFillInfo->next.key = 0;

  // forget the saved neighbour rows used by the fill engine
  int32_t size = taosArrayGetSize(pFill->pFillInfo->prev.pRowVal);
  for (int32_t i = 0; i < size; ++i) {
    SGroupKeys* pKey = taosArrayGet(pFill->pFillInfo->prev.pRowVal, i);
    pKey->isNull = true;
  }
  size = taosArrayGetSize(pFill->pFillInfo->next.pRowVal);
  for (int32_t i = 0; i < size; ++i) {
    SGroupKeys* pKey = taosArrayGet(pFill->pFillInfo->next.pRowVal, i);
    pKey->isNull = true;
  }

  taosMemoryFreeClear(pFill->pFillInfo->pTags);
  taosArrayDestroy(pFill->pFillInfo->pColFillProgress);
  pFill->pFillInfo->pColFillProgress = NULL;

  tdListFreeP(pFill->pFillInfo->pFillSavedBlockList, destroyFillBlock);
  pFill->pFillInfo->pFillSavedBlockList = NULL;

  int32_t order = (pPhyNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
  if (order == TSDB_ORDER_ASC) {
    pFill->win.skey = pPhyNode->timeRange.skey;
    pFill->win.ekey = pPhyNode->timeRange.ekey;
  } else {
    pFill->win.skey = pPhyNode->timeRange.ekey;
    pFill->win.ekey = pPhyNode->timeRange.skey;
  }

  return TSDB_CODE_SUCCESS;
}
624

625
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
1,402,003✔
626
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
627
  QRY_PARAM_CHECK(pOptrInfo);
1,402,003!
628
  int32_t code = 0;
1,402,003✔
629
  int32_t lino = 0;
1,402,003✔
630

631
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
1,402,003!
632
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
1,402,001!
633
  if (pInfo == NULL || pOperator == NULL) {
1,402,012!
634
    code = terrno;
1✔
635
    goto _error;
×
636
  }
637

638
  pOperator->pPhyNode = pPhyFillNode;
1,402,011✔
639
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
1,402,011✔
640
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
1,402,027!
641
  SExprInfo* pExprInfo = NULL;
1,402,027✔
642

643
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
1,402,027✔
644
  QUERY_CHECK_CODE(code, lino, _error);
1,402,018!
645

646
  pOperator->exprSupp.pExprInfo = pExprInfo;
1,402,018✔
647
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
1,402,018✔
648

649
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
1,402,018✔
650
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
1,402,018✔
651
  QUERY_CHECK_CODE(code, lino, _error);
1,402,023!
652

653
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
1,402,023✔
654
  QUERY_CHECK_CODE(code, lino, _error);
1,402,020!
655

656
  code =
657
      initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
1,402,020✔
658
  QUERY_CHECK_CODE(code, lino, _error);
1,402,000!
659

660
  code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
1,402,000✔
661
                        &pInfo->fillNullExprSupp.numOfExprs);
662
  QUERY_CHECK_CODE(code, lino, _error);
1,402,016!
663
  code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
1,402,016✔
664
                      &pTaskInfo->storageAPI.functionStore);
665
  QUERY_CHECK_CODE(code, lino, _error);
1,402,008!
666

667
  SInterval* pInterval =
1,402,008✔
668
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
1,402,008✔
669
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
909,630✔
670
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
1,402,008✔
671

672
  int32_t order = (pPhyFillNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
1,402,008✔
673
  int32_t type = convertFillType(pPhyFillNode->mode);
1,402,008✔
674

675
  SResultInfo* pResultInfo = &pOperator->resultInfo;
1,402,000✔
676

677
  initResultSizeInfo(&pOperator->resultInfo, 4096);
1,402,000✔
678
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
1,402,002✔
679
  if (code != TSDB_CODE_SUCCESS) {
1,402,032!
680
    goto _error;
×
681
  }
682
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
1,402,032✔
683
  if (code != TSDB_CODE_SUCCESS) {
1,402,018!
684
    goto _error;
×
685
  }
686

687
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
1,402,018✔
688
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
1,402,018✔
689

690
  int32_t numOfOutputCols = 0;
1,402,018✔
691
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
1,402,018✔
692
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
693

694
  QUERY_CHECK_CODE(code, lino, _error);
1,401,980!
695
  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
1,401,980✔
696
                      pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
697
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
1,401,980✔
698
                      pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
1,401,980✔
699
  if (code != TSDB_CODE_SUCCESS) {
1,402,021!
700
    goto _error;
×
701
  }
702
  TSWAP(pInfo->pTimeRange, pPhyFillNode->pTimeRange);
1,402,021✔
703
  pInfo->pFinalRes = NULL;
1,402,021✔
704

705
  code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
1,402,021✔
706
  if (code) {
1,402,014!
707
    goto _error;
×
708
  }
709

710
  code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
1,402,014✔
711
  if (code != TSDB_CODE_SUCCESS) {
1,402,034!
712
    goto _error;
×
713
  }
714

715
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
1,402,034✔
716
                            pTaskInfo->pStreamRuntimeInfo);
1,402,034✔
717
  if (code != TSDB_CODE_SUCCESS) {
1,402,027!
718
    goto _error;
×
719
  }
720

721
  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
1,402,027✔
722
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
1,402,025✔
723
                                         optrDefaultGetNextExtFn, NULL);
724
  setOperatorResetStateFn(pOperator, resetFillOperState);
1,402,033✔
725

726
  code = appendDownstream(pOperator, &downstream, 1);
1,402,029✔
727
  if (code != TSDB_CODE_SUCCESS) {
1,402,033!
728
    goto _error;
×
729
  }
730

731
  *pOptrInfo = pOperator;
1,402,033✔
732
  return TSDB_CODE_SUCCESS;
1,402,033✔
733

734
_error:
×
735
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
736

737
  if (pInfo != NULL) {
×
738
    destroyFillOperatorInfo(pInfo);
×
739
  }
740
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
741
  pTaskInfo->code = code;
×
742
  return code;
×
743
}
744

745
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
1,255,234✔
746
  int64_t skey, ekey, next;
747
  if (order == TSDB_ORDER_ASC) {
1,255,234✔
748
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
1,161,792✔
749
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
1,161,789✔
750

751
    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
1,161,787✔
752
    next = ekey;
1,161,796✔
753
    while (next < pInfo->win.ekey) {
4,755,023✔
754
      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
3,593,234✔
755
                         pInfo->pFillInfo->interval.precision, NULL);
3,593,234✔
756
      if (next == ekey) break;
3,593,227!
757
      ekey = next > pInfo->win.ekey ? ekey : next;
3,593,227✔
758
    }
759
    pInfo->win.ekey = ekey;
1,161,789✔
760
  } else {
761
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
93,442✔
762
    next = skey;
93,442✔
763
    while (next < pInfo->win.skey) {
415,945✔
764
      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
322,503✔
765
                         pInfo->pFillInfo->interval.precision, NULL);
322,503✔
766
      if (next == skey) break;
322,503!
767
      skey = next > pInfo->win.skey ? skey : next;
322,503✔
768
    }
769
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
93,442✔
770
    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
93,442✔
771
  }
772
}
1,255,231✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc