• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

55.27
/source/libs/executor/src/tfill.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "executorInt.h"
29
#include "function.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
#define FILL_IS_ASC_FILL(_f) ((_f)->order == TSDB_ORDER_ASC)
36
#define DO_INTERPOLATION(_v1, _v2, _k1, _k2, _k) \
37
  ((_v1) + ((_v2) - (_v1)) * (((double)(_k)) - ((double)(_k1))) / (((double)(_k2)) - ((double)(_k1))))
38

39
static int32_t doSetVal(SColumnInfoData* pDstColInfoData, int32_t rowIndex, const SGroupKeys* pKey);
40

41
/*
 * Writes the value of a non-fill column (or of a column flagged `fillNull`) into row `rowIndex`
 * of `pDstColInfo`.
 *
 * A `fillNull` column is simply set to NULL. Otherwise the value is copied from one of the cached
 * neighbour rows: `pFillInfo->next` when (fill type is NEXT) == (order is ascending), and
 * `pFillInfo->prev` in the two remaining combinations.
 *
 * Returns true when nothing was written because the `next` row was selected, its null-value flag
 * for this column is set and the fill info currently holds input rows (numOfRows > 0).
 * Returns false in every other case.
 *
 * A failed array lookup or a failed write is logged and aborts through T_LONG_JMP.
 */
static bool setNotFillColumn(SFillInfo* pFillInfo, SColumnInfoData* pDstColInfo, int32_t rowIndex, int32_t colIdx) {
  SFillColInfo* pCol = &pFillInfo->pFillCol[colIdx];
  if (pCol->fillNull) {
    colDataSetNULL(pDstColInfo, rowIndex);
    return false;
  }

  // NEXT+asc and (not NEXT)+desc both read from `next`; the other two combinations read from `prev`.
  const bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
  const bool useNext = ((pFillInfo->type == TSDB_FILL_NEXT) == ascFill);
  SRowVal*   pRow = useNext ? &pFillInfo->next : &pFillInfo->prev;

  const bool* pNullValueFlag = taosArrayGet(pRow->pNullValueFlag, colIdx);
  if (pNullValueFlag == NULL) {
    // same handling as the pRowVal lookup below: never dereference a failed lookup
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
  }
  if (*pNullValueFlag && pFillInfo->numOfRows > 0 && useNext) {
    return true;
  }

  SGroupKeys* pKey = taosArrayGet(pRow->pRowVal, colIdx);
  if (!pKey) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(terrno));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, terrno);
  }

  int32_t code = doSetVal(pDstColInfo, rowIndex, pKey);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
  return false;
}
74

75
/*
 * Produces the "NULL fill" value for column `colIdx` in row `rowIdx` of `pBlock`.
 *
 * Regular fill columns are set to NULL. Columns marked `notFillCol` are first offered to
 * fillIfWindowPseudoColumn(); if that does not handle them, setNotFillColumn() writes the value
 * (its return value is ignored here).
 */
static void setNullCol(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIdx, int32_t colIdx) {
  SFillColInfo*    pFillCol = &pFillInfo->pFillCol[colIdx];
  int32_t          slot = GET_DEST_SLOT_ID(pFillCol);
  SColumnInfoData* pTarget = taosArrayGet(pBlock->pDataBlock, slot);

  if (!pFillCol->notFillCol) {
    colDataSetNULL(pTarget, rowIdx);
    return;
  }

  if (!fillIfWindowPseudoColumn(pFillInfo, pFillCol, pTarget, rowIdx)) {
    setNotFillColumn(pFillInfo, pTarget, rowIdx, colIdx);
  }
}
81,298,740✔
88

89
/* Applies setNullCol() to every column of the fill info for output row `rowIndex`. */
static void setNullRow(SSDataBlock* pBlock, SFillInfo* pFillInfo, int32_t rowIndex) {
  int32_t col = 0;
  while (col < pFillInfo->numOfCols) {
    setNullCol(pBlock, pFillInfo, rowIndex, col);
    ++col;
  }
}
×
94

95
/*
 * Writes the user supplied fill value `pVar` into row `rowIndex` of `pDst`, converting it to the
 * destination column type.
 *
 * - float / double / signed integer / bool / unsigned integer / timestamp destinations are
 *   converted with GET_TYPED_DATA.
 * - nchar / varchar / varbinary destinations and DECIMAL take the pointer stored in `pVar->pz`.
 * - DECIMAL64 takes the address of `pVar->i`.
 * - any other destination type is set to NULL.
 *
 * When `pVar->nType` is TSDB_DATA_TYPE_NULL the cell is written as NULL through colDataSetVal().
 * `currentKey` is not used by this function.
 *
 * Returns TSDB_CODE_SUCCESS or the error returned by colDataSetVal(); the failing line is logged.
 */
static int32_t doSetUserSpecifiedValue(SColumnInfoData* pDst, SVariant* pVar, int32_t rowIndex, int64_t currentKey) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    isNull = (TSDB_DATA_TYPE_NULL == pVar->nType);

  if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
    float v = 0;
    GET_TYPED_DATA(v, float, pVar->nType, &pVar->f, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
    double v = 0;
    GET_TYPED_DATA(v, double, pVar->nType, &pVar->d, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type) || pDst->info.type == TSDB_DATA_TYPE_BOOL) {
    int64_t v = 0;
    GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (IS_UNSIGNED_NUMERIC_TYPE(pDst->info.type)) {
    uint64_t v = 0;
    GET_TYPED_DATA(v, uint64_t, pVar->nType, &pVar->u, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
    int64_t v = 0;
    GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->u, typeGetTypeModFromColInfo(&pDst->info));
    code = colDataSetVal(pDst, rowIndex, (const char*)&v, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_NCHAR || pDst->info.type == TSDB_DATA_TYPE_VARCHAR ||
             pDst->info.type == TSDB_DATA_TYPE_VARBINARY) {
    code = colDataSetVal(pDst, rowIndex, pVar->pz, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DECIMAL64) {
    code = colDataSetVal(pDst, rowIndex, (char*)&pVar->i, isNull);
    // record the failing line like every other branch, so the log below is not "line 0"
    QUERY_CHECK_CODE(code, lino, _end);
  } else if (pDst->info.type == TSDB_DATA_TYPE_DECIMAL) {
    code = colDataSetVal(pDst, rowIndex, (char*)pVar->pz, isNull);
    QUERY_CHECK_CODE(code, lino, _end);
  } else {  // unsupported destination type
    colDataSetNULL(pDst, rowIndex);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
142

143
// fill windows pseudo column, _wstart, _wend, _wduration and return true, otherwise return false
144
bool fillIfWindowPseudoColumn(SFillInfo* pFillInfo, SFillColInfo* pCol, SColumnInfoData* pDstColInfoData,
79,119,156✔
145
                              int32_t rowIndex) {
146
  int32_t code = TSDB_CODE_SUCCESS;
79,119,156✔
147
  int32_t lino = 0;
79,119,156✔
148
  if (!pCol->notFillCol) {
79,119,156✔
149
    return false;
41,566,922✔
150
  }
151
  if (pCol->pExpr->pExpr->nodeType == QUERY_NODE_COLUMN) {
37,552,234!
152
    if (pCol->pExpr->base.numOfParams != 1) {
37,707,760!
153
      return false;
×
154
    }
155
    if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_START) {
37,707,760✔
156
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->currentKey, false);
24,613,183✔
157
      QUERY_CHECK_CODE(code, lino, _end);
24,641,151!
158
      return true;
24,641,151✔
159
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_END) {
13,094,577✔
160
      // TODO: include endpoint
161
      SInterval* pInterval = &pFillInfo->interval;
518✔
162
      int64_t    windowEnd =
518✔
163
          taosTimeAdd(pFillInfo->currentKey, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
518✔
164
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&windowEnd, false);
518✔
165
      QUERY_CHECK_CODE(code, lino, _end);
518!
166
      return true;
518✔
167
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_WINDOW_DURATION) {
13,094,059!
168
      // TODO: include endpoint
169
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->interval.sliding, false);
×
170
      QUERY_CHECK_CODE(code, lino, _end);
×
171
      return true;
×
172
    } else if (pCol->pExpr->base.pParam[0].pCol->colType == COLUMN_TYPE_IS_WINDOW_FILLED) {
13,094,059!
173
      code = colDataSetVal(pDstColInfoData, rowIndex, (const char*)&pFillInfo->isFilled, false);
×
174
      QUERY_CHECK_CODE(code, lino, _end);
×
175
      return true;
×
176
    }
177
  }
178

179
_end:
12,938,533✔
180
  if (code != TSDB_CODE_SUCCESS) {
12,938,533!
181
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
182
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
×
183
  }
184
  return false;
12,938,533✔
185
}
186

187
/*
 * Generates one filled row at `pFillInfo->currentKey` and appends it to `pBlock`.
 *
 * The row content depends on the fill type:
 *   PREV / NEXT   - every column goes through fillIfWindowPseudoColumn() and, if not handled,
 *                   setNotFillColumn().
 *   LINEAR        - when `outOfBound` is set the whole row is produced by setNullRow(); otherwise
 *                   fill columns are interpolated between the cached previous row and row
 *                   `pFillInfo->index` of `pSrcBlock` (timestamp `ts`). Variable-length and bool
 *                   columns, and columns whose previous value is NULL, become NULL.
 *   NULL / NULL_F - setNullRow().
 *   anything else - fill columns receive the user specified value.
 *
 * Afterwards currentKey advances by one sliding step in the fill direction, and both
 * pBlock->info.rows and pFillInfo->numOfCurrent are incremented.
 *
 * Any failure is logged and aborts through T_LONG_JMP.
 */
static void doFillOneRow(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
                         bool outOfBound) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);

  // the generated row is appended after the rows already in the output block
  int32_t index = pBlock->info.rows;

  if (pFillInfo->type == TSDB_FILL_PREV || pFillInfo->type == TSDB_FILL_NEXT) {
    // PREV and NEXT share the same per-column handling; the direction is resolved in setNotFillColumn()
    for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo*    pCol = &pFillInfo->pFillCol[i];
      SColumnInfoData* pDstColInfoData = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));
      bool             filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstColInfoData, index);
      if (!filled) {
        setNotFillColumn(pFillInfo, pDstColInfoData, index, i);
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
    // TODO : linear interpolation supports NULL value
    if (outOfBound) {
      setNullRow(pBlock, pFillInfo, index);
    } else {
      for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];

        int32_t          dstSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pDstCol, code, lino, _end, terrno);
        int16_t type = pDstCol->info.type;

        if (pCol->notFillCol) {
          bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDstCol, index);
          if (!filled) {
            setNotFillColumn(pFillInfo, pDstCol, index, i);
          }
        } else {
          SRowVal*    pRVal = &pFillInfo->prev;
          SGroupKeys* pKey = taosArrayGet(pRVal->pRowVal, i);
          QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
          if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pKey->isNull) {
            colDataSetNULL(pDstCol, index);
            continue;
          }

          // timestamp of the cached previous row
          SGroupKeys* pKey1 = taosArrayGet(pRVal->pRowVal, pFillInfo->tsSlotId);
          QUERY_CHECK_NULL(pKey1, code, lino, _end, terrno);

          int64_t prevTs = *(int64_t*)pKey1->pData;
          int32_t srcSlotId = GET_DEST_SLOT_ID(pCol);

          SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, srcSlotId);
          QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);
          char* data = colDataGetData(pSrcCol, pFillInfo->index);

          // 8-byte scratch buffer that receives the interpolated value
          int64_t out = 0;
          SPoint  point1 = (SPoint){.key = prevTs, .val = pKey->pData};
          SPoint  point2 = (SPoint){.key = ts, .val = data};
          SPoint  point = (SPoint){.key = pFillInfo->currentKey, .val = &out};
          taosGetLinearInterpolationVal(&point, type, &point1, &point2, type,
                                        typeGetTypeModFromColInfo(&pDstCol->info));

          code = colDataSetVal(pDstCol, index, (const char*)&out, false);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {  // fill with NULL
    setNullRow(pBlock, pFillInfo, index);
  } else {  // fill with user specified value for each column
    for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
      SFillColInfo* pCol = &pFillInfo->pFillCol[i];

      int32_t          slotId = GET_DEST_SLOT_ID(pCol);
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);

      if (pCol->notFillCol) {
        bool filled = fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, index);
        if (!filled) {
          setNotFillColumn(pFillInfo, pDst, index, i);
        }
      } else {
        SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
        code = doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }

  // advance to the next window in the fill direction and account for the new row
  SInterval* pInterval = &pFillInfo->interval;
  pFillInfo->currentKey =
      taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);
  pBlock->info.rows += 1;
  pFillInfo->numOfCurrent++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
}
×
299

300
/*
 * Copies a cached cell `pKey` into row `rowIndex` of `pDstCol`.
 * A cached NULL becomes a NULL cell; otherwise `pKey->pData` is written with colDataSetVal().
 * Returns TSDB_CODE_SUCCESS or the (logged) error from colDataSetVal().
 */
int32_t doSetVal(SColumnInfoData* pDstCol, int32_t rowIndex, const SGroupKeys* pKey) {
  if (pKey->isNull) {
    colDataSetNULL(pDstCol, rowIndex);
    return TSDB_CODE_SUCCESS;
  }

  int32_t code = colDataSetVal(pDstCol, rowIndex, pKey->pData, false);
  if (code != TSDB_CODE_SUCCESS) {
    int32_t lino = __LINE__;
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
316

317
/*
 * Allocates the per-column value buffers of the cached `next` and `prev` rows.
 *
 * For every fill column one SGroupKeys entry (marked NULL, sized/typed from the column's result
 * schema) and one `false` null-value flag are appended first to `next`, then to `prev`.
 * Nothing is done when `next.pRowVal` already has entries.
 *
 * Returns TSDB_CODE_SUCCESS, or the error code of the failed allocation / array push. Buffers
 * already stored in the arrays stay there on failure (taosDestroyFillInfo() frees them); a buffer
 * that was allocated but not yet stored is released here so it cannot leak.
 */
static int32_t initBeforeAfterDataBuf(SFillInfo* pFillInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  void*   pPending = NULL;  // allocated buffer that no array owns yet

  if (taosArrayGetSize(pFillInfo->next.pRowVal) > 0) {
    goto _end;
  }

  for (int32_t i = 0; i < pFillInfo->numOfCols; i++) {
    SFillColInfo* pCol = &pFillInfo->pFillCol[i];
    SResSchema*   pSchema = &pCol->pExpr->base.resSchema;
    SRowVal*      rows[2] = {&pFillInfo->next, &pFillInfo->prev};
    bool          nullValueFlag = false;

    SGroupKeys key = {0};
    key.isNull = true;
    key.bytes = pSchema->bytes;
    key.type = pSchema->type;

    for (int32_t r = 0; r < 2; ++r) {
      pPending = taosMemoryMalloc(pSchema->bytes);
      QUERY_CHECK_NULL(pPending, code, lino, _end, terrno);
      key.pData = pPending;

      void* tmp = taosArrayPush(rows[r]->pRowVal, &key);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
      pPending = NULL;  // ownership moved into the row array

      tmp = taosArrayPush(rows[r]->pNullValueFlag, &nullValueFlag);
      QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
    }
  }

_end:
  if (pPending != NULL) {
    taosMemoryFree(pPending);
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
358

359
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull);
360

361
/*
 * Caches row `rowIndex` of the source block in `pRowVal`.
 *
 * The row timestamp is stored in `pRowVal->key`. For every column whose expression is a column,
 * operator or function node the cell is saved with saveColData(), except that
 *   - fill columns are only saved for NEXT fill in ascending order or PREV fill in descending order;
 *   - the timestamp column is skipped for LINEAR fill.
 * When `reset` is true every saved cell is marked NULL regardless of the source value.
 *
 * Returns TSDB_CODE_SUCCESS, the lookup error, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR for an
 * unsupported expression node type.
 *
 * NOTE(review): the timestamp and the cells are read before `reset` is considered, and
 * fillResultImpl() passes the index one past the current row together with reset == true -
 * confirm that this index is always valid for the source block.
 */
static int32_t copyCurrentRowIntoBuf(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, bool reset) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
  pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];

  for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
    SFillColInfo* pFillCol = &pFillInfo->pFillCol[col];
    int32_t       nodeType = pFillCol->pExpr->pExpr->nodeType;

    if (nodeType != QUERY_NODE_COLUMN && nodeType != QUERY_NODE_OPERATOR && nodeType != QUERY_NODE_FUNCTION) {
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (!pFillCol->notFillCol) {
      // a fill column is cached only for the fill type that reads ahead in the scan direction
      int32_t cachedType = FILL_IS_ASC_FILL(pFillInfo) ? TSDB_FILL_NEXT : TSDB_FILL_PREV;
      if (pFillInfo->type != cachedType) {
        continue;
      }
    }

    int32_t srcSlotId = GET_DEST_SLOT_ID(pFillCol);
    if (pFillInfo->type == TSDB_FILL_LINEAR && srcSlotId == pFillInfo->srcTsSlotId) {
      continue;
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);

    bool  srcIsNull = colDataIsNull_s(pSrcCol, rowIndex);
    char* pCell = colDataGetData(pSrcCol, rowIndex);

    saveColData(pRowVal->pRowVal, col, pCell, reset || srcIsNull);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
401

402
/*
 * Produces up to `outputRows` result rows in `pBlock` from the current source block, inserting
 * generated rows wherever `currentKey` has not yet reached the timestamp of the next input row.
 *
 * Each loop iteration looks at input row `pFillInfo->index`:
 *   - while currentKey lies before that row (in scan direction) rows are generated with
 *     doFillOneRow(); the input row is cached first so PREV/NEXT/LINEAR can use it;
 *   - otherwise currentKey must equal the row timestamp: the input row is copied to the output,
 *     NULL cells are replaced according to the fill type, and index/currentKey advance.
 *
 * The loop stops when the output quota is reached or the input rows are exhausted; in both cases
 * numOfCurrent is added to numOfTotal.
 *
 * Returns TSDB_CODE_SUCCESS or the first error encountered (logged with its line).
 */
static int32_t fillResultImpl(SFillInfo* pFillInfo, SSDataBlock* pBlock, int32_t outputRows) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
  bool    ascFill = FILL_IS_ASC_FILL(pFillInfo);

  pFillInfo->numOfCurrent = 0;

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);

  while (pFillInfo->numOfCurrent < outputRows) {
    int64_t ts = ((int64_t*)pTsCol->pData)[pFillInfo->index];

    // cache the upcoming input row so the generated rows can refer to it
    if (pFillInfo->currentKey < ts && ascFill) {
      SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev;
      code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
      QUERY_CHECK_CODE(code, lino, _end);
    } else if (pFillInfo->currentKey > ts && !ascFill) {
      SRowVal* pRVal = pFillInfo->type == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next;
      code = copyCurrentRowIntoBuf(pFillInfo, pFillInfo->index, pRVal, false);
      QUERY_CHECK_CODE(code, lino, _end);
    }

    if (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
        pFillInfo->numOfCurrent < outputRows) {
      // fill the gap between two input rows
      while (((pFillInfo->currentKey < ts && ascFill) || (pFillInfo->currentKey > ts && !ascFill)) &&
             pFillInfo->numOfCurrent < outputRows) {
        doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, ts, false);
      }

      // output buffer is full, abort
      if (pFillInfo->numOfCurrent == outputRows) {
        pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
        goto _end;
      }
    } else {
      QUERY_CHECK_CONDITION((pFillInfo->currentKey == ts), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
      int32_t index = pBlock->info.rows;

      int32_t nextRowIndex = pFillInfo->index + 1;
      if (pFillInfo->type == TSDB_FILL_NEXT) {
        // past the last input row the cached "next" row is reset to NULL
        bool resetToNull = !(nextRowIndex < pFillInfo->numOfRows);
        code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, resetToNull);
        QUERY_CHECK_CODE(code, lino, _end);
      }
      if (pFillInfo->type == TSDB_FILL_PREV) {
        if (nextRowIndex + 1 >= pFillInfo->numOfRows && !FILL_IS_ASC_FILL(pFillInfo)) {
          code = copyCurrentRowIntoBuf(pFillInfo, nextRowIndex, &pFillInfo->next, true);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }

      // copy rows to dst buffer
      for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
        SFillColInfo* pCol = &pFillInfo->pFillCol[i];

        int32_t dstSlotId = GET_DEST_SLOT_ID(pCol);

        SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pDst, code, lino, _end, terrno);
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);
        QUERY_CHECK_NULL(pSrc, code, lino, _end, terrno);

        char* src = colDataGetData(pSrc, pFillInfo->index);
        if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
          code = colDataSetVal(pDst, index, src, false);
          QUERY_CHECK_CODE(code, lino, _end);

          // remember the value as the latest "previous" row
          SRowVal* pRVal = &pFillInfo->prev;
          saveColData(pRVal->pRowVal, i, src, false);
          if (pFillInfo->srcTsSlotId == dstSlotId) {
            pRVal->key = *(int64_t*)src;
          }
        } else {  // the value is null
          if (pDst->info.type == TSDB_DATA_TYPE_TIMESTAMP) {
            code = colDataSetVal(pDst, index, (const char*)&pFillInfo->currentKey, false);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {  // data is null, substitute according to the fill type
            if (pFillInfo->type == TSDB_FILL_PREV) {
              SArray*     p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->prev.pRowVal : pFillInfo->next.pRowVal;
              SGroupKeys* pKey = taosArrayGet(p, i);
              QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
              code = doSetVal(pDst, index, pKey);
              QUERY_CHECK_CODE(code, lino, _end);
            } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
              bool isNull = colDataIsNull_s(pSrc, pFillInfo->index);
              code = colDataSetVal(pDst, index, src, isNull);
              QUERY_CHECK_CODE(code, lino, _end);

              SArray* p = pFillInfo->prev.pRowVal;
              saveColData(p, i, src, isNull);  // todo:
            } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
              colDataSetNULL(pDst, index);
            } else if (pFillInfo->type == TSDB_FILL_NEXT) {
              SArray*     p = FILL_IS_ASC_FILL(pFillInfo) ? pFillInfo->next.pRowVal : pFillInfo->prev.pRowVal;
              SGroupKeys* pKey = taosArrayGet(p, i);
              QUERY_CHECK_NULL(pKey, code, lino, _end, terrno);
              code = doSetVal(pDst, index, pKey);
              QUERY_CHECK_CODE(code, lino, _end);
            } else {
              SVariant* pVar = &pFillInfo->pFillCol[i].fillVal;
              code = doSetUserSpecifiedValue(pDst, pVar, index, pFillInfo->currentKey);
              QUERY_CHECK_CODE(code, lino, _end);
            }
          }
        }
      }

      // advance to the next window and the next input row
      SInterval* pInterval = &pFillInfo->interval;
      pFillInfo->currentKey = taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit,
                                          pInterval->precision, NULL);

      pBlock->info.rows += 1;
      pFillInfo->index += 1;
      pFillInfo->numOfCurrent += 1;
    }

    if (pFillInfo->index >= pFillInfo->numOfRows || pFillInfo->numOfCurrent >= outputRows) {
      pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
535

536
/*
 * Stores one cell in entry `columnIndex` of the cached row `rowBuf`.
 * A NULL cell only sets the entry's isNull flag. Otherwise the bytes at `src` are copied into the
 * entry buffer - calcStrBytesByType() bytes for variable-length types, `bytes` of the entry for
 * all others - and isNull is cleared.
 */
static void saveColData(SArray* rowBuf, int32_t columnIndex, const char* src, bool isNull) {
  SGroupKeys* pSlot = taosArrayGet(rowBuf, columnIndex);

  if (isNull) {
    pSlot->isNull = true;
    return;
  }

  if (!IS_VAR_DATA_TYPE(pSlot->type)) {
    memcpy(pSlot->pData, src, pSlot->bytes);
  } else {
    int32_t len = calcStrBytesByType(pSlot->type, (char*)src);
    memcpy(pSlot->pData, src, len);
  }
  pSlot->isNull = false;
}
11,368,164✔
550

551
/*
 * Appends `resultCapacity` generated rows to `pBlock` by calling doFillOneRow() with
 * outOfBound == true, i.e. rows that are produced purely from the fill strategy.
 * numOfCurrent is restarted at 0 and added to numOfTotal afterwards.
 *
 * Returns TSDB_CODE_SUCCESS, or TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR when the number of
 * generated rows does not equal `resultCapacity`.
 */
static int32_t appendFilledResult(SFillInfo* pFillInfo, SSDataBlock* pBlock, int64_t resultCapacity) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (pFillInfo->numOfCurrent = 0; pFillInfo->numOfCurrent < resultCapacity;) {
    // doFillOneRow() increments numOfCurrent for every generated row
    doFillOneRow(pFillInfo, pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
  }

  pFillInfo->numOfTotal += pFillInfo->numOfCurrent;

  QUERY_CHECK_CONDITION((pFillInfo->numOfCurrent == resultCapacity), code, lino, _end,
                        TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
574

575
/*
 * Number of input rows not yet consumed: numOfRows - index.
 * Returns 0 when there are no input rows or when index has reached numOfRows.
 */
static int32_t taosNumOfRemainRows(SFillInfo* pFillInfo) {
  if (pFillInfo->numOfRows == 0) {
    return 0;
  }
  if (pFillInfo->numOfRows > 0 && pFillInfo->index >= pFillInfo->numOfRows) {
    return 0;
  }
  return pFillInfo->numOfRows - pFillInfo->index;
}
582

583
int32_t taosCreateFillInfo(TSKEY skey, int32_t numOfFillCols, int32_t numOfNotFillCols, int32_t fillNullCols,
3,107✔
584
                           int32_t capacity, SInterval* pInterval, int32_t fillType, struct SFillColInfo* pCol,
585
                           int32_t primaryTsSlotId, int32_t order, const char* id, SExecTaskInfo* pTaskInfo,
586
                           SFillInfo** ppFillInfo) {
587
  int32_t code = TSDB_CODE_SUCCESS;
3,107✔
588
  int32_t lino = 0;
3,107✔
589
  if (fillType == TSDB_FILL_NONE) {
3,107!
590
    (*ppFillInfo) = NULL;
×
591
    return code;
×
592
  }
593

594
  SFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SFillInfo));
3,107!
595
  QUERY_CHECK_NULL(pFillInfo, code, lino, _end, terrno);
3,109!
596

597
  pFillInfo->order = order;
3,109✔
598
  pFillInfo->srcTsSlotId = primaryTsSlotId;
3,109✔
599

600
  for (int32_t i = 0; i < numOfNotFillCols; ++i) {
4,537!
601
    SFillColInfo* p = &pCol[i + numOfFillCols];
4,537✔
602
    int32_t       srcSlotId = GET_DEST_SLOT_ID(p);
4,537✔
603
    if (srcSlotId == primaryTsSlotId) {
4,537✔
604
      pFillInfo->tsSlotId = i + numOfFillCols;
3,109✔
605
      break;
3,109✔
606
    }
607
  }
608

609
  taosResetFillInfo(pFillInfo, skey);
3,109✔
610

611
  pFillInfo->type = fillType;
3,109✔
612
  pFillInfo->pFillCol = pCol;
3,109✔
613
  pFillInfo->numOfCols = numOfFillCols + numOfNotFillCols + fillNullCols;
3,109✔
614
  pFillInfo->alloc = capacity;
3,109✔
615
  pFillInfo->id = id;
3,109✔
616
  pFillInfo->interval = *pInterval;
3,109✔
617

618
  pFillInfo->next.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
3,109✔
619
  QUERY_CHECK_NULL(pFillInfo->next.pRowVal, code, lino, _end, terrno);
3,107!
620

621
  pFillInfo->next.pNullValueFlag = taosArrayInit(pFillInfo->numOfCols, sizeof(bool));
3,107✔
622
  QUERY_CHECK_NULL(pFillInfo->next.pNullValueFlag, code, lino, _end, terrno);
3,109!
623

624
  pFillInfo->prev.pRowVal = taosArrayInit(pFillInfo->numOfCols, sizeof(SGroupKeys));
3,109✔
625
  QUERY_CHECK_NULL(pFillInfo->prev.pRowVal, code, lino, _end, terrno);
3,108!
626

627
  pFillInfo->prev.pNullValueFlag = taosArrayInit(pFillInfo->numOfCols, sizeof(bool));
3,108✔
628
  QUERY_CHECK_NULL(pFillInfo->prev.pNullValueFlag, code, lino, _end, terrno);
3,109!
629

630
  code = initBeforeAfterDataBuf(pFillInfo);
3,109✔
631
  QUERY_CHECK_CODE(code, lino, _end);
3,108!
632

633
  pFillInfo->pTaskInfo = pTaskInfo;
3,108✔
634

635
_end:
3,108✔
636
  if (code != TSDB_CODE_SUCCESS) {
3,108!
637
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
638
    pFillInfo = taosDestroyFillInfo(pFillInfo);
×
639
  }
640
  (*ppFillInfo) = pFillInfo;
3,108✔
641
  return code;
3,108✔
642
}
643

644
/*
 * Resets the fill context for a new range beginning at `startTimestamp`: start, currentKey and
 * end all become `startTimestamp`, the row counters are cleared and index is set to -1.
 */
void taosResetFillInfo(SFillInfo* pFillInfo, TSKEY startTimestamp) {
  // time range
  pFillInfo->start = startTimestamp;
  pFillInfo->end = startTimestamp;
  pFillInfo->currentKey = startTimestamp;

  // input cursor and counters
  pFillInfo->index = -1;
  pFillInfo->numOfRows = 0;
  pFillInfo->numOfTotal = 0;
  pFillInfo->numOfCurrent = 0;
}
6,366✔
653

654
/*
 * Releases a fill context and everything it owns: the cached prev/next row buffers and arrays,
 * string fill values (varbinary / varchar / nchar / json) of fill columns, the column array
 * itself, pTags, pColFillProgress, the saved block list and finally the context.
 * Accepts NULL. Always returns NULL so callers can write `p = taosDestroyFillInfo(p)`.
 */
void* taosDestroyFillInfo(SFillInfo* pFillInfo) {
  if (pFillInfo == NULL) {
    return NULL;
  }

  // cached rows: `prev` first, then `next`
  SRowVal* cachedRows[2] = {&pFillInfo->prev, &pFillInfo->next};
  for (int32_t r = 0; r < 2; ++r) {
    SRowVal* pRow = cachedRows[r];
    for (int32_t k = 0; k < taosArrayGetSize(pRow->pRowVal); ++k) {
      SGroupKeys* pEntry = taosArrayGet(pRow->pRowVal, k);
      if (pEntry) taosMemoryFree(pEntry->pData);
    }
    taosArrayDestroy(pRow->pNullValueFlag);
    taosArrayDestroy(pRow->pRowVal);
  }

  // string-typed user fill values are heap allocated
  if (pFillInfo->pFillCol) {
    for (int32_t c = 0; c < pFillInfo->numOfCols; c++) {
      SFillColInfo* pFillCol = &pFillInfo->pFillCol[c];
      if (pFillCol->notFillCol) {
        continue;
      }

      bool isStringVal =
          pFillCol->fillVal.nType == TSDB_DATA_TYPE_VARBINARY || pFillCol->fillVal.nType == TSDB_DATA_TYPE_VARCHAR ||
          pFillCol->fillVal.nType == TSDB_DATA_TYPE_NCHAR || pFillCol->fillVal.nType == TSDB_DATA_TYPE_JSON;
      if (isStringVal && pFillCol->fillVal.pz) {
        taosMemoryFree(pFillCol->fillVal.pz);
        pFillCol->fillVal.pz = NULL;
      }
    }
  }

  taosMemoryFreeClear(pFillInfo->pTags);
  taosMemoryFreeClear(pFillInfo->pFillCol);
  taosArrayDestroy(pFillInfo->pColFillProgress);
  tdListFreeP(pFillInfo->pFillSavedBlockList, destroyFillBlock);
  taosMemoryFreeClear(pFillInfo);
  return NULL;
}
698

699
// Prepares the fill context for a new batch of input rows: records the row count,
// rewinds the read index and stores the end key. Does nothing when the fill type
// is TSDB_FILL_NONE.
void taosFillSetStartInfo(SFillInfo* pFillInfo, int32_t numOfRows, TSKEY endKey) {
  if (pFillInfo->type != TSDB_FILL_NONE) {
    // the endKey is now the aligned time window value. truncate time window isn't correct.
    pFillInfo->end = endKey;
    pFillInfo->index = 0;
    pFillInfo->numOfRows = numOfRows;
  }
}
709

710
// Registers pInput as the source block for subsequent fill calls.
// The const qualifier is cast away because SFillInfo stores a non-const pointer;
// this function itself does not modify the block.
void taosFillSetInputDataBlock(SFillInfo* pFillInfo, const SSDataBlock* pInput) {
  SSDataBlock* pSrc = (SSDataBlock*)pInput;
  pFillInfo->pSrcBlock = pSrc;
}
7,524✔
713

714
// Restarts the fill cursor at ts: both the start of the range and the key that
// will be emitted next are set to the same timestamp.
void taosFillUpdateStartTimestampInfo(SFillInfo* pFillInfo, int64_t ts) {
  pFillInfo->currentKey = ts;
  pFillInfo->start = ts;
}
5,230✔
718

719
// Returns true while the fill cursor (currentKey) still equals the start key.
bool taosFillNotStarted(const SFillInfo* pFillInfo) {
  const bool cursorAtStart = (pFillInfo->currentKey == pFillInfo->start);
  return cursorAtStart;
}
3,089✔
720

721
// Reports whether another call to the fill routine can produce rows.
// True when input rows remain, or when rows were already produced and the range
// between start and end (in fill direction) still has at least one slot to fill.
bool taosFillHasMoreResults(SFillInfo* pFillInfo) {
  if (taosNumOfRemainRows(pFillInfo) > 0) {
    return true;
  }

  if (pFillInfo->numOfTotal <= 0) {
    return false;
  }

  // the end key must lie strictly ahead of the start key in fill direction
  bool rangeAhead =
      FILL_IS_ASC_FILL(pFillInfo) ? (pFillInfo->end > pFillInfo->start) : (pFillInfo->end < pFillInfo->start);
  if (!rangeAhead) {
    return false;
  }

  return getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, 4096) > 0;
}
735

736
// Counts how many result rows lie between the current fill key and a bound key,
// capped at maxNumOfRows.
//  - While input rows remain, the bound is the last timestamp of the source block.
//  - Otherwise the bound is ekey; 0 is returned when ekey already lies behind the
//    current key in fill direction.
int64_t getNumOfResultsAfterFillGap(SFillInfo* pFillInfo, TSKEY ekey, int32_t maxNumOfRows) {
  TSKEY boundKey = ekey;

  if (taosNumOfRemainRows(pFillInfo) > 0) {
    // still filling gaps inside the current data block, not generating data after the result set
    SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
    int64_t*         tsList = (int64_t*)pTsCol->pData;
    boundKey = tsList[pFillInfo->numOfRows - 1];
  } else {
    // reached the end of data
    bool pastEnd = FILL_IS_ASC_FILL(pFillInfo) ? (ekey < pFillInfo->currentKey) : (ekey > pFillInfo->currentKey);
    if (pastEnd) {
      return 0;
    }
  }

  int64_t numOfRes =
      taosTimeCountIntervalForFill(boundKey, pFillInfo->currentKey, pFillInfo->interval.sliding,
                                   pFillInfo->interval.slidingUnit, pFillInfo->interval.precision, pFillInfo->order);
  if (numOfRes > maxNumOfRows) {
    return maxNumOfRows;
  }
  return numOfRes;
}
762

763
void taosGetLinearInterpolationVal(SPoint* point, int32_t outputType, SPoint* point1, SPoint* point2, int32_t inputType,
592,345✔
764
                                   STypeMod inputTypeMod) {
765
  double v1 = -1, v2 = -1;
592,345✔
766
  GET_TYPED_DATA(v1, double, inputType, point1->val, inputTypeMod);
592,345!
767
  GET_TYPED_DATA(v2, double, inputType, point2->val, inputTypeMod);
592,553!
768

769
  double r = 0;
592,361✔
770
  if (!IS_BOOLEAN_TYPE(inputType)) {
592,361✔
771
    r = DO_INTERPOLATION(v1, v2, point1->key, point2->key, point->key);
592,331✔
772
  } else {
773
    r = (v1 < 1 || v2 < 1) ? 0 : 1;
30✔
774
  }
775
  SET_TYPED_DATA(point->val, outputType, r);
592,361!
776
}
592,361✔
777

778
// Produces up to `capacity` filled rows into block p.
// When no input rows remain, rows are appended purely from the fill strategy;
// otherwise input rows and filled rows are merged by fillResultImpl, which must
// produce exactly the predicted number of rows.
// Returns TSDB_CODE_SUCCESS or an error code (also logged).
int32_t taosFillResultDataBlock(SFillInfo* pFillInfo, SSDataBlock* p, int32_t capacity) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  int32_t remain = taosNumOfRemainRows(pFillInfo);
  int64_t numOfRes = getNumOfResultsAfterFillGap(pFillInfo, pFillInfo->end, capacity);
  QUERY_CHECK_CONDITION((numOfRes <= capacity), code, lino, _end, TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);

  if (remain != 0) {
    code = fillResultImpl(pFillInfo, p, (int32_t)numOfRes);
    QUERY_CHECK_CODE(code, lino, _end);
    QUERY_CHECK_CONDITION((numOfRes == pFillInfo->numOfCurrent), code, lino, _end,
                          TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR);
  } else {
    // no data existed for fill operation now, append result according to the fill strategy
    code = appendFilledResult(pFillInfo, p, numOfRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  qDebug("fill:%p, generated fill result, src block:%d, index:%d, brange:%" PRId64 "-%" PRId64 ", currentKey:%" PRId64
         ", current : % d, total : % d, %s",
         pFillInfo, pFillInfo->numOfRows, pFillInfo->index, pFillInfo->start, pFillInfo->end, pFillInfo->currentKey,
         pFillInfo->numOfCurrent, pFillInfo->numOfTotal, pFillInfo->id);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
807

808
// Accessor for the start key of the fill range.
int64_t getFillInfoStart(struct SFillInfo* pFillInfo) {
  return pFillInfo->start;
}
3,258✔
809

810
// Builds the column descriptor array used by the fill operator.
// Layout of the returned array:
//   [0, numOfFillExpr)                       columns that get filled
//   [numOfFillExpr, +numOfNoFillExpr)        columns that are not filled
//   [.., +numOfFillNullExpr)                 not-filled columns flagged fillNull
// When pValNode carries user values, value i is converted into fillVal of fill
// column i; if fewer values than columns are given, the last value is reused.
// Returns NULL on allocation or conversion failure (conversion failures are logged).
SFillColInfo* createFillColInfo(SExprInfo* pExpr, int32_t numOfFillExpr, SExprInfo* pNotFillExpr,
                                int32_t numOfNoFillExpr, SExprInfo* pFillNullExpr, int32_t numOfFillNullExpr,
                                const struct SNodeListNode* pValNode) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  SFillColInfo* pFillCol = taosMemoryCalloc(numOfFillExpr + numOfNoFillExpr + numOfFillNullExpr, sizeof(SFillColInfo));
  if (pFillCol == NULL) {
    return NULL;
  }

  size_t numOfVals = (pValNode != NULL) ? LIST_LENGTH(pValNode->pNodeList) : 0;
  for (int32_t i = 0; i < numOfFillExpr; ++i) {
    SFillColInfo* pCol = &pFillCol[i];
    pCol->pExpr = &pExpr[i];
    pCol->notFillCol = false;

    // todo refactor
    if (numOfVals == 0) {
      continue;
    }

    // if the user specified fewer values than columns, always use the last one as the fill value
    int32_t valIdx = (i >= numOfVals) ? (numOfVals - 1) : i;

    SValueNode* pVal = (SValueNode*)nodesListGetNode(pValNode->pNodeList, valIdx);
    QUERY_CHECK_NULL(pVal, code, lino, _end, terrno);
    code = nodesValueNodeToVariant(pVal, &pCol->fillVal);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  // the count of fill columns is recorded on the first element only
  pFillCol->numOfFillExpr = numOfFillExpr;

  SFillColInfo* pNoFill = pFillCol + numOfFillExpr;
  for (int32_t i = 0; i < numOfNoFillExpr; ++i) {
    pNoFill[i].pExpr = &pNotFillExpr[i];
    pNoFill[i].notFillCol = true;
  }

  SFillColInfo* pNullFill = pNoFill + numOfNoFillExpr;
  for (int32_t i = 0; i < numOfFillNullExpr; ++i) {
    pNullFill[i].pExpr = &pFillNullExpr[i];
    pNullFill[i].notFillCol = true;
    pNullFill[i].fillNull = true;
  }

  return pFillCol;

_end:
  // entries that were never converted are still zeroed by calloc
  for (int32_t i = 0; i < numOfFillExpr; ++i) {
    taosVariantDestroy(&pFillCol[i].fillVal);
  }
  taosMemoryFree(pFillCol);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return NULL;
}
867

868
// Decides whether the fill loop must stop for now. It pauses when
//  - the source block has been consumed completely,
//  - the destination block already carries rows to hand out,
//  - there are no input rows at all, or
//  - the current key has moved past the end key in fill direction.
static bool fillShouldPause(SFillInfo* pFillInfo, const SSDataBlock* pDstBlock) {
  const SSDataBlock* pSrc = pFillInfo->pSrcBlock;
  bool               srcConsumed = (pSrc != NULL) && (pFillInfo->index >= pSrc->info.rows);

  if (srcConsumed || pDstBlock->info.rows > 0 || pFillInfo->numOfRows == 0) {
    return true;
  }

  if (pFillInfo->order == TSDB_ORDER_ASC) {
    return pFillInfo->currentKey > pFillInfo->end;
  }
  if (pFillInfo->order == TSDB_ORDER_DESC) {
    return pFillInfo->currentKey < pFillInfo->end;
  }
  return false;
}
877

878
static TSKEY getBlockCurTs(const struct SFillInfo* pFillInfo, const SSDataBlock* pBlock, int32_t rowIdx) {
15,845,238✔
879
  if (pBlock) {
15,845,238!
880
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pFillInfo->srcTsSlotId);
15,845,531✔
881
    return ((TSKEY*)pTsCol->pData)[rowIdx];
15,820,729✔
882
  }
883
  return -1;
×
884
}
885

886
// Copies row rowIndex of the source block into the cached row pRowVal.
// blockCurTs is the timestamp of that row. When it differs from the current fill
// key, only part of the columns is saved:
//  - fill columns are saved only for asc/NEXT and desc/PREV,
//  - the timestamp column is skipped for linear fill.
// For PREV/NEXT fill the null flag of each fill column is recorded as well; for
// asc/PREV and desc/NEXT a NULL never overwrites the value saved earlier.
// Returns TSDB_CODE_SUCCESS or an error code (also logged).
static int32_t copyCurrentRowIntoBuf2(SFillInfo* pFillInfo, int32_t rowIndex, SRowVal* pRowVal, TSKEY blockCurTs) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const bool ascFill = FILL_IS_ASC_FILL(pFillInfo);
  const bool fillNext = (pFillInfo->type == TSDB_FILL_NEXT);
  const bool fillPrev = (pFillInfo->type == TSDB_FILL_PREV);
  const bool ascNextOrDescPrev = (ascFill && fillNext) || (!ascFill && fillPrev);
  const bool ascPrevOrDescNext = (fillPrev && ascFill) || (fillNext && !ascFill);
  const bool keyMismatch = (blockCurTs != pFillInfo->currentKey);

  SColumnInfoData* pTsCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, pFillInfo->srcTsSlotId);
  QUERY_CHECK_NULL(pTsCol, code, lino, _end, terrno);
  pRowVal->key = ((int64_t*)pTsCol->pData)[rowIndex];

  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
    SFillColInfo* pCol = &pFillInfo->pFillCol[i];
    int32_t       nodeType = pCol->pExpr->pExpr->nodeType;

    if (nodeType != QUERY_NODE_COLUMN && nodeType != QUERY_NODE_OPERATOR && nodeType != QUERY_NODE_FUNCTION) {
      // only column, operator and function expressions can be saved
      code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
      QUERY_CHECK_CODE(code, lino, _end);
    }

    int32_t srcSlotId = GET_DEST_SLOT_ID(pCol);
    if (keyMismatch) {
      if (!pCol->notFillCol && !ascNextOrDescPrev) {
        continue;
      }
      if (srcSlotId == pFillInfo->srcTsSlotId && pFillInfo->type == TSDB_FILL_LINEAR) {
        continue;
      }
    }

    SColumnInfoData* pSrcCol = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, srcSlotId);
    QUERY_CHECK_NULL(pSrcCol, code, lino, _end, terrno);

    bool  isNull = colDataIsNull_s(pSrcCol, rowIndex);
    char* pData = colDataGetData(pSrcCol, rowIndex);

    // only save null value flag for fill prev/next and need fill cols
    if (!pCol->notFillCol && pRowVal->pNullValueFlag && (fillNext || fillPrev)) {
      bool* pNullFlag = taosArrayGet(pRowVal->pNullValueFlag, i);
      *pNullFlag = isNull;
      // For ascPrev and descNext, we do not save NULL values into prev/next pRowVal, because the last prev or
      // last next will be used for later filling and must not be overridden by NULL
      if (isNull && ascPrevOrDescNext) {
        continue;
      }
    }

    saveColData(pRowVal->pRowVal, i, pData, isNull);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
940

941
static int32_t fillTrySaveRow(struct SFillInfo* pFillInfo, const SSDataBlock* pBlock, int32_t rowIdx) {
2,836,314✔
942
  if (!pBlock) return 0;
2,836,314!
943
  int32_t  code = 0;
2,836,314✔
944
  bool     ascFill = FILL_IS_ASC_FILL(pFillInfo);
2,836,314✔
945
  TSKEY    blockCurKey = getBlockCurTs(pFillInfo, pBlock, rowIdx);
2,836,314✔
946
  int32_t  fillType = pFillInfo->type;
2,820,934✔
947
  SRowVal* pFillRow = ascFill ? (fillType == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev)
2,674,485✔
948
                              : (fillType == TSDB_FILL_PREV ? &pFillInfo->next : &pFillInfo->prev);
5,495,419✔
949
  return copyCurrentRowIntoBuf2(pFillInfo, rowIdx, pFillRow, blockCurKey);
2,820,934✔
950
}
951

952
static int32_t tIsColFallBehind(struct SFillInfo* pFillInfo, int32_t colIdx) {
95,902,191✔
953
  SColumnFillProgress* pColProgress = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
95,902,191✔
954
  if (!pColProgress) {
95,807,117!
955
    qError("failed to get col progress for col %d, size: %lu", colIdx, taosArrayGetSize(pFillInfo->pColFillProgress));
×
956
    return TSDB_CODE_INTERNAL_ERROR;
×
957
  }
958
  if (pColProgress->pBlockNode) {
95,812,419✔
959
    return true;
124,030✔
960
  }
961
  return false;
95,688,389✔
962
}
963

964
// Remembers where column colIdx started to lag, but only for asc/NEXT and
// desc/PREV fills. The position (pBlockNode, rowIdx) is stored once: if the column
// already has a saved position it is left untouched. The row index is also
// written into the per-block progress of the block held by pBlockNode.
// Returns true when the fill direction applies (whether or not a new position was
// stored), false otherwise.
static bool tFillTrySaveColProgress(struct SFillInfo* pFillInfo, int32_t colIdx, SListNode* pBlockNode,
                                    int32_t rowIdx) {
  const bool ascNext = (pFillInfo->type == TSDB_FILL_NEXT) && (pFillInfo->order == TSDB_ORDER_ASC);
  const bool descPrev = (pFillInfo->type == TSDB_FILL_PREV) && (pFillInfo->order == TSDB_ORDER_DESC);
  if (!ascNext && !descPrev) {
    return false;
  }

  SColumnFillProgress* pColProg = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
  if (pColProg->pBlockNode == NULL) {
    pColProg->pBlockNode = pBlockNode;
    pColProg->rowIdx = rowIdx;

    SFillBlock*         pOwner = (SFillBlock*)pBlockNode->data;
    SBlockFillProgress* pBlockProg = taosArrayGet(pOwner->pFillProgress, colIdx);
    pBlockProg->rowIdx = rowIdx;
  }
  return true;
}
981

982
// Writes the fill value of column colIdx into row rowIdx of pBlock according to
// the fill type:
//  - PREV / NEXT: window pseudo columns are filled directly; otherwise the cached
//    row value is used.
//  - LINEAR: NULL when out of bound; otherwise not-fill columns are copied and
//    fill columns are interpolated between the cached prev row and the current
//    source row (NULL for var-length/bool types or when either side is NULL).
//  - NULL / NULL_F: NULL.
//  - anything else: the user specified value.
// ts is the timestamp of the current source row (second interpolation point).
// Returns true when the caller should record a lagging position for this column:
// setNotFillColumn reported true and the scan runs desc for PREV or asc for NEXT.
// On failure the error is logged and the task is aborted through T_LONG_JMP.
static bool doFillOneCol(SFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY ts, int32_t colIdx, int32_t rowIdx,
                         bool outOfBound) {
  int32_t       code = 0;
  int32_t       lino = 0;
  bool          saveProgress = false;
  const int32_t fillType = pFillInfo->type;

  if (fillType == TSDB_FILL_PREV || fillType == TSDB_FILL_NEXT) {
    SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, GET_DEST_SLOT_ID(pCol));

    if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, rowIdx)) {
      // PREV can only lag on a descending scan, NEXT only on an ascending one
      const int32_t lagOrder = (fillType == TSDB_FILL_PREV) ? TSDB_ORDER_DESC : TSDB_ORDER_ASC;
      saveProgress = setNotFillColumn(pFillInfo, pDst, rowIdx, colIdx) && pFillInfo->order == lagOrder;
    }
  } else if (fillType == TSDB_FILL_LINEAR) {
    // TODO : linear interpolation supports NULL value
    if (outOfBound) {
      setNullCol(pBlock, pFillInfo, rowIdx, colIdx);
    } else {
      SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
      int32_t          slotId = GET_DEST_SLOT_ID(pCol);
      SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);
      int16_t          dataType = pDst->info.type;

      if (pCol->notFillCol) {
        if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, rowIdx)) {
          (void)setNotFillColumn(pFillInfo, pDst, rowIdx, colIdx);
        }
      } else {
        SRowVal*    pPrevRow = &pFillInfo->prev;
        SGroupKeys* pPrevVal = taosArrayGet(pPrevRow->pRowVal, colIdx);
        // source and destination blocks use the same slot id for this column
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, slotId);

        if (IS_VAR_DATA_TYPE(dataType) || dataType == TSDB_DATA_TYPE_BOOL || pPrevVal->isNull ||
            colDataIsNull_s(pSrc, pFillInfo->index)) {
          colDataSetNULL(pDst, rowIdx);
        } else {
          SGroupKeys* pPrevTs = taosArrayGet(pPrevRow->pRowVal, pFillInfo->tsSlotId);
          int64_t     prevTs = *(int64_t*)pPrevTs->pData;
          char*       curData = colDataGetData(pSrc, pFillInfo->index);

          // 8-byte scratch buffer that receives the interpolated value
          int64_t out = 0;
          SPoint  left = (SPoint){.key = prevTs, .val = pPrevVal->pData};
          SPoint  right = (SPoint){.key = ts, .val = curData};
          SPoint  target = (SPoint){.key = pFillInfo->currentKey, .val = &out};
          taosGetLinearInterpolationVal(&target, dataType, &left, &right, dataType,
                                        typeGetTypeModFromColInfo(&pDst->info));

          code = colDataSetVal(pDst, rowIdx, (const char*)&out, false);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      }
    }
  } else if (fillType == TSDB_FILL_NULL || fillType == TSDB_FILL_NULL_F) {
    // fill with NULL
    setNullCol(pBlock, pFillInfo, rowIdx, colIdx);
  } else {
    // fill with user specified value for each column
    SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
    int32_t          slotId = GET_DEST_SLOT_ID(pCol);
    SColumnInfoData* pDst = taosArrayGet(pBlock->pDataBlock, slotId);

    if (pCol->notFillCol) {
      if (!fillIfWindowPseudoColumn(pFillInfo, pCol, pDst, rowIdx)) {
        (void)setNotFillColumn(pFillInfo, pDst, rowIdx, colIdx);
      }
    } else {
      SVariant* pUserVal = &pCol->fillVal;
      code = doSetUserSpecifiedValue(pDst, pUserVal, rowIdx, pFillInfo->currentKey);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
  return saveProgress;
}
1075

1076
// Catches up a lagging column: starting at the saved (block, row) position of
// column colIdx, fills that column in every remaining row of every saved block
// up to the tail of the list, then clears the lag for the column.
// Here we always fill till the last row of the last block in the list, because
// this is called the first time a non-null value is met (or input has ended).
// ts and outOfBound are forwarded to doFillOneCol.
// Returns 0, or TSDB_CODE_INTERNAL_ERROR when the progress entry cannot be fetched.
static int32_t tFillFromHeadForCol(struct SFillInfo* pFillInfo, TSKEY ts, int32_t colIdx, bool outOfBound) {
  int32_t code = 0;

  SColumnFillProgress* pColProgress = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
  if (!pColProgress) {
    // the array size is cast explicitly so the format matches on every data model
    // (the former "%lu" is wrong where unsigned long is 32-bit)
    qError("failed to get col progress for col %d, size: %" PRIu64, colIdx,
           (uint64_t)taosArrayGetSize(pFillInfo->pColFillProgress));
    return TSDB_CODE_INTERNAL_ERROR;
  }
  SListNode* pListNode = pColProgress->pBlockNode;

  while (pListNode) {
    SFillBlock*                pFillBlock = (SFillBlock*)pListNode->data;
    const SColumnFillProgress* pProgress = taosArrayGet(pFillInfo->pColFillProgress, colIdx);
    for (int32_t rowIdx = pProgress->rowIdx; rowIdx < pFillBlock->pBlock->info.rows; ++rowIdx) {
      (void)doFillOneCol(pFillInfo, pFillBlock->pBlock, ts, colIdx, rowIdx, outOfBound);
    }

    // this column is now complete for the whole block
    SBlockFillProgress* pMyBlockProgress = taosArrayGet(pFillBlock->pFillProgress, colIdx);
    pMyBlockProgress->rowIdx = pFillBlock->pBlock->info.rows;

    // the block is finished once every column has reached its last row
    bool allColFinished = true;
    for (int32_t i = 0; i < taosArrayGetSize(pFillBlock->pFillProgress); ++i) {
      SBlockFillProgress* pBProgress = taosArrayGet(pFillBlock->pFillProgress, i);
      if (pBProgress->rowIdx < pFillBlock->pBlock->info.rows) {
        allColFinished = false;
        break;
      }
    }
    pFillBlock->allColFinished = allColFinished;

    // advance to the next block; following blocks are filled from their first row
    pListNode = TD_DLIST_NODE_NEXT(pListNode);
    pColProgress->pBlockNode = pListNode;
    pColProgress->rowIdx = 0;
  }
  return code;
}
1112

1113
// Appends one filled row to pBlock at the current fill key and advances the key
// by one sliding step in fill direction.
// For every column: when out of bound, a lagging column is first caught up from
// its saved position; then the column is filled, and if the fill reports a new
// lag its position (tail block of the saved list, this row) is recorded.
// pSrcBlock is not used by this function. ts is forwarded to the column fill.
// On failure the error is logged and the task is aborted through T_LONG_JMP.
static void doFillOneRow2(SFillInfo* pFillInfo, SSDataBlock* pBlock, SSDataBlock* pSrcBlock, int64_t ts,
                          bool outOfBound) {
  int32_t    code = TSDB_CODE_SUCCESS;
  int32_t    lino = 0;
  int32_t    step = GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order);
  int32_t    dstRow = pBlock->info.rows;
  SInterval* pInterval = &pFillInfo->interval;

  for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
    if (outOfBound && tIsColFallBehind(pFillInfo, col)) {
      code = tFillFromHeadForCol(pFillInfo, ts, col, true);
      if (code != 0) {
        goto _end;
      }
    }

    // if this col meets a null value during fill for the first time, save its progress
    if (doFillOneCol(pFillInfo, pBlock, ts, col, dstRow, outOfBound)) {
      SListNode* pTailNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
      (void)tFillTrySaveColProgress(pFillInfo, col, pTailNode, dstRow);
    }
  }

  pFillInfo->currentKey =
      taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * step, pInterval->slidingUnit, pInterval->precision, NULL);
  pBlock->info.rows += 1;
  pFillInfo->numOfCurrent++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    T_LONG_JMP(pFillInfo->pTaskInfo->env, code);
  }
}
24,640,266✔
1145

1146
// Hands the head block of the saved list over to pDstBlock when it is ready.
// While input rows exist (numOfRows != 0) the head block is ready only if it is
// full (rows >= capacity) and no column still has its lagging position inside
// it. When there are no input rows the head block is handed over unconditionally.
// Handing over swaps row count and column array with pDstBlock, then removes and
// frees the list node.
static void tryExtractReadyBlocks(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SListNode* pHead = tdListGetHead(pFillInfo->pFillSavedBlockList);
  if (pHead == NULL) {
    return;
  }

  SFillBlock* pHeadBlock = (SFillBlock*)pHead->data;
  if (pFillInfo->numOfRows != 0) {
    if (pHeadBlock->pBlock->info.rows < capacity) {
      return;
    }
    for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
      SColumnFillProgress* pProg = taosArrayGet(pFillInfo->pColFillProgress, col);
      if (pProg->pBlockNode == pHead) {
        // some column still has to be caught up inside this block
        return;
      }
    }
  }

  TSWAP(pDstBlock->info.rows, pHeadBlock->pBlock->info.rows);
  TSWAP(pDstBlock->pDataBlock, pHeadBlock->pBlock->pDataBlock);
  tdListPopNode(pFillInfo->pFillSavedBlockList, pHead);
  destroyFillBlock(pHead->data);
  taosMemFreeClear(pHead);
}
1171

1172
// Creates an empty block with the same layout as pDstBlock and room for
// `capacity` rows. pFillInfo is not used.
// Returns NULL on failure. The failure code is stored in terrno, because the
// caller in this file reads terrno to learn why NULL was returned.
static SSDataBlock* createNewSavedBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SSDataBlock* pBlock = NULL;

  int32_t code = createOneDataBlock(pDstBlock, false, &pBlock);
  if (code != 0) {
    terrno = code;
    return NULL;
  }

  code = blockDataEnsureCapacity(pBlock, capacity);
  if (code != 0) {
    blockDataDestroy(pBlock);
    terrno = code;
    return NULL;
  }
  return pBlock;
}
1184

1185
static int32_t trySaveNewBlock(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity,
12,145✔
1186
                               SFillBlock** ppFillBlock) {
1187
  int32_t      code = 0;
12,145✔
1188
  SSDataBlock* pBlock = createNewSavedBlock(pFillInfo, pDstBlock, capacity);
12,145✔
1189
  if (!pBlock) {
12,146!
1190
    code = terrno;
×
1191
    goto _end;
×
1192
  }
1193
  SArray* pProgress = taosArrayInit(pFillInfo->numOfCols, sizeof(SBlockFillProgress));
12,146✔
1194
  if (!pProgress) {
12,146!
1195
    code = terrno;
×
1196
    goto _end;
×
1197
  }
1198
  SBlockFillProgress prog = {INT32_MAX};
12,146✔
1199
  for (int32_t i = 0; i < pFillInfo->numOfCols; ++i) {
80,636✔
1200
    if (NULL == taosArrayPush(pProgress, &prog)) {
68,490!
1201
      code = terrno;
×
1202
      goto _end;
×
1203
    }
1204
  }
1205
  *ppFillBlock = tFillSaveBlock(pFillInfo, pBlock, pProgress);
12,146✔
1206
  if (!*ppFillBlock) {
12,146!
1207
    code = terrno;
×
1208
    goto _end;
×
1209
  }
1210
  return 0;
12,146✔
1211
_end:
×
1212
  if (pBlock) blockDataDestroy(pBlock);
×
1213
  if (pProgress) taosArrayDestroy(pProgress);
×
1214
  return code;
×
1215
}
1216

1217
// One-time setup of the saved block machinery: creates the block list, saves a
// first empty block into it and creates the per-column progress array with one
// entry per column, each meaning "not lagging" (no block node, row 0).
// Returns 0 on success, otherwise terrno or the code of the failing step.
static int32_t fillInitSavedBlockList(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity) {
  SFillBlock* pFirstBlock = NULL;

  pFillInfo->pFillSavedBlockList = tdListNew(sizeof(SFillBlock));
  if (pFillInfo->pFillSavedBlockList == NULL) {
    return terrno;
  }

  int32_t code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFirstBlock);
  if (code != 0) {
    return code;
  }

  pFillInfo->pColFillProgress = taosArrayInit(pFillInfo->numOfCols, sizeof(SColumnFillProgress));
  if (pFillInfo->pColFillProgress == NULL) {
    return terrno;
  }

  SColumnFillProgress noLag = {.pBlockNode = NULL, .rowIdx = 0};
  for (int32_t col = 0; col < pFillInfo->numOfCols; ++col) {
    if (taosArrayPush(pFillInfo->pColFillProgress, &noLag) == NULL) {
      return terrno;
    }
  }
  return code;
}
1237

1238
static void tryResetColNextPrev(struct SFillInfo* pFillInfo, int32_t colIdx) {
11,073,833✔
1239
  bool    ascFill = FILL_IS_ASC_FILL(pFillInfo);
11,073,833✔
1240
  int32_t fillType = pFillInfo->type;
11,073,833✔
1241
  bool    ascNext = ascFill && fillType == TSDB_FILL_NEXT, descPrev = !ascFill && fillType == TSDB_FILL_PREV;
11,073,833✔
1242
  if ((ascNext || descPrev) && !pFillInfo->pFillCol[colIdx].notFillCol) {
11,073,833✔
1243
    SRowVal*    pFillRow = ascFill ? (fillType == TSDB_FILL_NEXT ? &pFillInfo->next : &pFillInfo->prev)
759,251!
1244
                                   : (fillType == TSDB_FILL_NEXT ? &pFillInfo->prev : &pFillInfo->next);
1,575,478!
1245
    SGroupKeys* pKey = taosArrayGet(pFillRow->pRowVal, colIdx);
816,227✔
1246
    pKey->isNull = true;
816,024✔
1247
  }
1248
}
11,073,630✔
1249

1250
// Block-list based fill: rows are produced into internally saved blocks, and a
// saved block is moved into pDstBlock once it is ready (see tryExtractReadyBlocks).
//  1. When there are no input rows (numOfRows == 0), rows are generated until the
//     end key is passed or the tail block is full, then lagging columns are caught up.
//  2. Otherwise source rows are walked: a filled row is emitted while the block
//     timestamp differs from the current key, and the source row is copied when
//     they match. A NULL source value either records a lag for the column or is
//     filled immediately.
// *wantMoreBlock (optional) is set to true when saved blocks are still pending.
// Returns TSDB_CODE_SUCCESS or an error code; numOfTotal is updated in both cases.
int32_t taosFillResultDataBlock2(struct SFillInfo* pFillInfo, SSDataBlock* pDstBlock, int32_t capacity,
                                 bool* wantMoreBlock) {
  int32_t     code = TSDB_CODE_SUCCESS;
  int32_t     lino = 0;
  SFillBlock* pFillBlock = NULL;
  SListNode*  pFillBlockListNode = NULL;
  TSKEY       lastSavedTs = -1;
  pFillInfo->numOfCurrent = 0;

  if (!pFillInfo->pFillSavedBlockList) {
    code = fillInitSavedBlockList(pFillInfo, pDstBlock, capacity);
    if (code != 0) goto _end;
  }
  pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
  pFillBlock = pFillBlockListNode ? (SFillBlock*)pFillBlockListNode->data : NULL;

  // if all blocks are consumed, we have to fill for not filled cols
  if (pFillInfo->numOfRows == 0) {
    if (!pFillBlock) {
      code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
      if (code != 0) goto _end;
      pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
    }

    for (;;) {
      bool allFilled = pFillInfo->order == TSDB_ORDER_ASC ? pFillInfo->currentKey > pFillInfo->end
                                                          : pFillInfo->currentKey < pFillInfo->end;
      if (allFilled || pFillBlock->pBlock->info.rows >= capacity) break;
      doFillOneRow2(pFillInfo, pFillBlock->pBlock, pFillInfo->pSrcBlock, pFillInfo->start, true);
    }

    for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
      if (tIsColFallBehind(pFillInfo, colIdx)) {
        code = tFillFromHeadForCol(pFillInfo, pFillInfo->start, colIdx, true);
        if (code != 0) goto _end;
      }
    }
  }

  // check from list head if we have already filled all rows in blocks, if any block is full, send it out
  tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
  while (!fillShouldPause(pFillInfo, pDstBlock)) {
    if (!pFillBlock || pFillBlock->pBlock->info.rows >= capacity) {
      code = trySaveNewBlock(pFillInfo, pDstBlock, capacity, &pFillBlock);
      if (code != 0) goto _end;
      pFillBlockListNode = tdListGetTail(pFillInfo->pFillSavedBlockList);
    }
    TSKEY fillCurTs = pFillInfo->currentKey;
    TSKEY blockCurTs = getBlockCurTs(pFillInfo, pFillInfo->pSrcBlock, pFillInfo->index);
    // save the source row once per distinct timestamp, and again when the fill key reaches it
    if (pFillInfo->pSrcBlock && (lastSavedTs != blockCurTs || blockCurTs == fillCurTs))
      code = fillTrySaveRow(pFillInfo, pFillInfo->pSrcBlock, pFillInfo->index);
    lastSavedTs = blockCurTs;
    if (code != 0) goto _end;

    if (blockCurTs != fillCurTs || !pFillInfo->pSrcBlock) {
      // gap before the next source row: emit one filled row
      doFillOneRow2(pFillInfo, pFillBlock->pBlock, pFillInfo->pSrcBlock, blockCurTs, false);
    } else {
      // the source row matches the current key: copy it column by column
      for (int32_t colIdx = 0; colIdx < pFillInfo->numOfCols; ++colIdx) {
        SFillColInfo*    pCol = &pFillInfo->pFillCol[colIdx];
        int32_t          rowIdx = pFillBlock->pBlock->info.rows;
        int32_t          dstSlotId = GET_DEST_SLOT_ID(pCol);
        SColumnInfoData* pDst = taosArrayGet(pFillBlock->pBlock->pDataBlock, dstSlotId);
        SColumnInfoData* pSrc = taosArrayGet(pFillInfo->pSrcBlock->pDataBlock, dstSlotId);

        char* src = colDataGetData(pSrc, pFillInfo->index);
        if (!colDataIsNull_s(pSrc, pFillInfo->index)) {
          if (tIsColFallBehind(pFillInfo, colIdx)) code = tFillFromHeadForCol(pFillInfo, blockCurTs, colIdx, false);
          QUERY_CHECK_CODE(code, lino, _end);
          code = colDataSetVal(pDst, rowIdx, src, false);
          QUERY_CHECK_CODE(code, lino, _end);
        } else {
          // if col value in block is NULL, skip setting value for this col, save current position, wait till we got
          // non-null data if there is no lag for this col, then we should fill from (pFillBlock, index) when we got
          // non-null value. if this col is already fall behind, do nothing. Cause when we meet non-null value for this
          // col, we will fill till the last row of last block in list.
          bool saved = tFillTrySaveColProgress(pFillInfo, colIdx, pFillBlockListNode, rowIdx);
          if (!saved) {
            doFillOneCol(pFillInfo, pFillBlock->pBlock, blockCurTs, colIdx, rowIdx, false);
          }
        }
        tryResetColNextPrev(pFillInfo, colIdx);
      }
      SInterval* pInterval = &pFillInfo->interval;
      pFillInfo->currentKey =
          taosTimeAdd(pFillInfo->currentKey, pInterval->sliding * GET_FORWARD_DIRECTION_FACTOR(pFillInfo->order),
                      pInterval->slidingUnit, pInterval->precision, NULL);
      pFillBlock->pBlock->info.rows += 1;
      pFillInfo->index += 1;
      pFillInfo->numOfCurrent += 1;
    }
    tryExtractReadyBlocks(pFillInfo, pDstBlock, capacity);
  }

_end:
  pFillInfo->numOfTotal += pFillInfo->numOfCurrent;
  if (wantMoreBlock) {
    // the list may still be NULL here when its creation failed
    *wantMoreBlock = (pFillInfo->pFillSavedBlockList != NULL) && !isListEmpty(pFillInfo->pFillSavedBlockList);
  }
  return code;
}
1353

1354
void destroyFillBlock(void* p) {
12,146✔
1355
  SFillBlock* pFillBlock = p;
12,146✔
1356
  taosArrayDestroy(pFillBlock->pFillProgress);
12,146✔
1357
  blockDataDestroy(pFillBlock->pBlock);
12,146✔
1358
}
12,146✔
1359

1360
// Adds a new saved block, made of pBlock and its progress array pProgress, to
// the saved block list of pFill, with allColFinished cleared.
// Returns the SFillBlock stored in the new list node, or NULL when the list
// insertion fails (pBlock and pProgress are not released in that case).
SFillBlock* tFillSaveBlock(SFillInfo* pFill, SSDataBlock* pBlock, SArray* pProgress) {
  SFillBlock saved = {.pBlock = pBlock, .pFillProgress = pProgress, .allColFinished = false};

  SListNode* pNode = tdListAdd(pFill->pFillSavedBlockList, &saved);
  return (pNode != NULL) ? (SFillBlock*)pNode->data : NULL;
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc