• Home
  • Features
  • Pricing
  • Docs
  • Announcements
  • Sign In

taosdata / TDengine / #3660

15 Mar 2025 09:06AM UTC coverage: 62.039% (-1.3%) from 63.314%
#3660

push

travis-ci

web-flow
feat(stream): support stream processing for virtual tables (#30144)

* enh: add client processing

* enh: add mnode vtables processing

* enh: add mnode vtable processing

* enh: add normal child vtable support

* fix: compile issues

* fix: compile issues

* fix: create stream issues

* fix: multi stream scan issue

* fix: remove debug info

* fix: agg task and task level issues

* fix: correct task output type

* fix: split vtablescan from agg

* fix: memory leak issues

* fix: add limitations

* Update 09-error-code.md

* Update 09-error-code.md

* fix: remove usless case

* feat(stream): extract original table data in source scan task

Implemented functionality in the source task to extract data
corresponding to the virtual table from the original table using WAL.
The extracted data is then sent to the downstream merge task for further
processing.

* feat(stream): multi-way merge using loser tree in virtual merge task

Implemented multi-way merge in the merge task using a loser tree to
combine data from multiple original table into a single virtual table.
The merged virtual table data is then pushed downstream for further
processing.  Introduced memory limit handling during the merge process
with configurable behavior when the memory limit is reached.

* fix(test): remove useless cases

---------

Co-authored-by: dapan1121 <wpan@taosdata.com>
Co-authored-by: Pan Wei <72057773+dapan1121@users.noreply.github.com>

154078 of 317582 branches covered (48.52%)

Branch coverage included in aggregate %.

313 of 2391 new or added lines in 34 files covered. (13.09%)

26134 existing lines in 205 files now uncovered.

240261 of 318051 relevant lines covered (75.54%)

16655189.27 hits per line

Source File
Press 'n' to go to next uncovered line, 'b' for previous

74.48
/source/libs/executor/src/streamfilloperator.c
1
/*
2
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
3
 *
4
 * This program is free software: you can use, redistribute, and/or modify
5
 * it under the terms of the GNU Affero General Public License, version 3
6
 * or later ("AGPL"), as published by the Free Software Foundation.
7
 *
8
 * This program is distributed in the hope that it will be useful, but WITHOUT
9
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
10
 * FITNESS FOR A PARTICULAR PURPOSE.
11
 *
12
 * You should have received a copy of the GNU Affero General Public License
13
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
14
 */
15

16
#include "filter.h"
17
#include "os.h"
18
#include "query.h"
19
#include "taosdef.h"
20
#include "tmsg.h"
21
#include "ttypes.h"
22

23
#include "executorInt.h"
24
#include "streamexecutorInt.h"
25
#include "streaminterval.h"
26
#include "tcommon.h"
27
#include "thash.h"
28
#include "ttime.h"
29

30
#include "function.h"
31
#include "operator.h"
32
#include "querynodes.h"
33
#include "querytask.h"
34
#include "tdatablock.h"
35
#include "tfill.h"
36

37
#define FILL_POS_INVALID 0
38
#define FILL_POS_START   1
39
#define FILL_POS_MID     2
40
#define FILL_POS_END     3
41

42
TSKEY getNextWindowTs(TSKEY ts, SInterval* pInterval) {
55✔
43
  STimeWindow win = {.skey = ts, .ekey = ts};
55✔
44
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_ASC);
55✔
45
  return win.skey;
55✔
46
}
47

48
TSKEY getPrevWindowTs(TSKEY ts, SInterval* pInterval) {
30✔
49
  STimeWindow win = {.skey = ts, .ekey = ts};
30✔
50
  getNextTimeWindow(pInterval, &win, TSDB_ORDER_DESC);
30✔
51
  return win.skey;
30✔
52
}
53

54
int32_t setRowCell(SColumnInfoData* pCol, int32_t rowId, const SResultCellData* pCell) {
411,421✔
55
  return colDataSetVal(pCol, rowId, pCell->pData, pCell->isNull);
411,421✔
56
}
57

58
SResultCellData* getResultCell(SResultRowData* pRaw, int32_t index) {
162,870✔
59
  if (!pRaw || !pRaw->pRowVal) {
162,870!
UNCOV
60
    return NULL;
×
61
  }
62
  char*            pData = (char*)pRaw->pRowVal;
162,906✔
63
  SResultCellData* pCell = pRaw->pRowVal;
162,906✔
64
  for (int32_t i = 0; i < index; i++) {
1,129,739✔
65
    pData += (pCell->bytes + sizeof(SResultCellData));
966,833✔
66
    pCell = (SResultCellData*)pData;
966,833✔
67
  }
68
  return pCell;
162,906✔
69
}
70

71
void* destroyFillColumnInfo(SFillColInfo* pFillCol, int32_t start, int32_t end) {
672✔
72
  for (int32_t i = start; i < end; i++) {
1,164✔
73
    destroyExprInfo(pFillCol[i].pExpr, 1);
492✔
74
    taosVariantDestroy(&pFillCol[i].fillVal);
492✔
75
  }
76
  if (start < end) {
672✔
77
    taosMemoryFreeClear(pFillCol[start].pExpr);
451!
78
  }
79
  taosMemoryFree(pFillCol);
672!
80
  return NULL;
672✔
81
}
82

83
void destroyStreamFillSupporter(SStreamFillSupporter* pFillSup) {
672✔
84
  if (pFillSup == NULL) {
672!
UNCOV
85
    return;
×
86
  }
87
  pFillSup->pAllColInfo = destroyFillColumnInfo(pFillSup->pAllColInfo, pFillSup->numOfFillCols, pFillSup->numOfAllCols);
672✔
88
  tSimpleHashCleanup(pFillSup->pResMap);
672✔
89
  pFillSup->pResMap = NULL;
672✔
90
  cleanupExprSupp(&pFillSup->notFillExprSup);
672✔
91
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
672✔
92
    taosMemoryFree(pFillSup->cur.pRowVal);
169!
93
  }
94
  taosMemoryFree(pFillSup->prev.pRowVal);
672!
95
  taosMemoryFree(pFillSup->next.pRowVal);
672!
96
  taosMemoryFree(pFillSup->nextNext.pRowVal);
672!
97

98
  taosMemoryFree(pFillSup->pOffsetInfo);
672!
99
  taosArrayDestroy(pFillSup->pResultRange);
672✔
100
  pFillSup->pResultRange = NULL;
672✔
101

102
  taosMemoryFree(pFillSup);
672!
103
}
104

105
void destroySPoint(void* ptr) {
104,092✔
106
  SPoint* point = (SPoint*)ptr;
104,092✔
107
  taosMemoryFreeClear(point->val);
104,092!
108
}
104,106✔
109

110
void destroyStreamFillLinearInfo(SStreamFillLinearInfo* pFillLinear) {
672✔
111
  taosArrayDestroyEx(pFillLinear->pEndPoints, destroySPoint);
672✔
112
  taosArrayDestroyEx(pFillLinear->pNextEndPoints, destroySPoint);
672✔
113
  taosMemoryFree(pFillLinear);
672!
114
}
672✔
115

116
void destroyStreamFillInfo(SStreamFillInfo* pFillInfo) {
672✔
117
  if (pFillInfo == NULL) {
672!
UNCOV
118
    return;
×
119
  } 
120
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F ||
672✔
121
      pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
534✔
122
    taosMemoryFreeClear(pFillInfo->pResRow->pRowVal);
303!
123
    taosMemoryFreeClear(pFillInfo->pResRow);
303!
124
    taosMemoryFreeClear(pFillInfo->pNonFillRow->pRowVal);
303!
125
    taosMemoryFreeClear(pFillInfo->pNonFillRow);
303!
126
  }
127
  destroyStreamFillLinearInfo(pFillInfo->pLinearInfo);
672✔
128
  pFillInfo->pLinearInfo = NULL;
672✔
129

130
  taosArrayDestroy(pFillInfo->delRanges);
672✔
131
  taosMemoryFreeClear(pFillInfo->pTempBuff);
672!
132
  taosMemoryFree(pFillInfo);
672!
133
}
134

135
void destroyStreamFillOperatorInfo(void* param) {
451✔
136
  SStreamFillOperatorInfo* pInfo = (SStreamFillOperatorInfo*)param;
451✔
137
  destroyStreamFillInfo(pInfo->pFillInfo);
451✔
138
  destroyStreamFillSupporter(pInfo->pFillSup);
451✔
139
  blockDataDestroy(pInfo->pRes);
451✔
140
  pInfo->pRes = NULL;
451✔
141
  blockDataDestroy(pInfo->pSrcBlock);
451✔
142
  pInfo->pSrcBlock = NULL;
451✔
143
  blockDataDestroy(pInfo->pDelRes);
451✔
144
  pInfo->pDelRes = NULL;
451✔
145
  taosArrayDestroy(pInfo->matchInfo.pList);
451✔
146
  pInfo->matchInfo.pList = NULL;
451✔
147
  taosArrayDestroy(pInfo->pUpdated);
451✔
148
  clearGroupResInfo(&pInfo->groupResInfo);
451✔
149
  taosArrayDestroy(pInfo->pCloseTs);
451✔
150

151
  if (pInfo->stateStore.streamFileStateDestroy != NULL) {
451✔
152
    pInfo->stateStore.streamFileStateDestroy(pInfo->pState->pFileState);
44✔
153
  }
154

155
  if (pInfo->pState != NULL) {
451✔
156
    taosMemoryFreeClear(pInfo->pState);
44!
157
  }
158
  destroyStreamBasicInfo(&pInfo->basic);
451✔
159
  destroyNonBlockAggSupptor(&pInfo->nbSup);
451✔
160

161
  taosMemoryFree(pInfo);
451!
162
}
451✔
163

164
static void resetFillWindow(SResultRowData* pRowData) {
7,925✔
165
  pRowData->key = INT64_MIN;
7,925✔
166
  taosMemoryFreeClear(pRowData->pRowVal);
7,925!
167
}
7,927✔
168

169
static void resetPrevAndNextWindow(SStreamFillSupporter* pFillSup) {
1,801✔
170
  if (pFillSup->cur.pRowVal != pFillSup->prev.pRowVal && pFillSup->cur.pRowVal != pFillSup->next.pRowVal) {
1,801✔
171
    resetFillWindow(&pFillSup->cur);
1,364✔
172
  } else {
173
    pFillSup->cur.key = INT64_MIN;
437✔
174
    pFillSup->cur.pRowVal = NULL;
437✔
175
  }
176
  resetFillWindow(&pFillSup->prev);
1,800✔
177
  resetFillWindow(&pFillSup->next);
1,801✔
178
  resetFillWindow(&pFillSup->nextNext);
1,801✔
179
}
1,801✔
180

181
void getWindowFromDiscBuf(SOperatorInfo* pOperator, TSKEY ts, uint64_t groupId, SStreamFillSupporter* pFillSup) {
1,801✔
182
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,801✔
183
  void*        pState = pOperator->pTaskInfo->streamInfo.pState;
1,801✔
184
  resetPrevAndNextWindow(pFillSup);
1,801✔
185

186
  SWinKey key = {.ts = ts, .groupId = groupId};
1,801✔
187
  void*   curVal = NULL;
1,801✔
188
  int32_t curVLen = 0;
1,801✔
189
  bool    hasCurKey = true;
1,801✔
190
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&curVal, &curVLen, NULL);
1,801✔
191
  if (code == TSDB_CODE_SUCCESS) {
1,799✔
192
    pFillSup->cur.key = key.ts;
1,744✔
193
    pFillSup->cur.pRowVal = curVal;
1,744✔
194
  } else {
195
    qDebug("streamStateFillGet key failed, Data may be deleted. ts:%" PRId64 ", groupId:%" PRId64, ts, groupId);
55✔
196
    pFillSup->cur.key = ts;
55✔
197
    pFillSup->cur.pRowVal = NULL;
55✔
198
    hasCurKey = false;
55✔
199
  }
200

201
  SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyPrev(pState, &key);
1,799✔
202
  SWinKey          preKey = {.ts = INT64_MIN, .groupId = groupId};
1,801✔
203
  void*            preVal = NULL;
1,801✔
204
  int32_t          preVLen = 0;
1,801✔
205
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &preKey, (const void**)&preVal, &preVLen);
1,801✔
206

207
  if (code == TSDB_CODE_SUCCESS) {
1,798✔
208
    pFillSup->prev.key = preKey.ts;
1,411✔
209
    pFillSup->prev.pRowVal = preVal;
1,411✔
210

211
    if (hasCurKey) {
1,411✔
212
      pAPI->stateStore.streamStateCurNext(pState, pCur);
1,356✔
213
    }
214

215
    pAPI->stateStore.streamStateCurNext(pState, pCur);
1,411✔
216
  } else {
217
    pAPI->stateStore.streamStateFreeCur(pCur);
387✔
218
    pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
390✔
219
  }
220

221
  SWinKey nextKey = {.ts = INT64_MIN, .groupId = groupId};
1,801✔
222
  void*   nextVal = NULL;
1,801✔
223
  int32_t nextVLen = 0;
1,801✔
224
  code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, (const void**)&nextVal, &nextVLen);
1,801✔
225
  if (code == TSDB_CODE_SUCCESS) {
1,800✔
226
    pFillSup->next.key = nextKey.ts;
1,014✔
227
    pFillSup->next.pRowVal = nextVal;
1,014✔
228
    if (pFillSup->type == TSDB_FILL_PREV || pFillSup->type == TSDB_FILL_NEXT) {
1,014✔
229
      pAPI->stateStore.streamStateCurNext(pState, pCur);
389✔
230
      SWinKey nextNextKey = {.groupId = groupId};
389✔
231
      void*   nextNextVal = NULL;
389✔
232
      int32_t nextNextVLen = 0;
389✔
233
      code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextNextKey, (const void**)&nextNextVal, &nextNextVLen);
389✔
234
      if (code == TSDB_CODE_SUCCESS) {
389✔
235
        pFillSup->nextNext.key = nextNextKey.ts;
110✔
236
        pFillSup->nextNext.pRowVal = nextNextVal;
110✔
237
      }
238
    }
239
  }
240
  pAPI->stateStore.streamStateFreeCur(pCur);
1,800✔
241
}
1,800✔
242

UNCOV
243
bool hasCurWindow(SStreamFillSupporter* pFillSup) { return pFillSup->cur.key != INT64_MIN; }
×
244
bool hasPrevWindow(SStreamFillSupporter* pFillSup) { return pFillSup->prev.key != INT64_MIN; }
8,258✔
245
bool hasNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->next.key != INT64_MIN; }
5,889✔
246
static bool hasNextNextWindow(SStreamFillSupporter* pFillSup) { return pFillSup->nextNext.key != INT64_MIN; }
147✔
247

248
static void transBlockToResultRow(const SSDataBlock* pBlock, int32_t rowId, TSKEY ts, SResultRowData* pRowVal) {
1,783✔
249
  int32_t numOfCols = taosArrayGetSize(pBlock->pDataBlock);
1,783✔
250
  for (int32_t i = 0; i < numOfCols; ++i) {
29,102✔
251
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, i);
27,322✔
252
    SResultCellData* pCell = getResultCell(pRowVal, i);
27,320✔
253
    if (!colDataIsNull_s(pColData, rowId)) {
54,636!
254
      pCell->isNull = false;
27,319✔
255
      pCell->type = pColData->info.type;
27,319✔
256
      pCell->bytes = pColData->info.bytes;
27,319✔
257
      char* val = colDataGetData(pColData, rowId);
27,319!
258
      if (IS_VAR_DATA_TYPE(pCell->type)) {
27,319!
259
        memcpy(pCell->pData, val, varDataTLen(val));
33✔
260
      } else {
261
        memcpy(pCell->pData, val, pCell->bytes);
27,286✔
262
      }
263
    } else {
UNCOV
264
      pCell->isNull = true;
×
265
    }
266
  }
267
  pRowVal->key = ts;
1,780✔
268
}
1,780✔
269

270
static void calcRowDeltaData(SResultRowData* pEndRow, SArray* pEndPoins, SFillColInfo* pFillCol, int32_t numOfCol) {
279✔
271
  for (int32_t i = 0; i < numOfCol; i++) {
4,687✔
272
    if (!pFillCol[i].notFillCol) {
4,407✔
273
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol + i);
4,138✔
274
      SResultCellData* pECell = getResultCell(pEndRow, slotId);
4,138✔
275
      SPoint*          pPoint = taosArrayGet(pEndPoins, slotId);
4,150✔
276
      pPoint->key = pEndRow->key;
4,139✔
277
      memcpy(pPoint->val, pECell->pData, pECell->bytes);
4,139✔
278
    }
279
  }
280
}
280✔
281

282
static void setFillInfoStart(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,378✔
283
  ts = taosTimeAdd(ts, pInterval->sliding, pInterval->slidingUnit, pInterval->precision, NULL);
1,378✔
284
  pFillInfo->start = ts;
1,383✔
285
}
1,383✔
286

287
static void setFillInfoEnd(TSKEY ts, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,384✔
288
  ts = taosTimeAdd(ts, pInterval->sliding * -1, pInterval->slidingUnit, pInterval->precision, NULL);
1,384✔
289
  pFillInfo->end = ts;
1,383✔
290
}
1,383✔
291

292
static void setFillKeyInfo(TSKEY start, TSKEY end, SInterval* pInterval, SStreamFillInfo* pFillInfo) {
1,381✔
293
  setFillInfoStart(start, pInterval, pFillInfo);
1,381✔
294
  pFillInfo->current = pFillInfo->start;
1,383✔
295
  setFillInfoEnd(end, pInterval, pFillInfo);
1,383✔
296
}
1,383✔
297

298
void setDeleteFillValueInfo(TSKEY start, TSKEY end, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
200✔
299
  if (!hasPrevWindow(pFillSup) || !hasNextWindow(pFillSup)) {
200✔
300
    pFillInfo->needFill = false;
90✔
301
    return;
90✔
302
  }
303

304
  TSKEY realStart = taosTimeAdd(pFillSup->prev.key, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
110✔
305
                                pFillSup->interval.precision, NULL);
110✔
306

307
  pFillInfo->needFill = true;
110✔
308
  pFillInfo->start = realStart;
110✔
309
  pFillInfo->current = pFillInfo->start;
110✔
310
  pFillInfo->end = end;
110✔
311
  pFillInfo->pos = FILL_POS_INVALID;
110✔
312
  switch (pFillInfo->type) {
110!
313
    case TSDB_FILL_NULL:
44✔
314
    case TSDB_FILL_NULL_F:
315
    case TSDB_FILL_SET_VALUE:
316
    case TSDB_FILL_SET_VALUE_F:
317
      break;
44✔
318
    case TSDB_FILL_PREV:
22✔
319
      pFillInfo->pResRow = &pFillSup->prev;
22✔
320
      break;
22✔
321
    case TSDB_FILL_NEXT:
22✔
322
      pFillInfo->pResRow = &pFillSup->next;
22✔
323
      break;
22✔
324
    case TSDB_FILL_LINEAR: {
22✔
325
      setFillKeyInfo(pFillSup->prev.key, pFillSup->next.key, &pFillSup->interval, pFillInfo);
22✔
326
      pFillInfo->pLinearInfo->hasNext = false;
22✔
327
      pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
22✔
328
      calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
22✔
329
                       pFillSup->numOfAllCols);
330
      pFillInfo->pResRow = &pFillSup->prev;
22✔
331
      pFillInfo->pLinearInfo->winIndex = 0;
22✔
332
    } break;
22✔
UNCOV
333
    default:
×
334
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
×
335
      break;
×
336
  }
337
}
338

339
void copyNotFillExpData(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
761✔
340
  for (int32_t i = pFillSup->numOfFillCols; i < pFillSup->numOfAllCols; ++i) {
1,652✔
341
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
891✔
342
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
891✔
343
    SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
891✔
344
    SResultCellData* pCurCell = getResultCell(&pFillSup->cur, slotId);
891✔
345
    pCell->isNull = pCurCell->isNull;
891✔
346
    if (!pCurCell->isNull) {
891!
347
      memcpy(pCell->pData, pCurCell->pData, pCell->bytes);
891✔
348
    }
349
  }
350
}
761✔
351

352
void setFillValueInfo(SSDataBlock* pBlock, TSKEY ts, int32_t rowId, SStreamFillSupporter* pFillSup,
1,597✔
353
                      SStreamFillInfo* pFillInfo) {
354
  pFillInfo->preRowKey = pFillSup->cur.key;
1,597✔
355
  if (!hasPrevWindow(pFillSup) && !hasNextWindow(pFillSup)) {
1,597✔
356
    pFillInfo->needFill = false;
276✔
357
    pFillInfo->pos = FILL_POS_START;
276✔
358
    return;
276✔
359
  }
360
  TSKEY prevWKey = INT64_MIN;
1,319✔
361
  TSKEY nextWKey = INT64_MIN;
1,319✔
362
  if (hasPrevWindow(pFillSup)) {
1,319✔
363
    prevWKey = pFillSup->prev.key;
705✔
364
  }
365
  if (hasNextWindow(pFillSup)) {
1,323✔
366
    nextWKey = pFillSup->next.key;
871✔
367
  }
368

369
  pFillInfo->needFill = true;
1,322✔
370
  pFillInfo->pos = FILL_POS_INVALID;
1,322✔
371
  switch (pFillInfo->type) {
1,322✔
372
    case TSDB_FILL_NULL:
583✔
373
    case TSDB_FILL_NULL_F:
374
    case TSDB_FILL_SET_VALUE:
375
    case TSDB_FILL_SET_VALUE_F: {
376
      if (pFillSup->prev.key == pFillInfo->preRowKey) {
583!
UNCOV
377
        resetFillWindow(&pFillSup->prev);
×
378
      }
379
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
583✔
380
        if (pFillSup->next.key == pFillInfo->nextRowKey) {
122✔
381
          pFillInfo->preRowKey = INT64_MIN;
120✔
382
          setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
120✔
383
          pFillInfo->pos = FILL_POS_END;
120✔
384
        } else {
385
          pFillInfo->needFill = false;
2✔
386
          pFillInfo->pos = FILL_POS_START;
2✔
387
        }
388
      } else if (hasPrevWindow(pFillSup)) {
461✔
389
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
202✔
390
        pFillInfo->pos = FILL_POS_END;
202✔
391
      } else {
392
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
259✔
393
        pFillInfo->pos = FILL_POS_START;
259✔
394
      }
395
      copyNotFillExpData(pFillSup, pFillInfo);
583✔
396
    } break;
583✔
397
    case TSDB_FILL_PREV: {
268✔
398
      if (hasNextWindow(pFillSup) && ((pFillSup->next.key != pFillInfo->nextRowKey) ||
268✔
399
                                      (pFillSup->next.key == pFillInfo->nextRowKey && hasNextNextWindow(pFillSup)) ||
147!
400
                                      (pFillSup->next.key == pFillInfo->nextRowKey && !hasPrevWindow(pFillSup)))) {
123!
401
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
126✔
402
        pFillInfo->pos = FILL_POS_START;
127✔
403
        resetFillWindow(&pFillSup->prev);
127✔
404
        pFillSup->prev.key = pFillSup->cur.key;
127✔
405
        pFillSup->prev.pRowVal = pFillSup->cur.pRowVal;
127✔
406
      } else if (hasPrevWindow(pFillSup)) {
141!
407
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
141✔
408
        pFillInfo->pos = FILL_POS_END;
141✔
409
        pFillInfo->preRowKey = INT64_MIN;
141✔
410
      }
411
      pFillInfo->pResRow = &pFillSup->prev;
268✔
412
    } break;
268✔
413
    case TSDB_FILL_NEXT: {
253✔
414
      if (hasPrevWindow(pFillSup)) {
253✔
415
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
140✔
416
        pFillInfo->pos = FILL_POS_END;
141✔
417
        resetFillWindow(&pFillSup->next);
141✔
418
        pFillSup->next.key = pFillSup->cur.key;
140✔
419
        pFillSup->next.pRowVal = pFillSup->cur.pRowVal;
140✔
420
        pFillInfo->preRowKey = INT64_MIN;
140✔
421
      } else {
422
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
113✔
423
        pFillInfo->pos = FILL_POS_START;
114✔
424
      }
425
      pFillInfo->pResRow = &pFillSup->next;
254✔
426
    } break;
254✔
427
    case TSDB_FILL_LINEAR: {
217✔
428
      pFillInfo->pLinearInfo->winIndex = 0;
217✔
429
      if (hasPrevWindow(pFillSup) && hasNextWindow(pFillSup)) {
217✔
430
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
39✔
431
        pFillInfo->pos = FILL_POS_MID;
39✔
432
        pFillInfo->pLinearInfo->nextEnd = nextWKey;
39✔
433
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
39✔
434
                         pFillSup->numOfAllCols);
435
        pFillInfo->pResRow = &pFillSup->prev;
39✔
436

437
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pNextEndPoints, pFillSup->pAllColInfo,
39✔
438
                         pFillSup->numOfAllCols);
439
        pFillInfo->pLinearInfo->hasNext = true;
39✔
440
      } else if (hasPrevWindow(pFillSup)) {
178✔
441
        setFillKeyInfo(prevWKey, ts, &pFillSup->interval, pFillInfo);
58✔
442
        pFillInfo->pos = FILL_POS_END;
58✔
443
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
58✔
444
        calcRowDeltaData(&pFillSup->cur, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
58✔
445
                         pFillSup->numOfAllCols);
446
        pFillInfo->pResRow = &pFillSup->prev;
58✔
447
        pFillInfo->pLinearInfo->hasNext = false;
58✔
448
      } else {
449
        setFillKeyInfo(ts, nextWKey, &pFillSup->interval, pFillInfo);
120✔
450
        pFillInfo->pos = FILL_POS_START;
120✔
451
        pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
120✔
452
        calcRowDeltaData(&pFillSup->next, pFillInfo->pLinearInfo->pEndPoints, pFillSup->pAllColInfo,
120✔
453
                         pFillSup->numOfAllCols);
454
        pFillInfo->pResRow = &pFillSup->cur;
121✔
455
        pFillInfo->pLinearInfo->hasNext = false;
121✔
456
      }
457
    } break;
218✔
458
    default:
1✔
459
      qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR));
1!
460
      break;
×
461
  }
462
}
463

464
int32_t checkResult(SStreamFillSupporter* pFillSup, TSKEY ts, uint64_t groupId, bool* pRes) {
202,692✔
465
  int32_t code = TSDB_CODE_SUCCESS;
202,692✔
466
  int32_t lino = 0;
202,692✔
467
  SWinKey key = {.groupId = groupId, .ts = ts};
202,692✔
468
  if (tSimpleHashGet(pFillSup->pResMap, &key, sizeof(SWinKey)) != NULL) {
202,692✔
469
    (*pRes) = false;
50,206✔
470
    goto _end;
50,206✔
471
  }
472
  code = tSimpleHashPut(pFillSup->pResMap, &key, sizeof(SWinKey), NULL, 0);
152,949✔
473
  QUERY_CHECK_CODE(code, lino, _end);
153,045!
474
  (*pRes) = true;
153,045✔
475

476
_end:
203,251✔
477
  if (code != TSDB_CODE_SUCCESS) {
203,251!
UNCOV
478
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
479
  }
480
  return code;
203,248✔
481
}
482

483
static int32_t buildFillResult(SResultRowData* pResRow, SStreamFillSupporter* pFillSup, TSKEY ts, SSDataBlock* pBlock,
37,310✔
484
                               bool* pRes, bool isFilld) {
485
  int32_t code = TSDB_CODE_SUCCESS;
37,310✔
486
  int32_t lino = 0;
37,310✔
487
  if (pBlock->info.rows >= pBlock->info.capacity) {
37,310✔
488
    (*pRes) = false;
4✔
489
    goto _end;
4✔
490
  }
491
  uint64_t groupId = pBlock->info.id.groupId;
37,306✔
492
  bool     ckRes = true;
37,306✔
493
  code = checkResult(pFillSup, ts, groupId, &ckRes);
37,306✔
494
  QUERY_CHECK_CODE(code, lino, _end);
37,311!
495

496
  if (pFillSup->hasDelete && !ckRes) {
37,311✔
497
    (*pRes) = true;
28✔
498
    goto _end;
28✔
499
  }
500
  for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
162,397✔
501
    SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
125,118✔
502
    int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
125,118✔
503
    SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
125,118✔
504
    SFillInfo        tmpInfo = {
125,117✔
505
               .currentKey = ts,
506
               .order = TSDB_ORDER_ASC,
507
               .interval = pFillSup->interval,
508
               .isFilled = isFilld,
509
    };
510
    bool filled = fillIfWindowPseudoColumn(&tmpInfo, pFillCol, pColData, pBlock->info.rows);
125,117✔
511
    if (!filled) {
125,254✔
512
      SResultCellData* pCell = getResultCell(pResRow, slotId);
88,018✔
513
      code = setRowCell(pColData, pBlock->info.rows, pCell);
87,935✔
514
      QUERY_CHECK_CODE(code, lino, _end);
87,878!
515
    }
516
  }
517
  pBlock->info.rows++;
37,279✔
518
  (*pRes) = true;
37,279✔
519

520
_end:
37,311✔
521
  if (code != TSDB_CODE_SUCCESS) {
37,311!
522
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
523
  }
524
  return code;
37,315✔
525
}
526

527
bool hasRemainCalc(SStreamFillInfo* pFillInfo) {
206,615✔
528
  if (pFillInfo->current != INT64_MIN && pFillInfo->current <= pFillInfo->end) {
206,615✔
529
    return true;
200,010✔
530
  }
531
  return false;
6,605✔
532
}
533

534
static void doStreamFillNormal(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
1,154✔
535
  int32_t code = TSDB_CODE_SUCCESS;
1,154✔
536
  int32_t lino = 0;
1,154✔
537
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
36,205✔
538
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
35,051✔
539
    if (inWinRange(&pFillSup->winRange, &st)) {
35,051!
540
      bool res = true;
35,050✔
541
      code = buildFillResult(pFillInfo->pResRow, pFillSup, pFillInfo->current, pBlock, &res, true);
35,050✔
542
      QUERY_CHECK_CODE(code, lino, _end);
35,051!
543
    }
544
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
35,051✔
545
                                     pFillSup->interval.precision, NULL);
35,051✔
546
  }
547

548
_end:
1,154✔
549
  if (code != TSDB_CODE_SUCCESS) {
1,154!
550
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
551
  }
552
}
1,154✔
553

554
static void doStreamFillLinear(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock) {
270✔
555
  int32_t code = TSDB_CODE_SUCCESS;
270✔
556
  int32_t lino = 0;
270✔
557
  while (hasRemainCalc(pFillInfo) && pBlock->info.rows < pBlock->info.capacity) {
14,835✔
558
    uint64_t    groupId = pBlock->info.id.groupId;
14,565✔
559
    SWinKey     key = {.groupId = groupId, .ts = pFillInfo->current};
14,565✔
560
    STimeWindow st = {.skey = pFillInfo->current, .ekey = pFillInfo->current};
14,565✔
561
    bool        ckRes = true;
14,565✔
562
    code = checkResult(pFillSup, pFillInfo->current, groupId, &ckRes);
14,565✔
563
    QUERY_CHECK_CODE(code, lino, _end);
14,565!
564

565
    if ((pFillSup->hasDelete && !ckRes) || !inWinRange(&pFillSup->winRange, &st)) {
14,565!
566
      pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
16✔
567
                                       pFillSup->interval.precision, NULL);
8✔
568
      pFillInfo->pLinearInfo->winIndex++;
8✔
569
      continue;
8✔
570
    }
571
    pFillInfo->pLinearInfo->winIndex++;
14,557✔
572
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
50,122✔
573
      SFillColInfo* pFillCol = pFillSup->pAllColInfo + i;
35,566✔
574
      SFillInfo     tmp = {
35,566✔
575
              .currentKey = pFillInfo->current,
35,566✔
576
              .order = TSDB_ORDER_ASC,
577
              .interval = pFillSup->interval,
578
              .isFilled = true,
579
      };
580

581
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
35,566✔
582
      SColumnInfoData* pColData = taosArrayGet(pBlock->pDataBlock, slotId);
35,566✔
583
      int16_t          type = pColData->info.type;
35,578✔
584
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
35,578✔
585
      int32_t          index = pBlock->info.rows;
35,571✔
586
      if (pFillCol->notFillCol) {
35,571✔
587
        bool filled = fillIfWindowPseudoColumn(&tmp, pFillCol, pColData, index);
14,557✔
588
        if (!filled) {
14,557✔
589
          code = setRowCell(pColData, index, pCell);
9✔
UNCOV
590
          QUERY_CHECK_CODE(code, lino, _end);
×
591
        }
592
      } else {
593
        if (IS_VAR_DATA_TYPE(type) || type == TSDB_DATA_TYPE_BOOL || pCell->isNull) {
21,014!
594
          colDataSetNULL(pColData, index);
23!
595
          continue;
23✔
596
        }
597
        SPoint* pEnd = taosArrayGet(pFillInfo->pLinearInfo->pEndPoints, slotId);
20,991✔
598
        double  vCell = 0;
20,989✔
599
        SPoint  start = {0};
20,989✔
600
        start.key = pFillInfo->pResRow->key;
20,989✔
601
        start.val = pCell->pData;
20,989✔
602

603
        SPoint cur = {0};
20,989✔
604
        cur.key = pFillInfo->current;
20,989✔
605
        cur.val = taosMemoryCalloc(1, pCell->bytes);
20,989!
606
        QUERY_CHECK_NULL(cur.val, code, lino, _end, terrno);
20,989!
607
        taosGetLinearInterpolationVal(&cur, pCell->type, &start, pEnd, pCell->type, typeGetTypeModFromColInfo(&pColData->info));
20,989✔
608
        code = colDataSetVal(pColData, index, (const char*)cur.val, false);
20,980✔
609
        QUERY_CHECK_CODE(code, lino, _end);
20,986!
610
        destroySPoint(&cur);
20,986✔
611
      }
612
    }
613
    pFillInfo->current = taosTimeAdd(pFillInfo->current, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
29,113✔
614
                                     pFillSup->interval.precision, NULL);
14,556✔
615
    pBlock->info.rows++;
14,557✔
616
  }
617

618
_end:
270✔
619
  if (code != TSDB_CODE_SUCCESS) {
270!
UNCOV
620
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
621
  }
622
}
270✔
623

624
static void keepResultInDiscBuf(SOperatorInfo* pOperator, uint64_t groupId, SResultRowData* pRow, int32_t len) {
1,600✔
625
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
1,600✔
626

627
  SWinKey key = {.groupId = groupId, .ts = pRow->key};
1,600✔
628
  int32_t code = pAPI->stateStore.streamStateFillPut(pOperator->pTaskInfo->streamInfo.pState, &key, pRow->pRowVal, len);
1,600✔
629
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
1,601✔
630
  if (code != TSDB_CODE_SUCCESS) {
1,601!
UNCOV
631
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
632
  }
633
}
1,601✔
634

635
void doStreamFillRange(SStreamFillInfo* pFillInfo, SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
1,662✔
636
  int32_t code = TSDB_CODE_SUCCESS;
1,662✔
637
  int32_t lino = 0;
1,662✔
638
  bool    res = false;
1,662✔
639
  if (pFillInfo->needFill == false) {
1,662✔
640
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res, false);
278✔
641
    QUERY_CHECK_CODE(code, lino, _end);
278!
642
    return;
278✔
643
  }
644

645
  if (pFillInfo->pos == FILL_POS_START) {
1,384✔
646
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res, false);
620✔
647
    QUERY_CHECK_CODE(code, lino, _end);
621!
648
    if (res) {
621!
649
      pFillInfo->pos = FILL_POS_INVALID;
621✔
650
    }
651
  }
652
  if (pFillInfo->type != TSDB_FILL_LINEAR) {
1,385✔
653
    doStreamFillNormal(pFillSup, pFillInfo, pRes);
1,154✔
654
  } else {
655
    doStreamFillLinear(pFillSup, pFillInfo, pRes);
231✔
656

657
    if (pFillInfo->pos == FILL_POS_MID) {
232✔
658
      code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res, false);
39✔
659
      QUERY_CHECK_CODE(code, lino, _end);
39!
660
      if (res) {
39!
661
        pFillInfo->pos = FILL_POS_INVALID;
39✔
662
      }
663
    }
664

665
    if (pFillInfo->current > pFillInfo->end && pFillInfo->pLinearInfo->hasNext) {
232✔
666
      pFillInfo->pLinearInfo->hasNext = false;
39✔
667
      pFillInfo->pLinearInfo->winIndex = 0;
39✔
668
      taosArraySwap(pFillInfo->pLinearInfo->pEndPoints, pFillInfo->pLinearInfo->pNextEndPoints);
39✔
669
      pFillInfo->pResRow = &pFillSup->cur;
39✔
670
      setFillKeyInfo(pFillSup->cur.key, pFillInfo->pLinearInfo->nextEnd, &pFillSup->interval, pFillInfo);
39✔
671
      doStreamFillLinear(pFillSup, pFillInfo, pRes);
39✔
672
    }
673
  }
674
  if (pFillInfo->pos == FILL_POS_END) {
1,385✔
675
    code = buildFillResult(&pFillSup->cur, pFillSup, pFillSup->cur.key, pRes, &res, false);
665✔
676
    QUERY_CHECK_CODE(code, lino, _end);
667!
677
    if (res) {
667✔
678
      pFillInfo->pos = FILL_POS_INVALID;
663✔
679
    }
680
  }
681

682
_end:
724✔
683
  if (code != TSDB_CODE_SUCCESS) {
1,387!
UNCOV
684
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
685
  }
686
}
687

688
int32_t keepBlockRowInDiscBuf(SOperatorInfo* pOperator, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
1,598✔
689
                           int32_t rowId, uint64_t groupId, int32_t rowSize) {
690
  int32_t code = TSDB_CODE_SUCCESS;
1,598✔
691
  int32_t lino = 0;
1,598✔
692
  TSKEY ts = tsCol[rowId];
1,598✔
693
  pFillInfo->nextRowKey = ts;
1,598✔
694
  SResultRowData tmpNextRow = {.key = ts};
1,598✔
695
  tmpNextRow.pRowVal = taosMemoryCalloc(1, rowSize);
1,598!
696
  QUERY_CHECK_NULL(tmpNextRow.pRowVal, code, lino, _end, terrno);
1,600!
697
  transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
1,600✔
698
  keepResultInDiscBuf(pOperator, groupId, &tmpNextRow, rowSize);
1,601✔
699
  taosMemoryFreeClear(tmpNextRow.pRowVal);
1,599!
700

UNCOV
701
_end:
×
702
  if (code != TSDB_CODE_SUCCESS) {
1,601!
UNCOV
703
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
704
  }
705
  return code;
1,601✔
706
}
707

708
static void doFillResults(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
1,601✔
709
                          SSDataBlock* pBlock, TSKEY* tsCol, int32_t rowId, SSDataBlock* pRes) {
710
  uint64_t groupId = pBlock->info.id.groupId;
1,601✔
711
  getWindowFromDiscBuf(pOperator, tsCol[rowId], groupId, pFillSup);
1,601✔
712
  if (pFillSup->prev.key == pFillInfo->preRowKey) {
1,597✔
713
    resetFillWindow(&pFillSup->prev);
894✔
714
  }
715
  setFillValueInfo(pBlock, tsCol[rowId], rowId, pFillSup, pFillInfo);
1,596✔
716
  doStreamFillRange(pFillInfo, pFillSup, pRes);
1,597✔
717
}
1,601✔
718

719
static void doStreamFillImpl(SOperatorInfo* pOperator) {
744✔
720
  int32_t                  code = TSDB_CODE_SUCCESS;
744✔
721
  int32_t                  lino = 0;
744✔
722
  SStreamFillOperatorInfo* pInfo = pOperator->info;
744✔
723
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
744✔
724
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
744✔
725
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
744✔
726
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
744✔
727
  uint64_t                 groupId = pBlock->info.id.groupId;
744✔
728
  SSDataBlock*             pRes = pInfo->pRes;
744✔
729
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
744✔
730
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
744✔
731
  pRes->info.id.groupId = groupId;
744✔
732
  pInfo->srcRowIndex++;
744✔
733

734
  if (pInfo->srcRowIndex == 0) {
744✔
735
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
739✔
736
    QUERY_CHECK_CODE(code, lino, _end);
739!
737
    pInfo->srcRowIndex++;
739✔
738
  }
739

740
  while (pInfo->srcRowIndex < pBlock->info.rows) {
1,601✔
741
    code = keepBlockRowInDiscBuf(pOperator, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex, groupId, pFillSup->rowSize);
859✔
742
    QUERY_CHECK_CODE(code, lino, _end);
860!
743
    doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
860✔
744
    if (pInfo->pRes->info.rows == pInfo->pRes->info.capacity) {
859✔
745
      code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
2✔
746
      QUERY_CHECK_CODE(code, lino, _end);
3!
747
      return;
3✔
748
    }
749
    pInfo->srcRowIndex++;
857✔
750
  }
751
  doFillResults(pOperator, pFillSup, pFillInfo, pBlock, tsCol, pInfo->srcRowIndex - 1, pRes);
742✔
752
  code = blockDataUpdateTsWindow(pRes, pInfo->primaryTsCol);
741✔
753
  QUERY_CHECK_CODE(code, lino, _end);
740!
754
  blockDataCleanup(pInfo->pSrcBlock);
740✔
755

756
_end:
741✔
757
  if (code != TSDB_CODE_SUCCESS) {
741!
UNCOV
758
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
759
  }
760
}
761

762
static int32_t buildDeleteRange(SOperatorInfo* pOp, TSKEY start, TSKEY end, uint64_t groupId, SSDataBlock* delRes) {
90✔
763
  int32_t          code = TSDB_CODE_SUCCESS;
90✔
764
  int32_t          lino = 0;
90✔
765
  SStorageAPI*     pAPI = &pOp->pTaskInfo->storageAPI;
90✔
766
  void*            pState = pOp->pTaskInfo->streamInfo.pState;
90✔
767
  SExecTaskInfo*   pTaskInfo = pOp->pTaskInfo;
90✔
768
  SSDataBlock*     pBlock = delRes;
90✔
769
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
90✔
770
  SColumnInfoData* pEndCol = taosArrayGet(pBlock->pDataBlock, END_TS_COLUMN_INDEX);
90✔
771
  SColumnInfoData* pUidCol = taosArrayGet(pBlock->pDataBlock, UID_COLUMN_INDEX);
90✔
772
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
90✔
773
  SColumnInfoData* pCalStartCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_START_TS_COLUMN_INDEX);
90✔
774
  SColumnInfoData* pCalEndCol = taosArrayGet(pBlock->pDataBlock, CALCULATE_END_TS_COLUMN_INDEX);
90✔
775
  SColumnInfoData* pTbNameCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
90✔
776
  code = colDataSetVal(pStartCol, pBlock->info.rows, (const char*)&start, false);
90✔
777
  QUERY_CHECK_CODE(code, lino, _end);
90!
778

779
  code = colDataSetVal(pEndCol, pBlock->info.rows, (const char*)&end, false);
90✔
780
  QUERY_CHECK_CODE(code, lino, _end);
90!
781

782
  colDataSetNULL(pUidCol, pBlock->info.rows);
90!
783
  code = colDataSetVal(pGroupCol, pBlock->info.rows, (const char*)&groupId, false);
90✔
784
  QUERY_CHECK_CODE(code, lino, _end);
90!
785

786
  colDataSetNULL(pCalStartCol, pBlock->info.rows);
90!
787
  colDataSetNULL(pCalEndCol, pBlock->info.rows);
90!
788

789
  SColumnInfoData* pTableCol = taosArrayGet(pBlock->pDataBlock, TABLE_NAME_COLUMN_INDEX);
90✔
790

791
  void*   tbname = NULL;
90✔
792
  int32_t winCode = TSDB_CODE_SUCCESS;
90✔
793
  code = pAPI->stateStore.streamStateGetParName(pOp->pTaskInfo->streamInfo.pState, groupId, &tbname, false, &winCode);
90✔
794
  QUERY_CHECK_CODE(code, lino, _end);
90!
795
  if (winCode != TSDB_CODE_SUCCESS) {
90✔
796
    colDataSetNULL(pTableCol, pBlock->info.rows);
15!
797
  } else {
798
    char parTbName[VARSTR_HEADER_SIZE + TSDB_TABLE_NAME_LEN];
799
    STR_WITH_MAXSIZE_TO_VARSTR(parTbName, tbname, sizeof(parTbName));
75✔
800
    code = colDataSetVal(pTableCol, pBlock->info.rows, (const char*)parTbName, false);
75✔
801
    QUERY_CHECK_CODE(code, lino, _end);
75!
802
    pAPI->stateStore.streamStateFreeVal(tbname);
75✔
803
  }
804

805
  pBlock->info.rows++;
90✔
806

807
_end:
90✔
808
  if (code != TSDB_CODE_SUCCESS) {
90!
UNCOV
809
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
810
  }
811
  return code;
90✔
812
}
813

814
int32_t buildDeleteResult(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId, SSDataBlock* delRes) {
90✔
815
  int32_t                  code = TSDB_CODE_SUCCESS;
90✔
816
  int32_t                  lino = 0;
90✔
817
  SStreamFillOperatorInfo* pInfo = pOperator->info;
90✔
818
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
90✔
819
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
90✔
820
  if (hasPrevWindow(pFillSup)) {
90✔
821
    TSKEY start = getNextWindowTs(pFillSup->prev.key, &pFillSup->interval);
55✔
822
    code = buildDeleteRange(pOperator, start, endTs, groupId, delRes);
55✔
823
    QUERY_CHECK_CODE(code, lino, _end);
55!
824
  } else if (hasNextWindow(pFillSup)) {
35✔
825
    TSKEY end = getPrevWindowTs(pFillSup->next.key, &pFillSup->interval);
30✔
826
    code = buildDeleteRange(pOperator, startTs, end, groupId, delRes);
30✔
827
    QUERY_CHECK_CODE(code, lino, _end);
30!
828
  } else {
829
    code = buildDeleteRange(pOperator, startTs, endTs, groupId, delRes);
5✔
830
    QUERY_CHECK_CODE(code, lino, _end);
5!
831
  }
832

833
_end:
5✔
834
  if (code != TSDB_CODE_SUCCESS) {
90!
835
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
836
  }
837
  return code;
90✔
838
}
839

840
static int32_t doDeleteFillResultImpl(SOperatorInfo* pOperator, TSKEY startTs, TSKEY endTs, uint64_t groupId) {
145✔
841
  int32_t                  code = TSDB_CODE_SUCCESS;
145✔
842
  int32_t                  lino = 0;
145✔
843
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
145✔
844
  SStreamFillOperatorInfo* pInfo = pOperator->info;
145✔
845
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
145✔
846
  getWindowFromDiscBuf(pOperator, startTs, groupId, pInfo->pFillSup);
145✔
847
  setDeleteFillValueInfo(startTs, endTs, pInfo->pFillSup, pInfo->pFillInfo);
145✔
848
  SWinKey key = {.ts = startTs, .groupId = groupId};
145✔
849
  pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &key);
145✔
850
  if (!pInfo->pFillInfo->needFill) {
145✔
851
    code = buildDeleteResult(pOperator, startTs, endTs, groupId, pInfo->pDelRes);
90✔
852
    QUERY_CHECK_CODE(code, lino, _end);
90!
853
  } else {
854
    STimeFillRange tw = {
55✔
855
        .skey = startTs,
856
        .ekey = endTs,
857
        .groupId = groupId,
858
    };
859
    void* tmp = taosArrayPush(pInfo->pFillInfo->delRanges, &tw);
55✔
860
    if (!tmp) {
55!
861
      code = terrno;
×
862
      QUERY_CHECK_CODE(code, lino, _end);
×
863
    }
864
  }
865

866
_end:
145✔
867
  if (code != TSDB_CODE_SUCCESS) {
145!
868
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
869
  }
870
  return code;
145✔
871
}
872

873
static void getWindowInfoByKey(SStorageAPI* pAPI, void* pState, TSKEY ts, int64_t groupId, SResultRowData* pWinData) {
×
874
  SWinKey key = {.ts = ts, .groupId = groupId};
×
875
  void*   val = NULL;
×
876
  int32_t len = 0;
×
877
  int32_t code = pAPI->stateStore.streamStateFillGet(pState, &key, (void**)&val, &len, NULL);
×
878
  if (code != TSDB_CODE_SUCCESS) {
×
879
    qDebug("get window info by key failed, Data may be deleted, try next window. ts:%" PRId64 ", groupId:%" PRId64, ts,
×
880
           groupId);
881
    SStreamStateCur* pCur = pAPI->stateStore.streamStateFillSeekKeyNext(pState, &key);
×
882
    code = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &key, (const void**)&val, &len);
×
883
    pAPI->stateStore.streamStateFreeCur(pCur);
×
884
    qDebug("get window info by key ts:%" PRId64 ", groupId:%" PRId64 ", res%d", ts, groupId, code);
×
885
  }
886

887
  if (code == TSDB_CODE_SUCCESS) {
×
888
    resetFillWindow(pWinData);
×
889
    pWinData->key = key.ts;
×
890
    pWinData->pRowVal = val;
×
891
  }
892
}
×
893

894
static void doDeleteFillFinalize(SOperatorInfo* pOperator) {
2,209✔
895
  SStorageAPI* pAPI = &pOperator->pTaskInfo->storageAPI;
2,209✔
896

897
  SStreamFillOperatorInfo* pInfo = pOperator->info;
2,209✔
898
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
2,209✔
899
  int32_t                  size = taosArrayGetSize(pFillInfo->delRanges);
2,209✔
900
  while (pFillInfo->delIndex < size) {
2,266✔
901
    STimeFillRange* range = taosArrayGet(pFillInfo->delRanges, pFillInfo->delIndex);
55✔
902
    if (pInfo->pRes->info.id.groupId != 0 && pInfo->pRes->info.id.groupId != range->groupId) {
55!
903
      return;
×
904
    }
905
    getWindowFromDiscBuf(pOperator, range->skey, range->groupId, pInfo->pFillSup);
55✔
906
    TSKEY realEnd = range->ekey + 1;
55✔
907
    if (pInfo->pFillInfo->type == TSDB_FILL_NEXT && pInfo->pFillSup->next.key != realEnd) {
55!
908
      getWindowInfoByKey(pAPI, pOperator->pTaskInfo->streamInfo.pState, realEnd, range->groupId,
×
909
                         &pInfo->pFillSup->next);
×
910
    }
911
    setDeleteFillValueInfo(range->skey, range->ekey, pInfo->pFillSup, pInfo->pFillInfo);
55✔
912
    pFillInfo->delIndex++;
55✔
913
    if (pInfo->pFillInfo->needFill) {
55!
914
      doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
55✔
915
      pInfo->pRes->info.id.groupId = range->groupId;
55✔
916
    }
917
  }
918
}
919

920
static int32_t doDeleteFillResult(SOperatorInfo* pOperator) {
135✔
921
  int32_t                  code = TSDB_CODE_SUCCESS;
135✔
922
  int32_t                  lino = 0;
135✔
923
  SStorageAPI*             pAPI = &pOperator->pTaskInfo->storageAPI;
135✔
924
  SStreamFillOperatorInfo* pInfo = pOperator->info;
135✔
925
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
135✔
926
  SSDataBlock*             pBlock = pInfo->pSrcDelBlock;
135✔
927
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
135✔
928

929
  SColumnInfoData* pStartCol = taosArrayGet(pBlock->pDataBlock, START_TS_COLUMN_INDEX);
135✔
930
  TSKEY*           tsStarts = (TSKEY*)pStartCol->pData;
135✔
931
  SColumnInfoData* pGroupCol = taosArrayGet(pBlock->pDataBlock, GROUPID_COLUMN_INDEX);
135✔
932
  uint64_t*        groupIds = (uint64_t*)pGroupCol->pData;
135✔
933
  while (pInfo->srcDelRowIndex < pBlock->info.rows) {
340✔
934
    TSKEY            ts = tsStarts[pInfo->srcDelRowIndex];
205✔
935
    TSKEY            endTs = ts;
205✔
936
    uint64_t         groupId = groupIds[pInfo->srcDelRowIndex];
205✔
937
    SWinKey          key = {.ts = ts, .groupId = groupId};
205✔
938
    SStreamStateCur* pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &key);
205✔
939

940
    if (!pCur) {
204✔
941
      pInfo->srcDelRowIndex++;
60✔
942
      continue;
60✔
943
    }
944

945
    SWinKey nextKey = {.groupId = groupId, .ts = ts};
144✔
946
    while (pInfo->srcDelRowIndex < pBlock->info.rows) {
411✔
947
      TSKEY    delTs = tsStarts[pInfo->srcDelRowIndex];
335✔
948
      uint64_t delGroupId = groupIds[pInfo->srcDelRowIndex];
335✔
949
      int32_t  winCode = TSDB_CODE_SUCCESS;
335✔
950
      if (groupId != delGroupId) {
335!
951
        break;
70✔
952
      }
953
      if (delTs > nextKey.ts) {
335✔
954
        break;
10✔
955
      }
956

957
      SWinKey delKey = {.groupId = delGroupId, .ts = delTs};
325✔
958
      if (delTs == nextKey.ts) {
325✔
959
        pAPI->stateStore.streamStateCurNext(pOperator->pTaskInfo->streamInfo.pState, pCur);
241✔
960
        winCode = pAPI->stateStore.streamStateFillGetGroupKVByCur(pCur, &nextKey, NULL, NULL);
243✔
961
        // ts will be deleted later
962
        if (delTs != ts) {
243✔
963
          pAPI->stateStore.streamStateFillDel(pOperator->pTaskInfo->streamInfo.pState, &delKey);
98✔
964
          pAPI->stateStore.streamStateFreeCur(pCur);
98✔
965
          pCur = pAPI->stateStore.streamStateGetAndCheckCur(pOperator->pTaskInfo->streamInfo.pState, &nextKey);
98✔
966
        }
967
        endTs = TMAX(delTs, nextKey.ts - 1);
243✔
968
        if (winCode != TSDB_CODE_SUCCESS) {
243✔
969
          break;
60✔
970
        }
971
      }
972
      pInfo->srcDelRowIndex++;
267✔
973
    }
974

975
    pAPI->stateStore.streamStateFreeCur(pCur);
146✔
976
    code = doDeleteFillResultImpl(pOperator, ts, endTs, groupId);
145✔
977
    QUERY_CHECK_CODE(code, lino, _end);
145!
978
  }
979

980
  pFillInfo->current = pFillInfo->end + 1;
135✔
981

982
_end:
135✔
983
  if (code != TSDB_CODE_SUCCESS) {
135!
984
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
985
  }
986
  return code;
135✔
987
}
988

989
void resetStreamFillSup(SStreamFillSupporter* pFillSup) {
5,118✔
990
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
5,118✔
991
  SSHashObj* pNewMap = tSimpleHashInit(16, hashFn);
5,118✔
992
  if (pNewMap != NULL) {
5,116!
993
    tSimpleHashCleanup(pFillSup->pResMap);
5,116✔
994
    pFillSup->pResMap = pNewMap;
5,120✔
995
  } else {
996
    tSimpleHashClear(pFillSup->pResMap);
×
997
  }
998
  pFillSup->hasDelete = false;
5,120✔
999
}
5,120✔
1000
void resetStreamFillInfo(SStreamFillOperatorInfo* pInfo) {
3,164✔
1001
  resetStreamFillSup(pInfo->pFillSup);
3,164✔
1002
  taosArrayClear(pInfo->pFillInfo->delRanges);
3,166✔
1003
  pInfo->pFillInfo->delIndex = 0;
3,165✔
1004
}
3,165✔
1005

1006
int32_t doApplyStreamScalarCalculation(SOperatorInfo* pOperator, SSDataBlock* pSrcBlock,
862✔
1007
                                              SSDataBlock* pDstBlock) {
1008
  int32_t                  code = TSDB_CODE_SUCCESS;
862✔
1009
  int32_t                  lino = 0;
862✔
1010
  SStreamFillOperatorInfo* pInfo = pOperator->info;
862✔
1011
  SExprSupp*               pSup = &pOperator->exprSupp;
862✔
1012
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
862✔
1013

1014
  blockDataCleanup(pDstBlock);
862✔
1015
  code = blockDataEnsureCapacity(pDstBlock, pSrcBlock->info.rows);
862✔
1016
  QUERY_CHECK_CODE(code, lino, _end);
862!
1017

1018
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
862✔
1019
  QUERY_CHECK_CODE(code, lino, _end);
862!
1020
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
862✔
1021
  QUERY_CHECK_CODE(code, lino, _end);
862!
1022

1023
  pDstBlock->info.rows = 0;
862✔
1024
  pSup = &pInfo->pFillSup->notFillExprSup;
862✔
1025
  code = setInputDataBlock(pSup, pSrcBlock, TSDB_ORDER_ASC, MAIN_SCAN, false);
862✔
1026
  QUERY_CHECK_CODE(code, lino, _end);
862!
1027
  code = projectApplyFunctions(pSup->pExprInfo, pDstBlock, pSrcBlock, pSup->pCtx, pSup->numOfExprs, NULL);
862✔
1028
  QUERY_CHECK_CODE(code, lino, _end);
862!
1029

1030
  pDstBlock->info.id.groupId = pSrcBlock->info.id.groupId;
862✔
1031

1032
  code = blockDataUpdateTsWindow(pDstBlock, pInfo->primaryTsCol);
862✔
1033

1034
_end:
862✔
1035
  if (code != TSDB_CODE_SUCCESS) {
862!
1036
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1037
  }
1038
  return code;
862✔
1039
}
1040

1041
static int32_t doStreamFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
3,197✔
1042
  int32_t                  code = TSDB_CODE_SUCCESS;
3,197✔
1043
  int32_t                  lino = 0;
3,197✔
1044
  SStreamFillOperatorInfo* pInfo = pOperator->info;
3,197✔
1045
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
3,197✔
1046

1047
  if (pOperator->status == OP_EXEC_DONE) {
3,197!
1048
    (*ppRes) = NULL;
×
1049
    return code;
×
1050
  }
1051
  blockDataCleanup(pInfo->pRes);
3,197✔
1052
  if (hasRemainCalc(pInfo->pFillInfo) ||
3,195✔
1053
      (pInfo->pFillInfo->pos != FILL_POS_INVALID && pInfo->pFillInfo->needFill == true)) {
3,186!
1054
    doStreamFillRange(pInfo->pFillInfo, pInfo->pFillSup, pInfo->pRes);
9✔
1055
    if (pInfo->pRes->info.rows > 0) {
10!
1056
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
10✔
1057
      (*ppRes) = pInfo->pRes;
10✔
1058
      return code;
10✔
1059
    }
1060
  }
1061
  if (pOperator->status == OP_RES_TO_RETURN) {
3,187✔
1062
    doDeleteFillFinalize(pOperator);
46✔
1063
    if (pInfo->pRes->info.rows > 0) {
46!
1064
      printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1065
      (*ppRes) = pInfo->pRes;
×
1066
      return code;
×
1067
    }
1068
    setOperatorCompleted(pOperator);
46✔
1069
    resetStreamFillInfo(pInfo);
46✔
1070
    (*ppRes) = NULL;
46✔
1071
    return code;
46✔
1072
  }
1073

1074
  SSDataBlock*   fillResult = NULL;
3,141✔
1075
  SOperatorInfo* downstream = pOperator->pDownstream[0];
3,141✔
1076
  while (1) {
1077
    if (pInfo->srcRowIndex >= pInfo->pSrcBlock->info.rows || pInfo->pSrcBlock->info.rows == 0) {
3,186✔
1078
      // If there are delete datablocks, we receive  them first.
1079
      SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
3,183✔
1080
      if (pBlock == NULL) {
3,182✔
1081
        pOperator->status = OP_RES_TO_RETURN;
2,163✔
1082
        pInfo->pFillInfo->preRowKey = INT64_MIN;
2,163✔
1083
        if (pInfo->pRes->info.rows > 0) {
2,163!
1084
          printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
×
1085
          (*ppRes) = pInfo->pRes;
×
1086
          return code;
×
1087
        }
1088
        break;
2,163✔
1089
      }
1090
      printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
1,019✔
1091

1092
      if (pInfo->pFillInfo->curGroupId != pBlock->info.id.groupId) {
1,017✔
1093
        pInfo->pFillInfo->curGroupId = pBlock->info.id.groupId;
287✔
1094
        pInfo->pFillInfo->preRowKey = INT64_MIN;
287✔
1095
      }
1096

1097
      pInfo->pFillSup->winRange = pTaskInfo->streamInfo.fillHistoryWindow;
1,017✔
1098
      if (pInfo->pFillSup->winRange.ekey <= 0) {
1,017!
1099
        pInfo->pFillSup->winRange.ekey = INT64_MAX;
×
1100
      }
1101

1102
      switch (pBlock->info.type) {
1,017!
1103
        case STREAM_RETRIEVE:
5✔
1104
          (*ppRes) = pBlock;
5✔
1105
          return code;
5✔
1106
        case STREAM_DELETE_RESULT: {
135✔
1107
          pInfo->pSrcDelBlock = pBlock;
135✔
1108
          pInfo->srcDelRowIndex = 0;
135✔
1109
          blockDataCleanup(pInfo->pDelRes);
135✔
1110
          pInfo->pFillSup->hasDelete = true;
135✔
1111
          code = doDeleteFillResult(pOperator);
135✔
1112
          QUERY_CHECK_CODE(code, lino, _end);
135!
1113

1114
          if (pInfo->pDelRes->info.rows > 0) {
135✔
1115
            printDataBlock(pInfo->pDelRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
90✔
1116
            (*ppRes) = pInfo->pDelRes;
90✔
1117
            return code;
90✔
1118
          }
1119
          continue;
45✔
1120
        } break;
1121
        case STREAM_NORMAL:
741✔
1122
        case STREAM_INVALID:
1123
        case STREAM_PULL_DATA: {
1124
          code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
741✔
1125
          QUERY_CHECK_CODE(code, lino, _end);
741!
1126

1127
          memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
741✔
1128
          pInfo->srcRowIndex = -1;
741✔
1129
        } break;
741✔
1130
        case STREAM_CHECKPOINT:
136✔
1131
        case STREAM_CREATE_CHILD_TABLE: {
1132
          (*ppRes) = pBlock;
136✔
1133
          return code;
136✔
1134
        } break;
1135
        default:
×
1136
          return TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1137
      }
1138
    }
1139

1140
    doStreamFillImpl(pOperator);
744✔
1141
    code = doFilter(pInfo->pRes, pOperator->exprSupp.pFilterInfo, &pInfo->matchInfo);
744✔
1142
    QUERY_CHECK_CODE(code, lino, _end);
743!
1143

1144
    memcpy(pInfo->pRes->info.parTbName, pInfo->pSrcBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
743✔
1145
    pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
743✔
1146
    if (pInfo->pRes->info.rows > 0) {
743!
1147
      break;
743✔
1148
    }
1149
  }
1150
  if (pOperator->status == OP_RES_TO_RETURN) {
2,906✔
1151
    doDeleteFillFinalize(pOperator);
2,163✔
1152
  }
1153

1154
  if (pInfo->pRes->info.rows == 0) {
2,908✔
1155
    setOperatorCompleted(pOperator);
2,119✔
1156
    resetStreamFillInfo(pInfo);
2,117✔
1157
    (*ppRes) = NULL;
2,118✔
1158
    return code;
2,118✔
1159
  }
1160

1161
  pOperator->resultInfo.totalRows += pInfo->pRes->info.rows;
789✔
1162
  printDataBlock(pInfo->pRes, getStreamOpName(pOperator->operatorType), GET_TASKID(pTaskInfo));
789✔
1163
  (*ppRes) = pInfo->pRes;
789✔
1164
  return code;
789✔
1165

1166
_end:
×
1167
  if (code != TSDB_CODE_SUCCESS) {
×
1168
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1169
    pTaskInfo->code = code;
×
1170
    T_LONG_JMP(pTaskInfo->env, code);
×
1171
  }
1172
  setOperatorCompleted(pOperator);
×
1173
  resetStreamFillInfo(pInfo);
×
1174
  (*ppRes) = NULL;
×
1175
  return code;
×
1176
}
1177

1178
static void resetForceFillWindow(SResultRowData* pRowData) {
659✔
1179
  pRowData->key = INT64_MIN;
659✔
1180
  pRowData->pRowVal = NULL;
659✔
1181
}
659✔
1182

1183
void doBuildForceFillResultImpl(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup,
516✔
1184
                                SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1185
  int32_t code = TSDB_CODE_SUCCESS;
516✔
1186
  int32_t lino = 0;
516✔
1187

1188
  SStreamFillOperatorInfo* pInfo = pOperator->info;
516✔
1189
  bool                     res = false;
516✔
1190
  int32_t                  numOfRows = getNumOfTotalRes(pGroupResInfo);
516✔
1191
  for (; pGroupResInfo->index < numOfRows; pGroupResInfo->index++) {
1,175✔
1192
    SWinKey* pKey = (SWinKey*)taosArrayGet(pGroupResInfo->pRows, pGroupResInfo->index);
815✔
1193
    if (pBlock->info.id.groupId == 0) {
815✔
1194
      pBlock->info.id.groupId = pKey->groupId;
516✔
1195
    } else if (pBlock->info.id.groupId != pKey->groupId) {
299✔
1196
      break;
156✔
1197
    }
1198

1199
    SRowBuffPos* pValPos = NULL;
659✔
1200
    int32_t      len = 0;
659✔
1201
    int32_t      winCode = TSDB_CODE_SUCCESS;
659✔
1202
    code = pInfo->stateStore.streamStateFillGet(pInfo->pState, pKey, (void**)&pValPos, &len, &winCode);
659✔
1203
    QUERY_CHECK_CODE(code, lino, _end);
659!
1204
    qDebug("===stream=== build force fill res. key:%" PRId64 ",groupId:%" PRId64".res:%d", pKey->ts, pKey->groupId, winCode);
659!
1205
    if (winCode == TSDB_CODE_SUCCESS) {
659✔
1206
      pFillSup->cur.key = pKey->ts;
184✔
1207
      pFillSup->cur.pRowVal = pValPos->pRowBuff;
184✔
1208
      code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res, false);
184✔
1209
      QUERY_CHECK_CODE(code, lino, _end);
184!
1210
      resetForceFillWindow(&pFillSup->cur);
184✔
1211
      releaseOutputBuf(pInfo->pState, pValPos, &pInfo->stateStore);
184✔
1212
    } else {
1213
      SWinKey      preKey = {.ts = INT64_MIN, .groupId = pKey->groupId};
475✔
1214
      SRowBuffPos* prePos = NULL;
475✔
1215
      int32_t      preVLen = 0;
475✔
1216
      code = pInfo->stateStore.streamStateFillGetPrev(pInfo->pState, pKey, &preKey,
475✔
1217
                                                      (void**)&prePos, &preVLen, &winCode);
1218
      QUERY_CHECK_CODE(code, lino, _end);
475!
1219
      if (winCode == TSDB_CODE_SUCCESS) {
475!
1220
        pFillSup->cur.key = pKey->ts;
475✔
1221
        pFillSup->cur.pRowVal = prePos->pRowBuff;
475✔
1222
        if (pFillInfo->type == TSDB_FILL_PREV) {
475✔
1223
          code = buildFillResult(&pFillSup->cur, pFillSup, pKey->ts, pBlock, &res, true);
297✔
1224
          QUERY_CHECK_CODE(code, lino, _end);
297!
1225
        } else {
1226
          copyNotFillExpData(pFillSup, pFillInfo);
178✔
1227
          pFillInfo->pResRow->key = pKey->ts;
178✔
1228
          code = buildFillResult(pFillInfo->pResRow, pFillSup, pKey->ts, pBlock, &res, true);
178✔
1229
          QUERY_CHECK_CODE(code, lino, _end);
178!
1230
        }
1231
        resetForceFillWindow(&pFillSup->cur);
475✔
1232
      }
1233
      releaseOutputBuf(pInfo->pState, prePos, &pInfo->stateStore);
475✔
1234
    }
1235
  }
1236

1237
_end:
360✔
1238
  if (code != TSDB_CODE_SUCCESS) {
516!
1239
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1240
  }
1241
}
516✔
1242

1243
void doBuildForceFillResult(SOperatorInfo* pOperator, SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo,
1,517✔
1244
                            SSDataBlock* pBlock, SGroupResInfo* pGroupResInfo) {
1245
  blockDataCleanup(pBlock);
1,517✔
1246
  if (!hasRemainResults(pGroupResInfo)) {
1,517✔
1247
    return;
1,001✔
1248
  }
1249

1250
  // clear the existed group id
1251
  pBlock->info.id.groupId = 0;
516✔
1252
  doBuildForceFillResultImpl(pOperator, pFillSup, pFillInfo, pBlock, pGroupResInfo);
516✔
1253
}
1254

1255
static int32_t buildForceFillResult(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,517✔
1256
  int32_t                  code = TSDB_CODE_SUCCESS;
1,517✔
1257
  int32_t                  lino = 0;
1,517✔
1258
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,517✔
1259
  uint16_t                 opType = pOperator->operatorType;
1,517✔
1260
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,517✔
1261

1262
  doBuildForceFillResult(pOperator, pInfo->pFillSup, pInfo->pFillInfo, pInfo->pRes, &pInfo->groupResInfo);
1,517✔
1263
  if (pInfo->pRes->info.rows != 0) {
1,517✔
1264
    printDataBlock(pInfo->pRes, getStreamOpName(opType), GET_TASKID(pTaskInfo));
516✔
1265
    (*ppRes) = pInfo->pRes;
516✔
1266
    goto _end;
516✔
1267
  }
1268

1269
  (*ppRes) = NULL;
1,001✔
1270

1271
_end:
1,517✔
1272
  if (code != TSDB_CODE_SUCCESS) {
1,517!
1273
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1274
  }
1275
  return code;
1,517✔
1276
}
1277

1278
static void keepResultInStateBuf(SStreamFillOperatorInfo* pInfo, uint64_t groupId, SResultRowData* pRow) {
184✔
1279
  int32_t code = TSDB_CODE_SUCCESS;
184✔
1280
  int32_t lino = 0;
184✔
1281

1282
  SWinKey      key = {.groupId = groupId, .ts = pRow->key};
184✔
1283
  int32_t      curVLen = 0;
184✔
1284
  SRowBuffPos* pStatePos = NULL;
184✔
1285
  int32_t      winCode = TSDB_CODE_SUCCESS;
184✔
1286
  code = pInfo->stateStore.streamStateFillAddIfNotExist(pInfo->pState, &key, (void**)&pStatePos,
184✔
1287
                                                        &curVLen, &winCode);
1288
  QUERY_CHECK_CODE(code, lino, _end);
184!
1289
  memcpy(pStatePos->pRowBuff, pRow->pRowVal, pInfo->pFillSup->rowSize);
184✔
1290
  qDebug("===stream===fill operator save key ts:%" PRId64 " group id:%" PRIu64 "  code:%d", key.ts, key.groupId, code);
184!
1291

1292
_end:
×
1293
  if (code != TSDB_CODE_SUCCESS) {
184!
1294
    qError("%s failed at line %d since %s", __func__, __LINE__, tstrerror(code));
×
1295
  }
1296
}
184✔
1297

1298
int32_t keepBlockRowInStateBuf(SStreamFillOperatorInfo* pInfo, SStreamFillInfo* pFillInfo, SSDataBlock* pBlock, TSKEY* tsCol,
184✔
1299
                               int32_t rowId, uint64_t groupId, int32_t rowSize) {
1300
  int32_t code = TSDB_CODE_SUCCESS;
184✔
1301
  int32_t lino = 0;
184✔
1302
  TSKEY ts = tsCol[rowId];
184✔
1303
  pFillInfo->nextRowKey = ts;
184✔
1304
  TAOS_MEMSET(pFillInfo->pTempBuff, 0, rowSize);
184✔
1305
  SResultRowData tmpNextRow = {.key = ts, .pRowVal = pFillInfo->pTempBuff};
184✔
1306

1307
  transBlockToResultRow(pBlock, rowId, ts, &tmpNextRow);
184✔
1308
  keepResultInStateBuf(pInfo, groupId, &tmpNextRow);
184✔
1309

1310
_end:
184✔
1311
  if (code != TSDB_CODE_SUCCESS) {
184!
1312
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1313
  }
1314
  return code;
184✔
1315
}
1316

1317
// force window close impl
1318
static int32_t doStreamForceFillImpl(SOperatorInfo* pOperator) {
121✔
1319
  int32_t                  code = TSDB_CODE_SUCCESS;
121✔
1320
  int32_t                  lino = 0;
121✔
1321
  SStreamFillOperatorInfo* pInfo = pOperator->info;
121✔
1322
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
121✔
1323
  SStreamFillSupporter*    pFillSup = pInfo->pFillSup;
121✔
1324
  SStreamFillInfo*         pFillInfo = pInfo->pFillInfo;
121✔
1325
  SSDataBlock*             pBlock = pInfo->pSrcBlock;
121✔
1326
  uint64_t                 groupId = pBlock->info.id.groupId;
121✔
1327
  SColumnInfoData*         pTsCol = taosArrayGet(pInfo->pSrcBlock->pDataBlock, pInfo->primaryTsCol);
121✔
1328
  TSKEY*                   tsCol = (TSKEY*)pTsCol->pData;
121✔
1329
  for (int32_t i = 0; i < pBlock->info.rows; i++){
305✔
1330
    code = keepBlockRowInStateBuf(pInfo, pFillInfo, pBlock, tsCol, i, groupId, pFillSup->rowSize);
184✔
1331
    QUERY_CHECK_CODE(code, lino, _end);
184!
1332

1333
    int32_t size =  taosArrayGetSize(pInfo->pCloseTs);
184✔
1334
    if (size > 0) {
184!
1335
      TSKEY* pTs = (TSKEY*) taosArrayGet(pInfo->pCloseTs, 0);
184✔
1336
      TSKEY  resTs = tsCol[i];
184✔
1337
      while (resTs < (*pTs)) {
299✔
1338
        SWinKey key = {.groupId = groupId, .ts = resTs};
143✔
1339
        void* pPushRes = taosArrayPush(pInfo->pUpdated, &key);
143✔
1340
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
143!
1341

1342
        if (IS_FILL_CONST_VALUE(pFillSup->type)) {
143!
1343
          break;
1344
        }
1345
        resTs = taosTimeAdd(resTs, pFillSup->interval.sliding, pFillSup->interval.slidingUnit,
115✔
1346
                            pFillSup->interval.precision, NULL);
115✔
1347
      }
1348
    }
1349
  }
1350
  code = pInfo->stateStore.streamStateGroupPut(pInfo->pState, groupId, NULL, 0);
121✔
1351
  QUERY_CHECK_CODE(code, lino, _end);
121!
1352

1353
_end:
121✔
1354
  if (code != TSDB_CODE_SUCCESS) {
121!
1355
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1356
  }
1357
  return code;
121✔
1358
}
1359

1360
int32_t buildAllResultKey(SStateStore* pStateStore, SStreamState* pState, TSKEY ts, SArray* pUpdated) {
2,655✔
1361
  int32_t          code = TSDB_CODE_SUCCESS;
2,655✔
1362
  int32_t          lino = 0;
2,655✔
1363
  int64_t          groupId = 0;
2,655✔
1364
  SStreamStateCur* pCur = pStateStore->streamStateGroupGetCur(pState);
2,655✔
1365
  while (1) {  
1,329✔
1366
    int32_t winCode = pStateStore->streamStateGroupGetKVByCur(pCur, &groupId, NULL, NULL);
3,984✔
1367
    if (winCode != TSDB_CODE_SUCCESS) {
3,984✔
1368
      break;
2,655✔
1369
    }
1370
    SWinKey key = {.ts = ts, .groupId = groupId};
1,329✔
1371
    void* pPushRes = taosArrayPush(pUpdated, &key);
1,329✔
1372
    QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
1,329!
1373

1374
    pStateStore->streamStateGroupCurNext(pCur);
1,329✔
1375
  }
1376
  pStateStore->streamStateFreeCur(pCur);
2,655✔
1377
  pCur = NULL;
2,655✔
1378

1379
_end:
2,655✔
1380
  if (code != TSDB_CODE_SUCCESS) {
2,655!
1381
    pStateStore->streamStateFreeCur(pCur);
×
1382
    pCur = NULL;
×
1383
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1384
  }
1385
  return code;
2,655✔
1386
}
1387

1388
void removeDuplicateResult(SArray* pTsArrray, __compar_fn_t fn) {
1,001✔
1389
  taosArraySort(pTsArrray, fn);
1,001✔
1390
  taosArrayRemoveDuplicate(pTsArrray, fn, NULL);
1,001✔
1391
}
1,001✔
1392

1393
// force window close
1394
static int32_t doStreamForceFillNext(SOperatorInfo* pOperator, SSDataBlock** ppRes) {
1,521✔
1395
  int32_t                  code = TSDB_CODE_SUCCESS;
1,521✔
1396
  int32_t                  lino = 0;
1,521✔
1397
  SStreamFillOperatorInfo* pInfo = pOperator->info;
1,521✔
1398
  SExecTaskInfo*           pTaskInfo = pOperator->pTaskInfo;
1,521✔
1399

1400
  if (pOperator->status == OP_EXEC_DONE) {
1,521!
1401
    (*ppRes) = NULL;
×
1402
    return code;
×
1403
  }
1404

1405
  if (pOperator->status == OP_RES_TO_RETURN) {
1,521✔
1406
    SSDataBlock* resBlock = NULL;
516✔
1407
    code = buildForceFillResult(pOperator, &resBlock);
516✔
1408
    QUERY_CHECK_CODE(code, lino, _end);
516!
1409

1410
    if (resBlock != NULL) {
516✔
1411
      (*ppRes) = resBlock;
156✔
1412
      goto _end;
156✔
1413
    }
1414

1415
    pInfo->stateStore.streamStateClearExpiredState(pInfo->pState, 1, INT64_MAX);
360✔
1416
    resetStreamFillInfo(pInfo);
360✔
1417
    setStreamOperatorCompleted(pOperator);
360✔
1418
    (*ppRes) = NULL;
360✔
1419
    goto _end;
360✔
1420
  }
1421

1422
  SSDataBlock*   fillResult = NULL;
1,005✔
1423
  SOperatorInfo* downstream = pOperator->pDownstream[0];
1,005✔
1424
  while (1) {
1,121✔
1425
    SSDataBlock* pBlock = getNextBlockFromDownstream(pOperator, 0);
2,126✔
1426
    if (pBlock == NULL) {
2,126✔
1427
      pOperator->status = OP_RES_TO_RETURN;
1,001✔
1428
      qDebug("===stream===return data:%s.", getStreamOpName(pOperator->operatorType));
1,001!
1429
      break;
1,001✔
1430
    }
1431
    printSpecDataBlock(pBlock, getStreamOpName(pOperator->operatorType), "recv", GET_TASKID(pTaskInfo));
1,125✔
1432
    setStreamOperatorState(&pInfo->basic, pBlock->info.type);
1,125✔
1433

1434
    switch (pBlock->info.type) {
1,125!
1435
      case STREAM_NORMAL:
121✔
1436
      case STREAM_INVALID: {
1437
        code = doApplyStreamScalarCalculation(pOperator, pBlock, pInfo->pSrcBlock);
121✔
1438
        QUERY_CHECK_CODE(code, lino, _end);
121!
1439

1440
        memcpy(pInfo->pSrcBlock->info.parTbName, pBlock->info.parTbName, TSDB_TABLE_NAME_LEN);
121✔
1441
        pInfo->srcRowIndex = -1;
121✔
1442
      } break;
121✔
1443
      case STREAM_CHECKPOINT: {
1✔
1444
        pInfo->stateStore.streamStateCommit(pInfo->pState);
1✔
1445
        (*ppRes) = pBlock;
1✔
1446
        goto _end;
1✔
1447
      } break;
1448
      case STREAM_CREATE_CHILD_TABLE: {
3✔
1449
        (*ppRes) = pBlock;
3✔
1450
        goto _end;
3✔
1451
      } break;
1452
      case STREAM_GET_RESULT: {
2,000✔
1453
        void* pPushRes = taosArrayPush(pInfo->pCloseTs, &pBlock->info.window.skey);
1,000✔
1454
        QUERY_CHECK_NULL(pPushRes, code, lino, _end, terrno);
1,000!
1455
        continue;
1,000✔
1456
      }
1457
      default:
×
1458
        code = TSDB_CODE_QRY_EXECUTOR_INTERNAL_ERROR;
×
1459
        QUERY_CHECK_CODE(code, lino, _end);
×
1460
    }
1461

1462
    code = doStreamForceFillImpl(pOperator);
121✔
1463
    QUERY_CHECK_CODE(code, lino, _end);
121!
1464
  }
1465

1466
  for (int32_t i = 0; i < taosArrayGetSize(pInfo->pCloseTs); i++) {
2,001✔
1467
    TSKEY ts = *(TSKEY*) taosArrayGet(pInfo->pCloseTs, i);
1,000✔
1468
    code = buildAllResultKey(&pInfo->stateStore, pInfo->pState, ts, pInfo->pUpdated);
1,000✔
1469
    QUERY_CHECK_CODE(code, lino, _end);
1,000!
1470
  }
1471
  taosArrayClear(pInfo->pCloseTs);
1,001✔
1472
  removeDuplicateResult(pInfo->pUpdated, winKeyCmprImpl);
1,001✔
1473

1474
  initMultiResInfoFromArrayList(&pInfo->groupResInfo, pInfo->pUpdated);
1,001✔
1475
  pInfo->groupResInfo.freeItem = false;
1,001✔
1476

1477
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
1,001✔
1478
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _end, terrno);
1,001!
1479

1480
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
1,001✔
1481
  QUERY_CHECK_CODE(code, lino, _end);
1,001!
1482

1483
  code = buildForceFillResult(pOperator, ppRes);
1,001✔
1484
  QUERY_CHECK_CODE(code, lino, _end);
1,001!
1485

1486
  if ((*ppRes) == NULL) {
1,001✔
1487
    pInfo->stateStore.streamStateClearExpiredState(pInfo->pState, 1, INT64_MAX);
641✔
1488
    resetStreamFillInfo(pInfo);
641✔
1489
    setStreamOperatorCompleted(pOperator);
641✔
1490
  }
1491

1492
_end:
360✔
1493
  if (code != TSDB_CODE_SUCCESS) {
1,521!
1494
    qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1495
    pTaskInfo->code = code;
×
1496
  }
1497
  return code;
1,521✔
1498
}
1499

1500
static int32_t initResultBuf(SSDataBlock* pInputRes, SStreamFillSupporter* pFillSup) {
451✔
1501
  int32_t numOfCols = taosArrayGetSize(pInputRes->pDataBlock);
451✔
1502
  pFillSup->rowSize = sizeof(SResultCellData) * numOfCols;
451✔
1503
  for (int i = 0; i < numOfCols; i++) {
7,724✔
1504
    SColumnInfoData* pCol = taosArrayGet(pInputRes->pDataBlock, i);
7,273✔
1505
    pFillSup->rowSize += pCol->info.bytes;
7,273✔
1506
  }
1507
  pFillSup->next.key = INT64_MIN;
451✔
1508
  pFillSup->nextNext.key = INT64_MIN;
451✔
1509
  pFillSup->prev.key = INT64_MIN;
451✔
1510
  pFillSup->cur.key = INT64_MIN;
451✔
1511
  pFillSup->next.pRowVal = NULL;
451✔
1512
  pFillSup->nextNext.pRowVal = NULL;
451✔
1513
  pFillSup->prev.pRowVal = NULL;
451✔
1514
  pFillSup->cur.pRowVal = NULL;
451✔
1515

1516
  return TSDB_CODE_SUCCESS;
451✔
1517
}
1518

1519
static SStreamFillSupporter* initStreamFillSup(SStreamFillPhysiNode* pPhyFillNode, SInterval* pInterval,
451✔
1520
                                               SExprInfo* pFillExprInfo, int32_t numOfFillCols, SStorageAPI* pAPI, SSDataBlock* pInputRes) {
1521
  int32_t               code = TSDB_CODE_SUCCESS;
451✔
1522
  int32_t               lino = 0;
451✔
1523
  SStreamFillSupporter* pFillSup = taosMemoryCalloc(1, sizeof(SStreamFillSupporter));
451!
1524
  if (!pFillSup) {
451!
1525
    code = terrno;
×
1526
    QUERY_CHECK_CODE(code, lino, _end);
×
1527
  }
1528
  pFillSup->numOfFillCols = numOfFillCols;
451✔
1529
  int32_t    numOfNotFillCols = 0;
451✔
1530
  SExprInfo* noFillExprInfo = NULL;
451✔
1531

1532
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExprInfo, &numOfNotFillCols);
451✔
1533
  QUERY_CHECK_CODE(code, lino, _end);
451!
1534

1535
  pFillSup->pAllColInfo = createFillColInfo(pFillExprInfo, pFillSup->numOfFillCols, noFillExprInfo, numOfNotFillCols,
902✔
1536
                                            NULL, 0, (const SNodeListNode*)(pPhyFillNode->pValues));
451✔
1537
  if (pFillSup->pAllColInfo == NULL) {
451!
1538
    code = terrno;
×
1539
    lino = __LINE__;
×
1540
    destroyExprInfo(noFillExprInfo, numOfNotFillCols);
×
1541
    goto _end;
×
1542
  }
1543

1544
  pFillSup->type = convertFillType(pPhyFillNode->mode);
451✔
1545
  pFillSup->numOfAllCols = pFillSup->numOfFillCols + numOfNotFillCols;
451✔
1546
  pFillSup->interval = *pInterval;
451✔
1547
  pFillSup->pAPI = pAPI;
451✔
1548

1549
  code = initResultBuf(pInputRes, pFillSup);
451✔
1550
  QUERY_CHECK_CODE(code, lino, _end);
451!
1551

1552
  SExprInfo* noFillExpr = NULL;
451✔
1553
  code = createExprInfo(pPhyFillNode->pNotFillExprs, NULL, &noFillExpr, &numOfNotFillCols);
451✔
1554
  QUERY_CHECK_CODE(code, lino, _end);
451!
1555

1556
  code = initExprSupp(&pFillSup->notFillExprSup, noFillExpr, numOfNotFillCols, &pAPI->functionStore);
451✔
1557
  QUERY_CHECK_CODE(code, lino, _end);
451!
1558

1559
  _hash_fn_t hashFn = taosGetDefaultHashFunction(TSDB_DATA_TYPE_BINARY);
451✔
1560
  pFillSup->pResMap = tSimpleHashInit(16, hashFn);
451✔
1561
  QUERY_CHECK_NULL(pFillSup->pResMap, code, lino, _end, terrno);
451!
1562
  pFillSup->hasDelete = false;
451✔
1563
  pFillSup->normalFill = true;
451✔
1564
  pFillSup->pResultRange = taosArrayInit(2, POINTER_BYTES);
451✔
1565

1566

1567
_end:
451✔
1568
  if (code != TSDB_CODE_SUCCESS) {
451!
UNCOV
1569
    destroyStreamFillSupporter(pFillSup);
×
UNCOV
1570
    pFillSup = NULL;
×
UNCOV
1571
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1572
  }
1573
  return pFillSup;
451✔
1574
}
1575

1576
SStreamFillInfo* initStreamFillInfo(SStreamFillSupporter* pFillSup, SSDataBlock* pRes) {
672✔
1577
  int32_t          code = TSDB_CODE_SUCCESS;
672✔
1578
  int32_t          lino = 0;
672✔
1579
  SStreamFillInfo* pFillInfo = taosMemoryCalloc(1, sizeof(SStreamFillInfo));
672!
1580
  if (!pFillInfo) {
672!
UNCOV
1581
    code = terrno;
×
UNCOV
1582
    QUERY_CHECK_CODE(code, lino, _end);
×
1583
  }
1584

1585
  pFillInfo->start = INT64_MIN;
672✔
1586
  pFillInfo->current = INT64_MIN;
672✔
1587
  pFillInfo->end = INT64_MIN;
672✔
1588
  pFillInfo->preRowKey = INT64_MIN;
672✔
1589
  pFillInfo->needFill = false;
672✔
1590
  pFillInfo->pLinearInfo = taosMemoryCalloc(1, sizeof(SStreamFillLinearInfo));
672!
1591
  if (!pFillInfo) {
672!
UNCOV
1592
    code = terrno;
×
UNCOV
1593
    QUERY_CHECK_CODE(code, lino, _end);
×
1594
  }
1595

1596
  pFillInfo->pLinearInfo->hasNext = false;
672✔
1597
  pFillInfo->pLinearInfo->nextEnd = INT64_MIN;
672✔
1598
  pFillInfo->pLinearInfo->pEndPoints = NULL;
672✔
1599
  pFillInfo->pLinearInfo->pNextEndPoints = NULL;
672✔
1600
  if (pFillSup->type == TSDB_FILL_LINEAR) {
672✔
1601
    pFillInfo->pLinearInfo->pEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
106✔
1602
    if (!pFillInfo->pLinearInfo->pEndPoints) {
106!
UNCOV
1603
      code = terrno;
×
UNCOV
1604
      QUERY_CHECK_CODE(code, lino, _end);
×
1605
    }
1606

1607
    pFillInfo->pLinearInfo->pNextEndPoints = taosArrayInit(pFillSup->numOfAllCols, sizeof(SPoint));
106✔
1608
    if (!pFillInfo->pLinearInfo->pNextEndPoints) {
106!
UNCOV
1609
      code = terrno;
×
UNCOV
1610
      QUERY_CHECK_CODE(code, lino, _end);
×
1611
    }
1612

1613
    for (int32_t i = 0; i < pFillSup->numOfAllCols; i++) {
1,538✔
1614
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
1,432✔
1615
      if (pColData == NULL) {
1,432✔
1616
        SPoint dummy = {0};
16✔
1617
        dummy.val = taosMemoryCalloc(1, 1);
16!
1618
        void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &dummy);
16✔
1619
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
16!
1620

1621
        dummy.val = taosMemoryCalloc(1, 1);
16!
1622
        tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &dummy);
16✔
1623
        QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
16!
1624

1625
        continue;
16✔
1626
      }
1627
      SPoint value = {0};
1,416✔
1628
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
1,416!
1629
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
1,416!
1630

1631
      void* tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pEndPoints, &value);
1,416✔
1632
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
1,416!
1633

1634
      value.val = taosMemoryCalloc(1, pColData->info.bytes);
1,416!
1635
      QUERY_CHECK_NULL(value.val, code, lino, _end, terrno);
1,416!
1636

1637
      tmpRes = taosArrayPush(pFillInfo->pLinearInfo->pNextEndPoints, &value);
1,416✔
1638
      QUERY_CHECK_NULL(tmpRes, code, lino, _end, terrno);
1,416!
1639
    }
1640
  }
1641
  pFillInfo->pLinearInfo->winIndex = 0;
672✔
1642

1643
  pFillInfo->pNonFillRow = NULL;
672✔
1644
  pFillInfo->pResRow = NULL;
672✔
1645
  if (pFillSup->type == TSDB_FILL_SET_VALUE || pFillSup->type == TSDB_FILL_SET_VALUE_F ||
672✔
1646
      pFillSup->type == TSDB_FILL_NULL || pFillSup->type == TSDB_FILL_NULL_F) {
534✔
1647
    pFillInfo->pResRow = taosMemoryCalloc(1, sizeof(SResultRowData));
303!
1648
    QUERY_CHECK_NULL(pFillInfo->pResRow, code, lino, _end, terrno);
303!
1649

1650
    pFillInfo->pResRow->key = INT64_MIN;
303✔
1651
    pFillInfo->pResRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
303!
1652
    QUERY_CHECK_NULL(pFillInfo->pResRow->pRowVal, code, lino, _end, terrno);
303!
1653

1654
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
4,025✔
1655
      SColumnInfoData* pColData = taosArrayGet(pRes->pDataBlock, i);
3,722✔
1656
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, i);
3,722✔
1657
      if (pColData == NULL) {
3,722✔
1658
        pCell->bytes = 1;
71✔
1659
        pCell->type = 4;
71✔
1660
        continue;
71✔
1661
      }
1662
      pCell->bytes = pColData->info.bytes;
3,651✔
1663
      pCell->type = pColData->info.type;
3,651✔
1664
    }
1665

1666
    int32_t numOfResCol = taosArrayGetSize(pRes->pDataBlock);
303✔
1667
    if (numOfResCol < pFillSup->numOfAllCols) {
303✔
1668
      int32_t* pTmpBuf = (int32_t*)taosMemoryRealloc(pFillSup->pOffsetInfo, pFillSup->numOfAllCols * sizeof(int32_t));
67!
1669
      QUERY_CHECK_NULL(pTmpBuf, code, lino, _end, terrno);
67!
1670
      pFillSup->pOffsetInfo = pTmpBuf;
67✔
1671

1672
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, numOfResCol - 1);
67✔
1673
      int32_t preLength = pFillSup->pOffsetInfo[numOfResCol - 1] + pCell->bytes + sizeof(SResultCellData);
67✔
1674
      for (int32_t i = numOfResCol; i < pFillSup->numOfAllCols; i++) {
138✔
1675
        pFillSup->pOffsetInfo[i] = preLength;
71✔
1676
        pCell = getResultCell(pFillInfo->pResRow, i);
71✔
1677
        preLength += pCell->bytes + sizeof(SResultCellData);
71✔
1678
      }
1679
    }
1680

1681
    pFillInfo->pNonFillRow = taosMemoryCalloc(1, sizeof(SResultRowData));
303!
1682
    QUERY_CHECK_NULL(pFillInfo->pNonFillRow, code, lino, _end, terrno);
303!
1683
    pFillInfo->pNonFillRow->key = INT64_MIN;
303✔
1684
    pFillInfo->pNonFillRow->pRowVal = taosMemoryCalloc(1, pFillSup->rowSize);
303!
1685
    memcpy(pFillInfo->pNonFillRow->pRowVal, pFillInfo->pResRow->pRowVal, pFillSup->rowSize);
303✔
1686
  }
1687

1688
  pFillInfo->type = pFillSup->type;
672✔
1689
  pFillInfo->delRanges = taosArrayInit(16, sizeof(STimeFillRange));
672✔
1690
  if (!pFillInfo->delRanges) {
672!
UNCOV
1691
    code = terrno;
×
UNCOV
1692
    QUERY_CHECK_CODE(code, lino, _end);
×
1693
  }
1694

1695
  pFillInfo->delIndex = 0;
672✔
1696
  pFillInfo->curGroupId = 0;
672✔
1697
  pFillInfo->hasNext = false;
672✔
1698
  pFillInfo->pTempBuff = taosMemoryCalloc(1, pFillSup->rowSize);
672!
1699
  return pFillInfo;
672✔
1700

UNCOV
1701
_end:
×
1702
  if (code != TSDB_CODE_SUCCESS) {
×
1703
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1704
  }
UNCOV
1705
  destroyStreamFillInfo(pFillInfo);
×
UNCOV
1706
  return NULL;
×
1707
}
1708

1709
static void setValueForFillInfo(SStreamFillSupporter* pFillSup, SStreamFillInfo* pFillInfo) {
451✔
1710
  if (pFillInfo->type == TSDB_FILL_SET_VALUE || pFillInfo->type == TSDB_FILL_SET_VALUE_F) {
451✔
1711
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
1,445✔
1712
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
1,358✔
1713
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
1,358✔
1714
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
1,358✔
1715
      SVariant*        pVar = &(pFillCol->fillVal);
1,358✔
1716
      if (pCell->type == TSDB_DATA_TYPE_FLOAT) {
1,358!
UNCOV
1717
        float v = 0;
×
UNCOV
1718
        GET_TYPED_DATA(v, float, pVar->nType, &pVar->i, 0);
×
UNCOV
1719
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
×
1720
      } else if (IS_FLOAT_TYPE(pCell->type)) {
1,806!
1721
        double v = 0;
448✔
1722
        GET_TYPED_DATA(v, double, pVar->nType, &pVar->i, 0);
448!
1723
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
448!
1724
      } else if (IS_INTEGER_TYPE(pCell->type)) {
1,733!
1725
        int64_t v = 0;
823✔
1726
        GET_TYPED_DATA(v, int64_t, pVar->nType, &pVar->i, 0);
823!
1727
        SET_TYPED_DATA(pCell->pData, pCell->type, v);
823!
1728
      } else {
1729
        pCell->isNull = true;
87✔
1730
      }
1731
    }
1732
  } else if (pFillInfo->type == TSDB_FILL_NULL || pFillInfo->type == TSDB_FILL_NULL_F) {
364✔
1733
    for (int32_t i = 0; i < pFillSup->numOfAllCols; ++i) {
2,078✔
1734
      SFillColInfo*    pFillCol = pFillSup->pAllColInfo + i;
1,954✔
1735
      int32_t          slotId = GET_DEST_SLOT_ID(pFillCol);
1,954✔
1736
      SResultCellData* pCell = getResultCell(pFillInfo->pResRow, slotId);
1,954✔
1737
      pCell->isNull = true;
1,954✔
1738
    }
1739
  }
1740
}
451✔
1741

1742
int32_t getDownStreamInfo(SOperatorInfo* downstream, int8_t* triggerType, SInterval* pInterval, int16_t* pOperatorFlag) {
451✔
1743
  int32_t code = TSDB_CODE_SUCCESS;
451✔
1744
  int32_t lino = 0;
451✔
1745
  if (IS_NORMAL_INTERVAL_OP(downstream)) {
451✔
1746
    SStreamIntervalOperatorInfo* pInfo = downstream->info;
407✔
1747
    *triggerType = pInfo->twAggSup.calTrigger;
407✔
1748
    *pInterval = pInfo->interval;
407✔
1749
    *pOperatorFlag = pInfo->basic.operatorFlag;
407✔
1750
  } else if (IS_CONTINUE_INTERVAL_OP(downstream)) {
44!
1751
    SStreamIntervalSliceOperatorInfo* pInfo = downstream->info;
44✔
1752
    *triggerType = pInfo->twAggSup.calTrigger;
44✔
1753
    *pInterval = pInfo->interval;
44✔
1754
    pInfo->hasFill = true;
44✔
1755
    *pOperatorFlag = pInfo->basic.operatorFlag;
44✔
1756
  } else {
UNCOV
1757
    code = TSDB_CODE_STREAM_INTERNAL_ERROR;
×
1758
  }
1759
  QUERY_CHECK_CODE(code, lino, _end);
451!
1760
  
1761
_end:
451✔
1762
  if (code != TSDB_CODE_SUCCESS) {
451!
UNCOV
1763
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1764
  }
1765
  return code;
451✔
1766
}
1767

1768
int32_t initFillOperatorStateBuff(SStreamFillOperatorInfo* pInfo, SStreamState* pState, SStateStore* pStore,
44✔
1769
                                  SReadHandle* pHandle, const char* taskIdStr, SStorageAPI* pApi) {
1770
  int32_t code = TSDB_CODE_SUCCESS;
44✔
1771
  int32_t lino = 0;
44✔
1772

1773
  pInfo->stateStore = *pStore;
44✔
1774
  pInfo->pState = taosMemoryCalloc(1, sizeof(SStreamState));
44!
1775
  QUERY_CHECK_NULL(pInfo->pState, code, lino, _end, terrno);
44!
1776

1777
  *(pInfo->pState) = *pState;
44✔
1778
  pInfo->stateStore.streamStateSetNumber(pInfo->pState, -1, pInfo->primaryTsCol);
44✔
1779
  code = pInfo->stateStore.streamFileStateInit(tsStreamBufferSize, sizeof(SWinKey), pInfo->pFillSup->rowSize, 0, compareTs,
44✔
1780
                                               pInfo->pState, INT64_MAX, taskIdStr, pHandle->checkpointId,
44✔
1781
                                               STREAM_STATE_BUFF_HASH_SORT, &pInfo->pState->pFileState);
44✔
1782
  QUERY_CHECK_CODE(code, lino, _end);
44!
1783

1784
_end:
44✔
1785
  if (code != TSDB_CODE_SUCCESS) {
44!
UNCOV
1786
    qError("%s failed at line %d since %s", __func__, lino, tstrerror(code));
×
1787
  }
1788
  return code;
44✔
1789
}
1790

1791
int32_t createStreamFillOperatorInfo(SOperatorInfo* downstream, SStreamFillPhysiNode* pPhyFillNode,
451✔
1792
                                     SExecTaskInfo* pTaskInfo, SReadHandle* pHandle, SOperatorInfo** pOptrInfo) {
1793
  QRY_PARAM_CHECK(pOptrInfo);
451!
1794

1795
  int32_t                  code = TSDB_CODE_SUCCESS;
451✔
1796
  int32_t                  lino = 0;
451✔
1797
  SStreamFillOperatorInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamFillOperatorInfo));
451!
1798
  SOperatorInfo*           pOperator = taosMemoryCalloc(1, sizeof(SOperatorInfo));
451!
1799
  if (pInfo == NULL || pOperator == NULL) {
451!
UNCOV
1800
    code = terrno;
×
UNCOV
1801
    QUERY_CHECK_CODE(code, lino, _error);
×
1802
  }
1803

1804
  int32_t    numOfFillCols = 0;
451✔
1805
  SExprInfo* pFillExprInfo = NULL;
451✔
1806

1807
  code = createExprInfo(pPhyFillNode->pFillExprs, NULL, &pFillExprInfo, &numOfFillCols);
451✔
1808
  QUERY_CHECK_CODE(code, lino, _error);
451!
1809

1810
  code = initExprSupp(&pOperator->exprSupp, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI.functionStore);
451✔
1811
  QUERY_CHECK_CODE(code, lino, _error);
451!
1812

1813
  pInfo->pSrcBlock = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
451✔
1814
  QUERY_CHECK_NULL(pInfo->pSrcBlock, code, lino, _error, terrno);
451!
1815

1816
  int8_t triggerType = 0;
451✔
1817
  SInterval interval = {0};
451✔
1818
  int16_t opFlag = 0;
451✔
1819
  code = getDownStreamInfo(downstream, &triggerType, &interval, &opFlag);
451✔
1820
  QUERY_CHECK_CODE(code, lino, _error);
451!
1821

1822
  pInfo->pFillSup = initStreamFillSup(pPhyFillNode, &interval, pFillExprInfo, numOfFillCols, &pTaskInfo->storageAPI,
451✔
1823
                                      pInfo->pSrcBlock);
1824
  if (!pInfo->pFillSup) {
451!
UNCOV
1825
    code = TSDB_CODE_FAILED;
×
UNCOV
1826
    QUERY_CHECK_CODE(code, lino, _error);
×
1827
  }
1828

1829
  initResultSizeInfo(&pOperator->resultInfo, 4096);
451✔
1830
  pInfo->pRes = createDataBlockFromDescNode(pPhyFillNode->node.pOutputDataBlockDesc);
451✔
1831
  QUERY_CHECK_NULL(pInfo->pRes, code, lino, _error, terrno);
451!
1832

1833
  code = blockDataEnsureCapacity(pInfo->pRes, pOperator->resultInfo.capacity);
451✔
1834
  QUERY_CHECK_CODE(code, lino, _error);
451!
1835

1836
  code = blockDataEnsureCapacity(pInfo->pSrcBlock, pOperator->resultInfo.capacity);
451✔
1837
  QUERY_CHECK_CODE(code, lino, _error);
451!
1838

1839
  pInfo->pFillInfo = initStreamFillInfo(pInfo->pFillSup, pInfo->pRes);
451✔
1840
  if (!pInfo->pFillInfo) {
451!
UNCOV
1841
    goto _error;
×
1842
  }
1843

1844
  setValueForFillInfo(pInfo->pFillSup, pInfo->pFillInfo);
451✔
1845

1846
  code = createSpecialDataBlock(STREAM_DELETE_RESULT, &pInfo->pDelRes);
451✔
1847
  QUERY_CHECK_CODE(code, lino, _error);
451!
1848

1849
  code = blockDataEnsureCapacity(pInfo->pDelRes, pOperator->resultInfo.capacity);
451✔
1850
  QUERY_CHECK_CODE(code, lino, _error);
451!
1851

1852
  pInfo->pUpdated = taosArrayInit(1024, sizeof(SWinKey));
451✔
1853
  QUERY_CHECK_NULL(pInfo->pUpdated, code, lino, _error, terrno);
451!
1854

1855
  pInfo->pCloseTs = taosArrayInit(1024, sizeof(TSKEY));
451✔
1856
  QUERY_CHECK_NULL(pInfo->pCloseTs, code, lino, _error, terrno);
451!
1857

1858
  pInfo->primaryTsCol = ((STargetNode*)pPhyFillNode->pWStartTs)->slotId;
451✔
1859
  pInfo->primarySrcSlotId = ((SColumnNode*)((STargetNode*)pPhyFillNode->pWStartTs)->pExpr)->slotId;
451✔
1860

1861
  int32_t numOfOutputCols = 0;
451✔
1862
  code = extractColMatchInfo(pPhyFillNode->pFillExprs, pPhyFillNode->node.pOutputDataBlockDesc, &numOfOutputCols,
451✔
1863
                             COL_MATCH_FROM_SLOT_ID, &pInfo->matchInfo);
1864
  QUERY_CHECK_CODE(code, lino, _error);
451!
1865

1866
  code = filterInitFromNode((SNode*)pPhyFillNode->node.pConditions, &pOperator->exprSupp.pFilterInfo, 0);
451✔
1867
  QUERY_CHECK_CODE(code, lino, _error);
451!
1868

1869
  pInfo->srcRowIndex = -1;
451✔
1870
  setOperatorInfo(pOperator, "StreamFillOperator", nodeType(pPhyFillNode), false, OP_NOT_OPENED, pInfo,
451✔
1871
                  pTaskInfo);
1872

1873
  if (triggerType == STREAM_TRIGGER_FORCE_WINDOW_CLOSE) {
451✔
1874
    code = initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle,
44✔
1875
                              GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
44✔
1876
    QUERY_CHECK_CODE(code, lino, _error);
44!
1877
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamForceFillNext, NULL, destroyStreamFillOperatorInfo,
44✔
1878
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1879
  } else if (triggerType == STREAM_TRIGGER_CONTINUOUS_WINDOW_CLOSE) {
407!
UNCOV
1880
    code = initFillOperatorStateBuff(pInfo, pTaskInfo->streamInfo.pState, &pTaskInfo->storageAPI.stateStore, pHandle,
×
UNCOV
1881
                              GET_TASKID(pTaskInfo), &pTaskInfo->storageAPI);
×
UNCOV
1882
    QUERY_CHECK_CODE(code, lino, _error);
×
1883

UNCOV
1884
    initNonBlockAggSupptor(&pInfo->nbSup, &pInfo->pFillSup->interval, downstream);
×
UNCOV
1885
    code = initStreamBasicInfo(&pInfo->basic, pOperator);
×
1886
    QUERY_CHECK_CODE(code, lino, _error);
×
1887
    pInfo->basic.operatorFlag = opFlag;
×
UNCOV
1888
    if (isFinalOperator(&pInfo->basic)) {
×
1889
      pInfo->nbSup.numOfKeep++;
×
1890
    }
1891
    code = initFillSupRowInfo(pInfo->pFillSup, pInfo->pRes);
×
1892
    QUERY_CHECK_CODE(code, lino, _error);
×
UNCOV
1893
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamNonblockFillNext, NULL, destroyStreamNonblockFillOperatorInfo,
×
1894
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1895
  } else {
1896
    pInfo->pState = NULL;
407✔
1897
    pOperator->fpSet = createOperatorFpSet(optrDummyOpenFn, doStreamFillNext, NULL, destroyStreamFillOperatorInfo,
407✔
1898
                                           optrDefaultBufFn, NULL, optrDefaultGetNextExtFn, NULL);
1899
  }
1900
  setOperatorStreamStateFn(pOperator, streamOpReleaseState, streamOpReloadState);
451✔
1901

1902
  code = appendDownstream(pOperator, &downstream, 1);
451✔
1903
  QUERY_CHECK_CODE(code, lino, _error);
451!
1904

1905
  *pOptrInfo = pOperator;
451✔
1906
  return TSDB_CODE_SUCCESS;
451✔
1907

UNCOV
1908
_error:
×
UNCOV
1909
  qError("%s failed at line %d since %s. task:%s", __func__, lino, tstrerror(code), GET_TASKID(pTaskInfo));
×
1910

UNCOV
1911
  if (pInfo != NULL) destroyStreamFillOperatorInfo(pInfo);
×
UNCOV
1912
  destroyOperatorAndDownstreams(pOperator, &downstream, 1);
×
UNCOV
1913
  pTaskInfo->code = code;
×
UNCOV
1914
  return code;
×
1915
}
STATUS · Troubleshooting · Open an Issue · Sales · Support · CAREERS · ENTERPRISE · START FREE · SCHEDULE DEMO
ANNOUNCEMENTS · TWITTER · TOS & SLA · Supported CI Services · What's a CI service? · Automated Testing

© 2026 Coveralls, Inc