• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #5011

03 Apr 2026 03:59PM UTC coverage: 72.3% (+0.008%) from 72.292%
#5011

push

travis-ci

web-flow
merge: from main to 3.0 branch #35067

4053 of 5985 new or added lines in 68 files covered. (67.72%)

732 existing lines in 143 files now uncovered.

257430 of 356056 relevant lines covered (72.3%)

131834103.52 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

90.53
/source/libs/executor/src/timesliceoperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15
#include "executorInt.h"
16
#include "filter.h"
17
#include "function.h"
18
#include "functionMgt.h"
19
#include "operator.h"
20
#include "querytask.h"
21
#include "storageapi.h"
22
#include "tcommon.h"
23
#include "tcompare.h"
24
#include "tdatablock.h"
25
#include "tfill.h"
26
#include "ttime.h"
27

28
typedef struct STimeSliceOperatorInfo {
29
  SSDataBlock*         pRes;
30
  STimeWindow          win;
31
  SNode*               pWin;        // for stream
32
  SInterval            interval;
33
  int64_t              current;
34
  SArray*              pPrevRow;     // SArray<SGroupValue>
35
  SArray*              pNextRow;     // SArray<SGroupValue>
36
  SArray*              pLinearInfo;  // SArray<SFillLinearInfo>
37
  bool                 isPrevRowSet;
38
  bool                 isNextRowSet;
39
  int32_t              fillType;      // fill type
40
  SColumn              tsCol;         // primary timestamp column
41
  SExprSupp            scalarSup;     // scalar calculation
42
  struct SFillColInfo* pFillColInfo;  // fill column info
43
  SRowKey              prevKey;       // record previous row key
44
  bool                 prevTsSet;     // denotes if previous timestamp is set
45
  uint64_t             groupId;
46
  SArray*              pPrevGroupKeys;
47
  SSDataBlock*         pNextGroupRes;
48
  SSDataBlock*         pRemainRes;   // save block unfinished processing
49
  int32_t              remainIndex;  // the remaining index in the block to be processed
50
  bool                 hasPk;
51
  SColumn              pkCol;
52
  bool                 prevNotified;
53
  bool                 nextNotified;
54
  int64_t              surroundingTime;
55
} STimeSliceOperatorInfo;
56

57
// Forward declaration so the function can be referenced before its definition.
static void destroyTimeSliceOperatorInfo(void* param);
58

59
/**
  @brief Copy row `rowIndex` of pBlock into pSliceInfo->pPrevRow (one
  SGroupKeys entry per column) and mark the previous row as set.
*/
static void doKeepPrevRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t colNum = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < colNum; ++colIdx) {
    SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    SGroupKeys*      pSaved = taosArrayGet(pSliceInfo->pPrevRow, colIdx);

    if (colDataIsNull_s(pCol, rowIndex)) {
      pSaved->isNull = true;
      continue;
    }

    pSaved->isNull = false;
    char* src = colDataGetData(pCol, rowIndex);
    if (!IS_VAR_DATA_TYPE(pSaved->type)) {
      // fixed-width value: copy the declared width
      memcpy(pSaved->pData, src, pSaved->bytes);
      continue;
    }

    // variable-length value: the number of bytes depends on the stored data
    int32_t len = calcStrBytesByType(pSaved->type, src);
    memcpy(pSaved->pData, src, len);
  }

  pSliceInfo->isPrevRowSet = true;
}
2,147,483,647✔
81

82
/**
  @brief Copy row `rowIndex` of pBlock into pSliceInfo->pNextRow (one
  SGroupKeys entry per column) and mark the next row as set.
*/
static void doKeepNextRows(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
  int32_t total = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < total; ++col) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, col);
    SGroupKeys*      pSlot = taosArrayGet(pSliceInfo->pNextRow, col);

    if (colDataIsNull_s(pSrcCol, rowIndex)) {
      pSlot->isNull = true;
    } else {
      pSlot->isNull = false;
      char* pVal = colDataGetData(pSrcCol, rowIndex);
      if (IS_VAR_DATA_TYPE(pSlot->type)) {
        // variable-length value: size is derived from the data itself
        int32_t nBytes = calcStrBytesByType(pSlot->type, pVal);
        memcpy(pSlot->pData, pVal, nBytes);
      } else {
        memcpy(pSlot->pData, pVal, pSlot->bytes);
      }
    }
  }

  pSliceInfo->isNextRowSet = true;
}
280,495,295✔
104

105
static void doKeepLinearInfo(STimeSliceOperatorInfo* pSliceInfo, const SSDataBlock* pBlock, int32_t rowIndex) {
2,147,483,647✔
106
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
2,147,483,647✔
107
  for (int32_t i = 0; i < numOfCols; ++i) {
2,147,483,647✔
108
    SColumnInfoData* pColInfoData = taosArrayGet(pBlock->pDataBlock, i);
2,147,483,647✔
109
    SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->tsCol.slotId);
2,147,483,647✔
110
    SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, i);
2,147,483,647✔
111

112
    if (!IS_MATHABLE_TYPE(pColInfoData->info.type)) {
2,147,483,647✔
113
      continue;
1,018,010,497✔
114
    }
115

116
    // null value is represented by using key = INT64_MIN for now.
117
    // TODO: optimize to ignore null values for linear interpolation.
118
    if (!pLinearInfo->isStartSet) {
2,147,483,647✔
119
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
21,383,851✔
120
        pLinearInfo->start.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
10,658,830✔
121
        char* p = colDataGetData(pColInfoData, rowIndex);
10,658,319✔
122
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
10,659,259✔
123
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
×
124
            memcpy(pLinearInfo->start.val, p, blobDataTLen(p));
×
125
          } else {
126
            memcpy(pLinearInfo->start.val, p, varDataTLen(p));
×
127
          }
128
        } else {
129
          memcpy(pLinearInfo->start.val, p, pLinearInfo->bytes);
10,659,259✔
130
        }
131
      }
132
      pLinearInfo->isStartSet = true;
10,692,181✔
133
    } else if (!pLinearInfo->isEndSet) {
2,147,483,647✔
134
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
14,799,614✔
135
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
7,355,086✔
136

137
        char* p = colDataGetData(pColInfoData, rowIndex);
7,355,086✔
138
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
7,355,086✔
UNCOV
139
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
×
140
            memcpy(pLinearInfo->end.val, p, blobDataTLen(p));
×
141
          } else {
142
            memcpy(pLinearInfo->end.val, p, varDataTLen(p));
×
143
          }
144
        } else {
145
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
7,355,086✔
146
        }
147
      }
148
      pLinearInfo->isEndSet = true;
7,399,378✔
149
    } else {
150
      pLinearInfo->start.key = pLinearInfo->end.key;
2,147,483,647✔
151
      memcpy(pLinearInfo->start.val, pLinearInfo->end.val, pLinearInfo->bytes);
2,147,483,647✔
152

153
      if (!colDataIsNull_s(pColInfoData, rowIndex)) {
2,147,483,647✔
154
        pLinearInfo->end.key = *(int64_t*)colDataGetData(pTsCol, rowIndex);
2,147,483,647✔
155

156
        char* p = colDataGetData(pColInfoData, rowIndex);
2,147,483,647✔
157
        if (IS_VAR_DATA_TYPE(pColInfoData->info.type)) {
2,147,483,647✔
158
          if (IS_STR_DATA_BLOB(pColInfoData->info.type)) {
869,583✔
159
            memcpy(pLinearInfo->end.val, p, blobDataTLen(p));
×
160
          } else {
161
            memcpy(pLinearInfo->end.val, p, varDataTLen(p));
×
162
          }
163
        } else {
164
          memcpy(pLinearInfo->end.val, p, pLinearInfo->bytes);
2,147,483,647✔
165
        }
166

167
      } else {
168
        pLinearInfo->end.key = INT64_MIN;
502,931,149✔
169
      }
170
    }
171
  }
172
}
2,147,483,647✔
173

174
/**
  @brief Make sure pBlock has room for at least one more row.

  When the block is full it is grown by roughly a quarter of the number of
  windows in [win.skey, win.ekey], capped at 1048576 rows and never less
  than one row.

  @return TSDB_CODE_SUCCESS, or the error returned by blockDataEnsureCapacity.
*/
static FORCE_INLINE int32_t timeSliceEnsureBlockCapacity(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock) {
  if (pBlock->info.rows < pBlock->info.capacity) {
    return TSDB_CODE_SUCCESS;
  }

  // Guard the division: a zero interval would be a division by zero. In that
  // case the block simply grows by one row.
  uint32_t winNum = 0;
  if (pSliceInfo->interval.interval != 0) {
    winNum = (pSliceInfo->win.ekey - pSliceInfo->win.skey) / pSliceInfo->interval.interval;
  }

  uint32_t newRowsNum = pBlock->info.rows + TMIN(winNum / 4 + 1, 1048576);
  int32_t  code = blockDataEnsureCapacity(pBlock, newRowsNum);
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
    return code;
  }

  return TSDB_CODE_SUCCESS;
}
189

190
/* True when the expression is the timestamp-typed `_irowts` pseudo column. */
bool isIrowtsPseudoColumn(SExprInfo* pExprInfo) {
  if (!IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
    return false;
  }
  return strcasecmp(pExprInfo->pExpr->_function.functionName, "_irowts") == 0;
}
194

195
/* True when the expression is the boolean-typed `_isfilled` pseudo column. */
bool isIsfilledPseudoColumn(SExprInfo* pExprInfo) {
  if (!IS_BOOLEAN_TYPE(pExprInfo->base.resSchema.type)) {
    return false;
  }
  return strcasecmp(pExprInfo->pExpr->_function.functionName, "_isfilled") == 0;
}
199

200
/* True when the expression is the timestamp-typed `_irowts_origin` pseudo column. */
bool isIrowtsOriginPseudoColumn(SExprInfo* pExprInfo) {
  if (!IS_TIMESTAMP_TYPE(pExprInfo->base.resSchema.type)) {
    return false;
  }
  const char* funcName = pExprInfo->pExpr->_function.functionName;
  return strcasecmp(funcName, "_irowts_origin") == 0;
}
204

