• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3838

03 Apr 2025 01:31PM UTC coverage: 61.998% (+0.007%) from 61.991%
#3838

push

travis-ci

web-flow
Merge pull request #30644 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

152866 of 315065 branches covered (48.52%)

Branch coverage included in aggregate %.

402 of 459 new or added lines in 2 files covered. (87.58%)

236 existing lines in 2 files now uncovered.

237758 of 314991 relevant lines covered (75.48%)

5809931.38 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

54.42
/source/libs/executor/src/filloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "function.h"
29
#include "operator.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
typedef struct STimeRange {
36
  TSKEY    skey;
37
  TSKEY    ekey;
38
  uint64_t groupId;
39
} STimeRange;
40

41
typedef struct SFillOperatorInfo {
42
  struct SFillInfo* pFillInfo;
43
  SSDataBlock*      pRes;
44
  SSDataBlock*      pFinalRes;
45
  int64_t           totalInputRows;
46
  void**            p;
47
  SSDataBlock*      existNewGroupBlock;
48
  STimeWindow       win;
49
  SColMatchInfo     matchInfo;
50
  int32_t           primaryTsCol;
51
  int32_t           primarySrcSlotId;
52
  uint64_t          curGroupId;  // current handled group id
53
  SExprInfo*        pExprInfo;
54
  int32_t           numOfExpr;
55
  SExprSupp         noFillExprSupp;
56
  SExprSupp         fillNullExprSupp;
57
  SList*            pFillSavedBlockList;
58
} SFillOperatorInfo;
59

60
static void destroyFillOperatorInfo(void* param);
61
static void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag);
62
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo);
63
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order);
64

65
static void doHandleRemainBlockForNewGroupImpl(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
5,877✔
66
                                               SResultInfo* pResultInfo, int32_t order) {
67
  pInfo->totalInputRows = pInfo->existNewGroupBlock->info.rows;
5,877✔
68
  SSDataBlock*   pResBlock = pInfo->pFinalRes;
5,877✔
69
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
5,877✔
70

71
  //  int32_t order = TSDB_ORDER_ASC;
72
  int32_t scanFlag = MAIN_SCAN;
5,877✔
73
  //  getTableScanInfo(pOperator, &order, &scanFlag, false);
74
  taosResetFillInfo(pInfo->pFillInfo, getFillInfoStart(pInfo->pFillInfo));
5,877✔
75

76
  blockDataCleanup(pInfo->pRes);
5,877✔
77
  doApplyScalarCalculation(pOperator, pInfo->existNewGroupBlock, order, scanFlag);
5,877✔
78

79
  reviseFillStartAndEndKey(pOperator->info, order);
5,877✔
80

81
  int64_t ts = (order == TSDB_ORDER_ASC) ? pInfo->existNewGroupBlock->info.window.ekey
11,754✔
82
                                         : pInfo->existNewGroupBlock->info.window.skey;
5,877!
83
  taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
5,877✔
84

85
  taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
5,877✔
86
  if (pInfo->pFillInfo->type == TSDB_FILL_PREV || pInfo->pFillInfo->type == TSDB_FILL_LINEAR) {
5,877✔
87
    int32_t code = fillResetPrevForNewGroup(pInfo->pFillInfo);
144✔
88
    if (code != TSDB_CODE_SUCCESS) {
144!
UNCOV
89
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
90
      T_LONG_JMP(pTaskInfo->env, code);
×
91
    }
92
  }
93

94
  int32_t numOfResultRows = pResultInfo->capacity - pResBlock->info.rows;
5,877✔
95
  int32_t code = taosFillResultDataBlock2(pInfo->pFillInfo, pResBlock, numOfResultRows, NULL);
5,877✔
96
  if (code != TSDB_CODE_SUCCESS) {
5,877!
UNCOV
97
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
98
    T_LONG_JMP(pTaskInfo->env, code);
×
99
  }
100

101
  pInfo->curGroupId = pInfo->existNewGroupBlock->info.id.groupId;
5,877✔
102
  pInfo->existNewGroupBlock = NULL;
5,877✔
103
}
5,877✔
104

105
static void doHandleRemainBlockFromNewGroup(SOperatorInfo* pOperator, SFillOperatorInfo* pInfo,
26,732✔
106
                                            SResultInfo* pResultInfo, int32_t order) {
107
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
26,732✔
108
  if (taosFillHasMoreResults(pInfo->pFillInfo)) {
26,732✔
109
    int32_t numOfResultRows = pResultInfo->capacity - pInfo->pFinalRes->info.rows;
2,004✔
110
    int32_t code = taosFillResultDataBlock2(pInfo->pFillInfo, pInfo->pFinalRes, numOfResultRows, NULL);
2,004✔
111
    if (code != TSDB_CODE_SUCCESS) {
2,004!
UNCOV
112
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
113
      T_LONG_JMP(pTaskInfo->env, code);
×
114
    }
115
    pInfo->pRes->info.id.groupId = pInfo->curGroupId;
2,004✔
116
    return;
2,004✔
117
  }
118

119
  // handle the cached new group data block
120
  if (pInfo->existNewGroupBlock) {
24,726✔
121
    doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
5,877✔
122
  }
123
}
124

125
void doApplyScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pBlock, int32_t order, int32_t scanFlag) {
27,082✔
126
  int32_t            code = TSDB_CODE_SUCCESS;
27,082✔
127
  int32_t            lino = 0;
27,082✔
128
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
27,082✔
129
  SFillOperatorInfo* pInfo = pOperator->info;
27,082✔
130
  SExprSupp*         pSup = &pOperator->exprSupp;
27,082✔
131
  code = setInputDataBlock(pSup, pBlock, order, scanFlag, false);
27,082✔
132
  QUERY_CHECK_CODE(code, lino, _end);
27,083!
133
  code = projectApplyFunctions(pSup->pExprInfo, pInfo->pRes, pBlock, pSup->pCtx, pSup->numOfExprs, NULL);
27,083✔
134
  QUERY_CHECK_CODE(code, lino, _end);
27,081!
135

136
  // reset the row value before applying the no-fill functions to the input data block, which is "pBlock" in this case.
137
  pInfo->pRes->info.rows = 0;
27,081✔
138
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
27,081✔
139
  code = setInputDataBlock(pNoFillSupp, pBlock, order, scanFlag, false);
27,081✔
140
  QUERY_CHECK_CODE(code, lino, _end);
27,081!
141

142
  code = projectApplyFunctions(pNoFillSupp->pExprInfo, pInfo->pRes, pBlock, pNoFillSupp->pCtx, pNoFillSupp->numOfExprs,
27,081✔
143
                               NULL);
144
  QUERY_CHECK_CODE(code, lino, _end);
27,082!
145

146
  if (pInfo->fillNullExprSupp.pExprInfo) {
27,082✔
147
    pInfo->pRes->info.rows = 0;
118✔
148
    code = setInputDataBlock(&pInfo->fillNullExprSupp, pBlock, order, scanFlag, false);
118✔
149
    QUERY_CHECK_CODE(code, lino, _end);
118!
150
    code = projectApplyFunctions(pInfo->fillNullExprSupp.pExprInfo, pInfo->pRes, pBlock, pInfo->fillNullExprSupp.pCtx,
118✔
151
        pInfo->fillNullExprSupp.numOfExprs, NULL);
152
  }
153

154
  pInfo->pRes->info.id.groupId = pBlock->info.id.groupId;
27,081✔
155

156
_end:
27,081✔
157
  if (code != TSDB_CODE_SUCCESS) {
27,081!
UNCOV
158
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
159
    T_LONG_JMP(pTaskInfo->env, code);
×
160
  }
161
}
27,081✔
162

163
static int32_t fillResetPrevForNewGroup(SFillInfo* pFillInfo) {
144✔
164
  int32_t code = TSDB_CODE_SUCCESS;
144✔
165
  int32_t lino = 0;
144✔
166
  for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
1,063✔
167
    if (!pFillInfo->pFillCol[colIdx].notFillCol) {
919✔
168
      SGroupKeys* key = taosArrayGet(pFillInfo->prev.pRowVal, colIdx);
651✔
169
      QUERY_CHECK_NULL(key, code, lino, _end, terrno);
651!
170
      key->isNull = true;
651✔
171
    }
172
  }
173

174
_end:
144✔
175
  if (code != TSDB_CODE_SUCCESS) {
144!
UNCOV
176
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
177
  }
178
  return code;
144✔
179
}
180

181
static SSDataBlock* doFillImpl2(SOperatorInfo* pOperator) {
26,729✔
182
  int32_t            code = TSDB_CODE_SUCCESS;
26,729✔
183
  int32_t            lino = 0;
26,729✔
184
  SFillOperatorInfo* pInfo = pOperator->info;
26,729✔
185
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
26,729✔
186
  if (pInfo == NULL || pTaskInfo == NULL) {
26,729!
NEW
187
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
×
NEW
188
    return NULL;
×
189
  }
190

191
  SResultInfo* pResultInfo = &pOperator->resultInfo;
26,740✔
192
  SSDataBlock* pResBlock = pInfo->pFinalRes;
26,740✔
193
  if (pResBlock == NULL) {
26,740!
NEW
194
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
×
NEW
195
    return NULL;
×
196
  }
197
  blockDataCleanup(pResBlock);
26,740✔
198
  int32_t        order = pInfo->pFillInfo->order;
26,738✔
199
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
26,738✔
200

201
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
26,738✔
202
  if (pResBlock->info.rows > 0) {
26,732✔
203
    pResBlock->info.id.groupId = pInfo->curGroupId;
620✔
204
    return pResBlock;
620✔
205
  }
206

207
  while (1) {
13,879✔
208
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
39,991✔
209
    if (pBlock == NULL) {
39,996✔
210
      if (pInfo->totalInputRows == 0 &&
18,793✔
211
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
6,961✔
212
        setOperatorCompleted(pOperator);
6,941✔
213
        return NULL;
26,121✔
214
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
11,852✔
215
        reviseFillStartAndEndKey(pInfo, order);
10✔
216
      }
217

218
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
11,852✔
219
    } else {
220
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
21,203✔
221
      pBlock->info.dataLoad = 1;
21,203✔
222
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
21,203✔
223
      QUERY_CHECK_CODE(code, lino, _end);
21,205!
224

225
      blockDataCleanup(pInfo->pRes);
21,205✔
226
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
21,204✔
227
      QUERY_CHECK_CODE(code, lino, _end);
21,205!
228
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
21,205✔
229
      QUERY_CHECK_CODE(code, lino, _end);
21,206!
230
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);
21,206✔
231

232
      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
36,534✔
233
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
15,328✔
234
          reviseFillStartAndEndKey(pInfo, order);
5,990✔
235
        }
236

237
        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
15,328✔
238
        pInfo->totalInputRows += pInfo->pRes->info.rows;
15,328✔
239

240
        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
15,328✔
241
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
15,328✔
242
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
15,329✔
243
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
5,877!
244
        pInfo->existNewGroupBlock = pBlock;
5,877✔
245

246
        // Fill the previous group data block, before handle the data block of new group.
247
        // Close the fill operation for previous group data block
248
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
5,877✔
249
        pInfo->pFillInfo->prev.key = 0;
5,877✔
250
      }
251
    }
252

253
    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
33,058✔
254
    bool wantMoreBlock = false;
33,058✔
255
    code = taosFillResultDataBlock2(pInfo->pFillInfo, pResBlock, numOfResultRows, &wantMoreBlock);
33,058✔
256
    QUERY_CHECK_CODE(code, lino, _end);
33,057!
257

258
    // current group has no more result to return
259
    if (pResBlock->info.rows > 0) {
33,057✔
260
      // 1. The result in current group not reach the threshold of output result, continue
261
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
262
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
13,261!
263
        pResBlock->info.id.groupId = pInfo->curGroupId;
13,261✔
264
        return pResBlock;
13,261✔
265
      }
266

NEW
267
      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
×
NEW
268
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
×
NEW
269
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
NEW
270
        return pResBlock;
×
271
      }
272
    } else if (pInfo->existNewGroupBlock) {  // try next group
19,796!
NEW
273
      blockDataCleanup(pResBlock);
×
274

NEW
275
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
×
NEW
276
      if (pResBlock->info.rows > pResultInfo->threshold) {
×
NEW
277
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
NEW
278
        return pResBlock;
×
279
      }
280
    } else {
281
      if (wantMoreBlock) continue;
19,796✔
282
      return NULL;
5,917✔
283
    }
284
  }
285

NEW
286
_end:
×
NEW
287
  if (code != TSDB_CODE_SUCCESS) {
×
NEW
288
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
NEW
289
    T_LONG_JMP(pTaskInfo->env, code);
×
290
  }
NEW
291
  return NULL;
×
292
}
293

UNCOV
294
static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
×
UNCOV
295
  int32_t            code = TSDB_CODE_SUCCESS;
×
UNCOV
296
  int32_t            lino = 0;
×
UNCOV
297
  SFillOperatorInfo* pInfo = pOperator->info;
×
UNCOV
298
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
×
UNCOV
299
  if (pInfo == NULL || pTaskInfo == NULL) {
×
UNCOV
300
    qError("%s failed at line %d since pInfo or pTaskInfo is NULL.", __func__, __LINE__);
×
301
    return NULL;
×
302
  }
303

UNCOV
304
  SResultInfo* pResultInfo = &pOperator->resultInfo;
×
UNCOV
305
  SSDataBlock* pResBlock = pInfo->pFinalRes;
×
UNCOV
306
  if (pResBlock == NULL) {
×
UNCOV
307
    qError("%s failed at line %d since pResBlock is NULL.", __func__, __LINE__);
×
308
    return NULL;
×
309
  }
310

UNCOV
311
  blockDataCleanup(pResBlock);
×
312

UNCOV
313
  int32_t order = pInfo->pFillInfo->order;
×
314

UNCOV
315
  SOperatorInfo* pDownstream = pOperator->pDownstream[0];
×
316
#if 0
317
  // the scan order may be different from the output result order for agg interval operator.
318
  if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) {
319
    order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder;
320
  } else {
321
    order = pInfo->pFillInfo->order;
322
  }
323
#endif
324

UNCOV
325
  doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
×
UNCOV
326
  if (pResBlock->info.rows > 0) {
×
UNCOV
327
    pResBlock->info.id.groupId = pInfo->curGroupId;
×
UNCOV
328
    return pResBlock;
×
329
  }
330

UNCOV
331
  while (1) {
×
UNCOV
332
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
×
UNCOV
333
    if (pBlock == NULL) {
×
UNCOV
334
      if (pInfo->totalInputRows == 0 &&
×
UNCOV
335
          (pInfo->pFillInfo->type != TSDB_FILL_NULL_F && pInfo->pFillInfo->type != TSDB_FILL_SET_VALUE_F)) {
×
UNCOV
336
        setOperatorCompleted(pOperator);
×
UNCOV
337
        return NULL;
×
UNCOV
338
      } else if (pInfo->totalInputRows == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
×
UNCOV
339
        reviseFillStartAndEndKey(pInfo, order);
×
340
      }
341

UNCOV
342
      taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
×
343
    } else {
UNCOV
344
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
×
UNCOV
345
      pBlock->info.dataLoad = 1;
×
UNCOV
346
      code = blockDataUpdateTsWindow(pBlock, pInfo->primarySrcSlotId);
×
UNCOV
347
      QUERY_CHECK_CODE(code, lino, _end);
×
348

UNCOV
349
      blockDataCleanup(pInfo->pRes);
×
UNCOV
350
      code = blockDataEnsureCapacity(pInfo->pRes, pBlock->info.rows);
×
UNCOV
351
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
352
      code = blockDataEnsureCapacity(pInfo->pFinalRes, pBlock->info.rows);
×
UNCOV
353
      QUERY_CHECK_CODE(code, lino, _end);
×
UNCOV
354
      doApplyScalarCalculation(pOperator, pBlock, order, pBlock->info.scanFlag);
×
355

UNCOV
356
      if (pInfo->curGroupId == 0 || (pInfo->curGroupId == pInfo->pRes->info.id.groupId)) {
×
UNCOV
357
        if (pInfo->curGroupId == 0 && taosFillNotStarted(pInfo->pFillInfo)) {
×
UNCOV
358
          reviseFillStartAndEndKey(pInfo, order);
×
359
        }
360

UNCOV
361
        pInfo->curGroupId = pInfo->pRes->info.id.groupId;  // the first data block
×
UNCOV
362
        pInfo->totalInputRows += pInfo->pRes->info.rows;
×
363

UNCOV
364
        int64_t ts = (order == TSDB_ORDER_ASC) ? pBlock->info.window.ekey : pBlock->info.window.skey;
×
UNCOV
365
        taosFillSetStartInfo(pInfo->pFillInfo, pInfo->pRes->info.rows, ts);
×
UNCOV
366
        taosFillSetInputDataBlock(pInfo->pFillInfo, pInfo->pRes);
×
UNCOV
367
      } else if (pInfo->curGroupId != pBlock->info.id.groupId) {  // the new group data block
×
UNCOV
368
        pInfo->existNewGroupBlock = pBlock;
×
369

370
        // Fill the previous group data block, before handle the data block of new group.
371
        // Close the fill operation for previous group data block
UNCOV
372
        taosFillSetStartInfo(pInfo->pFillInfo, 0, pInfo->win.ekey);
×
UNCOV
373
        pInfo->pFillInfo->prev.key = 0;
×
374
      }
375
    }
376

UNCOV
377
    int32_t numOfResultRows = pOperator->resultInfo.capacity - pResBlock->info.rows;
×
UNCOV
378
    code = taosFillResultDataBlock(pInfo->pFillInfo, pResBlock, numOfResultRows);
×
UNCOV
379
    QUERY_CHECK_CODE(code, lino, _end);
×
380

381
    // current group has no more result to return
382
    if (pResBlock->info.rows > 0) {
×
383
      // 1. The result in current group not reach the threshold of output result, continue
384
      // 2. If multiple group results existing in one SSDataBlock is not allowed, return immediately
UNCOV
385
      if (pResBlock->info.rows > pResultInfo->threshold || pBlock == NULL || pInfo->existNewGroupBlock != NULL) {
×
UNCOV
386
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
387
        return pResBlock;
×
388
      }
389

390
      doHandleRemainBlockFromNewGroup(pOperator, pInfo, pResultInfo, order);
×
391
      if (pResBlock->info.rows >= pOperator->resultInfo.threshold || pBlock == NULL) {
×
392
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
UNCOV
393
        return pResBlock;
×
394
      }
UNCOV
395
    } else if (pInfo->existNewGroupBlock) {  // try next group
×
UNCOV
396
      blockDataCleanup(pResBlock);
×
397

UNCOV
398
      doHandleRemainBlockForNewGroupImpl(pOperator, pInfo, pResultInfo, order);
×
UNCOV
399
      if (pResBlock->info.rows > pResultInfo->threshold) {
×
400
        pResBlock->info.id.groupId = pInfo->curGroupId;
×
401
        return pResBlock;
×
402
      }
403
    } else {
UNCOV
404
      return NULL;
×
405
    }
406
  }
407

408
_end:
×
409
  if (code != TSDB_CODE_SUCCESS) {
×
410
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
411
    T_LONG_JMP(pTaskInfo->env, code);
×
412
  }
413
  return NULL;
×
414
}
415

416
static int32_t doFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
26,735✔
417
  int32_t            code = TSDB_CODE_SUCCESS;
26,735✔
418
  SFillOperatorInfo* pInfo = pOperator->info;
26,735✔
419
  SExecTaskInfo*     pTaskInfo = pOperator->pTaskInfo;
26,735✔
420

421
  if (pOperator->status == OP_EXEC_DONE) {
26,735!
422
    (*ppRes) = NULL;
×
UNCOV
423
    return code;
×
424
  }
425

426
  SSDataBlock* fillResult = NULL;
26,735✔
427
  while (true) {
428
    fillResult = doFillImpl2(pOperator);
26,735✔
429
    if (fillResult == NULL) {
26,740✔
430
      setOperatorCompleted(pOperator);
12,860✔
431
      break;
12,861✔
432
    }
433

434
    code = doFilter(fillResult, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
13,880✔
435
    if (code != TSDB_CODE_SUCCESS) {
13,881!
UNCOV
436
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
437
      pTaskInfo->code = code;
×
UNCOV
438
      T_LONG_JMP(pTaskInfo->env, code);
×
439
    }
440
    if (fillResult->info.rows > 0) {
13,881!
441
      break;
13,881✔
442
    }
443
  }
444

445
  if (fillResult != NULL) {
26,742✔
446
    pOperator->resultInfo.totalRows += fillResult->info.rows;
13,881✔
447
  }
448

449
  (*ppRes) = fillResult;
26,742✔
450
  return code;
26,742✔
451
}
452

453
void destroyFillOperatorInfo(void* param) {
12,939✔
454
  SFillOperatorInfo* pInfo = (SFillOperatorInfo*)param;
12,939✔
455
  pInfo->pFillInfo = taosDestroyFillInfo(pInfo->pFillInfo);
12,939✔
456
  blockDataDestroy(pInfo->pRes);
12,942✔
457
  pInfo->pRes = NULL;
12,943✔
458
  blockDataDestroy(pInfo->pFinalRes);
12,943✔
459
  pInfo->pFinalRes = NULL;
12,943✔
460

461
  cleanupExprSupp(&pInfo->noFillExprSupp);
12,943✔
462
  cleanupExprSupp(&pInfo->fillNullExprSupp);
12,941✔
463

464
  taosMemoryFreeClear(pInfo->p);
12,940!
465
  taosArrayDestroy(pInfo->matchInfo.pList);
12,942✔
466
  taosMemoryFreeClear(param);
12,942!
467
}
12,944✔
468

469
static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t numOfCols, SExprInfo* pNotFillExpr,
12,936✔
470
                            int32_t numOfNotFillCols, SExprInfo* pFillNullExpr, int32_t numOfFillNullExprs,
471
                            SNodeListNode* pValNode, STimeWindow win, int32_t capacity, const char* id,
472
                            SInterval* pInterval, int32_t fillType, int32_t order, SExecTaskInfo* pTaskInfo) {
473
  SFillColInfo* pColInfo =
474
      createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pFillNullExpr, numOfFillNullExprs, pValNode);
12,936✔
475
  if (!pColInfo) {
12,941✔
476
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
2!
477
    return terrno;
2✔
478
  }
479

480
  int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey;
12,939✔
481

482
  //  STimeWindow w = {0};
483
  //  getInitialStartTimeWindow(pInterval, startKey, &w, order == TSDB_ORDER_ASC);
484
  pInfo->pFillInfo = NULL;
12,939✔
485
  int32_t code = taosCreateFillInfo(startKey, numOfCols, numOfNotFillCols, numOfFillNullExprs, capacity, pInterval,
12,939✔
486
                                    fillType, pColInfo, pInfo->primaryTsCol, order, id, pTaskInfo, &pInfo->pFillInfo);
12,939✔
487
  if (code != TSDB_CODE_SUCCESS) {
12,940✔
488
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
1!
UNCOV
489
    return code;
×
490
  }
491

492
  if (order == TSDB_ORDER_ASC) {
12,939✔
493
    pInfo->win.skey = win.skey;
11,510✔
494
    pInfo->win.ekey = win.ekey;
11,510✔
495
  } else {
496
    pInfo->win.skey = win.ekey;
1,429✔
497
    pInfo->win.ekey = win.skey;
1,429✔
498
  }
499
  pInfo->p = taosMemoryCalloc(numOfCols, POINTER_BYTES);
12,939!
500
  if (!pInfo->p) {
12,940!
501
    return terrno;
×
502
  }
503

504
  if (pInfo->pFillInfo == NULL) {
12,940!
505
    taosMemoryFree(pInfo->pFillInfo);
×
506
    taosMemoryFree(pInfo->p);
×
507
    return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
508
  } else {
509
    return TSDB_CODE_SUCCESS;
12,940✔
510
  }
511
}
512

513
static bool isWstartColumnExist(SFillOperatorInfo* pInfo) {
12,936✔
514
  if (pInfo->noFillExprSupp.numOfExprs == 0) {
12,936✔
515
    return false;
194✔
516
  }
517

518
  for (int32_t i = 0; i < pInfo->noFillExprSupp.numOfExprs; ++i) {
18,709✔
519
    SExprInfo* exprInfo = pInfo->noFillExprSupp.pExprInfo + i;
18,641✔
520
    if (exprInfo->pExpr->nodeType == QUERY_NODE_COLUMN && exprInfo->base.numOfParams == 1 &&
18,641!
521
        exprInfo->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
18,641✔
522
      return true;
12,674✔
523
    }
524
  }
525
  return false;
68✔
526
}
527

528
static int32_t createPrimaryTsExprIfNeeded(SFillOperatorInfo* pInfo, SFillPhysiNode* pPhyFillNode, SExprSupp* pExprSupp,
12,937✔
529
                                           const char* idStr) {
530
  bool wstartExist = isWstartColumnExist(pInfo);
12,937✔
531

532
  if (wstartExist == false) {
12,939✔
533
    if (pPhyFillNode->pWStartTs->type != QUERY_NODE_TARGET) {
264!
UNCOV
534
      qError("pWStartTs of fill physical node is not a target node, %s", idStr);
×
UNCOV
535
      return TSDB_CODE_QRY_SYS_ERROR;
×
536
    }
537

538
    SExprInfo* pExpr = taosMemoryRealloc(pExprSupp->pExprInfo, (pExprSupp->numOfExprs + 1) * sizeof(SExprInfo));
264!
539
    if (pExpr == NULL) {
264!
UNCOV
540
      return terrno;
×
541
    }
542

543
    int32_t code = createExprFromTargetNode(&pExpr[pExprSupp->numOfExprs], (STargetNode*)pPhyFillNode->pWStartTs);
264✔
544
    if (code != TSDB_CODE_SUCCESS) {
264!
UNCOV
545
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
UNCOV
546
      pExprSupp->numOfExprs += 1;
×
UNCOV
547
      pExprSupp->pExprInfo = pExpr;
×
UNCOV
548
      return code;
×
549
    }
550

551
    pExprSupp->numOfExprs += 1;
264✔
552
    pExprSupp->pExprInfo = pExpr;
264✔
553
  }
554

555
  return TSDB_CODE_SUCCESS;
12,939✔
556
}
557

558
int32_t createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode* pPhyFillNode,
12,923✔
559
                                      SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
560
  QRY_PARAM_CHECK(pOptrInfo);
12,923!
561
  int32_t code = 0;
12,923✔
562
  int32_t lino = 0;
12,923✔
563

564
  SFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SFillOperatorInfo));
12,923!
565
  SOperatorInfo*     pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
12,936!
566
  if (pInfo == NULL || pOperator == NULL) {
12,937!
UNCOV
567
    code = terrno;
×
UNCOV
568
    goto _error;
×
569
  }
570

571
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
12,938✔
572
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
12,941!
573
  SExprInfo* pExprInfo = NULL;
12,941✔
574

575
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pExprInfo, &pInfo->numOfExpr);
12,941✔
576
  QUERY_CHECK_CODE(code, lino, _error);
12,939!
577

578
  pOperator->exprSupp.pExprInfo = pExprInfo;
12,939✔
579
  pOperator->exprSupp.numOfExprs = pInfo->numOfExpr;
12,939✔
580

581
  SExprSupp* pNoFillSupp = &pInfo->noFillExprSupp;
12,939✔
582
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &pNoFillSupp->pExprInfo, &pNoFillSupp->numOfExprs);
12,939✔
583
  QUERY_CHECK_CODE(code, lino, _error);
12,939!
584

585
  code = createPrimaryTsExprIfNeeded(pInfo, pPhyFillNode, pNoFillSupp, pTaskInfo->id.str);
12,939✔
586
  QUERY_CHECK_CODE(code, lino, _error);
12,938!
587

588
  code =
589
      initExprSupp(pNoFillSupp, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs, &pTaskInfo->storageAPI.functionStore);
12,938✔
590
  QUERY_CHECK_CODE(code, lino, _error);
12,939!
591

592
  code = createExprInfo(pPhyFillNode->pFillNullExprs, NULL, &pInfo->fillNullExprSupp.pExprInfo,
12,939✔
593
                        &pInfo->fillNullExprSupp.numOfExprs);
594
  QUERY_CHECK_CODE(code, lino, _error);
12,940!
595
  code = initExprSupp(&pInfo->fillNullExprSupp, pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
12,940✔
596
                      &pTaskInfo->storageAPI.functionStore);
597
  QUERY_CHECK_CODE(code, lino, _error);
12,941!
598

599
  SInterval* pInterval =
12,941✔
600
      QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL == downstream->operatorType
12,941✔
601
          ? &((SMergeAlignedIntervalAggOperatorInfo*)downstream->info)->intervalAggOperatorInfo->interval
4,160✔
602
          : &((SIntervalAggOperatorInfo*)downstream->info)->interval;
12,941✔
603

604
  int32_t order = (pPhyFillNode->node.inputTsOrder == ORDER_ASC) ? TSDB_ORDER_ASC : TSDB_ORDER_DESC;
12,941✔
605
  int32_t type = convertFillType(pPhyFillNode->mode);
12,941✔
606

607
  SResultInfo* pResultInfo = &pOperator->resultInfo;
12,940✔
608

609
  initResultSizeInfo(&pOperator->resultInfo, 4096);
12,940✔
610
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
12,939✔
611
  if (code != TSDB_CODE_SUCCESS) {
12,942!
UNCOV
612
    goto _error;
×
613
  }
614
  code = initExprSupp(&pOperator->exprSupp, pExprInfo, pInfo->numOfExpr, &pTaskInfo->storageAPI.functionStore);
12,942✔
615
  if (code != TSDB_CODE_SUCCESS) {
12,938!
UNCOV
616
    goto _error;
×
617
  }
618

619
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
12,938✔
620
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
12,938✔
621

622
  int32_t numOfOutputCols = 0;
12,938✔
623
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
12,938✔
624
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
625

626
  QUERY_CHECK_CODE(code, lino, _error);
12,935!
627
  code = initFillInfo(pInfo, pExprInfo, pInfo->numOfExpr, pNoFillSupp->pExprInfo, pNoFillSupp->numOfExprs,
12,935✔
628
                      pInfo->fillNullExprSupp.pExprInfo, pInfo->fillNullExprSupp.numOfExprs,
629
                      (SNodeListNode*)pPhyFillNode->pValues, pPhyFillNode->timeRange, pResultInfo->capacity,
12,935✔
630
                      pTaskInfo->id.str, pInterval, type, order, pTaskInfo);
12,935✔
631
  if (code != TSDB_CODE_SUCCESS) {
12,939!
UNCOV
632
    goto _error;
×
633
  }
634

635
  pInfo->pFinalRes = NULL;
12,939✔
636

637
  code = createOneDataBlock(pInfo->pRes, false, &pInfo->pFinalRes);
12,939✔
638
  if (code) {
12,941!
UNCOV
639
    goto _error;
×
640
  }
641

642
  code = blockDataEnsureCapacity(pInfo->pFinalRes, pOperator->resultInfo.capacity);
12,941✔
643
  if (code != TSDB_CODE_SUCCESS) {
12,942!
UNCOV
644
    goto _error;
×
645
  }
646

647
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
12,942✔
648
  if (code != TSDB_CODE_SUCCESS) {
12,941!
649
    goto _error;
×
650
  }
651

652
  setOperatorInfo(pOperator, "FillOperator", QUERY_NODE_PHYSICAL_PLAN_FILL, false, OP_NOT_OPENED, pInfo, pTaskInfo);
12,941✔
653
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doFillNext, NULL, destroyFillOperatorInfo, optrDefaultBufFn, NULL,
12,941✔
654
                                         optrDefaultGetNextExtFn, NULL);
655

656
  code = appendDownstream(pOperator, &downstream, 1);
12,942✔
657
  if (code != TSDB_CODE_SUCCESS) {
12,941!
UNCOV
658
    goto _error;
×
659
  }
660

661
  *pOptrInfo = pOperator;
12,941✔
662
  return TSDB_CODE_SUCCESS;
12,941✔
663

UNCOV
664
_error:
×
UNCOV
665
  qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
666

UNCOV
667
  if (pInfo != NULL) {
×
UNCOV
668
    destroyFillOperatorInfo(pInfo);
×
669
  }
UNCOV
670
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
671
  pTaskInfo->code = code;
×
UNCOV
672
  return code;
×
673
}
674

675
static void reviseFillStartAndEndKey(SFillOperatorInfo* pInfo, int32_t order) {
11,877✔
676
  int64_t skey, ekey, next;
677
  if (order == TSDB_ORDER_ASC) {
11,877✔
678
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
11,203✔
679
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
11,203✔
680

681
    ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
11,203✔
682
    next = ekey;
11,204✔
683
    while (next < pInfo->win.ekey) {
65,122✔
684
      next = taosTimeAdd(ekey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
53,919✔
685
                         pInfo->pFillInfo->interval.precision, NULL);
53,919✔
686
      ekey = next > pInfo->win.ekey ? ekey : next;
53,918✔
687
    }
688
    pInfo->win.ekey = ekey;
11,203✔
689
  } else {
690
    skey = taosTimeTruncate(pInfo->win.skey, &pInfo->pFillInfo->interval);
674✔
691
    next = skey;
674✔
692
    while (next < pInfo->win.skey) {
5,277✔
693
      next = taosTimeAdd(skey, pInfo->pFillInfo->interval.sliding, pInfo->pFillInfo->interval.slidingUnit,
4,603✔
694
                         pInfo->pFillInfo->interval.precision, NULL);
4,603✔
695
      skey = next > pInfo->win.skey ? skey : next;
4,603✔
696
    }
697
    taosFillUpdateStartTimestampInfo(pInfo->pFillInfo, skey);
674✔
698
    pInfo->win.ekey = taosTimeTruncate(pInfo->win.ekey, &pInfo->pFillInfo->interval);
674✔
699
  }
700
}
11,877✔
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc