• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #4720

08 Sep 2025 08:43AM UTC coverage: 58.139% (-0.6%) from 58.762%
#4720

push

travis-ci

web-flow
Merge pull request #32881 from taosdata/enh/add-new-windows-ci

fix(ci): update workflow reference to use new Windows CI YAML

133181 of 292179 branches covered (45.58%)

Branch coverage included in aggregate %.

201691 of 283811 relevant lines covered (71.07%)

5442780.71 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

68.16
/source/libs/executor/src/timesliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "storageapi.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "ttime.h"
27

28
typedef struct STimeSliceOperatorInfo {
29
  SSDataBlock*         pRes;
30
  STimeWindow          win;
31
  SInterval            interval;
32
  int64_t              current;
33
  SArray*              pPrevRow;     // SArray<SGroupValue>
34
  SArray*              pNextRow;     // SArray<SGroupValue>
35
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
36
  bool                 isPrevRowSet;
37
  bool                 isNextRowSet;
38
  int32_t              fillType;      // fill type
39
  SColumn              tsCol;         // primary timestamp column
40
  SExprSupp            scalarSup;     // scalar calculation
41
  struct SFillColInfo* pFillColInfo;  // fill column info
42
  SRowKey              prevKey;
43
  bool                 prevTsSet;
44
  uint64_t             groupId;
45
  SArray*              pPrevGroupKeys;
46
  SSDataBlock*         pNextGroupRes;
47
  SSDataBlock*         pRemainRes;   // save block unfinished processing
48
  int32_t              remainIndex;  // the remaining index in the block to be processed
49
  bool                 hasPk;
50
  SColumn              pkCol;
51
  int64_t              rangeInterval;
52
} STimeSliceOperatorInfo;
53

54
static void destroyTimeSliceOperatorInfo(void* param);
55

56
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
10,840,423✔
57
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
10,840,423✔
58
  for (int32_t i = 0; i < numOfCols; ++i) {
42,877,840✔
59
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
32,131,955✔
60

61
    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, i);
32,114,542✔
62
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
64,080,852✔
63
      pkey->isNull = false;
30,655,729✔
64
      char* val = colDataGetData(pColInfoData, rowIndex);
30,655,729!
65
      if (IS_VAR_DATA_TYPE(pkey->type)) {
30,655,729!
66
        int32_t bytes = calcStrBytesByType(pkey->type, val);
2,317✔
67
        memcpy(pkey->pData, val, bytes);
3,342✔
68
      } else {
69
        memcpy(pkey->pData, val, pkey->bytes);
30,653,412✔
70
      }
71
    } else {
72
      pkey->isNull = true;
1,384,697✔
73
    }
74
  }
75

76
  pSliceInfo->isPrevRowSet = true;
10,745,885✔
77
}
10,745,885✔
78

79
static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
10,814,233✔
80
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
10,814,233✔
81
  for (int32_t i = 0; i < numOfCols; ++i) {
42,799,094✔
82
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
32,075,363✔
83

84
    SGroupKeys* pkey = taosArrayGet(pSliceInfo->pNextRow, i);
32,051,503✔
85
    if (!colDataIsNull_s(pColInfoData, rowIndex)) {
64,001,768✔
86
      pkey->isNull = false;
30,616,871✔
87
      char* val = colDataGetData(pColInfoData, rowIndex);
30,616,871!
88
      if (!IS_VAR_DATA_TYPE(pkey->type)) {
30,616,871!
89
        memcpy(pkey->pData, val, pkey->bytes);
30,602,517✔
90
      } else {
91
        int32_t bytes = calcStrBytesByType(pkey->type, val);
14,354✔
92
        memcpy(pkey->pData, val, bytes);
2,670✔
93
      }
94
    } else {
95
      pkey->isNull = true;
1,384,013✔
96
    }
97
  }
98

99
  pSliceInfo->isNextRowSet = true;
10,723,731✔
100
}
10,723,731✔
101

102
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
10,777,344✔
103
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
10,777,344✔
104
  for (int32_t i = 0; i < numOfCols; ++i) {
42,869,073✔
105
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
32,293,951✔
106
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
32,219,118✔
107
    SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);
32,111,362✔
108

109
    if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
32,092,955!
110
      continue;
3,032✔
111
    }
112

113
    // null value is represented by using key = INT64_MIN for now.
114
    // TODO: optimize to ignore null values for linear interpolation.
115
    if (!pLinearInfo->isStartSet) {
32,089,923✔
116
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
62,426✔
117
        pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
30,911!
118
        char* p = colDataGetData(pColInfoData, rowIndex);
30,911!
119
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
30,911!
120
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
×
121
            memcpy(pLinearInfo->start.val, p, blobDataTLen(p));
×
122
          } else {
123
            memcpy(pLinearInfo->start.val, p, varDataTLen(p));
×
124
          }
125
        } else {
126
          memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
30,912✔
127
        }
128
      }
129
      pLinearInfo->isStartSet = true;
31,213✔
130
    } else if (!pLinearInfo->isEndSet) {
32,058,710✔
131
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
51,858✔
132
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
25,589!
133

134
        char* p = colDataGetData(pColInfoData, rowIndex);
25,589!
135
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
25,589!
136
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
×
137
            memcpy(pLinearInfo->end.val, p, blobDataTLen(p));
×
138
          } else {
139
            memcpy(pLinearInfo->end.val, p, varDataTLen(p));
×
140
          }
141
        } else {
142
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
25,589✔
143
        }
144
      }
145
      pLinearInfo->isEndSet = true;
25,929✔
146
    } else {
147
      pLinearInfo->start.key = pLinearInfo->end.key;
32,032,781✔
148
      memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes);
32,032,781✔
149

150
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
64,065,562✔
151
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
30,646,543!
152

153
        char* p = colDataGetData(pColInfoData, rowIndex);
30,646,543!
154
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
30,646,543!
155
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
×
156
            memcpy(pLinearInfo->end.val, p, blobDataTLen(p));
×
157
          } else {
158
            memcpy(pLinearInfo->end.val, p, varDataTLen(p));
×
159
          }
160
        } else {
161
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
30,803,813✔
162
        }
163

164
      } else {
165
        pLinearInfo->end.key = INT64_MIN;
1,386,238✔
166
      }
167
    }
168
  }
169
}
10,575,122✔
170

171
static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock) {
172
  if (pBlock->info.rows < pBlock->info.capacity) {
1,633,326✔
173
    return TSDB_CODE_SUCCESS;
1,633,397✔
174
  }
175

176
  uint32_t winNum = (pSliceInfo->win.ekey - pSliceInfo->win.skey) / pSliceInfo->interval.interval;
×
177
  uint32_t newRowsNum = pBlock->info.rows + TMIN(winNum / 4 + 1, 1048576);
×
178
  int32_t  code = blockDataEnsureCapacity(pBlock, newRowsNum);
×
179
  if (code != TSDB_CODE_SUCCESS) {
9!
180
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
181
    return code;
×
182
  }
183

184
  return TSDB_CODE_SUCCESS;
9✔
185
}
186

187
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
7,164,064✔
188
  char* name = pExprInfo->pExpr->_function.functionName;
7,164,064✔
189
  return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts") == 0);
7,164,064✔
190
}
191

192
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
5,582,732✔
193
  char* name = pExprInfo->pExpr->_function.functionName;
5,582,732✔
194
  return (IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_isfilled") == 0);
5,582,732✔
195
}
196

197
bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo) {
5,188,395✔
198
  const char* name = pExprInfo->pExpr->_function.functionName;
5,188,395✔
199
  return (IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type) && strcasecmp(name, "_irowts_origin") == 0);
5,188,395!
200
}
201

202
static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) {
×
203
  pKey->ts = ts;
×
204
  pKey->numOfPKs = 1;
×
205

206
  int8_t t = pPkCol->info.type;
×
207

208
  pKey->pks[0].type = t;
×
209
  if (IS_NUMERIC_TYPE(t)) {
×
210
    valueSetDatum(pKey->pks, t, colDataGetData(pPkCol, rowIndex), tDataTypes[t].bytes);
×
211
  } else {
212
    char* p = colDataGetVarData(pPkCol, rowIndex);
×
213
    pKey->pks[0].pData = (uint8_t*)varDataVal(p);
×
214
    pKey->pks[0].nData = varDataLen(p);
×
215
  }
216
}
×
217

218
// only the timestamp is needed to complete the duplicated timestamp check.
219
static bool checkDuplicateTimestamps(STimeSliceOperatorInfo* pSliceInfo, SColumnInfoData* pTsCol,
10,858,207✔
220
                                     SColumnInfoData* pPkCol, int32_t curIndex, int32_t rows) {
221
  int64_t currentTs = *(int64_t*)colDataGetData(pTsCol, curIndex);
10,858,207!
222
  if (currentTs > pSliceInfo->win.ekey) {
10,858,207✔
223
    return false;
4,777✔
224
  }
225

226
  SRowKey cur = {.ts = currentTs, .numOfPKs = (pPkCol != NULL) ? 1 : 0};
10,853,430✔
227
  if (pPkCol != NULL) {
10,853,430✔
228
    cur.pks[0].type = pPkCol->info.type;
38,720✔
229
    if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
38,720!
230
      cur.pks[0].pData = (uint8_t*)colDataGetVarData(pPkCol, curIndex);
9,680✔
231
    } else {
232
      valueSetDatum(cur.pks, pPkCol->info.type, colDataGetData(pPkCol, curIndex), pPkCol->info.bytes);
29,040!
233
    }
234
  }
235

236
  // let's discard the duplicated ts
237
  if ((pSliceInfo->prevTsSet == true) && (currentTs == pSliceInfo->prevKey.ts)) {
10,853,430!
238
    return true;
60,441✔
239
  }
240

241
  pSliceInfo->prevTsSet = true;
10,792,989✔
242
  tRowKeyAssign(&pSliceInfo->prevKey, &cur);
10,792,989✔
243

244
  return false;
10,813,050✔
245
}
246

247
bool isInterpFunc(SExprInfo* pExprInfo) {
4,108,174✔
248
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
4,108,174✔
249
  return (functionType == FUNCTION_TYPE_INTERP);
4,108,174✔
250
}
251

252
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
148,192✔
253
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
148,192✔
254
  return (functionType == FUNCTION_TYPE_GROUP_KEY);
148,192✔
255
}
256

257
static bool isSelectGroupConstValueFunc(SExprInfo* pExprInfo) {
3,362✔
258
  int32_t functionType = pExprInfo->pExpr->_function.functionType;
3,362✔
259
  return (functionType == FUNCTION_TYPE_GROUP_CONST_VALUE);
3,362✔
260
}
261

262
bool getIgoreNullRes(SExprSupp* pExprSup) {
25,932✔
263
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
142,693✔
264
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
116,919✔
265

266
    if (isInterpFunc(pExprInfo)) {
116,919✔
267
      for (int32_t j = 0; j < pExprInfo->base.numOfParams; ++j) {
140,195✔
268
        SFunctParam* pFuncParam = &pExprInfo->base.pParam[j];
93,620✔
269
        if (pFuncParam->type == FUNC_PARAM_TYPE_VALUE) {
93,620✔
270
          return pFuncParam->param.i ? true : false;
158✔
271
        }
272
      }
273
    }
274
  }
275

276
  return false;
25,774✔
277
}
278

279
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
10,821,923✔
280
  if (!ignoreNull) {
10,821,923✔
281
    return false;
10,821,860✔
282
  }
283

284
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
1,798!
285
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
2,085✔
286

287
    if (isInterpFunc(pExprInfo)) {
2,085✔
288
      int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
635✔
289
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
635✔
290

291
      if (colDataIsNull_s(pSrc, index)) {
1,270!
292
        return true;
350✔
293
      }
294
    }
295
  }
296

297
  return false;
×
298
}
299

300
static int32_t interpColSetKey(SColumnInfoData* pDst, int32_t rowNum, SGroupKeys* pKey) {
3,690,828✔
301
  int32_t code = 0;
3,690,828✔
302
  if (pKey->isNull == false) {
3,690,828✔
303
    code = colDataSetVal(pDst, rowNum, pKey->pData, false);
3,690,306✔
304
  } else {
305
    colDataSetNULL(pDst, rowNum);
522!
306
  }
307
  return code;
3,694,418✔
308
}
309

310
static bool interpSetFillRowWithRangeIntervalCheck(STimeSliceOperatorInfo* pSliceInfo, SArray** ppFillRow,
1,338,873✔
311
                                                   SArray* pFillRefRow, int64_t fillRefRowTs) {
312
  *ppFillRow = NULL;
1,338,873✔
313
  if (pSliceInfo->rangeInterval <= 0 || llabs(fillRefRowTs - pSliceInfo->current) <= pSliceInfo->rangeInterval) {
1,338,873✔
314
    *ppFillRow = pFillRefRow;
1,337,411✔
315
    return true;
1,337,411✔
316
  }
317
  return false;
1,462✔
318
}
319

320
static bool interpDetermineNearFillRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppNearRow) {
1,180,351✔
321
  if (!pSliceInfo->isPrevRowSet && !pSliceInfo->isNextRowSet) {
1,180,351!
322
    *ppNearRow = NULL;
×
323
    return false;
×
324
  }
325
  SGroupKeys *pPrevTsKey = NULL, *pNextTsKey = NULL;
1,180,351✔
326
  int64_t *   pPrevTs = NULL, *pNextTs = NULL;
1,180,351✔
327
  if (pSliceInfo->isPrevRowSet) {
1,180,351✔
328
    pPrevTsKey = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId);
1,013,923✔
329
    pPrevTs = (int64_t*)pPrevTsKey->pData;
1,013,881✔
330
  }
331
  if (pSliceInfo->isNextRowSet) {
1,180,309✔
332
    pNextTsKey = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId);
1,179,984✔
333
    pNextTs = (int64_t*)pNextTsKey->pData;
1,179,583✔
334
  }
335
  if (!pPrevTsKey) {
1,179,908✔
336
    *ppNearRow = pSliceInfo->pNextRow;
166,782✔
337
    (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs);
166,782✔
338
  } else if (!pNextTsKey) {
1,013,126✔
339
    *ppNearRow = pSliceInfo->pPrevRow;
305✔
340
    (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs);
305✔
341
  } else {
342
    if (llabs(pSliceInfo->current - *pPrevTs) <= llabs(*pNextTs - pSliceInfo->current)) {
1,012,821✔
343
      // take prev if euqal
344
      (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs);
559,965✔
345
    } else {
346
      (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs);
452,856✔
347
    }
348
  }
349
  return true;
1,179,958✔
350
}
351

352
static bool interpDetermineFillRefRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppOutRow) {
1,623,018✔
353
  bool needFill = false;
1,623,018✔
354
  if (pSliceInfo->fillType == TSDB_FILL_PREV) {
1,623,018✔
355
    if (pSliceInfo->isPrevRowSet) {
100,178✔
356
      SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId);
99,175✔
357
      (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pPrevRow,
99,175✔
358
                                                   *(int64_t*)pTsCol->pData);
99,175✔
359
      needFill = true;
99,175✔
360
    }
361
  } else if (pSliceInfo->fillType == TSDB_FILL_NEXT) {
1,522,840✔
362
    if (pSliceInfo->isNextRowSet) {
59,690!
363
      SGroupKeys* pTsCol = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId);
59,690✔
364
      (void)interpSetFillRowWithRangeIntervalCheck(pSliceInfo, ppOutRow, pSliceInfo->pNextRow,
59,690✔
365
                                                   *(int64_t*)pTsCol->pData);
59,690✔
366
      needFill = true;
59,690✔
367
    }
368
  } else if (pSliceInfo->fillType == TSDB_FILL_NEAR) {
1,463,150✔
369
    needFill = interpDetermineNearFillRow(pSliceInfo, ppOutRow);
1,180,393✔
370
  } else {
371
    needFill = true;
282,757✔
372
  }
373
  return needFill;
1,622,542✔
374
}
375

376
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
1,622,660✔
377
                                   SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
378
  int32_t code = TSDB_CODE_SUCCESS;
1,622,660✔
379
  int32_t lino = 0;
1,622,660✔
380
  int32_t rows = pResBlock->info.rows;
1,622,660!
381
  code = timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
1,622,740✔
382
  QUERY_CHECK_CODE(code, lino, _end);
1,622,740!
383
  // todo set the correct primary timestamp column
384

385
  // output the result
386
  int32_t fillColIndex = 0;
1,622,740✔
387
  int32_t groupKeyIndex = 0;
1,622,740✔
388
  bool    hasInterp = true;
1,622,740✔
389
  SArray* pFillRefRow = NULL;
1,622,740✔
390
  bool    needFill = interpDetermineFillRefRow(pSliceInfo, &pFillRefRow);
1,622,740✔
391
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
8,766,648✔
392
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
7,136,720✔
393

394
    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
7,136,720✔
395
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
7,136,720✔
396

397
    if (isIrowtsPseudoColumn(pExprInfo)) {
7,135,015✔
398
      code = colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false);
1,577,493✔
399
      QUERY_CHECK_CODE(code, lino, _end);
1,577,671!
400
      continue;
1,577,671✔
401
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
5,562,480✔
402
      bool isFilled = true;
1,575,719✔
403
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
1,575,719✔
404
      QUERY_CHECK_CODE(code, lino, _end);
1,576,364!
405
      continue;
1,576,364✔
406
    } else if (!isInterpFunc(pExprInfo) && !isIrowtsOriginPseudoColumn(pExprInfo)) {
3,986,053✔
407
      if (isGroupKeyFunc(pExprInfo) || isSelectGroupConstValueFunc(pExprInfo)) {
8,572!
408
        if (pSrcBlock != NULL) {
8,572✔
409
          int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
6,216✔
410
          SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
6,216✔
411

412
          if (colDataIsNull_s(pSrc, index)) {
12,432✔
413
            colDataSetNULL(pDst, pResBlock->info.rows);
56!
414
            continue;
56✔
415
          }
416

417
          char* v = colDataGetData(pSrc, index);
6,160!
418
          code = colDataSetVal(pDst, pResBlock->info.rows, v, false);
6,160✔
419
          QUERY_CHECK_CODE(code, lino, _end);
6,160!
420
        } else if (!isSelectGroupConstValueFunc(pExprInfo)) {
2,356✔
421
          // use stored group key
422
          SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevGroupKeys, groupKeyIndex);
2,134✔
423
          QUERY_CHECK_NULL(pkey, code, lino, _end, terrno);
2,134!
424
          groupKeyIndex++;
2,134✔
425
          if (pkey->isNull == false) {
2,134!
426
            code = colDataSetVal(pDst, rows, pkey->pData, false);
2,134✔
427
            QUERY_CHECK_CODE(code, lino, _end);
2,134!
428
          } else {
429
            colDataSetNULL(pDst, rows);
×
430
          }
431
        } else {
432
          int32_t     srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
222✔
433
          SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
222✔
434
          if (pkey->isNull == false) {
222!
435
            code = colDataSetVal(pDst, rows, pkey->pData, false);
222✔
436
            QUERY_CHECK_CODE(code, lino, _end);
222!
437
          } else {
438
            colDataSetNULL(pDst, rows);
×
439
          }
440
        }
441
      }
442
      continue;
8,516✔
443
    }
444

445
    int32_t srcSlot =
3,979,427✔
446
        isIrowtsOriginPseudoColumn(pExprInfo) ? pSliceInfo->tsCol.slotId : pExprInfo->base.pParam[0].pCol->slotId;
3,980,574✔
447
    switch (pSliceInfo->fillType) {
3,979,427!
448
      case TSDB_FILL_NULL:
66,292✔
449
      case TSDB_FILL_NULL_F: {
450
        colDataSetNULL(pDst, rows);
66,292!
451
        break;
66,292✔
452
      }
453

454
      case TSDB_FILL_PREV:
3,695,503✔
455
      case TSDB_FILL_NEAR:
456
      case TSDB_FILL_NEXT: {
457
        if (!needFill) {
3,695,503✔
458
          hasInterp = false;
1,147✔
459
          break;
1,147✔
460
        }
461
        if (pFillRefRow) {
3,694,356✔
462
          code = interpColSetKey(pDst, rows, taosArrayGet(pFillRefRow, srcSlot));
3,691,197✔
463
          QUERY_CHECK_CODE(code, lino, _end);
3,694,374!
464
          break;
3,694,374✔
465
        }
466
        // no fillRefRow, fall through to fill specified values
467
        if (srcSlot == pSliceInfo->tsCol.slotId) {
3,159✔
468
          // if is _irowts_origin, there is no value to fill, just set to null
469
          colDataSetNULL(pDst, rows);
900!
470
          break;
900✔
471
        }
472
      }
473
      case TSDB_FILL_SET_VALUE:
474
      case TSDB_FILL_SET_VALUE_F: {
475
        SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;
167,876✔
476

477
        bool isNull = (TSDB_DATA_TYPE_NULL == pVar->nType) ? true : false;
167,876✔
478
        if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
167,876✔
479
          float v = 0;
324✔
480
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
324!
481
            GET_TYPED_DATA(v, float, pVar->nType, &pVar->f, 0);
324!
482
          } else {
483
            v = taosStr2Float(varDataVal(pVar->pz), NULL);
×
484
          }
485
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
324✔
486
          QUERY_CHECK_CODE(code, lino, _end);
324!
487
        } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
167,552✔
488
          double v = 0;
15,005✔
489
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
15,005!
490
            GET_TYPED_DATA(v, double, pVar->nType, &pVar->d, 0);
15,005!
491
          } else {
492
            v = taosStr2Double(varDataVal(pVar->pz), NULL);
×
493
          }
494
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
15,005✔
495
          QUERY_CHECK_CODE(code, lino, _end);
15,005!
496
        } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
303,361✔
497
          int64_t v = 0;
150,814✔
498
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
150,814!
499
            GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i, 0);
150,814!
500
          } else {
501
            v = taosStr2Int64(varDataVal(pVar->pz), NULL, 10);
×
502
          }
503
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
150,814✔
504
          QUERY_CHECK_CODE(code, lino, _end);
150,814!
505
        } else if (IS_UNSIGNED_NUMERIC_TYPE(pDst->info.type)) {
2,533!
506
          uint64_t v = 0;
800✔
507
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
800!
508
            GET_TYPED_DATA(v, uint64_t, pVar->nType, &pVar->u, 0);
800!
509
          } else {
510
            v = taosStr2UInt64(varDataVal(pVar->pz), NULL, 10);
×
511
          }
512
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
800✔
513
          QUERY_CHECK_CODE(code, lino, _end);
800!
514
        } else if (IS_BOOLEAN_TYPE(pDst->info.type)) {
933✔
515
          bool v = false;
930✔
516
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
930!
517
            GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i, 0);
930!
518
          } else {
519
            v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10);
×
520
          }
521
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
930✔
522
          QUERY_CHECK_CODE(code, lino, _end);
930!
523
        }
524

525
        ++fillColIndex;
167,876✔
526
        break;
167,876✔
527
      }
528

529
      case TSDB_FILL_LINEAR: {
52,015✔
530
        SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);
52,015✔
531

532
        SPoint start = pLinearInfo->start;
52,015✔
533
        SPoint end = pLinearInfo->end;
52,015✔
534
        SPoint current = {.key = pSliceInfo->current};
52,015✔
535

536
        // do not interpolate before ts range, only increate pSliceInfo->current
537
        if (beforeTs && !pLinearInfo->isEndSet) {
52,015✔
538
          return true;
1,067✔
539
        }
540

541
        if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) {
50,948!
542
          hasInterp = false;
8✔
543
          break;
50,948✔
544
        }
545

546
        if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
50,940✔
547
          hasInterp = false;
131✔
548
          break;
131✔
549
        }
550

551
        if (start.key == INT64_MIN || end.key == INT64_MIN) {
50,809✔
552
          colDataSetNULL(pDst, rows);
7,659!
553
          break;
7,659✔
554
        }
555

556
        current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
43,150!
557
        QUERY_CHECK_NULL(current.val, code, lino, _end, terrno);
43,150!
558
        taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type,
43,150✔
559
                                      typeGetTypeModFromColInfo(&pDst->info));
43,150✔
560
        code = colDataSetVal(pDst, rows, (char*)current.val, false);
43,150✔
561
        QUERY_CHECK_CODE(code, lino, _end);
43,150!
562

563
        taosMemoryFree(current.val);
43,150!
564
        break;
43,150✔
565
      }
566
      case TSDB_FILL_NONE:
×
567
      default:
568
        break;
×
569
    }
570
  }
571

572
  if (hasInterp) {
1,629,928✔
573
    pResBlock->info.rows += 1;
1,621,071✔
574
  }
575

576
_end:
8,857✔
577
  if (code != TSDB_CODE_SUCCESS) {
1,629,928!
578
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
579
    pTaskInfo->code = code;
×
580
    T_LONG_JMP(pTaskInfo->env, code);
×
581
  }
582
  return hasInterp;
1,629,928✔
583
}
584

585
static int32_t addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
10,666✔
586
                                     SSDataBlock* pSrcBlock, int32_t index) {
587
  int32_t code = TSDB_CODE_SUCCESS;
10,666✔
588
  int32_t lino = 0;
10,666!
589
  code = timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
10,666✔
590
  QUERY_CHECK_CODE(code, lino, _end);
10,666!
591
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
39,936✔
592
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
29,270✔
593

594
    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
29,270✔
595
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);
29,270✔
596

597
    if (isIrowtsPseudoColumn(pExprInfo) || isIrowtsOriginPseudoColumn(pExprInfo)) {
29,270✔
598
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
8,650✔
599
      QUERY_CHECK_CODE(code, lino, _end);
8,650!
600
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
20,620✔
601
      bool isFilled = false;
6,386✔
602
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
6,386✔
603
      QUERY_CHECK_CODE(code, lino, _end);
6,386!
604
    } else {
605
      int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
14,234✔
606
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
14,234✔
607

608
      if (colDataIsNull_s(pSrc, index)) {
28,468✔
609
        colDataSetNULL(pDst, pResBlock->info.rows);
2,472!
610
        continue;
2,472✔
611
      }
612

613
      char* v = colDataGetData(pSrc, index);
11,762!
614
      code = colDataSetVal(pDst, pResBlock->info.rows, v, false);
11,762✔
615
      QUERY_CHECK_CODE(code, lino, _end);
11,762!
616
    }
617
  }
618

619
  pResBlock->info.rows += 1;
10,666✔
620

621
_end:
10,666✔
622
  if (code != TSDB_CODE_SUCCESS) {
10,666!
623
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
624
  }
625
  return code;
10,666✔
626
}
627

628
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
21,979✔
629
  int32_t code = TSDB_CODE_SUCCESS;
21,979✔
630
  int32_t lino = 0;
21,979✔
631
  if (pInfo->pPrevRow != NULL) {
21,979✔
632
    return TSDB_CODE_SUCCESS;
12,258✔
633
  }
634

635
  pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
9,721✔
636
  if (pInfo->pPrevRow == NULL) {
9,721!
637
    return terrno;
×
638
  }
639

640
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
9,721✔
641
  for (int32_t i = 0; i < numOfCols; ++i) {
36,856✔
642
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
27,135✔
643

644
    SGroupKeys key = {0};
27,135✔
645
    key.bytes = pColInfo->info.bytes;
27,135✔
646
    key.type = pColInfo->info.type;
27,135✔
647
    key.isNull = false;
27,135✔
648
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
27,135!
649
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
27,135!
650
    void* tmp = taosArrayPush(pInfo->pPrevRow, &key);
27,135✔
651
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
27,135!
652
  }
653

654
  pInfo->isPrevRowSet = false;
9,721✔
655

656
_end:
9,721✔
657
  if (code != TSDB_CODE_SUCCESS) {
9,721!
658
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
659
  }
660
  return code;
9,721✔
661
}
662

663
static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
21,979✔
664
  int32_t code = TSDB_CODE_SUCCESS;
21,979✔
665
  int32_t lino = 0;
21,979✔
666
  if (pInfo->pNextRow != NULL) {
21,979✔
667
    return TSDB_CODE_SUCCESS;
12,258✔
668
  }
669

670
  pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
9,721✔
671
  if (pInfo->pNextRow == NULL) {
9,721!
672
    return terrno;
×
673
  }
674

675
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
9,721✔
676
  for (int32_t i = 0; i < numOfCols; ++i) {
36,856✔
677
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
27,135✔
678

679
    SGroupKeys key = {0};
27,135✔
680
    key.bytes = pColInfo->info.bytes;
27,135✔
681
    key.type = pColInfo->info.type;
27,135✔
682
    key.isNull = false;
27,135✔
683
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
27,135!
684
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);
27,135!
685

686
    void* tmp = taosArrayPush(pInfo->pNextRow, &key);
27,135✔
687
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
27,135!
688
  }
689

690
  pInfo->isNextRowSet = false;
9,721✔
691

692
_end:
9,721✔
693
  if (code != TSDB_CODE_SUCCESS) {
9,721!
694
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
695
  }
696
  return code;
9,721✔
697
}
698

699
static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
21,979✔
700
  int32_t code = TSDB_CODE_SUCCESS;
21,979✔
701
  int32_t lino = 0;
21,979✔
702
  if (pInfo->pLinearInfo != NULL) {
21,979✔
703
    return TSDB_CODE_SUCCESS;
12,258✔
704
  }
705

706
  pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
9,721✔
707
  if (pInfo->pLinearInfo == NULL) {
9,721!
708
    return terrno;
×
709
  }
710

711
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
9,721✔
712
  for (int32_t i = 0; i < numOfCols; ++i) {
36,856✔
713
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);
27,135✔
714

715
    SFillLinearInfo linearInfo = {0};
27,135✔
716
    linearInfo.start.key = INT64_MIN;
27,135✔
717
    linearInfo.end.key = INT64_MIN;
27,135✔
718
    linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
27,135!
719
    QUERY_CHECK_NULL(linearInfo.start.val, code, lino, _end, terrno);
27,135!
720

721
    linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
27,135!
722
    QUERY_CHECK_NULL(linearInfo.end.val, code, lino, _end, terrno);
27,135!
723
    linearInfo.isStartSet = false;
27,135✔
724
    linearInfo.isEndSet = false;
27,135✔
725
    linearInfo.type = pColInfo->info.type;
27,135✔
726
    linearInfo.bytes = pColInfo->info.bytes;
27,135✔
727
    void* tmp = taosArrayPush(pInfo->pLinearInfo, &linearInfo);
27,135✔
728
    QUERY_CHECK_NULL(tmp, code, lino, _end, terrno);
27,135!
729
  }
730

731
_end:
9,721✔
732
  if (code != TSDB_CODE_SUCCESS) {
9,721!
733
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
734
  }
735
  return code;
9,721✔
736
}
737

738
static void destroyGroupKey(void* pKey) {
252✔
739
  SGroupKeys* key = (SGroupKeys*)pKey;
252✔
740
  if (key->pData != NULL) {
252!
741
    taosMemoryFreeClear(key->pData);
252!
742
  }
743
}
252✔
744

745
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
21,979✔
746
  if (pInfo->pPrevGroupKeys != NULL) {
21,979✔
747
    return TSDB_CODE_SUCCESS;
12,258✔
748
  }
749

750
  pInfo->pPrevGroupKeys = taosArrayInit(pExprSup->numOfExprs, sizeof(SGroupKeys));
9,721✔
751
  if (pInfo->pPrevGroupKeys == NULL) {
9,721!
752
    return terrno;
×
753
  }
754

755
  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
51,390✔
756
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
41,669✔
757

758
    if (isGroupKeyFunc(pExprInfo)) {
41,669✔
759
      SGroupKeys key = {.bytes = pExprInfo->base.resSchema.bytes,
504✔
760
                        .type = pExprInfo->base.resSchema.type,
252✔
761
                        .isNull = false,
762
                        .pData = taosMemoryCalloc(1, pExprInfo->base.resSchema.bytes)};
252!
763
      if (!key.pData) {
252!
764
        taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
×
765
        pInfo->pPrevGroupKeys = NULL;
×
766
        return terrno;
×
767
      }
768
      if (NULL == taosArrayPush(pInfo->pPrevGroupKeys, &key)) {
504!
769
        taosMemoryFree(key.pData);
×
770
        taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
×
771
        pInfo->pPrevGroupKeys = NULL;
×
772
        return terrno;
×
773
      }
774
    }
775
  }
776

777
  return TSDB_CODE_SUCCESS;
9,721✔
778
}
779

780
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
21,979✔
781
  int32_t code;
782
  code = initPrevRowsKeeper(pInfo, pBlock);
21,979✔
783
  if (code != TSDB_CODE_SUCCESS) {
21,979!
784
    return TSDB_CODE_FAILED;
×
785
  }
786

787
  code = initNextRowsKeeper(pInfo, pBlock);
21,979✔
788
  if (code != TSDB_CODE_SUCCESS) {
21,979!
789
    return TSDB_CODE_FAILED;
×
790
  }
791

792
  code = initFillLinearInfo(pInfo, pBlock);
21,979✔
793
  if (code != TSDB_CODE_SUCCESS) {
21,979!
794
    return TSDB_CODE_FAILED;
×
795
  }
796

797
  code = initGroupKeyKeeper(pInfo, pExprSup);
21,979✔
798
  if (code != TSDB_CODE_SUCCESS) {
21,979!
799
    return TSDB_CODE_FAILED;
×
800
  }
801

802
  return TSDB_CODE_SUCCESS;
21,979✔
803
}
804

805
static void resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
1,720✔
806
  if (pInfo->pPrevRow == NULL) {
1,720!
807
    return;
×
808
  }
809

810
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
7,320✔
811
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
5,600✔
812
    pKey->isNull = false;
5,600✔
813
  }
814

815
  pInfo->isPrevRowSet = false;
1,720✔
816

817
  return;
1,720✔
818
}
819

820
static void resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
1,720✔
821
  if (pInfo->pNextRow == NULL) {
1,720!
822
    return;
×
823
  }
824

825
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
7,320✔
826
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
5,600✔
827
    pKey->isNull = false;
5,600✔
828
  }
829

830
  pInfo->isNextRowSet = false;
1,720✔
831

832
  return;
1,720✔
833
}
834

835
static void resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
1,720✔
836
  if (pInfo->pLinearInfo == NULL) {
1,720!
837
    return;
×
838
  }
839

840
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
7,320✔
841
    SFillLinearInfo* pLinearInfo = taosArrayGet(pInfo->pLinearInfo, i);
5,600✔
842
    pLinearInfo->start.key = INT64_MIN;
5,600✔
843
    pLinearInfo->end.key = INT64_MIN;
5,600✔
844
    pLinearInfo->isStartSet = false;
5,600✔
845
    pLinearInfo->isEndSet = false;
5,600✔
846
  }
847

848
  return;
1,720✔
849
}
850

851
static void resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
1,720✔
852
  resetPrevRowsKeeper(pInfo);
1,720✔
853
  resetNextRowsKeeper(pInfo);
1,720✔
854
  resetFillLinearInfo(pInfo);
1,720✔
855
}
1,720✔
856

857
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
927,644✔
858
  SSDataBlock* pResBlock = pSliceInfo->pRes;
927,644✔
859
  if (pResBlock->info.rows > threshold) {
927,644✔
860
    return true;
54✔
861
  }
862

863
  return false;
927,590✔
864
}
865

866
static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
974,682✔
867
  if (pSliceInfo->current > pSliceInfo->win.ekey) {
974,682✔
868
    return true;
25,054✔
869
  }
870

871
  return false;
949,628✔
872
}
873

874
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
27✔
875
  SSDataBlock* pResBlock = pSliceInfo->pRes;
27✔
876

877
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
27✔
878
  if (curIndex < pBlock->info.rows - 1) {
27!
879
    pSliceInfo->pRemainRes = pBlock;
27✔
880
    pSliceInfo->remainIndex = curIndex + 1;
27✔
881
    return;
27✔
882
  }
883

884
  // all data in remaining block processed
885
  pSliceInfo->pRemainRes = NULL;
×
886
}
887

888
static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock,
21,978✔
889
                            SExecTaskInfo* pTaskInfo, bool ignoreNull) {
890
  int32_t      code = TSDB_CODE_SUCCESS;
21,978✔
891
  int32_t      lino = 0;
21,978✔
892
  SSDataBlock* pResBlock = pSliceInfo->pRes;
21,978✔
893
  SInterval*   pInterval = &pSliceInfo->interval;
21,978✔
894

895
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
21,978✔
896
  SColumnInfoData* pPkCol = NULL;
21,978✔
897

898
  if (pSliceInfo->hasPk) {
21,978✔
899
    pPkCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->pkCol.slotId);
154✔
900
  }
901

902
  int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
21,978✔
903
  for (; i < pBlock->info.rows; ++i) {
10,852,601✔
904
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
10,850,652!
905

906
    // check for duplicate timestamps
907
    if (checkDuplicateTimestamps(pSliceInfo, pTsCol, pPkCol, i, pBlock->info.rows)) {
10,850,652✔
908
      continue;
60,441✔
909
    }
910

911
    if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
10,822,566✔
912
      continue;
350✔
913
    }
914

915
    if (ts == pSliceInfo->current) {
10,822,353✔
916
      code = addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
3,925✔
917
      QUERY_CHECK_CODE(code, lino, _end);
3,925!
918

919
      doKeepPrevRows(pSliceInfo, pBlock, i);
3,925✔
920
      doKeepLinearInfo(pSliceInfo, pBlock, i);
3,925✔
921

922
      pSliceInfo->current =
3,925✔
923
          taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
3,925✔
924

925
      if (checkWindowBoundReached(pSliceInfo)) {
3,925✔
926
        break;
475✔
927
      }
928

929
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
3,450!
930
        saveBlockStatus(pSliceInfo, pBlock, i);
×
931
        return;
×
932
      }
933
    } else if (ts < pSliceInfo->current) {
10,818,428✔
934
      // in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
935
      doKeepPrevRows(pSliceInfo, pBlock, i);
10,810,226✔
936
      doKeepLinearInfo(pSliceInfo, pBlock, i);
10,757,126✔
937

938
      if (i < pBlock->info.rows - 1) {
10,808,112✔
939
        // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
940
        doKeepNextRows(pSliceInfo, pBlock, i + 1);
10,796,337✔
941
        int64_t nextTs = *(int64_t*)colDataGetData(pTsCol, i + 1);
10,722,767!
942
        if (nextTs > pSliceInfo->current) {
10,722,767✔
943
          while (pSliceInfo->current < nextTs && pSliceInfo->current <= pSliceInfo->win.ekey) {
1,812,319!
944
            if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, false, pTaskInfo) &&
910,719✔
945
                pSliceInfo->fillType == TSDB_FILL_LINEAR) {
139!
946
              break;
947
            } else {
948
              pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
911,701✔
949
                                                pInterval->precision, NULL);
911,733✔
950
            }
951
          }
952

953
          if (checkWindowBoundReached(pSliceInfo)) {
901,739✔
954
            break;
3,007✔
955
          }
956
          if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
898,952✔
957
            saveBlockStatus(pSliceInfo, pBlock, i);
23✔
958
            return;
23✔
959
          }
960
        } else {
961
          // ignore current row, and do nothing
962
        }
963
      } else {  // it is the last row of current block
964
        doKeepPrevRows(pSliceInfo, pBlock, i);
11,775✔
965
      }
966
    } else {  // ts > pSliceInfo->current
967
      // in case of interpolation window starts and ends between two datapoints, fill(next) need to interpolate
968
      doKeepNextRows(pSliceInfo, pBlock, i);
8,202✔
969
      doKeepLinearInfo(pSliceInfo, pBlock, i);
16,939✔
970

971
      while (pSliceInfo->current < ts && pSliceInfo->current <= pSliceInfo->win.ekey) {
415,411✔
972
        if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i, true, pTaskInfo) &&
398,471✔
973
            pSliceInfo->fillType == TSDB_FILL_LINEAR) {
967!
974
          break;
975
        } else {
976
          pSliceInfo->current = taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit,
398,473✔
977
                                            pInterval->precision, NULL);
398,478✔
978
        }
979
      }
980

981
      // add current row if timestamp match
982
      if (ts == pSliceInfo->current && pSliceInfo->current <= pSliceInfo->win.ekey) {
16,940✔
983
        code = addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp, pResBlock, pBlock, i);
6,741✔
984
        QUERY_CHECK_CODE(code, lino, _end);
6,741!
985

986
        pSliceInfo->current =
6,741✔
987
            taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
6,741✔
988
      }
989
      doKeepPrevRows(pSliceInfo, pBlock, i);
16,940✔
990

991
      if (checkWindowBoundReached(pSliceInfo)) {
16,939✔
992
        break;
5,092✔
993
      }
994
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
11,847✔
995
        saveBlockStatus(pSliceInfo, pBlock, i);
4✔
996
        return;
4✔
997
      }
998
    }
999
  }
1000

1001
  // if reached here, meaning block processing finished naturally,
1002
  // or interpolation reach window upper bound
1003
  pSliceInfo->pRemainRes = NULL;
10,523✔
1004

1005
_end:
10,523✔
1006
  if (code != TSDB_CODE_SUCCESS) {
10,523!
1007
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1008
    pTaskInfo->code = code;
×
1009
    T_LONG_JMP(pTaskInfo->env, code);
×
1010
  }
1011
}
1012

1013
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
14,614✔
1014
  SSDataBlock* pResBlock = pSliceInfo->pRes;
14,614✔
1015
  SInterval*   pInterval = &pSliceInfo->interval;
14,614✔
1016

1017
  if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR ||
14,614✔
1018
      pSliceInfo->pPrevGroupKeys == NULL) {
12,779✔
1019
    return;
5,001✔
1020
  }
1021

1022
  while (pSliceInfo->current <= pSliceInfo->win.ekey) {
322,788✔
1023
    (void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pResBlock, NULL, index, false, pOperator->pTaskInfo);
313,175✔
1024
    pSliceInfo->current =
313,175✔
1025
        taosTimeAdd(pSliceInfo->current, pInterval->interval, pInterval->intervalUnit, pInterval->precision, NULL);
313,175✔
1026
  }
1027
}
1028

1029
static int32_t copyPrevGroupKey(SExprSupp* pExprSup, SArray* pGroupKeys, SSDataBlock* pSrcBlock) {
21,979✔
1030
  int32_t groupKeyIdx = 0;
21,979✔
1031
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
119,930✔
1032
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
97,951✔
1033

1034
    if (isGroupKeyFunc(pExprInfo)) {
97,951✔
1035
      int32_t     srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
1,532✔
1036
      SGroupKeys* pGroupKey = taosArrayGet(pGroupKeys, groupKeyIdx);
1,532✔
1037
      if (pGroupKey == NULL) {
1,532!
1038
        return terrno;
×
1039
      }
1040
      groupKeyIdx++;
1,532✔
1041
      SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
1,532✔
1042

1043
      if (colDataIsNull_s(pSrc, 0)) {
1,532!
1044
        pGroupKey->isNull = true;
×
1045
        break;
×
1046
      }
1047

1048
      char* v = colDataGetData(pSrc, 0);
1,532!
1049
      if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
1,532!
1050
        if (IS_STR_DATA_BLOB(pGroupKey->type)) {
1,202!
1051
          memcpy(pGroupKey->pData, v, blobDataTLen(v));
×
1052
        } else {
1053
          memcpy(pGroupKey->pData, v, varDataTLen(v));
1,202✔
1054
        }
1055
      } else {
1056
        memcpy(pGroupKey->pData, v, pGroupKey->bytes);
330✔
1057
      }
1058

1059
      pGroupKey->isNull = false;
1,532✔
1060
    }
1061
  }
1062
  return TSDB_CODE_SUCCESS;
21,979✔
1063
}
1064

1065
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
1,720✔
1066
  pSliceInfo->current = pSliceInfo->win.skey;
1,720✔
1067
  pSliceInfo->prevTsSet = false;
1,720✔
1068
  resetKeeperInfo(pSliceInfo);
1,720✔
1069
}
1,720✔
1070

1071
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
25,932✔
1072
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
25,932✔
1073

1074
  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
25,932✔
1075
  SExprSupp*              pSup = &pOperator->exprSupp;
25,932✔
1076
  bool                    ignoreNull = getIgoreNullRes(pSup);
25,932✔
1077
  int32_t                 order = TSDB_ORDER_ASC;
25,932✔
1078

1079
  if (checkWindowBoundReached(pSliceInfo)) {
25,932✔
1080
    return;
3,953✔
1081
  }
1082

1083
  int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
21,979✔
1084
  if (code != TSDB_CODE_SUCCESS) {
21,979!
1085
    T_LONG_JMP(pTaskInfo->env, code);
×
1086
  }
1087

1088
  if (pSliceInfo->scalarSup.pExprInfo != NULL) {
21,979✔
1089
    SExprSupp* pExprSup = &pSliceInfo->scalarSup;
171✔
1090
    code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock, pExprSup->pCtx, pExprSup->numOfExprs, NULL,
171✔
1091
                                 GET_STM_RTINFO(pOperator->pTaskInfo));
171!
1092
    if (code != TSDB_CODE_SUCCESS) {
171!
1093
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1094
      T_LONG_JMP(pTaskInfo->env, code);
×
1095
    }
1096
  }
1097

1098
  // the pDataBlock are always the same one, no need to call this again
1099
  code = setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
21,979✔
1100
  if (code != TSDB_CODE_SUCCESS) {
21,978!
1101
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1102
    T_LONG_JMP(pTaskInfo->env, code);
×
1103
  }
1104
  doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
21,978✔
1105
  code = copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKeys, pBlock);
21,979✔
1106
  if (code != TSDB_CODE_SUCCESS) {
21,979!
1107
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1108
    T_LONG_JMP(pTaskInfo->env, code);
×
1109
  }
1110
}
1111

1112
static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
23,641✔
1113
  int32_t        code = TSDB_CODE_SUCCESS;
23,641✔
1114
  int32_t        lino = 0;
23,641✔
1115
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
23,641✔
1116
  if (pOperator->status == OP_EXEC_DONE) {
23,641✔
1117
    (*ppRes) = NULL;
1,884✔
1118
    return code;
1,884✔
1119
  }
1120

1121
  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
21,757✔
1122
  SSDataBlock*            pResBlock = pSliceInfo->pRes;
21,757✔
1123

1124
  blockDataCleanup(pResBlock);
21,757✔
1125

1126
  while (1) {
1127
    if (pSliceInfo->pNextGroupRes != NULL) {
23,157✔
1128
      doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
1,708✔
1129
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
1,708!
1130
        code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
1,048✔
1131
        QUERY_CHECK_CODE(code, lino, _finished);
1,048!
1132
        if (pSliceInfo->pRemainRes == NULL) {
1,048!
1133
          pSliceInfo->pNextGroupRes = NULL;
1,048✔
1134
        }
1135
        if (pResBlock->info.rows != 0) {
1,048✔
1136
          goto _finished;
704✔
1137
        } else {
1138
          // after fillter if result block has 0 rows, go back to
1139
          // process pNextGroupRes again for unfinished data
1140
          continue;
344✔
1141
        }
1142
      }
1143
      pSliceInfo->pNextGroupRes = NULL;
660✔
1144
    }
1145

1146
    while (1) {
16,729✔
1147
      SSDataBlock* pBlock = pSliceInfo->pRemainRes ? pSliceInfo->pRemainRes : getNextBlockFromDownstream(pOperator, 0);
38,838✔
1148
      if (pBlock == NULL) {
38,838✔
1149
        setOperatorCompleted(pOperator);
12,894✔
1150
        break;
12,894✔
1151
      }
1152

1153
      pResBlock->info.scanFlag = pBlock->info.scanFlag;
25,944✔
1154
      if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
25,944✔
1155
        pSliceInfo->groupId = pBlock->info.id.groupId;
484✔
1156
      } else {
1157
        if (pSliceInfo->groupId != pBlock->info.id.groupId) {
25,460✔
1158
          pSliceInfo->groupId = pBlock->info.id.groupId;
1,720✔
1159
          pSliceInfo->pNextGroupRes = pBlock;
1,720✔
1160
          break;
1,720✔
1161
        }
1162
      }
1163

1164
      doHandleTimeslice(pOperator, pBlock);
24,224✔
1165
      if (checkWindowBoundReached(pSliceInfo) || checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
24,224✔
1166
        code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
11,506✔
1167
        QUERY_CHECK_CODE(code, lino, _finished);
11,506!
1168
        if (pResBlock->info.rows != 0) {
11,506✔
1169
          goto _finished;
7,495✔
1170
        }
1171
      }
1172
    }
1173
    // post work for a specific group
1174

1175
    // check if need to interpolate after last datablock
1176
    // except for fill(next), fill(linear)
1177
    genInterpAfterDataBlock(pSliceInfo, pOperator, 0);
14,614✔
1178

1179
    code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL);
14,614✔
1180
    QUERY_CHECK_CODE(code, lino, _finished);
14,614!
1181
    if (pOperator->status == OP_EXEC_DONE) {
14,614✔
1182
      break;
12,894✔
1183
    }
1184

1185
    // restore initial value for next group
1186
    resetTimesliceInfo(pSliceInfo);
1,720✔
1187
    if (pResBlock->info.rows != 0) {
1,720✔
1188
      break;
664✔
1189
    }
1190
  }
1191

1192
_finished:
21,757✔
1193
  // restore the value
1194
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
21,757✔
1195
  if (pResBlock->info.rows == 0) {
21,756✔
1196
    pOperator->status = OP_EXEC_DONE;
10,980✔
1197
  }
1198
  if (code != TSDB_CODE_SUCCESS) {
21,756!
1199
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1200
    pTaskInfo->code = code;
×
1201
    T_LONG_JMP(pTaskInfo->env, code);
×
1202
  }
1203

1204
  (*ppRes) = pResBlock->info.rows == 0 ? NULL : pResBlock;
21,756✔
1205
  return code;
21,756✔
1206
}
1207

1208
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk, SColumn* pPkColumn) {
12,909✔
1209
  SNode* pNode;
1210
  FOREACH(pNode, pFuncs) {
70,175!
1211
    if ((nodeType(pNode) == QUERY_NODE_TARGET) && (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
57,397!
1212
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
57,397✔
1213
      if (fmIsInterpFunc(pFunc->funcId) && pFunc->hasPk) {
57,397✔
1214
        SNode* pNode2 = (pFunc->pParameterList->pTail->pNode);
132✔
1215
        if ((nodeType(pNode2) == QUERY_NODE_COLUMN) && ((SColumnNode*)pNode2)->isPk) {
132!
1216
          *pHasPk = true;
132✔
1217
          *pPkColumn = extractColumnFromColumnNode((SColumnNode*)pNode2);
132✔
1218
          break;
132✔
1219
        }
1220
      }
1221
    }
1222
  }
1223
  return TSDB_CODE_SUCCESS;
12,910✔
1224
}
1225

1226
/**
1227
 * @brief Determine the actual time range for reading data based on the RANGE clause and the WHERE conditions.
1228
 * @param[in] cond The range specified by WHERE condition.
1229
 * @param[in] range The range specified by RANGE clause.
1230
 * @param[out] twindow The range to be read in DESC order, and only one record is needed.
1231
 * @param[out] extTwindow The external range to read for only one record, which is used for FILL clause.
1232
 * @note `cond` and `twindow` may be the same address.
1233
 */
1234
static int32_t getQueryExtWindow(const STimeWindow* cond, const STimeWindow* range, STimeWindow* twindow,
11,845✔
1235
                                 STimeWindow* extTwindows) {
1236
  int32_t     code = TSDB_CODE_SUCCESS;
11,845✔
1237
  int32_t     lino = 0;
11,845✔
1238
  STimeWindow tempWindow;
1239

1240
  if (cond->skey > cond->ekey || range->skey > range->ekey) {
11,845!
1241
    *twindow = extTwindows[0] = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
9✔
1242
    return code;
9✔
1243
  }
1244

1245
  if (range->ekey < cond->skey) {
11,836✔
1246
    extTwindows[1] = *cond;
1,890✔
1247
    *twindow = extTwindows[0] = TSWINDOW_DESC_INITIALIZER;
1,890✔
1248
    return code;
1,890✔
1249
  }
1250

1251
  if (cond->ekey < range->skey) {
9,946✔
1252
    extTwindows[0] = *cond;
1,264✔
1253
    *twindow = extTwindows[1] = TSWINDOW_DESC_INITIALIZER;
1,264✔
1254
    return code;
1,264✔
1255
  }
1256

1257
  // Only scan data in the time range intersecion.
1258
  extTwindows[0] = extTwindows[1] = *cond;
8,682✔
1259
  twindow->skey = TMAX(cond->skey, range->skey);
8,682✔
1260
  twindow->ekey = TMIN(cond->ekey, range->ekey);
8,682✔
1261
  extTwindows[0].ekey = twindow->skey - 1;
8,682✔
1262
  extTwindows[1].skey = twindow->ekey + 1;
8,682✔
1263

1264
  return code;
8,682✔
1265
}
1266

1267
static int32_t resetTimeSliceOperState(SOperatorInfo* pOper) {
×
1268
  STimeSliceOperatorInfo* pInfo = pOper->info;
×
1269
  SExecTaskInfo*           pTaskInfo = pOper->pTaskInfo;
×
1270
  SInterpFuncPhysiNode* pPhynode = (SInterpFuncPhysiNode*)pOper->pPhyNode;
×
1271
  pOper->status = OP_NOT_OPENED;
×
1272

1273
  setTaskStatus(pOper->pTaskInfo, TASK_NOT_COMPLETED);
×
1274

1275
  int32_t  code = resetExprSupp(&pOper->exprSupp, pTaskInfo, pPhynode->pFuncs, NULL,
×
1276
                         &pTaskInfo->storageAPI.functionStore);
1277
  if (code == 0) {
×
1278
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
×
1279
                         &pTaskInfo->storageAPI.functionStore);
1280
  }
1281

1282
  pInfo->current = pInfo->win.skey;
×
1283
  pInfo->prevTsSet = false;
×
1284
  pInfo->prevKey.ts = INT64_MIN;
×
1285
  pInfo->groupId = 0;
×
1286
  pInfo->pNextGroupRes = NULL;
×
1287
  pInfo->pRemainRes = NULL;
×
1288
  pInfo->remainIndex = 0;
×
1289

1290
  if (pInfo->hasPk) {
×
1291
    pInfo->prevKey.numOfPKs = 1;
×
1292
    pInfo->prevKey.pks[0].type = pInfo->pkCol.type;
×
1293

1294
    if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
×
1295
      memset(pInfo->prevKey.pks[0].pData, 0, pInfo->pkCol.bytes);
×
1296
    }
1297
  }
1298
  blockDataCleanup(pInfo->pRes);
×
1299

1300
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
×
1301
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
×
1302
    taosMemoryFree(pKey->pData);
×
1303
  }
1304
  taosArrayDestroy(pInfo->pPrevRow);
×
1305
  pInfo->pPrevRow = NULL;
×
1306

1307
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
×
1308
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
×
1309
    taosMemoryFree(pKey->pData);
×
1310
  }
1311
  taosArrayDestroy(pInfo->pNextRow);
×
1312
  pInfo->pNextRow = NULL;
×
1313

1314
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
×
1315
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
×
1316
    taosMemoryFree(pKey->start.val);
×
1317
    taosMemoryFree(pKey->end.val);
×
1318
  }
1319
  taosArrayDestroy(pInfo->pLinearInfo);
×
1320
  pInfo->pLinearInfo = NULL;
×
1321

1322
  if (pInfo->pPrevGroupKeys) {
×
1323
    taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
×
1324
    pInfo->pPrevGroupKeys = NULL;
×
1325
  }
1326

1327
  return code;
×
1328
}
1329

1330
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
12,909✔
1331
  QRY_PARAM_CHECK(pOptrInfo);
12,909!
1332

1333
  int32_t                 code = 0;
12,909✔
1334
  int32_t                 lino = 0;
12,909✔
1335
  STimeSliceOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
12,909!
1336
  SOperatorInfo*          pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
12,908!
1337

1338
  if (pOperator == NULL || pInfo == NULL) {
12,908!
1339
    code = terrno;
×
1340
    goto _error;
×
1341
  }
1342

1343
  pOperator->pPhyNode = pPhyNode;
12,909✔
1344
  SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
12,909✔
1345
  SExprSupp*            pSup = &pOperator->exprSupp;
12,909✔
1346

1347
  int32_t    numOfExprs = 0;
12,909✔
1348
  SExprInfo* pExprInfo = NULL;
12,909✔
1349
  code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
12,909✔
1350
  QUERY_CHECK_CODE(code, lino, _error);
12,908!
1351

1352
  code = initExprSupp(pSup, pExprInfo, numOfExprs, &pTaskInfo->storageAPI.functionStore);
12,908✔
1353
  QUERY_CHECK_CODE(code, lino, _error);
12,907!
1354

1355
  if (pInterpPhyNode->pExprs != NULL) {
12,907✔
1356
    int32_t    num = 0;
149✔
1357
    SExprInfo* pScalarExprInfo = NULL;
149✔
1358
    code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
149✔
1359
    QUERY_CHECK_CODE(code, lino, _error);
149!
1360

1361
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num, &pTaskInfo->storageAPI.functionStore);
149✔
1362
    QUERY_CHECK_CODE(code, lino, _error);
149!
1363
  }
1364

1365
  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0,
12,907✔
1366
                            pTaskInfo->pStreamRuntimeInfo);
12,907✔
1367
  QUERY_CHECK_CODE(code, lino, _error);
12,907!
1368

1369
  pInfo->tsCol = extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
12,907✔
1370
  code = extractPkColumnFromFuncs(pInterpPhyNode->pFuncs, &pInfo->hasPk, &pInfo->pkCol);
12,909✔
1371
  QUERY_CHECK_CODE(code, lino, _error);
12,908!
1372

1373
  pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
12,908✔
1374
  initResultSizeInfo(&pOperator->resultInfo, 4096);
12,908✔
1375

1376
  pInfo->pFillColInfo =
12,907✔
1377
      createFillColInfo(pExprInfo, numOfExprs, NULL, 0, NULL, 0, (SNodeListNode*)pInterpPhyNode->pFillValues);
12,908✔
1378
  QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno);
12,907!
1379

1380
  pInfo->pLinearInfo = NULL;
12,907✔
1381
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
12,907✔
1382
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
12,909!
1383
  pInfo->win = pInterpPhyNode->timeRange;
12,909✔
1384
  pInfo->interval.interval = pInterpPhyNode->interval;
12,909✔
1385
  pInfo->current = pInfo->win.skey;
12,909✔
1386
  pInfo->prevTsSet = false;
12,909✔
1387
  pInfo->prevKey.ts = INT64_MIN;
12,909✔
1388
  pInfo->groupId = 0;
12,909✔
1389
  pInfo->pPrevGroupKeys = NULL;
12,909✔
1390
  pInfo->pNextGroupRes = NULL;
12,909✔
1391
  pInfo->pRemainRes = NULL;
12,909✔
1392
  pInfo->remainIndex = 0;
12,909✔
1393
  pInfo->rangeInterval = pInterpPhyNode->rangeInterval;
12,909✔
1394

1395
  if (pInfo->hasPk) {
12,909✔
1396
    pInfo->prevKey.numOfPKs = 1;
132✔
1397
    pInfo->prevKey.ts = INT64_MIN;
132✔
1398
    pInfo->prevKey.pks[0].type = pInfo->pkCol.type;
132✔
1399

1400
    if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
132!
1401
      pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes);
44!
1402
      QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno);
44!
1403
    }
1404
  }
1405

1406
  if (downstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN) {
12,909✔
1407
    STableScanInfo*      pScanInfo = (STableScanInfo*)downstream->info;
11,845✔
1408
    SQueryTableDataCond* cond = &pScanInfo->base.cond;
11,845✔
1409
    cond->type = TIMEWINDOW_RANGE_EXTERNAL;
11,845✔
1410
    code = getQueryExtWindow(&cond->twindows, &pInfo->win, &cond->twindows, cond->extTwindows);
11,845✔
1411
    QUERY_CHECK_CODE(code, lino, _error);
11,846!
1412
  }
1413
  
1414
  setOperatorInfo(pOperator, "TimeSliceOperator", QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED, pInfo,
12,910✔
1415
                  pTaskInfo);
1416
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimesliceNext, NULL, destroyTimeSliceOperatorInfo,
12,908✔
1417
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1418

1419
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
12,908✔
1420
  QUERY_CHECK_CODE(code, lino, _error);
12,909!
1421

1422
  //  int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
1423
  setOperatorResetStateFn(pOperator, resetTimeSliceOperState);
12,909✔
1424
  
1425
  code = appendDownstream(pOperator, &downstream, 1);
12,909✔
1426
  QUERY_CHECK_CODE(code, lino, _error);
12,908!
1427

1428
  *pOptrInfo = pOperator;
12,908✔
1429
  return TSDB_CODE_SUCCESS;
12,908✔
1430

1431
_error:
×
1432
  if (code != TSDB_CODE_SUCCESS) {
×
1433
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1434
  }
1435
  if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
×
1436
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1437
  pTaskInfo->code = code;
×
1438
  return code;
×
1439
}
1440

1441
void destroyTimeSliceOperatorInfo(void* param) {
12,909✔
1442
  STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
12,909✔
1443

1444
  blockDataDestroy(pInfo->pRes);
12,909✔
1445
  pInfo->pRes = NULL;
12,909✔
1446

1447
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
40,044✔
1448
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
27,135✔
1449
    taosMemoryFree(pKey->pData);
27,135!
1450
  }
1451
  taosArrayDestroy(pInfo->pPrevRow);
12,909✔
1452

1453
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
40,044✔
1454
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
27,135✔
1455
    taosMemoryFree(pKey->pData);
27,135!
1456
  }
1457
  taosArrayDestroy(pInfo->pNextRow);
12,909✔
1458

1459
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
40,044✔
1460
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
27,135✔
1461
    taosMemoryFree(pKey->start.val);
27,135!
1462
    taosMemoryFree(pKey->end.val);
27,135!
1463
  }
1464
  taosArrayDestroy(pInfo->pLinearInfo);
12,909✔
1465

1466
  if (pInfo->pPrevGroupKeys) {
12,909✔
1467
    taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
9,721✔
1468
    pInfo->pPrevGroupKeys = NULL;
9,721✔
1469
  }
1470
  if (pInfo->hasPk && IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
12,909!
1471
    taosMemoryFreeClear(pInfo->prevKey.pks[0].pData);
44!
1472
  }
1473

1474
  cleanupExprSupp(&pInfo->scalarSup);
12,909✔
1475
  if (pInfo->pFillColInfo != NULL) {
12,909!
1476
    for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
70,414✔
1477
      taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
57,505✔
1478
    }
1479
    taosMemoryFree(pInfo->pFillColInfo);
12,909!
1480
  }
1481
  taosMemoryFreeClear(param);
12,909!
1482
}
12,909✔
1483

1484
int64_t getMinWindowSize(struct SOperatorInfo* pOperator) {
571,995✔
1485
  if (pOperator == NULL) {
571,995!
1486
    return 0;
×
1487
  }
1488

1489
  switch (pOperator->operatorType) {
571,995✔
1490
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
3,135✔
1491
      return ((SStateWindowOperatorInfo*)pOperator->info)->trueForLimit;
3,135✔
1492
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
1,282✔
1493
      return ((SEventWindowOperatorInfo*)pOperator->info)->trueForLimit;
1,282✔
1494
    default:
567,578✔
1495
      return 0;
567,578✔
1496
  }
1497
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2025 Coveralls, Inc