205
/**
  @brief Build a row key with exactly one primary key value from `ts` and
  row `rowIndex` of pPkCol.

  For non-numeric types the key only references the column data (pData points
  into pPkCol); nothing is copied.
*/
static void tRowGetKeyFromColData(int64_t ts, SColumnInfoData* pPkCol, int32_t rowIndex, SRowKey* pKey) {
  int8_t pkType = pPkCol->info.type;

  pKey->ts = ts;
  pKey->numOfPKs = 1;
  pKey->pks[0].type = pkType;

  if (!IS_NUMERIC_TYPE(pkType)) {
    char* pVar = colDataGetVarData(pPkCol, rowIndex);
    pKey->pks[0].pData = (uint8_t*)varDataVal(pVar);
    pKey->pks[0].nData = varDataLen(pVar);
    return;
  }

  valueSetDatum(pKey->pks, pkType, colDataGetData(pPkCol, rowIndex), tDataTypes[pkType].bytes);
}
×
220

221
/* Result of isInvalidTimestamp(): why a timestamp was rejected, if at all. */
typedef enum {
  /* timestamp is valid */
  INVALID_TIMESTAMP_REASON_NONE = 0,
  /* timestamp equals the previous timestamp */
  INVALID_TIMESTAMP_REASON_PREV_TS_EQUAL = 1,
  /* timestamp is smaller than the previous timestamp */
  INVALID_TIMESTAMP_REASON_PREV_TS_SMALLER = 2,
} EInvalidTimestampReason;
226

227
/**
228
  @brief Timestamp is invalid if current timestamp <= previous timestamp.
229
  Only timestamp is considered even if composite primary key exists.
230
*/
231
static EInvalidTimestampReason isInvalidTimestamp(
2,147,483,647✔
232
  STimeSliceOperatorInfo* pSliceInfo, int64_t currentTs,
233
  SColumnInfoData* pPkCol, int32_t curIndex) {
234
  if (currentTs > pSliceInfo->win.ekey) {
2,147,483,647✔
235
    return INVALID_TIMESTAMP_REASON_NONE;
1,757,047✔
236
  }
237
  if (pSliceInfo->prevTsSet && currentTs <= pSliceInfo->prevKey.ts) {
2,147,483,647✔
238
    /**
239
      Input data of time slice operator must be ordered by
240
      timestamp ascendingly, except the prev scan.
241
      So prevTs should never be updated to equal or smaller timestamp.
242
    */
243
    return currentTs == pSliceInfo->prevKey.ts ?
290,894,659✔
244
      INVALID_TIMESTAMP_REASON_PREV_TS_EQUAL :
290,894,659✔
245
      INVALID_TIMESTAMP_REASON_PREV_TS_SMALLER;
246
  }
247

248
  SRowKey cur = {.ts = currentTs, .numOfPKs = (pPkCol != NULL) ? 1 : 0};
2,147,483,647✔
249
  if (pPkCol != NULL) {
2,147,483,647✔
250
    cur.pks[0].type = pPkCol->info.type;
2,063,600✔
251
    if (IS_VAR_DATA_TYPE(pPkCol->info.type)) {
2,063,600✔
252
      cur.pks[0].pData = (uint8_t*)colDataGetVarData(pPkCol, curIndex);
648,560✔
253
    } else {
254
      valueSetDatum(cur.pks, pPkCol->info.type,
1,415,040✔
255
                    colDataGetData(pPkCol, curIndex), pPkCol->info.bytes);
1,415,040✔
256
    }
257
  }
258

259
  pSliceInfo->prevTsSet = true;
2,147,483,647✔
260
  tRowKeyAssign(&pSliceInfo->prevKey, &cur);
2,147,483,647✔
261

262
  return INVALID_TIMESTAMP_REASON_NONE;
2,147,483,647✔
263
}
264

265
/* True when the expression's function type is FUNCTION_TYPE_INTERP. */
bool isInterpFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_INTERP;
}
269

270
/* True when the expression's function type is FUNCTION_TYPE_GROUP_KEY. */
static bool isGroupKeyFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_GROUP_KEY;
}
274

275
/* True when the expression's function type is FUNCTION_TYPE_GROUP_CONST_VALUE. */
static bool isSelectGroupConstValueFunc(SExprInfo* pExprInfo) {
  return pExprInfo->pExpr->_function.functionType == FUNCTION_TYPE_GROUP_CONST_VALUE;
}
279

280
/**
  @brief Find the first value-typed parameter of an interp expression and
  return whether its integer value is non-zero.

  @return false when no interp expression carries a value parameter.
*/
bool getIgoreNullRes(SExprSupp* pExprSup) {
  for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
    SExprInfo* pExpr = &pExprSup->pExprInfo[exprIdx];
    if (!isInterpFunc(pExpr)) {
      continue;
    }

    for (int32_t paramIdx = 0; paramIdx < pExpr->base.numOfParams; ++paramIdx) {
      SFunctParam* pParam = &pExpr->base.pParam[paramIdx];
      if (pParam->type != FUNC_PARAM_TYPE_VALUE) {
        continue;
      }
      if (pParam->param.i) {
        return true;
      }
      return false;
    }
  }

  return false;
}
296

297
/**
  @brief Tell whether row `index` of pSrcBlock has a NULL in the source column
  of any interp expression.

  @return always false when ignoreNull is false.
*/
bool checkNullRow(SExprSupp* pExprSup, SSDataBlock* pSrcBlock, int32_t index, bool ignoreNull) {
  if (ignoreNull) {
    for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
      SExprInfo* pExpr = &pExprSup->pExprInfo[exprIdx];
      if (!isInterpFunc(pExpr)) {
        continue;
      }

      int32_t          slot = pExpr->base.pParam[0].pCol->slotId;
      SColumnInfoData* pSrcCol = taosArrayGet(pSrcBlock->pDataBlock, slot);
      if (colDataIsNull_s(pSrcCol, index)) {
        return true;
      }
    }
  }

  return false;
}
317

318
/**
  @brief Write the saved value pKey into row `rowNum` of pDst, or NULL when
  the saved value is NULL.

  @return 0 for the NULL case, otherwise the result of colDataSetVal.
*/
static int32_t interpColSetKey(SColumnInfoData* pDst, int32_t rowNum, SGroupKeys* pKey) {
  if (pKey->isNull) {
    colDataSetNULL(pDst, rowNum);
    return 0;
  }

  return colDataSetVal(pDst, rowNum, pKey->pData, false);
}
327

328
/**
329
  @brief Check if the time difference between the fill reference row and
330
  target row exceeds surroundingTime, if so, set the fill row to NULL and use
331
  fillVal to fill.
332
*/
333
static void checkSurroundingTime(const STimeSliceOperatorInfo* pSliceInfo,
519,098,528✔
334
                                 SArray** ppFillRow, SArray* pFillRefRow,
335
                                 int64_t fillRefRowTs) {
336
  *ppFillRow = NULL;
519,098,528✔
337
  uint64_t diff = safe_abs_diff_i64(fillRefRowTs, pSliceInfo->current);
519,102,119✔
338
  if (pSliceInfo->surroundingTime > 0 &&
519,106,390✔
339
      diff > (uint64_t)pSliceInfo->surroundingTime) {
2,865,067✔
340
    return;
922,530✔
341
  }
342
  *ppFillRow = pFillRefRow;
518,185,229✔
343
}
344

345
/**
  @brief Pick the reference row for FILL(NEAR): whichever of the saved
  previous/next rows is closer in time to pSliceInfo->current (the previous
  row wins a tie).

  The chosen row is passed through checkSurroundingTime, so *ppNearRow may
  end up NULL even though true is returned.

  @return false (with *ppNearRow = NULL) when neither a previous nor a next
          row is available, true otherwise.
*/
static bool interpDetermineNearFillRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppNearRow) {
  if (!pSliceInfo->isPrevRowSet && !pSliceInfo->isNextRowSet) {
    *ppNearRow = NULL;
    return false;
  }

  SGroupKeys *pPrevTsKey = NULL, *pNextTsKey = NULL;
  int64_t    *pPrevTs = NULL, *pNextTs = NULL;
  if (pSliceInfo->isPrevRowSet) {
    pPrevTsKey = taosArrayGet(pSliceInfo->pPrevRow, pSliceInfo->tsCol.slotId);
    pPrevTs = (int64_t*)pPrevTsKey->pData;
  }
  if (pSliceInfo->isNextRowSet) {
    pNextTsKey = taosArrayGet(pSliceInfo->pNextRow, pSliceInfo->tsCol.slotId);
    pNextTs = (int64_t*)pNextTsKey->pData;
  }

  if (!pPrevTsKey) {
    // only the next row is known
    checkSurroundingTime(pSliceInfo, ppNearRow, pSliceInfo->pNextRow, *pNextTs);
  } else if (!pNextTsKey) {
    // only the previous row is known
    checkSurroundingTime(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow, *pPrevTs);
  } else {
    // Compare distances with the overflow-safe helper instead of llabs() on a
    // raw difference, which overflows for timestamps that are far apart.
    uint64_t prevDist = safe_abs_diff_i64(pSliceInfo->current, *pPrevTs);
    uint64_t nextDist = safe_abs_diff_i64(*pNextTs, pSliceInfo->current);
    if (prevDist <= nextDist) {
      /* take prev if equal */
      checkSurroundingTime(pSliceInfo, ppNearRow, pSliceInfo->pPrevRow,
                           *pPrevTs);
    } else {
      checkSurroundingTime(pSliceInfo, ppNearRow, pSliceInfo->pNextRow,
                           *pNextTs);
    }
  }
  return true;
}
379

380
/**
  @brief Decide whether the current time point can be filled and, for
  PREV/NEXT/NEAR fills, which saved row to take the values from.

  For PREV and NEXT, *ppOutRow is set through checkSurroundingTime when the
  corresponding row is available; otherwise it is left untouched. For every
  other fill type *ppOutRow is not touched here.

  @return false when a PREV/NEXT/NEAR fill has no reference row available,
          true otherwise.
*/
static bool interpDetermineFillRefRow(STimeSliceOperatorInfo* pSliceInfo, SArray** ppOutRow) {
  SArray* pRefRow = NULL;

  switch (pSliceInfo->fillType) {
    case TSDB_FILL_NEAR:
      return interpDetermineNearFillRow(pSliceInfo, ppOutRow);

    case TSDB_FILL_PREV:
      if (!pSliceInfo->isPrevRowSet) {
        return false;
      }
      pRefRow = pSliceInfo->pPrevRow;
      break;

    case TSDB_FILL_NEXT:
      if (!pSliceInfo->isNextRowSet) {
        return false;
      }
      pRefRow = pSliceInfo->pNextRow;
      break;

    default:
      return true;
  }

  SGroupKeys* pTsKey = taosArrayGet(pRefRow, pSliceInfo->tsCol.slotId);
  checkSurroundingTime(pSliceInfo, ppOutRow, pRefRow, *(int64_t*)pTsKey->pData);
  return true;
}
403

404
/**
  @brief Generate one filled output row for the time point
  pSliceInfo->current, according to pSliceInfo->fillType.

  @param pSliceInfo operator state (fill type, saved prev/next rows, linear
                    points, fill values)
  @param pExprSup   output expressions; each one writes its own result column
  @param pResBlock  result block; its row count is increased by one when a
                    row is produced
  @param pSrcBlock  current source block, may be NULL; used for group key
                    columns
  @param index      row index in pSrcBlock
  @param beforeTs   for linear fill: when true and the end point is not set
                    yet, nothing is interpolated and true is returned
  @param pTaskInfo  receives the error code before the long jump on failure

  @return true when a row was appended (or in the linear `beforeTs` early
          exit), false when no interpolation result could be produced.
          On any error the function does not return: it logs, stores the code
          in pTaskInfo and long-jumps via T_LONG_JMP.
*/
static bool genInterpolationResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                   SSDataBlock* pSrcBlock, int32_t index, bool beforeTs, SExecTaskInfo* pTaskInfo) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  int32_t rows = pResBlock->info.rows;
  int32_t fillColIndex = 0;
  int32_t groupKeyIndex = 0;
  bool    hasInterp = true;
  bool    needFill = false;
  SArray* pFillRefRow = NULL;

  code = timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
  QUERY_CHECK_CODE(code, lino, _end);
  // todo set the correct primary timestamp column

  // output the result
  needFill = interpDetermineFillRefRow(pSliceInfo, &pFillRefRow);
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];

    int32_t          dstSlot = pExprInfo->base.resSchema.slotId;
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, dstSlot);

    if (isIrowtsPseudoColumn(pExprInfo)) {
      code = colDataSetVal(pDst, rows, (char*)&pSliceInfo->current, false);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    } else if (isIsfilledPseudoColumn(pExprInfo)) {
      bool isFilled = true;
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&isFilled, false);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    } else if (!isInterpFunc(pExprInfo) && !isIrowtsOriginPseudoColumn(pExprInfo)) {
      // neither interp nor a pseudo column: only group key style columns are emitted
      if (isGroupKeyFunc(pExprInfo) || isSelectGroupConstValueFunc(pExprInfo)) {
        if (pSrcBlock != NULL) {
          int32_t          srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
          SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

          if (colDataIsNull_s(pSrc, index)) {
            colDataSetNULL(pDst, pResBlock->info.rows);
            continue;
          }

          char* v = colDataGetData(pSrc, index);
          code = colDataSetVal(pDst, pResBlock->info.rows, v, false);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (!isSelectGroupConstValueFunc(pExprInfo)) {
          // use stored group key
          SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevGroupKeys, groupKeyIndex);
          QUERY_CHECK_NULL(pkey, code, lino, _end, terrno);
          groupKeyIndex++;
          if (pkey->isNull == false) {
            code = colDataSetVal(pDst, rows, pkey->pData, false);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {
            colDataSetNULL(pDst, rows);
          }
        } else {
          // no source block: take the value from the saved previous row
          int32_t     srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
          SGroupKeys* pkey = taosArrayGet(pSliceInfo->pPrevRow, srcSlot);
          if (pkey->isNull == false) {
            code = colDataSetVal(pDst, rows, pkey->pData, false);
            QUERY_CHECK_CODE(code, lino, _end);
          } else {
            colDataSetNULL(pDst, rows);
          }
        }
      }
      continue;
    }

    int32_t srcSlot =
        isIrowtsOriginPseudoColumn(pExprInfo) ? pSliceInfo->tsCol.slotId : pExprInfo->base.pParam[0].pCol->slotId;
    switch (pSliceInfo->fillType) {
      case TSDB_FILL_NULL:
      case TSDB_FILL_NULL_F: {
        colDataSetNULL(pDst, rows);
        break;
      }

      case TSDB_FILL_PREV:
      case TSDB_FILL_NEAR:
      case TSDB_FILL_NEXT: {
        if (!needFill) {
          hasInterp = false;
          break;
        }
        if (pFillRefRow) {
          code = interpColSetKey(pDst, rows, taosArrayGet(pFillRefRow, srcSlot));
          QUERY_CHECK_CODE(code, lino, _end);
          break;
        }
        // no fillRefRow, fall through to fill specified values
        if (srcSlot == pSliceInfo->tsCol.slotId) {
          // if is _irowts_origin, there is no value to fill, just set to null
          colDataSetNULL(pDst, rows);
          break;
        }
      }
      /* fall through */
      case TSDB_FILL_SET_VALUE:
      case TSDB_FILL_SET_VALUE_F: {
        SVariant* pVar = &pSliceInfo->pFillColInfo[fillColIndex].fillVal;

        bool isNull = (TSDB_DATA_TYPE_NULL == pVar->nType) ? true : false;
        if (pDst->info.type == TSDB_DATA_TYPE_FLOAT) {
          float v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, float, pVar->nType, &pVar->f, 0);
          } else {
            v = taosStr2Float(varDataVal(pVar->pz), NULL);
          }
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (pDst->info.type == TSDB_DATA_TYPE_DOUBLE) {
          double v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, double, pVar->nType, &pVar->d, 0);
          } else {
            v = taosStr2Double(varDataVal(pVar->pz), NULL);
          }
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (IS_SIGNED_NUMERIC_TYPE(pDst->info.type)) {
          int64_t v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i, 0);
          } else {
            v = taosStr2Int64(varDataVal(pVar->pz), NULL, 10);
          }
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (IS_UNSIGNED_NUMERIC_TYPE(pDst->info.type)) {
          uint64_t v = 0;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, uint64_t, pVar->nType, &pVar->u, 0);
          } else {
            v = taosStr2UInt64(varDataVal(pVar->pz), NULL, 10);
          }
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
          QUERY_CHECK_CODE(code, lino, _end);
        } else if (IS_BOOLEAN_TYPE(pDst->info.type)) {
          bool v = false;
          if (!IS_VAR_DATA_TYPE(pVar->nType)) {
            GET_TYPED_DATA(v, bool, pVar->nType, &pVar->i, 0);
          } else {
            v = taosStr2Int8(varDataVal(pVar->pz), NULL, 10);
          }
          code = colDataSetVal(pDst, rows, (char*)&v, isNull);
          QUERY_CHECK_CODE(code, lino, _end);
        }

        ++fillColIndex;
        break;
      }

      case TSDB_FILL_LINEAR: {
        SFillLinearInfo* pLinearInfo = taosArrayGet(pSliceInfo->pLinearInfo, srcSlot);

        SPoint start = pLinearInfo->start;
        SPoint end = pLinearInfo->end;
        SPoint current = {.key = pSliceInfo->current};

        // do not interpolate before ts range, only increase pSliceInfo->current
        if (beforeTs && !pLinearInfo->isEndSet) {
          return true;
        }

        if (!pLinearInfo->isStartSet || !pLinearInfo->isEndSet) {
          hasInterp = false;
          break;
        }

        if (end.key != INT64_MIN && end.key < pSliceInfo->current) {
          hasInterp = false;
          break;
        }

        // key == INT64_MIN marks a NULL value on that side
        if (start.key == INT64_MIN || end.key == INT64_MIN) {
          colDataSetNULL(pDst, rows);
          break;
        }

        current.val = taosMemoryCalloc(pLinearInfo->bytes, 1);
        QUERY_CHECK_NULL(current.val, code, lino, _end, terrno);
        taosGetLinearInterpolationVal(&current, pLinearInfo->type, &start, &end, pLinearInfo->type,
                                      typeGetTypeModFromColInfo(&pDst->info));
        code = colDataSetVal(pDst, rows, (char*)current.val, false);
        // release the temporary buffer before checking the code, so that a
        // failure above cannot leak it
        taosMemoryFree(current.val);
        QUERY_CHECK_CODE(code, lino, _end);
        break;
      }
      case TSDB_FILL_NONE:
      default:
        break;
    }
  }

  if (hasInterp) {
    pResBlock->info.rows += 1;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  return hasInterp;
}
612

613
/**
  @brief Append row `index` of pSrcBlock to pResBlock as a non-filled row.

  `_irowts` and `_irowts_origin` columns receive pSliceInfo->current,
  `_isfilled` receives false, and every other column copies the value (or
  NULL) of its source column.

  @return TSDB_CODE_SUCCESS or the first error encountered; on error the row
          count of pResBlock is not increased.
*/
static int32_t addCurrentRowToResult(STimeSliceOperatorInfo* pSliceInfo, SExprSupp* pExprSup, SSDataBlock* pResBlock,
                                     SSDataBlock* pSrcBlock, int32_t index) {
  int32_t lino = 0;
  int32_t code = timeSliceEnsureBlockCapacity(pSliceInfo, pResBlock);
  QUERY_CHECK_CODE(code, lino, _end);

  for (int32_t exprIdx = 0; exprIdx < pExprSup->numOfExprs; ++exprIdx) {
    SExprInfo*       pExpr = &pExprSup->pExprInfo[exprIdx];
    SColumnInfoData* pDst = taosArrayGet(pResBlock->pDataBlock, pExpr->base.resSchema.slotId);

    if (isIrowtsPseudoColumn(pExpr) || isIrowtsOriginPseudoColumn(pExpr)) {
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&pSliceInfo->current, false);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    if (isIsfilledPseudoColumn(pExpr)) {
      bool filled = false;
      code = colDataSetVal(pDst, pResBlock->info.rows, (char*)&filled, false);
      QUERY_CHECK_CODE(code, lino, _end);
      continue;
    }

    // regular column: copy from the source block
    int32_t          srcSlot = pExpr->base.pParam[0].pCol->slotId;
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);
    if (colDataIsNull_s(pSrc, index)) {
      colDataSetNULL(pDst, pResBlock->info.rows);
    } else {
      char* pVal = colDataGetData(pSrc, index);
      code = colDataSetVal(pDst, pResBlock->info.rows, pVal, false);
      QUERY_CHECK_CODE(code, lino, _end);
    }
  }

  pResBlock->info.rows += 1;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
655

656
/**
  @brief Lazily create pInfo->pPrevRow: one SGroupKeys entry per column of
  pBlock, each with a zeroed buffer of the column's byte width.

  Does nothing when pPrevRow already exists. On success isPrevRowSet is
  reset to false.

  @return TSDB_CODE_SUCCESS, or terrno when an allocation or push fails.
*/
static int32_t initPrevRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pInfo->pPrevRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pPrevRow == NULL) {
    return terrno;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    if (taosArrayPush(pInfo->pPrevRow, &key) == NULL) {
      // the entry never reached the array, so nobody else would free the buffer
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

  pInfo->isPrevRowSet = false;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
690

691
/**
  @brief lazily create the "next row" keeper: one SGroupKeys entry per
  column of pBlock, each owning a zeroed buffer of the column's byte width.
  Nothing is done when pInfo->pNextRow already exists.
  @param pInfo: the timeslice operator state that owns pNextRow
  @param pBlock: the data block whose column layout is mirrored
  @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation/push
*/
static int32_t initNextRowsKeeper(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pInfo->pNextRow != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pNextRow = taosArrayInit(4, sizeof(SGroupKeys));
  if (pInfo->pNextRow == NULL) {
    return terrno;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SGroupKeys key = {0};
    key.bytes = pColInfo->info.bytes;
    key.type = pColInfo->info.type;
    key.isNull = false;
    key.pData = taosMemoryCalloc(1, pColInfo->info.bytes);
    QUERY_CHECK_NULL(key.pData, code, lino, _end, terrno);

    if (taosArrayPush(pInfo->pNextRow, &key) == NULL) {
      // the array did not take a copy of the key, so its buffer is still
      // owned here and must be released before bailing out
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(key.pData);
      goto _end;
    }
  }

  pInfo->isNextRowSet = false;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
726

727
/**
  @brief lazily create the linear-fill keeper: one SFillLinearInfo per column
  of pBlock, with both end points unset (key INT64_MIN) and a zeroed value
  buffer of the column's byte width for each end point.
  Nothing is done when pInfo->pLinearInfo already exists.
  @param pInfo: the timeslice operator state that owns pLinearInfo
  @param pBlock: the data block whose column layout is mirrored
  @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation/push
*/
static int32_t initFillLinearInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  if (pInfo->pLinearInfo != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pLinearInfo = taosArrayInit(4, sizeof(SFillLinearInfo));
  if (pInfo->pLinearInfo == NULL) {
    return terrno;
  }

  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
  for (int32_t i = 0; i < numOfCols; ++i) {
    SColumnInfoData* pColInfo = taosArrayGet(pBlock->pDataBlock, i);

    SFillLinearInfo linearInfo = {0};
    linearInfo.start.key = INT64_MIN;
    linearInfo.end.key = INT64_MIN;
    linearInfo.isStartSet = false;
    linearInfo.isEndSet = false;
    linearInfo.type = pColInfo->info.type;
    linearInfo.bytes = pColInfo->info.bytes;

    linearInfo.start.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    QUERY_CHECK_NULL(linearInfo.start.val, code, lino, _end, terrno);

    linearInfo.end.val = taosMemoryCalloc(1, pColInfo->info.bytes);
    if (linearInfo.end.val == NULL) {
      // start.val is not reachable from the array yet; release it here
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(linearInfo.start.val);
      goto _end;
    }

    if (taosArrayPush(pInfo->pLinearInfo, &linearInfo) == NULL) {
      // the entry was not stored, so both buffers are still owned here
      code = terrno;
      lino = __LINE__;
      taosMemoryFree(linearInfo.start.val);
      taosMemoryFree(linearInfo.end.val);
      goto _end;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
765

766
static void destroyGroupKey(void* pKey) {
71,172✔
767
  SGroupKeys* key = (SGroupKeys*)pKey;
71,172✔
768
  if (key->pData != NULL) {
71,172✔
769
    taosMemoryFreeClear(key->pData);
71,172✔
770
  }
771
}
71,172✔
772

773
/**
  @brief lazily create the keeper for the previous group key values: one
  SGroupKeys entry (with a zeroed buffer of the result schema's byte width)
  for every expression that isGroupKeyFunc() accepts.
  Nothing is done when pInfo->pPrevGroupKeys already exists. On failure the
  partially built array is destroyed and pPrevGroupKeys is reset to NULL.
  @param pInfo: the timeslice operator state that owns pPrevGroupKeys
  @param pExprSup: the expression list to scan for group key functions
  @return TSDB_CODE_SUCCESS, or the terrno of the failed allocation/push
*/
static int32_t initGroupKeyKeeper(STimeSliceOperatorInfo* pInfo, SExprSupp* pExprSup) {
  int32_t code = TSDB_CODE_SUCCESS;

  if (pInfo->pPrevGroupKeys != NULL) {
    return TSDB_CODE_SUCCESS;
  }

  pInfo->pPrevGroupKeys = taosArrayInit(pExprSup->numOfExprs, sizeof(SGroupKeys));
  if (pInfo->pPrevGroupKeys == NULL) {
    return terrno;
  }

  for (int32_t i = 0; i < pExprSup->numOfExprs; ++i) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[i];
    if (!isGroupKeyFunc(pExprInfo)) {
      continue;
    }

    SGroupKeys key = {.bytes = pExprInfo->base.resSchema.bytes,
                      .type = pExprInfo->base.resSchema.type,
                      .isNull = false,
                      .pData = taosMemoryCalloc(1, pExprInfo->base.resSchema.bytes)};
    if (key.pData == NULL) {
      // capture the error before any cleanup call gets a chance to touch it
      code = terrno;
      goto _failed;
    }

    if (taosArrayPush(pInfo->pPrevGroupKeys, &key) == NULL) {
      code = terrno;
      // the key was not stored in the array, so its buffer is freed here
      taosMemoryFree(key.pData);
      goto _failed;
    }
  }

  return TSDB_CODE_SUCCESS;

_failed:
  taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
  pInfo->pPrevGroupKeys = NULL;
  return code;
}
807

808
/**
  @brief make sure all per-operator keepers exist: previous rows, next rows,
  linear fill info and previous group keys. Stops at the first failure.
  @param pInfo: the timeslice operator state that owns the keepers
  @param pBlock: the data block whose column layout the row keepers mirror
  @param pExprSup: the expression list used to find group key functions
  @return TSDB_CODE_SUCCESS, or the error code reported by the keeper that
  failed (the real cause is propagated instead of a generic failure code so
  the caller logs what actually went wrong)
*/
static int32_t initKeeperInfo(STimeSliceOperatorInfo* pInfo, SSDataBlock* pBlock, SExprSupp* pExprSup) {
  int32_t code = initPrevRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = initNextRowsKeeper(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  code = initFillLinearInfo(pInfo, pBlock);
  if (code != TSDB_CODE_SUCCESS) {
    return code;
  }

  return initGroupKeyKeeper(pInfo, pExprSup);
}
832

833
/**
  @brief mark every kept "previous row" value as non-null and flag the
  previous row as not set. No-op when the keeper was never created.
  @param pInfo: the timeslice operator state that owns pPrevRow
*/
static void resetPrevRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pPrevRow == NULL) {
    return;
  }

  // bound the loop by the array that is actually indexed
  int32_t numOfKeys = taosArrayGetSize(pInfo->pPrevRow);
  for (int32_t i = 0; i < numOfKeys; ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    pKey->isNull = false;
  }

  pInfo->isPrevRowSet = false;
}
847

848
/**
  @brief mark every kept "next row" value as non-null and flag the next row
  as not set. No-op when the keeper was never created.
  @param pInfo: the timeslice operator state that owns pNextRow
*/
static void resetNextRowsKeeper(STimeSliceOperatorInfo* pInfo) {
  if (pInfo->pNextRow == NULL) {
    return;
  }

  // reset the entries of pNextRow itself (not pPrevRow), bounded by the
  // size of the array that is actually indexed
  int32_t numOfKeys = taosArrayGetSize(pInfo->pNextRow);
  for (int32_t i = 0; i < numOfKeys; ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    pKey->isNull = false;
  }

  pInfo->isNextRowSet = false;
}
862

863
/**
  @brief forget both end points of every linear-fill entry: keys go back to
  INT64_MIN and the start/end "set" flags are cleared. The value buffers are
  left untouched. No-op when the keeper was never created.
  @param pInfo: the timeslice operator state that owns pLinearInfo
*/
static void resetFillLinearInfo(STimeSliceOperatorInfo* pInfo) {
  SArray* pEntries = pInfo->pLinearInfo;
  if (pEntries == NULL) {
    return;
  }

  for (int32_t idx = 0; idx < taosArrayGetSize(pEntries); ++idx) {
    SFillLinearInfo* pEntry = taosArrayGet(pEntries, idx);
    pEntry->isStartSet = false;
    pEntry->isEndSet = false;
    pEntry->start.key = INT64_MIN;
    pEntry->end.key = INT64_MIN;
  }
}
878

879
/**
  @brief reset the row keepers and the linear-fill keeper of the operator.
  @param pInfo: the timeslice operator state that owns the keepers
*/
static void resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
  // kept rows on both sides of the current timestamp
  resetPrevRowsKeeper(pInfo);
  resetNextRowsKeeper(pInfo);

  // end points used by fill(linear)
  resetFillLinearInfo(pInfo);
}
223,456✔
884

885
/**
  @brief tell whether the result block holds more rows than the threshold.
  @param pSliceInfo: the timeslice operator state that owns the result block
  @param threshold: the row count that must be exceeded
  @return true when pRes->info.rows is strictly greater than threshold
*/
static bool checkThresholdReached(STimeSliceOperatorInfo* pSliceInfo, int32_t threshold) {
  return pSliceInfo->pRes->info.rows > threshold;
}
893

894
/**
  @brief tell whether the current interpolation timestamp has moved past the
  end key of the query window.
  @param pSliceInfo: the timeslice operator state
  @return true when current is strictly greater than win.ekey
*/
static bool checkWindowBoundReached(STimeSliceOperatorInfo* pSliceInfo) {
  return pSliceInfo->current > pSliceInfo->win.ekey;
}
901

902
/**
  @brief remember where processing of pBlock stopped so that the next call
  can resume from the following row. When curIndex is the last row of the
  block there is nothing left to resume and the remaining-block pointer is
  cleared instead.
  @param pSliceInfo: the timeslice operator state
  @param pBlock: the block that was being processed
  @param curIndex: index of the last row that was processed
*/
static void saveBlockStatus(STimeSliceOperatorInfo* pSliceInfo, SSDataBlock* pBlock, int32_t curIndex) {
  if (curIndex < pBlock->info.rows - 1) {
    pSliceInfo->pRemainRes = pBlock;
    pSliceInfo->remainIndex = curIndex + 1;
    return;
  }

  // all data in remaining block processed
  pSliceInfo->pRemainRes = NULL;
}
915

916
/**
917
  @brief set the 'get param' for the downstream operator to notify the prev/next
918
  scan when the current timestamp is reached the notifyTs. Here we use the 'get
919
  param' to notify the downstream because the notification is going to impact
920
  the data query flow.
921
  @param pOperator: the current operator(parent operator)
922
  @param notifyTs: the timestamp to notify the downstream operator
923
*/
924
static int32_t setDownstreamOpGetParam(SOperatorInfo* pOperator,
8,417,752✔
925
                                       TSKEY notifyTs) {
926
  int32_t code = TSDB_CODE_SUCCESS;
8,417,752✔
927
  int32_t lino = 0;
8,417,752✔
928

929
  if (pOperator->pDownstreamGetParams == NULL) {
8,417,752✔
930
    pOperator->pDownstreamGetParams =
3,110,202✔
931
      taosMemoryCalloc(pOperator->numOfDownstream, POINTER_BYTES);
3,109,691✔
932
    QUERY_CHECK_NULL(pOperator->pDownstreamGetParams, code, lino, _end,
3,110,202✔
933
                     terrno);
934
  }
935

936
  for (int32_t i = 0; i < pOperator->numOfDownstream; ++i) {
16,835,075✔
937
    SOperatorInfo* pDownstream = pOperator->pDownstream[i];
8,417,752✔
938
    if (pDownstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN &&
8,417,241✔
939
        pDownstream->operatorType != QUERY_NODE_PHYSICAL_PLAN_EXCHANGE) {
5,294,158✔
940
      /**
941
        Only table scan and exchange operator are supported right now.
942
      */
943
      qWarn("%s, %s only table scan and exchange operators are supported "
1,096,594✔
944
             "for notify right now, but got %d, skip notify step done",
945
             GET_TASKID(pOperator->pTaskInfo), __func__,
946
             pDownstream->operatorType);
947
      continue;
1,096,594✔
948
    }
949
    SOperatorParam* pParam = pOperator->pDownstreamGetParams[i];
7,320,729✔
950
    if (pParam == NULL) {
7,320,647✔
951
      pParam = (SOperatorParam*)taosMemoryCalloc(1, sizeof(SOperatorParam));
7,320,647✔
952
      QUERY_CHECK_NULL(pParam, code, lino, _end, terrno);
7,321,158✔
953
    }
954

955
    if (pParam->value == NULL) {
7,321,158✔
956
      void* tsParam = NULL;
7,321,158✔
957
      switch (pDownstream->operatorType) {
7,321,158✔
958
        case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
3,123,594✔
959
          tsParam = taosMemoryCalloc(1, sizeof(STableScanOperatorParam));
3,123,594✔
960
          QUERY_CHECK_NULL(tsParam, code, lino, _end, terrno);
3,122,654✔
961
          break;
3,122,654✔
962
        }
963
        case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
4,197,564✔
964
          tsParam = taosMemoryCalloc(1, sizeof(SExchangeOperatorParam));
4,197,564✔
965
          QUERY_CHECK_NULL(tsParam, code, lino, _end, terrno);
4,197,564✔
966
          break;
4,197,564✔
967
        }
968
        default:
×
969
          break;
×
970
      }
971
      pParam->value = tsParam;
7,320,218✔
972
    }
973

974
    switch (pDownstream->operatorType) {
7,321,158✔
975
      case QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN: {
3,123,083✔
976
        STableScanOperatorParam* p = (STableScanOperatorParam*)pParam->value;
3,123,083✔
977
        p->paramType = NOTIFY_TYPE_SCAN_PARAM;
3,123,594✔
978
        p->notifyToProcess = true;
3,122,654✔
979
        p->notifyTs = notifyTs;
3,123,594✔
980
        break;
3,122,654✔
981
      }
982
      case QUERY_NODE_PHYSICAL_PLAN_EXCHANGE: {
4,197,564✔
983
        SExchangeOperatorParam* p = (SExchangeOperatorParam*)pParam->value;
4,197,564✔
984
        p->multiParams = false;
4,197,564✔
985
        p->basic.paramType = NOTIFY_TYPE_EXCHANGE_PARAM;
4,197,564✔
986
        p->basic.notifyTs = notifyTs;
4,197,564✔
987
        break;
4,197,564✔
988
      }
989
      default: {
511✔
990
        /**
991
          Only table scan and exchange operator are supported right now.
992
        */
993
        qWarn("%s, %s only table scan and exchange operators are supported "
511✔
994
               "for notify right now, but got %d, skip notify step done",
995
               GET_TASKID(pOperator->pTaskInfo), __func__,
996
               pDownstream->operatorType);
997
        continue;
×
998
      }
999
    }
1000

1001
    pParam->opType = pDownstream->operatorType;
7,320,218✔
1002
    pParam->downstreamIdx = i;
7,320,647✔
1003
    pParam->pChildren = NULL;
7,321,158✔
1004
    pParam->reUse = true;
7,321,158✔
1005
    pOperator->pDownstreamGetParams[i] = pParam;
7,320,218✔
1006
  }
1007

1008
_end:
8,417,241✔
1009
  if (code != TSDB_CODE_SUCCESS) {
8,417,241✔
1010
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1011
  }
1012
  return code;
8,417,323✔
1013
}
1014

1015
/**
  @brief compute the timestamp that follows `current` by one interval step,
  using taosTimeAdd with the interval's length, unit and precision.
  @param current: the timestamp to advance from
  @param pInterval: the interval definition (interval, intervalUnit, precision)
  @return the value returned by taosTimeAdd
*/
static int64_t getNextTimestamp(int64_t current, SInterval* pInterval) {
  int64_t next = taosTimeAdd(current, pInterval->interval,
                             pInterval->intervalUnit, pInterval->precision,
                             NULL);
  return next;
}
1019

1020
static void doTimesliceImpl(SOperatorInfo* pOperator,
11,867,783✔
1021
                            STimeSliceOperatorInfo* pSliceInfo,
1022
                            SSDataBlock* pBlock, SExecTaskInfo* pTaskInfo,
1023
                            bool ignoreNull) {
1024
  int32_t      code = TSDB_CODE_SUCCESS;
11,867,783✔
1025
  int32_t      lino = 0;
11,867,783✔
1026
  SSDataBlock* pResBlock = pSliceInfo->pRes;
11,867,783✔
1027
  SInterval*   pInterval = &pSliceInfo->interval;
11,867,783✔
1028

1029
  SColumnInfoData* pTsCol = taosArrayGet(pBlock->pDataBlock,
11,867,783✔
1030
                                         pSliceInfo->tsCol.slotId);
11,867,783✔
1031
  SColumnInfoData* pPkCol = NULL;
11,867,783✔
1032

1033
  if (pSliceInfo->hasPk) {
11,867,783✔
1034
    pPkCol = taosArrayGet(pBlock->pDataBlock, pSliceInfo->pkCol.slotId);
103,180✔
1035
  }
1036

1037
  int32_t i = (pSliceInfo->pRemainRes == NULL) ? 0 : pSliceInfo->remainIndex;
11,867,783✔
1038
  for (; i < pBlock->info.rows; ++i) {
2,147,483,647✔
1039
    int64_t ts = *(int64_t*)colDataGetData(pTsCol, i);
2,147,483,647✔
1040

1041
    if (checkNullRow(&pOperator->exprSupp, pBlock, i, ignoreNull)) {
2,147,483,647✔
1042
      continue;
153,385,185✔
1043
    }
1044

1045
    if ((!pSliceInfo->prevNotified && ts < pSliceInfo->win.skey) ||
2,147,483,647✔
1046
        (!pSliceInfo->nextNotified && ts > pSliceInfo->win.ekey)) {
2,147,483,647✔
1047
      code = setDownstreamOpGetParam(pOperator, ts);
4,233,141✔
1048
      QUERY_CHECK_CODE(code, lino, _end);
4,232,360✔
1049
      if (ts < pSliceInfo->win.skey) {
4,232,360✔
1050
        pSliceInfo->prevNotified = true;
2,475,742✔
1051
      } else {
1052
        pSliceInfo->nextNotified = true;
1,757,047✔
1053
      }
1054
    }
1055

1056
    EInvalidTimestampReason invalidReason = isInvalidTimestamp(pSliceInfo, ts,
2,147,483,647✔
1057
                                                               pPkCol, i);
1058
    if (invalidReason != INVALID_TIMESTAMP_REASON_NONE) {
2,147,483,647✔
1059
      if (invalidReason == INVALID_TIMESTAMP_REASON_PREV_TS_EQUAL) {
290,894,659✔
1060
        continue;
285,914,414✔
1061
      } else if (invalidReason == INVALID_TIMESTAMP_REASON_PREV_TS_SMALLER) {
4,980,245✔
1062
        break;
4,980,245✔
1063
      }
1064
    }
1065

1066
    if (ts == pSliceInfo->current) {
2,147,483,647✔
1067
      code = addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp,
936,204✔
1068
                                   pResBlock, pBlock, i);
1069
      QUERY_CHECK_CODE(code, lino, _end);
936,204✔
1070

1071
      doKeepPrevRows(pSliceInfo, pBlock, i);
936,204✔
1072
      doKeepLinearInfo(pSliceInfo, pBlock, i);
936,204✔
1073

1074
      pSliceInfo->current = getNextTimestamp(pSliceInfo->current, pInterval);
936,204✔
1075
      if (checkWindowBoundReached(pSliceInfo)) {
936,204✔
1076
        break;
78,770✔
1077
      }
1078
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
857,434✔
1079
        saveBlockStatus(pSliceInfo, pBlock, i);
×
1080
        return;
×
1081
      }
1082
    } else if (ts < pSliceInfo->current) {
2,147,483,647✔
1083
      doKeepPrevRows(pSliceInfo, pBlock, i);
2,147,483,647✔
1084
      doKeepLinearInfo(pSliceInfo, pBlock, i);
2,147,483,647✔
1085
    } else {  /* ts > pSliceInfo->current */
1086
      doKeepNextRows(pSliceInfo, pBlock, i);
280,489,916✔
1087
      doKeepLinearInfo(pSliceInfo, pBlock, i);
280,492,740✔
1088

1089
      while (pSliceInfo->current < ts &&
795,835,257✔
1090
             pSliceInfo->current <= pSliceInfo->win.ekey) {
516,108,215✔
1091
        if (!genInterpolationResult(pSliceInfo, &pOperator->exprSupp,
515,370,281✔
1092
                                    pResBlock, pBlock, i, true, pTaskInfo) &&
247,080✔
1093
            pSliceInfo->fillType == TSDB_FILL_LINEAR) {
247,080✔
1094
          break;
1095
        } else {
1096
          pSliceInfo->current = getNextTimestamp(pSliceInfo->current,
515,367,741✔
1097
                                                 pInterval);
1098
        }
1099
      }
1100

1101
      // add current row if timestamp matches
1102
      if (ts == pSliceInfo->current &&
280,409,275✔
1103
          pSliceInfo->current <= pSliceInfo->win.ekey) {
3,140,761✔
1104
        code = addCurrentRowToResult(pSliceInfo, &pOperator->exprSupp,
3,091,918✔
1105
                                     pResBlock, pBlock, i);
1106
        QUERY_CHECK_CODE(code, lino, _end);
3,091,918✔
1107

1108
        pSliceInfo->current = getNextTimestamp(pSliceInfo->current, pInterval);
3,091,918✔
1109
      }
1110
      doKeepPrevRows(pSliceInfo, pBlock, i);
280,488,653✔
1111

1112
      if (checkWindowBoundReached(pSliceInfo)) {
280,497,339✔
1113
        break;
2,681,548✔
1114
      }
1115
      if (checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
277,814,422✔
1116
        saveBlockStatus(pSliceInfo, pBlock, i);
11,031✔
1117
        return;
11,031✔
1118
      }
1119
    }
1120
  }
1121

1122
  // if reached here, meaning block processing finished naturally,
1123
  // or interpolation reach window upper bound
1124
  pSliceInfo->pRemainRes = NULL;
11,723,147✔
1125

1126
_end:
11,856,752✔
1127
  if (code != TSDB_CODE_SUCCESS) {
11,856,752✔
1128
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1129
    pTaskInfo->code = code;
×
1130
    T_LONG_JMP(pTaskInfo->env, code);
×
1131
  }
1132
}
1133

1134
/**
  @brief after the last data block of a group, keep generating interpolation
  results for every remaining timestamp up to the window end key.
  Nothing is generated for fill(next) and fill(linear), or when the previous
  group key keeper does not exist.
  @param pSliceInfo: the timeslice operator state
  @param pOperator: the timeslice operator
  @param index: row index forwarded to genInterpolationResult()
*/
static void genInterpAfterDataBlock(STimeSliceOperatorInfo* pSliceInfo, SOperatorInfo* pOperator, int32_t index) {
  if (pSliceInfo->fillType == TSDB_FILL_NEXT || pSliceInfo->fillType == TSDB_FILL_LINEAR) {
    return;
  }
  if (pSliceInfo->pPrevGroupKeys == NULL) {
    return;
  }

  SSDataBlock* pOut = pSliceInfo->pRes;
  SInterval*   pStep = &pSliceInfo->interval;

  for (; pSliceInfo->current <= pSliceInfo->win.ekey;
       pSliceInfo->current = getNextTimestamp(pSliceInfo->current, pStep)) {
    (void)genInterpolationResult(pSliceInfo, &pOperator->exprSupp, pOut, NULL, index, false, pOperator->pTaskInfo);
  }
}
1149

1150
/**
  @brief copy the group key values found in row 0 of pSrcBlock into the kept
  group keys. The n-th expression accepted by isGroupKeyFunc() is stored in
  the n-th entry of pGroupKeys; its source column is the slot of the
  expression's first parameter.
  A NULL source value only marks its own entry as null; the remaining group
  keys are still refreshed so that none of them keeps a stale value.
  @param pExprSup: the expression list to scan for group key functions
  @param pGroupKeys: SArray<SGroupKeys> that receives the values
  @param pSrcBlock: the block to read the values from
  @return TSDB_CODE_SUCCESS, or terrno when pGroupKeys has no entry for a
  group key expression
*/
static int32_t copyPrevGroupKey(SExprSupp* pExprSup, SArray* pGroupKeys, SSDataBlock* pSrcBlock) {
  int32_t groupKeyIdx = 0;
  for (int32_t j = 0; j < pExprSup->numOfExprs; ++j) {
    SExprInfo* pExprInfo = &pExprSup->pExprInfo[j];
    if (!isGroupKeyFunc(pExprInfo)) {
      continue;
    }

    int32_t     srcSlot = pExprInfo->base.pParam[0].pCol->slotId;
    SGroupKeys* pGroupKey = taosArrayGet(pGroupKeys, groupKeyIdx);
    if (pGroupKey == NULL) {
      return terrno;
    }
    groupKeyIdx++;
    SColumnInfoData* pSrc = taosArrayGet(pSrcBlock->pDataBlock, srcSlot);

    if (colDataIsNull_s(pSrc, 0)) {
      // move on to the next group key instead of abandoning the rest
      pGroupKey->isNull = true;
      continue;
    }

    char* v = colDataGetData(pSrc, 0);
    if (IS_VAR_DATA_TYPE(pGroupKey->type)) {
      // variable-length values carry their own total length
      if (IS_STR_DATA_BLOB(pGroupKey->type)) {
        memcpy(pGroupKey->pData, v, blobDataTLen(v));
      } else {
        memcpy(pGroupKey->pData, v, varDataTLen(v));
      }
    } else {
      memcpy(pGroupKey->pData, v, pGroupKey->bytes);
    }

    pGroupKey->isNull = false;
  }
  return TSDB_CODE_SUCCESS;
}
1185

1186
/**
  @brief restore the per-group state: the current timestamp goes back to the
  window start key, the previous-timestamp and notification flags are
  cleared, and the keepers are reset.
  @param pSliceInfo: the timeslice operator state
*/
static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
  // flags first, they do not depend on each other
  pSliceInfo->prevNotified = false;
  pSliceInfo->nextNotified = false;
  pSliceInfo->prevTsSet = false;

  // restart interpolation from the beginning of the window
  pSliceInfo->current = pSliceInfo->win.skey;

  resetKeeperInfo(pSliceInfo);
}
223,456✔
1193

1194
/**
  @brief process one input block for the timeslice operator.
  When the window upper bound has already been reached, only the downstream
  is notified (with win.ekey + 1) and the block is not processed. Otherwise
  the keepers are created if needed, the scalar expressions (if any) are
  applied to the block in place, the block is bound as input, the rows are
  processed by doTimesliceImpl() and the group keys of the block are kept.
  On error the code is stored in the task info and the function long-jumps.
  @param pOperator: the timeslice operator
  @param pBlock: the input data block
*/
static void doHandleTimeslice(SOperatorInfo* pOperator, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;

  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  SExprSupp*              pSup = &pOperator->exprSupp;
  bool                    ignoreNull = getIgoreNullRes(pSup);
  int32_t                 order = TSDB_ORDER_ASC;

  if (checkWindowBoundReached(pSliceInfo)) {
    code = setDownstreamOpGetParam(pOperator, pSliceInfo->win.ekey + 1);
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pSliceInfo->scalarSup.pExprInfo != NULL) {
    SExprSupp* pExprSup = &pSliceInfo->scalarSup;
    code = projectApplyFunctions(pExprSup->pExprInfo, pBlock, pBlock,
                                 pExprSup->pCtx, pExprSup->numOfExprs, NULL,
                                 GET_STM_RTINFO(pOperator->pTaskInfo));
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // the pDataBlock are always the same one, no need to call this again
  code = setInputDataBlock(pSup, pBlock, order, MAIN_SCAN, true);
  QUERY_CHECK_CODE(code, lino, _end);
  doTimesliceImpl(pOperator, pSliceInfo, pBlock, pTaskInfo, ignoreNull);
  code = copyPrevGroupKey(&pOperator->exprSupp, pSliceInfo->pPrevGroupKeys, pBlock);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    // report the line recorded at the failing check, not the line of this log
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
}
16,052,746✔
1234

1235
/**
  @brief produce the next result block of the timeslice operator.
  Input blocks are pulled from downstream 0 (or taken from the pending
  remaining / next-group block) and handed to doHandleTimeslice() until the
  window upper bound or the result threshold is reached, the group changes,
  or the downstream is exhausted. The result block is filtered before it is
  returned.
  On error the code is stored in the task info and the function long-jumps.
  @param pOperator: the timeslice operator
  @param ppRes: receives the result block, or NULL when it holds no rows
  @return TSDB_CODE_SUCCESS
*/
static int32_t doTimesliceNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t        code = TSDB_CODE_SUCCESS;
  int32_t        lino = 0;
  SExecTaskInfo* pTaskInfo = pOperator->pTaskInfo;
  STimeSliceOperatorInfo* pSliceInfo = pOperator->info;
  // initialized before the first 'goto _finished' below: the label reads
  // pResBlock, so no jump may skip this initialization
  SSDataBlock* pResBlock = pSliceInfo->pRes;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  } else if (pOperator->status == OP_NOT_OPENED && pSliceInfo->pWin) {
    code = streamCalcCurrWinTimeRange((STimeRangeNode*)pSliceInfo->pWin,
                                       &pTaskInfo->pStreamRuntimeInfo->funcInfo,
                                       &pSliceInfo->win, NULL, 3);
    QUERY_CHECK_CODE(code, lino, _finished);
    OPTR_SET_OPENED(pOperator);
    pSliceInfo->current = pSliceInfo->win.skey;
  }

  blockDataCleanup(pResBlock);

  if (IS_STREAM_MODE(pTaskInfo)) {
    /**
      For stream calculation, the interp operator is triggered by the window,
      so we need to reset the notified status for each window.
    */
    pSliceInfo->prevNotified = false;
    pSliceInfo->nextNotified = false;
  }

  while (1) {
    // first finish the block that opened the current group, if any
    if (pSliceInfo->pNextGroupRes != NULL) {
      doHandleTimeslice(pOperator, pSliceInfo->pNextGroupRes);
      if (checkWindowBoundReached(pSliceInfo) ||
          checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _finished);
        if (pSliceInfo->pRemainRes == NULL) {
          pSliceInfo->pNextGroupRes = NULL;
        }
        if (pResBlock->info.rows != 0) {
          goto _finished;
        } else {
          // after filter if result block has 0 rows, go back to
          // process pNextGroupRes again for unfinished data
          continue;
        }
      }
      pSliceInfo->pNextGroupRes = NULL;
    }

    while (1) {
      // a partially processed block takes precedence over new input
      SSDataBlock* pBlock = pSliceInfo->pRemainRes ?
        pSliceInfo->pRemainRes : getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        setOperatorCompleted(pOperator);
        break;
      }
      printDataBlock(pBlock, "doTimesliceNext",
                    GET_TASKID(pOperator->pTaskInfo),
                    pOperator->pTaskInfo->id.queryId);

      pResBlock->info.scanFlag = pBlock->info.scanFlag;
      if (pSliceInfo->groupId == 0 && pBlock->info.id.groupId != 0) {
        pSliceInfo->groupId = pBlock->info.id.groupId;
      } else {
        if (pSliceInfo->groupId != pBlock->info.id.groupId) {
          // group changed: keep the block for the next round and finish
          // the current group first
          pSliceInfo->groupId = pBlock->info.id.groupId;
          pSliceInfo->pNextGroupRes = pBlock;
          break;
        }
      }

      doHandleTimeslice(pOperator, pBlock);
      if (checkWindowBoundReached(pSliceInfo) ||
          checkThresholdReached(pSliceInfo, pOperator->resultInfo.threshold)) {
        code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
        QUERY_CHECK_CODE(code, lino, _finished);
        if (pResBlock->info.rows != 0) {
          goto _finished;
        }
      }
    }
    // post work for a specific group

    // check if need to interpolate after last datablock
    // except for fill(next), fill(linear)
    genInterpAfterDataBlock(pSliceInfo, pOperator, 0);

    code = doFilter(pResBlock, pOperator->exprSupp.pFilterInfo, NULL, NULL);
    QUERY_CHECK_CODE(code, lino, _finished);
    if (pOperator->status == OP_EXEC_DONE) {
      break;
    }

    // restore initial value for next group
    resetTimesliceInfo(pSliceInfo);
    if (pResBlock->info.rows != 0) {
      break;
    }
  }

_finished:
  // restore the value
  setTaskStatus(pOperator->pTaskInfo, TASK_COMPLETED);
  if (pResBlock->info.rows == 0) {
    pOperator->status = OP_EXEC_DONE;
  }
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }

  (*ppRes) = pResBlock->info.rows == 0 ? NULL : pResBlock;
  return code;
}
1351

1352
static int32_t extractPkColumnFromFuncs(SNodeList* pFuncs, bool* pHasPk,
3,472,339✔
1353
                                        SColumn* pPkColumn) {
1354
  SNode* pNode;
1355
  FOREACH(pNode, pFuncs) {
19,469,066✔
1356
    if ((nodeType(pNode) == QUERY_NODE_TARGET) &&
16,085,596✔
1357
        (nodeType(((STargetNode*)pNode)->pExpr) == QUERY_NODE_FUNCTION)) {
16,085,596✔
1358
      SFunctionNode* pFunc = (SFunctionNode*)((STargetNode*)pNode)->pExpr;
16,085,596✔
1359
      if (fmIsInterpFunc(pFunc->funcId) && pFunc->hasPk) {
16,085,167✔
1360
        SNode* pNode2 = (pFunc->pParameterList->pTail->pNode);
88,440✔
1361
        if ((nodeType(pNode2) == QUERY_NODE_COLUMN) &&
88,440✔
1362
            ((SColumnNode*)pNode2)->isPk) {
88,440✔
1363
          *pHasPk = true;
88,440✔
1364
          *pPkColumn = extractColumnFromColumnNode((SColumnNode*)pNode2);
88,440✔
1365
          break;
88,440✔
1366
        }
1367
      }
1368
    }
1369
  }
1370
  return TSDB_CODE_SUCCESS;
3,471,910✔
1371
}
1372

1373
/*
 * Reset a time-slice operator so that it can be executed again.
 *
 * The operator is marked OP_NOT_OPENED, the task TASK_NOT_COMPLETED, both
 * expression supporters are reset from the physical plan node, and the
 * per-execution state (cursor, previous key, group tracking, row keepers,
 * linear-fill info, result block) is cleared.
 *
 * pOper  operator whose info is a STimeSliceOperatorInfo
 *
 * Returns the result of resetting the expression supporters. The remaining
 * state is cleared even when that step fails.
 */
static int32_t resetTimeSliceOperState(SOperatorInfo* pOper) {
  STimeSliceOperatorInfo* pInfo = pOper->info;
  SExecTaskInfo*          pTaskInfo = pOper->pTaskInfo;
  SInterpFuncPhysiNode*   pPhynode = (SInterpFuncPhysiNode*)pOper->pPhyNode;

  pOper->status = OP_NOT_OPENED;
  setTaskStatus(pOper->pTaskInfo, TASK_NOT_COMPLETED);

  int32_t code = resetExprSupp(&pOper->exprSupp, pTaskInfo, pPhynode->pFuncs,
                               NULL, &pTaskInfo->storageAPI.functionStore);
  if (code == 0) {
    code = resetExprSupp(&pInfo->scalarSup, pTaskInfo, pPhynode->pExprs, NULL,
                         &pTaskInfo->storageAPI.functionStore);
  }

  pInfo->current = pInfo->win.skey;
  pInfo->prevTsSet = false;
  pInfo->prevKey.ts = INT64_MIN;
  pInfo->groupId = 0;
  // NOTE(review): these two pointers are dropped without being freed;
  // presumably this operator does not own them - confirm.
  pInfo->pNextGroupRes = NULL;
  pInfo->pRemainRes = NULL;
  pInfo->remainIndex = 0;

  // The row keepers are destroyed below, so the flags describing their
  // contents must not survive the reset; otherwise the next execution could
  // treat a freshly created (empty) keeper as already populated.
  pInfo->isPrevRowSet = false;
  pInfo->isNextRowSet = false;

  if (pInfo->hasPk) {
    pInfo->prevKey.numOfPKs = 1;
    pInfo->prevKey.pks[0].type = pInfo->pkCol.type;

    if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
      // keep the existing buffer, only wipe its content
      memset(pInfo->prevKey.pks[0].pData, 0, pInfo->pkCol.bytes);
    }
  }
  blockDataCleanup(pInfo->pRes);

  // previous-row keeper: free each stored value, then the array itself
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pPrevRow);
  pInfo->pPrevRow = NULL;

  // next-row keeper
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
    taosMemoryFree(pKey->pData);
  }
  taosArrayDestroy(pInfo->pNextRow);
  pInfo->pNextRow = NULL;

  // linear-fill info: both end points carry their own value buffer
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
    SFillLinearInfo* pLinear = taosArrayGet(pInfo->pLinearInfo, i);
    taosMemoryFree(pLinear->start.val);
    taosMemoryFree(pLinear->end.val);
  }
  taosArrayDestroy(pInfo->pLinearInfo);
  pInfo->pLinearInfo = NULL;

  if (pInfo->pPrevGroupKeys) {
    taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
    pInfo->pPrevGroupKeys = NULL;
  }

  return code;
}
1435

1436
int32_t createTimeSliceOperatorInfo(SOperatorInfo* downstream,
3,472,339✔
1437
                                    SPhysiNode* pPhyNode,
1438
                                    SExecTaskInfo* pTaskInfo,
1439
                                    SOperatorInfo** pOptrInfo) {
1440
  QRY_PARAM_CHECK(pOptrInfo);
3,472,339✔
1441

1442
  int32_t                 code = 0;
3,472,339✔
1443
  int32_t                 lino = 0;
3,472,339✔
1444
  STimeSliceOperatorInfo* pInfo =
6,944,678✔
1445
    taosMemoryCalloc(1, sizeof(STimeSliceOperatorInfo));
3,472,339✔
1446
  SOperatorInfo* pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
3,472,339✔
1447

1448
  if (pOperator == NULL || pInfo == NULL) {
3,472,339✔
1449
    code = terrno;
×
1450
    goto _error;
×
1451
  }
1452
  initOperatorCostInfo(pOperator);
3,472,339✔
1453

1454
  pOperator->pPhyNode = pPhyNode;
3,472,339✔
1455
  SInterpFuncPhysiNode* pInterpPhyNode = (SInterpFuncPhysiNode*)pPhyNode;
3,472,339✔
1456
  SExprSupp*            pSup = &pOperator->exprSupp;
3,472,339✔
1457

1458
  int32_t    numOfExprs = 0;
3,472,339✔
1459
  SExprInfo* pExprInfo = NULL;
3,472,339✔
1460
  code = createExprInfo(pInterpPhyNode->pFuncs, NULL, &pExprInfo, &numOfExprs);
3,472,339✔
1461
  QUERY_CHECK_CODE(code, lino, _error);
3,472,339✔
1462

1463
  code = initExprSupp(pSup, pExprInfo, numOfExprs,
3,472,339✔
1464
                      &pTaskInfo->storageAPI.functionStore);
1465
  QUERY_CHECK_CODE(code, lino, _error);
3,472,339✔
1466

1467
  if (pInterpPhyNode->pExprs != NULL) {
3,472,339✔
1468
    int32_t    num = 0;
90,581✔
1469
    SExprInfo* pScalarExprInfo = NULL;
90,581✔
1470
    code = createExprInfo(pInterpPhyNode->pExprs, NULL, &pScalarExprInfo, &num);
90,581✔
1471
    QUERY_CHECK_CODE(code, lino, _error);
90,581✔
1472

1473
    code = initExprSupp(&pInfo->scalarSup, pScalarExprInfo, num,
90,581✔
1474
                        &pTaskInfo->storageAPI.functionStore);
1475
    QUERY_CHECK_CODE(code, lino, _error);
90,581✔
1476
  }
1477

1478
  code = filterInitFromNode((SNode*)pInterpPhyNode->node.pConditions,
3,472,339✔
1479
                            &pOperator->exprSupp.pFilterInfo, 0,
1480
                            pTaskInfo->pStreamRuntimeInfo);
3,472,339✔
1481
  QUERY_CHECK_CODE(code, lino, _error);
3,471,910✔
1482

1483
  pInfo->tsCol =
1484
    extractColumnFromColumnNode((SColumnNode*)pInterpPhyNode->pTimeSeries);
3,471,910✔
1485
  code = extractPkColumnFromFuncs(pInterpPhyNode->pFuncs, &pInfo->hasPk,
3,472,339✔
1486
                                  &pInfo->pkCol);
1487
  QUERY_CHECK_CODE(code, lino, _error);
3,471,910✔
1488

1489
  pInfo->fillType = convertFillType(pInterpPhyNode->fillMode);
3,471,910✔
1490
  initResultSizeInfo(&pOperator->resultInfo, 4096);
3,472,339✔
1491

1492
  pInfo->pFillColInfo =
3,472,339✔
1493
    createFillColInfo(pExprInfo, numOfExprs, NULL, 0, NULL, 0,
3,471,910✔
1494
                      (SNodeListNode*)pInterpPhyNode->pFillValues);
3,471,910✔
1495
  QUERY_CHECK_NULL(pInfo->pFillColInfo, code, lino, _error, terrno);
3,472,339✔
1496

1497
  pInfo->pLinearInfo = NULL;
3,471,910✔
1498
  pInfo->pRes = createDataBlockFromDescNode(pPhyNode->pOutputDataBlockDesc);
3,471,910✔
1499
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
3,472,339✔
1500
  pInfo->win = pInterpPhyNode->timeRange;
3,472,339✔
1501
  code = nodesCloneNode(pInterpPhyNode->pTimeRange, &pInfo->pWin);
3,472,339✔
1502
  QUERY_CHECK_CODE(code, lino, _error);
3,471,910✔
1503
  pInfo->interval.interval = pInterpPhyNode->interval;
3,471,910✔
1504
  pInfo->current = pInfo->win.skey;
3,472,339✔
1505
  pInfo->prevTsSet = false;
3,472,339✔
1506
  pInfo->prevKey.ts = INT64_MIN;
3,471,910✔
1507
  pInfo->groupId = 0;
3,472,339✔
1508
  pInfo->pPrevGroupKeys = NULL;
3,471,910✔
1509
  pInfo->pNextGroupRes = NULL;
3,472,339✔
1510
  pInfo->pRemainRes = NULL;
3,472,339✔
1511
  pInfo->remainIndex = 0;
3,472,339✔
1512
  pInfo->surroundingTime = pInterpPhyNode->surroundingTime;
3,472,339✔
1513

1514
  if (pInfo->hasPk) {
3,472,339✔
1515
    pInfo->prevKey.numOfPKs = 1;
88,440✔
1516
    pInfo->prevKey.ts = INT64_MIN;
88,440✔
1517
    pInfo->prevKey.pks[0].type = pInfo->pkCol.type;
88,440✔
1518

1519
    if (IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
88,440✔
1520
      pInfo->prevKey.pks[0].pData = taosMemoryCalloc(1, pInfo->pkCol.bytes);
29,480✔
1521
      QUERY_CHECK_NULL(pInfo->prevKey.pks[0].pData, code, lino, _error, terrno);
29,480✔
1522
    }
1523
  }
1524

1525
  setOperatorInfo(pOperator, "TimeSliceOperator",
3,472,339✔
1526
                  QUERY_NODE_PHYSICAL_PLAN_INTERP_FUNC, false, OP_NOT_OPENED,
1527
                  pInfo, pTaskInfo);
1528
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doTimesliceNext,
3,472,339✔
1529
                                         NULL, destroyTimeSliceOperatorInfo,
1530
                                         optrDefaultBufFn, NULL,
1531
                                         optrDefaultGetNextExtFn, NULL);
1532

1533
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
3,471,910✔
1534
  QUERY_CHECK_CODE(code, lino, _error);
3,472,339✔
1535

1536
  //  int32_t code = initKeeperInfo(pSliceInfo, pBlock, &pOperator->exprSupp);
1537
  setOperatorResetStateFn(pOperator, resetTimeSliceOperState);
3,472,339✔
1538
  
1539
  code = appendDownstream(pOperator, &downstream, 1);
3,472,339✔
1540
  QUERY_CHECK_CODE(code, lino, _error);
3,472,339✔
1541

1542
  *pOptrInfo = pOperator;
3,472,339✔
1543
  return TSDB_CODE_SUCCESS;
3,472,339✔
1544

1545
_error:
×
1546
  if (code != TSDB_CODE_SUCCESS) {
×
1547
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1548
  }
1549
  if (pInfo != NULL) destroyTimeSliceOperatorInfo(pInfo);
×
1550
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1551
  pTaskInfo->code = code;
×
1552
  return code;
×
1553
}
1554

1555
void destroyTimeSliceOperatorInfo(void* param) {
3,472,339✔
1556
  STimeSliceOperatorInfo* pInfo = (STimeSliceOperatorInfo*)param;
3,472,339✔
1557

1558
  blockDataDestroy(pInfo->pRes);
3,472,339✔
1559
  pInfo->pRes = NULL;
3,472,339✔
1560

1561
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pPrevRow); ++i) {
13,483,820✔
1562
    SGroupKeys* pKey = taosArrayGet(pInfo->pPrevRow, i);
10,011,481✔
1563
    taosMemoryFree(pKey->pData);
10,011,481✔
1564
  }
1565
  taosArrayDestroy(pInfo->pPrevRow);
3,472,339✔
1566

1567
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pNextRow); ++i) {
13,483,820✔
1568
    SGroupKeys* pKey = taosArrayGet(pInfo->pNextRow, i);
10,011,481✔
1569
    taosMemoryFree(pKey->pData);
10,011,481✔
1570
  }
1571
  taosArrayDestroy(pInfo->pNextRow);
3,472,339✔
1572

1573
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pLinearInfo); ++i) {
13,483,820✔
1574
    SFillLinearInfo* pKey = taosArrayGet(pInfo->pLinearInfo, i);
10,011,481✔
1575
    taosMemoryFree(pKey->start.val);
10,011,481✔
1576
    taosMemoryFree(pKey->end.val);
10,011,481✔
1577
  }
1578
  taosArrayDestroy(pInfo->pLinearInfo);
3,472,339✔
1579

1580
  if (pInfo->pPrevGroupKeys) {
3,472,339✔
1581
    taosArrayDestroyEx(pInfo->pPrevGroupKeys, destroyGroupKey);
3,441,289✔
1582
    pInfo->pPrevGroupKeys = NULL;
3,441,289✔
1583
  }
1584
  if (pInfo->hasPk && IS_VAR_DATA_TYPE(pInfo->pkCol.type)) {
3,472,339✔
1585
    taosMemoryFreeClear(pInfo->prevKey.pks[0].pData);
29,480✔
1586
  }
1587

1588
  cleanupExprSupp(&pInfo->scalarSup);
3,472,339✔
1589
  if (pInfo->pFillColInfo != NULL) {
3,472,339✔
1590
    for (int32_t i = 0; i < pInfo->pFillColInfo->numOfFillExpr; ++i) {
19,623,971✔
1591
      taosVariantDestroy(&pInfo->pFillColInfo[i].fillVal);
16,151,632✔
1592
    }
1593
    taosMemoryFree(pInfo->pFillColInfo);
3,472,339✔
1594
  }
1595
  nodesDestroyNode(pInfo->pWin);
3,472,339✔
1596
  taosMemoryFreeClear(param);
3,472,339✔
1597
}
3,472,339✔
1598

1599
STrueForInfo* getTrueForInfo(struct SOperatorInfo* pOperator) {
123,429,193✔
1600
  if (pOperator == NULL) {
123,429,193✔
1601
    return NULL;
×
1602
  }
1603

1604
  switch (pOperator->operatorType) {
123,429,193✔
1605
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_STATE:
3,838,127✔
1606
      return &((SStateWindowOperatorInfo*)pOperator->info)->trueForInfo;
3,838,127✔
1607
    case QUERY_NODE_PHYSICAL_PLAN_MERGE_EVENT:
4,510,325✔
1608
      return &((SEventWindowOperatorInfo*)pOperator->info)->trueForInfo;
4,510,325✔
1609
    default:
115,084,169✔
1610
      return NULL;
115,084,169✔
1611
  }
1612
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc