• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3523

06 Nov 2024 02:29AM UTC coverage: 55.861% (-2.4%) from 58.216%
#3523

push

travis-ci

web-flow
Merge pull request #28551 from taosdata/feat/TS-5215-2

test(blob): testing & fixes for blob

106075 of 245834 branches covered (43.15%)

Branch coverage included in aggregate %.

0 of 15 new or added lines in 2 files covered. (0.0%)

17003 existing lines in 254 files now uncovered.

181910 of 269703 relevant lines covered (67.45%)

1527639.59 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

72.02
/source/libs/executor/src/streamfilloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "tcommon.h"
25
#include "thash.h"
26
#include "ttime.h"
27

28
#include "function.h"
29
#include "operator.h"
30
#include "querynodes.h"
31
#include "querytask.h"
32
#include "tdatablock.h"
33
#include "tfill.h"
34

35
/*
 * Where the current source row still has to be emitted relative to the
 * generated fill rows (see doStreamFillRange): START = before them,
 * MID = between the two linear fill passes, END = after them.
 * INVALID means no emission of the current row is pending.
 */
#define FILL_POS_INVALID 0
36
#define FILL_POS_START   1
37
#define FILL_POS_MID     2
38
#define FILL_POS_END     3
39

40
/*
 * A start/end key pair tagged with a group id.
 * NOTE(review): not referenced in the visible part of this file; whether the
 * bounds are inclusive must be confirmed against its users.
 */
typedef struct STimeRange {
41
  TSKEY    skey;  /* start key */
42
  TSKEY    ekey;  /* end key */
43
  uint64_t groupId;
44
} STimeRange;
45

46
/**
 * Advance the degenerate window {ts, ts} one step in ascending order with
 * getNextTimeWindow() and return the start key of the resulting window.
 */
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
  STimeWindow window = {0};
  window.skey = ts;
  window.ekey = ts;

  getNextTimeWindow(pInterval, &window, TSDB_ORDER_ASC);
  return window.skey;
}
51

52
/**
 * Advance the degenerate window {ts, ts} one step in descending order with
 * getNextTimeWindow() and return the start key of the resulting window.
 */
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
  STimeWindow window = {0};
  window.skey = ts;
  window.ekey = ts;

  getNextTimeWindow(pInterval, &window, TSDB_ORDER_DESC);
  return window.skey;
}
57

58
/**
 * Write one result cell into row `rowId` of column `pCol`.
 *
 * @return the status code reported by colDataSetVal().
 */
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell) {
  int32_t code = colDataSetVal(pCol, rowId, pCell->pData, pCell->isNull);
  return code;
}
61

62
/**
 * Locate the `index`-th cell inside a packed result row.
 *
 * Cells are stored back to back: each one is a SResultCellData header
 * followed by `bytes` bytes of payload, so the walk has to visit every
 * preceding cell.
 *
 * @return the cell, or NULL when the row or its buffer is missing.
 */
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
  if (pRaw == NULL || pRaw->pRowVal == NULL) {
    return NULL;
  }

  char* pCursor = (char*)pRaw->pRowVal;
  for (int32_t skipped = 0; skipped < index; ++skipped) {
    const SResultCellData* pHeader = (const SResultCellData*)pCursor;
    pCursor += (pHeader->bytes + sizeof(SResultCellData));
  }
  return (SResultCellData*)pCursor;
}
74

75
/**
 * Destroy the fill-column descriptors in [start, end) and free the array.
 *
 * @param pFillCol column descriptor array; NULL is accepted and ignored.
 * @param start    first index whose expression/fill value is destroyed.
 * @param end      one past the last index to destroy.
 * @return always NULL, so callers can write `p = destroyFillColumnInfo(p, ...)`.
 */
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
  if (pFillCol == NULL) {
    return NULL;
  }

  for (int32_t i = start; i < end; i++) {
    destroyExprInfo(pFillCol[i].pExpr, 1);
    taosVariantDestroy(&pFillCol[i].fillVal);
  }

  // Only touch pFillCol[start] when the range is non-empty; with start == end
  // that slot lies outside the range and may be outside the array.
  // NOTE(review): presumably the pExpr entries of [start, end) share one
  // allocation whose base is pFillCol[start].pExpr - confirm at the creator.
  if (start < end) {
    taosMemoryFreeClear(pFillCol[start].pExpr);
  }
  taosMemoryFree(pFillCol);
  return NULL;
}
84

85
/**
 * Release a stream-fill supporter together with its column info, result map,
 * non-fill expression support and the cached cur/prev/next/nextNext rows.
 *
 * @param pFillSup supporter to destroy; NULL is accepted and ignored.
 */
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
  if (pFillSup == NULL) {
    return;
  }

  pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
  tSimpleHashCleanup(pFillSup->pResMap);
  pFillSup->pResMap = NULL;
  cleanupExprSupp(&pFillSup->notFillExprSup);

  // cur may share its buffer with prev or next (setFillValueInfo assigns
  // cur.pRowVal to them); free it separately only when it is distinct.
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
    taosMemoryFree(pFillSup->cur.pRowVal);
  }
  taosMemoryFree(pFillSup->prev.pRowVal);
  taosMemoryFree(pFillSup->next.pRowVal);
  taosMemoryFree(pFillSup->nextNext.pRowVal);

  taosMemoryFree(pFillSup);
}
250✔
99

100
void destroySPoint(void* ptr) {
18,840✔
101
  SPoint* point = (SPoint*)ptr;
18,840✔
102
  taosMemoryFreeClear(point->val);
18,840!
103
}
18,866✔
104

105
/**
 * Free the linear-interpolation state: both end-point arrays (including each
 * point's value buffer) and the structure itself.
 *
 * @param pFillLinear state to destroy; NULL is accepted and ignored.
 */
void destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
  if (pFillLinear == NULL) {
    return;
  }
  taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
  taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
  taosMemoryFree(pFillLinear);
}
250✔
110

111
/**
 * Destroy a stream-fill info object.
 *
 * pResRow is released here only for the NULL / SET_VALUE fill types; for the
 * other types setFillValueInfo() points pResRow at rows stored inside the
 * supporter, which are released by destroyStreamFillSupporter().
 *
 * @param pFillInfo object to destroy; NULL is accepted and ignored.
 */
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
  if (pFillInfo == NULL) {
    return;
  }

  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
      pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
    // Guard against a partially constructed object whose result row was never allocated.
    if (pFillInfo->pResRow != NULL) {
      taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
      taosMemoryFreeClear(pFillInfo->pResRow);
    }
  }

  if (pFillInfo->pLinearInfo != NULL) {
    destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
    pFillInfo->pLinearInfo = NULL;
  }

  taosArrayDestroy(pFillInfo->delRanges);
  taosMemoryFree(pFillInfo);
}
250✔
123

124
static void destroyStreamFillOperatorInfo(void* param) {
250✔
125
  SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
250✔
126
  destroyStreamFillInfo(pInfo->pFillInfo);
250✔
127
  destroyStreamFillSupporter(pInfo->pFillSup);
250✔
128
  blockDataDestroy(pInfo->pRes);
250✔
129
  pInfo->pRes = NULL;
250✔
130
  blockDataDestroy(pInfo->pSrcBlock);
250✔
131
  pInfo->pSrcBlock = NULL;
250✔
132
  blockDataDestroy(pInfo->pDelRes);
250✔
133
  pInfo->pDelRes = NULL;
250✔
134
  taosArrayDestroy(pInfo->matchInfo.pList);
250✔
135
  pInfo->matchInfo.pList = NULL;
250✔
136
  taosMemoryFree(pInfo);
250✔
137
}
250✔
138

139
/**
 * Mark a cached row as absent: free its buffer and set the key to the
 * INT64_MIN sentinel tested by hasPrevWindow()/hasNextWindow().
 */
static void resetFillWindow(SResultRowData* pRowData) {
  taosMemoryFreeClear(pRowData->pRowVal);
  pRowData->key = INT64_MIN;
}
4,049✔
143

144
/**
 * Reset the cached cur/prev/next/nextNext rows of the supporter.
 *
 * When cur shares its buffer with prev or next, cur is only detached (not
 * freed) so that the buffer is released exactly once through its other owner.
 * pState and pAPI are not used by this function.
 */
void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup, void* pState, SStorageAPI* pAPI) {
  bool curIsShared =
      (pFillSup->cur.pRowVal == pFillSup->prev.pRowVal) || (pFillSup->cur.pRowVal == pFillSup->next.pRowVal);

  if (curIsShared) {
    pFillSup->cur.key = INT64_MIN;
    pFillSup->cur.pRowVal = NULL;
  } else {
    resetFillWindow(&pFillSup->cur);
  }

  resetFillWindow(&pFillSup->prev);
  resetFillWindow(&pFillSup->next);
  resetFillWindow(&pFillSup->nextNext);
}
942✔
155

156
/**
 * Load the rows around (ts, groupId) from the stream state store into the
 * supporter: cur (the row at ts), prev, next and, for PREV/NEXT fill only,
 * nextNext. Rows that cannot be found keep the reset state
 * (key == INT64_MIN, pRowVal == NULL), except cur whose key is set to ts.
 *
 * @param pOperator operator providing the storage API and stream state.
 * @param ts        timestamp of the row being processed.
 * @param groupId   group the row belongs to.
 * @param pFillSup  supporter whose cached rows are replaced.
 */
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
  resetPrevAndNextWindow(pFillSup, pState, pAPI);

  // 1. The row stored at exactly (ts, groupId).
  SWinKey key = {.ts = ts, .groupId = groupId};
  void*   curVal = NULL;
  int32_t curVLen = 0;
  bool    hasCurKey = true;
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen);
  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->cur.key = key.ts;
    pFillSup->cur.pRowVal = curVal;
  } else {
    // groupId is unsigned, so it is printed with PRIu64 like the other logs in this file.
    qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRIu64, ts, groupId);
    pFillSup->cur.key = ts;
    pFillSup->cur.pRowVal = NULL;
    hasCurKey = false;
  }

  // 2. The row before the key.
  SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
  SWinKey          preKey = {.ts = INT64_MIN, .groupId = groupId};
  void*            preVal = NULL;
  int32_t          preVLen = 0;
  code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);

  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->prev.key = preKey.ts;
    pFillSup->prev.pRowVal = preVal;

    // Step the cursor forward: presumably once from prev onto cur (only when
    // cur exists in the store) and once more onto the row after cur.
    if (hasCurKey) {
      pAPI->stateStore.streamStateCurNext(pState, pCur);
    }

    pAPI->stateStore.streamStateCurNext(pState, pCur);
  } else {
    // No previous row: start over with a cursor positioned after the key.
    pAPI->stateStore.streamStateFreeCur(pCur);
    pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
  }

  // 3. The row after the key, and for PREV/NEXT fill the one after that.
  SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
  void*   nextVal = NULL;
  int32_t nextVLen = 0;
  code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->next.key = nextKey.ts;
    pFillSup->next.pRowVal = nextVal;
    if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
      pAPI->stateStore.streamStateCurNext(pState, pCur);
      SWinKey nextNextKey = {.groupId = groupId};
      void*   nextNextVal = NULL;
      int32_t nextNextVLen = 0;
      code = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
      if (code == TSDB_CODE_SUCCESS) {
        pFillSup->nextNext.key = nextNextKey.ts;
        pFillSup->nextNext.pRowVal = nextNextVal;
      }
    }
  }
  pAPI->stateStore.streamStateFreeCur(pCur);
}
942✔
217

218
/* True when a previous row is cached, i.e. its key is not the INT64_MIN sentinel. */
static bool hasPrevWindow(SStreamFillSupporter* pFillSup) {
  return !(pFillSup->prev.key == INT64_MIN);
}
2,539✔
219
/* True when a next row is cached, i.e. its key is not the INT64_MIN sentinel. */
static bool hasNextWindow(SStreamFillSupporter* pFillSup) {
  return !(pFillSup->next.key == INT64_MIN);
}
1,535✔
220
/* True when the row after next is cached, i.e. its key is not the INT64_MIN sentinel. */
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) {
  return pFillSup->nextNext.key != INT64_MIN;
}
224

225
/**
 * Serialize row `rowId` of `pBlock` into the packed cell buffer of `pRowVal`
 * and stamp the row with key `ts`.
 *
 * For a NULL source value only the isNull flag of the cell is written; its
 * type, bytes and payload are left as they were.
 */
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
  int32_t colCount = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t col = 0; col < colCount; ++col) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, col);
    SResultCellData* pDst = getResultCell(pRowVal, col);

    if (colDataIsNull_s(pSrcCol, rowId)) {
      pDst->isNull = true;
      continue;
    }

    pDst->isNull = false;
    pDst->type = pSrcCol->info.type;
    pDst->bytes = pSrcCol->info.bytes;

    char* src = colDataGetData(pSrcCol, rowId);
    if (IS_VAR_DATA_TYPE(pDst->type)) {
      // Variable-length values are copied with the total length reported by varDataTLen().
      memcpy(pDst->pData, src, varDataTLen(src));
    } else {
      memcpy(pDst->pData, src, pDst->bytes);
    }
  }

  pRowVal->key = ts;
}
807✔
246

UNCOV
247
static void calcDeltaData(SSDataBlock* pBlock, int32_t rowId, SResultRowData* pRowVal, SArray* pDelta,
×
248
                          SFillColInfo* pFillCol, int32_t numOfCol, int32_t winCount, int32_t order) {
UNCOV
249
  for (int32_t i = 0; i < numOfCol; i++) {
×
UNCOV
250
    if (!pFillCol[i].notFillCol) {
×
UNCOV
251
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol + i);
×
UNCOV
252
      SColumnInfoData* pCol = taosArrayGet(pBlock->pDataBlock, slotId);
×
UNCOV
253
      char*            var = colDataGetData(pCol, rowId);
×
254
      double           start = 0;
×
UNCOV
255
      GET_TYPED_DATA(start, double, pCol->info.type, var);
×
UNCOV
256
      SResultCellData* pCell = getResultCell(pRowVal, slotId);
×
UNCOV
257
      double           end = 0;
×
UNCOV
258
      GET_TYPED_DATA(end, double, pCell->type, pCell->pData);
×
UNCOV
259
      double delta = 0;
×
UNCOV
260
      if (order == TSDB_ORDER_ASC) {
×
UNCOV
261
        delta = (end - start) / winCount;
×
262
      } else {
UNCOV
263
        delta = (start - end) / winCount;
×
264
      }
UNCOV
265
      taosArraySet(pDelta, slotId, &delta);
×
266
    }
267
  }
UNCOV
268
}
×
269

270
/**
 * Copy the key and the raw cell values of `pEndRow` into the end-point array
 * used for linear interpolation. Only fill columns are copied; each point is
 * addressed by the column's destination slot id.
 */
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
  for (int32_t col = 0; col < numOfCol; col++) {
    if (pFillCol[col].notFillCol) {
      continue;
    }

    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol + col);
    SResultCellData* pSrcCell = getResultCell(pEndRow, slotId);
    SPoint*          pDstPoint = taosArrayGet(pEndPoins, slotId);

    pDstPoint->key = pEndRow->key;
    memcpy(pDstPoint->val, pSrcCell->pData, pSrcCell->bytes);
  }
}
148✔
281

282
/* Set the fill start to one sliding step after `ts`. */
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  TSKEY firstFillTs = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
  pFillInfo->start = firstFillTs;
}
675✔
286

287
/* Set the fill end to one sliding step before `ts`. */
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  TSKEY lastFillTs = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
  pFillInfo->end = lastFillTs;
}
675✔
291

292
/**
 * Configure the fill range strictly between two existing rows: it runs from
 * one sliding step after `start` to one sliding step before `end`, and the
 * fill position (`current`) is placed at the beginning of that range.
 */
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  setFillInfoStart(start, pInterval, pFillInfo);
  setFillInfoEnd(end, pInterval, pFillInfo);
  pFillInfo->current = pFillInfo->start;
}
675✔
297

298
/**
 * Prepare `pFillInfo` for re-filling a range whose rows were deleted.
 *
 * Filling is only possible when both a previous and a next row are cached;
 * otherwise needFill is cleared and nothing else is touched. The range starts
 * one sliding step after the previous row and ends at `end`; for linear fill
 * it is then narrowed by setFillKeyInfo() to lie between prev and next.
 * The `start` parameter is not used.
 */
void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  bool hasBothNeighbours = hasPrevWindow(pFillSup) && hasNextWindow(pFillSup);
  if (!hasBothNeighbours) {
    pFillInfo->needFill = false;
    return;
  }

  pFillInfo->needFill = true;
  pFillInfo->start = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                 pFillSup->interval.precision);
  pFillInfo->current = pFillInfo->start;
  pFillInfo->end = end;
  pFillInfo->pos = FILL_POS_INVALID;

  switch (pFillInfo->type) {
    case TSDB_FILL_NULL:
    case TSDB_FILL_NULL_F:
    case TSDB_FILL_SET_VALUE:
    case TSDB_FILL_SET_VALUE_F:
      // The result row of these types is not changed here.
      break;
    case TSDB_FILL_PREV:
      pFillInfo->pResRow = &pFillSup->prev;
      break;
    case TSDB_FILL_NEXT:
      pFillInfo->pResRow = &pFillSup->next;
      break;
    case TSDB_FILL_LINEAR: {
      // Interpolate from prev towards next; there is no second segment.
      setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
      pFillInfo->pLinearInfo->hasNext = false;
      pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
      calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                       pFillSup->numOfAllCols);
      pFillInfo->pResRow = &pFillSup->prev;
      pFillInfo->pLinearInfo->winIndex = 0;
      break;
    }
    default:
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
      break;
  }
}
338

339
/**
 * Copy the columns in [numOfFillCols, numOfAllCols) from the current row
 * into the fill result row, so that generated rows carry the current row's
 * values for them. The payload is copied only when the source cell is not
 * NULL; the number of bytes copied is the destination cell's size.
 */
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  int32_t col = pFillSup->numOfFillCols;
  while (col < pFillSup->numOfAllCols) {
    SFillColInfo*    pColInfo = pFillSup->pAllColInfo + col;
    int32_t          slotId = GET_DEST_SLOT_ID(pColInfo);
    SResultCellData* pDstCell = getResultCell(pFillInfo->pResRow, slotId);
    SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, slotId);

    pDstCell->isNull = pSrcCell->isNull;
    if (!pSrcCell->isNull) {
      memcpy(pDstCell->pData, pSrcCell->pData, pDstCell->bytes);
    }
    ++col;
  }
}
280✔
351

352
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
808✔
353
                      SStreamFillInfo* pFillInfo) {
354
  pFillInfo->preRowKey = pFillSup->cur.key;
808✔
355
  if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
808✔
356
    pFillInfo->needFill = false;
170✔
357
    pFillInfo->pos = FILL_POS_START;
170✔
358
    return;
170✔
359
  }
360
  TSKEY prevWKey = INT64_MIN;
638✔
361
  TSKEY nextWKey = INT64_MIN;
638✔
362
  if (hasPrevWindow(pFillSup)) {
638✔
363
    prevWKey = pFillSup->prev.key;
434✔
364
  }
365
  if (hasNextWindow(pFillSup)) {
638✔
366
    nextWKey = pFillSup->next.key;
425✔
367
  }
368

369
  pFillInfo->needFill = true;
638✔
370
  pFillInfo->pos = FILL_POS_INVALID;
638✔
371
  switch (pFillInfo->type) {
638!
372
    case TSDB_FILL_NULL:
280✔
373
    case TSDB_FILL_NULL_F:
374
    case TSDB_FILL_SET_VALUE:
375
    case TSDB_FILL_SET_VALUE_F: {
376
      if (pFillSup->prev.key == pFillInfo->preRowKey) {
280!
UNCOV
377
        resetFillWindow(&pFillSup->prev);
×
378
      }
379
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
280✔
380
        if (pFillSup->next.key == pFillInfo->nextRowKey) {
108✔
381
          pFillInfo->preRowKey = INT64_MIN;
103✔
382
          setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
103✔
383
          pFillInfo->pos = FILL_POS_END;
103✔
384
        } else {
385
          pFillInfo->needFill = false;
5✔
386
          pFillInfo->pos = FILL_POS_START;
5✔
387
        }
388
      } else if (hasPrevWindow(pFillSup)) {
172✔
389
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
93✔
390
        pFillInfo->pos = FILL_POS_END;
93✔
391
      } else {
392
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
79✔
393
        pFillInfo->pos = FILL_POS_START;
79✔
394
      }
395
      copyNotFillExpData(pFillSup, pFillInfo);
280✔
396
    } break;
280✔
397
    case TSDB_FILL_PREV: {
128✔
398
      if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
128✔
399
                                      (pFillSup->next.key == pFillInfo->nextRowKey && hasNextNextWindow(pFillSup)) ||
64!
400
                                      (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
60!
401
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
46✔
402
        pFillInfo->pos = FILL_POS_START;
46✔
403
        resetFillWindow(&pFillSup->prev);
46✔
404
        pFillSup->prev.key = pFillSup->cur.key;
46✔
405
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
46✔
406
      } else if (hasPrevWindow(pFillSup)) {
82!
407
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
82✔
408
        pFillInfo->pos = FILL_POS_END;
82✔
409
        pFillInfo->preRowKey = INT64_MIN;
82✔
410
      }
411
      pFillInfo->pResRow = &pFillSup->prev;
128✔
412
    } break;
128✔
413
    case TSDB_FILL_NEXT: {
123✔
414
      if (hasPrevWindow(pFillSup)) {
123✔
415
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
86✔
416
        pFillInfo->pos = FILL_POS_END;
86✔
417
        resetFillWindow(&pFillSup->next);
86✔
418
        pFillSup->next.key = pFillSup->cur.key;
86✔
419
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
86✔
420
        pFillInfo->preRowKey = INT64_MIN;
86✔
421
      } else {
422
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
37✔
423
        pFillInfo->pos = FILL_POS_START;
37✔
424
      }
425
      pFillInfo->pResRow = &pFillSup->next;
123✔
426
    } break;
123✔
427
    case TSDB_FILL_LINEAR: {
107✔
428
      pFillInfo->pLinearInfo->winIndex = 0;
107✔
429
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
107✔
430
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
26✔
431
        pFillInfo->pos = FILL_POS_MID;
26✔
432
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
26✔
433
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
26✔
434
                         pFillSup->numOfAllCols);
435
        pFillInfo->pResRow = &pFillSup->prev;
26✔
436

437
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
26✔
438
                         pFillSup->numOfAllCols);
439
        pFillInfo->pLinearInfo->hasNext = true;
26✔
440
      } else if (hasPrevWindow(pFillSup)) {
81✔
441
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
34✔
442
        pFillInfo->pos = FILL_POS_END;
34✔
443
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
34✔
444
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
34✔
445
                         pFillSup->numOfAllCols);
446
        pFillInfo->pResRow = &pFillSup->prev;
34✔
447
        pFillInfo->pLinearInfo->hasNext = false;
34✔
448
      } else {
449
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
47✔
450
        pFillInfo->pos = FILL_POS_START;
47✔
451
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
47✔
452
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
47✔
453
                         pFillSup->numOfAllCols);
454
        pFillInfo->pResRow = &pFillSup->cur;
47✔
455
        pFillInfo->pLinearInfo->hasNext = false;
47✔
456
      }
457
    } break;
107✔
UNCOV
458
    default:
×
UNCOV
459
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
UNCOV
460
      break;
×
461
  }
462
}
463

464
/**
 * Record (groupId, ts) in the supporter's result map.
 *
 * @param pRes set to true when the key was newly inserted, false when it was
 *             already present; left untouched when the insertion fails.
 * @return TSDB_CODE_SUCCESS or the error reported by tSimpleHashPut().
 */
static int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SWinKey key = {.groupId = groupId, .ts = ts};

  void* pExisting = tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey));
  if (pExisting == NULL) {
    code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
    (*pRes) = true;
  } else {
    (*pRes) = false;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
482

483
/**
 * Append one output row with key `ts` to `pBlock`, taking column values from
 * `pResRow` except for columns that fillIfWindowPseudoColumn() fills itself.
 *
 * @param pRes false when the block is already at capacity (nothing written);
 *             true when the row was appended, or when it was skipped because
 *             the key had already been produced and the supporter has
 *             hasDelete set.
 * @return TSDB_CODE_SUCCESS or the first error encountered.
 */
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
                               bool* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (pBlock->info.rows >= pBlock->info.capacity) {
    (*pRes) = false;
    goto _end;
  }

  uint64_t groupId = pBlock->info.id.groupId;
  bool     isNewKey = true;
  code = checkResult(pFillSup, ts, groupId, &isNewKey);
  QUERY_CHECK_CODE(code, lino, _end);

  if (pFillSup->hasDelete && !isNewKey) {
    (*pRes) = true;
    goto _end;
  }

  for (int32_t col = 0; col < pFillSup->numOfAllCols; ++col) {
    SFillColInfo*    pColInfo = pFillSup->pAllColInfo + col;
    int32_t          slotId = GET_DEST_SLOT_ID(pColInfo);
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, slotId);
    SFillInfo        pseudoCtx = {
               .currentKey = ts,
               .order = TSDB_ORDER_ASC,
               .interval = pFillSup->interval,
    };

    if (fillIfWindowPseudoColumn(&pseudoCtx, pColInfo, pDstCol, pBlock->info.rows)) {
      continue;
    }

    SResultCellData* pCell = getResultCell(pResRow, slotId);
    code = setRowCell(pDstCol, pBlock->info.rows, pCell);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  pBlock->info.rows++;
  (*pRes) = true;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
525

526
/* True while the fill position is valid (not INT64_MIN) and has not passed the end of the fill range. */
static bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
  return (pFillInfo->current != INT64_MIN) && (pFillInfo->current <= pFillInfo->end);
}
532

533
/**
 * Generate fill rows for every fill type except linear: walk from
 * pFillInfo->current to pFillInfo->end in sliding steps and emit
 * pFillInfo->pResRow for each step that lies inside the supporter's window
 * range. Stops early when the output block is full or a row cannot be built;
 * `current` then stays at the step that was not completed.
 */
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (; hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity;
       pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding,
                                        pFillSup->interval.slidingUnit, pFillSup->interval.precision)) {
    STimeWindow step = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
    if (!inWinRange(&pFillSup->winRange, &step)) {
      continue;
    }

    bool appended = true;
    code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &appended);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
565✔
552

553
/**
 * Generate linearly interpolated fill rows from pFillInfo->current to
 * pFillInfo->end in sliding steps.
 *
 * For each step the key is first registered in the result map. The step is
 * skipped when it was already produced (and the supporter has hasDelete set)
 * or when it lies outside the supporter's window range. Otherwise one row is
 * appended: non-fill columns are filled by fillIfWindowPseudoColumn() or
 * copied from pResRow; fill columns are interpolated between pResRow and the
 * matching point in pLinearInfo->pEndPoints. Variable-length, BOOL and NULL
 * source cells produce NULL.
 *
 * Stops when the output block is full or on the first error.
 */
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
    uint64_t    groupId = pBlock->info.id.groupId;
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
    bool        ckRes = true;
    code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
    QUERY_CHECK_CODE(code, lino, _end);

    if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
      pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                       pFillSup->interval.precision);
      pFillInfo->pLinearInfo->winIndex++;
      continue;
    }
    pFillInfo->pLinearInfo->winIndex++;
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
      SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
      SFillInfo     tmp = {
              .currentKey = pFillInfo->current,
              .order = TSDB_ORDER_ASC,
              .interval = pFillSup->interval,
      };

      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
      int16_t          type = pColData->info.type;
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
      int32_t          index = pBlock->info.rows;
      if (pFillCol->notFillCol) {
        bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
        if (!filled) {
          code = setRowCell(pColData, index, pCell);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } else {
        // These cells are not interpolated; they are written as NULL.
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
          colDataSetNULL(pColData, index);
          continue;
        }
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
        SPoint  start = {0};
        start.key = pFillInfo->pResRow->key;
        start.val = pCell->pData;

        SPoint cur = {0};
        cur.key = pFillInfo->current;
        cur.val = taosMemoryCalloc(1, pCell->bytes);
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
        code = colDataSetVal(pColData, index, (const char*)cur.val, false);
        // Release the scratch buffer before checking the result, so that a
        // failed write does not leak it.
        destroySPoint(&cur);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                     pFillSup->interval.precision);
    pBlock->info.rows++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
144✔
621

622
/**
 * Store `len` bytes of the row buffer in the stream state store under
 * (groupId, pRow->key). A failure is logged and not reported to the caller.
 */
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
  SExecTaskInfo* pTask = pOperator->pTaskInfo;
  SStorageAPI*   pAPI = &pTask->storageAPI;
  SWinKey        key = {.groupId = groupId, .ts = pRow->key};

  int32_t code = pAPI->stateStore.streamStateFillPut(pTask->streamInfo.pState, &key, pRow->pRowVal, len);
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
  if (code == TSDB_CODE_SUCCESS) {
    return;
  }
  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
807✔
632

633
/**
 * Emit the current row and the fill rows prepared by setFillValueInfo() /
 * setDeleteFillValueInfo() into `pRes`.
 *
 * Order: the current row first when pos is FILL_POS_START, then the fill
 * rows, then the current row when pos is FILL_POS_END. For linear fill with
 * pos FILL_POS_MID the current row is emitted after the first segment, and a
 * second segment (cur -> next) follows once the first one is finished.
 * pos is reset to FILL_POS_INVALID only when buildFillResult() reports
 * success, so a row that did not fit keeps its pending position.
 */
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
  int32_t         code = TSDB_CODE_SUCCESS;
  int32_t         lino = 0;
  bool            emitted = false;
  SResultRowData* pCurRow = &pFillSup->cur;

  if (!pFillInfo->needFill) {
    code = buildFillResult(pCurRow, pFillSup, pCurRow->key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    return;
  }

  if (pFillInfo->pos == FILL_POS_START) {
    code = buildFillResult(pCurRow, pFillSup, pCurRow->key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    if (emitted) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

  if (pFillInfo->type == TSDB_FILL_LINEAR) {
    doStreamFillLinear(pFillSup, pFillInfo, pRes);

    if (pFillInfo->pos == FILL_POS_MID) {
      code = buildFillResult(pCurRow, pFillSup, pCurRow->key, pRes, &emitted);
      QUERY_CHECK_CODE(code, lino, _end);
      if (emitted) {
        pFillInfo->pos = FILL_POS_INVALID;
      }
    }

    SStreamFillLinearInfo* pLinear = pFillInfo->pLinearInfo;
    if (pFillInfo->current > pFillInfo->end && pLinear->hasNext) {
      // First segment done: switch to the end points of the next row and
      // interpolate from the current row up to nextEnd.
      pLinear->hasNext = false;
      pLinear->winIndex = 0;
      taosArraySwap(pLinear->pEndPoints, pLinear->pNextEndPoints);
      pFillInfo->pResRow = pCurRow;
      setFillKeyInfo(pCurRow->key, pLinear->nextEnd, &pFillSup->interval, pFillInfo);
      doStreamFillLinear(pFillSup, pFillInfo, pRes);
    }
  } else {
    doStreamFillNormal(pFillSup, pFillInfo, pRes);
  }

  if (pFillInfo->pos == FILL_POS_END) {
    code = buildFillResult(pCurRow, pFillSup, pCurRow->key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    if (emitted) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
685

686
/**
 * Serialize row `rowId` of `pBlock` into a temporary buffer of `rowSize`
 * bytes and store it in the stream state store under (groupId, ts of the
 * row). The row's timestamp is also recorded in pFillInfo->nextRowKey.
 *
 * @return TSDB_CODE_SUCCESS, or terrno when the temporary buffer cannot be
 *         allocated. A failure of the store itself is only logged by
 *         keepResultInDiscBuf().
 */
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
                           int32_t rowId, uint64_t groupId, int32_t rowSize) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const TSKEY rowTs = tsCol[rowId];
  pFillInfo->nextRowKey = rowTs;

  SResultRowData scratchRow = {.key = rowTs};
  scratchRow.pRowVal = taosMemoryCalloc(1, rowSize);
  QUERY_CHECK_NULL(scratchRow.pRowVal, code, lino, _end, terrno);

  transBlockToResultRow(pBlock, rowId, rowTs, &scratchRow);
  keepResultInDiscBuf(pOperator, groupId, &scratchRow, rowSize);
  taosMemoryFreeClear(scratchRow.pRowVal);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
705

706
// Produces fill output for row `rowId` of `pBlock`:
//   1. loads the windows around the row timestamp into pFillSup,
//   2. resets the previous window when its key equals pFillInfo->preRowKey,
//   3. computes the fill parameters for the row and emits the rows into pRes.
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
                          SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) {
  uint64_t blockGroupId = pBlock->info.id.groupId;

  getWindowFromDiscBuf(pOperator, tsCol[rowId], blockGroupId, pFillSup);

  // NOTE(review): presumably avoids re-using the row already handled as "previous" - confirm.
  if (pFillInfo->preRowKey == pFillSup->prev.key) {
    resetFillWindow(&pFillSup->prev);
  }

  setFillValueInfo(pBlock, tsCol[rowId], rowId, pFillSup, pFillInfo);
  doStreamFillRange(pFillInfo, pFillSup, pRes);
}
808✔
716

717
// Consumes rows of pInfo->pSrcBlock starting after pInfo->srcRowIndex and appends
// fill results to pInfo->pRes.
//
// For every source row the *next* row is first stored via keepBlockRowInDiscBuf(),
// then fill results are generated for the row before it (index - 1). When the
// result block reaches its capacity the function returns early and leaves
// srcRowIndex in place so a later call can resume. After the last row has been
// processed the source block is cleaned up.
//
// Errors are logged only; the function has no return value.
static void doStreamFillImpl(SOperatorInfo* pOperator) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStreamFillSupporter*    pSup = pInfo->pFillSup;
  SStreamFillInfo*         pFill = pInfo->pFillInfo;
  SSDataBlock*             pSrc = pInfo->pSrcBlock;
  SSDataBlock*             pOut = pInfo->pRes;
  uint64_t                 srcGroupId = pSrc->info.id.groupId;
  SColumnInfoData*         pTsColData = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
  TSKEY*                   tsList = (TSKEY*)pTsColData->pData;

  pOut->info.id.groupId = srcGroupId;
  pInfo->srcRowIndex += 1;

  // Index 0 means the first row of the block: store it, then move on to the second row.
  if (pInfo->srcRowIndex == 0) {
    code = keepBlockRowInDiscBuf(pOperator, pFill, pSrc, tsList, pInfo->srcRowIndex, srcGroupId, pSup->rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->srcRowIndex += 1;
  }

  for (; pInfo->srcRowIndex < pSrc->info.rows; pInfo->srcRowIndex++) {
    code = keepBlockRowInDiscBuf(pOperator, pFill, pSrc, tsList, pInfo->srcRowIndex, srcGroupId, pSup->rowSize);
    QUERY_CHECK_CODE(code, lino, _end);

    doFillResults(pOperator, pSup, pFill, pSrc, tsList, pInfo->srcRowIndex - 1, pOut);

    // Result block is full: publish its time window and return without advancing the index.
    if (pOut->info.rows == pOut->info.capacity) {
      code = blockDataUpdateTsWindow(pOut, pInfo->primaryTsCol);
      QUERY_CHECK_CODE(code, lino, _end);
      return;
    }
  }

  // Fill for the last source row, which has no following row inside this block.
  doFillResults(pOperator, pSup, pFill, pSrc, tsList, pInfo->srcRowIndex - 1, pOut);
  code = blockDataUpdateTsWindow(pOut, pInfo->primaryTsCol);
  QUERY_CHECK_CODE(code, lino, _end);
  blockDataCleanup(pInfo->pSrcBlock);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
760

761
// Appends one delete-range row to `delRes`.
//
// Columns written for the new row:
//   - start ts / end ts : `start` and `end`
//   - group id          : `groupId`
//   - uid, calculate-start ts, calculate-end ts : NULL
//   - table name        : the partition table name looked up for `groupId`,
//                         or NULL when the lookup reports a non-success winCode
//
// On success delRes->info.rows is incremented by one. On failure the row count is
// left unchanged and the error code is returned (and logged).
static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOp->pTaskInfo;
  SStorageAPI*     pAPI = &pOp->pTaskInfo->storageAPI;
  SSDataBlock*     pBlock = delRes;
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
  QUERY_CHECK_CODE(code, lino, _end);

  code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
  QUERY_CHECK_CODE(code, lino, _end);

  colDataSetNULL(pUidCol, pBlock->info.rows);
  code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
  QUERY_CHECK_CODE(code, lino, _end);

  colDataSetNULL(pCalStartCol, pBlock->info.rows);
  colDataSetNULL(pCalEndCol, pBlock->info.rows);

  void*   tbname = NULL;
  int32_t winCode = TSDB_CODE_SUCCESS;
  code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
  QUERY_CHECK_CODE(code, lino, _end);
  if (winCode != TSDB_CODE_SUCCESS) {
    colDataSetNULL(pTableCol, pBlock->info.rows);
  } else {
    char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
    STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
    // The name now lives in parTbName, so release the state-store value before the
    // fallible column write; otherwise an error there would jump to _end and leak it.
    pAPI->stateStore.streamStateFreeVal(tbname);
    tbname = NULL;

    code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pBlock->info.rows++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
812

813
static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
54✔
814
                                 SSDataBlock* delRes) {
815
  int32_t                  code = TSDB_CODE_SUCCESS;
54✔
816
  int32_t                  lino = 0;
54✔
817
  SStreamFillOperatorInfo* pInfo = pOperator->info;
54✔
818
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
54✔
819
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
54✔
820
  if (hasPrevWindow(pFillSup)) {
54✔
821
    TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
34✔
822
    code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
34✔
823
    QUERY_CHECK_CODE(code, lino, _end);
34!
824
  } else if (hasNextWindow(pFillSup)) {
20✔
825
    TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
15✔
826
    code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
15✔
827
    QUERY_CHECK_CODE(code, lino, _end);
15!
828
  } else {
829
    code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
5✔
830
    QUERY_CHECK_CODE(code, lino, _end);
5!
831
  }
832

833
_end:
5✔
834
  if (code != TSDB_CODE_SUCCESS) {
54!
UNCOV
835
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
836
  }
837
  return code;
54✔
838
}
839

840
// Handles the deletion of the range [startTs, endTs] for `groupId`.
//
// The windows around startTs are loaded, the delete fill parameters are computed,
// and the state entry keyed by (startTs, groupId) is removed. Afterwards:
//   - if no fill is needed, a delete range is appended to pInfo->pDelRes;
//   - otherwise the range is queued in pFillInfo->delRanges for later processing.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;

  getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
  setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);

  SWinKey delKey = {.ts = startTs, .groupId = groupId};
  pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);

  if (pInfo->pFillInfo->needFill) {
    // The gap has to be re-filled: remember the range for the finalize step.
    STimeRange pending = {
        .skey = startTs,
        .ekey = endTs,
        .groupId = groupId,
    };
    void* pushed = taosArrayPush(pInfo->pFillInfo->delRanges, &pending);
    QUERY_CHECK_NULL(pushed, code, lino, _end, terrno);
  } else {
    code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
872

873
// Looks up the fill-state entry for (ts, groupId) and stores it in `pWinData`.
//
// If the exact key is not found, the lookup falls back to the entry returned by
// seeking to the next key. `pWinData` is only modified when one of the two lookups
// succeeds; in that case it is reset and then pointed at the retrieved value.
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
  SWinKey winKey = {.ts = ts, .groupId = groupId};
  void*   pVal = NULL;
  int32_t valLen = 0;

  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &winKey, (void**)&pVal, &valLen);
  if (code != TSDB_CODE_SUCCESS) {
    qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
           groupId);
    // winKey is passed by address, so the cursor read may overwrite it with the key actually found.
    SStreamStateCur* pNextCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &winKey);
    code = pAPI->stateStore.streamStateGetGroupKVByCur(pNextCur, &winKey, (const void**)&pVal, &valLen);
    pAPI->stateStore.streamStateFreeCur(pNextCur);
    qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  resetFillWindow(pWinData);
  pWinData->key = winKey.ts;
  pWinData->pRowVal = pVal;
}
×
893

894
// Processes the delete ranges queued in pFillInfo->delRanges, starting at
// pFillInfo->delIndex, and generates fill rows for them into pInfo->pRes.
//
// Processing stops early when the result block already carries a non-zero group id
// that differs from the group of the next range, so that one result block only
// holds rows of a single group. delIndex is advanced for every range handled, which
// lets a later call continue where this one stopped.
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SStreamFillInfo*         pFill = pInfo->pFillInfo;
  int32_t                  numOfRanges = taosArrayGetSize(pFill->delRanges);

  while (pFill->delIndex < numOfRanges) {
    STimeRange* pRange = taosArrayGet(pFill->delRanges, pFill->delIndex);

    uint64_t outGroupId = pInfo->pRes->info.id.groupId;
    if (outGroupId != 0 && outGroupId != pRange->groupId) {
      return;
    }

    getWindowFromDiscBuf(pOperator, pRange->skey, pRange->groupId, pInfo->pFillSup);

    // For FILL(NEXT) the row right after the deleted range is needed as the fill source.
    TSKEY realEnd = pRange->ekey + 1;
    if (pFill->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
      getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, pRange->groupId,
                         &pInfo->pFillSup->next);
    }

    setDeleteFillValueInfo(pRange->skey, pRange->ekey, pInfo->pFillSup, pFill);
    pFill->delIndex++;

    if (pFill->needFill) {
      doStreamFillRange(pFill, pInfo->pFillSup, pInfo->pRes);
      pInfo->pRes->info.id.groupId = pRange->groupId;
    }
  }
}
919

920
// Walks the delete block pInfo->pSrcDelBlock from pInfo->srcDelRowIndex onwards and
// turns its rows into delete ranges.
//
// For each starting row that has a matching state entry, the inner loop consumes
// the following delete rows of the same group as long as their timestamps line up
// with the consecutive state keys, extending `lastTs` as it goes. The merged range
// [firstTs, lastTs] is then handed to doDeleteFillResultImpl(). Rows without a
// matching state entry are skipped.
//
// Returns TSDB_CODE_SUCCESS or the first error from doDeleteFillResultImpl().
static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
  SSDataBlock*             pDelBlock = pInfo->pSrcDelBlock;

  SColumnInfoData* pStartCol = taosArrayGet(pDelBlock->pDataBlock, START_TS_COLUMN_INDEX);
  TSKEY*           startList = (TSKEY*)pStartCol->pData;
  SColumnInfoData* pGroupCol = taosArrayGet(pDelBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  uint64_t*        groupList = (uint64_t*)pGroupCol->pData;

  while (pInfo->srcDelRowIndex < pDelBlock->info.rows) {
    TSKEY            firstTs = startList[pInfo->srcDelRowIndex];
    TSKEY            lastTs = firstTs;
    uint64_t         groupId = groupList[pInfo->srcDelRowIndex];
    SWinKey          firstKey = {.ts = firstTs, .groupId = groupId};
    SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &firstKey);

    if (pCur == NULL) {
      // Nothing stored for this row: skip it.
      pInfo->srcDelRowIndex++;
      continue;
    }

    SWinKey nextKey = {.groupId = groupId, .ts = firstTs};
    for (; pInfo->srcDelRowIndex < pDelBlock->info.rows; pInfo->srcDelRowIndex++) {
      TSKEY    delTs = startList[pInfo->srcDelRowIndex];
      uint64_t delGroupId = groupList[pInfo->srcDelRowIndex];

      // Stop at a group change or when the delete row lies beyond the next stored key.
      if (delGroupId != groupId || delTs > nextKey.ts) {
        break;
      }
      if (delTs != nextKey.ts) {
        continue;
      }

      SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
      pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
      int32_t winCode = pAPI->stateStore.streamStateGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
      // firstTs will be deleted later
      if (delTs != firstTs) {
        pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
        pAPI->stateStore.streamStateFreeCur(pCur);
        pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
      }
      lastTs = TMAX(delTs, nextKey.ts - 1);
      if (winCode != TSDB_CODE_SUCCESS) {
        // No further stored key; leave the row index on the current row.
        break;
      }
    }

    pAPI->stateStore.streamStateFreeCur(pCur);
    code = doDeleteFillResultImpl(pOperator, firstTs, lastTs, groupId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pFillInfo->current = pFillInfo->end + 1;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
988

989
// Resets the per-batch state of the operator: clears the result map and the
// hasDelete flag of the supporter, and empties the queue of pending delete ranges.
static void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
  SStreamFillSupporter* pSup = pInfo->pFillSup;
  tSimpleHashClear(pSup->pResMap);
  pSup->hasDelete = false;

  SStreamFillInfo* pFill = pInfo->pFillInfo;
  taosArrayClear(pFill->delRanges);
  pFill->delIndex = 0;
}
1,629✔
995

996
// Evaluates the operator's expressions on `pSrcBlock` and writes the results into
// `pDstBlock`.
//
// Two projection passes are applied to the same destination block: first the
// expressions in pOperator->exprSupp, then (after resetting the destination row
// count to 0) the expressions in pFillSup->notFillExprSup. Finally the group id is
// copied from the source block and the destination time window is refreshed.
//
// Returns TSDB_CODE_SUCCESS or the first error encountered.
static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
                                              SSDataBlock* pDstBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  blockDataCleanup(pDstBlock);
  code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // Second pass writes into the same rows, so the row count is rewound first.
  pDstBlock->info.rows = 0;
  pSup = &pInfo->pFillSup->notFillExprSup;
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;

  code = blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
  // Checked like every other call so that a failure is logged with its real line number.
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
1030

1031
// Produces the next output block of the stream fill operator.
//
// Order of work on each call:
//   1. If the operator is already done, return NULL.
//   2. If a previous call left unfinished fill work, continue it and return the
//      rows it produces.
//   3. If the downstream is exhausted (OP_RES_TO_RETURN), process the queued delete
//      ranges; return their rows, or mark the operator completed when none remain.
//   4. Otherwise pull blocks from downstream:
//        - STREAM_RETRIEVE / STREAM_CHECKPOINT / STREAM_CREATE_CHILD_TABLE blocks
//          are passed through unchanged;
//        - STREAM_DELETE_RESULT blocks are converted into delete ranges, which are
//          returned when any were produced immediately;
//        - data blocks are projected into pInfo->pSrcBlock and filled row by row.
//
// *ppRes receives the block to return (NULL when there is nothing to return).
// On an internal failure the error is stored in pTaskInfo->code and the function
// leaves via T_LONG_JMP.
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }
  blockDataCleanup(pInfo->pRes);
  if (hasRemainCalc(pInfo->pFillInfo) ||
      (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
    doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
    if (pInfo->pRes->info.rows > 0) {
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pRes;
      return code;
    }
  }
  if (pOperator->status == OP_RES_TO_RETURN) {
    doDeleteFillFinalize(pOperator);
    if (pInfo->pRes->info.rows > 0) {
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pRes;
      return code;
    }
    setOperatorCompleted(pOperator);
    resetStreamFillInfo(pInfo);
    (*ppRes) = NULL;
    return code;
  }

  while (1) {
    if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
      // If there are delete datablocks, we receive them first.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        pOperator->status = OP_RES_TO_RETURN;
        pInfo->pFillInfo->preRowKey = INT64_MIN;
        if (pInfo->pRes->info.rows > 0) {
          printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
          (*ppRes) = pInfo->pRes;
          return code;
        }
        break;
      }
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));

      // A new group starts: forget the last row key of the previous group.
      if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
        pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
        pInfo->pFillInfo->preRowKey = INT64_MIN;
      }

      pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
      if (pInfo->pFillSup->winRange.ekey <= 0) {
        pInfo->pFillSup->winRange.ekey = INT64_MAX;
      }

      switch (pBlock->info.type) {
        case STREAM_RETRIEVE:
          (*ppRes) = pBlock;
          return code;
        case STREAM_DELETE_RESULT: {
          pInfo->pSrcDelBlock = pBlock;
          pInfo->srcDelRowIndex = 0;
          blockDataCleanup(pInfo->pDelRes);
          pInfo->pFillSup->hasDelete = true;
          code = doDeleteFillResult(pOperator);
          QUERY_CHECK_CODE(code, lino, _end);

          if (pInfo->pDelRes->info.rows > 0) {
            printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
            (*ppRes) = pInfo->pDelRes;
            return code;
          }
          continue;
        }
        case STREAM_NORMAL:
        case STREAM_INVALID:
        case STREAM_PULL_DATA: {
          code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
          QUERY_CHECK_CODE(code, lino, _end);

          memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
          // doStreamFillImpl() pre-increments the index, so -1 makes it start at row 0.
          pInfo->srcRowIndex = -1;
        } break;
        case STREAM_CHECKPOINT:
        case STREAM_CREATE_CHILD_TABLE: {
          (*ppRes) = pBlock;
          return code;
        }
        default:
          // Do not leave the output pointer unset on the error return.
          (*ppRes) = NULL;
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
    }

    doStreamFillImpl(pOperator);
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
    // totalRows is deliberately not updated here: the block that ends this loop is
    // accounted for once, after the loop. Counting it here as well doubled the total.
    if (pInfo->pRes->info.rows > 0) {
      break;
    }
  }
  if (pOperator->status == OP_RES_TO_RETURN) {
    doDeleteFillFinalize(pOperator);
  }

  if (pInfo->pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
    resetStreamFillInfo(pInfo);
    (*ppRes) = NULL;
    return code;
  }

  pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
  printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  (*ppRes) = pInfo->pRes;
  return code;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  setOperatorCompleted(pOperator);
  resetStreamFillInfo(pInfo);
  (*ppRes) = NULL;
  return code;
}
1167

1168
// Computes pFillSup->rowSize from the columns of `pInputRes` (one SResultCellData
// header plus the column's byte width per column) and puts the four cached
// windows (prev, cur, next, nextNext) into their empty state: key INT64_MIN and no
// row buffer.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
  int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);

  pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
  int32_t colIdx = 0;
  while (colIdx < numOfCols) {
    SColumnInfoData* pColData = taosArrayGet(pInputRes->pDataBlock, colIdx);
    pFillSup->rowSize += pColData->info.bytes;
    colIdx++;
  }

  pFillSup->prev.key = INT64_MIN;
  pFillSup->prev.pRowVal = NULL;

  pFillSup->cur.key = INT64_MIN;
  pFillSup->cur.pRowVal = NULL;

  pFillSup->next.key = INT64_MIN;
  pFillSup->next.pRowVal = NULL;

  pFillSup->nextNext.key = INT64_MIN;
  pFillSup->nextNext.pRowVal = NULL;

  return TSDB_CODE_SUCCESS;
}
1186

1187
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
250✔
1188
                                               SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
1189
  int32_t               code = TSDB_CODE_SUCCESS;
250✔
1190
  int32_t               lino = 0;
250✔
1191
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
250✔
1192
  if (!pFillSup) {
250!
UNCOV
1193
    code = terrno;
×
UNCOV
1194
    QUERY_CHECK_CODE(code, lino, _end);
×
1195
  }
1196
  pFillSup->numOfFillCols = numOfFillCols;
250✔
1197
  int32_t    numOfNotFillCols = 0;
250✔
1198
  SExprInfo* noFillExprInfo = NULL;
250✔
1199

1200
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
250✔
1201
  QUERY_CHECK_CODE(code, lino, _end);
250!
1202

1203
  pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
500✔
1204
                                            NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
250✔
1205
  if (pFillSup->pAllColInfo == NULL) {
250!
UNCOV
1206
    code = terrno;
×
UNCOV
1207
    lino = __LINE__;
×
UNCOV
1208
    destroyExprInfo(noFillExprInfo, numOfNotFillCols);
×
UNCOV
1209
    goto _end;
×
1210
  }
1211

1212
  pFillSup->type = convertFillType(pPhyFillNode->mode);
250✔
1213
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
250✔
1214
  pFillSup->interval = *pInterval;
250✔
1215
  pFillSup->pAPI = pAPI;
250✔
1216

1217
  code = initResultBuf(pInputRes, pFillSup);
250✔
1218
  QUERY_CHECK_CODE(code, lino, _end);
250!
1219

1220
  SExprInfo* noFillExpr = NULL;
250✔
1221
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
250✔
1222
  QUERY_CHECK_CODE(code, lino, _end);
250!
1223

1224
  code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
250✔
1225
  QUERY_CHECK_CODE(code, lino, _end);
250!
1226

1227
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
250✔
1228
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
250✔
1229
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
250!
1230
  pFillSup->hasDelete = false;
250✔
1231

1232
_end:
250✔
1233
  if (code != TSDB_CODE_SUCCESS) {
250!
UNCOV
1234
    destroyStreamFillSupporter(pFillSup);
×
UNCOV
1235
    pFillSup = NULL;
×
UNCOV
1236
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1237
  }
1238
  return pFillSup;
250✔
1239
}
1240

1241
// Allocates and initializes the mutable fill state used by the stream fill operator.
//
// pFillSup: supplies the fill type, the number of columns and the row size.
// pRes:     result block; its column byte widths and types size the per-column buffers.
//
// Depending on pFillSup->type the state additionally gets:
//   - TSDB_FILL_LINEAR: two arrays (pEndPoints / pNextEndPoints) holding one
//     zero-initialized SPoint value buffer per column;
//   - TSDB_FILL_SET_VALUE(_F) / TSDB_FILL_NULL(_F): a private result row whose cells
//     carry the byte width and type of the matching result column.
//
// Returns the new state, or NULL on allocation failure (the partially built state
// is passed to destroyStreamFillInfo() and the error is logged).
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
  if (!pFillInfo) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pFillInfo->start = INT64_MIN;
  pFillInfo->current = INT64_MIN;
  pFillInfo->end = INT64_MIN;
  pFillInfo->preRowKey = INT64_MIN;
  pFillInfo->needFill = false;
  pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
  // Check the pointer that was just allocated; re-checking pFillInfo here would let
  // a failed allocation through to the dereferences below.
  if (!pFillInfo->pLinearInfo) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pFillInfo->pLinearInfo->hasNext = false;
  pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
  pFillInfo->pLinearInfo->pEndPoints = NULL;
  pFillInfo->pLinearInfo->pNextEndPoints = NULL;
  if (pFillSup->type == TSDB_FILL_LINEAR) {
    pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
    if (!pFillInfo->pLinearInfo->pEndPoints) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
    if (!pFillInfo->pLinearInfo->pNextEndPoints) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
      SPoint           value = {0};
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
      if (!value.val) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
      if (!tmpRes) {
        code = terrno;
        // The buffer never made it into the array, so nobody else will free it.
        taosMemoryFreeClear(value.val);
        QUERY_CHECK_CODE(code, lino, _end);
      }

      value.val = taosMemoryCalloc(1, pColData->info.bytes);
      if (!value.val) {
        code = terrno;
        QUERY_CHECK_CODE(code, lino, _end);
      }

      tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
      if (!tmpRes) {
        code = terrno;
        // Same as above: release the buffer that could not be stored.
        taosMemoryFreeClear(value.val);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
  }
  pFillInfo->pLinearInfo->winIndex = 0;

  pFillInfo->pResRow = NULL;
  if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
      pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
    pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
    if (!pFillInfo->pResRow) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    pFillInfo->pResRow->key = INT64_MIN;
    pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
    if (!pFillInfo->pResRow->pRowVal) {
      code = terrno;
      QUERY_CHECK_CODE(code, lino, _end);
    }

    // Describe each cell of the private row with the layout of its result column.
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
      pCell->bytes = pColData->info.bytes;
      pCell->type = pColData->info.type;
    }
  }

  pFillInfo->type = pFillSup->type;
  pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
  if (!pFillInfo->delRanges) {
    code = terrno;
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pFillInfo->delIndex = 0;
  pFillInfo->curGroupId = 0;
  return pFillInfo;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  destroyStreamFillInfo(pFillInfo);
  return NULL;
}
1350

1351
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
250✔
1352
                                            SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1353
  QRY_PARAM_CHECK(pOptrInfo);
250!
1354

1355
  int32_t                  code = TSDB_CODE_SUCCESS;
250✔
1356
  int32_t                  lino = 0;
250✔
1357
  SStreamFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFillOperatorInfo));
250✔
1358
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
250✔
1359
  if (pInfo == NULL || pOperator == NULL) {
250!
UNCOV
1360
    code = terrno;
×
UNCOV
1361
    QUERY_CHECK_CODE(code, lino, _error);
×
1362
  }
1363

1364
  SInterval* pInterval = &((SStreamIntervalOperatorInfo*)downstream->info)->interval;
250✔
1365
  int32_t    numOfFillCols = 0;
250✔
1366
  SExprInfo* pFillExprInfo = NULL;
250✔
1367

1368
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
250✔
1369
  QUERY_CHECK_CODE(code, lino, _error);
250!
1370

1371
  code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
250✔
1372
  QUERY_CHECK_CODE(code, lino, _error);
250!
1373

1374
  pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
250✔
1375
  QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
250!
1376

1377
  pInfo->pFillSup = initStreamFillSup(pPhyFillNode, pInterval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
250✔
1378
                                      pInfo->pSrcBlock);
1379
  if (!pInfo->pFillSup) {
250!
UNCOV
1380
    code = TSDB_CODE_FAILED;
×
UNCOV
1381
    QUERY_CHECK_CODE(code, lino, _error);
×
1382
  }
1383

1384
  initResultSizeInfo(&pOperator->resultInfo, 4096);
250✔
1385
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
250✔
1386
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
250!
1387

1388
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
250✔
1389
  QUERY_CHECK_CODE(code, lino, _error);
250!
1390

1391
  code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
250✔
1392
  QUERY_CHECK_CODE(code, lino, _error);
250!
1393

1394
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
250✔
1395
  if (!pInfo->pFillInfo) {
250!
UNCOV
1396
    goto _error;
×
1397
  }
1398

1399
  if (pInfo->pFillInfo->type == TSDB_FILL_SET_VALUE || pInfo->pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
250✔
1400
    for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) {
771✔
1401
      SFillColInfo*    pFillCol = pInfo->pFillSup->pAllColInfo + i;
726✔
1402
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
726✔
1403
      SResultCellData* pCell = getResultCell(pInfo->pFillInfo->pResRow, slotId);
726✔
1404
      SVariant*        pVar = &(pFillCol->fillVal);
726✔
1405
      if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
726!
UNCOV
1406
        float v = 0;
×
UNCOV
1407
        GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
×
UNCOV
1408
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
×
1409
      } else if (IS_FLOAT_TYPE(pCell->type)) {
969!
1410
        double v = 0;
243✔
1411
        GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
243!
1412
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
243!
1413
      } else if (IS_INTEGER_TYPE(pCell->type)) {
921!
1414
        int64_t v = 0;
438✔
1415
        GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
438!
1416
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
438!
1417
      } else {
1418
        pCell->isNull = true;
45✔
1419
      }
1420
    }
1421
  } else if (pInfo->pFillInfo->type == TSDB_FILL_NULL || pInfo->pFillInfo->type == TSDB_FILL_NULL_F) {
205✔
1422
    for (int32_t i = 0; i < pInfo->pFillSup->numOfAllCols; ++i) {
1,148✔
1423
      SFillColInfo*    pFillCol = pInfo->pFillSup->pAllColInfo + i;
1,083✔
1424
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
1,083✔
1425
      SResultCellData* pCell = getResultCell(pInfo->pFillInfo->pResRow, slotId);
1,083✔
1426
      pCell->isNull = true;
1,083✔
1427
    }
1428
  }
1429

1430
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
250✔
1431
  QUERY_CHECK_CODE(code, lino, _error);
250!
1432

1433
  code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
250✔
1434
  QUERY_CHECK_CODE(code, lino, _error);
250!
1435

1436
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
250✔
1437
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
250✔
1438

1439
  int32_t numOfOutputCols = 0;
250✔
1440
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
250✔
1441
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
1442
  QUERY_CHECK_CODE(code, lino, _error);
250!
1443

1444
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
250✔
1445
  QUERY_CHECK_CODE(code, lino, _error);
250!
1446

1447
  pInfo->srcRowIndex = -1;
250✔
1448
  setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
250✔
1449
                  pTaskInfo);
1450
  pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
250✔
1451
                                         optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1452
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
250✔
1453

1454
  code = appendDownstream(pOperator, &downstream, 1);
250✔
1455
  QUERY_CHECK_CODE(code, lino, _error);
250!
1456

1457
  *pOptrInfo = pOperator;
250✔
1458
  return TSDB_CODE_SUCCESS;
250✔
1459

UNCOV
1460
_error:
×
UNCOV
1461
  qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1462

UNCOV
1463
  if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
×
UNCOV
1464
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1465
  pTaskInfo->code = code;
×
1466
  return code;
×
1467
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc