• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3543

29 Nov 2024 02:58AM UTC coverage: 60.842% (+0.02%) from 60.819%
#3543

push

travis-ci

web-flow
Merge pull request #28973 from taosdata/merge/mainto3.0

merge: from main to 3.0

120460 of 253224 branches covered (47.57%)

Branch coverage included in aggregate %.

706 of 908 new or added lines in 18 files covered. (77.75%)

2401 existing lines in 137 files now uncovered.

201633 of 276172 relevant lines covered (73.01%)

19045673.23 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

75.79
/source/libs/executor/src/streamfilloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "streamexecutorInt.h"
25
#include "tcommon.h"
26
#include "thash.h"
27
#include "ttime.h"
28

29
#include "function.h"
30
#include "operator.h"
31
#include "querynodes.h"
32
#include "querytask.h"
33
#include "tdatablock.h"
34
#include "tfill.h"
35

36
/*
 * Placement of the current source row relative to the generated fill rows.
 * doStreamFillRange() emits the current row before the fill rows for
 * FILL_POS_START, between the two linear fill passes for FILL_POS_MID, and
 * after the fill rows for FILL_POS_END. Once the row has been emitted the
 * position is reset to FILL_POS_INVALID.
 */
#define FILL_POS_INVALID 0
37
#define FILL_POS_START   1
38
#define FILL_POS_MID     2
39
#define FILL_POS_END     3
40

41
/*
 * A pair of keys plus a group id.
 * NOTE(review): presumably skey/ekey are the start/end timestamps of a range
 * belonging to groupId; the type is not used in the visible part of this
 * file, so confirm against its users.
 */
typedef struct STimeRange {
42
  TSKEY    skey;
43
  TSKEY    ekey;
44
  uint64_t groupId;
45
} STimeRange;
46

47
/**
 * Seeds a degenerate window [ts, ts], advances it once with
 * getNextTimeWindow() in ascending order and returns the resulting start key.
 *
 * @param ts        key the window is seeded with
 * @param pInterval interval definition handed to getNextTimeWindow()
 * @return start key of the window after the advance
 */
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
  STimeWindow window = {0};
  window.skey = ts;
  window.ekey = ts;

  getNextTimeWindow(pInterval, &window, TSDB_ORDER_ASC);
  return window.skey;
}
52

53
/**
 * Seeds a degenerate window [ts, ts], advances it once with
 * getNextTimeWindow() in descending order and returns the resulting start key.
 *
 * @param ts        key the window is seeded with
 * @param pInterval interval definition handed to getNextTimeWindow()
 * @return start key of the window after the advance
 */
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
  STimeWindow window = {0};
  window.skey = ts;
  window.ekey = ts;

  getNextTimeWindow(pInterval, &window, TSDB_ORDER_DESC);
  return window.skey;
}
58

59
/**
 * Writes one cached result cell into row `rowId` of column `pCol`.
 *
 * @param pCol   destination column
 * @param rowId  destination row index
 * @param pCell  source cell; its payload and null flag are forwarded as-is
 * @return the status code returned by colDataSetVal()
 */
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell) {
  int32_t status = colDataSetVal(pCol, rowId, pCell->pData, pCell->isNull);
  return status;
}
62

63
/**
 * Locates the `index`-th cell inside a packed result row.
 *
 * Cells are stored back to back: each one is a SResultCellData header followed
 * by `bytes` bytes of payload, so reaching cell N means hopping over the
 * N cells in front of it.
 *
 * @param pRaw   row holder; may be NULL
 * @param index  zero-based cell position (values <= 0 yield the first cell)
 * @return pointer to the cell, or NULL when the row or its buffer is missing
 */
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
  if (pRaw == NULL || pRaw->pRowVal == NULL) {
    return NULL;
  }

  SResultCellData* pCell = pRaw->pRowVal;
  for (int32_t remaining = index; remaining > 0; --remaining) {
    // step over this cell's header plus its payload
    char* pNext = (char*)pCell + (pCell->bytes + sizeof(SResultCellData));
    pCell = (SResultCellData*)pNext;
  }
  return pCell;
}
75

76
/**
 * Releases the fill-column descriptors in [start, end) and then the array
 * itself.
 *
 * For every entry in the range the expression is torn down with
 * destroyExprInfo() and the fill variant is destroyed. Afterwards the pExpr
 * pointer of the first entry in the range is freed once.
 * NOTE(review): presumably the pExpr entries of the range live in a single
 * allocation whose base is pFillCol[start].pExpr - confirm at the allocation
 * site.
 *
 * @param pFillCol column descriptor array; NULL is accepted and is a no-op
 * @param start    first index to destroy (inclusive)
 * @param end      last index to destroy (exclusive)
 * @return always NULL, so callers can clear their pointer in one statement
 */
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
  // A partially initialised supporter may reach here without a column array.
  if (pFillCol == NULL) {
    return NULL;
  }

  for (int32_t i = start; i < end; i++) {
    destroyExprInfo(pFillCol[i].pExpr, 1);
    taosVariantDestroy(&pFillCol[i].fillVal);
  }
  if (start < end) {
    taosMemoryFreeClear(pFillCol[start].pExpr);
  }
  taosMemoryFree(pFillCol);
  return NULL;
}
87

88
/**
 * Frees a stream fill supporter and everything it owns: the column
 * descriptors, the result map, the non-fill expression support and the cached
 * cur/prev/next/nextNext row buffers.
 *
 * @param pFillSup supporter to destroy; NULL is ignored
 */
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
  if (!pFillSup) {
    return;
  }

  pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);

  tSimpleHashCleanup(pFillSup->pResMap);
  pFillSup->pResMap = NULL;

  cleanupExprSupp(&pFillSup->notFillExprSup);

  // cur may alias prev or next (setFillValueInfo assigns cur.pRowVal to them);
  // in that case the buffer is released through prev/next below.
  bool curAliasesNeighbor =
      (pFillSup->cur.pRowVal == pFillSup->prev.pRowVal) || (pFillSup->cur.pRowVal == pFillSup->next.pRowVal);
  if (!curAliasesNeighbor) {
    taosMemoryFree(pFillSup->cur.pRowVal);
  }

  taosMemoryFree(pFillSup->prev.pRowVal);
  taosMemoryFree(pFillSup->next.pRowVal);
  taosMemoryFree(pFillSup->nextNext.pRowVal);

  taosMemoryFree(pFillSup);
}
105

106
void destroySPoint(void* ptr) {
24,490✔
107
  SPoint* point = (SPoint*)ptr;
24,490✔
108
  taosMemoryFreeClear(point->val);
24,490✔
109
}
24,541✔
110

111
/**
 * Frees the linear-interpolation state: both end-point arrays (including the
 * value buffer of every SPoint, via destroySPoint) and the struct itself.
 *
 * @param pFillLinear state to destroy; NULL is accepted and is a no-op so the
 *                    function is safe on partially initialised owners
 */
void destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
  if (pFillLinear == NULL) {
    return;
  }
  taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
  taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
  taosMemoryFree(pFillLinear);
}
460✔
116

117
/**
 * Frees a stream fill info object.
 *
 * pResRow / pNonFillRow are only released for the VALUE and NULL fill types.
 * For PREV, NEXT and LINEAR, pResRow is pointed at rows embedded in the fill
 * supporter (see setFillValueInfo / setDeleteFillValueInfo), so it must not be
 * freed here.
 *
 * Every owned pointer is NULL-checked before it is dereferenced so that the
 * function can be used to clean up an object whose initialisation failed
 * part-way.
 *
 * @param pFillInfo object to destroy; NULL is ignored
 */
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
  if (pFillInfo == NULL) {
    return;
  }
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
      pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
    if (pFillInfo->pResRow != NULL) {
      taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
      taosMemoryFreeClear(pFillInfo->pResRow);
    }
    if (pFillInfo->pNonFillRow != NULL) {
      taosMemoryFreeClear(pFillInfo->pNonFillRow->pRowVal);
      taosMemoryFreeClear(pFillInfo->pNonFillRow);
    }
  }

  // Guarded locally so this function does not depend on the callee accepting NULL.
  if (pFillInfo->pLinearInfo != NULL) {
    destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
    pFillInfo->pLinearInfo = NULL;
  }

  taosArrayDestroy(pFillInfo->delRanges);
  taosMemoryFree(pFillInfo);
}
134

135
static void destroyStreamFillOperatorInfo(void* param) {
437✔
136
  SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
437✔
137
  destroyStreamFillInfo(pInfo->pFillInfo);
437✔
138
  destroyStreamFillSupporter(pInfo->pFillSup);
437✔
139
  blockDataDestroy(pInfo->pRes);
437✔
140
  pInfo->pRes = NULL;
437✔
141
  blockDataDestroy(pInfo->pSrcBlock);
437✔
142
  pInfo->pSrcBlock = NULL;
437✔
143
  blockDataDestroy(pInfo->pDelRes);
437✔
144
  pInfo->pDelRes = NULL;
437✔
145
  taosArrayDestroy(pInfo->matchInfo.pList);
437✔
146
  pInfo->matchInfo.pList = NULL;
437✔
147
  taosArrayDestroy(pInfo->pUpdated);
437✔
148
  clearGroupResInfo(&pInfo->groupResInfo);
437✔
149
  taosArrayDestroy(pInfo->pCloseTs);
437✔
150

151
  taosMemoryFree(pInfo);
437✔
152
}
437✔
153

154
/**
 * Marks a cached row as absent: releases its value buffer (clearing the
 * pointer) and sets its key to the INT64_MIN sentinel that the has*Window()
 * helpers test for.
 *
 * @param pRowData cached row to reset
 */
static void resetFillWindow(SResultRowData* pRowData) {
  taosMemoryFreeClear(pRowData->pRowVal);
  pRowData->key = INT64_MIN;
}
8,669✔
158

159
/**
 * Resets all four cached rows (cur, prev, next, nextNext) of the supporter.
 *
 * When cur shares its buffer with prev or next, cur is only detached (key and
 * pointer cleared) so that the shared buffer is freed exactly once, by the
 * reset of prev/next that follows.
 *
 * @param pFillSup supporter whose cached rows are reset
 */
static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
  SResultRowData* pCurRow = &pFillSup->cur;
  bool            sharesBuffer =
      (pCurRow->pRowVal == pFillSup->prev.pRowVal) || (pCurRow->pRowVal == pFillSup->next.pRowVal);

  if (sharesBuffer) {
    pCurRow->key = INT64_MIN;
    pCurRow->pRowVal = NULL;
  } else {
    resetFillWindow(pCurRow);
  }

  resetFillWindow(&pFillSup->prev);
  resetFillWindow(&pFillSup->next);
  resetFillWindow(&pFillSup->nextNext);
}
1,907✔
170

171
/**
 * Reloads the supporter's cached rows (cur, prev, next and, for PREV/NEXT
 * fill, nextNext) for the key (ts, groupId) from the stream state store.
 *
 * Steps:
 *  1. All cached rows are reset.
 *  2. The current row is fetched with streamStateFillGet(). If that fails,
 *     cur keeps the requested ts with a NULL value buffer.
 *  3. A cursor is positioned with streamStateFillSeekKeyPrev(); if a row of
 *     the group is found there it becomes prev and the cursor is advanced
 *     (one extra step when the current key exists). Otherwise the cursor is
 *     replaced by one obtained from streamStateFillSeekKeyNext().
 *  4. The row under the cursor, if any, becomes next. For TSDB_FILL_PREV and
 *     TSDB_FILL_NEXT the cursor is advanced once more to fetch nextNext.
 *
 * Rows that are not found keep the INT64_MIN key set by the reset.
 *
 * @param pOperator operator providing the storage API and the state handle
 * @param ts        key of the current row
 * @param groupId   group the row belongs to
 * @param pFillSup  supporter whose cached rows are overwritten
 */
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
  resetPrevAndNextWindow(pFillSup);

  SWinKey key = {.ts = ts, .groupId = groupId};
  void*   curVal = NULL;
  int32_t curVLen = 0;
  bool    hasCurKey = true;
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen, NULL);
  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->cur.key = key.ts;
    pFillSup->cur.pRowVal = curVal;
  } else {
    // groupId is unsigned, so it is printed with PRIu64 (as the save path does).
    qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRIu64, ts, groupId);
    pFillSup->cur.key = ts;
    pFillSup->cur.pRowVal = NULL;
    hasCurKey = false;
  }

  SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
  SWinKey          preKey = {.ts = INT64_MIN, .groupId = groupId};
  void*            preVal = NULL;
  int32_t          preVLen = 0;
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);

  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->prev.key = preKey.ts;
    pFillSup->prev.pRowVal = preVal;

    // Extra step only when the current key is stored.
    // NOTE(review): presumably this skips over the current row - confirm.
    if (hasCurKey) {
      pAPI->stateStore.streamStateCurNext(pState, pCur);
    }

    pAPI->stateStore.streamStateCurNext(pState, pCur);
  } else {
    pAPI->stateStore.streamStateFreeCur(pCur);
    pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
  }

  SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
  void*   nextVal = NULL;
  int32_t nextVLen = 0;
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
  if (code == TSDB_CODE_SUCCESS) {
    pFillSup->next.key = nextKey.ts;
    pFillSup->next.pRowVal = nextVal;
    if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
      pAPI->stateStore.streamStateCurNext(pState, pCur);
      SWinKey nextNextKey = {.groupId = groupId};
      void*   nextNextVal = NULL;
      int32_t nextNextVLen = 0;
      code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
      if (code == TSDB_CODE_SUCCESS) {
        pFillSup->nextNext.key = nextNextKey.ts;
        pFillSup->nextNext.pRowVal = nextNextVal;
      }
    }
  }
  pAPI->stateStore.streamStateFreeCur(pCur);
}
1,907✔
232

233
/** Returns true when the cached current row has a key other than the INT64_MIN "absent" sentinel. */
bool hasCurWindow(SStreamFillSupporter* pFillSup) {
  const bool isAbsent = (pFillSup->cur.key == INT64_MIN);
  return !isAbsent;
}
×
234
/** Returns true when the cached previous row has a key other than the INT64_MIN "absent" sentinel. */
bool hasPrevWindow(SStreamFillSupporter* pFillSup) {
  const bool isAbsent = (pFillSup->prev.key == INT64_MIN);
  return !isAbsent;
}
6,038✔
235
/** Returns true when the cached next row has a key other than the INT64_MIN "absent" sentinel. */
bool hasNextWindow(SStreamFillSupporter* pFillSup) {
  const bool isAbsent = (pFillSup->next.key == INT64_MIN);
  return !isAbsent;
}
3,693✔
236
/** Returns true when the cached row after next has a key other than the INT64_MIN "absent" sentinel. */
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) {
  const bool isAbsent = (pFillSup->nextNext.key == INT64_MIN);
  return !isAbsent;
}
110✔
237

238
/**
 * Serialises row `rowId` of a data block into the packed cell layout of
 * `pRowVal` and stamps the row with `ts`.
 *
 * For a non-null column value the cell receives the column's type and byte
 * width followed by the payload (the full var-data length for variable-length
 * types, the fixed width otherwise). For a null value only the cell's isNull
 * flag is set; its type/bytes fields are left as they were.
 *
 * @param pBlock  source block
 * @param rowId   row of the block to serialise
 * @param ts      key stored in pRowVal->key
 * @param pRowVal destination row; its buffer must already be allocated
 */
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
  int32_t colCount = taosArrayGetSize(pBlock->pDataBlock);

  for (int32_t colIdx = 0; colIdx < colCount; ++colIdx) {
    SColumnInfoData* pSrcCol = taosArrayGet(pBlock->pDataBlock, colIdx);
    SResultCellData* pDstCell = getResultCell(pRowVal, colIdx);

    if (colDataIsNull_s(pSrcCol, rowId)) {
      pDstCell->isNull = true;
      continue;
    }

    pDstCell->isNull = false;
    pDstCell->type = pSrcCol->info.type;
    pDstCell->bytes = pSrcCol->info.bytes;

    char* pSrcVal = colDataGetData(pSrcCol, rowId);
    if (!IS_VAR_DATA_TYPE(pDstCell->type)) {
      memcpy(pDstCell->pData, pSrcVal, pDstCell->bytes);
    } else {
      memcpy(pDstCell->pData, pSrcVal, varDataTLen(pSrcVal));
    }
  }

  pRowVal->key = ts;
}
1,852✔
259

260
/**
 * Captures the end point of every fill column for linear interpolation.
 *
 * For each column that is not flagged notFillCol, the SPoint at the column's
 * destination slot receives the key of `pEndRow` and a raw copy of the
 * corresponding cell payload. Despite the name, no delta is computed here.
 *
 * @param pEndRow   row providing the end-point values
 * @param pEndPoins array of SPoint, indexed by destination slot id; each
 *                  point's val buffer must already be allocated
 * @param pFillCol  column descriptors
 * @param numOfCol  number of descriptors in pFillCol
 */
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
  for (int32_t colIdx = 0; colIdx < numOfCol; colIdx++) {
    SFillColInfo* pColInfo = pFillCol + colIdx;
    if (pColInfo->notFillCol) {
      continue;
    }

    int32_t          dstSlot = GET_DEST_SLOT_ID(pColInfo);
    SResultCellData* pSrcCell = getResultCell(pEndRow, dstSlot);
    SPoint*          pEndPoint = taosArrayGet(pEndPoins, dstSlot);

    pEndPoint->key = pEndRow->key;
    memcpy(pEndPoint->val, pSrcCell->pData, pSrcCell->bytes);
  }
}
298✔
271

272
/**
 * Sets pFillInfo->start to `ts` advanced by one sliding step of the interval.
 *
 * @param ts        reference key (not itself part of the fill range)
 * @param pInterval interval providing sliding, slidingUnit and precision
 * @param pFillInfo fill info whose start is written
 */
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  pFillInfo->start = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision);
}
1,446✔
276

277
/**
 * Sets pFillInfo->end to `ts` moved back by one sliding step of the interval.
 *
 * @param ts        reference key (not itself part of the fill range)
 * @param pInterval interval providing sliding, slidingUnit and precision
 * @param pFillInfo fill info whose end is written
 */
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  pFillInfo->end = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision);
}
1,446✔
281

282
/**
 * Configures the fill range strictly between two existing rows: the range
 * starts one sliding step after `start`, ends one sliding step before `end`,
 * and the fill cursor (`current`) is placed at the range start.
 *
 * @param start     key of the row before the gap
 * @param end       key of the row after the gap
 * @param pInterval interval providing the sliding step
 * @param pFillInfo fill info whose start / current / end are written
 */
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
  setFillInfoEnd(end, pInterval, pFillInfo);
  setFillInfoStart(start, pInterval, pFillInfo);
  pFillInfo->current = pFillInfo->start;
}
1,446✔
287

288
/**
 * Prepares pFillInfo for re-filling a range after a delete.
 *
 * Filling is only possible when both a previous and a next row are cached;
 * otherwise needFill is cleared and nothing else is touched. When filling,
 * the range starts one sliding step after the previous row and ends at `end`,
 * and the source row is chosen by fill type:
 *   - NULL / VALUE variants: pResRow is left unchanged;
 *   - PREV: the previous row;  NEXT: the next row;
 *   - LINEAR: the range is recomputed to lie strictly between prev and next,
 *     the next row is captured as the interpolation end point and the
 *     previous row is the interpolation start.
 *
 * @param start     not read by this function
 * @param end       last key of the range to fill (overridden for LINEAR)
 * @param pFillSup  supporter holding the cached prev / next rows
 * @param pFillInfo fill info to configure
 */
void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  const bool hasBothNeighbors = hasPrevWindow(pFillSup) && hasNextWindow(pFillSup);
  if (!hasBothNeighbors) {
    pFillInfo->needFill = false;
    return;
  }

  TSKEY firstFillKey = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                   pFillSup->interval.precision);

  pFillInfo->needFill = true;
  pFillInfo->start = firstFillKey;
  pFillInfo->current = firstFillKey;
  pFillInfo->end = end;
  pFillInfo->pos = FILL_POS_INVALID;

  if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F ||
      pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
    // nothing else to configure: pResRow is left as it is
  } else if (pFillInfo->type == TSDB_FILL_PREV) {
    pFillInfo->pResRow = &pFillSup->prev;
  } else if (pFillInfo->type == TSDB_FILL_NEXT) {
    pFillInfo->pResRow = &pFillSup->next;
  } else if (pFillInfo->type == TSDB_FILL_LINEAR) {
    setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
    pFillInfo->pLinearInfo->hasNext = false;
    pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
    calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
                     pFillSup->numOfAllCols);
    pFillInfo->pResRow = &pFillSup->prev;
    pFillInfo->pLinearInfo->winIndex = 0;
  } else {
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
  }
}
328

329
/**
 * Copies the values of the non-fill columns (indices numOfFillCols up to
 * numOfAllCols) from the cached current row into the fill result row, so that
 * generated rows carry the same values for those columns.
 *
 * The null flag is always copied; the payload is copied only when the source
 * cell is not null, using the destination cell's byte width.
 *
 * @param pFillSup  supporter providing the column descriptors and the current row
 * @param pFillInfo fill info whose pResRow is updated
 */
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  int32_t colIdx = pFillSup->numOfFillCols;
  while (colIdx < pFillSup->numOfAllCols) {
    int32_t          dstSlot = GET_DEST_SLOT_ID(pFillSup->pAllColInfo + colIdx);
    SResultCellData* pDstCell = getResultCell(pFillInfo->pResRow, dstSlot);
    SResultCellData* pSrcCell = getResultCell(&pFillSup->cur, dstSlot);

    pDstCell->isNull = pSrcCell->isNull;
    if (!pSrcCell->isNull) {
      memcpy(pDstCell->pData, pSrcCell->pData, pDstCell->bytes);
    }
    ++colIdx;
  }
}
697✔
341

342
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
1,704✔
343
                      SStreamFillInfo* pFillInfo) {
344
  pFillInfo->preRowKey = pFillSup->cur.key;
1,704✔
345
  if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
1,704✔
346
    pFillInfo->needFill = false;
302✔
347
    pFillInfo->pos = FILL_POS_START;
302✔
348
    return;
302✔
349
  }
350
  TSKEY prevWKey = INT64_MIN;
1,402✔
351
  TSKEY nextWKey = INT64_MIN;
1,402✔
352
  if (hasPrevWindow(pFillSup)) {
1,402✔
353
    prevWKey = pFillSup->prev.key;
897✔
354
  }
355
  if (hasNextWindow(pFillSup)) {
1,402✔
356
    nextWKey = pFillSup->next.key;
758✔
357
  }
358

359
  pFillInfo->needFill = true;
1,402✔
360
  pFillInfo->pos = FILL_POS_INVALID;
1,402✔
361
  switch (pFillInfo->type) {
1,402!
362
    case TSDB_FILL_NULL:
593✔
363
    case TSDB_FILL_NULL_F:
364
    case TSDB_FILL_SET_VALUE:
365
    case TSDB_FILL_SET_VALUE_F: {
366
      if (pFillSup->prev.key == pFillInfo->preRowKey) {
593!
367
        resetFillWindow(&pFillSup->prev);
×
368
      }
369
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
593✔
370
        if (pFillSup->next.key == pFillInfo->nextRowKey) {
128✔
371
          pFillInfo->preRowKey = INT64_MIN;
123✔
372
          setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
123✔
373
          pFillInfo->pos = FILL_POS_END;
123✔
374
        } else {
375
          pFillInfo->needFill = false;
5✔
376
          pFillInfo->pos = FILL_POS_START;
5✔
377
        }
378
      } else if (hasPrevWindow(pFillSup)) {
465✔
379
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
267✔
380
        pFillInfo->pos = FILL_POS_END;
267✔
381
      } else {
382
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
198✔
383
        pFillInfo->pos = FILL_POS_START;
198✔
384
      }
385
      copyNotFillExpData(pFillSup, pFillInfo);
593✔
386
    } break;
593✔
387
    case TSDB_FILL_PREV: {
276✔
388
      if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
276✔
389
                                      (pFillSup->next.key == pFillInfo->nextRowKey && hasNextNextWindow(pFillSup)) ||
110!
390
                                      (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
82!
391
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
103✔
392
        pFillInfo->pos = FILL_POS_START;
103✔
393
        resetFillWindow(&pFillSup->prev);
103✔
394
        pFillSup->prev.key = pFillSup->cur.key;
103✔
395
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
103✔
396
      } else if (hasPrevWindow(pFillSup)) {
173!
397
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
173✔
398
        pFillInfo->pos = FILL_POS_END;
173✔
399
        pFillInfo->preRowKey = INT64_MIN;
173✔
400
      }
401
      pFillInfo->pResRow = &pFillSup->prev;
276✔
402
    } break;
276✔
403
    case TSDB_FILL_NEXT: {
283✔
404
      if (hasPrevWindow(pFillSup)) {
283✔
405
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
188✔
406
        pFillInfo->pos = FILL_POS_END;
188✔
407
        resetFillWindow(&pFillSup->next);
188✔
408
        pFillSup->next.key = pFillSup->cur.key;
188✔
409
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
188✔
410
        pFillInfo->preRowKey = INT64_MIN;
188✔
411
      } else {
412
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
95✔
413
        pFillInfo->pos = FILL_POS_START;
95✔
414
      }
415
      pFillInfo->pResRow = &pFillSup->next;
283✔
416
    } break;
283✔
417
    case TSDB_FILL_LINEAR: {
250✔
418
      pFillInfo->pLinearInfo->winIndex = 0;
250✔
419
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
250✔
420
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
27✔
421
        pFillInfo->pos = FILL_POS_MID;
27✔
422
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
27✔
423
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
27✔
424
                         pFillSup->numOfAllCols);
425
        pFillInfo->pResRow = &pFillSup->prev;
27✔
426

427
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
27✔
428
                         pFillSup->numOfAllCols);
429
        pFillInfo->pLinearInfo->hasNext = true;
27✔
430
      } else if (hasPrevWindow(pFillSup)) {
223✔
431
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
109✔
432
        pFillInfo->pos = FILL_POS_END;
109✔
433
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
109✔
434
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
109✔
435
                         pFillSup->numOfAllCols);
436
        pFillInfo->pResRow = &pFillSup->prev;
109✔
437
        pFillInfo->pLinearInfo->hasNext = false;
109✔
438
      } else {
439
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
114✔
440
        pFillInfo->pos = FILL_POS_START;
114✔
441
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
114✔
442
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
114✔
443
                         pFillSup->numOfAllCols);
444
        pFillInfo->pResRow = &pFillSup->cur;
114✔
445
        pFillInfo->pLinearInfo->hasNext = false;
114✔
446
      }
447
    } break;
250✔
UNCOV
448
    default:
×
UNCOV
449
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
450
      break;
×
451
  }
452
}
453

454
/**
 * Records that a result row for (groupId, ts) is being produced and reports
 * whether it is the first time.
 *
 * @param pFillSup supporter owning the result map (pResMap)
 * @param ts       key of the result row
 * @param groupId  group of the result row
 * @param pRes     out: true when the key was newly inserted, false when it was
 *                 already present; left unchanged if the insert fails
 * @return TSDB_CODE_SUCCESS, or the error returned by tSimpleHashPut()
 */
int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  SWinKey resKey = {.groupId = groupId, .ts = ts};

  if (tSimpleHashGet(pFillSup->pResMap, &resKey, sizeof(SWinKey)) == NULL) {
    // first occurrence: remember the key (no payload is stored)
    code = tSimpleHashPut(pFillSup->pResMap, &resKey, sizeof(SWinKey), NULL, 0);
    QUERY_CHECK_CODE(code, lino, _end);
    *pRes = true;
  } else {
    *pRes = false;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
472

473
/**
 * Appends one result row with key `ts` to `pBlock`, taking column values from
 * `pResRow` except for window pseudo columns, which are generated by
 * fillIfWindowPseudoColumn().
 *
 * @param pResRow  cached row providing the column values
 * @param pFillSup supporter (column descriptors, interval, result map)
 * @param ts       key of the row being produced
 * @param pBlock   destination block; its row count is incremented on success
 * @param pRes     out: false when the block is already full (nothing done);
 *                 true when the row was appended, or when it was skipped
 *                 because delete handling is active and the key had already
 *                 been produced
 * @return TSDB_CODE_SUCCESS or the first error encountered
 */
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
                               bool* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const bool blockIsFull = (pBlock->info.rows >= pBlock->info.capacity);
  if (blockIsFull) {
    *pRes = false;
    goto _end;
  }

  bool isNewKey = true;
  code = checkResult(pFillSup, ts, pBlock->info.id.groupId, &isNewKey);
  QUERY_CHECK_CODE(code, lino, _end);

  if (!isNewKey && pFillSup->hasDelete) {
    *pRes = true;
    goto _end;
  }

  for (int32_t colIdx = 0; colIdx < pFillSup->numOfAllCols; ++colIdx) {
    SFillColInfo*    pColInfo = pFillSup->pAllColInfo + colIdx;
    int32_t          dstSlot = GET_DEST_SLOT_ID(pColInfo);
    SColumnInfoData* pDstCol = taosArrayGet(pBlock->pDataBlock, dstSlot);
    SFillInfo        pseudoCtx = {
               .currentKey = ts,
               .order = TSDB_ORDER_ASC,
               .interval = pFillSup->interval,
    };

    if (fillIfWindowPseudoColumn(&pseudoCtx, pColInfo, pDstCol, pBlock->info.rows)) {
      continue;  // pseudo column already written
    }

    SResultCellData* pSrcCell = getResultCell(pResRow, dstSlot);
    code = setRowCell(pDstCol, pBlock->info.rows, pSrcCell);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pBlock->info.rows++;
  *pRes = true;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
515

516
/**
 * Tells whether the fill cursor still has keys to produce: the cursor must be
 * set (not INT64_MIN) and must not have passed the end of the fill range.
 *
 * @param pFillInfo fill info holding the cursor (`current`) and range end
 * @return true when at least one more fill row is pending
 */
bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
  return (pFillInfo->current != INT64_MIN) && (pFillInfo->current <= pFillInfo->end);
}
522

523
/**
 * Generates fill rows for every non-linear fill type.
 *
 * Walks the fill cursor from its current position to the end of the fill
 * range in sliding steps, appending one row built from pFillInfo->pResRow for
 * each key that lies inside the supporter's window range. Stops early when the
 * block is full (the cursor keeps its position) or on the first error, which
 * is logged and not propagated.
 *
 * @param pFillSup  supporter (interval, window range, column descriptors)
 * @param pFillInfo fill cursor / range and the source row
 * @param pBlock    destination block
 */
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  for (; hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity;
       pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding,
                                        pFillSup->interval.slidingUnit, pFillSup->interval.precision)) {
    STimeWindow point = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
    if (!inWinRange(&pFillSup->winRange, &point)) {
      continue;
    }

    bool appended = true;
    code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &appended);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
1,198✔
542

543
/**
 * Generates linearly interpolated fill rows.
 *
 * Walks the fill cursor from its current position to the end of the fill
 * range in sliding steps. For each key:
 *   - the key is registered in the result map; it is skipped when delete
 *     handling is active and the key was already produced, or when it lies
 *     outside the supporter's window range;
 *   - non-fill columns are written from the window pseudo-column generator
 *     or, failing that, copied from pFillInfo->pResRow;
 *   - fill columns of variable-length or BOOL type, and columns whose start
 *     cell is null, are set to NULL;
 *   - all other fill columns are interpolated between the start cell of
 *     pResRow and the matching end point in pLinearInfo->pEndPoints.
 *
 * Stops early when the block is full (the cursor keeps its position) or on
 * the first error, which is logged and not propagated.
 *
 * The scratch buffer used for the interpolated value is released before the
 * result of colDataSetVal() is checked, so it cannot leak on failure.
 *
 * @param pFillSup  supporter (interval, window range, column descriptors)
 * @param pFillInfo fill cursor / range, source row and linear state
 * @param pBlock    destination block
 */
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
    uint64_t    groupId = pBlock->info.id.groupId;
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
    bool        ckRes = true;
    code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
    QUERY_CHECK_CODE(code, lino, _end);

    if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
      pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                       pFillSup->interval.precision);
      pFillInfo->pLinearInfo->winIndex++;
      continue;
    }
    pFillInfo->pLinearInfo->winIndex++;
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
      SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
      SFillInfo     tmp = {
              .currentKey = pFillInfo->current,
              .order = TSDB_ORDER_ASC,
              .interval = pFillSup->interval,
      };

      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
      int16_t          type = pColData->info.type;
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
      int32_t          index = pBlock->info.rows;
      if (pFillCol->notFillCol) {
        bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
        if (!filled) {
          code = setRowCell(pColData, index, pCell);
          QUERY_CHECK_CODE(code, lino, _end);
        }
      } else {
        // These cases are not interpolated; the output cell is NULL.
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
          colDataSetNULL(pColData, index);
          continue;
        }
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
        SPoint  start = {0};
        start.key = pFillInfo->pResRow->key;
        start.val = pCell->pData;

        SPoint cur = {0};
        cur.key = pFillInfo->current;
        cur.val = taosMemoryCalloc(1, pCell->bytes);
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type);
        code = colDataSetVal(pColData, index, (const char*)cur.val, false);
        // Free the scratch value first: the check below jumps to _end on error.
        destroySPoint(&cur);
        QUERY_CHECK_CODE(code, lino, _end);
      }
    }
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                                     pFillSup->interval.precision);
    pBlock->info.rows++;
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
291✔
611

612
/**
 * Stores one serialised row in the stream state store under the key
 * (groupId, pRow->key). A failure is logged and not propagated.
 *
 * @param pOperator operator providing the storage API and the state handle
 * @param groupId   group the row belongs to
 * @param pRow      row to store (key and packed value buffer)
 * @param len       length of the value buffer in bytes
 */
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
  SStorageAPI* pStorage = &pOperator->pTaskInfo->storageAPI;
  SWinKey      rowKey = {.groupId = groupId, .ts = pRow->key};

  int32_t code =
      pStorage->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &rowKey, pRow->pRowVal, len);
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", rowKey.ts, rowKey.groupId, code);

  if (code == TSDB_CODE_SUCCESS) {
    return;
  }
  qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
}
1,869✔
622

623
/**
 * Emits the current source row together with the fill rows configured in
 * pFillInfo, in the order dictated by pFillInfo->pos.
 *
 *   - needFill == false: only the current row is emitted.
 *   - FILL_POS_START: current row first, then the fill rows.
 *   - FILL_POS_MID (linear only): first segment, current row, then the second
 *     segment (cur -> next) once the first one is exhausted.
 *   - FILL_POS_END: fill rows first, then the current row.
 *
 * Whenever the current row has actually been appended, pos is reset to
 * FILL_POS_INVALID so that it is not emitted again. Errors are logged and not
 * propagated.
 *
 * @param pFillInfo fill configuration and cursor
 * @param pFillSup  supporter holding the cached rows
 * @param pRes      destination block
 */
static void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;
  bool    emitted = false;

  if (!pFillInfo->needFill) {
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    goto _end;
  }

  if (pFillInfo->pos == FILL_POS_START) {
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    if (emitted) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

  if (pFillInfo->type == TSDB_FILL_LINEAR) {
    doStreamFillLinear(pFillSup, pFillInfo, pRes);

    if (pFillInfo->pos == FILL_POS_MID) {
      code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &emitted);
      QUERY_CHECK_CODE(code, lino, _end);
      if (emitted) {
        pFillInfo->pos = FILL_POS_INVALID;
      }
    }

    // First segment done and a second one is pending: switch to cur -> next.
    if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
      pFillInfo->pLinearInfo->hasNext = false;
      pFillInfo->pLinearInfo->winIndex = 0;
      taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
      pFillInfo->pResRow = &pFillSup->cur;
      setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
      doStreamFillLinear(pFillSup, pFillInfo, pRes);
    }
  } else {
    doStreamFillNormal(pFillSup, pFillInfo, pRes);
  }

  if (pFillInfo->pos == FILL_POS_END) {
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &emitted);
    QUERY_CHECK_CODE(code, lino, _end);
    if (emitted) {
      pFillInfo->pos = FILL_POS_INVALID;
    }
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
}
675

676
/**
 * Serialises row `rowId` of `pBlock` into a temporary packed row, stores it in
 * the stream state store and records its key as pFillInfo->nextRowKey.
 *
 * @param pOperator operator providing the state store
 * @param pFillInfo fill info whose nextRowKey is updated
 * @param pBlock    source block
 * @param tsCol     primary timestamp column of the block
 * @param rowId     row to store
 * @param groupId   group the row belongs to
 * @param rowSize   size in bytes of the packed row buffer
 * @return TSDB_CODE_SUCCESS, or terrno when the temporary buffer cannot be
 *         allocated (a failed store is only logged by keepResultInDiscBuf)
 */
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
                           int32_t rowId, uint64_t groupId, int32_t rowSize) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  const TSKEY rowTs = tsCol[rowId];
  pFillInfo->nextRowKey = rowTs;

  SResultRowData packedRow = {.key = rowTs};
  packedRow.pRowVal = taosMemoryCalloc(1, rowSize);
  QUERY_CHECK_NULL(packedRow.pRowVal, code, lino, _end, terrno);

  transBlockToResultRow(pBlock, rowId, rowTs, &packedRow);
  keepResultInDiscBuf(pOperator, groupId, &packedRow, rowSize);
  // the temporary buffer is released here once the store call has returned
  taosMemoryFreeClear(packedRow.pRowVal);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
695

696
// Produces the fill output for one source row: loads the neighbouring windows
// of the row from the state buffer, prepares the fill descriptors and emits
// rows into pRes.
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
                          SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) {
  const TSKEY rowTs = tsCol[rowId];

  getWindowFromDiscBuf(pOperator, rowTs, pBlock->info.id.groupId, pFillSup);

  // the previous window is the row handled last time: drop it
  if (pFillSup->prev.key == pFillInfo->preRowKey) {
    resetFillWindow(&pFillSup->prev);
  }

  setFillValueInfo(pBlock, rowTs, rowId, pFillSup, pFillInfo);
  doStreamFillRange(pFillInfo, pFillSup, pRes);
}
1,704✔
706

707
// Consumes rows of pInfo->pSrcBlock starting after pInfo->srcRowIndex. Each row
// is first stored in the state buffer; the fill result is then generated for
// the row *before* it, so that its successor is already known. Returns early
// (without cleaning the source block) once pInfo->pRes is full; the caller
// re-enters later with the same srcRowIndex. Errors are logged only.
static void doStreamFillImpl(SOperatorInfo* pOperator) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
  SSDataBlock*             pRes = pInfo->pRes;
  uint64_t                 groupId = pBlock->info.id.groupId;
  SColumnInfoData*         pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsCol);
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;

  pRes->info.id.groupId = groupId;
  pInfo->srcRowIndex++;

  // first row of a fresh block: store it, there is no predecessor to fill yet
  if (pInfo->srcRowIndex == 0) {
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
    QUERY_CHECK_CODE(code, lino, _end);
    pInfo->srcRowIndex++;
  }

  for (; pInfo->srcRowIndex < pBlock->info.rows; pInfo->srcRowIndex++) {
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
    QUERY_CHECK_CODE(code, lino, _end);

    doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);

    // output block is full: hand it out, srcRowIndex is intentionally not advanced
    if (pRes->info.rows == pRes->info.capacity) {
      code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
      QUERY_CHECK_CODE(code, lino, _end);
      return;
    }
  }

  // last stored row of the block
  doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
  code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
  QUERY_CHECK_CODE(code, lino, _end);
  blockDataCleanup(pInfo->pSrcBlock);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
}
749

750
// Appends one delete-range row to delRes.
//
// The row carries [start, end] in the start/end timestamp columns and groupId
// in the group column; the uid and calculate-start/end columns are set to NULL.
// The table-name column receives the partition table name looked up via
// streamStateGetParName(), or NULL when the lookup reports no name.
//
// Returns TSDB_CODE_SUCCESS and increments delRes->info.rows on success;
// otherwise returns the failing code and leaves the row count unchanged.
static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SExecTaskInfo*   pTaskInfo = pOp->pTaskInfo;
  SStorageAPI*     pAPI = &pTaskInfo->storageAPI;
  SSDataBlock*     pBlock = delRes;
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);

  code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
  QUERY_CHECK_CODE(code, lino, _end);

  code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
  QUERY_CHECK_CODE(code, lino, _end);

  colDataSetNULL(pUidCol, pBlock->info.rows);
  code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
  QUERY_CHECK_CODE(code, lino, _end);

  colDataSetNULL(pCalStartCol, pBlock->info.rows);
  colDataSetNULL(pCalEndCol, pBlock->info.rows);

  void*   tbname = NULL;
  int32_t winCode = TSDB_CODE_SUCCESS;
  code = pAPI->stateStore.streamStateGetParName(pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
  QUERY_CHECK_CODE(code, lino, _end);
  if (winCode != TSDB_CODE_SUCCESS) {
    colDataSetNULL(pTableCol, pBlock->info.rows);
  } else {
    char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
    STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
    code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
    // release the looked-up name before the error check so it cannot leak on failure
    pAPI->stateStore.streamStateFreeVal(tbname);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pBlock->info.rows++;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
801

802
static int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId,
93✔
803
                                 SSDataBlock* delRes) {
804
  int32_t                  code = TSDB_CODE_SUCCESS;
93✔
805
  int32_t                  lino = 0;
93✔
806
  SStreamFillOperatorInfo* pInfo = pOperator->info;
93✔
807
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
93✔
808
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
93✔
809
  if (hasPrevWindow(pFillSup)) {
93✔
810
    TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
58✔
811
    code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
58✔
812
    QUERY_CHECK_CODE(code, lino, _end);
58!
813
  } else if (hasNextWindow(pFillSup)) {
35✔
814
    TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
30✔
815
    code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
30✔
816
    QUERY_CHECK_CODE(code, lino, _end);
30!
817
  } else {
818
    code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
5✔
819
    QUERY_CHECK_CODE(code, lino, _end);
5!
820
  }
821

822
_end:
5✔
823
  if (code != TSDB_CODE_SUCCESS) {
93!
824
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
825
  }
826
  return code;
93✔
827
}
828

829
// Handles the deletion of the window starting at startTs for one group: the
// window is removed from the fill state, then either the range is queued in
// pFillInfo->delRanges (when it must be re-filled later) or a delete result is
// written to pInfo->pDelRes.
static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*             pAPI = &pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SWinKey                  delKey = {.ts = startTs, .groupId = groupId};

  getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
  setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
  pAPI->stateStore.streamStateFillDel(pTaskInfo->streamInfo.pState, &delKey);

  if (pInfo->pFillInfo->needFill) {
    // the gap left by the deletion is re-filled in doDeleteFillFinalize()
    STimeRange pending = {.skey = startTs, .ekey = endTs, .groupId = groupId};
    void*      pPushed = taosArrayPush(pInfo->pFillInfo->delRanges, &pending);
    QUERY_CHECK_NULL(pPushed, code, lino, _end, terrno);
  } else {
    code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
    QUERY_CHECK_CODE(code, lino, _end);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
861

862
// Loads the window stored under (ts, groupId) into pWinData. If that exact key
// is not found, the lookup is retried with a cursor obtained from
// streamStateFillSeekKeyNext(). pWinData is left untouched when both lookups fail.
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
  SWinKey winKey = {.ts = ts, .groupId = groupId};
  void*   pRowBuf = NULL;
  int32_t bufLen = 0;

  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &winKey, (void**)&pRowBuf, &bufLen, NULL);
  if (code != TSDB_CODE_SUCCESS) {
    qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
           groupId);
    SStreamStateCur* pNextCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &winKey);
    code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pNextCur, &winKey, (const void**)&pRowBuf, &bufLen);
    pAPI->stateStore.streamStateFreeCur(pNextCur);
    qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
  }

  if (code != TSDB_CODE_SUCCESS) {
    return;
  }

  resetFillWindow(pWinData);
  pWinData->key = winKey.ts;  // may differ from ts when the fallback lookup was used
  pWinData->pRowVal = pRowBuf;
}
×
882

883
// Re-fills the ranges queued in pFillInfo->delRanges, continuing from
// pFillInfo->delIndex. Stops as soon as the next range belongs to a group
// different from the one already present in pInfo->pRes, so that one result
// block only carries a single group.
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
  int32_t                  numOfRanges = taosArrayGetSize(pFillInfo->delRanges);

  while (pFillInfo->delIndex < numOfRanges) {
    STimeRange* pRange = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
    uint64_t    resGroupId = pInfo->pRes->info.id.groupId;
    if (resGroupId != 0 && resGroupId != pRange->groupId) {
      return;
    }

    getWindowFromDiscBuf(pOperator, pRange->skey, pRange->groupId, pInfo->pFillSup);

    // for FILL(NEXT) the window following the deleted range is needed as source
    TSKEY realEnd = pRange->ekey + 1;
    if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
      getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, pRange->groupId,
                         &pInfo->pFillSup->next);
    }

    setDeleteFillValueInfo(pRange->skey, pRange->ekey, pInfo->pFillSup, pInfo->pFillInfo);
    pFillInfo->delIndex++;

    if (pInfo->pFillInfo->needFill) {
      doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
      pInfo->pRes->info.id.groupId = pRange->groupId;
    }
  }
}
908

909
// Processes the delete block in pInfo->pSrcDelBlock starting at
// pInfo->srcDelRowIndex. Rows whose key has no state cursor are skipped. For
// the others, consecutive deleted rows of the same group that match the
// successive keys found in the state are merged into one [ts, endTs] range,
// which is then passed to doDeleteFillResultImpl().
static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStorageAPI*             pAPI = &pTaskInfo->storageAPI;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
  SSDataBlock*             pBlock = pInfo->pSrcDelBlock;

  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
  TSKEY*           tsStarts = (TSKEY*)pStartCol->pData;
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;

  while (pInfo->srcDelRowIndex < pBlock->info.rows) {
    TSKEY            ts = tsStarts[pInfo->srcDelRowIndex];
    uint64_t         groupId = groupIds[pInfo->srcDelRowIndex];
    TSKEY            endTs = ts;
    SWinKey          firstKey = {.ts = ts, .groupId = groupId};
    SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pTaskInfo->streamInfo.pState, &firstKey);
    if (pCur == NULL) {
      // nothing stored for this row
      pInfo->srcDelRowIndex++;
      continue;
    }

    SWinKey nextKey = {.groupId = groupId, .ts = ts};
    for (; pInfo->srcDelRowIndex < pBlock->info.rows; pInfo->srcDelRowIndex++) {
      TSKEY    delTs = tsStarts[pInfo->srcDelRowIndex];
      uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];

      // the merged range ends at a group change or at a gap in the stored keys
      if (groupId != delGroupId || delTs > nextKey.ts) {
        break;
      }
      if (delTs != nextKey.ts) {
        continue;
      }

      pAPI->stateStore.streamStateCurNext(pTaskInfo->streamInfo.pState, pCur);
      int32_t winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
      // ts will be deleted later
      if (delTs != ts) {
        SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
        pAPI->stateStore.streamStateFillDel(pTaskInfo->streamInfo.pState, &delKey);
        pAPI->stateStore.streamStateFreeCur(pCur);
        pCur = pAPI->stateStore.streamStateGetAndCheckCur(pTaskInfo->streamInfo.pState, &nextKey);
      }
      endTs = TMAX(delTs, nextKey.ts - 1);
      if (winCode != TSDB_CODE_SUCCESS) {
        break;
      }
    }

    pAPI->stateStore.streamStateFreeCur(pCur);
    code = doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  pFillInfo->current = pFillInfo->end + 1;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
977

978
// Clears the per-round state of the fill supporter: the result map and the
// "a delete block was seen" flag.
void resetStreamFillSup(SStreamFillSupporter* pFillSup) {
  pFillSup->hasDelete = false;
  tSimpleHashClear(pFillSup->pResMap);
}
3,978✔
982
// Resets the operator for the next round: supporter state, the queue of
// pending delete ranges and the position inside that queue.
void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
  SStreamFillInfo* pFillInfo = pInfo->pFillInfo;

  resetStreamFillSup(pInfo->pFillSup);
  taosArrayClear(pFillInfo->delRanges);
  pFillInfo->delIndex = 0;
}
3,565✔
987

988
// Projects pSrcBlock into pDstBlock in two passes: first with the operator's
// own expressions (pOperator->exprSupp), then with the "not fill" expressions
// (pInfo->pFillSup->notFillExprSup). pDstBlock is cleaned and sized to hold
// pSrcBlock->info.rows rows beforehand; afterwards it inherits the source
// group id and its time window is recomputed on the primary timestamp column.
//
// Returns TSDB_CODE_SUCCESS, or the code of the first failing step (logged
// together with the line of the failing check).
static int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
                                              SSDataBlock* pDstBlock) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExprSupp*               pSup = &pOperator->exprSupp;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  blockDataCleanup(pDstBlock);
  code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
  QUERY_CHECK_CODE(code, lino, _end);

  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  // rewind the row count so the second projection writes into the same rows
  pDstBlock->info.rows = 0;
  pSup = &pInfo->pFillSup->notFillExprSup;
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
  QUERY_CHECK_CODE(code, lino, _end);
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
  QUERY_CHECK_CODE(code, lino, _end);

  pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;

  code = blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
  // checked like every other step so a failure is not reported as "line 0"
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
1022

1023
// Next-block entry point of the stream fill operator.
//
// Order of work per call:
//   1. flush fill rows still pending from the previous call;
//   2. in OP_RES_TO_RETURN state, re-fill queued delete ranges and finish;
//   3. otherwise pull blocks from downstream: retrieve/checkpoint/create-table
//      blocks are passed through unchanged, delete blocks are turned into
//      delete results or queued ranges, data blocks are projected and filled.
//
// On return *ppRes is the block to hand upstream, or NULL when the operator has
// nothing (more) to output. Internal failures are logged, stored in
// pTaskInfo->code and raised through T_LONG_JMP; an unknown block type returns
// TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR with *ppRes set to NULL.
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }
  blockDataCleanup(pInfo->pRes);
  // a previous call stopped in the middle of a fill range: continue it first
  if (hasRemainCalc(pInfo->pFillInfo) ||
      (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
    doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
    if (pInfo->pRes->info.rows > 0) {
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pRes;
      return code;
    }
  }
  if (pOperator->status == OP_RES_TO_RETURN) {
    doDeleteFillFinalize(pOperator);
    if (pInfo->pRes->info.rows > 0) {
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
      (*ppRes) = pInfo->pRes;
      return code;
    }
    setOperatorCompleted(pOperator);
    resetStreamFillInfo(pInfo);
    (*ppRes) = NULL;
    return code;
  }

  while (1) {
    if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
      // If there are delete datablocks, we receive them first.
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
      if (pBlock == NULL) {
        pOperator->status = OP_RES_TO_RETURN;
        pInfo->pFillInfo->preRowKey = INT64_MIN;
        if (pInfo->pRes->info.rows > 0) {
          printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
          (*ppRes) = pInfo->pRes;
          return code;
        }
        break;
      }
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));

      // a new group starts: forget the last row key of the previous group
      if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
        pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
        pInfo->pFillInfo->preRowKey = INT64_MIN;
      }

      pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
      if (pInfo->pFillSup->winRange.ekey <= 0) {
        pInfo->pFillSup->winRange.ekey = INT64_MAX;
      }

      switch (pBlock->info.type) {
        case STREAM_RETRIEVE:
          (*ppRes) = pBlock;
          return code;
        case STREAM_DELETE_RESULT: {
          pInfo->pSrcDelBlock = pBlock;
          pInfo->srcDelRowIndex = 0;
          blockDataCleanup(pInfo->pDelRes);
          pInfo->pFillSup->hasDelete = true;
          code = doDeleteFillResult(pOperator);
          QUERY_CHECK_CODE(code, lino, _end);

          if (pInfo->pDelRes->info.rows > 0) {
            printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
            (*ppRes) = pInfo->pDelRes;
            return code;
          }
          // nothing to report for this delete block: fetch the next block
          continue;
        }
        case STREAM_NORMAL:
        case STREAM_INVALID:
        case STREAM_PULL_DATA: {
          code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
          QUERY_CHECK_CODE(code, lino, _end);

          memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
          // doStreamFillImpl() pre-increments, so -1 makes it start at row 0
          pInfo->srcRowIndex = -1;
        } break;
        case STREAM_CHECKPOINT:
        case STREAM_CREATE_CHILD_TABLE: {
          (*ppRes) = pBlock;
          return code;
        }
        default:
          qError("%s unexpected block type:%d. task:%s", __func__, (int32_t)pBlock->info.type,
                 GET_TASKID(pTaskInfo));
          (*ppRes) = NULL;
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
      }
    }

    doStreamFillImpl(pOperator);
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
    QUERY_CHECK_CODE(code, lino, _end);

    memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
    // totalRows is accounted once, after the loop, to avoid counting this block twice
    if (pInfo->pRes->info.rows > 0) {
      break;
    }
  }
  if (pOperator->status == OP_RES_TO_RETURN) {
    doDeleteFillFinalize(pOperator);
  }

  if (pInfo->pRes->info.rows == 0) {
    setOperatorCompleted(pOperator);
    resetStreamFillInfo(pInfo);
    (*ppRes) = NULL;
    return code;
  }

  pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
  printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
  (*ppRes) = pInfo->pRes;
  return code;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
    T_LONG_JMP(pTaskInfo->env, code);
  }
  setOperatorCompleted(pOperator);
  resetStreamFillInfo(pInfo);
  (*ppRes) = NULL;
  return code;
}
1159

1160
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
325✔
1161
                                SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1162
  int32_t code = TSDB_CODE_SUCCESS;
325✔
1163
  int32_t lino = 0;
325✔
1164
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
325✔
1165
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
325✔
1166
  bool    res = false;
325✔
1167
  int32_t numOfRows = getNumOfTotalRes(pGroupResInfo);
325✔
1168
  for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
727✔
1169
    SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
521✔
1170
    if (pBlock->info.id.groupId == 0) {
521✔
1171
      pBlock->info.id.groupId = pKey->groupId;
325✔
1172
    } else if (pBlock->info.id.groupId != pKey->groupId) {
196✔
1173
      break;
119✔
1174
    }
1175
    void*    val = NULL;
402✔
1176
    int32_t  len = 0;
402✔
1177
    int32_t  winCode = pAPI->stateStore.streamStateFillGet(pOperator->pTaskInfo->streamInfo.pState, pKey, (void**)&val, &len, NULL);
402✔
1178
    qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
402!
1179
    if (winCode == TSDB_CODE_SUCCESS) {
402✔
1180
      pFillSup->cur.key = pKey->ts;
165✔
1181
      pFillSup->cur.pRowVal = val;
165✔
1182
      code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
165✔
1183
      QUERY_CHECK_CODE(code, lino, _end);
165!
1184
      resetFillWindow(&pFillSup->cur);
165✔
1185
    } else {
1186
      SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, pKey);
237✔
1187
      SWinKey          preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
237✔
1188
      void*            preVal = NULL;
237✔
1189
      int32_t          preVLen = 0;
237✔
1190
      winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
237✔
1191
      if (winCode == TSDB_CODE_SUCCESS) {
237!
1192
        pFillSup->cur.key = pKey->ts;
237✔
1193
        pFillSup->cur.pRowVal = preVal;
237✔
1194
        if (pFillInfo->type == TSDB_FILL_PREV) {
237✔
1195
          code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res);
133✔
1196
          QUERY_CHECK_CODE(code, lino, _end);
133!
1197
        } else {
1198
          copyNotFillExpData(pFillSup, pFillInfo);
104✔
1199
          pFillInfo->pResRow->key = pKey->ts;
104✔
1200
          code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res);
104✔
1201
          QUERY_CHECK_CODE(code, lino, _end);
104!
1202
        }
1203
        resetFillWindow(&pFillSup->cur);
237✔
1204
      }
1205
      pAPI->stateStore.streamStateFreeCur(pCur);
237✔
1206
    }
1207
  }
1208

1209
_end:
206✔
1210
  if (code != TSDB_CODE_SUCCESS) {
325!
1211
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1212
  }
1213
}
325✔
1214

1215
void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
984✔
1216
                            SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1217
  blockDataCleanup(pBlock);
984✔
1218
  if (!hasRemainResults(pGroupResInfo)) {
983✔
1219
    return;
658✔
1220
  }
1221

1222
  // clear the existed group id
1223
  pBlock->info.id.groupId = 0;
325✔
1224
  doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo);
325✔
1225
}
1226

1227
// Builds the next force-fill result block into pInfo->pRes. *ppRes is set to
// that block when it contains rows, otherwise to NULL.
static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  uint16_t                 opType = pOperator->operatorType;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);

  if (pInfo->pRes->info.rows == 0) {
    (*ppRes) = NULL;
  } else {
    printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
    (*ppRes) = pInfo->pRes;
  }

  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1249

1250
// force window close impl
1251
// Force-window-close handling of one source block: every row is stored in the
// state buffer, and for each row the window keys from the row timestamp up to
// (excluding) the first entry of pInfo->pCloseTs are queued in pInfo->pUpdated.
// For constant-value fill only the row's own key is queued. Finally the group
// is registered through streamStateGroupPut().
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
  SStreamAggSupporter*     pAggSup = pInfo->pStreamAggSup;
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
  uint64_t                 groupId = pBlock->info.id.groupId;
  SColumnInfoData*         pTsCol = taosArrayGet(pBlock->pDataBlock, pInfo->primaryTsCol);
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;

  for (int32_t rowIdx = 0; rowIdx < pBlock->info.rows; rowIdx++) {
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, rowIdx, groupId, pFillSup->rowSize);
    QUERY_CHECK_CODE(code, lino, _end);

    int32_t numOfCloseTs = taosArrayGetSize(pInfo->pCloseTs);
    if (numOfCloseTs <= 0) {
      continue;
    }

    TSKEY* pCloseTs = (TSKEY*)taosArrayGet(pInfo->pCloseTs, 0);
    for (TSKEY resTs = tsCol[rowIdx]; resTs < (*pCloseTs);
         resTs = taosTimeAdd(resTs, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
                             pFillSup->interval.precision)) {
      SWinKey updKey = {.groupId = groupId, .ts = resTs};
      void*   pPushRes = taosArrayPush(pInfo->pUpdated, &updKey);
      QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);

      // constant-value fill: only the row's own window is queued
      if (IS_FILL_CONST_VALUE(pFillSup->type)) {
        break;
      }
    }
  }

  code = pAggSup->stateStore.streamStateGroupPut(pAggSup->pState, groupId, NULL, 0);
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
  }
  return code;
}
1293

1294
// Appends one (ts, groupId) key to pUpdated for every group id returned by the
// group cursor of pAggSup->pState. Returns terrno if a push fails.
int32_t buildAllResultKey(SStreamAggSupporter* pAggSup, TSKEY ts, SArray* pUpdated) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  int64_t          groupId = 0;
  SStreamStateCur* pCur = pAggSup->stateStore.streamStateGroupGetCur(pAggSup->pState);

  // iterate until the cursor yields no further group
  while (pAggSup->stateStore.streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL) == TSDB_CODE_SUCCESS) {
    SWinKey resKey = {.ts = ts, .groupId = groupId};
    void*   pPushRes = taosArrayPush(pUpdated, &resKey);
    QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);

    pAggSup->stateStore.streamStateGroupCurNext(pCur);
  }

_end:
  // single release point for both the normal and the error path
  pAggSup->stateStore.streamStateFreeCur(pCur);
  pCur = NULL;
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1321

1322
// Sorts pTsArrray with `fn` and then removes duplicate entries using the same
// comparator; no element destructor is passed to the de-duplication.
static void removeDuplicateResult(SArray* pTsArrray, __compar_fn_t fn) {
  SArray* pKeys = pTsArrray;

  taosArraySort(pKeys, fn);
  taosArrayRemoveDuplicate(pKeys, fn, NULL);
}
659✔
1326

1327
// force window close
1328
// "Get next" callback of the stream fill operator when the downstream trigger is force-window-close.
//
// pOperator: the fill operator; pOperator->info is its SStreamFillOperatorInfo.
// ppRes:     receives the next output block, or NULL when there is nothing (more) to return.
//
// Behaviour by operator status:
//   - OP_EXEC_DONE:      *ppRes is set to NULL and the call returns immediately.
//   - OP_RES_TO_RETURN:  keeps emitting blocks built by buildForceFillResult; once that yields no
//                        block, expired state is cleared and the operator is marked completed.
//   - otherwise:         drains the downstream, collects the window-close timestamps, expands them
//                        into result keys for every group, and starts emitting results.
//
// Returns TSDB_CODE_SUCCESS or an error code; on error the code is also stored in pTaskInfo->code.
static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
  int32_t                  code = TSDB_CODE_SUCCESS;
  int32_t                  lino = 0;
  SStreamFillOperatorInfo* pInfo = pOperator->info;
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;

  if (pOperator->status == OP_EXEC_DONE) {
    (*ppRes) = NULL;
    return code;
  }

  if (pOperator->status == OP_RES_TO_RETURN) {
    SSDataBlock* resBlock = NULL;
    code = buildForceFillResult(pOperator, &resBlock);
    QUERY_CHECK_CODE(code, lino, _end);

    if (resBlock != NULL) {
      (*ppRes) = resBlock;
      goto _end;
    }
    // Nothing left to emit: drop expired state and finish this round.
    pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
    setStreamOperatorCompleted(pOperator);
    (*ppRes) = NULL;
    goto _end;
  }

  while (1) {
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
    if (pBlock == NULL) {
      pOperator->status = OP_RES_TO_RETURN;
      qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType));
      break;
    }
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);

    switch (pBlock->info.type) {
      case STREAM_NORMAL:
      case STREAM_INVALID: {
        code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
        QUERY_CHECK_CODE(code, lino, _end);

        memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
        pInfo->srcRowIndex = -1;
      } break;
      case STREAM_CHECKPOINT:
      case STREAM_CREATE_CHILD_TABLE: {
        // These blocks are handed to the caller unchanged.
        (*ppRes) = pBlock;
        goto _end;
      }
      case STREAM_GET_RESULT: {
        // Only the window start key is recorded; the keys are expanded after the downstream is drained.
        void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
        continue;
      }
      default:
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
        QUERY_CHECK_CODE(code, lino, _end);
    }

    code = doStreamForceFillImpl(pOperator);
    QUERY_CHECK_CODE(code, lino, _end);
  }

  // Expand every recorded close timestamp into one result key per known group.
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
    TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
    code = buildAllResultKey(pInfo->pStreamAggSup, ts, pInfo->pUpdated);
    QUERY_CHECK_CODE(code, lino, _end);
  }
  taosArrayClear(pInfo->pCloseTs);
  removeDuplicateResult(pInfo->pUpdated, winKeyCmprImpl);

  // groupResInfo now refers to the current pUpdated array; a fresh array is allocated right after
  // for the next round.
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
  pInfo->groupResInfo.freeItem = false;

  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);

  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
  QUERY_CHECK_CODE(code, lino, _end);

  code = buildForceFillResult(pOperator, ppRes);
  QUERY_CHECK_CODE(code, lino, _end);

  if ((*ppRes) == NULL) {
    pInfo->pStreamAggSup->stateStore.streamStateClearExpiredState(pInfo->pStreamAggSup->pState);
    setStreamOperatorCompleted(pOperator);
  }

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
    pTaskInfo->code = code;
  }
  return code;
}
1426

1427
// Computes the per-row buffer size for the fill supporter and resets its four cached rows.
//
// rowSize becomes: one SResultCellData header per column of pInputRes plus the sum of the
// columns' info.bytes. The cur/prev/next/nextNext rows get key = INT64_MIN and no row buffer.
//
// Always returns TSDB_CODE_SUCCESS.
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
  int32_t colCount = taosArrayGetSize(pInputRes->pDataBlock);

  pFillSup->rowSize = sizeof(SResultCellData) * colCount;
  int colIdx = 0;
  while (colIdx < colCount) {
    SColumnInfoData* pColInfo = taosArrayGet(pInputRes->pDataBlock, colIdx);
    pFillSup->rowSize += pColInfo->info.bytes;
    colIdx++;
  }

  // Reset every cached row: "no key" sentinel and no value buffer.
  pFillSup->cur.key = INT64_MIN;
  pFillSup->cur.pRowVal = NULL;

  pFillSup->prev.key = INT64_MIN;
  pFillSup->prev.pRowVal = NULL;

  pFillSup->next.key = INT64_MIN;
  pFillSup->next.pRowVal = NULL;

  pFillSup->nextNext.key = INT64_MIN;
  pFillSup->nextNext.pRowVal = NULL;

  return TSDB_CODE_SUCCESS;
}
1445

1446
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
437✔
1447
                                               SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
1448
  int32_t               code = TSDB_CODE_SUCCESS;
437✔
1449
  int32_t               lino = 0;
437✔
1450
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
437✔
1451
  if (!pFillSup) {
437!
1452
    code = terrno;
×
1453
    QUERY_CHECK_CODE(code, lino, _end);
×
1454
  }
1455
  pFillSup->numOfFillCols = numOfFillCols;
437✔
1456
  int32_t    numOfNotFillCols = 0;
437✔
1457
  SExprInfo* noFillExprInfo = NULL;
437✔
1458

1459
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
437✔
1460
  QUERY_CHECK_CODE(code, lino, _end);
437!
1461

1462
  pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
874✔
1463
                                            NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
437✔
1464
  if (pFillSup->pAllColInfo == NULL) {
437!
1465
    code = terrno;
×
1466
    lino = __LINE__;
×
1467
    destroyExprInfo(noFillExprInfo, numOfNotFillCols);
×
1468
    goto _end;
×
1469
  }
1470

1471
  pFillSup->type = convertFillType(pPhyFillNode->mode);
437✔
1472
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
437✔
1473
  pFillSup->interval = *pInterval;
437✔
1474
  pFillSup->pAPI = pAPI;
437✔
1475

1476
  code = initResultBuf(pInputRes, pFillSup);
437✔
1477
  QUERY_CHECK_CODE(code, lino, _end);
437!
1478

1479
  SExprInfo* noFillExpr = NULL;
437✔
1480
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
437✔
1481
  QUERY_CHECK_CODE(code, lino, _end);
437!
1482

1483
  code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
437✔
1484
  QUERY_CHECK_CODE(code, lino, _end);
437!
1485

1486
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
437✔
1487
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
437✔
1488
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
437!
1489
  pFillSup->hasDelete = false;
437✔
1490

1491
_end:
437✔
1492
  if (code != TSDB_CODE_SUCCESS) {
437!
1493
    destroyStreamFillSupporter(pFillSup);
×
1494
    pFillSup = NULL;
×
1495
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1496
  }
1497
  return pFillSup;
437✔
1498
}
1499

1500
// Allocates and initialises the SStreamFillInfo that tracks the progress of a stream fill.
//
// pFillSup: supplies the fill type, numOfAllCols and rowSize used to size the buffers.
// pRes:     result block; the bytes/type of its columns size the linear end points and the
//           cells of the constant result row.
//
// Returns the new fill info, or NULL on failure. On failure the error is logged and any
// partially built object is handed to destroyStreamFillInfo.
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
  int32_t          code = TSDB_CODE_SUCCESS;
  int32_t          lino = 0;
  SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
  QUERY_CHECK_NULL(pFillInfo, code, lino, _end, terrno);

  pFillInfo->start = INT64_MIN;
  pFillInfo->current = INT64_MIN;
  pFillInfo->end = INT64_MIN;
  pFillInfo->preRowKey = INT64_MIN;
  pFillInfo->needFill = false;
  pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
  // Check the pointer that was just allocated; it is dereferenced on the following lines.
  QUERY_CHECK_NULL(pFillInfo->pLinearInfo, code, lino, _end, terrno);

  pFillInfo->pLinearInfo->hasNext = false;
  pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
  pFillInfo->pLinearInfo->pEndPoints = NULL;
  pFillInfo->pLinearInfo->pNextEndPoints = NULL;
  if (pFillSup->type == TSDB_FILL_LINEAR) {
    pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
    QUERY_CHECK_NULL(pFillInfo->pLinearInfo->pEndPoints, code, lino, _end, terrno);

    pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
    QUERY_CHECK_NULL(pFillInfo->pLinearInfo->pNextEndPoints, code, lino, _end, terrno);

    // One value buffer per column in each of the two end-point arrays.
    for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
      if (pColData == NULL) {
        // No matching column in pRes: push a 1-byte placeholder so both arrays keep one entry per column.
        SPoint dummy = {0};
        dummy.val = taosMemoryCalloc(1, 1);
        QUERY_CHECK_NULL(dummy.val, code, lino, _end, terrno);
        void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy);
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);

        dummy.val = taosMemoryCalloc(1, 1);
        QUERY_CHECK_NULL(dummy.val, code, lino, _end, terrno);
        tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy);
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);

        continue;
      }
      SPoint value = {0};
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);

      void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);

      value.val = taosMemoryCalloc(1, pColData->info.bytes);
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);

      tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
    }
  }
  pFillInfo->pLinearInfo->winIndex = 0;

  pFillInfo->pNonFillRow = NULL;
  pFillInfo->pResRow = NULL;
  if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
      pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
    pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
    QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno);

    pFillInfo->pResRow->key = INT64_MIN;
    pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
    QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno);

    // Record each cell's width and type from the matching result column.
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
      if (pColData == NULL) {
        pCell->bytes = 1;
        pCell->type = 4;  // NOTE(review): literal type code, presumably TSDB_DATA_TYPE_INT - confirm
        continue;
      }
      pCell->bytes = pColData->info.bytes;
      pCell->type = pColData->info.type;
    }

    pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
    QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
    pFillInfo->pNonFillRow->key = INT64_MIN;
    pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
    // The buffer must exist before it is used as the memcpy destination.
    QUERY_CHECK_NULL(pFillInfo->pNonFillRow->pRowVal, code, lino, _end, terrno);
    // The non-fill row starts as a copy of the result row, so it carries the same cell layout.
    memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize);
  }

  pFillInfo->type = pFillSup->type;
  pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeRange));
  QUERY_CHECK_NULL(pFillInfo->delRanges, code, lino, _end, terrno);

  pFillInfo->delIndex = 0;
  pFillInfo->curGroupId = 0;
  pFillInfo->hasNext = false;
  return pFillInfo;

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  // pFillInfo is NULL when the very first allocation failed; there is nothing to destroy then.
  if (pFillInfo != NULL) {
    destroyStreamFillInfo(pFillInfo);
  }
  return NULL;
}
1619

1620
// Pre-populates the constant result row (pFillInfo->pResRow) according to the fill type.
//
//   - NULL / NULL_F:           every destination cell is flagged as NULL.
//   - SET_VALUE / SET_VALUE_F: each destination cell receives the column's configured fill value,
//                              converted to the cell type (float, other floating types, integers);
//                              cells of any other type are flagged as NULL.
//   - any other fill type:     nothing is done.
static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
  if (pFillInfo->type != TSDB_FILL_SET_VALUE && pFillInfo->type != TSDB_FILL_SET_VALUE_F) {
    if (pFillInfo->type != TSDB_FILL_NULL && pFillInfo->type != TSDB_FILL_NULL_F) {
      return;
    }

    // NULL fill: mark every destination cell as NULL.
    for (int32_t colIdx = 0; colIdx < pFillSup->numOfAllCols; ++colIdx) {
      SFillColInfo*    pColInfo = &pFillSup->pAllColInfo[colIdx];
      int32_t          dstSlot = GET_DEST_SLOT_ID(pColInfo);
      SResultCellData* pDst = getResultCell(pFillInfo->pResRow, dstSlot);
      pDst->isNull = true;
    }
    return;
  }

  // Value fill: convert the configured variant to each destination cell's type.
  for (int32_t colIdx = 0; colIdx < pFillSup->numOfAllCols; ++colIdx) {
    SFillColInfo*    pColInfo = &pFillSup->pAllColInfo[colIdx];
    int32_t          dstSlot = GET_DEST_SLOT_ID(pColInfo);
    SResultCellData* pDst = getResultCell(pFillInfo->pResRow, dstSlot);
    SVariant*        pFillVar = &(pColInfo->fillVal);

    if (pDst->type == TSDB_DATA_TYPE_FLOAT) {
      float fltVal = 0;
      GET_TYPED_DATA(fltVal, float, pFillVar->nType, &pFillVar->i);
      SET_TYPED_DATA(pDst->pData, pDst->type, fltVal);
      continue;
    }
    if (IS_FLOAT_TYPE(pDst->type)) {
      double dblVal = 0;
      GET_TYPED_DATA(dblVal, double, pFillVar->nType, &pFillVar->i);
      SET_TYPED_DATA(pDst->pData, pDst->type, dblVal);
      continue;
    }
    if (IS_INTEGER_TYPE(pDst->type)) {
      int64_t intVal = 0;
      GET_TYPED_DATA(intVal, int64_t, pFillVar->nType, &pFillVar->i);
      SET_TYPED_DATA(pDst->pData, pDst->type, intVal);
      continue;
    }

    // Types with no numeric conversion above fall back to NULL.
    pDst->isNull = true;
  }
}
437✔
1652

1653
// Extracts the trigger type, interval and (optionally) the aggregate supporter from the
// downstream interval operator.
//
// downstream:  operator feeding the fill operator.
// triggerType: receives the downstream's twAggSup.calTrigger.
// pInterval:   receives a copy of the downstream's interval.
// ppAggSup:    NULL for a normal interval operator; for a continue-interval operator it points at
//              the downstream's streamAggSup, the downstream is flagged with hasFill, and
//              streamStateSetFillInfo is invoked on its state.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_STREAM_INTERNAL_ERROR for any other downstream kind.
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, SStreamAggSupporter** ppAggSup) {
  int32_t code = TSDB_CODE_SUCCESS;
  int32_t lino = 0;

  if (IS_NORMAL_INTERVAL_OP(downstream)) {
    SStreamIntervalOperatorInfo* pIntervalInfo = downstream->info;

    *triggerType = pIntervalInfo->twAggSup.calTrigger;
    *pInterval = pIntervalInfo->interval;
    (*ppAggSup) = NULL;
  } else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
    SStreamIntervalSliceOperatorInfo* pSliceInfo = downstream->info;

    *triggerType = pSliceInfo->twAggSup.calTrigger;
    *pInterval = pSliceInfo->interval;
    pSliceInfo->hasFill = true;
    (*ppAggSup) = &pSliceInfo->streamAggSup;
    pSliceInfo->streamAggSup.stateStore.streamStateSetFillInfo(pSliceInfo->streamAggSup.pState);
  } else {
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
  }
  QUERY_CHECK_CODE(code, lino, _end);

_end:
  if (code != TSDB_CODE_SUCCESS) {
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
  }
  return code;
}
1679

1680
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
437✔
1681
                                     SExecTaskInfo* pTaskInfo, SOperatorInfo** pOptrInfo) {
1682
  QRY_PARAM_CHECK(pOptrInfo);
437!
1683

1684
  int32_t                  code = TSDB_CODE_SUCCESS;
437✔
1685
  int32_t                  lino = 0;
437✔
1686
  SStreamFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFillOperatorInfo));
437✔
1687
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
437✔
1688
  if (pInfo == NULL || pOperator == NULL) {
437!
1689
    code = terrno;
×
1690
    QUERY_CHECK_CODE(code, lino, _error);
×
1691
  }
1692

1693
  int32_t    numOfFillCols = 0;
437✔
1694
  SExprInfo* pFillExprInfo = NULL;
437✔
1695

1696
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
437✔
1697
  QUERY_CHECK_CODE(code, lino, _error);
437!
1698

1699
  code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
437✔
1700
  QUERY_CHECK_CODE(code, lino, _error);
437!
1701

1702
  pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
437✔
1703
  QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
437!
1704

1705
  int8_t triggerType = 0;
437✔
1706
  SInterval interval = {0};
437✔
1707
  code = getDownStreamInfo(downstream, &triggerType, &interval, &pInfo->pStreamAggSup);
437✔
1708
  QUERY_CHECK_CODE(code, lino, _error);
437!
1709

1710
  pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
437✔
1711
                                      pInfo->pSrcBlock);
1712
  if (!pInfo->pFillSup) {
437!
1713
    code = TSDB_CODE_FAILED;
×
1714
    QUERY_CHECK_CODE(code, lino, _error);
×
1715
  }
1716

1717
  initResultSizeInfo(&pOperator->resultInfo, 4096);
437✔
1718
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
437✔
1719
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
437!
1720

1721
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
437✔
1722
  QUERY_CHECK_CODE(code, lino, _error);
437!
1723

1724
  code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
437✔
1725
  QUERY_CHECK_CODE(code, lino, _error);
437!
1726

1727
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
437✔
1728
  if (!pInfo->pFillInfo) {
437!
1729
    goto _error;
×
1730
  }
1731

1732
  setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo);
437✔
1733

1734
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
437✔
1735
  QUERY_CHECK_CODE(code, lino, _error);
437!
1736

1737
  code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
437✔
1738
  QUERY_CHECK_CODE(code, lino, _error);
437!
1739

1740
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
437✔
1741
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
437!
1742

1743
  pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
437✔
1744
  QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
437!
1745

1746
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
437✔
1747
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
437✔
1748

1749
  int32_t numOfOutputCols = 0;
437✔
1750
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
437✔
1751
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
1752
  QUERY_CHECK_CODE(code, lino, _error);
437!
1753

1754
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
437✔
1755
  QUERY_CHECK_CODE(code, lino, _error);
437!
1756

1757
  pInfo->srcRowIndex = -1;
437✔
1758
  setOperatorInfo(pOperator, "StreamFillOperator", QUERY_NODE_PHYSICAL_PLAN_STREAM_FILL, false, OP_NOT_OPENED, pInfo,
437✔
1759
                  pTaskInfo);
1760

1761
  if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
437✔
1762
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
30✔
1763
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1764
  } else {
1765
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
407✔
1766
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1767
  }
1768
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
437✔
1769

1770
  code = appendDownstream(pOperator, &downstream, 1);
437✔
1771
  QUERY_CHECK_CODE(code, lino, _error);
437!
1772

1773
  *pOptrInfo = pOperator;
437✔
1774
  return TSDB_CODE_SUCCESS;
437✔
1775

1776
_error:
×
1777
  qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1778

1779
  if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
×
1780
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
1781
  pTaskInfo->code = code;
×
1782
  return code;
×
1783
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc