• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4473

08 Jul 2025 09:38AM UTC coverage: 62.922% (+0.7%) from 62.22%
#4473

push

travis-ci

web-flow
Merge pull request #31712 from taosdata/merge/mainto3.0

merge: from main to 3.0 branch

158525 of 321496 branches covered (49.31%)

Branch coverage included in aggregate %.

56 of 60 new or added lines in 13 files covered. (93.33%)

1333 existing lines in 67 files now uncovered.

245526 of 320647 relevant lines covered (76.57%)

17689640.25 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

56.5
/source/libs/executor/src/tfill.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "executorInt.h"
29
#include "function.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
// True when the fill descriptor `info` is configured for ascending timestamp order.
#define FILL_IS_ASC_FILL(info) ((info)->order == TSDB_ORDER_ASC)

// Linear interpolation: the value at key `k` on the straight line through (k1, v1) and (k2, v2).
// The keys are converted to double before the subtraction; the caller must ensure k1 != k2.
#define DO_INTERPOLATION(v1, v2, k1, k2, k) \
  ((v1) + ((v2) - (v1)) * (((double)(k)) - ((double)(k1))) / (((double)(k2)) - ((double)(k1))))
38

39
// Forward declaration: writes the buffered value in pKey (or NULL when pKey->isNull) into row rowIndex of the
// destination column. Defined further down in this file.
static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
40

41
/*
 * Write the value of column colIdx for a generated row, taken from the buffered neighbour row.
 *
 * When the column is flagged fillNull the destination cell is simply set to NULL. Otherwise the value is read
 * from pFillInfo->next or pFillInfo->prev: the "next" buffer is used for (NEXT fill, ascending order) and for
 * (any other fill type, descending order); the "prev" buffer is used in the remaining cases.
 *
 * Any lookup or write failure is logged and reported through T_LONG_JMP on the task environment, so the function
 * does not return in that case.
 *
 * @param pFillInfo   fill state holding the column descriptors and the prev/next row buffers
 * @param pDstColInfo destination column of the output block
 * @param rowIndex    row of the destination column to write
 * @param colIdx      index into pFillInfo->pFillCol and into the row buffers
 * @return true when nothing was written because the value would come from the "next" buffer, its null-value flag
 *         is set and the current source block has rows; false in every other case.
 */
static bool setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) {
  SFillColInfo* pCol = &pFillInfo->pFillCol[colIdx];
  if (pCol->fillNull) {
    colDataSetNULL(pDstColInfo, rowIndex);
    return false;
  }

  // The original ascNext/descPrev flags are both equivalent to "the value is taken from the next buffer".
  const bool isAsc = FILL_IS_ASC_FILL(pFillInfo);
  const bool isNextFill = (pFillInfo->type == TSDB_FILL_NEXT);
  const bool useNextBuf = (isAsc == isNextFill);
  SRowVal*   p = useNextBuf ? &pFillInfo->next : &pFillInfo->prev;

  const bool* pNullValueFlag = taosArrayGet(p->pNullValueFlag, colIdx);
  if (!pNullValueFlag) {
    // Same failure handling as the pRowVal lookup below; previously this pointer was dereferenced unchecked.
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
  }
  if (*pNullValueFlag && pFillInfo->numOfRows > 0 && useNextBuf) {
    return true;
  }

  SGroupKeys* pKey = taosArrayGet(p->pRowVal, colIdx);
  if (!pKey) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
  }

  int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
  return false;
}
74

75
/*
 * Produce the "NULL fill" value for one column of a generated row.
 *
 * Columns that take part in filling get NULL. Columns marked notFillCol are first offered to
 * fillIfWindowPseudoColumn(); when that declines, the buffered neighbour value is written by setNotFillColumn().
 */
static void setNullCol(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIdx, int32_t colIdx) {
  SFillColInfo*    pFillCol = &pFillInfo->pFillCol[colIdx];
  int32_t          slot = GET_DEST_SLOT_ID(pFillCol);
  SColumnInfoData* pDstColInfo = taosArrayGet(pBlock->pDataBlock, slot);

  if (!pFillCol->notFillCol) {
    colDataSetNULL(pDstColInfo, rowIdx);
    return;
  }

  if (!fillIfWindowPseudoColumn(pFillInfo, pFillCol, pDstColInfo, rowIdx)) {
    setNotFillColumn(pFillInfo, pDstColInfo, rowIdx, colIdx);
  }
}
639,892,977✔
88

89
// Apply setNullCol() to every column described by pFillInfo for output row rowIndex.
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
  int32_t col = 0;
  while (col < pFillInfo->numOfCols) {
    setNullCol(pBlock, pFillInfo, rowIndex, col);
    ++col;
  }
}
×
94

95

96
/*
 * Write the user supplied fill value pVar into row rowIndex of pDst, converting it to the destination type.
 *
 * The cell is written as NULL when the variant itself has type TSDB_DATA_TYPE_NULL. Destination types not handled
 * by any branch are set to NULL as well.
 *
 * @param pDst       destination column
 * @param pVar       user specified fill value
 * @param rowIndex   destination row
 * @param currentKey not used by this function; kept for interface compatibility
 * @return TSDB_CODE_SUCCESS, or the error returned by colDataSetVal (also logged with the failing line).
 */
static int32_t doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    isNull = (TSDB_DATA_TYPE_NULL == pVar->nType);

  if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
    float v = 0;
    GET_TYPED_DATA(v, float, pVar->nType, &pVar->f, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
    double v = 0;
    GET_TYPED_DATA(v, double, pVar->nType, &pVar->d, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type) || pDst->info.type == TSDB_DATA_TYPE_BOOL) {
    int64_t v = 0;
    GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (IS_UNSIGNED_NUMERIC_TYPE(pDst->info.type)) {
    uint64_t v = 0;
    GET_TYPED_DATA(v, uint64_t, pVar->nType, &pVar->u, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
    int64_t v = 0;
    GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->u, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (const char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_NCHAR || pDst->info.type == TSDB_DATA_TYPE_VARCHAR ||
             pDst->info.type == TSDB_DATA_TYPE_VARBINARY) {
    code = colDataSetVal(pDst, rowIndex, pVar->pz, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DECIMAL64) {
    code = colDataSetVal(pDst, rowIndex, (char*)&pVar->i, isNull);
    // Record the failing line like every other branch; previously a failure here was logged with line 0.
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DECIMAL) {
    code = colDataSetVal(pDst, rowIndex, (char*)pVar->pz, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {  // others data
    colDataSetNULL(pDst, rowIndex);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
143

144
// fill windows pseudo column, _wstart, _wend, _wduration and return true, otherwise return false
145
bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData,
2,147,483,647✔
146
                              int32_t rowIndex) {
147
  int32_t code = TSDB_CODE_SUCCESS;
2,147,483,647✔
148
  int32_t lino = 0;
2,147,483,647✔
149
  if (!pCol->notFillCol) {
2,147,483,647✔
150
    return false;
106,398,602✔
151
  }
152
  if (pCol->pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
2,147,483,647!
153
    if (pCol->pExpr->base.numOfParams != 1) {
2,147,483,647!
154
      return false;
×
155
    }
156
    if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
2,147,483,647✔
157
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->currentKey, false);
1,677,981,499✔
158
      QUERY_CHECK_CODE(code, lino, _end);
1,732,860,392!
159
      return true;
1,732,860,392✔
160
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_END) {
620,287,716✔
161
      // TODO: include endpoint
162
      SInterval* pInterval = &pFillInfo->interval;
14,032,530✔
163
      int64_t    windowEnd =
14,075,917✔
164
          taosTimeAdd(pFillInfo->currentKey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
14,032,530✔
165
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&windowEnd, false);
14,075,917✔
166
      QUERY_CHECK_CODE(code, lino, _end);
14,041,841!
167
      return true;
14,041,841✔
168
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_DURATION) {
606,255,186!
169
      // TODO: include endpoint
170
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->interval.sliding, false);
×
171
      QUERY_CHECK_CODE(code, lino, _end);
×
172
      return true;
×
173
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_IS_WINDOW_FILLED) {
606,255,186!
174
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->isFilled, false);
×
175
      QUERY_CHECK_CODE(code, lino, _end);
×
176
      return true;
×
177
    }
178
  }
179

180
_end:
573,167,636✔
181
  if (code != TSDB_CODE_SUCCESS) {
573,167,636!
182
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
183
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
×
184
  }
185
  return false;
573,167,636✔
186
}
187

188
/*
 * Generate one filled row at pFillInfo->currentKey and append it to pBlock.
 *
 * The row content depends on pFillInfo->type:
 *   - PREV / NEXT: every column is offered to fillIfWindowPseudoColumn(), otherwise setNotFillColumn() writes it.
 *   - LINEAR: when outOfBound the whole row is NULL-filled; otherwise fill columns are interpolated between the
 *     buffered previous row and the source row at pFillInfo->index (timestamp ts). Variable-length columns, BOOL
 *     columns and columns whose previous value is NULL get NULL.
 *   - NULL / NULL_F: the row is NULL-filled via setNullRow().
 *   - anything else: fill columns receive the user specified value.
 *
 * On success currentKey is advanced by one sliding step in the fill direction, and both pBlock->info.rows and
 * pFillInfo->numOfCurrent are incremented. On failure nothing is advanced and the error is raised through
 * T_LONG_JMP.
 *
 * @param ts         timestamp of the source row used as the right-hand point for linear interpolation
 * @param outOfBound true when the generated row lies outside the range covered by the source rows
 */
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
                         bool outOfBound) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);

  // the new row is appended after the rows already present in the output block
  int32_t index = pBlock->info.rows;

  if (pFillInfo->type == TSDB_FILL_PREV || pFillInfo->type == TSDB_FILL_NEXT) {
    // PREV and NEXT share the same per-column handling; the buffer choice happens inside setNotFillColumn()
    for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo*    pCol = &pFillInfo->pFillCol[i];
      SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
      QUERY_CHECK_NULL(pDstColInfoData, code, lino, _end, terrno);

      if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index)) {
        setNotFillColumn(pFillInfo, pDstColInfoData, index, i);
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
    // TODO : linear interpolation supports NULL value
    if (outOfBound) {
      setNullRow(pBlock, pFillInfo, index);
    } else {
      for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];

        int32_t          dstSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pDstCol, code, lino, _end, terrno);
        int16_t type = pDstCol->info.type;

        if (pCol->notFillCol) {
          if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, index)) {
            setNotFillColumn(pFillInfo, pDstCol, index, i);
          }
          continue;
        }

        SRowVal*    pRVal = &pFillInfo->prev;
        SGroupKeys* pKey = taosArrayGet(pRVal->pRowVal, i);
        QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
          colDataSetNULL(pDstCol, index);
          continue;
        }

        // left point: buffered previous row; its timestamp is stored in the tsSlotId entry of the buffer
        SGroupKeys* pKey1 = taosArrayGet(pRVal->pRowVal, pFillInfo->tsSlotId);
        QUERY_CHECK_NULL(pKey1, code, lino, _end, terrno);
        int64_t prevTs = *(int64_t*)pKey1->pData;

        // right point: the source row at pFillInfo->index
        int32_t          srcSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
        QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
        char* data = colDataGetData(pSrcCol, pFillInfo->index);

        SPoint point1 = {.key = prevTs, .val = pKey->pData};
        SPoint point2 = {.key = ts, .val = data};

        int64_t out = 0;
        SPoint  point = {.key = pFillInfo->currentKey, .val = &out};
        taosGetLinearInterpolationVal(&point, type, &point1, &point2, type, typeGetTypeModFromColInfo(&pDstCol->info));

        code = colDataSetVal(pDstCol, index, (const char*)&out, false);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {  // fill with NULL
    setNullRow(pBlock, pFillInfo, index);
  } else {  // fill with user specified value for each column
    for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[i];

      int32_t          slotId = GET_DEST_SLOT_ID(pCol);
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);
      QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);

      if (pCol->notFillCol) {
        if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, index)) {
          setNotFillColumn(pFillInfo, pDst, index, i);
        }
      } else {
        SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
        code = doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

  // advance to the next window in fill direction and account for the new row
  SInterval* pInterval = &pFillInfo->interval;
  pFillInfo->currentKey =
      taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);
  pBlock->info.rows += 1;
  pFillInfo->numOfCurrent++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
}
×
299

300
/*
 * Copy a buffered row value into row rowIndex of pDstCol.
 * A buffered NULL (pKey->isNull) sets the cell to NULL; otherwise pKey->pData is written with colDataSetVal.
 *
 * @return TSDB_CODE_SUCCESS or the error returned by colDataSetVal (which is also logged).
 */
int32_t doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
  if (pKey->isNull) {
    colDataSetNULL(pDstCol, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = colDataSetVal(pDstCol, rowIndex, pKey->pData, false);
  if (code != TSDB_CODE_SUCCESS) {
    int32_t lino = __LINE__;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
316

317
/*
 * Allocate the per-column value buffers of the prev and next row caches.
 *
 * For every fill column one SGroupKeys entry (initially NULL, sized from the column's result schema) and one
 * null-value flag (false) are appended to both pFillInfo->next and pFillInfo->prev. Nothing is done when the next
 * buffer already contains entries.
 *
 * Ownership: once an entry has been pushed, its pData is owned by the array. A buffer whose push fails is freed
 * here, because it never reached the array.
 *
 * @return TSDB_CODE_SUCCESS, or terrno of the failing allocation / push (also logged).
 */
static int32_t initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (taosArrayGetSize(pFillInfo->next.pRowVal) > 0) {
    goto _end;
  }

  for (int i = 0; i < pFillInfo->numOfCols; i++) {
    SFillColInfo* pCol = &pFillInfo->pFillCol[i];
    SResSchema*   pSchema = &pCol->pExpr->base.resSchema;
    bool          nullValueFlag = false;
    void*         tmp = NULL;

    SGroupKeys key = {0};
    key.isNull = true;
    key.bytes = pSchema->bytes;
    key.type = pSchema->type;

    // entry for the "next" row buffer
    key.pData = taosMemoryMalloc(pSchema->bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    if (taosArrayPush(pFillInfo->next.pRowVal, &key) == NULL) {
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);  // not owned by the array yet, would leak otherwise
      goto _end;
    }

    tmp = taosArrayPush(pFillInfo->next.pNullValueFlag, &nullValueFlag);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);

    // entry for the "prev" row buffer: same descriptor with its own data buffer
    key.pData = taosMemoryMalloc(pSchema->bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    if (taosArrayPush(pFillInfo->prev.pRowVal, &key) == NULL) {
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);  // not owned by the array yet, would leak otherwise
      goto _end;
    }

    tmp = taosArrayPush(pFillInfo->prev.pNullValueFlag, &nullValueFlag);
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
358

359
// Forward declaration: stores src (or a NULL marker when isNull) into entry columnIndex of the row buffer rowBuf.
// Defined further down in this file.
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
360

361
/*
 * Snapshot source row rowIndex into the row buffer pRowVal.
 *
 * pRowVal->key receives the row's timestamp. Each column whose expression is a column, operator or function node
 * is then copied with saveColData(), with these exceptions:
 *   - fill columns (notFillCol == false) are only copied for NEXT fill in ascending order or PREV fill in
 *     descending order;
 *   - for LINEAR fill the primary timestamp slot is skipped.
 * When reset is true every copied entry is stored as NULL regardless of the source value.
 *
 * NOTE(review): the source block is read at rowIndex even when reset is true; callers appear to pass an index one
 * past the last row in that case -- confirm the read is within the column's capacity.
 *
 * @return TSDB_CODE_SUCCESS, terrno of a failed column lookup, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for an
 *         unsupported expression node type.
 */
static int32_t copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
  pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];

  for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
    SFillColInfo* pFillCol = &pFillInfo->pFillCol[col];
    int32_t       nodeType = pFillCol->pExpr->pExpr->nodeType;

    bool supported =
        (nodeType == QUERY_NODE_COLUMN) || (nodeType == QUERY_NODE_OPERATOR) || (nodeType == QUERY_NODE_FUNCTION);
    if (!supported) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      lino = __LINE__;
      goto _end;
    }

    if (!pFillCol->notFillCol) {
      // a fill column is buffered only when the fill type looks "ahead" in scan direction
      if (FILL_IS_ASC_FILL(pFillInfo)) {
        if (pFillInfo->type != TSDB_FILL_NEXT) continue;
      } else {
        if (pFillInfo->type != TSDB_FILL_PREV) continue;
      }
    }

    int32_t srcSlotId = GET_DEST_SLOT_ID(pFillCol);
    if (pFillInfo->type == TSDB_FILL_LINEAR && srcSlotId == pFillInfo->srcTsSlotId) {
      continue;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);

    bool  srcIsNull = colDataIsNull_s(pSrcCol, rowIndex);
    char* pVal = colDataGetData(pSrcCol, rowIndex);

    saveColData(pRowVal->pRowVal, col, pVal, reset || srcIsNull);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
401

402
/*
 * Merge the rows of pFillInfo->pSrcBlock with generated fill rows and append up to outputRows rows to pBlock.
 *
 * Each iteration compares pFillInfo->currentKey with the timestamp ts of the source row at pFillInfo->index:
 *   - currentKey lies before ts in scan direction: the source row is buffered with copyCurrentRowIntoBuf() and
 *     doFillOneRow() generates rows until currentKey reaches ts or the output is full;
 *   - otherwise currentKey must equal ts: the source row is copied to the output, NULL cells are replaced
 *     according to the fill type, and both currentKey and index advance.
 *
 * pFillInfo->numOfCurrent counts the rows produced by this call and is added to numOfTotal when the loop stops
 * because the output is full or the source rows are exhausted.
 *
 * @return TSDB_CODE_SUCCESS; terrno of a failed column lookup; TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when
 *         currentKey and ts are out of step; or an error from a write helper. Errors are logged.
 */
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
  pFillInfo->numOfCurrent = 0;
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
  bool    ascFill = FILL_IS_ASC_FILL(pFillInfo);

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  // checked like in copyCurrentRowIntoBuf(); previously dereferenced unchecked
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);

  while (pFillInfo->numOfCurrent < outputRows) {
    int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];

    // true while currentKey has not yet reached the source row in scan direction
    bool inGap = ascFill ? (pFillInfo->currentKey < ts) : (pFillInfo->currentKey > ts);

    if (inGap) {
      // buffer the upcoming source row: "next" for (asc, NEXT) and (desc, non-NEXT), "prev" otherwise
      bool     intoNext = (ascFill == (pFillInfo->type == TSDB_FILL_NEXT));
      SRowVal* pRVal = intoNext ? &pFillInfo->next : &pFillInfo->prev;
      code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
      QUERY_CHECK_CODE(code, lino, _end);

      // fill the gap between two input rows
      while (pFillInfo->numOfCurrent < outputRows &&
             (ascFill ? (pFillInfo->currentKey < ts) : (pFillInfo->currentKey > ts))) {
        doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
      }

      // output buffer is full, abort
      if (pFillInfo->numOfCurrent == outputRows) {
        pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
        goto _end;
      }
    } else {
      QUERY_CHECK_CONDITION((pFillInfo->currentKey == ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      int32_t index = pBlock->info.rows;
      int32_t nextRowIndex = pFillInfo->index + 1;

      if (pFillInfo->type == TSDB_FILL_NEXT) {
        // buffer the following source row; reset to null after last row
        bool pastLastRow = !(nextRowIndex < pFillInfo->numOfRows);
        code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, pastLastRow);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      if (pFillInfo->type == TSDB_FILL_PREV && !ascFill && nextRowIndex + 1 >= pFillInfo->numOfRows) {
        code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      // copy rows to dst buffer
      for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];
        int32_t       dstSlotId = GET_DEST_SLOT_ID(pCol);

        SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);

        char* src = colDataGetData(pSrc, pFillInfo->index);
        if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
          code = colDataSetVal(pDst, index, src, false);
          QUERY_CHECK_CODE(code, lino, _end);

          // remember the value as the most recent "previous" value of this column
          SRowVal* pRVal = &pFillInfo->prev;
          saveColData(pRVal->pRowVal, i, src, false);
          if (pFillInfo->srcTsSlotId == dstSlotId) {
            pRVal->key = *(int64_t*)src;
          }
          continue;
        }

        // the value is null
        if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
          code = colDataSetVal(pDst, index, (const char*)&pFillInfo->currentKey, false);
          QUERY_CHECK_CODE(code, lino, _end);
          continue;
        }

        // i > 0 and data is null , do interpolation
        if (pFillInfo->type == TSDB_FILL_PREV || pFillInfo->type == TSDB_FILL_NEXT) {
          // PREV reads prev (asc) / next (desc); NEXT reads next (asc) / prev (desc)
          bool        fromNext = (ascFill == (pFillInfo->type == TSDB_FILL_NEXT));
          SArray*     p = fromNext ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
          SGroupKeys* pKey = taosArrayGet(p, i);
          QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
          code = doSetVal(pDst, index, pKey);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
          bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
          code = colDataSetVal(pDst, index, src, isNull);
          QUERY_CHECK_CODE(code, lino, _end);

          SArray* p = pFillInfo->prev.pRowVal;
          saveColData(p, i, src, isNull);  // todo:
        } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
          colDataSetNULL(pDst, index);
        } else {
          SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
          code = doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }

      // advance to the next window and the next source row
      SInterval* pInterval = &pFillInfo->interval;
      pFillInfo->currentKey =
          taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);

      pBlock->info.rows += 1;
      pFillInfo->index += 1;
      pFillInfo->numOfCurrent += 1;
    }

    if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
      pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
535

536
/*
 * Store one column value into entry columnIndex of the row buffer rowBuf.
 * For a non-NULL value the bytes are copied into the entry's pData: varDataTLen(src) bytes for variable-length
 * types, the entry's fixed byte width otherwise. src is not read when isNull is true.
 */
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
  SGroupKeys* pSlot = taosArrayGet(rowBuf, columnIndex);

  if (!isNull) {
    if (IS_VAR_DATA_TYPE(pSlot->type)) {
      memcpy(pSlot->pData, src, varDataTLen(src));
    } else {
      memcpy(pSlot->pData, src, pSlot->bytes);
    }
  }
  pSlot->isNull = isNull;
}
72,647,499✔
549

550
/*
 * Append resultCapacity generated rows to pBlock.
 *
 * These rows are produced purely from the fill strategy, because the current timestamp lies outside the time
 * window of the real result set; doFillOneRow() is therefore called with outOfBound = true. The previously
 * buffered rows are what the generated values are derived from.
 *
 * @return TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR (logged) when the number of generated rows
 *         does not equal resultCapacity.
 */
static int32_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
  int32_t code = TSDB_CODE_SUCCESS;

  for (pFillInfo->numOfCurrent = 0; pFillInfo->numOfCurrent < resultCapacity;) {
    doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
  }

  pFillInfo->numOfTotal += pFillInfo->numOfCurrent;

  if (!(pFillInfo->numOfCurrent == resultCapacity)) {
    int32_t lino = __LINE__;
    code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
573

574
// Number of source rows not yet consumed: numOfRows - index, or 0 when there are no rows or the read index has
// already reached the (positive) row count.
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
  bool noRows = (pFillInfo->numOfRows == 0);
  bool consumed = (pFillInfo->numOfRows > 0) && (pFillInfo->index >= pFillInfo->numOfRows);

  if (noRows || consumed) {
    return 0;
  }
  return pFillInfo->numOfRows - pFillInfo->index;
}
581

582
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
489,674✔
583
                           int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
584
                           int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
585
                           SFillInfo** ppFillInfo) {
586
  int32_t code = TSDB_CODE_SUCCESS;
489,674✔
587
  int32_t lino = 0;
489,674✔
588
  if (fillType == TSDB_FILL_NONE) {
489,674!
589
    (*ppFillInfo) = NULL;
×
590
    return code;
×
591
  }
592

593
  SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
489,674!
594
  QUERY_CHECK_NULL(pFillInfo, code, lino, _end, terrno);
489,677!
595

596
  pFillInfo->order = order;
489,677✔
597
  pFillInfo->srcTsSlotId = primaryTsSlotId;
489,677✔
598

599
  for (int32_t i = 0; i < numOfNotFillCols; ++i) {
499,644!
600
    SFillColInfo* p = &pCol[i + numOfFillCols];
499,649✔
601
    int32_t       srcSlotId = GET_DEST_SLOT_ID(p);
499,649✔
602
    if (srcSlotId == primaryTsSlotId) {
499,649✔
603
      pFillInfo->tsSlotId = i + numOfFillCols;
489,682✔
604
      break;
489,682✔
605
    }
606
  }
607

608
  taosResetFillInfo(pFillInfo, skey);
489,677✔
609

610
  pFillInfo->type = fillType;
489,680✔
611
  pFillInfo->pFillCol = pCol;
489,680✔
612
  pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols + fillNullCols;
489,680✔
613
  pFillInfo->alloc = capacity;
489,680✔
614
  pFillInfo->id = id;
489,680✔
615
  pFillInfo->interval = *pInterval;
489,680✔
616

617
  pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
489,680✔
618
  QUERY_CHECK_NULL(pFillInfo->next.pRowVal, code, lino, _end, terrno);
489,685!
619

620
  pFillInfo->next.pNullValueFlag = taosArrayInit(pFillInfo->numOfCols, sizeof(bool));
489,685✔
621
  QUERY_CHECK_NULL(pFillInfo->next.pNullValueFlag, code, lino, _end, terrno);
489,687!
622

623
  pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
489,687✔
624
  QUERY_CHECK_NULL(pFillInfo->prev.pRowVal, code, lino, _end, terrno);
489,689!
625

626
  pFillInfo->prev.pNullValueFlag = taosArrayInit(pFillInfo->numOfCols, sizeof(bool));
489,689✔
627
  QUERY_CHECK_NULL(pFillInfo->prev.pNullValueFlag, code, lino, _end, terrno);
489,688!
628

629
  code = initBeforeAfterDataBuf(pFillInfo);
489,688✔
630
  QUERY_CHECK_CODE(code, lino, _end);
489,685!
631

632
  pFillInfo->pTaskInfo = pTaskInfo;
489,685✔
633

634
_end:
489,685✔
635
  if (code != TSDB_CODE_SUCCESS) {
489,685!
636
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
637
    pFillInfo = taosDestroyFillInfo(pFillInfo);
×
638
  }
639
  (*ppFillInfo) = pFillInfo;
489,681✔
640
  return code;
489,681✔
641
}
642

643
/*
 * Rewind a fill context so that it starts over at startTimestamp.
 *
 * The start key, the current key and the end key all collapse to startTimestamp, the source-row
 * cursor is set to -1 and every row counter is zeroed.
 */
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
  // row counters
  pFillInfo->numOfTotal = 0;
  pFillInfo->numOfCurrent = 0;
  pFillInfo->numOfRows = 0;

  // no source row is selected yet
  pFillInfo->index = -1;

  // the whole time range degenerates to the start key
  pFillInfo->end = startTimestamp;
  pFillInfo->currentKey = startTimestamp;
  pFillInfo->start = startTimestamp;
}
498,268✔
652

653
/*
 * Release a fill context together with everything it owns: the cached prev/next row values, the
 * heap buffers of variable-length user fill values, the tag and column descriptors, the per-column
 * progress array and the list of saved blocks.
 *
 * pFillInfo may be NULL. The function always returns NULL so that a caller can clear its own
 * pointer in the same statement.
 */
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
  if (pFillInfo == NULL) {
    return NULL;
  }

  // The two cached rows are torn down the same way; prev is handled first, then next.
  SRowVal* cachedRows[] = {&pFillInfo->prev, &pFillInfo->next};
  for (int32_t r = 0; r < 2; ++r) {
    SRowVal* pRow = cachedRows[r];

    for (int32_t k = 0; k < taosArrayGetSize(pRow->pRowVal); ++k) {
      SGroupKeys* pKey = taosArrayGet(pRow->pRowVal, k);
      if (pKey != NULL) {
        taosMemoryFree(pKey->pData);
      }
    }

    taosArrayDestroy(pRow->pNullValueFlag);
    taosArrayDestroy(pRow->pRowVal);
  }

  // Only columns that are actually filled can carry a heap-allocated (string-like) fill value.
  if (pFillInfo->pFillCol != NULL) {
    for (int32_t k = 0; k < pFillInfo->numOfCols; ++k) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[k];
      if (pCol->notFillCol) {
        continue;
      }

      const bool hasVarLenVal =
          pCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR ||
          pCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pCol->fillVal.nType == TSDB_DATA_TYPE_JSON;

      if (hasVarLenVal && pCol->fillVal.pz != NULL) {
        taosMemoryFree(pCol->fillVal.pz);
        pCol->fillVal.pz = NULL;
      }
    }
  }

  taosMemoryFreeClear(pFillInfo->pTags);
  taosMemoryFreeClear(pFillInfo->pFillCol);
  taosArrayDestroy(pFillInfo->pColFillProgress);
  tdListFreeP(pFillInfo->pFillSavedBlockList, destroyFillBlock);
  taosMemoryFreeClear(pFillInfo);
  return NULL;
}
697

698
/*
 * Prepare the fill context for a new batch of source rows.
 *
 * Does nothing when the fill type is TSDB_FILL_NONE. Otherwise the source-row cursor is rewound to
 * 0, the number of available source rows is recorded and endKey becomes the new end of the range.
 */
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
  if (pFillInfo->type != TSDB_FILL_NONE) {
    pFillInfo->numOfRows = numOfRows;
    pFillInfo->index = 0;

    // endKey is stored as given: it is already the aligned time window value, truncating it to the
    // time window would not be correct.
    pFillInfo->end = endKey;
  }
}
708

709
/*
 * Attach pInput as the source block of the fill context.
 *
 * Only the pointer is stored (with its const qualifier cast away); the block is not copied.
 */
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
  SSDataBlock* pSrc = (SSDataBlock*)pInput;
  pFillInfo->pSrcBlock = pSrc;
}
239,754✔
712

713
/*
 * Move both the start key and the current key of the fill context to ts.
 * The end key and all counters are left untouched.
 */
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts) {
  pFillInfo->currentKey = ts;
  pFillInfo->start = ts;
}
438,864✔
717

718
/* Returns true while the current key still equals the start key, i.e. no row has advanced it yet. */
bool taosFillNotStarted(const SFillInfo* pFillInfo) {
  const bool atStart = (pFillInfo->currentKey == pFillInfo->start);
  return atStart;
}
651,874✔
719

720
/*
 * Tell whether the fill context can still produce output rows.
 *
 * Returns true when taosNumOfRemainRows() reports pending source rows. Otherwise, if rows have been
 * produced before (numOfTotal > 0) and the end key lies ahead of the start key in the fill
 * direction, the answer is whether getNumOfResultsAfterFillGap() still counts at least one row up
 * to the end key. In every other case the result is false.
 */
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
  if (taosNumOfRemainRows(pFillInfo) > 0) {
    return true;
  }

  if (!(pFillInfo->numOfTotal > 0)) {
    return false;
  }

  // "ahead" depends on the scan direction: larger keys for ascending, smaller keys for descending.
  const bool endIsAhead =
      FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->end > pFillInfo->start) : (pFillInfo->end < pFillInfo->start);
  if (!endIsAhead) {
    return false;
  }

  return getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, 4096) > 0;
}
734

735
/*
 * Count how many result rows lie between the current key and a bound key, capped at maxNumOfRows.
 *
 * - While source rows remain, the bound is the timestamp of the last row of the source block
 *   (tsList[numOfRows - 1]); ekey is ignored.
 * - Once the source is exhausted, the bound is ekey. If ekey already lies behind the current key in
 *   the fill direction, 0 is returned.
 *
 * The count itself comes from taosTimeCountIntervalForFill() using the sliding settings of the
 * fill interval.
 */
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
  TSKEY boundKey = ekey;

  if (taosNumOfRemainRows(pFillInfo) > 0) {
    // still filling gaps inside the current data block, not generating rows after the result set
    SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
    int64_t*         pTsList = (int64_t*)pTsCol->pData;
    boundKey = pTsList[pFillInfo->numOfRows - 1];
  } else {
    // reached the end of the data
    const bool asc = FILL_IS_ASC_FILL(pFillInfo);
    const bool alreadyPassed = (asc && ekey < pFillInfo->currentKey) || (!asc && ekey > pFillInfo->currentKey);
    if (alreadyPassed) {
      return 0;
    }
  }

  int64_t count =
      taosTimeCountIntervalForFill(boundKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
                                   pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);

  if (count > maxNumOfRows) {
    return maxNumOfRows;
  }
  return count;
}
761

762
/*
 * Compute the value at point->key on the straight line through point1 and point2 and store it in
 * point->val using outputType.
 *
 * Both input values are read as double according to inputType/inputTypeMod. For boolean input no
 * interpolation is done: the result is 1 only when both inputs are at least 1, otherwise 0.
 */
void taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2,
                                   int32_t inputType, STypeMod inputTypeMod) {
  double left = -1;
  double right = -1;
  GET_TYPED_DATA(left, double, inputType, point1->val, inputTypeMod);
  GET_TYPED_DATA(right, double, inputType, point2->val, inputTypeMod);

  double result = 0;
  if (IS_BOOLEAN_TYPE(inputType)) {
    result = (left < 1 || right < 1) ? 0 : 1;
  } else {
    result = DO_INTERPOLATION(left, right, point1->key, point2->key, point->key);
  }

  SET_TYPED_DATA(point->val, outputType, result);
}
49,071,599✔
776

777
/*
 * Produce the next batch of filled rows into p.
 *
 * The number of rows to generate is obtained from getNumOfResultsAfterFillGap() (capped at
 * capacity). When no source rows remain, rows are appended purely according to the fill strategy;
 * otherwise the gaps between the remaining source rows are filled and the number of generated rows
 * must match pFillInfo->numOfCurrent.
 *
 * Returns TSDB_CODE_SUCCESS, the error of the underlying fill routine, or
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when one of the consistency checks fails. Failures are
 * logged with the failing line.
 */
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t remain = taosNumOfRemainRows(pFillInfo);

  int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
  QUERY_CHECK_CONDITION((numOfRes <= capacity), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  // no data existed for fill operation now, append result according to the fill strategy
  if (remain == 0) {
    code = appendFilledResult(pFillInfo, p, numOfRes);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {
    code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
    QUERY_CHECK_CODE(code, lino, _end);
    QUERY_CHECK_CONDITION((numOfRes == pFillInfo->numOfCurrent), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  }

  // The conversion specifiers used to read "% d" (space flag), which pads non-negative counters
  // with a stray blank; plain "%d" is what the message is meant to print.
  qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
         ", current:%d, total:%d, %s",
         pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
         pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
805

806
/* Accessor for the start key of the fill context. */
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) {
  int64_t startKey = pFillInfo->start;
  return startKey;
}
8,588✔
807

808
/*
 * Build the column descriptor array used by the fill operator.
 *
 * The returned array is laid out as three consecutive groups:
 *   [0, numOfFillExpr)                      columns that are filled (notFillCol = false)
 *   [numOfFillExpr, +numOfNoFillExpr)       columns that are not filled (notFillCol = true)
 *   [.., +numOfFillNullExpr)                columns marked notFillCol = true and fillNull = true
 *
 * When pValNode carries a non-empty value list, the i-th filled column takes the i-th value as its
 * fill value; if there are fewer values than filled columns, the last value is reused.
 *
 * Returns the calloc'ed array, or NULL on allocation failure or when a fill value cannot be
 * obtained/converted (the partially built array is released in that case).
 */
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
                                int32_t numOfNoFillExpr, SExprInfo* pFillNullExpr, int32_t numOfFillNullExpr,
                                const struct SNodeListNode* pValNode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr + numOfFillNullExpr, sizeof(SFillColInfo));
  if (pFillCol == NULL) {
    return NULL;
  }

  size_t numOfVals = 0;
  if (pValNode != NULL) {
    numOfVals = LIST_LENGTH(pValNode->pNodeList);
  }

  // group 1: filled columns, optionally with a user specified fill value
  for (int32_t i = 0; i < numOfFillExpr; ++i) {
    SFillColInfo* pInfo = &pFillCol[i];
    pInfo->pExpr = &pExpr[i];
    pInfo->notFillCol = false;

    // todo refactor
    if (numOfVals > 0) {
      // fewer user specified values than columns: always fall back to the last value
      int32_t valIdx = (i >= numOfVals) ? (numOfVals - 1) : i;

      SValueNode* pVal = (SValueNode*)nodesListGetNode(pValNode->pNodeList, valIdx);
      QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
      code = nodesValueNodeToVariant(pVal, &pInfo->fillVal);
      QUERY_CHECK_CODE(code, lino, _end);
    }
    if (code != TSDB_CODE_SUCCESS) {
      goto _end;
    }
  }
  pFillCol->numOfFillExpr = numOfFillExpr;

  // group 2: columns that are passed through without filling
  SFillColInfo* pNotFillBase = pFillCol + numOfFillExpr;
  for (int32_t i = 0; i < numOfNoFillExpr; ++i) {
    pNotFillBase[i].pExpr = &pNotFillExpr[i];
    pNotFillBase[i].notFillCol = true;
  }

  // group 3: columns flagged as fill-with-null
  SFillColInfo* pFillNullBase = pNotFillBase + numOfNoFillExpr;
  for (int32_t i = 0; i < numOfFillNullExpr; ++i) {
    pFillNullBase[i].pExpr = &pFillNullExpr[i];
    pFillNullBase[i].notFillCol = true;
    pFillNullBase[i].fillNull = true;
  }

  return pFillCol;

_end:
  // release whatever fill values were converted so far, then the array itself
  for (int32_t i = 0; i < numOfFillExpr; ++i) {
    taosVariantDestroy(&pFillCol[i].fillVal);
  }
  taosMemoryFree(pFillCol);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return NULL;
}
865

866
/*
 * Decide whether the fill loop has to stop producing rows for now.
 *
 * Returns true when any of the following holds:
 *  - a source block is attached and the row cursor has run past its last row;
 *  - the destination block already contains rows;
 *  - there are no source rows (numOfRows == 0);
 *  - the current key has moved beyond the end key in the fill direction.
 *
 * The original body tested "pDstBlock->info.rows > 0" a second time just before returning false;
 * that copy could never be reached with a true condition and has been dropped.
 */
static bool fillShouldPause(SFillInfo* pFillInfo, const SSDataBlock* pDstBlock) {
  if (pFillInfo->pSrcBlock && pFillInfo->index >= pFillInfo->pSrcBlock->info.rows) return true;
  if (pDstBlock->info.rows > 0) return true;
  if (pFillInfo->numOfRows == 0) return true;
  if (pFillInfo->order == TSDB_ORDER_ASC && pFillInfo->currentKey > pFillInfo->end) return true;
  if (pFillInfo->order == TSDB_ORDER_DESC && pFillInfo->currentKey < pFillInfo->end) return true;
  return false;
}
875

876
static TSKEY getBlockCurTs(const struct SFillInfo* pFillInfo, const SSDataBlock* pBlock, int32_t rowIdx) {
985,912,223✔
877
  if (pBlock) {
985,912,223!
878
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pFillInfo->srcTsSlotId);
986,220,822✔
879
    return ((TSKEY*)pTsCol->pData)[rowIdx];
953,815,656✔
880
  }
881
  return -1;
×
882
}
883

884
/*
 * Cache row rowIndex of the current source block into pRowVal.
 *
 * pRowVal->key always receives the row timestamp. Column values are then saved one by one with
 * these exceptions:
 *  - when the row timestamp (blockCurTs) differs from the current fill key, a filled column is only
 *    saved for ascending NEXT or descending PREV fills, and the timestamp column is skipped for
 *    linear fills;
 *  - for PREV/NEXT fills the per-column null flag is recorded, and for ascending PREV / descending
 *    NEXT a NULL source value is not saved, so that the last non-NULL value is kept for later rows.
 *
 * Only column, operator and function expressions are supported; anything else yields
 * TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR. Returns TSDB_CODE_SUCCESS or the error code.
 */
static int32_t copyCurrentRowIntoBuf2(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, TSKEY blockCurTs) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const bool isAsc = FILL_IS_ASC_FILL(pFillInfo);
  const bool isNext = (pFillInfo->type == TSDB_FILL_NEXT);
  const bool isPrev = (pFillInfo->type == TSDB_FILL_PREV);
  const bool ascNext = isAsc && isNext;
  const bool descPrev = !isAsc && isPrev;
  const bool ascPrevOrDescNext = (isPrev && isAsc) || (isNext && !isAsc);

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
  pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];

  for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
    SFillColInfo* pColInfo = &pFillInfo->pFillCol[col];
    int32_t       nodeType = pColInfo->pExpr->pExpr->nodeType;
    const bool    supported =
        (nodeType == QUERY_NODE_COLUMN || nodeType == QUERY_NODE_OPERATOR || nodeType == QUERY_NODE_FUNCTION);

    if (!supported) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      QUERY_CHECK_CODE(code, lino, _end);
    } else {
      int32_t srcSlotId = GET_DEST_SLOT_ID(pColInfo);

      if (blockCurTs != pFillInfo->currentKey) {
        if (!pColInfo->notFillCol && !ascNext && !descPrev) {
          continue;
        }
        if (srcSlotId == pFillInfo->srcTsSlotId && pFillInfo->type == TSDB_FILL_LINEAR) {
          continue;
        }
      }

      SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
      QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);

      bool  isNull = colDataIsNull_s(pSrcCol, rowIndex);
      char* pVal = colDataGetData(pSrcCol, rowIndex);

      // the null flag is only tracked for fill prev/next, and only for columns that are filled
      if (!pColInfo->notFillCol && pRowVal->pNullValueFlag && (isNext || isPrev)) {
        bool* pFlag = taosArrayGet(pRowVal->pNullValueFlag, col);
        *pFlag = isNull;

        // asc-prev / desc-next keep using the last cached value for later rows, so a NULL must not
        // overwrite it
        if (isNull && ascPrevOrDescNext) {
          continue;
        }
      }

      saveColData(pRowVal->pRowVal, col, pVal, isNull);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
938

939

940
static int32_t fillTrySaveRow(struct SFillInfo* pFillInfo, const SSDataBlock* pBlock, int32_t rowIdx) {
27,708,300✔
941
  if (!pBlock) return 0;
27,708,300!
942
  int32_t  code = 0;
27,708,300✔
943
  bool     ascFill = FILL_IS_ASC_FILL(pFillInfo);
27,708,300✔
944
  TSKEY    blockCurKey = getBlockCurTs(pFillInfo, pBlock, rowIdx);
27,708,300✔
945
  int32_t  fillType = pFillInfo->type;
27,642,293✔
946
  SRowVal* pFillRow = ascFill ? (fillType == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev)
25,750,792✔
947
                              : (fillType == TSDB_FILL_PREV ? &pFillInfo->next : &pFillInfo->prev);
53,393,085✔
948
  return copyCurrentRowIntoBuf2(pFillInfo, rowIdx, pFillRow, blockCurKey);
27,642,293✔
949
}
950

951
static int32_t tIsColFallBehind(struct SFillInfo* pFillInfo, int32_t colIdx) {
2,147,483,647✔
952
  SColumnFillProgress* pColProgress = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
2,147,483,647✔
953
  if (!pColProgress) {
2,128,806,586!
954
    qError("failed to get col progress for col %d, size: %lu", colIdx, taosArrayGetSize(pFillInfo->pColFillProgress));
×
955
    return TSDB_CODE_INTERNAL_ERROR;
×
956
  }
957
  if (pColProgress->pBlockNode) {
2,131,885,687✔
958
    return true;
128,031✔
959
  }
960
  return false;
2,131,757,656✔
961
}
962

963
/*
 * Remember where column colIdx started to lag behind, for ascending NEXT and descending PREV fills.
 *
 * If the column has no recorded position yet, pBlockNode/rowIdx are stored in the column progress
 * and rowIdx is also stored in the per-block progress of the block held by pBlockNode. An existing
 * position is left untouched.
 *
 * Returns true for ascending NEXT / descending PREV fills, false for every other fill mode (in
 * which case nothing is recorded).
 */
static bool tFillTrySaveColProgress(struct SFillInfo* pFillInfo, int32_t colIdx, SListNode* pBlockNode,
                                    int32_t rowIdx) {
  const bool ascNext = (pFillInfo->order == TSDB_ORDER_ASC) && (pFillInfo->type == TSDB_FILL_NEXT);
  const bool descPrev = (pFillInfo->order == TSDB_ORDER_DESC) && (pFillInfo->type == TSDB_FILL_PREV);
  if (!ascNext && !descPrev) {
    return false;
  }

  SColumnFillProgress* pColProg = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
  if (pColProg->pBlockNode == NULL) {
    pColProg->pBlockNode = pBlockNode;
    pColProg->rowIdx = rowIdx;

    SFillBlock*         pSaved = (SFillBlock*)pBlockNode->data;
    SBlockFillProgress* pBlockProg = taosArrayGet(pSaved->pFillProgress, colIdx);
    pBlockProg->rowIdx = rowIdx;
  }
  return true;
}
980

981
/*
 * Fill one cell (column colIdx, row rowIdx) of pBlock according to the fill mode of pFillInfo.
 *
 *  - PREV / NEXT: window pseudo columns are filled directly; every other column goes through
 *    setNotFillColumn().
 *  - LINEAR: NULL when outOfBound. Otherwise not-filled columns are handled like above, and filled
 *    columns are interpolated between the cached prev value and the value of the current source
 *    row (at key ts). Variable-length and bool columns, a NULL prev value or a NULL source value
 *    all produce NULL.
 *  - NULL / NULL_F: the cell is set to NULL.
 *  - anything else: filled columns get the user specified value, the rest are handled like not-filled
 *    columns above.
 *
 * Returns true only for PREV fills in descending order or NEXT fills in ascending order, and only
 * when setNotFillColumn() itself returned true - presumably meaning that the column has no value
 * to fill with yet and its progress must be saved by the caller (TODO confirm against
 * setNotFillColumn). On an internal error the function logs it and long-jumps out through the
 * task environment.
 */
static bool doFillOneCol(SFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY ts, int32_t colIdx, int32_t rowIdx, bool outOfBound) {
  int32_t code = 0;
  int32_t lino = 0;
  bool    saveProgress = false;

  const bool isPrevFill = (pFillInfo->type == TSDB_FILL_PREV);
  const bool isNextFill = (pFillInfo->type == TSDB_FILL_NEXT);

  if (isPrevFill || isNextFill) {
    // todo  refactor: start from 0 not 1
    SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));

    if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, rowIdx)) {
      // progress only matters when the scan runs against the fill direction:
      // PREV while scanning descending, NEXT while scanning ascending
      bool pending = setNotFillColumn(pFillInfo, pDstCol, rowIdx, colIdx);
      if (isPrevFill) {
        saveProgress = pending && pFillInfo->order == TSDB_ORDER_DESC;
      } else {
        saveProgress = pending && pFillInfo->order == TSDB_ORDER_ASC;
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
    // TODO : linear interpolation supports NULL value
    if (outOfBound) {
      setNullCol(pBlock, pFillInfo, rowIdx, colIdx);
    } else {
      SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
      int32_t          slotId = GET_DEST_SLOT_ID(pCol);
      SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, slotId);
      int16_t          dataType = pDstCol->info.type;

      if (pCol->notFillCol) {
        if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, rowIdx)) {
          (void)setNotFillColumn(pFillInfo, pDstCol, rowIdx, colIdx);
        }
      } else {
        SRowVal*         pPrevRow = &pFillInfo->prev;
        SGroupKeys*      pPrevVal = taosArrayGet(pPrevRow->pRowVal, colIdx);
        SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, slotId);

        if (IS_VAR_DATA_TYPE(dataType) || dataType == TSDB_DATA_TYPE_BOOL || pPrevVal->isNull ||
            colDataIsNull_s(pSrcCol, pFillInfo->index)) {
          colDataSetNULL(pDstCol, rowIdx);
        } else {
          // the previous timestamp is cached in the prev row at the timestamp column position
          SGroupKeys* pPrevTsVal = taosArrayGet(pPrevRow->pRowVal, pFillInfo->tsSlotId);
          int64_t     prevTs = *(int64_t*)pPrevTsVal->pData;
          char*       pCurVal = colDataGetData(pSrcCol, pFillInfo->index);

          int64_t out = 0;
          SPoint  from = {.key = prevTs, .val = pPrevVal->pData};
          SPoint  to = {.key = ts, .val = pCurVal};
          SPoint  target = {.key = pFillInfo->currentKey, .val = &out};

          taosGetLinearInterpolationVal(&target, dataType, &from, &to, dataType,
                                        typeGetTypeModFromColInfo(&pDstCol->info));

          code = colDataSetVal(pDstCol, rowIdx, (const char*)&out, false);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
    // fill with NULL
    setNullCol(pBlock, pFillInfo, rowIdx, colIdx);
  } else {
    // fill with the user specified value of each column
    SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
    int32_t          slotId = GET_DEST_SLOT_ID(pCol);
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, slotId);

    if (pCol->notFillCol) {
      if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, rowIdx)) {
        (void)setNotFillColumn(pFillInfo, pDstCol, rowIdx, colIdx);
      }
    } else {
      SVariant* pUserVal = &pFillInfo->pFillCol[colIdx].fillVal;
      code = doSetUserSpecifiedValue(pDstCol, pUserVal, rowIdx, pFillInfo->currentKey);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
  return saveProgress;
}
1072

1073
/*
 * Catch up a lagging column: starting from the block/row recorded in the column progress, fill
 * column colIdx in every saved block up to the last row of the last block in the list.
 *
 * This is always the first time a non-null value is met for the column, so filling runs to the very
 * end. After each block the per-block progress of the column is set to the block row count, the
 * block's allColFinished flag is recomputed, and the column progress moves on to the next node
 * (row 0). When the loop ends the column progress holds no block node, i.e. the column no longer
 * lags.
 *
 * Returns 0, or TSDB_CODE_INTERNAL_ERROR if the column progress entry cannot be fetched.
 */
static int32_t tFillFromHeadForCol(struct SFillInfo* pFillInfo, TSKEY ts, int32_t colIdx, bool outOfBound) {
  int32_t code = 0;

  SColumnFillProgress* pColProgress = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
  if (pColProgress == NULL) {
    qError("failed to get col progress for col %d, size: %lu", colIdx, taosArrayGetSize(pFillInfo->pColFillProgress));
    return TSDB_CODE_INTERNAL_ERROR;
  }

  SListNode* pNode = pColProgress->pBlockNode;
  while (pNode != NULL) {
    SFillBlock*                pSaved = (SFillBlock*)pNode->data;
    const SColumnFillProgress* pStart = taosArrayGet(pFillInfo->pColFillProgress, colIdx);

    // fill the remaining rows of this block for the column
    int32_t row = pStart->rowIdx;
    while (row < pSaved->pBlock->info.rows) {
      doFillOneCol(pFillInfo, pSaved->pBlock, ts, colIdx, row, outOfBound);
      ++row;
    }

    SBlockFillProgress* pOwnProgress = taosArrayGet(pSaved->pFillProgress, colIdx);
    pOwnProgress->rowIdx = pSaved->pBlock->info.rows;

    // the block is complete once no column is short of the block row count
    bool everyColDone = true;
    for (int32_t c = 0; c < taosArrayGetSize(pSaved->pFillProgress); ++c) {
      SBlockFillProgress* pOther = taosArrayGet(pSaved->pFillProgress, c);
      if (pOther->rowIdx < pSaved->pBlock->info.rows) {
        everyColDone = false;
        break;
      }
    }
    pSaved->allColFinished = everyColDone;

    // advance and update the column progress accordingly
    pNode = TD_DLIST_NODE_NEXT(pNode);
    pColProgress->pBlockNode = pNode;
    pColProgress->rowIdx = 0;
  }

  return code;
}
1109

1110
/*
 * Append one generated row to pBlock (at row index pBlock->info.rows).
 *
 * Every column is filled through doFillOneCol(). When the row is out of bound and a column still
 * lags behind, the saved blocks of that column are caught up first. If doFillOneCol() asks for it,
 * the column progress is recorded against the tail of the saved block list.
 *
 * Afterwards the current key advances by one sliding step in the fill direction and both the block
 * row count and pFillInfo->numOfCurrent are incremented. pSrcBlock is not used by this function.
 * On failure the error is logged and the function long-jumps out through the task environment.
 */
static void doFillOneRow2(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
                         bool outOfBound) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t direction = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
  int32_t dstRow = pBlock->info.rows;

  for (int32_t col = 0; code == 0 && col < pFillInfo->numOfCols; ++col) {
    if (outOfBound && tIsColFallBehind(pFillInfo, col)) {
      code = tFillFromHeadForCol(pFillInfo, ts, col, true);
      if (code != 0) {
        goto _end;
      }
    }

    // a column that meets a null value for the first time during filling gets its progress saved
    if (doFillOneCol(pFillInfo, pBlock, ts, col, dstRow, outOfBound)) {
      SListNode* pTail = tdListGetTail(pFillInfo->pFillSavedBlockList);
      (void)tFillTrySaveColProgress(pFillInfo, col, pTail, dstRow);
    }
  }

  {
    SInterval* pItv = &pFillInfo->interval;
    pFillInfo->currentKey =
        taosTimeAdd(pFillInfo->currentKey, pItv->sliding * direction, pItv->slidingUnit, pItv->precision, NULL);
  }
  pBlock->info.rows += 1;
  pFillInfo->numOfCurrent++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
}
1,811,060,310✔
1142

1143
/*
 * Hand the head of the saved block list over to pDstBlock if it is ready.
 *
 * While source rows are still available (numOfRows != 0) the head block is ready only when it holds
 * at least capacity rows and no column progress still points at it. Once the source is exhausted
 * the head block is handed over unconditionally.
 *
 * Handing over swaps the row count and the column array between pDstBlock and the saved block, then
 * removes the node from the list and frees both the saved block and the node. Nothing happens when
 * the list has no head node.
 */
static void tryExtractReadyBlocks(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SListNode* pHead = tdListGetHead(pFillInfo->pFillSavedBlockList);
  const bool srcExhausted = (pFillInfo->numOfRows == 0);
  if (pHead == NULL) {
    return;
  }

  SFillBlock* pSaved = (SFillBlock*)pHead->data;
  bool        headDone = true;

  if (!srcExhausted) {
    if (pSaved->pBlock->info.rows < capacity) {
      return;
    }

    // a column whose progress still references the head node has rows left to fill in it
    for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
      SColumnFillProgress* pColProg = taosArrayGet(pFillInfo->pColFillProgress, col);
      if (pColProg->pBlockNode == pHead) {
        headDone = false;
        break;
      }
    }
  }

  if (!headDone && !srcExhausted) {
    return;
  }

  TSWAP(pDstBlock->info.rows, pSaved->pBlock->info.rows);
  TSWAP(pDstBlock->pDataBlock, pSaved->pBlock->pDataBlock);
  tdListPopNode(pFillInfo->pFillSavedBlockList, pHead);
  destroyFillBlock(pHead->data);
  taosMemFreeClear(pHead);
}
1168

1169
/*
 * Create a new data block modelled on pDstBlock (via createOneDataBlock with its flag argument set
 * to false) and reserve room for capacity rows in it.
 *
 * Returns the new block, or NULL if either step fails; a block that was created but could not be
 * sized is destroyed before returning. pFillInfo is not used.
 */
static SSDataBlock* createNewSavedBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SSDataBlock* pNewBlock = NULL;

  int32_t code = createOneDataBlock(pDstBlock, false, &pNewBlock);
  if (code != 0) {
    return NULL;
  }

  code = blockDataEnsureCapacity(pNewBlock, capacity);
  if (code == 0) {
    return pNewBlock;
  }

  blockDataDestroy(pNewBlock);
  return NULL;
}
1181

1182
/*
 * Create a fresh saved block together with its per-column progress array and append it to the
 * saved block list through tFillSaveBlock().
 *
 * Every column of the new block starts with a progress of INT32_MAX. On success *ppFillBlock
 * receives the stored block and 0 is returned. On failure terrno is returned and the block and the
 * progress array created here are destroyed.
 * NOTE(review): on a tFillSaveBlock() failure both objects are destroyed here - confirm that
 * tFillSaveBlock() does not already release them itself.
 *
 * Fix: pProgress and prog used to be declared in the middle of the body, after the first
 * "goto _end". When createNewSavedBlock() failed, the jump skipped the initialisation and the
 * cleanup code tested (and possibly destroyed) an indeterminate pointer. Both are now declared and
 * initialised before any jump can happen.
 */
static int32_t trySaveNewBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity, SFillBlock**ppFillBlock) {
  int32_t            code = 0;
  SArray*            pProgress = NULL;
  SBlockFillProgress prog = {INT32_MAX};
  SSDataBlock*       pBlock = createNewSavedBlock(pFillInfo, pDstBlock, capacity);
  if (!pBlock) {
    code = terrno;
    goto _end;
  }

  pProgress = taosArrayInit(pFillInfo->numOfCols, sizeof(SBlockFillProgress));
  if (!pProgress) {
    code = terrno;
    goto _end;
  }

  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
    if (NULL == taosArrayPush(pProgress, &prog)) {
      code = terrno;
      goto _end;
    }
  }

  *ppFillBlock = tFillSaveBlock(pFillInfo, pBlock, pProgress);
  if (!*ppFillBlock) {
    code = terrno;
    goto _end;
  }
  return 0;

_end:
  if (pBlock) blockDataDestroy(pBlock);
  if (pProgress) taosArrayDestroy(pProgress);
  return code;
}
1212

1213
/*
 * One-time setup of the saved-block machinery of a fill context:
 *  1. create the saved block list,
 *  2. store a first saved block in it,
 *  3. create the per-column progress array with one entry per column, each starting with no block
 *     node and row index 0.
 *
 * Returns 0 on success, terrno when a list/array operation fails, or the error of
 * trySaveNewBlock(). Objects already stored in pFillInfo are not released here on failure.
 */
static int32_t fillInitSavedBlockList(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SFillBlock* pFirstBlock = NULL;

  pFillInfo->pFillSavedBlockList = tdListNew(sizeof(SFillBlock));
  if (pFillInfo->pFillSavedBlockList == NULL) {
    return terrno;
  }

  int32_t code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFirstBlock);
  if (code != 0) {
    return code;
  }

  pFillInfo->pColFillProgress = taosArrayInit(pFillInfo->numOfCols, sizeof(SColumnFillProgress));
  if (pFillInfo->pColFillProgress == NULL) {
    return terrno;
  }

  SColumnFillProgress initial = {.pBlockNode = NULL, .rowIdx = 0};
  int32_t             col = 0;
  while (col < pFillInfo->numOfCols) {
    if (taosArrayPush(pFillInfo->pColFillProgress, &initial) == NULL) {
      return terrno;
    }
    ++col;
  }

  return code;
}
1233

1234
static void tryResetColNextPrev(struct SFillInfo* pFillInfo, int32_t colIdx) {
65,845,314✔
1235
  bool     ascFill = FILL_IS_ASC_FILL(pFillInfo);
65,845,314✔
1236
  int32_t  fillType = pFillInfo->type;
65,845,314✔
1237
  bool     ascNext = ascFill && fillType == TSDB_FILL_NEXT, descPrev = !ascFill && fillType == TSDB_FILL_PREV;
65,845,314✔
1238
  if ((ascNext || descPrev) && !pFillInfo->pFillCol[colIdx].notFillCol) {
65,845,314✔
1239
    SRowVal* pFillRow = ascFill ? (fillType == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev)
1,176,104!
1240
      : (fillType == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next);
2,472,031!
1241
    SGroupKeys* pKey = taosArrayGet(pFillRow->pRowVal, colIdx);
1,295,927✔
1242
    pKey->isNull = true;
1,295,913✔
1243
  }
1244
}
65,845,300✔
1245

1246
/*
 * Fill rows through the saved-block list and hand finished blocks to pDstBlock.
 *
 * Steps, in order:
 *   - lazily create the saved-block list (fillInitSavedBlockList) on first use;
 *   - when pFillInfo->numOfRows == 0, keep producing filled rows in the tail block until
 *     currentKey passes pFillInfo->end (direction depends on the fill order) or the block
 *     reaches `capacity`, then catch up every column that tIsColFallBehind() reports;
 *   - otherwise loop until fillShouldPause(): either emit a pure fill row (doFillOneRow2)
 *     or copy the current source row column by column, advancing currentKey by one
 *     sliding step;
 *   - after each step, tryExtractReadyBlocks() is given the chance to move ready blocks out.
 *
 * @param pFillInfo      fill state; numOfCurrent is reset on entry and added to numOfTotal on exit
 * @param pDstBlock      destination block passed to the save/extract helpers
 * @param capacity       row limit used for each saved block
 * @param wantMoreBlock  optional out-flag; set to true when the saved-block list is non-empty
 *                       on exit, false otherwise (also false when the list could not be created)
 * @return TSDB_CODE_SUCCESS, or the first non-zero code reported by a helper
 */
int32_t taosFillResultDataBlock2(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity, bool* wantMoreBlock) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SFillBlock* pFillBlock = NULL;
  SListNode*  pFillBlockListNode = NULL;
  pFillInfo->numOfCurrent = 0;

  if (!pFillInfo->pFillSavedBlockList) {
    code = fillInitSavedBlockList(pFillInfo, pDstBlock, capacity);
    if (code != 0) goto _end;
  }
  pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
  pFillBlock = pFillBlockListNode ? (SFillBlock*)pFillBlockListNode->data : NULL;

  // if all blocks are consumed, we have to fill for not filled cols
  if (pFillInfo->numOfRows == 0) {
    if (!pFillBlock) {
      code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
      if (code != 0) goto _end;
      pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
    }
    bool allFilled = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->currentKey > pFillInfo->end
                                                 : pFillInfo->currentKey < pFillInfo->end;
    while (!allFilled && pFillBlock->pBlock->info.rows < capacity) {
      doFillOneRow2(pFillInfo, pFillBlock->pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
      allFilled = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->currentKey > pFillInfo->end
                                              : pFillInfo->currentKey < pFillInfo->end;
    }

    for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
      if (tIsColFallBehind(pFillInfo, colIdx)) {
        code = tFillFromHeadForCol(pFillInfo, pFillInfo->start, colIdx, true);
        if (code != 0) goto _end;
      }
    }
  }

  // check from list head if we have already filled all rows in blocks, if any block is full, send it out
  tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
  TSKEY lastSavedTs = -1;
  while (!fillShouldPause(pFillInfo, pDstBlock)) {
    // the tail block is missing or full: start a new one
    if (!pFillBlock || pFillBlock->pBlock->info.rows >= capacity) {
      code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
      if (code != 0) goto _end;
      pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
    }
    TSKEY fillCurTs = pFillInfo->currentKey;
    TSKEY blockCurTs = getBlockCurTs(pFillInfo, pFillInfo->pSrcBlock, pFillInfo->index);
    // save the source row once per distinct timestamp, and again when the fill cursor reaches it
    if (pFillInfo->pSrcBlock && (lastSavedTs != blockCurTs || blockCurTs == fillCurTs))
      code = fillTrySaveRow(pFillInfo, pFillInfo->pSrcBlock, pFillInfo->index);
    lastSavedTs = blockCurTs;
    if (code != 0) goto _end;

    if (blockCurTs != fillCurTs || !pFillInfo->pSrcBlock) {
      // no source row at the current key: emit a filled row
      doFillOneRow2(pFillInfo, pFillBlock->pBlock, pFillInfo->pSrcBlock, blockCurTs, false);
    } else {
      // the source row sits exactly on the current key: copy it column by column
      for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
        SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
        int32_t          rowIdx = pFillBlock->pBlock->info.rows;
        int32_t          dstSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pDst = taosArrayGet(pFillBlock->pBlock->pDataBlock, dstSlotId);
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);

        char* src = colDataGetData(pSrc, pFillInfo->index);
        if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
          if (tIsColFallBehind(pFillInfo, colIdx)) code = tFillFromHeadForCol(pFillInfo, blockCurTs, colIdx, false);
          QUERY_CHECK_CODE(code, lino, _end);
          code = colDataSetVal(pDst, rowIdx, src, false);
          QUERY_CHECK_CODE(code, lino, _end);
        } else {
          // if col value in block is NULL, skip setting value for this col, save current position, wait till we got non-null data
          // if there is no lag for this col, then we should fill from (pFillBlock, index) when we got non-null value.
          // if this col is already fall behind, do nothing.
          // Cause when we meet non-null value for this col, we will fill till the last row of last block in list.
          bool saved = tFillTrySaveColProgress(pFillInfo, colIdx, pFillBlockListNode, rowIdx);
          if (!saved) {
            doFillOneCol(pFillInfo, pFillBlock->pBlock, blockCurTs, colIdx, rowIdx, false);
          }
        }
        tryResetColNextPrev(pFillInfo, colIdx);
      }
      // advance the fill cursor by one sliding step in the fill direction
      SInterval* pInterval = &pFillInfo->interval;
      pFillInfo->currentKey =
          taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
                      pInterval->slidingUnit, pInterval->precision, NULL);
      pFillBlock->pBlock->info.rows += 1;
      pFillInfo->index += 1;
      pFillInfo->numOfCurrent += 1;
    }
    tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
  }

_end:
  pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
  // The list pointer is still NULL when fillInitSavedBlockList() failed to create it;
  // do not hand a NULL list to isListEmpty() on that error path.
  bool hasSavedBlocks = pFillInfo->pFillSavedBlockList != NULL && !isListEmpty(pFillInfo->pFillSavedBlockList);
  if (wantMoreBlock) *wantMoreBlock = hasSavedBlocks;
  return code;
}
1347

1348
void destroyFillBlock(void* p) {
1,301,345✔
1349
  SFillBlock* pFillBlock = p;
1,301,345✔
1350
  taosArrayDestroy(pFillBlock->pFillProgress);
1,301,345✔
1351
  blockDataDestroy(pFillBlock->pBlock);
1,301,349✔
1352
}
1,301,368✔
1353

1354
/*
 * Add a new entry {pBlock, pProgress, allColFinished = false} to
 * pFill->pFillSavedBlockList via tdListAdd().
 *
 * Returns a pointer to the data area of the list node that tdListAdd()
 * returned, or NULL when tdListAdd() returned no node.
 */
SFillBlock* tFillSaveBlock(SFillInfo* pFill, SSDataBlock* pBlock, SArray* pProgress) {
  SFillBlock entry = {.pBlock = pBlock, .pFillProgress = pProgress, .allColFinished = false};
  SListNode* pAdded = tdListAdd(pFill->pFillSavedBlockList, &entry);
  return pAdded ? (SFillBlock*)pAdded->data : NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc