• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3546

03 Dec 2024 10:02AM UTC coverage: 60.691% (-0.1%) from 60.839%
#3546

push

travis-ci

web-flow
Merge pull request #29015 from taosdata/fix/TS-5668

[TS-5668] fix(keeper): fix endpoint value too long for column/tag and eliminate warnings

120577 of 253823 branches covered (47.5%)

Branch coverage included in aggregate %.

201666 of 277134 relevant lines covered (72.77%)

18719900.08 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.87
/source/libs/executor/src/streamfilloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "streamexecutorInt.h"
25
#include "tcommon.h"
26
#include "thash.h"
27
#include "ttime.h"
28

29
#include "function.h"
30
#include "operator.h"
31
#include "querynodes.h"
32
#include "querytask.h"
33
#include "tdatablock.h"
34
#include "tfill.h"
35

36
#define FILL_POS_INVALID 0
37
#define FILL_POS_START   1
38
#define FILL_POS_MID     2
39
#define FILL_POS_END     3
40

41
typedef struct STimeRange {
42
  TSKEY    skey;
43
  TSKEY    ekey;
44
  uint64_t groupId;
45
} STimeRange;
46

47
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
58✔
48
  STimeWindow win = {.skey = ts, .ekey = ts};
58✔
49
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
58✔
50
  return win.skey;
58✔
51
}
52

53
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
30✔
54
  STimeWindow win = {.skey = ts, .ekey = ts};
30✔
55
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_DESC);
30✔
56
  return win.skey;
30✔
57
}
58

59
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell) {
93,855✔
60
  return colDataSetVal(pCol, rowId, pCell->pData, pCell->isNull);
93,855✔
61
}
62

63
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
170,188✔
64
  if (!pRaw || !pRaw->pRowVal) {
170,188!
65
    return NULL;
×
66
  }
67
  char*            pData = (char*)pRaw->pRowVal;
170,246✔
68
  SResultCellData* pCell = pRaw->pRowVal;
170,246✔
69
  for (int32_t i = 0; i < index; i++) {
1,220,145✔
70
    pData += (pCell->bytes + sizeof(SResultCellData));
1,049,899✔
71
    pCell = (SResultCellData*)pData;
1,049,899✔
72
  }
73
  return pCell;
170,246✔
74
}
75

76
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
468✔
77
  for (int32_t i = start; i < end; i++) {
942✔
78
    destroyExprInfo(pFillCol[i].pExpr, 1);
474✔
79
    taosVariantDestroy(&pFillCol[i].fillVal);
474✔
80
  }
81
  if (start < end) {
468✔
82
    taosMemoryFreeClear(pFillCol[start].pExpr);
441!
83
  }
84
  taosMemoryFree(pFillCol);
468✔
85
  return NULL;
468✔
86
}
87

88
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
468✔
89
  if (pFillSup == NULL) {
468!
90
    return;
×
91
  }
92
  pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
468✔
93
  tSimpleHashCleanup(pFillSup->pResMap);
468✔
94
  pFillSup->pResMap = NULL;
468✔
95
  cleanupExprSupp(&pFillSup->notFillExprSup);
468✔
96
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
468✔
97
    taosMemoryFree(pFillSup->cur.pRowVal);
165✔
98
  }
99
  taosMemoryFree(pFillSup->prev.pRowVal);
468✔
100
  taosMemoryFree(pFillSup->next.pRowVal);
468✔
101
  taosMemoryFree(pFillSup->nextNext.pRowVal);
468✔
102

103
  taosMemoryFree(pFillSup->pOffsetInfo);
468✔
104

105
  taosMemoryFree(pFillSup);
468✔
106
}
107

108
void destroySPoint(void* ptr) {
24,502✔
109
  SPoint* point = (SPoint*)ptr;
24,502✔
110
  taosMemoryFreeClear(point->val);
24,502!
111
}
24,582✔
112

113
void destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
468✔
114
  taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
468✔
115
  taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
468✔
116
  taosMemoryFree(pFillLinear);
468✔
117
}
468✔
118

119
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
468✔
120
  if (pFillInfo == NULL) {
468!
121
    return;
×
122
  } 
123
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
468✔
124
      pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
375✔
125
    taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
215!
126
    taosMemoryFreeClear(pFillInfo->pResRow);
215!
127
    taosMemoryFreeClear(pFillInfo->pNonFillRow->pRowVal);
215!
128
    taosMemoryFreeClear(pFillInfo->pNonFillRow);
215!
129
  }
130
  destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
468✔
131
  pFillInfo->pLinearInfo = NULL;
468✔
132

133
  taosArrayDestroy(pFillInfo->delRanges);
468✔
134
  taosMemoryFreeClear(pFillInfo->pTempBuff);
468!
135
  taosMemoryFree(pFillInfo);
468✔
136
}
137

138
static void destroyStreamFillOperatorInfo(void* param) {
441✔
139
  SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
441✔
140
  destroyStreamFillInfo(pInfo->pFillInfo);
441✔
141
  destroyStreamFillSupporter(pInfo->pFillSup);
441✔
142
  blockDataDestroy(pInfo->pRes);
441✔
143
  pInfo->pRes = NULL;
441✔
144
  blockDataDestroy(pInfo->pSrcBlock);
441✔
145
  pInfo->pSrcBlock = NULL;
441✔
146
  blockDataDestroy(pInfo->pDelRes);
441✔
147
  pInfo->pDelRes = NULL;
441✔
148
  taosArrayDestroy(pInfo->matchInfo.pList);
441✔
149
  pInfo->matchInfo.pList = NULL;
441✔
150
  taosArrayDestroy(pInfo->pUpdated);
441✔
151
  clearGroupResInfo(&pInfo->groupResInfo);
441✔
152
  taosArrayDestroy(pInfo->pCloseTs);
441✔
153

154
  if (pInfo->stateStore.streamFileStateDestroy != NULL) {
441✔
155
    pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
34✔
156
  }
157

158
  if (pInfo->pState != NULL) {
441✔
159
    taosMemoryFreeClear(pInfo->pState);
34!
160
  }
161

162
  taosMemoryFree(pInfo);
441✔
163
}
441✔
164

165
static void resetFillWindow(SResultRowData* pRowData) {
8,342✔
166
  pRowData->key = INT64_MIN;
8,342✔
167
  taosMemoryFreeClear(pRowData->pRowVal);
8,342✔
168
}
8,342✔
169

170
static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
1,919✔
171
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
1,919✔
172
    resetFillWindow(&pFillSup->cur);
1,469✔
173
  } else {
174
    pFillSup->cur.key = INT64_MIN;
450✔
175
    pFillSup->cur.pRowVal = NULL;
450✔
176
  }
177
  resetFillWindow(&pFillSup->prev);
1,919✔
178
  resetFillWindow(&pFillSup->next);
1,922✔
179
  resetFillWindow(&pFillSup->nextNext);
1,922✔
180
}
1,922✔
181

182
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
1,919✔
183
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,919✔
184
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
1,919✔
185
  resetPrevAndNextWindow(pFillSup);
1,919✔
186

187
  SWinKey key = {.ts = ts, .groupId = groupId};
1,922✔
188
  void*   curVal = NULL;
1,922✔
189
  int32_t curVLen = 0;
1,922✔
190
  bool    hasCurKey = true;
1,922✔
191
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen, NULL);
1,922✔
192
  if (code == TSDB_CODE_SUCCESS) {
1,921✔
193
    pFillSup->cur.key = key.ts;
1,866✔
194
    pFillSup->cur.pRowVal = curVal;
1,866✔
195
  } else {
196
    qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
55✔
197
    pFillSup->cur.key = ts;
55✔
198
    pFillSup->cur.pRowVal = NULL;
55✔
199
    hasCurKey = false;
55✔
200
  }
201

202
  SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
1,921✔
203
  SWinKey          preKey = {.ts = INT64_MIN, .groupId = groupId};
1,922✔
204
  void*            preVal = NULL;
1,922✔
205
  int32_t          preVLen = 0;
1,922✔
206
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
1,922✔
207

208
  if (code == TSDB_CODE_SUCCESS) {
1,922✔
209
    pFillSup->prev.key = preKey.ts;
1,461✔
210
    pFillSup->prev.pRowVal = preVal;
1,461✔
211

212
    if (hasCurKey) {
1,461✔
213
      pAPI->stateStore.streamStateCurNext(pState, pCur);
1,406✔
214
    }
215

216
    pAPI->stateStore.streamStateCurNext(pState, pCur);
1,461✔
217
  } else {
218
    pAPI->stateStore.streamStateFreeCur(pCur);
461✔
219
    pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
460✔
220
  }
221

222
  SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
1,922✔
223
  void*   nextVal = NULL;
1,922✔
224
  int32_t nextVLen = 0;
1,922✔
225
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
1,922✔
226
  if (code == TSDB_CODE_SUCCESS) {
1,922✔
227
    pFillSup->next.key = nextKey.ts;
937✔
228
    pFillSup->next.pRowVal = nextVal;
937✔
229
    if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
937✔
230
      pAPI->stateStore.streamStateCurNext(pState, pCur);
363✔
231
      SWinKey nextNextKey = {.groupId = groupId};
363✔
232
      void*   nextNextVal = NULL;
363✔
233
      int32_t nextNextVLen = 0;
363✔
234
      code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
363✔
235
      if (code == TSDB_CODE_SUCCESS) {
363✔
236
        pFillSup->nextNext.key = nextNextKey.ts;
153✔
237
        pFillSup->nextNext.pRowVal = nextNextVal;
153✔
238
      }
239
    }
240
  }
241
  pAPI->stateStore.streamStateFreeCur(pCur);
1,922✔
242
}
1,922✔
243

244
bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; }
×
245
bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
6,225✔
246
bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
3,855✔
247
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; }
118✔
248

249
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
1,886✔
250
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,886✔
251
  for (int32_t i = 0; i < numOfCols; ++i) {
31,401✔
252
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
29,521✔
253
    SResultCellData* pCell = getResultCell(pRowVal, i);
29,519✔
254
    if (!colDataIsNull_s(pColData, rowId)) {
59,030!
255
      pCell->isNull = false;
29,516✔
256
      pCell->type = pColData->info.type;
29,516✔
257
      pCell->bytes = pColData->info.bytes;
29,516✔
258
      char* val = colDataGetData(pColData, rowId);
29,516!
259
      if (IS_VAR_DATA_TYPE(pCell->type)) {
29,516!
260
        memcpy(pCell->pData, val, varDataTLen(val));
38✔
261
      } else {
262
        memcpy(pCell->pData, val, pCell->bytes);
29,478✔
263
      }
264
    } else {
265
      pCell->isNull = true;
×
266
    }
267
  }
268
  pRowVal->key = ts;
1,880✔
269
}
1,880✔
270

271
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
302✔
272
  for (int32_t i = 0; i < numOfCol; i++) {
5,114✔
273
    if (!pFillCol[i].notFillCol) {
4,811✔
274
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol + i);
4,514✔
275
      SResultCellData* pECell = getResultCell(pEndRow, slotId);
4,514✔
276
      SPoint*          pPoint = taosArrayGet(pEndPoins, slotId);
4,520✔
277
      pPoint->key = pEndRow->key;
4,515✔
278
      memcpy(pPoint->val, pECell->pData, pECell->bytes);
4,515✔
279
    }
280
  }
281
}
303✔
282

283
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,467✔
284
  ts = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
1,467✔
285
  pFillInfo->start = ts;
1,467✔
286
}
1,467✔
287

288
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,467✔
289
  ts = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
1,467✔
290
  pFillInfo->end = ts;
1,467✔
291
}
1,467✔
292

293
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,466✔
294
  setFillInfoStart(start, pInterval, pFillInfo);
1,466✔
295
  pFillInfo->current = pFillInfo->start;
1,467✔
296
  setFillInfoEnd(end, pInterval, pFillInfo);
1,467✔
297
}
1,467✔
298

299
void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
203✔
300
  if (!hasPrevWindow(pFillSup) || !hasNextWindow(pFillSup)) {
203✔
301
    pFillInfo->needFill = false;
93✔
302
    return;
93✔
303
  }
304

305
  TSKEY realStart = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
110✔
306
                                pFillSup->interval.precision);
110✔
307

308
  pFillInfo->needFill = true;
110✔
309
  pFillInfo->start = realStart;
110✔
310
  pFillInfo->current = pFillInfo->start;
110✔
311
  pFillInfo->end = end;
110✔
312
  pFillInfo->pos = FILL_POS_INVALID;
110✔
313
  switch (pFillInfo->type) {
110!
314
    case TSDB_FILL_NULL:
44✔
315
    case TSDB_FILL_NULL_F:
316
    case TSDB_FILL_SET_VALUE:
317
    case TSDB_FILL_SET_VALUE_F:
318
      break;
44✔
319
    case TSDB_FILL_PREV:
22✔
320
      pFillInfo->pResRow = &pFillSup->prev;
22✔
321
      break;
22✔
322
    case TSDB_FILL_NEXT:
22✔
323
      pFillInfo->pResRow = &pFillSup->next;
22✔
324
      break;
22✔
325
    case TSDB_FILL_LINEAR: {
22✔
326
      setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
22✔
327
      pFillInfo->pLinearInfo->hasNext = false;
22✔
328
      pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
22✔
329
      calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
22✔
330
                       pFillSup->numOfAllCols);
331
      pFillInfo->pResRow = &pFillSup->prev;
22✔
332
      pFillInfo->pLinearInfo->winIndex = 0;
22✔
333
    } break;
22✔
334
    default:
×
335
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
336
      break;
×
337
  }
338
}
339

340
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
750✔
341
  for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) {
1,601✔
342
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
851✔
343
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
851✔
344
    SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
851✔
345
    SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId);
851✔
346
    pCell->isNull = pCurCell->isNull;
851✔
347
    if (!pCurCell->isNull) {
851!
348
      memcpy(pCell->pData, pCurCell->pData, pCell->bytes);
851✔
349
    }
350
  }
351
}
750✔
352

353
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
1,719✔
354
                      SStreamFillInfo* pFillInfo) {
355
  pFillInfo->preRowKey = pFillSup->cur.key;
1,719✔
356
  if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
1,719✔
357
    pFillInfo->needFill = false;
305✔
358
    pFillInfo->pos = FILL_POS_START;
305✔
359
    return;
305✔
360
  }
361
  TSKEY prevWKey = INT64_MIN;
1,414✔
362
  TSKEY nextWKey = INT64_MIN;
1,414✔
363
  if (hasPrevWindow(pFillSup)) {
1,414✔
364
    prevWKey = pFillSup->prev.key;
896✔
365
  }
366
  if (hasNextWindow(pFillSup)) {
1,414✔
367
    nextWKey = pFillSup->next.key;
797✔
368
  }
369

370
  pFillInfo->needFill = true;
1,414✔
371
  pFillInfo->pos = FILL_POS_INVALID;
1,414✔
372
  switch (pFillInfo->type) {
1,414!
373
    case TSDB_FILL_NULL:
604✔
374
    case TSDB_FILL_NULL_F:
375
    case TSDB_FILL_SET_VALUE:
376
    case TSDB_FILL_SET_VALUE_F: {
377
      if (pFillSup->prev.key == pFillInfo->preRowKey) {
604!
378
        resetFillWindow(&pFillSup->prev);
×
379
      }
380
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
604✔
381
        if (pFillSup->next.key == pFillInfo->nextRowKey) {
137✔
382
          pFillInfo->preRowKey = INT64_MIN;
132✔
383
          setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
132✔
384
          pFillInfo->pos = FILL_POS_END;
132✔
385
        } else {
386
          pFillInfo->needFill = false;
5✔
387
          pFillInfo->pos = FILL_POS_START;
5✔
388
        }
389
      } else if (hasPrevWindow(pFillSup)) {
467✔
390
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
263✔
391
        pFillInfo->pos = FILL_POS_END;
263✔
392
      } else {
393
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
204✔
394
        pFillInfo->pos = FILL_POS_START;
204✔
395
      }
396
      copyNotFillExpData(pFillSup, pFillInfo);
604✔
397
    } break;
604✔
398
    case TSDB_FILL_PREV: {
284✔
399
      if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
284✔
400
                                      (pFillSup->next.key == pFillInfo->nextRowKey && hasNextNextWindow(pFillSup)) ||
118!
401
                                      (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
84!
402
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
106✔
403
        pFillInfo->pos = FILL_POS_START;
106✔
404
        resetFillWindow(&pFillSup->prev);
106✔
405
        pFillSup->prev.key = pFillSup->cur.key;
106✔
406
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
106✔
407
      } else if (hasPrevWindow(pFillSup)) {
178!
408
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
178✔
409
        pFillInfo->pos = FILL_POS_END;
178✔
410
        pFillInfo->preRowKey = INT64_MIN;
178✔
411
      }
412
      pFillInfo->pResRow = &pFillSup->prev;
284✔
413
    } break;
284✔
414
    case TSDB_FILL_NEXT: {
282✔
415
      if (hasPrevWindow(pFillSup)) {
282✔
416
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
182✔
417
        pFillInfo->pos = FILL_POS_END;
182✔
418
        resetFillWindow(&pFillSup->next);
182✔
419
        pFillSup->next.key = pFillSup->cur.key;
182✔
420
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
182✔
421
        pFillInfo->preRowKey = INT64_MIN;
182✔
422
      } else {
423
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
100✔
424
        pFillInfo->pos = FILL_POS_START;
100✔
425
      }
426
      pFillInfo->pResRow = &pFillSup->next;
282✔
427
    } break;
282✔
428
    case TSDB_FILL_LINEAR: {
244✔
429
      pFillInfo->pLinearInfo->winIndex = 0;
244✔
430
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
244✔
431
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
36✔
432
        pFillInfo->pos = FILL_POS_MID;
36✔
433
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
36✔
434
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
36✔
435
                         pFillSup->numOfAllCols);
436
        pFillInfo->pResRow = &pFillSup->prev;
36✔
437

438
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
36✔
439
                         pFillSup->numOfAllCols);
440
        pFillInfo->pLinearInfo->hasNext = true;
36✔
441
      } else if (hasPrevWindow(pFillSup)) {
208✔
442
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
95✔
443
        pFillInfo->pos = FILL_POS_END;
95✔
444
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
95✔
445
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
95✔
446
                         pFillSup->numOfAllCols);
447
        pFillInfo->pResRow = &pFillSup->prev;
95✔
448
        pFillInfo->pLinearInfo->hasNext = false;
95✔
449
      } else {
450
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
113✔
451
        pFillInfo->pos = FILL_POS_START;
113✔
452
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
113✔
453
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
113✔
454
                         pFillSup->numOfAllCols);
455
        pFillInfo->pResRow = &pFillSup->cur;
113✔
456
        pFillInfo->pLinearInfo->hasNext = false;
113✔
457
      }
458
    } break;
244✔
459
    default:
×
460
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
461
      break;
×
462
  }
463
}
464

465
int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
51,611✔
466
  int32_t code = TSDB_CODE_SUCCESS;
51,611✔
467
  int32_t lino = 0;
51,611✔
468
  SWinKey key = {.groupId = groupId, .ts = ts};
51,611✔
469
  if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) {
51,611✔
470
    (*pRes) = false;
113✔
471
    goto _end;
113✔
472
  }
473
  code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
51,940✔
474
  QUERY_CHECK_CODE(code, lino, _end);
51,921!
475
  (*pRes) = true;
51,921✔
476

477
_end:
52,034✔
478
  if (code != TSDB_CODE_SUCCESS) {
52,034!
479
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
480
  }
481
  return code;
52,034✔
482
}
483

484
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
37,454✔
485
                               bool* pRes) {
486
  int32_t code = TSDB_CODE_SUCCESS;
37,454✔
487
  int32_t lino = 0;
37,454✔
488
  if (pBlock->info.rows >= pBlock->info.capacity) {
37,454✔
489
    (*pRes) = false;
7✔
490
    goto _end;
7✔
491
  }
492
  uint64_t groupId = pBlock->info.id.groupId;
37,447✔
493
  bool     ckRes = true;
37,447✔
494
  code = checkResult(pFillSup, ts, groupId, &ckRes);
37,447✔
495
  QUERY_CHECK_CODE(code, lino, _end);
37,447!
496

497
  if (pFillSup->hasDelete && !ckRes) {
37,447✔
498
    (*pRes) = true;
28✔
499
    goto _end;
28✔
500
  }
501
  for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
167,633✔
502
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
130,189✔
503
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
130,189✔
504
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
130,189✔
505
    SFillInfo        tmpInfo = {
130,198✔
506
               .currentKey = ts,
507
               .order = TSDB_ORDER_ASC,
508
               .interval = pFillSup->interval,
509
    };
510
    bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows);
130,198✔
511
    if (!filled) {
130,519✔
512
      SResultCellData* pCell = getResultCell(pResRow, slotId);
93,263✔
513
      code = setRowCell(pColData, pBlock->info.rows, pCell);
92,877✔
514
      QUERY_CHECK_CODE(code, lino, _end);
92,958!
515
    }
516
  }
517
  pBlock->info.rows++;
37,444✔
518
  (*pRes) = true;
37,444✔
519

520
_end:
37,479✔
521
  if (code != TSDB_CODE_SUCCESS) {
37,479!
522
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
523
  }
524
  return code;
37,455✔
525
}
526

527
bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
55,609✔
528
  if (pFillInfo->current != INT64_MIN && pFillInfo->current <= pFillInfo->end) {
55,609✔
529
    return true;
48,666✔
530
  }
531
  return false;
6,943✔
532
}
533

534
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
1,215✔
535
  int32_t code = TSDB_CODE_SUCCESS;
1,215✔
536
  int32_t lino = 0;
1,215✔
537
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
36,474✔
538
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
35,259✔
539
    if (inWinRange(&pFillSup->winRange, &st)) {
35,259!
540
      bool res = true;
35,259✔
541
      code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res);
35,259✔
542
      QUERY_CHECK_CODE(code, lino, _end);
35,259!
543
    }
544
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
35,259✔
545
                                     pFillSup->interval.precision);
35,259✔
546
  }
547

548
_end:
1,216✔
549
  if (code != TSDB_CODE_SUCCESS) {
1,216!
550
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
551
  }
552
}
1,216✔
553

554
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
294✔
555
  int32_t code = TSDB_CODE_SUCCESS;
294✔
556
  int32_t lino = 0;
294✔
557
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
14,918✔
558
    uint64_t    groupId = pBlock->info.id.groupId;
14,624✔
559
    SWinKey     key = {.groupId = groupId, .ts = pFillInfo->current};
14,624✔
560
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
14,624✔
561
    bool        ckRes = true;
14,624✔
562
    code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
14,624✔
563
    QUERY_CHECK_CODE(code, lino, _end);
14,624!
564

565
    if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
14,624!
566
      pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
16✔
567
                                       pFillSup->interval.precision);
8✔
568
      pFillInfo->pLinearInfo->winIndex++;
8✔
569
      continue;
8✔
570
    }
571
    pFillInfo->pLinearInfo->winIndex++;
14,616✔
572
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
51,197✔
573
      SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
36,570✔
574
      SFillInfo     tmp = {
36,570✔
575
              .currentKey = pFillInfo->current,
36,570✔
576
              .order = TSDB_ORDER_ASC,
577
              .interval = pFillSup->interval,
578
      };
579

580
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
36,570✔
581
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
36,570✔
582
      int16_t          type = pColData->info.type;
36,573✔
583
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
36,573✔
584
      int32_t          index = pBlock->info.rows;
36,492✔
585
      if (pFillCol->notFillCol) {
36,492✔
586
        bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
14,616✔
587
        if (!filled) {
14,616✔
588
          code = setRowCell(pColData, index, pCell);
7✔
589
          QUERY_CHECK_CODE(code, lino, _end);
×
590
        }
591
      } else {
592
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
21,876!
593
          colDataSetNULL(pColData, index);
31!
594
          continue;
31✔
595
        }
596
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
21,845✔
597
        double  vCell = 0;
21,829✔
598
        SPoint  start = {0};
21,829✔
599
        start.key = pFillInfo->pResRow->key;
21,829✔
600
        start.val = pCell->pData;
21,829✔
601

602
        SPoint cur = {0};
21,829✔
603
        cur.key = pFillInfo->current;
21,829✔
604
        cur.val = taosMemoryCalloc(1, pCell->bytes);
21,829✔
605
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
21,952!
606
        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
21,952✔
607
        code = colDataSetVal(pColData, index, (const char*)cur.val, false);
21,912✔
608
        QUERY_CHECK_CODE(code, lino, _end);
21,865!
609
        destroySPoint(&cur);
21,865✔
610
      }
611
    }
612
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
29,243✔
613
                                     pFillSup->interval.precision);
14,627✔
614
    pBlock->info.rows++;
14,616✔
615
  }
616

617
_end:
294✔
618
  if (code != TSDB_CODE_SUCCESS) {
294!
619
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
620
  }
621
}
294✔
622

623
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
1,719✔
624
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,719✔
625

626
  SWinKey key = {.groupId = groupId, .ts = pRow->key};
1,719✔
627
  int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
1,719✔
628
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
1,717✔
629
  if (code != TSDB_CODE_SUCCESS) {
1,717!
630
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
631
  }
632
}
1,717✔
633

634
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
1,783✔
635
  int32_t code = TSDB_CODE_SUCCESS;
1,783✔
636
  int32_t lino = 0;
1,783✔
637
  bool    res = false;
1,783✔
638
  if (pFillInfo->needFill == false) {
1,783✔
639
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
310✔
640
    QUERY_CHECK_CODE(code, lino, _end);
310!
641
    return;
310✔
642
  }
643

644
  if (pFillInfo->pos == FILL_POS_START) {
1,473✔
645
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
523✔
646
    QUERY_CHECK_CODE(code, lino, _end);
523!
647
    if (res) {
523!
648
      pFillInfo->pos = FILL_POS_INVALID;
523✔
649
    }
650
  }
651
  if (pFillInfo->type != TSDB_FILL_LINEAR) {
1,473✔
652
    doStreamFillNormal(pFillSup, pFillInfo, pRes);
1,216✔
653
  } else {
654
    doStreamFillLinear(pFillSup, pFillInfo, pRes);
257✔
655

656
    if (pFillInfo->pos == FILL_POS_MID) {
258✔
657
      code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
36✔
658
      QUERY_CHECK_CODE(code, lino, _end);
36!
659
      if (res) {
36!
660
        pFillInfo->pos = FILL_POS_INVALID;
36✔
661
      }
662
    }
663

664
    if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
258✔
665
      pFillInfo->pLinearInfo->hasNext = false;
36✔
666
      pFillInfo->pLinearInfo->winIndex = 0;
36✔
667
      taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
36✔
668
      pFillInfo->pResRow = &pFillSup->cur;
36✔
669
      setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
36✔
670
      doStreamFillLinear(pFillSup, pFillInfo, pRes);
36✔
671
    }
672
  }
673
  if (pFillInfo->pos == FILL_POS_END) {
1,474✔
674
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res);
857✔
675
    QUERY_CHECK_CODE(code, lino, _end);
857!
676
    if (res) {
857✔
677
      pFillInfo->pos = FILL_POS_INVALID;
850✔
678
    }
679
  }
680

681
_end:
624✔
682
  if (code != TSDB_CODE_SUCCESS) {
1,474!
683
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
684
  }
685
}
686

687
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
1,719✔
688
                           int32_t rowId, uint64_t groupId, int32_t rowSize) {
689
  int32_t code = TSDB_CODE_SUCCESS;
1,719✔
690
  int32_t lino = 0;
1,719✔
691
  TSKEY ts = tsCol[rowId];
1,719✔
692
  pFillInfo->nextRowKey = ts;
1,719✔
693
  SResultRowData tmpNextRow = {.key = ts};
1,719✔
694
  tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize);
1,719✔
695
  QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno);
1,718!
696
  transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
1,718✔
697
  keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize);
1,719✔
698
  taosMemoryFreeClear(tmpNextRow.pRowVal);
1,718!
699

700
_end:
×
701
  if (code != TSDB_CODE_SUCCESS) {
1,718!
702
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
703
  }
704
  return code;
1,718✔
705
}
706

707
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
1,716✔
708
                          SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) {
709
  uint64_t groupId = pBlock->info.id.groupId;
1,716✔
710
  getWindowFromDiscBuf(pOperator, tsCol[rowId], groupId, pFillSup);
1,716✔
711
  if (pFillSup->prev.key == pFillInfo->preRowKey) {
1,719✔
712
    resetFillWindow(&pFillSup->prev);
823✔
713
  }
714
  setFillValueInfo(pBlock, tsCol[rowId], rowId, pFillSup, pFillInfo);
1,719✔
715
  doStreamFillRange(pFillInfo, pFillSup, pRes);
1,719✔
716
}
1,719✔
717

718
static void doStreamFillImpl(SOperatorInfo* pOperator) {
1,058✔
719
  int32_t                  code = TSDB_CODE_SUCCESS;
1,058✔
720
  int32_t                  lino = 0;
1,058✔
721
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,058✔
722
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,058✔
723
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
1,058✔
724
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
1,058✔
725
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
1,058✔
726
  uint64_t                 groupId = pBlock->info.id.groupId;
1,058✔
727
  SSDataBlock*             pRes = pInfo->pRes;
1,058✔
728
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
1,058✔
729
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
1,058✔
730
  pRes->info.id.groupId = groupId;
1,058✔
731
  pInfo->srcRowIndex++;
1,058✔
732

733
  if (pInfo->srcRowIndex == 0) {
1,058!
734
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
1,058✔
735
    QUERY_CHECK_CODE(code, lino, _end);
1,057!
736
    pInfo->srcRowIndex++;
1,057✔
737
  }
738

739
  while (pInfo->srcRowIndex < pBlock->info.rows) {
1,718✔
740
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
661✔
741
    QUERY_CHECK_CODE(code, lino, _end);
661!
742
    doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
661✔
743
    if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
661!
744
      code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
×
745
      QUERY_CHECK_CODE(code, lino, _end);
×
746
      return;
×
747
    }
748
    pInfo->srcRowIndex++;
661✔
749
  }
750
  doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
1,057✔
751
  code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
1,058✔
752
  QUERY_CHECK_CODE(code, lino, _end);
1,058!
753
  blockDataCleanup(pInfo->pSrcBlock);
1,058✔
754

755
_end:
1,058✔
756
  if (code != TSDB_CODE_SUCCESS) {
1,058!
757
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
758
  }
759
}
760

761
static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
93✔
762
  int32_t          code = TSDB_CODE_SUCCESS;
93✔
763
  int32_t          lino = 0;
93✔
764
  SStorageAPI*     pAPI = &pOp->pTaskInfo->storageAPI;
93✔
765
  void*            pState = pOp->pTaskInfo->streamInfo.pState;
93✔
766
  SExecTaskInfo*   pTaskInfo = pOp->pTaskInfo;
93✔
767
  SSDataBlock*     pBlock = delRes;
93✔
768
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
93✔
769
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
93✔
770
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
93✔
771
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
93✔
772
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
93✔
773
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
93✔
774
  SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
93✔
775
  code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
93✔
776
  QUERY_CHECK_CODE(code, lino, _end);
93!
777

778
  code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
93✔
779
  QUERY_CHECK_CODE(code, lino, _end);
93!
780

781
  colDataSetNULL(pUidCol, pBlock->info.rows);
93!
782
  code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
93✔
783
  QUERY_CHECK_CODE(code, lino, _end);
93!
784

785
  colDataSetNULL(pCalStartCol, pBlock->info.rows);
93!
786
  colDataSetNULL(pCalEndCol, pBlock->info.rows);
93!
787

788
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
93✔
789

790
  void*   tbname = NULL;
93✔
791
  int32_t winCode = TSDB_CODE_SUCCESS;
93✔
792
  code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
93✔
793
  QUERY_CHECK_CODE(code, lino, _end);
93!
794
  if (winCode != TSDB_CODE_SUCCESS) {
93✔
795
    colDataSetNULL(pTableCol, pBlock->info.rows);
15!
796
  } else {
797
    char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
798
    STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
78✔
799
    code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
78✔
800
    QUERY_CHECK_CODE(code, lino, _end);
78!
801
    pAPI->stateStore.streamStateFreeVal(tbname);
78✔
802
  }
803

804
  pBlock->info.rows++;
93✔
805

806
_end:
93✔
807
  if (code != TSDB_CODE_SUCCESS) {
93!
808
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
809
  }
810
  return code;
93✔
811
}
812

813
static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
93✔
814
                                 SSDataBlock* delRes) {
815
  int32_t                  code = TSDB_CODE_SUCCESS;
93✔
816
  int32_t                  lino = 0;
93✔
817
  SStreamFillOperatorInfo* pInfo = pOperator->info;
93✔
818
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
93✔
819
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
93✔
820
  if (hasPrevWindow(pFillSup)) {
93✔
821
    TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
58✔
822
    code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
58✔
823
    QUERY_CHECK_CODE(code, lino, _end);
58!
824
  } else if (hasNextWindow(pFillSup)) {
35✔
825
    TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
30✔
826
    code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
30✔
827
    QUERY_CHECK_CODE(code, lino, _end);
30!
828
  } else {
829
    code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
5✔
830
    QUERY_CHECK_CODE(code, lino, _end);
5!
831
  }
832

833
_end:
5✔
834
  if (code != TSDB_CODE_SUCCESS) {
93!
835
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
836
  }
837
  return code;
93✔
838
}
839

840
static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId) {
148✔
841
  int32_t                  code = TSDB_CODE_SUCCESS;
148✔
842
  int32_t                  lino = 0;
148✔
843
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
148✔
844
  SStreamFillOperatorInfo* pInfo = pOperator->info;
148✔
845
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
148✔
846
  getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
148✔
847
  setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
148✔
848
  SWinKey key = {.ts = startTs, .groupId = groupId};
148✔
849
  pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
148✔
850
  if (!pInfo->pFillInfo->needFill) {
147✔
851
    code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
93✔
852
    QUERY_CHECK_CODE(code, lino, _end);
93!
853
  } else {
854
    STimeRange tw = {
54✔
855
        .skey = startTs,
856
        .ekey = endTs,
857
        .groupId = groupId,
858
    };
859
    void* tmp = taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
54✔
860
    if (!tmp) {
54!
861
      code = terrno;
×
862
      QUERY_CHECK_CODE(code, lino, _end);
×
863
    }
864
  }
865

866
_end:
147✔
867
  if (code != TSDB_CODE_SUCCESS) {
147!
868
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
869
  }
870
  return code;
148✔
871
}
872

873
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
×
874
  SWinKey key = {.ts = ts, .groupId = groupId};
×
875
  void*   val = NULL;
×
876
  int32_t len = 0;
×
877
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len, NULL);
×
878
  if (code != TSDB_CODE_SUCCESS) {
×
879
    qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
×
880
           groupId);
881
    SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
×
882
    code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
×
883
    pAPI->stateStore.streamStateFreeCur(pCur);
×
884
    qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
×
885
  }
886

887
  if (code == TSDB_CODE_SUCCESS) {
×
888
    resetFillWindow(pWinData);
×
889
    pWinData->key = key.ts;
×
890
    pWinData->pRowVal = val;
×
891
  }
892
}
×
893

894
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
3,562✔
895
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
3,562✔
896

897
  SStreamFillOperatorInfo* pInfo = pOperator->info;
3,562✔
898
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
3,562✔
899
  int32_t                  size = taosArrayGetSize(pFillInfo->delRanges);
3,562✔
900
  while (pFillInfo->delIndex < size) {
3,622✔
901
    STimeRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
55✔
902
    if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
55!
903
      return;
×
904
    }
905
    getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
55✔
906
    TSKEY realEnd = range->ekey + 1;
55✔
907
    if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
55!
908
      getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, range->groupId,
×
909
                         &pInfo->pFillSup->next);
×
910
    }
911
    setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
55✔
912
    pFillInfo->delIndex++;
55✔
913
    if (pInfo->pFillInfo->needFill) {
55!
914
      doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
55✔
915
      pInfo->pRes->info.id.groupId = range->groupId;
55✔
916
    }
917
  }
918
}
919

920
static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
138✔
921
  int32_t                  code = TSDB_CODE_SUCCESS;
138✔
922
  int32_t                  lino = 0;
138✔
923
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
138✔
924
  SStreamFillOperatorInfo* pInfo = pOperator->info;
138✔
925
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
138✔
926
  SSDataBlock*             pBlock = pInfo->pSrcDelBlock;
138✔
927
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
138✔
928

929
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
138✔
930
  TSKEY*           tsStarts = (TSKEY*)pStartCol->pData;
138✔
931
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
138✔
932
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;
138✔
933
  while (pInfo->srcDelRowIndex < pBlock->info.rows) {
349✔
934
    TSKEY            ts = tsStarts[pInfo->srcDelRowIndex];
211✔
935
    TSKEY            endTs = ts;
211✔
936
    uint64_t         groupId = groupIds[pInfo->srcDelRowIndex];
211✔
937
    SWinKey          key = {.ts = ts, .groupId = groupId};
211✔
938
    SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &key);
211✔
939

940
    if (!pCur) {
211✔
941
      pInfo->srcDelRowIndex++;
63✔
942
      continue;
63✔
943
    }
944

945
    SWinKey nextKey = {.groupId = groupId, .ts = ts};
148✔
946
    while (pInfo->srcDelRowIndex < pBlock->info.rows) {
418✔
947
      TSKEY    delTs = tsStarts[pInfo->srcDelRowIndex];
343✔
948
      uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
343✔
949
      int32_t  winCode = TSDB_CODE_SUCCESS;
343✔
950
      if (groupId != delGroupId) {
343!
951
        break;
73✔
952
      }
953
      if (delTs > nextKey.ts) {
343✔
954
        break;
10✔
955
      }
956

957
      SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
333✔
958
      if (delTs == nextKey.ts) {
333✔
959
        pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
252✔
960
        winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
252✔
961
        // ts will be deleted later
962
        if (delTs != ts) {
252✔
963
          pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
104✔
964
          pAPI->stateStore.streamStateFreeCur(pCur);
102✔
965
          pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
104✔
966
        }
967
        endTs = TMAX(delTs, nextKey.ts - 1);
252✔
968
        if (winCode != TSDB_CODE_SUCCESS) {
252✔
969
          break;
63✔
970
        }
971
      }
972
      pInfo->srcDelRowIndex++;
270✔
973
    }
974

975
    pAPI->stateStore.streamStateFreeCur(pCur);
148✔
976
    code = doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
148✔
977
    QUERY_CHECK_CODE(code, lino, _end);
148!
978
  }
979

980
  pFillInfo->current = pFillInfo->end + 1;
138✔
981

982
_end:
138✔
983
  if (code != TSDB_CODE_SUCCESS) {
138!
984
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
985
  }
986
  return code;
138✔
987
}
988

989
void resetStreamFillSup(SStreamFillSupporter* pFillSup) {
4,098✔
990
  tSimpleHashClear(pFillSup->pResMap);
4,098✔
991
  pFillSup->hasDelete = false;
4,104✔
992
}
4,104✔
993
void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
3,514✔
994
  resetStreamFillSup(pInfo->pFillSup);
3,514✔
995
  taosArrayClear(pInfo->pFillInfo->delRanges);
3,520✔
996
  pInfo->pFillInfo->delIndex = 0;
3,521✔
997
}
3,521✔
998

999
static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
1,162✔
1000
                                              SSDataBlock* pDstBlock) {
1001
  int32_t                  code = TSDB_CODE_SUCCESS;
1,162✔
1002
  int32_t                  lino = 0;
1,162✔
1003
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,162✔
1004
  SExprSupp*               pSup = &pOperator->exprSupp;
1,162✔
1005
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,162✔
1006

1007
  blockDataCleanup(pDstBlock);
1,162✔
1008
  code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
1,163✔
1009
  QUERY_CHECK_CODE(code, lino, _end);
1,163!
1010

1011
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
1,163✔
1012
  QUERY_CHECK_CODE(code, lino, _end);
1,163!
1013
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
1,163✔
1014
  QUERY_CHECK_CODE(code, lino, _end);
1,163!
1015

1016
  pDstBlock->info.rows = 0;
1,163✔
1017
  pSup = &pInfo->pFillSup->notFillExprSup;
1,163✔
1018
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
1,163✔
1019
  QUERY_CHECK_CODE(code, lino, _end);
1,163!
1020
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
1,163✔
1021
  QUERY_CHECK_CODE(code, lino, _end);
1,163!
1022

1023
  pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;
1,163✔
1024

1025
  code = blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
1,163✔
1026

1027
_end:
1,163✔
1028
  if (code != TSDB_CODE_SUCCESS) {
1,163!
1029
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1030
  }
1031
  return code;
1,163✔
1032
}
1033

1034
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
4,872✔
1035
  int32_t                  code = TSDB_CODE_SUCCESS;
4,872✔
1036
  int32_t                  lino = 0;
4,872✔
1037
  SStreamFillOperatorInfo* pInfo = pOperator->info;
4,872✔
1038
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
4,872✔
1039

1040
  if (pOperator->status == OP_EXEC_DONE) {
4,872!
1041
    (*ppRes) = NULL;
×
1042
    return code;
×
1043
  }
1044
  blockDataCleanup(pInfo->pRes);
4,872✔
1045
  if (hasRemainCalc(pInfo->pFillInfo) ||
4,867✔
1046
      (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
4,860✔
1047
    doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
10✔
1048
    if (pInfo->pRes->info.rows > 0) {
10!
1049
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
10✔
1050
      (*ppRes) = pInfo->pRes;
10✔
1051
      return code;
10✔
1052
    }
1053
  }
1054
  if (pOperator->status == OP_RES_TO_RETURN) {
4,857✔
1055
    doDeleteFillFinalize(pOperator);
46✔
1056
    if (pInfo->pRes->info.rows > 0) {
46!
1057
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1058
      (*ppRes) = pInfo->pRes;
×
1059
      return code;
×
1060
    }
1061
    setOperatorCompleted(pOperator);
46✔
1062
    resetStreamFillInfo(pInfo);
46✔
1063
    (*ppRes) = NULL;
46✔
1064
    return code;
46✔
1065
  }
1066

1067
  SSDataBlock*   fillResult = NULL;
4,811✔
1068
  SOperatorInfo* downstream = pOperator->pDownstream[0];
4,811✔
1069
  while (1) {
1070
    if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
4,854!
1071
      // If there are delete datablocks, we receive  them first.
1072
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
4,854✔
1073
      if (pBlock == NULL) {
4,855✔
1074
        pOperator->status = OP_RES_TO_RETURN;
3,520✔
1075
        pInfo->pFillInfo->preRowKey = INT64_MIN;
3,520✔
1076
        if (pInfo->pRes->info.rows > 0) {
3,520!
1077
          printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1078
          (*ppRes) = pInfo->pRes;
×
1079
          return code;
×
1080
        }
1081
        break;
3,520✔
1082
      }
1083
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
1,335✔
1084

1085
      if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
1,338✔
1086
        pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
298✔
1087
        pInfo->pFillInfo->preRowKey = INT64_MIN;
298✔
1088
      }
1089

1090
      pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
1,338✔
1091
      if (pInfo->pFillSup->winRange.ekey <= 0) {
1,338!
1092
        pInfo->pFillSup->winRange.ekey = INT64_MAX;
×
1093
      }
1094

1095
      switch (pBlock->info.type) {
1,338!
1096
        case STREAM_RETRIEVE:
5✔
1097
          (*ppRes) = pBlock;
5✔
1098
          return code;
5✔
1099
        case STREAM_DELETE_RESULT: {
138✔
1100
          pInfo->pSrcDelBlock = pBlock;
138✔
1101
          pInfo->srcDelRowIndex = 0;
138✔
1102
          blockDataCleanup(pInfo->pDelRes);
138✔
1103
          pInfo->pFillSup->hasDelete = true;
138✔
1104
          code = doDeleteFillResult(pOperator);
138✔
1105
          QUERY_CHECK_CODE(code, lino, _end);
137!
1106

1107
          if (pInfo->pDelRes->info.rows > 0) {
137✔
1108
            printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
93✔
1109
            (*ppRes) = pInfo->pDelRes;
93✔
1110
            return code;
93✔
1111
          }
1112
          continue;
44✔
1113
        } break;
1114
        case STREAM_NORMAL:
1,057✔
1115
        case STREAM_INVALID:
1116
        case STREAM_PULL_DATA: {
1117
          code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
1,057✔
1118
          QUERY_CHECK_CODE(code, lino, _end);
1,058!
1119

1120
          memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
1,058✔
1121
          pInfo->srcRowIndex = -1;
1,058✔
1122
        } break;
1,058✔
1123
        case STREAM_CHECKPOINT:
138✔
1124
        case STREAM_CREATE_CHILD_TABLE: {
1125
          (*ppRes) = pBlock;
138✔
1126
          return code;
138✔
1127
        } break;
1128
        default:
×
1129
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1130
      }
1131
    }
1132

1133
    doStreamFillImpl(pOperator);
1,058✔
1134
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
1,058✔
1135
    QUERY_CHECK_CODE(code, lino, _end);
1,056!
1136

1137
    memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
1,056✔
1138
    pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
1,056✔
1139
    if (pInfo->pRes->info.rows > 0) {
1,056!
1140
      break;
1,057✔
1141
    }
1142
  }
1143
  if (pOperator->status == OP_RES_TO_RETURN) {
4,577✔
1144
    doDeleteFillFinalize(pOperator);
3,517✔
1145
  }
1146

1147
  if (pInfo->pRes->info.rows == 0) {
4,580✔
1148
    setOperatorCompleted(pOperator);
3,477✔
1149
    resetStreamFillInfo(pInfo);
3,473✔
1150
    (*ppRes) = NULL;
3,475✔
1151
    return code;
3,475✔
1152
  }
1153

1154
  pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
1,103✔
1155
  printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
1,103✔
1156
  (*ppRes) = pInfo->pRes;
1,104✔
1157
  return code;
1,104✔
1158

1159
_end:
×
1160
  if (code != TSDB_CODE_SUCCESS) {
×
1161
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1162
    pTaskInfo->code = code;
×
1163
    T_LONG_JMP(pTaskInfo->env, code);
×
1164
  }
1165
  setOperatorCompleted(pOperator);
×
1166
  resetStreamFillInfo(pInfo);
×
1167
  (*ppRes) = NULL;
×
1168
  return code;
×
1169
}
1170

1171
static void resetForceFillWindow(SResultRowData* pRowData) {
471✔
1172
  pRowData->key = INT64_MIN;
471✔
1173
  pRowData->pRowVal = NULL;
471✔
1174
}
471✔
1175

1176
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
394✔
1177
                                SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1178
  int32_t code = TSDB_CODE_SUCCESS;
394✔
1179
  int32_t lino = 0;
394✔
1180

1181
  SStreamFillOperatorInfo* pInfo = pOperator->info;
394✔
1182
  bool                     res = false;
394✔
1183
  int32_t                  numOfRows = getNumOfTotalRes(pGroupResInfo);
394✔
1184
  for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
865✔
1185
    SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
614✔
1186
    if (pBlock->info.id.groupId == 0) {
614✔
1187
      pBlock->info.id.groupId = pKey->groupId;
394✔
1188
    } else if (pBlock->info.id.groupId != pKey->groupId) {
220✔
1189
      break;
143✔
1190
    }
1191

1192
    SRowBuffPos* pValPos = NULL;
471✔
1193
    int32_t      len = 0;
471✔
1194
    int32_t      winCode = TSDB_CODE_SUCCESS;
471✔
1195
    code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode);
471✔
1196
    QUERY_CHECK_CODE(code, lino, _end);
471!
1197
    qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
471!
1198
    if (winCode == TSDB_CODE_SUCCESS) {
471✔
1199
      pFillSup->cur.key = pKey->ts;
167✔
1200
      pFillSup->cur.pRowVal = pValPos->pRowBuff;
167✔
1201
      code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
167✔
1202
      QUERY_CHECK_CODE(code, lino, _end);
167!
1203
      resetForceFillWindow(&pFillSup->cur);
167✔
1204
      releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore);
167✔
1205
    } else {
1206
      SWinKey      preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
304✔
1207
      SRowBuffPos* prePos = NULL;
304✔
1208
      int32_t      preVLen = 0;
304✔
1209
      code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey,
304✔
1210
                                                      (void**)&prePos, &preVLen, &winCode);
1211
      QUERY_CHECK_CODE(code, lino, _end);
304!
1212
      if (winCode == TSDB_CODE_SUCCESS) {
304!
1213
        pFillSup->cur.key = pKey->ts;
304✔
1214
        pFillSup->cur.pRowVal = prePos->pRowBuff;
304✔
1215
        if (pFillInfo->type == TSDB_FILL_PREV) {
304✔
1216
          code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
158✔
1217
          QUERY_CHECK_CODE(code, lino, _end);
158!
1218
        } else {
1219
          copyNotFillExpData(pFillSup, pFillInfo);
146✔
1220
          pFillInfo->pResRow->key = pKey->ts;
146✔
1221
          code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
146✔
1222
          QUERY_CHECK_CODE(code, lino, _end);
146!
1223
        }
1224
        resetForceFillWindow(&pFillSup->cur);
304✔
1225
      }
1226
      releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore);
304✔
1227
    }
1228
  }
1229

1230
_end:
251✔
1231
  if (code != TSDB_CODE_SUCCESS) {
394!
1232
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1233
  }
1234
}
394✔
1235

1236
void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
1,226✔
1237
                            SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1238
  blockDataCleanup(pBlock);
1,226✔
1239
  if (!hasRemainResults(pGroupResInfo)) {
1,226✔
1240
    return;
832✔
1241
  }
1242

1243
  // clear the existed group id
1244
  pBlock->info.id.groupId = 0;
394✔
1245
  doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo);
394✔
1246
}
1247

1248
static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,226✔
1249
  int32_t                  code = TSDB_CODE_SUCCESS;
1,226✔
1250
  int32_t                  lino = 0;
1,226✔
1251
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,226✔
1252
  uint16_t                 opType = pOperator->operatorType;
1,226✔
1253
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,226✔
1254

1255
  doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
1,226✔
1256
  if (pInfo->pRes->info.rows != 0) {
1,226✔
1257
    printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
394✔
1258
    (*ppRes) = pInfo->pRes;
394✔
1259
    goto _end;
394✔
1260
  }
1261

1262
  (*ppRes) = NULL;
832✔
1263

1264
_end:
1,226✔
1265
  if (code != TSDB_CODE_SUCCESS) {
1,226!
1266
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1267
  }
1268
  return code;
1,226✔
1269
}
1270

1271
static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) {
167✔
1272
  int32_t code = TSDB_CODE_SUCCESS;
167✔
1273
  int32_t lino = 0;
167✔
1274

1275
  SWinKey      key = {.groupId = groupId, .ts = pRow->key};
167✔
1276
  int32_t      curVLen = 0;
167✔
1277
  SRowBuffPos* pStatePos = NULL;
167✔
1278
  int32_t      winCode = TSDB_CODE_SUCCESS;
167✔
1279
  code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos,
167✔
1280
                                                        &curVLen, &winCode);
1281
  QUERY_CHECK_CODE(code, lino, _end);
167!
1282
  memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize);
167✔
1283
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
167!
1284

1285
_end:
×
1286
  if (code != TSDB_CODE_SUCCESS) {
167!
1287
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1288
  }
1289
}
167✔
1290

1291
int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
167✔
1292
                               int32_t rowId, uint64_t groupId, int32_t rowSize) {
1293
  int32_t code = TSDB_CODE_SUCCESS;
167✔
1294
  int32_t lino = 0;
167✔
1295
  TSKEY ts = tsCol[rowId];
167✔
1296
  pFillInfo->nextRowKey = ts;
167✔
1297
  TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize);
167✔
1298
  SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff};
167✔
1299

1300
  transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
167✔
1301
  keepResultInStateBuf(pInfo, groupId, &tmpNextRow);
167✔
1302

1303
_end:
167✔
1304
  if (code != TSDB_CODE_SUCCESS) {
167!
1305
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1306
  }
1307
  return code;
167✔
1308
}
1309

1310
// force window close impl
1311
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
105✔
1312
  int32_t                  code = TSDB_CODE_SUCCESS;
105✔
1313
  int32_t                  lino = 0;
105✔
1314
  SStreamFillOperatorInfo* pInfo = pOperator->info;
105✔
1315
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
105✔
1316
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
105✔
1317
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
105✔
1318
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
105✔
1319
  uint64_t                 groupId = pBlock->info.id.groupId;
105✔
1320
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
105✔
1321
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
105✔
1322
  for (int32_t i = 0; i < pBlock->info.rows; i++){
272✔
1323
    code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
167✔
1324
    QUERY_CHECK_CODE(code, lino, _end);
167!
1325

1326
    int32_t size =  taosArrayGetSize(pInfo->pCloseTs);
167✔
1327
    if (size > 0) {
167!
1328
      TSKEY* pTs = (TSKEY*) taosArrayGet(pInfo->pCloseTs, 0);
167✔
1329
      TSKEY  resTs = tsCol[i];
167✔
1330
      while (resTs < (*pTs)) {
216✔
1331
        SWinKey key = {.groupId = groupId, .ts = resTs};
77✔
1332
        void* pPushRes = taosArrayPush(pInfo->pUpdated, &key);
77✔
1333
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
77!
1334

1335
        if (IS_FILL_CONST_VALUE(pFillSup->type)) {
77!
1336
          break;
1337
        }
1338
        resTs = taosTimeAdd(resTs, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
49✔
1339
                            pFillSup->interval.precision);
49✔
1340
      }
1341
    }
1342
  }
1343
  code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0);
105✔
1344
  QUERY_CHECK_CODE(code, lino, _end);
105!
1345

1346
_end:
105✔
1347
  if (code != TSDB_CODE_SUCCESS) {
105!
1348
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1349
  }
1350
  return code;
105✔
1351
}
1352

1353
int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) {
1,404✔
1354
  int32_t          code = TSDB_CODE_SUCCESS;
1,404✔
1355
  int32_t          lino = 0;
1,404✔
1356
  int64_t          groupId = 0;
1,404✔
1357
  SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState);
1,404✔
1358
  while (1) {  
694✔
1359
    int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
2,098✔
1360
    if (winCode != TSDB_CODE_SUCCESS) {
2,098✔
1361
      break;
1,404✔
1362
    }
1363
    SWinKey key = {.ts = ts, .groupId = groupId};
694✔
1364
    void* pPushRes = taosArrayPush(pUpdated, &key);
694✔
1365
    QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
694!
1366

1367
    pStateStore->streamStateGroupCurNext(pCur);
694✔
1368
  }
1369
  pStateStore->streamStateFreeCur(pCur);
1,404✔
1370
  pCur = NULL;
1,404✔
1371

1372
_end:
1,404✔
1373
  if (code != TSDB_CODE_SUCCESS) {
1,404!
1374
    pStateStore->streamStateFreeCur(pCur);
×
1375
    pCur = NULL;
×
1376
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1377
  }
1378
  return code;
1,404✔
1379
}
1380

1381
static void removeDuplicateResult(SArray* pTsArrray, __compar_fn_t fn) {
832✔
1382
  taosArraySort(pTsArrray, fn);
832✔
1383
  taosArrayRemoveDuplicate(pTsArrray, fn, NULL);
832✔
1384
}
832✔
1385

1386
// force window close
1387
static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,241✔
1388
  int32_t                  code = TSDB_CODE_SUCCESS;
1,241✔
1389
  int32_t                  lino = 0;
1,241✔
1390
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,241✔
1391
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,241✔
1392

1393
  if (pOperator->status == OP_EXEC_DONE) {
1,241!
1394
    (*ppRes) = NULL;
×
1395
    return code;
×
1396
  }
1397

1398
  if (pOperator->status == OP_RES_TO_RETURN) {
1,241✔
1399
    SSDataBlock* resBlock = NULL;
394✔
1400
    code = buildForceFillResult(pOperator, &resBlock);
394✔
1401
    QUERY_CHECK_CODE(code, lino, _end);
394!
1402

1403
    if (resBlock != NULL) {
394✔
1404
      (*ppRes) = resBlock;
143✔
1405
      goto _end;
143✔
1406
    }
1407

1408
    pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
251✔
1409
    setStreamOperatorCompleted(pOperator);
251✔
1410
    (*ppRes) = NULL;
251✔
1411
    goto _end;
251✔
1412
  }
1413

1414
  SSDataBlock*   fillResult = NULL;
847✔
1415
  SOperatorInfo* downstream = pOperator->pDownstream[0];
847✔
1416
  while (1) {
925✔
1417
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
1,772✔
1418
    if (pBlock == NULL) {
1,772✔
1419
      pOperator->status = OP_RES_TO_RETURN;
832✔
1420
      qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType));
832!
1421
      break;
832✔
1422
    }
1423
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
940✔
1424
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
940✔
1425

1426
    switch (pBlock->info.type) {
940!
1427
      case STREAM_NORMAL:
105✔
1428
      case STREAM_INVALID: {
1429
        code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
105✔
1430
        QUERY_CHECK_CODE(code, lino, _end);
105!
1431

1432
        memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
105✔
1433
        pInfo->srcRowIndex = -1;
105✔
1434
      } break;
105✔
1435
      case STREAM_CHECKPOINT: {
12✔
1436
        pInfo->stateStore.streamStateCommit(pInfo->pState);
12✔
1437
        (*ppRes) = pBlock;
12✔
1438
        goto _end;
12✔
1439
      } break;
1440
      case STREAM_CREATE_CHILD_TABLE: {
3✔
1441
        (*ppRes) = pBlock;
3✔
1442
        goto _end;
3✔
1443
      } break;
1444
      case STREAM_GET_RESULT: {
1,640✔
1445
        void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
820✔
1446
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
820!
1447
        continue;
820✔
1448
      }
1449
      default:
×
1450
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1451
        QUERY_CHECK_CODE(code, lino, _end);
×
1452
    }
1453

1454
    code = doStreamForceFillImpl(pOperator);
105✔
1455
    QUERY_CHECK_CODE(code, lino, _end);
105!
1456
  }
1457

1458
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
1,652✔
1459
    TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
820✔
1460
    code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated);
820✔
1461
    QUERY_CHECK_CODE(code, lino, _end);
820!
1462
  }
1463
  taosArrayClear(pInfo->pCloseTs);
832✔
1464
  removeDuplicateResult(pInfo->pUpdated, winKeyCmprImpl);
832✔
1465

1466
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
832✔
1467
  pInfo->groupResInfo.freeItem = false;
832✔
1468

1469
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
832✔
1470
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
832!
1471

1472
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
832✔
1473
  QUERY_CHECK_CODE(code, lino, _end);
832!
1474

1475
  code = buildForceFillResult(pOperator, ppRes);
832✔
1476
  QUERY_CHECK_CODE(code, lino, _end);
832!
1477

1478
  if ((*ppRes) == NULL) {
832✔
1479
    pInfo->stateStore.streamStateClearExpiredState(pInfo->pState);
581✔
1480
    setStreamOperatorCompleted(pOperator);
581✔
1481
  }
1482

1483
_end:
251✔
1484
  if (code != TSDB_CODE_SUCCESS) {
1,241!
1485
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1486
    pTaskInfo->code = code;
×
1487
  }
1488
  return code;
1,241✔
1489
}
1490

1491
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
441✔
1492
  int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
441✔
1493
  pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
441✔
1494
  for (int i = 0; i < numOfCols; i++) {
7,680✔
1495
    SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i);
7,239✔
1496
    pFillSup->rowSize += pCol->info.bytes;
7,239✔
1497
  }
1498
  pFillSup->next.key = INT64_MIN;
441✔
1499
  pFillSup->nextNext.key = INT64_MIN;
441✔
1500
  pFillSup->prev.key = INT64_MIN;
441✔
1501
  pFillSup->cur.key = INT64_MIN;
441✔
1502
  pFillSup->next.pRowVal = NULL;
441✔
1503
  pFillSup->nextNext.pRowVal = NULL;
441✔
1504
  pFillSup->prev.pRowVal = NULL;
441✔
1505
  pFillSup->cur.pRowVal = NULL;
441✔
1506

1507
  return TSDB_CODE_SUCCESS;
441✔
1508
}
1509

1510
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
441✔
1511
                                               SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
1512
  int32_t               code = TSDB_CODE_SUCCESS;
441✔
1513
  int32_t               lino = 0;
441✔
1514
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
441✔
1515
  if (!pFillSup) {
441!
1516
    code = terrno;
×
1517
    QUERY_CHECK_CODE(code, lino, _end);
×
1518
  }
1519
  pFillSup->numOfFillCols = numOfFillCols;
441✔
1520
  int32_t    numOfNotFillCols = 0;
441✔
1521
  SExprInfo* noFillExprInfo = NULL;
441✔
1522

1523
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
441✔
1524
  QUERY_CHECK_CODE(code, lino, _end);
441!
1525

1526
  pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
882✔
1527
                                            NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
441✔
1528
  if (pFillSup->pAllColInfo == NULL) {
441!
1529
    code = terrno;
×
1530
    lino = __LINE__;
×
1531
    destroyExprInfo(noFillExprInfo, numOfNotFillCols);
×
1532
    goto _end;
×
1533
  }
1534

1535
  pFillSup->type = convertFillType(pPhyFillNode->mode);
441✔
1536
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
441✔
1537
  pFillSup->interval = *pInterval;
441✔
1538
  pFillSup->pAPI = pAPI;
441✔
1539

1540
  code = initResultBuf(pInputRes, pFillSup);
441✔
1541
  QUERY_CHECK_CODE(code, lino, _end);
441!
1542

1543
  SExprInfo* noFillExpr = NULL;
441✔
1544
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
441✔
1545
  QUERY_CHECK_CODE(code, lino, _end);
441!
1546

1547
  code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
441✔
1548
  QUERY_CHECK_CODE(code, lino, _end);
441!
1549

1550
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
441✔
1551
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
441✔
1552
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
441!
1553
  pFillSup->hasDelete = false;
441✔
1554

1555
_end:
441✔
1556
  if (code != TSDB_CODE_SUCCESS) {
441!
1557
    destroyStreamFillSupporter(pFillSup);
×
1558
    pFillSup = NULL;
×
1559
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1560
  }
1561
  return pFillSup;
441✔
1562
}
1563

1564
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
468✔
1565
  int32_t          code = TSDB_CODE_SUCCESS;
468✔
1566
  int32_t          lino = 0;
468✔
1567
  SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
468✔
1568
  if (!pFillInfo) {
468!
1569
    code = terrno;
×
1570
    QUERY_CHECK_CODE(code, lino, _end);
×
1571
  }
1572

1573
  pFillInfo->start = INT64_MIN;
468✔
1574
  pFillInfo->current = INT64_MIN;
468✔
1575
  pFillInfo->end = INT64_MIN;
468✔
1576
  pFillInfo->preRowKey = INT64_MIN;
468✔
1577
  pFillInfo->needFill = false;
468✔
1578
  pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
468✔
1579
  if (!pFillInfo) {
468!
1580
    code = terrno;
×
1581
    QUERY_CHECK_CODE(code, lino, _end);
×
1582
  }
1583

1584
  pFillInfo->pLinearInfo->hasNext = false;
468✔
1585
  pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
468✔
1586
  pFillInfo->pLinearInfo->pEndPoints = NULL;
468✔
1587
  pFillInfo->pLinearInfo->pNextEndPoints = NULL;
468✔
1588
  if (pFillSup->type == TSDB_FILL_LINEAR) {
468✔
1589
    pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
80✔
1590
    if (!pFillInfo->pLinearInfo->pEndPoints) {
80!
1591
      code = terrno;
×
1592
      QUERY_CHECK_CODE(code, lino, _end);
×
1593
    }
1594

1595
    pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
80✔
1596
    if (!pFillInfo->pLinearInfo->pNextEndPoints) {
80!
1597
      code = terrno;
×
1598
      QUERY_CHECK_CODE(code, lino, _end);
×
1599
    }
1600

1601
    for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
1,399✔
1602
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
1,319✔
1603
      if (pColData == NULL) {
1,319!
1604
        SPoint dummy = {0};
×
1605
        dummy.val = taosMemoryCalloc(1, 1);
×
1606
        void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy);
×
1607
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
×
1608

1609
        dummy.val = taosMemoryCalloc(1, 1);
×
1610
        tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy);
×
1611
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
×
1612

1613
        continue;
×
1614
      }
1615
      SPoint value = {0};
1,319✔
1616
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
1,319✔
1617
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
1,319!
1618

1619
      void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
1,319✔
1620
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
1,319!
1621

1622
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
1,319✔
1623
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
1,319!
1624

1625
      tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
1,319✔
1626
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
1,319!
1627
    }
1628
  }
1629
  pFillInfo->pLinearInfo->winIndex = 0;
468✔
1630

1631
  pFillInfo->pNonFillRow = NULL;
468✔
1632
  pFillInfo->pResRow = NULL;
468✔
1633
  if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
468✔
1634
      pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
375✔
1635
    pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
215✔
1636
    QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno);
215!
1637

1638
    pFillInfo->pResRow->key = INT64_MIN;
215✔
1639
    pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
215✔
1640
    QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno);
215!
1641

1642
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
3,553✔
1643
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
3,338✔
1644
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
3,338✔
1645
      if (pColData == NULL) {
3,338✔
1646
        pCell->bytes = 1;
6✔
1647
        pCell->type = 4;
6✔
1648
        continue;
6✔
1649
      }
1650
      pCell->bytes = pColData->info.bytes;
3,332✔
1651
      pCell->type = pColData->info.type;
3,332✔
1652
    }
1653

1654
    int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock);
215✔
1655
    if (numOfResCol < pFillSup->numOfAllCols) {
215✔
1656
      int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t));
6✔
1657
      QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno);
6!
1658
      pFillSup->pOffsetInfo = pTmpBuf;
6✔
1659

1660
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1);
6✔
1661
      int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData);
6✔
1662
      for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) {
12✔
1663
        pFillSup->pOffsetInfo[i] = preLength;
6✔
1664
        pCell = getResultCell(pFillInfo->pResRow, i);
6✔
1665
        preLength += pCell->bytes + sizeof(SResultCellData);
6✔
1666
      }
1667
    }
1668

1669
    pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
215✔
1670
    QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
215!
1671
    pFillInfo->pNonFillRow->key = INT64_MIN;
215✔
1672
    pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
215✔
1673
    memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize);
215✔
1674
  }
1675

1676
  pFillInfo->type = pFillSup->type;
468✔
1677
  pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
468✔
1678
  if (!pFillInfo->delRanges) {
468!
1679
    code = terrno;
×
1680
    QUERY_CHECK_CODE(code, lino, _end);
×
1681
  }
1682

1683
  pFillInfo->delIndex = 0;
468✔
1684
  pFillInfo->curGroupId = 0;
468✔
1685
  pFillInfo->hasNext = false;
468✔
1686
  pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize);
468✔
1687
  return pFillInfo;
468✔
1688

1689
_end:
×
1690
  if (code != TSDB_CODE_SUCCESS) {
×
1691
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1692
  }
1693
  destroyStreamFillInfo(pFillInfo);
×
1694
  return NULL;
×
1695
}
1696

1697
static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
441✔
1698
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
441✔
1699
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
1,425✔
1700
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
1,342✔
1701
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
1,342✔
1702
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
1,342✔
1703
      SVariant*        pVar = &(pFillCol->fillVal);
1,342✔
1704
      if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
1,342!
1705
        float v = 0;
×
1706
        GET_TYPED_DATA(v, float, pVar->nType, &pVar->i);
×
1707
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
×
1708
      } else if (IS_FLOAT_TYPE(pCell->type)) {
1,790!
1709
        double v = 0;
448✔
1710
        GET_TYPED_DATA(v, double, pVar->nType, &pVar->i);
448!
1711
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
448!
1712
      } else if (IS_INTEGER_TYPE(pCell->type)) {
1,705!
1713
        int64_t v = 0;
811✔
1714
        GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i);
811!
1715
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
811!
1716
      } else {
1717
        pCell->isNull = true;
83✔
1718
      }
1719
    }
1720
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
358✔
1721
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
2,062✔
1722
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
1,942✔
1723
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
1,942✔
1724
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
1,942✔
1725
      pCell->isNull = true;
1,942✔
1726
    }
1727
  }
1728
}
441✔
1729

1730
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval) {
441✔
1731
  int32_t code = TSDB_CODE_SUCCESS;
441✔
1732
  int32_t lino = 0;
441✔
1733
  if (IS_NORMAL_INTERVAL_OP(downstream)) {
441✔
1734
    SStreamIntervalOperatorInfo* pInfo = downstream->info;
407✔
1735
    *triggerType = pInfo->twAggSup.calTrigger;
407✔
1736
    *pInterval = pInfo->interval;
407✔
1737
  } else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
34!
1738
    SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
34✔
1739
    *triggerType = pInfo->twAggSup.calTrigger;
34✔
1740
    *pInterval = pInfo->interval;
34✔
1741
    pInfo->hasFill = true;
34✔
1742
  } else {
1743
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1744
  }
1745
  QUERY_CHECK_CODE(code, lino, _end);
441!
1746
  
1747
_end:
441✔
1748
  if (code != TSDB_CODE_SUCCESS) {
441!
1749
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1750
  }
1751
  return code;
441✔
1752
}
1753

1754
int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore,
34✔
1755
                                  SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) {
1756
  int32_t code = TSDB_CODE_SUCCESS;
34✔
1757
  int32_t lino = 0;
34✔
1758

1759
  pInfo->stateStore = *pStore;
34✔
1760
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
34✔
1761
  QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno);
34!
1762

1763
  *(pInfo->pState) = *pState;
34✔
1764
  pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol);
34✔
1765
  code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs,
34✔
1766
                                               pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId,
34✔
1767
                                               STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState);
34✔
1768
  QUERY_CHECK_CODE(code, lino, _end);
34!
1769

1770
_end:
34✔
1771
  if (code != TSDB_CODE_SUCCESS) {
34!
1772
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1773
  }
1774
  return code;
34✔
1775
}
1776

1777
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
441✔
1778
                                     SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1779
  QRY_PARAM_CHECK(pOptrInfo);
441!
1780

1781
  int32_t                  code = TSDB_CODE_SUCCESS;
441✔
1782
  int32_t                  lino = 0;
441✔
1783
  SStreamFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFillOperatorInfo));
441✔
1784
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
441✔
1785
  if (pInfo == NULL || pOperator == NULL) {
441!
1786
    code = terrno;
×
1787
    QUERY_CHECK_CODE(code, lino, _error);
×
1788
  }
1789

1790
  int32_t    numOfFillCols = 0;
441✔
1791
  SExprInfo* pFillExprInfo = NULL;
441✔
1792

1793
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
441✔
1794
  QUERY_CHECK_CODE(code, lino, _error);
441!
1795

1796
  code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
441✔
1797
  QUERY_CHECK_CODE(code, lino, _error);
441!
1798

1799
  pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
441✔
1800
  QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
441!
1801

1802
  int8_t triggerType = 0;
441✔
1803
  SInterval interval = {0};
441✔
1804
  code = getDownStreamInfo(downstream, &triggerType, &interval);
441✔
1805
  QUERY_CHECK_CODE(code, lino, _error);
441!
1806

1807
  pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
441✔
1808
                                      pInfo->pSrcBlock);
1809
  if (!pInfo->pFillSup) {
441!
1810
    code = TSDB_CODE_FAILED;
×
1811
    QUERY_CHECK_CODE(code, lino, _error);
×
1812
  }
1813

1814
  initResultSizeInfo(&pOperator->resultInfo, 4096);
441✔
1815
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
441✔
1816
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
441!
1817

1818
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
441✔
1819
  QUERY_CHECK_CODE(code, lino, _error);
441!
1820

1821
  code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
441✔
1822
  QUERY_CHECK_CODE(code, lino, _error);
441!
1823

1824
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
441✔
1825
  if (!pInfo->pFillInfo) {
441!
1826
    goto _error;
×
1827
  }
1828

1829
  setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo);
441✔
1830

1831
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
441✔
1832
  QUERY_CHECK_CODE(code, lino, _error);
441!
1833

1834
  code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
441✔
1835
  QUERY_CHECK_CODE(code, lino, _error);
441!
1836

1837
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
441✔
1838
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
441!
1839

1840
  pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
441✔
1841
  QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
441!
1842

1843
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
441✔
1844
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
441✔
1845

1846
  int32_t numOfOutputCols = 0;
441✔
1847
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
441✔
1848
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
1849
  QUERY_CHECK_CODE(code, lino, _error);
441!
1850

1851
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
441✔
1852
  QUERY_CHECK_CODE(code, lino, _error);
441!
1853

1854
  pInfo->srcRowIndex = -1;
441✔
1855
  setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
441✔
1856
                  pTaskInfo);
1857

1858
  if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
441✔
1859
    code = initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle,
34✔
1860
                              GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
34✔
1861
    QUERY_CHECK_CODE(code, lino, _error);
34!
1862
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
34✔
1863
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1864
  } else {
1865
    pInfo->pState = NULL;
407✔
1866
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
407✔
1867
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1868
  }
1869
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
441✔
1870

1871
  code = appendDownstream(pOperator, &downstream, 1);
441✔
1872
  QUERY_CHECK_CODE(code, lino, _error);
441!
1873

1874
  *pOptrInfo = pOperator;
441✔
1875
  return TSDB_CODE_SUCCESS;
441✔
1876

1877
_error:
×
1878
  qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1879

1880
  if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
×
1881
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1882
  pTaskInfo->code = code;
×
1883
  return code;
×
1884
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc